Refactoring, Documentation

This commit is contained in:
senft-desktop 2013-01-23 12:14:04 +01:00
parent bad05626b2
commit ffce7de23d
11 changed files with 240 additions and 84 deletions

View File

@ -1,7 +1,7 @@
handlers=java.util.logging.ConsoleHandler
.level=ALL
.level=FINEST
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.ConsoleHandler.level = ALL
java.util.logging.ConsoleHandler.level = FINEST

View File

@ -33,6 +33,15 @@ public class CLI {
System.out.println(id);
}
break;
case "lookup":
// TODO not implemented
if (splitted.length < 2) {
System.out.println("Too few arguments.");
} else {
String key = splitted[1];
}
System.out.println("not implemented");
case "leave":
node.leave();
break;

View File

@ -1,5 +1,6 @@
package message;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.logging.Level;
@ -12,13 +13,24 @@ import util.BufferUtil;
public class Ack {
private final static Logger LOGGER = Logger.getLogger(Ack.class.getName());
// timeout in seconds
private final int TIMEOUT = 1000;
/**
* timeout in seconds
*/
private static final int TIMEOUT = 1000;
/**
* Maximum number of retries
*/
private static final int MAX_RETRIES = 3;
private Identifier rpcId;
private Identifier id;
private NodeIdentifier receiver;
private ByteBuffer buffer;
private int numRetries = 0;
private TimeoutThread timeout;
private Thread thread;
@ -29,7 +41,7 @@ public class Ack {
public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel,
ByteBuffer buffer, MessageCallback cb) {
this.id = id;
this.rpcId = id;
this.receiver = receiver;
this.channel = channel;
this.buffer = BufferUtil.clone(buffer);
@ -38,14 +50,14 @@ public class Ack {
}
private void startThread() {
LOGGER.log(Level.FINE, "Starting timeout thread for RPC " + id);
LOGGER.log(Level.FINEST, "Starting timeout thread for RPC " + rpcId);
timeout = new TimeoutThread();
thread = new Thread(timeout);
thread.start();
}
public Identifier getID() {
return id;
return rpcId;
}
public boolean check(NodeIdentifier fromID) {
@ -74,7 +86,7 @@ public class Ack {
private class TimeoutThread implements Runnable {
private volatile boolean notReceived = true;
// When do we stop expecting an ack
// When do we stop expecting the ack
private long timeToStop = System.currentTimeMillis() + TIMEOUT;
@Override
@ -87,16 +99,34 @@ public class Ack {
}
}
// Timeout hit -> re-send
// Timeout hit!
if (notReceived) {
LOGGER.log(Level.INFO, "Absent RPC ack {0}.",
new Object[] { id });
if (callback != null) {
callback.onTimeout();
if (numRetries < MAX_RETRIES) {
try {
LOGGER.log(
Level.FINE,
"Didn't receive RPC Ack {0} by now. Resending... ",
new Object[] { rpcId });
channel.send(buffer, receiver.getAddress());
} catch (IOException e) {
e.printStackTrace();
}
startThread();
numRetries++;
} else {
LOGGER.log(Level.INFO, "Absent RPC ack {0}.",
new Object[] { rpcId });
if (callback != null) {
callback.onTimeout();
}
}
} else {
// has been received
// Message has been received in time
if (callback != null) {
callback.onReceive();
}

View File

@ -1,8 +1,22 @@
package message;
/**
* A callback to create asynchronous events that get triggered when a message
* (ack/answer) is received.
*
* @author jln
*
*/
public interface MessageCallback {
/**
* Called when the awaited message arrives.
*/
public void onReceive();
/**
* Called when the awaited message doesn't arrive (even after possible
* retries).
*/
public void onTimeout();
}

View File

@ -4,6 +4,13 @@ import java.math.BigInteger;
import java.util.BitSet;
import java.util.Random;
/**
* A Kademlia identifier. Can be used for identifying files as well as nodes
* (but for nodes check {@see NodeIdentifier}).
*
* @author jln
*
*/
public class Identifier {
private static Random random = new Random(System.currentTimeMillis());
@ -21,12 +28,27 @@ public class Identifier {
this.bits = bits;
}
/**
* Creates an ID exactly "in the middle" of the ID space. (If the ID space
* is 8 bit wide, this returns an ID valued 128).
*
* @param size
* the size of the id space
* @return an Identifier
*/
public static Identifier getStaticIdentifier(int size) {
BitSet middle = new BitSet(size);
middle.set(size - 1);
return new Identifier(size, middle);
}
/**
* Creates a random ID for the given id space size.
*
* @param size
* the size of the id space
* @return a random Identifier
*/
public static Identifier getRandomIdentifier(int size) {
BitSet bits = new BitSet(size);
@ -42,7 +64,7 @@ public class Identifier {
public BigInteger distanceTo(Identifier otherID) {
BitSet distance = (BitSet) bits.clone();
distance.xor(otherID.getBitSet());
distance.xor(otherID.bits);
return new BigInteger(1, distance.toByteArray());
}
@ -55,28 +77,13 @@ public class Identifier {
* @return true if the bit is set
*/
public boolean isBitSetAt(int index) {
// System.out.println("---------------------------------");
BigInteger intValue = new BigInteger(1, bits.toByteArray());
int numOfTrimmedZeros = size - intValue.bitLength();
if (index < numOfTrimmedZeros) {
// System.out.println("trimm-Zweig: " + index);
return false;
}
// System.out
// .println(bits + " " + bits.length() + ", "
// + numOfTrimmedZeros);
// System.out.println(bits + " " + toString() + "isSet? " + index + " ("
// + ((bits.length() - size) + index) + ") "
// + bits.get((bits.length() - size) + index));
// return bits.get((bits.length() - size) + index);
// System.out
// .println(bits.get(bits.length() - (index + numOfTrimmedZeros)
// - 1)
// + " " + index);
return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1);
}
@ -89,7 +96,7 @@ public class Identifier {
if (!(o instanceof Identifier)) {
return false;
} else {
return bits.equals(((Identifier) o).getBitSet());
return bits.equals(((Identifier) o).bits);
}
}
@ -98,10 +105,6 @@ public class Identifier {
return toString().hashCode();
}
private BitSet getBitSet() {
return bits;
}
public String toString() {
return new BigInteger(1, bits.toByteArray()).toString();
}

View File

@ -16,10 +16,12 @@ import java.util.logging.Logger;
import message.Ack;
import message.MessageCallback;
import message.MessageType;
import routingtable.RoutingTable;
import routingtable.IRoutingTable;
import routingtable.RoutingTableImpl;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
/**
* Size of ID space (has to be a multiple of 8)
*/
@ -42,9 +44,7 @@ public class Node {
/**
* The size of an IP address (in bytes)
*/
private static final int SIZE_IP_ADDRESS = 8;
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
public static final int SIZE_IP_ADDRESS = 8;
private InetSocketAddress address;
private DatagramChannel channel;
@ -56,7 +56,7 @@ public class Node {
private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS);
private RoutingTable rt = new RoutingTableImpl(BUCKET_SIZE, this);
private IRoutingTable routingTable = new RoutingTableImpl(BUCKET_SIZE, this);
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
@ -70,7 +70,8 @@ public class Node {
this.nodeID = INITIAL_ID;
} catch (SocketException e) {
// Port 9999 is already bound -> pick a random port
// The initial port is already bound -> let the system pick a
// port
channel.socket().bind(new InetSocketAddress("localhost", 0));
address = (InetSocketAddress) channel.getLocalAddress();
}
@ -91,21 +92,27 @@ public class Node {
NodeIdentifier viaNode = new NodeIdentifier(ID_BITS,
INITIAL_ID.getBytes(), new InetSocketAddress(
"127.0.0.1", INITIAL_PORT));
LOGGER.log(Level.INFO, "Trying to join network via node {0}",
new Object[] { viaNode });
rt.insert(viaNode);
sendFindNode(viaNode, this.nodeID);
// sendPing(viaNode);
joinNetworkVia(viaNode);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void joinNetworkVia(NodeIdentifier viaNode) {
LOGGER.log(Level.INFO, "Trying to join network via node {0}",
new Object[] { viaNode });
routingTable.insert(viaNode);
sendFindNode(viaNode, this.nodeID);
}
/**
* Creates and returns new ID (usually used as a RPC ID). This makes sure
* the ID is not yet used (in this node).
*
* @return an ID
*/
private Identifier createRPCID() {
Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS);
while (rpcs.containsKey(rpcID)) {
@ -124,18 +131,31 @@ public class Node {
}
}
/**
* Gets all nodes of this nodes routing table, that a close to a given node
* and sends that list to a specific node.
*
* @param receiver
* The node to receive the list of nodes
* @param idToFind
* The ID to find close nodes of
* @param rpcID
* An RPC ID (because this is always an answer to a FIND_NODE
* RPC)
*/
void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind,
Identifier rpcID) {
Set<NodeIdentifier> entries = rt.getClosestNodesTo(idToFind);
int numNodes = entries.size();
Set<NodeIdentifier> closeNodes = routingTable.getClosestNodesTo(idToFind);
int numNodes = closeNodes.size();
ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8)
+ numNodes * SIZE_IP_ADDRESS);
for (NodeIdentifier id : entries) {
if (!receiver.equals(id)) {
nodes.put(id.getTripleAsBytes());
for (NodeIdentifier idToSend : closeNodes) {
// Don't send the node to itself
if (!receiver.equals(idToSend)) {
nodes.put(idToSend.getTripleAsBytes());
}
}
@ -146,12 +166,17 @@ public class Node {
LOGGER.log(
Level.INFO,
"Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})",
new Object[] { entries.size(), receiver, idToFind, rpcID });
new Object[] { closeNodes.size(), receiver, idToFind, rpcID });
}
}
public boolean sendPing(NodeIdentifier receiver, MessageCallback cb) {
return send(receiver, MessageType.PING, null, true, cb);
public void sendPing(NodeIdentifier receiver, MessageCallback cb) {
boolean successful = send(receiver, MessageType.PING, null, true, cb);
if (successful) {
LOGGER.log(Level.INFO, "Sending [PING] to node {0}",
new Object[] { receiver });
}
}
void sendPong(NodeIdentifier receiver, Identifier rpcID) {
@ -164,11 +189,53 @@ public class Node {
}
}
/**
* Send a message to a given ID (with a given RPC ID). You usually want to
* use this method when you know the RPC ID beforehand (e.g. if this is an
* ack or answer to a prior message).
*
* @param to
* the ID to send to
* @param messageType
* the message type
* @param data
* the data to send
* @param reliable
* flag, whether this has to be acked or not
* @param cb
* A callback that is executed when this message gets acked (or
* answered). This obviously is only of interest when the
* reliable flag is true
* @return true if the message was sent successfully
*/
private boolean send(NodeIdentifier to, byte messageType, byte[] data,
boolean reliable, MessageCallback cb) {
return send(to, messageType, createRPCID(), data, reliable, cb);
}
/**
* Send a message to a given ID (with a given RPC ID). You usually want to
* use this method when you know the RPC ID beforehand (e.g. if this is an
* ack or answer to a prior message).
*
* @param to
* the ID to send to
* @param messageType
* the message type
* @param rpcID
* the RPC ID of this message (if you don't know this use
* {@link #send(NodeIdentifier, byte, byte[], boolean, MessageCallback)}
* and a new random ID will be created)
* @param data
* the data to send
* @param reliable
* flag, whether this has to be acked or not
* @param cb
* A callback that is executed when this message gets acked (or
* answered). This obviously is only of interest when the
* reliable flag is true
* @return true if the message was sent successfully
*/
private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID,
byte[] data, boolean reliable, MessageCallback cb) {
@ -197,7 +264,6 @@ public class Node {
} finally {
// Even if an exception occurred this should be reliable
if (reliable) {
// This message should be reliable (acked)
Ack newAck = new Ack(rpcID, to, channel, buffer, cb);
if (rpcs.containsKey(rpcID)) {
@ -219,18 +285,12 @@ public class Node {
return !rpcs.isEmpty();
}
protected boolean hasAck(Identifier rpcID) {
// TODO
// return rpcs.containsKey(ack_id);
return false;
}
public DatagramChannel getChannel() {
return channel;
}
public void updateBuckets(NodeIdentifier id) {
rt.insert(id);
routingTable.insert(id);
}
public Identifier getID() {
@ -238,7 +298,7 @@ public class Node {
}
public Set<NodeIdentifier> getNeighbors() {
return rt.getEntries();
return routingTable.getEntries();
}
public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) {
@ -251,7 +311,7 @@ public class Node {
rpcsFromID.remove(ack);
removedAck = true;
LOGGER.log(Level.FINER, "Received rpc ack " + rpcID);
LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID);
break;
}
@ -259,7 +319,7 @@ public class Node {
if (!removedAck) {
LOGGER.log(Level.WARNING,
"Received rpc ack {0}, but didn't expect that",
"Received RPC ack {0}, but didn't expect that",
new Object[] { rpcID });
}

View File

@ -5,6 +5,12 @@ import java.nio.ByteBuffer;
import util.BufferUtil;
/**
* Same as a {@link Identifier}, but this also stores an IP address.
*
* @author jln
*
*/
public class NodeIdentifier extends Identifier {
private InetSocketAddress address;
@ -15,7 +21,8 @@ public class NodeIdentifier extends Identifier {
}
public byte[] getTripleAsBytes() {
ByteBuffer result = ByteBuffer.allocate(8 + (Node.ID_BITS / 8));
ByteBuffer result = ByteBuffer.allocate(Node.SIZE_IP_ADDRESS
+ (Node.ID_BITS / 8));
result.put(BufferUtil.addrToBytes(address));
result.put(bits.toByteArray());
@ -25,10 +32,4 @@ public class NodeIdentifier extends Identifier {
public InetSocketAddress getAddress() {
return address;
}
public String toString() {
return super.toString() + " (" + address.toString() + ")";
// return bits.toString();
}
}
}

View File

@ -52,6 +52,12 @@ public class UDPHandler implements Runnable {
return new Identifier(Node.ID_BITS, result);
}
/**
* Reads a triple <IP address, port, id> from the channel and returns a
* {@link node.NodeIdentifier}.
*
* @return the read node ID
*/
private NodeIdentifier getNodeTripleFromBuffer() {
InetSocketAddress address = getIPFromBuffer();
@ -66,10 +72,19 @@ public class UDPHandler implements Runnable {
public void run() {
InetSocketAddress from = null;
// Run until I get killed, and all my acks have been answered
// Run until it gets killed, and all my Acks have been answered
while (running || node.hasAcks()) {
try {
// Flag that indicates whether the routing table should be
// updated with the node we just received a message from. This
// needs to be done, because some messages trigger a direct
// answer. For example we send a PING to a node. That node
// answers with a PONG. Because we received a message from that
// node we will update our routing table and see that we already
// know this node. So we will PING that node...
boolean updateRT = true;
// The address of the node that sent this message
from = (InetSocketAddress) node.getChannel().receive(buffer);
// channel.receive() is non-blocking. So we need to check if
@ -100,9 +115,12 @@ public class UDPHandler implements Runnable {
receivePong(fromID, rpcID);
break;
case MessageType.LEAVE:
// updateRT = false;
// receivePong(fromID, rpcID);
// do nothing, update buckets later
// We don't have to do anything here because, after this
// switch block we call node.updateBuckets(...) which
// will try to ping the node we received this leave
// message from. That node will not answered because it
// directly shut down after sending the leave message.
// So the node will be removed from this routing table.
LOGGER.log(Level.INFO, "Received leave from {0}",
new Object[] { from.toString() });
break;
@ -139,6 +157,9 @@ public class UDPHandler implements Runnable {
private void receivePong(NodeIdentifier fromID, Identifier rpcID) {
LOGGER.log(Level.INFO, "Received [PONG] from {0}",
new Object[] { fromID });
// This should be the answer to a prior PING -> mark this RPC ID as
// received
node.receivedRPC(fromID, rpcID);
}
@ -151,6 +172,8 @@ public class UDPHandler implements Runnable {
private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) {
int numReceived = 0;
// This is just for the log message
StringBuilder nodes = new StringBuilder();
while (buffer.hasRemaining()) {
@ -160,6 +183,8 @@ public class UDPHandler implements Runnable {
numReceived++;
}
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
// received
node.receivedRPC(fromID, rpcID);
LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})",

View File

@ -72,6 +72,8 @@ public class Bucket {
@Override
public void onTimeout() {
LOGGER.log(Level.INFO, "Node didnt answer in time.");
// TODO: this should be propagated to the "upper" Routing
// Table, not just to this specific bucket
entries.remove(id);
}
});
@ -128,4 +130,16 @@ public class Bucket {
private boolean isLeaf() {
return left == null && right == null;
}
public void remove(NodeIdentifier node) {
if (isLeaf()) {
entries.remove(node);
} else {
if (node.isBitSetAt(level)) {
left.remove(node);
} else {
right.remove(node);
}
}
}
}

View File

@ -5,7 +5,7 @@ import java.util.Set;
import node.Identifier;
import node.NodeIdentifier;
public interface RoutingTable {
public interface IRoutingTable {
public void insert(NodeIdentifier id);

View File

@ -12,7 +12,7 @@ import node.Identifier;
import node.Node;
import node.NodeIdentifier;
public class RoutingTableImpl implements RoutingTable {
public class RoutingTableImpl implements IRoutingTable {
private Set<NodeIdentifier> entries = new HashSet<NodeIdentifier>();
private Bucket root;