From ffce7de23dff4daa89fbe3eed18e6a5f1f818c01 Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Wed, 23 Jan 2013 12:14:04 +0100 Subject: [PATCH] Refactoring, Documentation --- ws2012/P2P/uebungen/8/logging.properties | 4 +- ws2012/P2P/uebungen/8/src/CLI.java | 9 ++ ws2012/P2P/uebungen/8/src/message/Ack.java | 56 ++++++-- .../8/src/message/MessageCallback.java | 14 ++ .../P2P/uebungen/8/src/node/Identifier.java | 45 +++--- ws2012/P2P/uebungen/8/src/node/Node.java | 128 +++++++++++++----- .../uebungen/8/src/node/NodeIdentifier.java | 17 +-- .../P2P/uebungen/8/src/node/UDPHandler.java | 33 ++++- .../uebungen/8/src/routingtable/Bucket.java | 14 ++ .../{RoutingTable.java => IRoutingTable.java} | 2 +- .../8/src/routingtable/RoutingTableImpl.java | 2 +- 11 files changed, 240 insertions(+), 84 deletions(-) rename ws2012/P2P/uebungen/8/src/routingtable/{RoutingTable.java => IRoutingTable.java} (91%) diff --git a/ws2012/P2P/uebungen/8/logging.properties b/ws2012/P2P/uebungen/8/logging.properties index cca961a3..2157f57d 100644 --- a/ws2012/P2P/uebungen/8/logging.properties +++ b/ws2012/P2P/uebungen/8/logging.properties @@ -1,7 +1,7 @@ handlers=java.util.logging.ConsoleHandler -.level=ALL +.level=FINEST java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter -java.util.logging.ConsoleHandler.level = ALL +java.util.logging.ConsoleHandler.level = FINEST diff --git a/ws2012/P2P/uebungen/8/src/CLI.java b/ws2012/P2P/uebungen/8/src/CLI.java index 5e7b9512..e7431148 100644 --- a/ws2012/P2P/uebungen/8/src/CLI.java +++ b/ws2012/P2P/uebungen/8/src/CLI.java @@ -33,6 +33,15 @@ public class CLI { System.out.println(id); } break; + case "lookup": + // TODO not implemented + if (splitted.length < 2) { + System.out.println("Too few arguments."); + } else { + String key = splitted[1]; + } + + System.out.println("not implemented"); case "leave": node.leave(); break; diff --git a/ws2012/P2P/uebungen/8/src/message/Ack.java b/ws2012/P2P/uebungen/8/src/message/Ack.java index ead9b8fb..b274fd90 100644 --- a/ws2012/P2P/uebungen/8/src/message/Ack.java +++ b/ws2012/P2P/uebungen/8/src/message/Ack.java @@ -1,5 +1,6 @@ package message; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.logging.Level; @@ -12,13 +13,24 @@ import util.BufferUtil; public class Ack { private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); - // timeout in seconds - private final int TIMEOUT = 1000; + /** + * timeout in seconds + */ + private static final int TIMEOUT = 1000; + + /** + * Maximum number of retries + */ + private static final int MAX_RETRIES = 3; + + private Identifier rpcId; - private Identifier id; private NodeIdentifier receiver; + private ByteBuffer buffer; + private int numRetries = 0; + private TimeoutThread timeout; private Thread thread; @@ -29,7 +41,7 @@ public class Ack { public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel, ByteBuffer buffer, MessageCallback cb) { - this.id = id; + this.rpcId = id; this.receiver = receiver; this.channel = channel; this.buffer = BufferUtil.clone(buffer); @@ -38,14 +50,14 @@ public class Ack { } private void startThread() { - LOGGER.log(Level.FINE, "Starting timeout thread for RPC " + id); + LOGGER.log(Level.FINEST, "Starting timeout thread for RPC " + rpcId); timeout = new TimeoutThread(); thread = new Thread(timeout); thread.start(); } public Identifier getID() { - return id; + return rpcId; } public boolean check(NodeIdentifier fromID) { @@ -74,7 +86,7 @@ public class Ack { private class TimeoutThread implements Runnable { private volatile boolean notReceived = true; - // When do we stop expecting an ack + // When do we stop expecting the ack private long timeToStop = System.currentTimeMillis() + TIMEOUT; @Override @@ -87,16 +99,34 @@ public class Ack { } } - // Timeout hit -> re-send + // Timeout hit! + if (notReceived) { - LOGGER.log(Level.INFO, "Absent RPC ack {0}.", - new Object[] { id }); - if (callback != null) { - callback.onTimeout(); + if (numRetries < MAX_RETRIES) { + try { + LOGGER.log( + Level.FINE, + "Didn't receive RPC Ack {0} by now. Resending... ", + new Object[] { rpcId }); + channel.send(buffer, receiver.getAddress()); + } catch (IOException e) { + e.printStackTrace(); + } + + startThread(); + numRetries++; + } else { + + LOGGER.log(Level.INFO, "Absent RPC ack {0}.", + new Object[] { rpcId }); + + if (callback != null) { + callback.onTimeout(); + } } } else { - // has been received + // Message has been received in time if (callback != null) { callback.onReceive(); } diff --git a/ws2012/P2P/uebungen/8/src/message/MessageCallback.java b/ws2012/P2P/uebungen/8/src/message/MessageCallback.java index 9791a302..d88b3920 100644 --- a/ws2012/P2P/uebungen/8/src/message/MessageCallback.java +++ b/ws2012/P2P/uebungen/8/src/message/MessageCallback.java @@ -1,8 +1,22 @@ package message; +/** + * A callback to create asynchronous events that get triggered when a message + * (ack/answer) is received. + * + * @author jln + * + */ public interface MessageCallback { + /** + * Called when the awaited message arrives. + */ public void onReceive(); + /** + * Called when the awaited message doesn't arrive (even after possible + * retries). + */ public void onTimeout(); } diff --git a/ws2012/P2P/uebungen/8/src/node/Identifier.java b/ws2012/P2P/uebungen/8/src/node/Identifier.java index 1999e1ba..0b335fc5 100644 --- a/ws2012/P2P/uebungen/8/src/node/Identifier.java +++ b/ws2012/P2P/uebungen/8/src/node/Identifier.java @@ -4,6 +4,13 @@ import java.math.BigInteger; import java.util.BitSet; import java.util.Random; +/** + * A Kademlia identifier. Can be used for identifying files as well as nodes + * (but for nodes check {@see NodeIdentifier}). + * + * @author jln + * + */ public class Identifier { private static Random random = new Random(System.currentTimeMillis()); @@ -21,12 +28,27 @@ public class Identifier { this.bits = bits; } + /** + * Creates an ID exactly "in the middle" of the ID space. (If the ID space + * is 8 bit wide, this returns an ID valued 128). + * + * @param size + * the size of the id space + * @return an Identifier + */ public static Identifier getStaticIdentifier(int size) { BitSet middle = new BitSet(size); middle.set(size - 1); return new Identifier(size, middle); } + /** + * Creates a random ID for the given id space size. + * + * @param size + * the size of the id space + * @return a random Identifier + */ public static Identifier getRandomIdentifier(int size) { BitSet bits = new BitSet(size); @@ -42,7 +64,7 @@ public class Identifier { public BigInteger distanceTo(Identifier otherID) { BitSet distance = (BitSet) bits.clone(); - distance.xor(otherID.getBitSet()); + distance.xor(otherID.bits); return new BigInteger(1, distance.toByteArray()); } @@ -55,28 +77,13 @@ public class Identifier { * @return true if the bit is set */ public boolean isBitSetAt(int index) { - // System.out.println("---------------------------------"); BigInteger intValue = new BigInteger(1, bits.toByteArray()); int numOfTrimmedZeros = size - intValue.bitLength(); if (index < numOfTrimmedZeros) { - // System.out.println("trimm-Zweig: " + index); return false; } - // System.out - // .println(bits + " " + bits.length() + ", " - // + numOfTrimmedZeros); - // System.out.println(bits + " " + toString() + "isSet? " + index + " (" - // + ((bits.length() - size) + index) + ") " - // + bits.get((bits.length() - size) + index)); - - // return bits.get((bits.length() - size) + index); - - // System.out - // .println(bits.get(bits.length() - (index + numOfTrimmedZeros) - // - 1) - // + " " + index); return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1); } @@ -89,7 +96,7 @@ public class Identifier { if (!(o instanceof Identifier)) { return false; } else { - return bits.equals(((Identifier) o).getBitSet()); + return bits.equals(((Identifier) o).bits); } } @@ -98,10 +105,6 @@ public class Identifier { return toString().hashCode(); } - private BitSet getBitSet() { - return bits; - } - public String toString() { return new BigInteger(1, bits.toByteArray()).toString(); } diff --git a/ws2012/P2P/uebungen/8/src/node/Node.java b/ws2012/P2P/uebungen/8/src/node/Node.java index a5e21a91..b8e7daf5 100644 --- a/ws2012/P2P/uebungen/8/src/node/Node.java +++ b/ws2012/P2P/uebungen/8/src/node/Node.java @@ -16,10 +16,12 @@ import java.util.logging.Logger; import message.Ack; import message.MessageCallback; import message.MessageType; -import routingtable.RoutingTable; +import routingtable.IRoutingTable; import routingtable.RoutingTableImpl; public class Node { + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + /** * Size of ID space (has to be a multiple of 8) */ @@ -42,9 +44,7 @@ public class Node { /** * The size of an IP address (in bytes) */ - private static final int SIZE_IP_ADDRESS = 8; - - private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + public static final int SIZE_IP_ADDRESS = 8; private InetSocketAddress address; private DatagramChannel channel; @@ -56,7 +56,7 @@ public class Node { private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS); - private RoutingTable rt = new RoutingTableImpl(BUCKET_SIZE, this); + private IRoutingTable routingTable = new RoutingTableImpl(BUCKET_SIZE, this); public Node() { System.setProperty("java.net.preferIPv4Stack", "true"); @@ -70,7 +70,8 @@ public class Node { this.nodeID = INITIAL_ID; } catch (SocketException e) { - // Port 9999 is already bound -> pick a random port + // The initial port is already bound -> let the system pick a + // port channel.socket().bind(new InetSocketAddress("localhost", 0)); address = (InetSocketAddress) channel.getLocalAddress(); } @@ -91,21 +92,27 @@ public class Node { NodeIdentifier viaNode = new NodeIdentifier(ID_BITS, INITIAL_ID.getBytes(), new InetSocketAddress( "127.0.0.1", INITIAL_PORT)); - - LOGGER.log(Level.INFO, "Trying to join network via node {0}", - new Object[] { viaNode }); - - rt.insert(viaNode); - sendFindNode(viaNode, this.nodeID); - - // sendPing(viaNode); + joinNetworkVia(viaNode); } - } catch (IOException e) { e.printStackTrace(); } } + private void joinNetworkVia(NodeIdentifier viaNode) { + LOGGER.log(Level.INFO, "Trying to join network via node {0}", + new Object[] { viaNode }); + + routingTable.insert(viaNode); + sendFindNode(viaNode, this.nodeID); + } + + /** + * Creates and returns new ID (usually used as a RPC ID). This makes sure + * the ID is not yet used (in this node). + * + * @return an ID + */ private Identifier createRPCID() { Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS); while (rpcs.containsKey(rpcID)) { @@ -124,18 +131,31 @@ public class Node { } } + /** + * Gets all nodes of this nodes routing table, that a close to a given node + * and sends that list to a specific node. + * + * @param receiver + * The node to receive the list of nodes + * @param idToFind + * The ID to find close nodes of + * @param rpcID + * An RPC ID (because this is always an answer to a FIND_NODE + * RPC) + */ void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID) { - Set entries = rt.getClosestNodesTo(idToFind); - int numNodes = entries.size(); + Set closeNodes = routingTable.getClosestNodesTo(idToFind); + int numNodes = closeNodes.size(); ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8) + numNodes * SIZE_IP_ADDRESS); - for (NodeIdentifier id : entries) { - if (!receiver.equals(id)) { - nodes.put(id.getTripleAsBytes()); + for (NodeIdentifier idToSend : closeNodes) { + // Don't send the node to itself + if (!receiver.equals(idToSend)) { + nodes.put(idToSend.getTripleAsBytes()); } } @@ -146,12 +166,17 @@ public class Node { LOGGER.log( Level.INFO, "Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})", - new Object[] { entries.size(), receiver, idToFind, rpcID }); + new Object[] { closeNodes.size(), receiver, idToFind, rpcID }); } } - public boolean sendPing(NodeIdentifier receiver, MessageCallback cb) { - return send(receiver, MessageType.PING, null, true, cb); + public void sendPing(NodeIdentifier receiver, MessageCallback cb) { + boolean successful = send(receiver, MessageType.PING, null, true, cb); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [PING] to node {0}", + new Object[] { receiver }); + } } void sendPong(NodeIdentifier receiver, Identifier rpcID) { @@ -164,11 +189,53 @@ public class Node { } } + /** + * Send a message to a given ID (with a given RPC ID). You usually want to + * use this method when you know the RPC ID beforehand (e.g. if this is an + * ack or answer to a prior message). + * + * @param to + * the ID to send to + * @param messageType + * the message type + * @param data + * the data to send + * @param reliable + * flag, whether this has to be acked or not + * @param cb + * A callback that is executed when this message gets acked (or + * answered). This obviously is only of interest when the + * reliable flag is true + * @return true if the message was sent successfully + */ private boolean send(NodeIdentifier to, byte messageType, byte[] data, boolean reliable, MessageCallback cb) { return send(to, messageType, createRPCID(), data, reliable, cb); } + /** + * Send a message to a given ID (with a given RPC ID). You usually want to + * use this method when you know the RPC ID beforehand (e.g. if this is an + * ack or answer to a prior message). + * + * @param to + * the ID to send to + * @param messageType + * the message type + * @param rpcID + * the RPC ID of this message (if you don't know this use + * {@link #send(NodeIdentifier, byte, byte[], boolean, MessageCallback)} + * and a new random ID will be created) + * @param data + * the data to send + * @param reliable + * flag, whether this has to be acked or not + * @param cb + * A callback that is executed when this message gets acked (or + * answered). This obviously is only of interest when the + * reliable flag is true + * @return true if the message was sent successfully + */ private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID, byte[] data, boolean reliable, MessageCallback cb) { @@ -197,7 +264,6 @@ public class Node { } finally { // Even if an exception occurred this should be reliable if (reliable) { - // This message should be reliable (acked) Ack newAck = new Ack(rpcID, to, channel, buffer, cb); if (rpcs.containsKey(rpcID)) { @@ -219,18 +285,12 @@ public class Node { return !rpcs.isEmpty(); } - protected boolean hasAck(Identifier rpcID) { - // TODO - // return rpcs.containsKey(ack_id); - return false; - } - public DatagramChannel getChannel() { return channel; } public void updateBuckets(NodeIdentifier id) { - rt.insert(id); + routingTable.insert(id); } public Identifier getID() { @@ -238,7 +298,7 @@ public class Node { } public Set getNeighbors() { - return rt.getEntries(); + return routingTable.getEntries(); } public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { @@ -251,7 +311,7 @@ public class Node { rpcsFromID.remove(ack); removedAck = true; - LOGGER.log(Level.FINER, "Received rpc ack " + rpcID); + LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID); break; } @@ -259,7 +319,7 @@ public class Node { if (!removedAck) { LOGGER.log(Level.WARNING, - "Received rpc ack {0}, but didn't expect that", + "Received RPC ack {0}, but didn't expect that", new Object[] { rpcID }); } diff --git a/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java b/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java index 51f014e0..37ca438f 100644 --- a/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java +++ b/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java @@ -5,6 +5,12 @@ import java.nio.ByteBuffer; import util.BufferUtil; +/** + * Same as a {@link Identifier}, but this also stores an IP address. + * + * @author jln + * + */ public class NodeIdentifier extends Identifier { private InetSocketAddress address; @@ -15,7 +21,8 @@ public class NodeIdentifier extends Identifier { } public byte[] getTripleAsBytes() { - ByteBuffer result = ByteBuffer.allocate(8 + (Node.ID_BITS / 8)); + ByteBuffer result = ByteBuffer.allocate(Node.SIZE_IP_ADDRESS + + (Node.ID_BITS / 8)); result.put(BufferUtil.addrToBytes(address)); result.put(bits.toByteArray()); @@ -25,10 +32,4 @@ public class NodeIdentifier extends Identifier { public InetSocketAddress getAddress() { return address; } - - public String toString() { - return super.toString() + " (" + address.toString() + ")"; - // return bits.toString(); - } - -} +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java index 71034b58..71dc04d8 100644 --- a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java @@ -52,6 +52,12 @@ public class UDPHandler implements Runnable { return new Identifier(Node.ID_BITS, result); } + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ private NodeIdentifier getNodeTripleFromBuffer() { InetSocketAddress address = getIPFromBuffer(); @@ -66,10 +72,19 @@ public class UDPHandler implements Runnable { public void run() { InetSocketAddress from = null; - // Run until I get killed, and all my acks have been answered + // Run until it gets killed, and all my Acks have been answered while (running || node.hasAcks()) { try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... boolean updateRT = true; + + // The address of the node that sent this message from = (InetSocketAddress) node.getChannel().receive(buffer); // channel.receive() is non-blocking. So we need to check if @@ -100,9 +115,12 @@ public class UDPHandler implements Runnable { receivePong(fromID, rpcID); break; case MessageType.LEAVE: - // updateRT = false; - // receivePong(fromID, rpcID); - // do nothing, update buckets later + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. LOGGER.log(Level.INFO, "Received leave from {0}", new Object[] { from.toString() }); break; @@ -139,6 +157,9 @@ public class UDPHandler implements Runnable { private void receivePong(NodeIdentifier fromID, Identifier rpcID) { LOGGER.log(Level.INFO, "Received [PONG] from {0}", new Object[] { fromID }); + + // This should be the answer to a prior PING -> mark this RPC ID as + // received node.receivedRPC(fromID, rpcID); } @@ -151,6 +172,8 @@ public class UDPHandler implements Runnable { private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { int numReceived = 0; + + // This is just for the log message StringBuilder nodes = new StringBuilder(); while (buffer.hasRemaining()) { @@ -160,6 +183,8 @@ public class UDPHandler implements Runnable { numReceived++; } + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received node.receivedRPC(fromID, rpcID); LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", diff --git a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java index a8fa4463..271d063d 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java @@ -72,6 +72,8 @@ public class Bucket { @Override public void onTimeout() { LOGGER.log(Level.INFO, "Node didnt answer in time."); + // TODO: this should be propagated to the "upper" Routing + // Table, not just to this specific bucket entries.remove(id); } }); @@ -128,4 +130,16 @@ public class Bucket { private boolean isLeaf() { return left == null && right == null; } + + public void remove(NodeIdentifier node) { + if (isLeaf()) { + entries.remove(node); + } else { + if (node.isBitSetAt(level)) { + left.remove(node); + } else { + right.remove(node); + } + } + } } \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java b/ws2012/P2P/uebungen/8/src/routingtable/IRoutingTable.java similarity index 91% rename from ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java rename to ws2012/P2P/uebungen/8/src/routingtable/IRoutingTable.java index c7a995ec..8dd53f0a 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/IRoutingTable.java @@ -5,7 +5,7 @@ import java.util.Set; import node.Identifier; import node.NodeIdentifier; -public interface RoutingTable { +public interface IRoutingTable { public void insert(NodeIdentifier id); diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java index 1900191d..70b070ed 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java @@ -12,7 +12,7 @@ import node.Identifier; import node.Node; import node.NodeIdentifier; -public class RoutingTableImpl implements RoutingTable { +public class RoutingTableImpl implements IRoutingTable { private Set entries = new HashSet(); private Bucket root;