diff --git a/ws2012/P2P/uebungen/8/src/message/Ack.java b/ws2012/P2P/uebungen/8/src/message/Ack.java index 378532e9..b7b24eac 100644 --- a/ws2012/P2P/uebungen/8/src/message/Ack.java +++ b/ws2012/P2P/uebungen/8/src/message/Ack.java @@ -1,105 +1,109 @@ package message; import java.io.IOException; -import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.logging.Level; import java.util.logging.Logger; +import node.Identifier; +import node.NodeIdentifier; +import util.BufferUtil; + public class Ack { - private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); + private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); - // timeout in seconds - private final int TIMEOUT = 1000; + // timeout in seconds + private final int TIMEOUT = 1000; - private int id; - private SocketAddress address; - private ByteBuffer buf; + private Identifier id; + private NodeIdentifier receiver; + private ByteBuffer buffer; - private TimeoutThread timeout; - private volatile Thread thread; + private TimeoutThread timeout; + private Thread thread; - // The channel to re-send the message on - private DatagramChannel channel; + // The channel to re-send the message on + private DatagramChannel channel; - public Ack(int id, SocketAddress address, DatagramChannel channel) { - this.id = id; - this.address = address; - this.channel = channel; - startThread(); - } + public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel, + ByteBuffer buffer) { + // TODO: this needs a max. number of retries (so that a ping to a failed + // host doesn't re-ping forever) + this.id = id; + this.receiver = receiver; + this.channel = channel; + this.buffer = BufferUtil.clone(buffer); + startThread(); + } - private void startThread() { - LOGGER.log(Level.FINE, "Starting timeout thread for ack #" + id); - timeout = new TimeoutThread(); - thread = new Thread(timeout); - thread.start(); - } + private void startThread() { + LOGGER.log(Level.FINE, "Starting timeout thread for RPC " + id); + timeout = new TimeoutThread(); + thread = new Thread(timeout); + thread.start(); + } - public int getID() { - return id; - } + public Identifier getID() { + return id; + } - public boolean check(SocketAddress address) { - return this.address.toString().equals(address.toString()); - } + public boolean check(NodeIdentifier fromID) { + return fromID.equals(receiver); + } - public ByteBuffer getBuf() { - return buf; - } + public ByteBuffer getBuf() { + return buffer; + } - public void setBuf(ByteBuffer buf) { - this.buf = buf; - } + public void setBuf(ByteBuffer buf) { + this.buffer = buf; + } - public void setReceived() { - // Stop thread - try { - if (thread != null) { - timeout.terminate(); - thread.join(); - } - } catch (InterruptedException e) { - } - } + public void setReceived() { + // Stop thread + try { + if (thread != null) { + timeout.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + } + } - private class TimeoutThread implements Runnable { - private volatile boolean notReceived = true; + private class TimeoutThread implements Runnable { + private volatile boolean notReceived = true; - // When do we stop expecting an ack - private long timeToStop = System.currentTimeMillis() + TIMEOUT; + // When do we stop expecting an ack + private long timeToStop = System.currentTimeMillis() + TIMEOUT; - @Override - public void run() { - while (notReceived && System.currentTimeMillis() < timeToStop) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + @Override + public void run() { + while (notReceived && System.currentTimeMillis() < timeToStop) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - // Timeout hit -> re-send - if (notReceived) { - try { - LOGGER.log(Level.INFO, "Absent ack #{0}. Resending to {1}", - new Object[] { id, address.toString() }); + // Timeout hit -> re-send + if (notReceived) { + try { + LOGGER.log(Level.INFO, + "Absent RPC ack {0}. Resending to {1}", + new Object[] { id, receiver.toString() }); + channel.send(buffer, receiver.getAddress()); + startThread(); + } catch (IOException e) { + System.out.println("ASDASD"); + e.printStackTrace(); + } + } + } - channel.send(buf, address); - startThread(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public void terminate() { - notReceived = false; - } - } - - public SocketAddress getAddresse() { - return address; - } + public void terminate() { + notReceived = false; + } + } } \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/message/MessageType.java b/ws2012/P2P/uebungen/8/src/message/MessageType.java index eebc5a73..7cf981e9 100644 --- a/ws2012/P2P/uebungen/8/src/message/MessageType.java +++ b/ws2012/P2P/uebungen/8/src/message/MessageType.java @@ -1,12 +1,9 @@ package message; - public class MessageType { - public final static byte FIND_NODE = 48; // 0 - public final static byte PING = 49; - public final static byte PONG = 50; + public final static byte FIND_NODE = 0; + public final static byte NODES = 1; - public final static byte ACK = 51; - - public final static byte NODES = 52; + public final static byte PING = 10; + public final static byte PONG = 11; } diff --git a/ws2012/P2P/uebungen/8/src/node/Node.java b/ws2012/P2P/uebungen/8/src/node/Node.java index cad00fd1..17c06d3b 100644 --- a/ws2012/P2P/uebungen/8/src/node/Node.java +++ b/ws2012/P2P/uebungen/8/src/node/Node.java @@ -5,11 +5,12 @@ import java.net.InetSocketAddress; import java.net.SocketException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.logging.Level; -import java.util.logging.LogManager; import java.util.logging.Logger; import message.Ack; @@ -34,14 +35,19 @@ public class Node { private static final Identifier INITIAL_ID = Identifier .getStaticIdentifier(); - private static final int _BUFFER_SIZE = 512; + private static final int BUFFER_SIZE = 512; + + /** + * The size of an IP address (in bytes) + */ + private static final int SIZE_IP_ADDRESS = 8; private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); private InetSocketAddress address; private DatagramChannel channel; - private Map acks = new HashMap(); + private Map> rpcs = new HashMap>(); private Thread thread; private UDPHandler udpListen; @@ -77,8 +83,20 @@ public class Node { getName(), address.toString() }); if (address.getPort() != INITIAL_PORT) { - joinNetworkVia(new NodeIdentifier(INITIAL_ID.getBytes(), - new InetSocketAddress("127.0.0.1", INITIAL_PORT))); + // The port of this node is not the "INITIAL_PORT" (so it's not + // the first node in the network). So we try to join the network + // via the first node. + NodeIdentifier viaNode = new NodeIdentifier( + INITIAL_ID.getBytes(), new InetSocketAddress( + "127.0.0.1", INITIAL_PORT)); + + LOGGER.log(Level.INFO, "Trying to join network via node {0}", + new Object[] { viaNode }); + + rt.update(viaNode); + sendFindNode(viaNode, this.nodeID); + + sendPing(viaNode); } } catch (IOException e) { @@ -86,82 +104,122 @@ public class Node { } } - private void joinNetworkVia(NodeIdentifier receiver) { - LOGGER.log(Level.INFO, "Trying to join network via node {0}", - new Object[] { receiver }); - - rt.update(receiver); - // Send a FIND_NODE. The node to find is my own id (bootstrapping...) - sendFindNode(receiver, nodeID); + private Identifier createRPCID() { + Identifier rpcID = Identifier.getRandomIdentifier(); + while (rpcs.containsKey(rpcID)) { + rpcID = Identifier.getRandomIdentifier(); + } + return rpcID; } void sendFindNode(NodeIdentifier receiver, Identifier idToFind) { - ByteBuffer buffer = ByteBuffer.allocate(_BUFFER_SIZE); + boolean successful = send(receiver, MessageType.FIND_NODE, + idToFind.getBytes(), true); - Identifier rpc_id = Identifier.getRandomIdentifier(); - - buffer.put(MessageType.FIND_NODE); - buffer.put(nodeID.getBytes()); - buffer.put(rpc_id.getBytes()); - buffer.put(idToFind.getBytes()); - - if (send(buffer, receiver.getAddress())) { + if (successful) { LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}", new Object[] { idToFind, receiver }); } } void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, - Identifier rpc_id) { - ByteBuffer buffer = ByteBuffer.allocate(_BUFFER_SIZE); - - buffer.put(MessageType.NODES); - buffer.put(nodeID.getBytes()); - buffer.put(rpc_id.getBytes()); + Identifier rpcID) { Set entries = rt.getClosestNodesTo(idToFind); + int numNodes = entries.size(); + + ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8) + + numNodes * SIZE_IP_ADDRESS); + for (NodeIdentifier id : entries) { if (!receiver.equals(id)) { - buffer.put(id.getTripleAsBytes()); + nodes.put(id.getTripleAsBytes()); } } - if (send(buffer, receiver.getAddress())) { - LOGGER.log(Level.INFO, - "Sending {0} nodes to to node {1} [FIND_NODE {2}].", - new Object[] { entries.size(), receiver, idToFind }); + boolean successful = send(receiver, MessageType.NODES, rpcID, + nodes.array(), false); + + if (successful) { + LOGGER.log( + Level.INFO, + "Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})", + new Object[] { entries.size(), receiver, idToFind, rpcID }); } } - private boolean send(ByteBuffer buffer, InetSocketAddress to) { - buffer.flip(); - try { - channel.send(buffer, to); - return true; - } catch (IOException e) { - e.printStackTrace(); - return false; + boolean sendPing(NodeIdentifier receiver) { + return send(receiver, MessageType.PING, null, true); + } + + void sendPong(NodeIdentifier receiver, Identifier rpcID) { + boolean successful = send(receiver, MessageType.PONG, rpcID, null, + false); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [PONG] to {0} (rpcID={1})", + new Object[] { receiver, rpcID }); } } + private boolean send(NodeIdentifier to, byte messageType, byte[] data, + boolean reliable) { + return send(to, messageType, createRPCID(), data, reliable); + } + + private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID, + byte[] data, boolean reliable) { + boolean successful = true; + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + + buffer.put(messageType); + buffer.put(this.nodeID.getBytes()); + buffer.put(rpcID.getBytes()); + + if (data != null) { + buffer.put(data); + } + + buffer.flip(); + + try { + + channel.send(buffer, to.getAddress()); + + } catch (IOException e) { + + LOGGER.log(Level.SEVERE, "Failed to write to channel", e); + successful = false; + + } finally { + // Even if an exception occurred this should be reliable + if (reliable) { + // This message should be reliable (acked) + + Ack newAck = new Ack(rpcID, to, channel, buffer); + if (rpcs.containsKey(rpcID)) { + rpcs.get(rpcID).add(newAck); + } else { + rpcs.put(rpcID, new ArrayList()); + rpcs.get(rpcID).add(newAck); + } + } + } + return successful; + } + public String getName() { return nodeID.toString(); } public boolean hasAcks() { - return !acks.isEmpty(); + return !rpcs.isEmpty(); } - protected boolean hasAck(int ack_id) { - return acks.containsKey(ack_id); - } - - protected Ack getAck(int ack_id) { - return acks.get(ack_id); - } - - protected void removeAck(int ack_id) { - acks.remove(ack_id).setReceived(); + protected boolean hasAck(Identifier rpcID) { + // TODO + // return rpcs.containsKey(ack_id); + return false; } public DatagramChannel getChannel() { @@ -180,16 +238,28 @@ public class Node { return rt.getEntries(); } - public static void main(String[] args) { - System.setProperty("java.util.logging.config.file", - "logging.properties"); + public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { + List rpcsFromID = rpcs.get(rpcID); + boolean removedAck = false; - try { - LogManager.getLogManager().readConfiguration(); - } catch (Exception e) { - e.printStackTrace(); + for (Ack ack : rpcsFromID) { + if (ack.check(fromID)) { + ack.setReceived(); + rpcsFromID.remove(ack); + removedAck = true; + + LOGGER.log(Level.FINER, "Received rpc ack " + rpcID); + + break; + } } - new Node(); + if (!removedAck) { + LOGGER.log(Level.WARNING, + "Received rpc ack {0}, but didn't expect that", + new Object[] { rpcID }); + } + + return removedAck; } } diff --git a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java index 63df20a9..82814fb0 100644 --- a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java @@ -80,18 +80,21 @@ public class UDPHandler implements Runnable { NodeIdentifier fromID = new NodeIdentifier( getIDFromBuffer().getBytes(), from); - Identifier rpc_id = getIDFromBuffer(); + + Identifier rpcID = getIDFromBuffer(); switch (messageType) { case MessageType.FIND_NODE: - receiveFindNode(fromID, rpc_id); + receiveFindNode(fromID, rpcID); break; case MessageType.NODES: - receiveNodes(fromID, rpc_id); + receiveNodes(fromID, rpcID); break; case MessageType.PING: - LOGGER.log(Level.INFO, "Received [ping] from {0}", - new Object[] { from.toString() }); + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + receivePong(fromID, rpcID); break; default: LOGGER.log(Level.INFO, @@ -104,6 +107,8 @@ public class UDPHandler implements Runnable { from)); } else { + // If nothing has been read/received wait and read/receive + // again try { Thread.sleep(10); } catch (InterruptedException e) { @@ -119,18 +124,34 @@ public class UDPHandler implements Runnable { } } - private void receiveNodes(NodeIdentifier fromID, Identifier rpc_id) { + private void receivePong(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PONG] from {0}", + new Object[] { fromID }); + node.receivedRPC(fromID, rpcID); + } + + private void receivePing(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PING] from {0}", + new Object[] { fromID }); + node.sendPong(fromID, rpcID); + } + + private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { int numReceived = 0; + StringBuilder nodes = new StringBuilder(); while (buffer.hasRemaining()) { NodeIdentifier newID = getNodeTripleFromBuffer(); node.updateBuckets(newID); + nodes.append(newID).append(", "); numReceived++; } - LOGGER.log(Level.INFO, "Received {0} [NODES] from Node {1})", - new Object[] { numReceived, fromID }); + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); } private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) { diff --git a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java index e9ee7e9a..9b1c3012 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java @@ -40,26 +40,33 @@ public class Bucket { } } else if (bucket.hasSpace()) { - System.out.println(entries == null); - if (entries.contains(newID)) { + if (bucket.entries.contains(newID)) { // Move to beginning LOGGER.log( Level.FINE, "Node {0} already in routing table. Move to end of bucket.", new Object[] { newID }); - if (entries.size() > 1) { - entries.remove(newID); - entries.add(newID); + + if (bucket.entries.size() > 1) { + bucket.entries.remove(newID); + bucket.entries.add(newID); } } else { LOGGER.log(Level.INFO, "Added new node {0} to routing table.", new Object[] { newID }); - entries.add(newID); + bucket.entries.add(newID); } } else { // Bucket is full - System.out.println(entries == null); - if (!entries.contains(newID)) { + if (bucket.entries.contains(newID)) { + // Node is already present -> check if [last contacted node of + // this buffer] is still alive + LOGGER.log( + Level.FINE, + "Node {0} already in routing table. Check if still alive...", + new Object[] { newID }); + + } else { // TODO: only split if necessary Bucket newLeft = new Bucket(bucketSize, level + 1); @@ -67,24 +74,18 @@ public class Bucket { // Add the new entry and in the following loop distribute all // existing entries to left/right - entries.add(newID); + bucket.entries.add(newID); - for (NodeIdentifier entry : entries) { - if (entry.isBitSetAt(level)) { - newLeft.entries.add(entry); + for (NodeIdentifier id : entries) { + if (id.isBitSetAt(level)) { + newLeft.entries.add(id); } else { - newRight.entries.add(entry); + newRight.entries.add(id); } } bucket.entries = null; bucket.left = newLeft; bucket.right = newRight; - } else { - // Node is already present -> check if it's still alive - LOGGER.log( - Level.FINE, - "Node {0} already in routing table. Check if still alive...", - new Object[] { newID }); } } } diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java index decb7e6f..7882f917 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java @@ -48,12 +48,13 @@ public class RoutingTable { } }); + for (int i = 0; i < Node.BUCKET_SIZE; i++) { result.add(temp.get(i)); } - + result = new HashSet(temp.subList(0, + Node.BUCKET_SIZE)); } - return result; }