added fine_value, still not finished yet
This commit is contained in:
parent
f3793804bf
commit
790fb3540d
@ -51,6 +51,8 @@ public class Node {
|
||||
|
||||
private Map<Identifier, List<Ack>> rpcs = new HashMap<Identifier, List<Ack>>();
|
||||
private Map<Identifier, Identifier> values = new HashMap<Identifier, Identifier>();
|
||||
|
||||
private Identifier searchID = null;
|
||||
|
||||
private Thread thread;
|
||||
private UDPHandler udpListen;
|
||||
@ -136,6 +138,9 @@ public class Node {
|
||||
}
|
||||
|
||||
void sendFindValue(NodeIdentifier receiver, Identifier idToFind) {
|
||||
// need to save the fileID because we need it for future searches
|
||||
this.searchID = idToFind;
|
||||
|
||||
boolean successful = send(receiver, MessageType.FIND_VALUE,
|
||||
idToFind.getBytes(), true, null);
|
||||
|
||||
@ -146,7 +151,7 @@ public class Node {
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets all nodes of this nodes routing table, that a close to a given node
|
||||
* Gets all nodes of this nodes routing table, that are close to a given node/fileID
|
||||
* and sends that list to a specific node.
|
||||
*
|
||||
* @param receiver
|
||||
@ -156,6 +161,8 @@ public class Node {
|
||||
* @param rpcID
|
||||
* An RPC ID (because this is always an answer to a FIND_NODE
|
||||
* RPC)
|
||||
* @param nodeType
|
||||
* If true, we search a specific node, else a fileID
|
||||
*/
|
||||
void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID, boolean nodeType) {
|
||||
byte msgtype = 0;
|
||||
@ -356,6 +363,10 @@ public class Node {
|
||||
public boolean hasKey(Identifier key) {
|
||||
return values.containsKey(key);
|
||||
}
|
||||
|
||||
public Identifier getSearchID() {
|
||||
return this.searchID;
|
||||
}
|
||||
|
||||
public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) {
|
||||
List<Ack> rpcsFromID = rpcs.get(rpcID);
|
||||
|
||||
@ -9,256 +9,278 @@ import java.util.logging.Logger;
|
||||
import message.MessageType;
|
||||
|
||||
public class UDPHandler implements Runnable {
|
||||
private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName());
|
||||
private final static Logger LOGGER = Logger.getLogger(UDPHandler.class
|
||||
.getName());
|
||||
|
||||
public static final int BUF_SIZE = 512;
|
||||
public static final int BUF_SIZE = 512;
|
||||
|
||||
private volatile boolean running = true;
|
||||
private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
|
||||
private volatile boolean running = true;
|
||||
private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
|
||||
|
||||
private Node node;
|
||||
private Node node;
|
||||
|
||||
public UDPHandler(Node node) {
|
||||
this.node = node;
|
||||
}
|
||||
public UDPHandler(Node node) {
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the buffer of this UDPHandler as is and tries to read an IP address
|
||||
* (4 bytes and 1 int) from it. If there is no/incomplete or wrong data,
|
||||
* this will fail.
|
||||
*
|
||||
* @return the address that has been read
|
||||
*/
|
||||
private InetSocketAddress getIPFromBuffer() {
|
||||
StringBuilder theAddr = new StringBuilder();
|
||||
// Read 4 Bytes and 1 Integer = 1 IP address
|
||||
for (int i = 0; i < 4; i++) {
|
||||
theAddr.append(buffer.get());
|
||||
if (i < 3) {
|
||||
theAddr.append(".");
|
||||
}
|
||||
}
|
||||
int port = buffer.getInt();
|
||||
return new InetSocketAddress(theAddr.toString(), port);
|
||||
}
|
||||
/**
|
||||
* Takes the buffer of this UDPHandler as is and tries to read an IP address
|
||||
* (4 bytes and 1 int) from it. If there is no/incomplete or wrong data,
|
||||
* this will fail.
|
||||
*
|
||||
* @return the address that has been read
|
||||
*/
|
||||
private InetSocketAddress getIPFromBuffer() {
|
||||
StringBuilder theAddr = new StringBuilder();
|
||||
// Read 4 Bytes and 1 Integer = 1 IP address
|
||||
for (int i = 0; i < 4; i++) {
|
||||
theAddr.append(buffer.get());
|
||||
if (i < 3) {
|
||||
theAddr.append(".");
|
||||
}
|
||||
}
|
||||
int port = buffer.getInt();
|
||||
return new InetSocketAddress(theAddr.toString(), port);
|
||||
}
|
||||
|
||||
private Identifier getIDFromBuffer() {
|
||||
int numBytes = Node.ID_BITS / 8;
|
||||
byte[] result = new byte[numBytes];
|
||||
for (int i = 0; i < numBytes; i++) {
|
||||
result[i] = buffer.get();
|
||||
}
|
||||
return new Identifier(Node.ID_BITS, result);
|
||||
}
|
||||
private Identifier getIDFromBuffer() {
|
||||
int numBytes = Node.ID_BITS / 8;
|
||||
byte[] result = new byte[numBytes];
|
||||
for (int i = 0; i < numBytes; i++) {
|
||||
result[i] = buffer.get();
|
||||
}
|
||||
return new Identifier(Node.ID_BITS, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a triple <IP address, port, id> from the channel and returns a
|
||||
* {@link node.NodeIdentifier}.
|
||||
*
|
||||
* @return the read node ID
|
||||
*/
|
||||
private NodeIdentifier getNodeTripleFromBuffer() {
|
||||
InetSocketAddress address = getIPFromBuffer();
|
||||
/**
|
||||
* Reads a triple <IP address, port, id> from the channel and returns a
|
||||
* {@link node.NodeIdentifier}.
|
||||
*
|
||||
* @return the read node ID
|
||||
*/
|
||||
private NodeIdentifier getNodeTripleFromBuffer() {
|
||||
InetSocketAddress address = getIPFromBuffer();
|
||||
|
||||
int numBytes = Node.ID_BITS / 8;
|
||||
byte[] result = new byte[numBytes];
|
||||
for (int i = 0; i < numBytes; i++) {
|
||||
result[i] = buffer.get();
|
||||
}
|
||||
return new NodeIdentifier(Node.ID_BITS, result, address);
|
||||
}
|
||||
int numBytes = Node.ID_BITS / 8;
|
||||
byte[] result = new byte[numBytes];
|
||||
for (int i = 0; i < numBytes; i++) {
|
||||
result[i] = buffer.get();
|
||||
}
|
||||
return new NodeIdentifier(Node.ID_BITS, result, address);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
InetSocketAddress from = null;
|
||||
public void run() {
|
||||
InetSocketAddress from = null;
|
||||
|
||||
// Run until it gets killed, and all my Acks have been answered
|
||||
while (running || node.hasAcks()) {
|
||||
try {
|
||||
// Flag that indicates whether the routing table should be
|
||||
// updated with the node we just received a message from. This
|
||||
// needs to be done, because some messages trigger a direct
|
||||
// answer. For example we send a PING to a node. That node
|
||||
// answers with a PONG. Because we received a message from that
|
||||
// node we will update our routing table and see that we already
|
||||
// know this node. So we will PING that node...
|
||||
boolean updateRT = true;
|
||||
// Run until it gets killed, and all my Acks have been answered
|
||||
while (running || node.hasAcks()) {
|
||||
try {
|
||||
// Flag that indicates whether the routing table should be
|
||||
// updated with the node we just received a message from. This
|
||||
// needs to be done, because some messages trigger a direct
|
||||
// answer. For example we send a PING to a node. That node
|
||||
// answers with a PONG. Because we received a message from that
|
||||
// node we will update our routing table and see that we already
|
||||
// know this node. So we will PING that node...
|
||||
boolean updateRT = true;
|
||||
|
||||
// The address of the node that sent this message
|
||||
from = (InetSocketAddress) node.getChannel().receive(buffer);
|
||||
// The address of the node that sent this message
|
||||
from = (InetSocketAddress) node.getChannel().receive(buffer);
|
||||
|
||||
// channel.receive() is non-blocking. So we need to check if
|
||||
// something actually has been written to the buffer
|
||||
if (buffer.remaining() != BUF_SIZE) {
|
||||
buffer.flip();
|
||||
// channel.receive() is non-blocking. So we need to check if
|
||||
// something actually has been written to the buffer
|
||||
if (buffer.remaining() != BUF_SIZE) {
|
||||
buffer.flip();
|
||||
|
||||
byte messageType = buffer.get();
|
||||
byte messageType = buffer.get();
|
||||
|
||||
NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS,
|
||||
getIDFromBuffer().getBytes(), from);
|
||||
NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS,
|
||||
getIDFromBuffer().getBytes(), from);
|
||||
|
||||
Identifier rpcID = getIDFromBuffer();
|
||||
Identifier rpcID = getIDFromBuffer();
|
||||
|
||||
switch (messageType) {
|
||||
case MessageType.FIND_NODE:
|
||||
receiveFindNode(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.NODES:
|
||||
receiveNodes(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PING:
|
||||
updateRT = false;
|
||||
receivePing(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PONG:
|
||||
updateRT = false;
|
||||
receivePong(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.LEAVE:
|
||||
// We don't have to do anything here because, after this
|
||||
// switch block we call node.updateBuckets(...) which
|
||||
// will try to ping the node we received this leave
|
||||
// message from. That node will not answered because it
|
||||
// directly shut down after sending the leave message.
|
||||
// So the node will be removed from this routing table.
|
||||
LOGGER.log(Level.INFO, "Received leave from {0}",
|
||||
new Object[] { from.toString() });
|
||||
break;
|
||||
case MessageType.FIND_VALUE:
|
||||
receiveFindValue(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.STORE:
|
||||
receiveStore(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.DATA:
|
||||
receiveData(fromID, rpcID);
|
||||
LOGGER.log(Level.INFO, "Received DATA from {0}",
|
||||
new Object[] { from.toString() });
|
||||
break;
|
||||
default:
|
||||
LOGGER.log(Level.INFO,
|
||||
"Received unknown command from {0}: [{1}]{2}",
|
||||
new Object[] { from.toString(), messageType,
|
||||
new String(buffer.array()) });
|
||||
}
|
||||
switch (messageType) {
|
||||
case MessageType.FIND_NODE:
|
||||
receiveFindNode(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.NODES:
|
||||
receiveNodes(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PING:
|
||||
updateRT = false;
|
||||
receivePing(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PONG:
|
||||
updateRT = false;
|
||||
receivePong(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.LEAVE:
|
||||
// We don't have to do anything here because, after this
|
||||
// switch block we call node.updateBuckets(...) which
|
||||
// will try to ping the node we received this leave
|
||||
// message from. That node will not answered because it
|
||||
// directly shut down after sending the leave message.
|
||||
// So the node will be removed from this routing table.
|
||||
LOGGER.log(Level.INFO, "Received leave from {0}",
|
||||
new Object[] { from.toString() });
|
||||
break;
|
||||
case MessageType.FIND_VALUE:
|
||||
receiveFindValue(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.VALUE_NODES:
|
||||
receiveValueNodes(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.STORE:
|
||||
receiveStore(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.DATA:
|
||||
receiveData(fromID, rpcID);
|
||||
LOGGER.log(Level.INFO, "Received DATA from {0}",
|
||||
new Object[] { from.toString() });
|
||||
break;
|
||||
default:
|
||||
LOGGER.log(Level.INFO,
|
||||
"Received unknown command from {0}: [{1}]{2}",
|
||||
new Object[] { from.toString(), messageType,
|
||||
new String(buffer.array()) });
|
||||
}
|
||||
|
||||
if (updateRT) {
|
||||
node.updateBuckets(new NodeIdentifier(Node.ID_BITS,
|
||||
fromID.getBytes(), from));
|
||||
}
|
||||
if (updateRT) {
|
||||
node.updateBuckets(new NodeIdentifier(Node.ID_BITS,
|
||||
fromID.getBytes(), from));
|
||||
}
|
||||
|
||||
} else {
|
||||
// If nothing has been read/received wait and read/receive
|
||||
// again
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// If nothing has been read/received wait and read/receive
|
||||
// again
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
buffer.clear();
|
||||
buffer.clear();
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
private void receiveData(NodeIdentifier fromID, Identifier rpcID) {
|
||||
private void receiveValueNodes(NodeIdentifier fromID, Identifier rpcID) {
|
||||
int numReceived = 0;
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
// This is just for the log message
|
||||
StringBuilder nodes = new StringBuilder();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
sb.append(buffer.get());
|
||||
}
|
||||
|
||||
String data = sb.toString();
|
||||
|
||||
String parts[] = data.split("-");
|
||||
|
||||
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
NodeIdentifier newID = getNodeTripleFromBuffer();
|
||||
node.sendFindValue(newID, node.getSearchID());
|
||||
nodes.append(newID).append(", ");
|
||||
numReceived++;
|
||||
}
|
||||
|
||||
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
|
||||
LOGGER.log(Level.INFO,
|
||||
"Received {0} [VALUE NODES] [{1}] from Node {2})",
|
||||
new Object[] { numReceived, nodes.toString(), fromID });
|
||||
}
|
||||
|
||||
private void receiveData(NodeIdentifier fromID, Identifier rpcID) {
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
sb.append(buffer.get());
|
||||
}
|
||||
|
||||
String data = sb.toString();
|
||||
|
||||
String parts[] = data.split("-");
|
||||
|
||||
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})",
|
||||
new Object[] { data.toString(), fromID });
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})",
|
||||
new Object[] { data.toString(), fromID });
|
||||
|
||||
}
|
||||
|
||||
private void receivePong(NodeIdentifier fromID, Identifier rpcID) {
|
||||
LOGGER.log(Level.INFO, "Received [PONG] from {0}",
|
||||
new Object[] { fromID });
|
||||
LOGGER.log(Level.INFO, "Received [PONG] from {0}",
|
||||
new Object[] { fromID });
|
||||
|
||||
// This should be the answer to a prior PING -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
}
|
||||
// This should be the answer to a prior PING -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receivePing(NodeIdentifier fromID, Identifier rpcID) {
|
||||
LOGGER.log(Level.INFO, "Received [PING] from {0}",
|
||||
new Object[] { fromID });
|
||||
node.sendPong(fromID, rpcID);
|
||||
}
|
||||
private void receivePing(NodeIdentifier fromID, Identifier rpcID) {
|
||||
LOGGER.log(Level.INFO, "Received [PING] from {0}",
|
||||
new Object[] { fromID });
|
||||
node.sendPong(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) {
|
||||
private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) {
|
||||
|
||||
int numReceived = 0;
|
||||
int numReceived = 0;
|
||||
|
||||
// This is just for the log message
|
||||
StringBuilder nodes = new StringBuilder();
|
||||
// This is just for the log message
|
||||
StringBuilder nodes = new StringBuilder();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
NodeIdentifier newID = getNodeTripleFromBuffer();
|
||||
node.updateBuckets(newID);
|
||||
nodes.append(newID).append(", ");
|
||||
numReceived++;
|
||||
}
|
||||
while (buffer.hasRemaining()) {
|
||||
NodeIdentifier newID = getNodeTripleFromBuffer();
|
||||
node.updateBuckets(newID);
|
||||
nodes.append(newID).append(", ");
|
||||
numReceived++;
|
||||
}
|
||||
|
||||
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
|
||||
// received
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
|
||||
LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})",
|
||||
new Object[] { numReceived, nodes.toString(), fromID });
|
||||
}
|
||||
LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})",
|
||||
new Object[] { numReceived, nodes.toString(), fromID });
|
||||
}
|
||||
|
||||
private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) {
|
||||
Identifier idToFind = getIDFromBuffer();
|
||||
private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) {
|
||||
Identifier idToFind = getIDFromBuffer();
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}",
|
||||
new Object[] { idToFind, fromID });
|
||||
LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}",
|
||||
new Object[] { idToFind, fromID });
|
||||
|
||||
node.sendClosestNodesTo(fromID, idToFind, rpc_id, true);
|
||||
}
|
||||
|
||||
private void receiveStore(NodeIdentifier fromID, Identifier rpcID) {
|
||||
Identifier fileID = getIDFromBuffer();
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [STORE {0}] from Node {1}",
|
||||
new Object[] { fileID, fromID });
|
||||
|
||||
node.storePair(fileID, fromID);
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) {
|
||||
Identifier fileID = getIDFromBuffer();
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [FIND VALUE {0}] from Node {1}",
|
||||
new Object[] { fileID, fromID });
|
||||
|
||||
node.sendClosestNodesTo(fromID, fileID, rpcID, false);
|
||||
}
|
||||
|
||||
|
||||
node.sendClosestNodesTo(fromID, idToFind, rpc_id, true);
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
running = false;
|
||||
}
|
||||
private void receiveStore(NodeIdentifier fromID, Identifier rpcID) {
|
||||
Identifier fileID = getIDFromBuffer();
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [STORE {0}] from Node {1}",
|
||||
new Object[] { fileID, fromID });
|
||||
|
||||
node.storePair(fileID, fromID);
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) {
|
||||
Identifier fileID = getIDFromBuffer();
|
||||
|
||||
LOGGER.log(Level.INFO, "Received [FIND VALUE {0}] from Node {1}",
|
||||
new Object[] { fileID, fromID });
|
||||
|
||||
if (node.hasKey(fileID)) {
|
||||
//TODO: NodeID, welche das File hat, schicken
|
||||
} else {
|
||||
node.sendClosestNodesTo(fromID, fileID, rpcID, false);
|
||||
}
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
running = false;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user