From 8d580c10f2a1e2a92e73364441e84f5cbbb4478f Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Tue, 18 Dec 2012 12:50:43 +0100 Subject: [PATCH] First draft: kademlia --- ws2012/P2P/uebungen/8/.classpath | 6 + ws2012/P2P/uebungen/8/.project | 17 ++ ws2012/P2P/uebungen/8/logging.properties | 8 + ws2012/P2P/uebungen/8/src/message/Ack.java | 105 ++++++++++ .../uebungen/8/src/message/MessageType.java | 12 ++ .../P2P/uebungen/8/src/node/Identifier.java | 58 ++++++ ws2012/P2P/uebungen/8/src/node/Node.java | 184 ++++++++++++++++++ .../P2P/uebungen/8/src/node/UDPHandler.java | 121 ++++++++++++ .../uebungen/8/src/routingtable/Bucket.java | 125 ++++++++++++ .../8/src/routingtable/BucketEntry.java | 24 +++ .../8/src/routingtable/RoutingTable.java | 27 +++ .../P2P/uebungen/8/src/util/BufferUtil.java | 28 +++ 12 files changed, 715 insertions(+) create mode 100644 ws2012/P2P/uebungen/8/.classpath create mode 100644 ws2012/P2P/uebungen/8/.project create mode 100644 ws2012/P2P/uebungen/8/logging.properties create mode 100644 ws2012/P2P/uebungen/8/src/message/Ack.java create mode 100644 ws2012/P2P/uebungen/8/src/message/MessageType.java create mode 100644 ws2012/P2P/uebungen/8/src/node/Identifier.java create mode 100644 ws2012/P2P/uebungen/8/src/node/Node.java create mode 100644 ws2012/P2P/uebungen/8/src/node/UDPHandler.java create mode 100644 ws2012/P2P/uebungen/8/src/routingtable/Bucket.java create mode 100644 ws2012/P2P/uebungen/8/src/routingtable/BucketEntry.java create mode 100644 ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java create mode 100644 ws2012/P2P/uebungen/8/src/util/BufferUtil.java diff --git a/ws2012/P2P/uebungen/8/.classpath b/ws2012/P2P/uebungen/8/.classpath new file mode 100644 index 00000000..fb501163 --- /dev/null +++ b/ws2012/P2P/uebungen/8/.classpath @@ -0,0 +1,6 @@ + + + + + + diff --git a/ws2012/P2P/uebungen/8/.project b/ws2012/P2P/uebungen/8/.project new file mode 100644 index 00000000..d8b439b6 --- /dev/null +++ b/ws2012/P2P/uebungen/8/.project @@ -0,0 +1,17 @@ + + + 8 + + + + + + org.eclipse.jdt.core.javabuilder + + + + + + org.eclipse.jdt.core.javanature + + diff --git a/ws2012/P2P/uebungen/8/logging.properties b/ws2012/P2P/uebungen/8/logging.properties new file mode 100644 index 00000000..641865ca --- /dev/null +++ b/ws2012/P2P/uebungen/8/logging.properties @@ -0,0 +1,8 @@ +handlers=java.util.logging.FileHandler, java.util.logging.ConsoleHandler + +tutego.level = ALL + +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n + +java.util.logging.ConsoleHandler.level=ALL +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/message/Ack.java b/ws2012/P2P/uebungen/8/src/message/Ack.java new file mode 100644 index 00000000..378532e9 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/message/Ack.java @@ -0,0 +1,105 @@ +package message; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Ack { + private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); + + // timeout in seconds + private final int TIMEOUT = 1000; + + private int id; + private SocketAddress address; + private ByteBuffer buf; + + private TimeoutThread timeout; + private volatile Thread thread; + + // The channel to re-send the message on + private DatagramChannel channel; + + public Ack(int id, SocketAddress address, DatagramChannel channel) { + this.id = id; + this.address = address; + this.channel = channel; + startThread(); + } + + private void startThread() { + LOGGER.log(Level.FINE, "Starting timeout thread for ack #" + id); + timeout = new TimeoutThread(); + thread = new Thread(timeout); + thread.start(); + } + + public int getID() { + return id; + } + + public boolean check(SocketAddress address) { + return this.address.toString().equals(address.toString()); + } + + public ByteBuffer getBuf() { + return buf; + } + + public void setBuf(ByteBuffer buf) { + this.buf = buf; + } + + public void setReceived() { + // Stop thread + try { + if (thread != null) { + timeout.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + } + } + + private class TimeoutThread implements Runnable { + private volatile boolean notReceived = true; + + // When do we stop expecting an ack + private long timeToStop = System.currentTimeMillis() + TIMEOUT; + + @Override + public void run() { + while (notReceived && System.currentTimeMillis() < timeToStop) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // Timeout hit -> re-send + if (notReceived) { + try { + LOGGER.log(Level.INFO, "Absent ack #{0}. Resending to {1}", + new Object[] { id, address.toString() }); + + channel.send(buf, address); + startThread(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void terminate() { + notReceived = false; + } + } + + public SocketAddress getAddresse() { + return address; + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/message/MessageType.java b/ws2012/P2P/uebungen/8/src/message/MessageType.java new file mode 100644 index 00000000..eebc5a73 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/message/MessageType.java @@ -0,0 +1,12 @@ +package message; + + +public class MessageType { + public final static byte FIND_NODE = 48; // 0 + public final static byte PING = 49; + public final static byte PONG = 50; + + public final static byte ACK = 51; + + public final static byte NODES = 52; +} diff --git a/ws2012/P2P/uebungen/8/src/node/Identifier.java b/ws2012/P2P/uebungen/8/src/node/Identifier.java new file mode 100644 index 00000000..7bd748a7 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/node/Identifier.java @@ -0,0 +1,58 @@ +package node; + +import java.nio.ByteBuffer; +import java.util.Random; + +public class Identifier { + private static Random random = new Random(System.currentTimeMillis()); + + private int id; + private boolean bits[] = new boolean[Node.ID_BITS]; + + public Identifier(int id) { + this.id = id; + + String bitstring = Integer.toBinaryString(id); + for (int i = bitstring.length(); i > 0; i--) { + bits[bits.length - i] = bitstring.charAt(bitstring.length() - i) == '1'; + } + } + + public static Identifier getRandomIdentifier() { + int max_value = (int) Math.pow(2, Node.ID_BITS); + int id = (int) Math.abs((Math.round((max_value / 2) + + random.nextGaussian() * (max_value / 2)) % max_value)); + return new Identifier(id); + } + + public int getId() { + return id; + } + + public int distanceTo(Identifier otherID) { + return id ^ otherID.getId(); + } + + public boolean isBitSetAt(int index) { + return bits[index]; + } + + public byte[] getBytes() { + ByteBuffer result = ByteBuffer.allocate(4); + result.putInt(id); + return result.array(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Identifier)) { + return false; + } else { + return id == ((Identifier) o).getId(); + } + } + + public String toString() { + return String.valueOf(id); + } +} diff --git a/ws2012/P2P/uebungen/8/src/node/Node.java b/ws2012/P2P/uebungen/8/src/node/Node.java new file mode 100644 index 00000000..828c23ba --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/node/Node.java @@ -0,0 +1,184 @@ +package node; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.LogManager; +import java.util.logging.Logger; + +import message.Ack; +import message.MessageType; +import routingtable.RoutingTable; + +public class Node { + /** + * Size of ID space + */ + public static final int ID_BITS = 8; + + /** + * The bucket size + */ + private static final int BUCKET_SIZE = 2; + + /** + * The first node is always spawned on port 9999 + */ + private static final int INITIAL_PORT = 9999; + private static final Identifier INITIAL_ID = new Identifier((int) Math.pow( + 2, ID_BITS) / 2); + + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + + private InetSocketAddress address; + private DatagramChannel channel; + + private Map acks = new HashMap(); + + private Thread thread; + private UDPHandler udpListen; + + private Identifier nodeID = Identifier.getRandomIdentifier(); + + private RoutingTable rt = new RoutingTable(BUCKET_SIZE); + + public Node() { + System.setProperty("java.net.preferIPv4Stack", "true"); + + try { + channel = DatagramChannel.open(); + + try { + address = new InetSocketAddress("localhost", INITIAL_PORT); + channel.socket().bind(address); + + this.nodeID = INITIAL_ID; + } catch (SocketException e) { + // Port 9999 is already bound -> pick a random port + channel.socket().bind(new InetSocketAddress("localhost", 0)); + address = (InetSocketAddress) channel.getLocalAddress(); + } + + channel.configureBlocking(false); + + udpListen = new UDPHandler(this); + thread = new Thread(udpListen); + thread.start(); + + LOGGER.log(Level.INFO, "Initialized node {0} on {1}", new Object[] { + getName(), address.toString() }); + + if (address.getPort() != INITIAL_PORT) { + joinNetworkVia( + new InetSocketAddress("localhost", INITIAL_PORT), + INITIAL_ID); + } + + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void joinNetworkVia(InetSocketAddress receiver, Identifier id) { + LOGGER.log(Level.INFO, "Trying to join network via node {0} ({1})", + new Object[] { id, receiver }); + + rt.update(id, receiver); + // Send a FIND_NODE. The node to find is my own id (bootstrapping...) + sendFindNode(receiver, nodeID); + } + + void sendFindNode(InetSocketAddress receiver, Identifier idToFind) { + ByteBuffer buffer = ByteBuffer.allocate(5 + 4 + 4); + + Identifier rpc_id = Identifier.getRandomIdentifier(); + + buffer.put(MessageType.FIND_NODE); + buffer.put(nodeID.getBytes()); + buffer.put(rpc_id.getBytes()); + buffer.put(idToFind.getBytes()); + + if (send(buffer, receiver)) { + LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}", + new Object[] { idToFind, receiver }); + } + } + + void sendClosestNodesTo(InetSocketAddress receiver, Identifier receiverId, + Identifier idToFind, Identifier rpc_id) { + + ByteBuffer buffer = ByteBuffer.allocate(5 + 4 + 4); + + buffer.put(MessageType.NODES); + buffer.put(nodeID.getBytes()); + buffer.put(rpc_id.getBytes()); + + if (send(buffer, receiver)) { + LOGGER.log(Level.INFO, + "Sending [NODES XX] to node {0}. TODO: not implemented", + new Object[] { receiverId }); + } + } + + private boolean send(ByteBuffer buffer, InetSocketAddress to) { + buffer.flip(); + try { + channel.send(buffer, to); + return true; + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + public String getName() { + return nodeID.toString(); + } + + public boolean hasAcks() { + return !acks.isEmpty(); + } + + protected boolean hasAck(int ack_id) { + return acks.containsKey(ack_id); + } + + protected Ack getAck(int ack_id) { + return acks.get(ack_id); + } + + protected void removeAck(int ack_id) { + acks.remove(ack_id).setReceived(); + } + + public DatagramChannel getChannel() { + return channel; + } + + public void updateBuckets(Identifier id, InetSocketAddress from) { + rt.update(id, from); + } + + public Identifier getID() { + return nodeID; + } + + public static void main(String[] args) { + System.setProperty("java.util.logging.config.file", + "logging.properties"); + + try { + LogManager.getLogManager().readConfiguration(); + } catch (Exception e) { + e.printStackTrace(); + } + + new Node(); + } +} diff --git a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java new file mode 100644 index 00000000..f8adff43 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java @@ -0,0 +1,121 @@ +package node; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.MessageType; + +public class UDPHandler implements Runnable { + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class + .getName()); + + public static final int BUF_SIZE = 512; + + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + public UDPHandler(Node node) { + this.node = node; + } + + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress readIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } + + public void run() { + InetSocketAddress from = null; + + // Run until I get killed, and all my acks have been answered + while (running || node.hasAcks()) { + try { + from = (InetSocketAddress) node.getChannel().receive(buffer); + + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); + + byte messageType = buffer.get(); + + // TODO: this should read exactly ID_BITS bits not just an + // int + Identifier fromID = new Identifier(buffer.getInt()); + Identifier rpc_id = new Identifier(buffer.getInt()); + + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(from, fromID, rpc_id); + break; + case MessageType.NODES: + receiveNodes(from, fromID, rpc_id); + break; + case MessageType.PING: + LOGGER.log(Level.INFO, "Received [ping] from {0}", + new Object[] { from.toString() }); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } + + node.updateBuckets(fromID, from); + } else { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + buffer.clear(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private void receiveNodes(InetSocketAddress from, Identifier fromID, + Identifier rpc_id) { + + LOGGER.log(Level.INFO, "Received [NODES XX] from Node {0} ({1})", + new Object[] { fromID, from.toString() }); + } + + private void receiveFindNode(InetSocketAddress from, Identifier fromID, + Identifier rpc_id) { + Identifier idToFind = new Identifier(buffer.getInt()); + + LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1} ({2})", + new Object[] { idToFind, fromID, from.toString() }); + + node.sendClosestNodesTo(from, fromID, idToFind, rpc_id); + } + + public void terminate() { + running = false; + } +} diff --git a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java new file mode 100644 index 00000000..d9270054 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java @@ -0,0 +1,125 @@ +package routingtable; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import node.Identifier; + +public class Bucket { + private final static Logger LOGGER = Logger.getLogger(Bucket.class + .getName()); + + private int size; + + public List entrys = new ArrayList(); + public Bucket left = null; + public Bucket right = null; + + private int level = 0; + + public Bucket(int size, int level) { + this.level = level; + this.size = size; + } + + public void update(Identifier id, InetSocketAddress address) { + update(this, id, address); + } + + private void update(Bucket bucket, Identifier id, InetSocketAddress address) { + if (!bucket.isLeaf()) { + if (id.isBitSetAt(level)) { + + update(bucket.right, id, address); + + } else { + + update(bucket.left, id, address); + + } + } else if (bucket.isLeaf() && bucket.hasSpace()) { + + BucketEntry newEntry = new BucketEntry(id, address); + + if (entrys.contains(newEntry)) { + // Move to beginning + LOGGER.log( + Level.INFO, + "Node {0} ({1}) already in routing table. Move to end of bucket.", + new Object[] { id, address }); + entrys.remove(newEntry); + entrys.add(newEntry); + } else { + LOGGER.log(Level.INFO, + "Added new node {0} ({1}) to routing table.", + new Object[] { id, address }); + entrys.add(newEntry); + } + + } else { + // Leaf, but full -> split + + Bucket newLeft = new Bucket(size, level + 1); + Bucket newRight = new Bucket(size, level + 1); + + // Add the new entry and in the following loop distribute all + // existing entries to left/right + entrys.add(new BucketEntry(id, address)); + + for (BucketEntry entry : entrys) { + if (entry.id.isBitSetAt(level)) { + newLeft.entrys + .add(new BucketEntry(entry.id, entry.address)); + } else { + newRight.entrys + .add(new BucketEntry(entry.id, entry.address)); + } + } + bucket.entrys = null; + bucket.left = newLeft; + bucket.right = newRight; + + } + } + + public boolean hasNode(Identifier id) { + return hasNode(this, id); + } + + private boolean hasNode(Bucket bucket, Identifier idToFind) { + if (bucket.isLeaf()) { + for (BucketEntry entry : bucket.entrys) { + if (entry.id.equals(idToFind)) { + return true; + } + } + return false; + } else { + if (idToFind.isBitSetAt(level)) { + return bucket.hasNode(bucket.left, idToFind); + } else { + return bucket.hasNode(bucket.right, idToFind); + } + } + } + + private boolean isLeaf() { + return left == null && right == null; + } + + private boolean hasSpace() { + return entrys.size() < size; + } + + public List getClosestNodesTo(Identifier id) { + List result = new ArrayList(); + + // TODO + + return result; + } + +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/routingtable/BucketEntry.java b/ws2012/P2P/uebungen/8/src/routingtable/BucketEntry.java new file mode 100644 index 00000000..47e1c191 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/routingtable/BucketEntry.java @@ -0,0 +1,24 @@ +package routingtable; + +import java.net.InetSocketAddress; + +import node.Identifier; + +public class BucketEntry { + public Identifier id; + public InetSocketAddress address; + + public BucketEntry(Identifier id, InetSocketAddress address) { + this.id = id; + this.address = address; + } + + public boolean equals(Object o) { + if (!(o instanceof BucketEntry)) { + return false; + } else { + BucketEntry entry = (BucketEntry) o; + return id.equals(entry.id) && address.equals(entry.address); + } + } +} diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java new file mode 100644 index 00000000..56287a9e --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java @@ -0,0 +1,27 @@ +package routingtable; + +import java.net.InetSocketAddress; +import java.util.List; + +import node.Identifier; + +public class RoutingTable { + + private Bucket tree; + + public RoutingTable(int bucketSize) { + this.tree = new Bucket(bucketSize, 0); + } + + public void update(Identifier id, InetSocketAddress address) { + tree.update(id, address); + } + + public List getClosestNodesTo(Identifier id) { + return tree.getClosestNodesTo(id); + } + + public void remove(Identifier node) { + + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/util/BufferUtil.java b/ws2012/P2P/uebungen/8/src/util/BufferUtil.java new file mode 100644 index 00000000..182ce91a --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/util/BufferUtil.java @@ -0,0 +1,28 @@ +package util; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class BufferUtil { + + public static ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.capacity()); + + int oldPosition = original.position(); + original.rewind();// copy from the beginning + clone.put(original); + // original.rewind(); + original.position(oldPosition); + clone.flip(); + return clone; + } + + public static byte[] addrToBytes(InetSocketAddress addr) { + ByteBuffer buffer = ByteBuffer.allocate(8); + for (String part : addr.getHostString().split("\\.")) { + buffer.put(Byte.valueOf(part)); + } + buffer.putInt(addr.getPort()); + return buffer.array(); + } +}