diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index 93641a35..414ee69b 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -51,6 +51,8 @@ public class Node { private Map> rpcs = new HashMap>(); private Map values = new HashMap(); + + private Identifier searchID = null; private Thread thread; private UDPHandler udpListen; @@ -136,6 +138,9 @@ public class Node { } void sendFindValue(NodeIdentifier receiver, Identifier idToFind) { + // need to save the fileID because we need it for future searches + this.searchID = idToFind; + boolean successful = send(receiver, MessageType.FIND_VALUE, idToFind.getBytes(), true, null); @@ -146,7 +151,7 @@ public class Node { } /** - * Gets all nodes of this nodes routing table, that a close to a given node + * Gets all nodes of this nodes routing table, that are close to a given node/fileID * and sends that list to a specific node. * * @param receiver @@ -156,6 +161,8 @@ public class Node { * @param rpcID * An RPC ID (because this is always an answer to a FIND_NODE * RPC) + * @param nodeType + * If true, we search a specific node, else a fileID */ void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID, boolean nodeType) { byte msgtype = 0; @@ -356,6 +363,10 @@ public class Node { public boolean hasKey(Identifier key) { return values.containsKey(key); } + + public Identifier getSearchID() { + return this.searchID; + } public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { List rpcsFromID = rpcs.get(rpcID); diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 129f2c21..2047a75a 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -9,256 +9,278 @@ import java.util.logging.Logger; import message.MessageType; public class UDPHandler implements Runnable { - private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName()); + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class + .getName()); - public static final int BUF_SIZE = 512; + public static final int BUF_SIZE = 512; - private volatile boolean running = true; - private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - private Node node; + private Node node; - public UDPHandler(Node node) { - this.node = node; - } + public UDPHandler(Node node) { + this.node = node; + } - /** - * Takes the buffer of this UDPHandler as is and tries to read an IP address - * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, - * this will fail. - * - * @return the address that has been read - */ - private InetSocketAddress getIPFromBuffer() { - StringBuilder theAddr = new StringBuilder(); - // Read 4 Bytes and 1 Integer = 1 IP address - for (int i = 0; i < 4; i++) { - theAddr.append(buffer.get()); - if (i < 3) { - theAddr.append("."); - } - } - int port = buffer.getInt(); - return new InetSocketAddress(theAddr.toString(), port); - } + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress getIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } - private Identifier getIDFromBuffer() { - int numBytes = Node.ID_BITS / 8; - byte[] result = new byte[numBytes]; - for (int i = 0; i < numBytes; i++) { - result[i] = buffer.get(); - } - return new Identifier(Node.ID_BITS, result); - } + private Identifier getIDFromBuffer() { + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new Identifier(Node.ID_BITS, result); + } - /** - * Reads a triple from the channel and returns a - * {@link node.NodeIdentifier}. - * - * @return the read node ID - */ - private NodeIdentifier getNodeTripleFromBuffer() { - InetSocketAddress address = getIPFromBuffer(); + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ + private NodeIdentifier getNodeTripleFromBuffer() { + InetSocketAddress address = getIPFromBuffer(); - int numBytes = Node.ID_BITS / 8; - byte[] result = new byte[numBytes]; - for (int i = 0; i < numBytes; i++) { - result[i] = buffer.get(); - } - return new NodeIdentifier(Node.ID_BITS, result, address); - } + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new NodeIdentifier(Node.ID_BITS, result, address); + } - public void run() { - InetSocketAddress from = null; + public void run() { + InetSocketAddress from = null; - // Run until it gets killed, and all my Acks have been answered - while (running || node.hasAcks()) { - try { - // Flag that indicates whether the routing table should be - // updated with the node we just received a message from. This - // needs to be done, because some messages trigger a direct - // answer. For example we send a PING to a node. That node - // answers with a PONG. Because we received a message from that - // node we will update our routing table and see that we already - // know this node. So we will PING that node... - boolean updateRT = true; + // Run until it gets killed, and all my Acks have been answered + while (running || node.hasAcks()) { + try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... + boolean updateRT = true; - // The address of the node that sent this message - from = (InetSocketAddress) node.getChannel().receive(buffer); + // The address of the node that sent this message + from = (InetSocketAddress) node.getChannel().receive(buffer); - // channel.receive() is non-blocking. So we need to check if - // something actually has been written to the buffer - if (buffer.remaining() != BUF_SIZE) { - buffer.flip(); + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); - byte messageType = buffer.get(); + byte messageType = buffer.get(); - NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, - getIDFromBuffer().getBytes(), from); + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, + getIDFromBuffer().getBytes(), from); - Identifier rpcID = getIDFromBuffer(); + Identifier rpcID = getIDFromBuffer(); - switch (messageType) { - case MessageType.FIND_NODE: - receiveFindNode(fromID, rpcID); - break; - case MessageType.NODES: - receiveNodes(fromID, rpcID); - break; - case MessageType.PING: - updateRT = false; - receivePing(fromID, rpcID); - break; - case MessageType.PONG: - updateRT = false; - receivePong(fromID, rpcID); - break; - case MessageType.LEAVE: - // We don't have to do anything here because, after this - // switch block we call node.updateBuckets(...) which - // will try to ping the node we received this leave - // message from. That node will not answered because it - // directly shut down after sending the leave message. - // So the node will be removed from this routing table. - LOGGER.log(Level.INFO, "Received leave from {0}", - new Object[] { from.toString() }); - break; - case MessageType.FIND_VALUE: - receiveFindValue(fromID, rpcID); - break; - case MessageType.STORE: - receiveStore(fromID, rpcID); - break; - case MessageType.DATA: - receiveData(fromID, rpcID); - LOGGER.log(Level.INFO, "Received DATA from {0}", - new Object[] { from.toString() }); - break; - default: - LOGGER.log(Level.INFO, - "Received unknown command from {0}: [{1}]{2}", - new Object[] { from.toString(), messageType, - new String(buffer.array()) }); - } + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(fromID, rpcID); + break; + case MessageType.NODES: + receiveNodes(fromID, rpcID); + break; + case MessageType.PING: + updateRT = false; + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + updateRT = false; + receivePong(fromID, rpcID); + break; + case MessageType.LEAVE: + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; + case MessageType.FIND_VALUE: + receiveFindValue(fromID, rpcID); + break; + case MessageType.VALUE_NODES: + receiveValueNodes(fromID, rpcID); + break; + case MessageType.STORE: + receiveStore(fromID, rpcID); + break; + case MessageType.DATA: + receiveData(fromID, rpcID); + LOGGER.log(Level.INFO, "Received DATA from {0}", + new Object[] { from.toString() }); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } - if (updateRT) { - node.updateBuckets(new NodeIdentifier(Node.ID_BITS, - fromID.getBytes(), from)); - } + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } - } else { - // If nothing has been read/received wait and read/receive - // again - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + } else { + // If nothing has been read/received wait and read/receive + // again + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - buffer.clear(); + buffer.clear(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } + } catch (IOException e) { + e.printStackTrace(); + } + } + } - - - - private void receiveData(NodeIdentifier fromID, Identifier rpcID) { + private void receiveValueNodes(NodeIdentifier fromID, Identifier rpcID) { + int numReceived = 0; - StringBuilder sb = new StringBuilder(); + // This is just for the log message + StringBuilder nodes = new StringBuilder(); - while (buffer.hasRemaining()) { - sb.append(buffer.get()); - } - - String data = sb.toString(); - - String parts[] = data.split("-"); - - - + while (buffer.hasRemaining()) { + NodeIdentifier newID = getNodeTripleFromBuffer(); + node.sendFindValue(newID, node.getSearchID()); + nodes.append(newID).append(", "); + numReceived++; + } - // This should be the answer to a prior FIND_NODE -> mark this RPC ID as - // received - node.receivedRPC(fromID, rpcID); + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, + "Received {0} [VALUE NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); + } + + private void receiveData(NodeIdentifier fromID, Identifier rpcID) { + + StringBuilder sb = new StringBuilder(); + + while (buffer.hasRemaining()) { + sb.append(buffer.get()); + } + + String data = sb.toString(); + + String parts[] = data.split("-"); + + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", + new Object[] { data.toString(), fromID }); - LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", - new Object[] { data.toString(), fromID }); - } private void receivePong(NodeIdentifier fromID, Identifier rpcID) { - LOGGER.log(Level.INFO, "Received [PONG] from {0}", - new Object[] { fromID }); + LOGGER.log(Level.INFO, "Received [PONG] from {0}", + new Object[] { fromID }); - // This should be the answer to a prior PING -> mark this RPC ID as - // received - node.receivedRPC(fromID, rpcID); - } + // This should be the answer to a prior PING -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + } - private void receivePing(NodeIdentifier fromID, Identifier rpcID) { - LOGGER.log(Level.INFO, "Received [PING] from {0}", - new Object[] { fromID }); - node.sendPong(fromID, rpcID); - } + private void receivePing(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PING] from {0}", + new Object[] { fromID }); + node.sendPong(fromID, rpcID); + } - private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { + private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { - int numReceived = 0; + int numReceived = 0; - // This is just for the log message - StringBuilder nodes = new StringBuilder(); + // This is just for the log message + StringBuilder nodes = new StringBuilder(); - while (buffer.hasRemaining()) { - NodeIdentifier newID = getNodeTripleFromBuffer(); - node.updateBuckets(newID); - nodes.append(newID).append(", "); - numReceived++; - } + while (buffer.hasRemaining()) { + NodeIdentifier newID = getNodeTripleFromBuffer(); + node.updateBuckets(newID); + nodes.append(newID).append(", "); + numReceived++; + } - // This should be the answer to a prior FIND_NODE -> mark this RPC ID as - // received - node.receivedRPC(fromID, rpcID); + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); - LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", - new Object[] { numReceived, nodes.toString(), fromID }); - } + LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); + } - private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) { - Identifier idToFind = getIDFromBuffer(); + private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) { + Identifier idToFind = getIDFromBuffer(); - LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}", - new Object[] { idToFind, fromID }); + LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}", + new Object[] { idToFind, fromID }); - node.sendClosestNodesTo(fromID, idToFind, rpc_id, true); - } - - private void receiveStore(NodeIdentifier fromID, Identifier rpcID) { - Identifier fileID = getIDFromBuffer(); - - LOGGER.log(Level.INFO, "Received [STORE {0}] from Node {1}", - new Object[] { fileID, fromID }); - - node.storePair(fileID, fromID); - node.receivedRPC(fromID, rpcID); - } - - private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) { - Identifier fileID = getIDFromBuffer(); - - LOGGER.log(Level.INFO, "Received [FIND VALUE {0}] from Node {1}", - new Object[] { fileID, fromID }); - - node.sendClosestNodesTo(fromID, fileID, rpcID, false); - } - - + node.sendClosestNodesTo(fromID, idToFind, rpc_id, true); + } - public void terminate() { - running = false; - } + private void receiveStore(NodeIdentifier fromID, Identifier rpcID) { + Identifier fileID = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [STORE {0}] from Node {1}", + new Object[] { fileID, fromID }); + + node.storePair(fileID, fromID); + node.receivedRPC(fromID, rpcID); + } + + private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) { + Identifier fileID = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [FIND VALUE {0}] from Node {1}", + new Object[] { fileID, fromID }); + + if (node.hasKey(fileID)) { + //TODO: NodeID, welche das File hat, schicken + } else { + node.sendClosestNodesTo(fromID, fileID, rpcID, false); + } + } + + public void terminate() { + running = false; + } }