diff --git a/ws2012/P2P/uebungen/11/build.xml b/ws2012/P2P/uebungen/11/build.xml new file mode 100644 index 00000000..e9521c06 --- /dev/null +++ b/ws2012/P2P/uebungen/11/build.xml @@ -0,0 +1,74 @@ + + + + + + + + + + + Builds, tests, and runs the project p2p_u5. + + + diff --git a/ws2012/P2P/uebungen/11/logging.properties b/ws2012/P2P/uebungen/11/logging.properties new file mode 100644 index 00000000..2157f57d --- /dev/null +++ b/ws2012/P2P/uebungen/11/logging.properties @@ -0,0 +1,7 @@ +handlers=java.util.logging.ConsoleHandler + +.level=FINEST + +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter +java.util.logging.ConsoleHandler.level = FINEST diff --git a/ws2012/P2P/uebungen/11/manifest.mf b/ws2012/P2P/uebungen/11/manifest.mf new file mode 100644 index 00000000..328e8e5b --- /dev/null +++ b/ws2012/P2P/uebungen/11/manifest.mf @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +X-COMMENT: Main-Class will be added automatically by build + diff --git a/ws2012/P2P/uebungen/11/nbproject/build-impl.xml b/ws2012/P2P/uebungen/11/nbproject/build-impl.xml new file mode 100644 index 00000000..89bd7f22 --- /dev/null +++ b/ws2012/P2P/uebungen/11/nbproject/build-impl.xml @@ -0,0 +1,924 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set platform.home + Must set platform.bootcp + Must set platform.java + Must set platform.javac + + The J2SE Platform is not correctly set up. + Your active platform is: ${platform.active}, but the corresponding property "platforms.${platform.active}.home" is not found in the project's properties files. + Either open the project in the IDE and setup the Platform with the same name or add it manually. + For example like this: + ant -Duser.properties.file=<path_to_property_file> jar (where you put the property "platforms.${platform.active}.home" in a .properties file) + or ant -Dplatforms.${platform.active}.home=<path_to_JDK_home> jar (where no properties file is used) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set src.dir + Must set build.dir + Must set dist.dir + Must set build.classes.dir + Must set dist.javadoc.dir + Must set build.test.classes.dir + Must set build.test.results.dir + Must set build.classes.excludes + Must set dist.jar + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + To run this application from the command line without Ant, try: + + + + + + + ${platform.java} -cp "${run.classpath.with.dist.jar}" ${main.class} + + + + + + + + + + + + To run this application from the command line without Ant, try: + + ${platform.java} -jar "${dist.jar.resolved}" + + + + + + + + To run this application from the command line without Ant, try: + + ${platform.java} -jar "${dist.jar.resolved}" + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set run.class + + + + Must select one file in the IDE or set run.class + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set debug.class + + + + + Must select one file in the IDE or set debug.class + + + + + Must set fix.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + Some tests failed; see details above. + + + + + + + + + Must select some files in the IDE or set test.includes + + + + Some tests failed; see details above. + + + + + Must select one file in the IDE or set test.class + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/ws2012/P2P/uebungen/11/nbproject/genfiles.properties b/ws2012/P2P/uebungen/11/nbproject/genfiles.properties new file mode 100644 index 00000000..d26104a8 --- /dev/null +++ b/ws2012/P2P/uebungen/11/nbproject/genfiles.properties @@ -0,0 +1,8 @@ +build.xml.data.CRC32=b48d6d8f +build.xml.script.CRC32=d537a005 +build.xml.stylesheet.CRC32=28e38971@1.38.3.45 +# This file is used by a NetBeans-based IDE to track changes in generated files such as build-impl.xml. +# Do not edit this file. You may delete it but then the IDE will never regenerate such files for you. +nbproject/build-impl.xml.data.CRC32=b48d6d8f +nbproject/build-impl.xml.script.CRC32=0fa0a410 +nbproject/build-impl.xml.stylesheet.CRC32=229523de@1.38.3.45 diff --git a/ws2012/P2P/uebungen/11/nbproject/private/config.properties b/ws2012/P2P/uebungen/11/nbproject/private/config.properties new file mode 100644 index 00000000..e69de29b diff --git a/ws2012/P2P/uebungen/11/nbproject/private/private.properties b/ws2012/P2P/uebungen/11/nbproject/private/private.properties new file mode 100644 index 00000000..86cef05c --- /dev/null +++ b/ws2012/P2P/uebungen/11/nbproject/private/private.properties @@ -0,0 +1,6 @@ +compile.on.save=true +do.depend=false +do.jar=true +javac.debug=true +javadoc.preview=true +user.properties.file=C:\\Users\\rylon\\.netbeans\\6.9\\build.properties diff --git a/ws2012/P2P/uebungen/11/nbproject/project.properties b/ws2012/P2P/uebungen/11/nbproject/project.properties new file mode 100644 index 00000000..71db5e9f --- /dev/null +++ b/ws2012/P2P/uebungen/11/nbproject/project.properties @@ -0,0 +1,81 @@ +annotation.processing.enabled=true +annotation.processing.enabled.in.editor=false +annotation.processing.run.all.processors=true +annotation.processing.source.output=${build.generated.sources.dir}/ap-source-output +application.title=p2p_u5 +application.vendor=rylon +build.classes.dir=${build.dir}/classes +build.classes.excludes=**/*.java,**/*.form +# This directory is removed when the project is cleaned: +build.dir=build +build.generated.dir=${build.dir}/generated +build.generated.sources.dir=${build.dir}/generated-sources +# Only compile against the classpath explicitly listed here: +build.sysclasspath=ignore +build.test.classes.dir=${build.dir}/test/classes +build.test.results.dir=${build.dir}/test/results +# Uncomment to specify the preferred debugger connection transport: +#debug.transport=dt_socket +debug.classpath=\ + ${run.classpath} +debug.test.classpath=\ + ${run.test.classpath} +# This directory is removed when the project is cleaned: +dist.dir=dist +dist.jar=${dist.dir}/p2p_u5.jar +dist.javadoc.dir=${dist.dir}/javadoc +endorsed.classpath= +excludes= +file.reference.11-src=src +includes=** +jar.archive.disabled=${jnlp.enabled} +jar.compress=false +jar.index=${jnlp.enabled} +javac.classpath= +# Space-separated list of extra javac options +javac.compilerargs= +javac.deprecation=false +javac.processorpath=\ + ${javac.classpath} +javac.source=1.7 +javac.target=1.7 +javac.test.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir}:\ + ${libs.junit.classpath}:\ + ${libs.junit_4.classpath} +javac.test.processorpath=\ + ${javac.test.classpath} +javadoc.additionalparam= +javadoc.author=false +javadoc.encoding=${source.encoding} +javadoc.noindex=false +javadoc.nonavbar=false +javadoc.notree=false +javadoc.private=false +javadoc.splitindex=true +javadoc.use=true +javadoc.version=false +javadoc.windowtitle= +jnlp.codebase.type=no.codebase +jnlp.descriptor=application +jnlp.enabled=false +jnlp.mixed.code=defaut +jnlp.offline-allowed=false +jnlp.signed=false +main.class=CLI +manifest.file=manifest.mf +meta.inf.dir=${src.dir}/META-INF +platform.active=JDK_1.7 +run.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir} +# Space-separated list of JVM arguments used when running the project +# (you may also define separate properties like run-sys-prop.name=value instead of -Dname=value +# or test-sys-prop.name=value to set system properties for unit tests): +run.jvmargs= +run.test.classpath=\ + ${javac.test.classpath}:\ + ${build.test.classes.dir} +source.encoding=UTF-8 +src.dir=${file.reference.11-src} diff --git a/ws2012/P2P/uebungen/11/nbproject/project.xml b/ws2012/P2P/uebungen/11/nbproject/project.xml new file mode 100644 index 00000000..26766c71 --- /dev/null +++ b/ws2012/P2P/uebungen/11/nbproject/project.xml @@ -0,0 +1,14 @@ + + + org.netbeans.modules.java.j2seproject + + + p2p_u5 + + + + + + + + diff --git a/ws2012/P2P/uebungen/11/src/CLI.java b/ws2012/P2P/uebungen/11/src/CLI.java new file mode 100644 index 00000000..b6cd5db7 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/CLI.java @@ -0,0 +1,54 @@ +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.logging.LogManager; + +import node.Node; +import node.NodeIdentifier; + +public class CLI { + + public static void main(String[] args) throws IOException { + System.setProperty("java.util.logging.config.file", + "logging.properties"); + + try { + LogManager.getLogManager().readConfiguration(); + } catch (Exception e) { + e.printStackTrace(); + } + + Node node = new Node(); + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String s; + while ((s = in.readLine()) != null && s.length() != 0) { + String[] splitted = s.split(" "); + + String cmd = splitted[0]; + + switch (cmd) { + case "status": + for (NodeIdentifier id : node.getNeighbors()) { + System.out.println(id); + } + break; + case "lookup": + // TODO not implemented + if (splitted.length < 2) { + System.out.println("Too few arguments."); + } else { + String key = splitted[1]; + } + System.out.println("not implemented"); + break; + case "leave": + node.leave(); + break; + default: + System.out.println("Unknown command."); + break; + } + } + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/message/Ack.java b/ws2012/P2P/uebungen/11/src/message/Ack.java new file mode 100644 index 00000000..b274fd90 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/message/Ack.java @@ -0,0 +1,140 @@ +package message; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.logging.Level; +import java.util.logging.Logger; + +import node.Identifier; +import node.NodeIdentifier; +import util.BufferUtil; + +public class Ack { + private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); + + /** + * timeout in seconds + */ + private static final int TIMEOUT = 1000; + + /** + * Maximum number of retries + */ + private static final int MAX_RETRIES = 3; + + private Identifier rpcId; + + private NodeIdentifier receiver; + + private ByteBuffer buffer; + + private int numRetries = 0; + + private TimeoutThread timeout; + private Thread thread; + + // The channel to re-send the message on + private DatagramChannel channel; + + private MessageCallback callback; + + public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel, + ByteBuffer buffer, MessageCallback cb) { + this.rpcId = id; + this.receiver = receiver; + this.channel = channel; + this.buffer = BufferUtil.clone(buffer); + this.callback = cb; + startThread(); + } + + private void startThread() { + LOGGER.log(Level.FINEST, "Starting timeout thread for RPC " + rpcId); + timeout = new TimeoutThread(); + thread = new Thread(timeout); + thread.start(); + } + + public Identifier getID() { + return rpcId; + } + + public boolean check(NodeIdentifier fromID) { + return fromID.equals(receiver); + } + + public ByteBuffer getBuf() { + return buffer; + } + + public void setBuf(ByteBuffer buf) { + this.buffer = buf; + } + + public void setReceived() { + // Stop thread + try { + if (thread != null) { + timeout.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + } + } + + private class TimeoutThread implements Runnable { + private volatile boolean notReceived = true; + + // When do we stop expecting the ack + private long timeToStop = System.currentTimeMillis() + TIMEOUT; + + @Override + public void run() { + while (notReceived && System.currentTimeMillis() < timeToStop) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // Timeout hit! + + if (notReceived) { + + if (numRetries < MAX_RETRIES) { + try { + LOGGER.log( + Level.FINE, + "Didn't receive RPC Ack {0} by now. Resending... ", + new Object[] { rpcId }); + channel.send(buffer, receiver.getAddress()); + } catch (IOException e) { + e.printStackTrace(); + } + + startThread(); + numRetries++; + } else { + + LOGGER.log(Level.INFO, "Absent RPC ack {0}.", + new Object[] { rpcId }); + + if (callback != null) { + callback.onTimeout(); + } + } + } else { + // Message has been received in time + if (callback != null) { + callback.onReceive(); + } + } + } + + public void terminate() { + notReceived = false; + } + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/message/MessageCallback.java b/ws2012/P2P/uebungen/11/src/message/MessageCallback.java new file mode 100644 index 00000000..d88b3920 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/message/MessageCallback.java @@ -0,0 +1,22 @@ +package message; + +/** + * A callback to create asynchronous events that get triggered when a message + * (ack/answer) is received. + * + * @author jln + * + */ +public interface MessageCallback { + + /** + * Called when the awaited message arrives. + */ + public void onReceive(); + + /** + * Called when the awaited message doesn't arrive (even after possible + * retries). + */ + public void onTimeout(); +} diff --git a/ws2012/P2P/uebungen/11/src/message/MessageType.java b/ws2012/P2P/uebungen/11/src/message/MessageType.java new file mode 100644 index 00000000..c9b84e92 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/message/MessageType.java @@ -0,0 +1,15 @@ +package message; + +public class MessageType { + public final static byte FIND_NODE = 0; + public final static byte NODES = 1; + + public final static byte PING = 10; + public final static byte PONG = 11; + + public final static byte LEAVE = 2; + + public final static byte FIND_VALUE = 4; + public final static byte STORE = 5; + public final static byte DATA = 6; +} diff --git a/ws2012/P2P/uebungen/11/src/node/Identifier.java b/ws2012/P2P/uebungen/11/src/node/Identifier.java new file mode 100644 index 00000000..0b335fc5 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/node/Identifier.java @@ -0,0 +1,111 @@ +package node; + +import java.math.BigInteger; +import java.util.BitSet; +import java.util.Random; + +/** + * A Kademlia identifier. Can be used for identifying files as well as nodes + * (but for nodes check {@see NodeIdentifier}). + * + * @author jln + * + */ +public class Identifier { + private static Random random = new Random(System.currentTimeMillis()); + + protected BitSet bits; + + private int size; + + public Identifier(int size, byte[] bytes) { + this.size = size; + this.bits = BitSet.valueOf(bytes); + } + + private Identifier(int size, BitSet bits) { + this.size = size; + this.bits = bits; + } + + /** + * Creates an ID exactly "in the middle" of the ID space. (If the ID space + * is 8 bit wide, this returns an ID valued 128). + * + * @param size + * the size of the id space + * @return an Identifier + */ + public static Identifier getStaticIdentifier(int size) { + BitSet middle = new BitSet(size); + middle.set(size - 1); + return new Identifier(size, middle); + } + + /** + * Creates a random ID for the given id space size. + * + * @param size + * the size of the id space + * @return a random Identifier + */ + public static Identifier getRandomIdentifier(int size) { + BitSet bits = new BitSet(size); + + for (int i = 0; i < size; i++) { + double threshold = random.nextGaussian(); + if (threshold > 0) { + bits.set(i); + } + } + + return new Identifier(size, bits); + } + + public BigInteger distanceTo(Identifier otherID) { + BitSet distance = (BitSet) bits.clone(); + distance.xor(otherID.bits); + return new BigInteger(1, distance.toByteArray()); + } + + /** + * Returns whether the bit at the given position is set or not. The MSB is + * at position 0. + * + * @param index + * the index to check + * @return true if the bit is set + */ + public boolean isBitSetAt(int index) { + BigInteger intValue = new BigInteger(1, bits.toByteArray()); + int numOfTrimmedZeros = size - intValue.bitLength(); + + if (index < numOfTrimmedZeros) { + return false; + } + + return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1); + } + + public byte[] getBytes() { + return bits.toByteArray(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Identifier)) { + return false; + } else { + return bits.equals(((Identifier) o).bits); + } + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + public String toString() { + return new BigInteger(1, bits.toByteArray()).toString(); + } +} diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java new file mode 100644 index 00000000..569fc260 --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -0,0 +1,341 @@ +package node; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.Ack; +import message.MessageCallback; +import message.MessageType; +import routingtable.IRoutingTable; +import routingtable.RoutingTableImpl; + +public class Node { + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + + /** + * Size of ID space (has to be a multiple of 8) + */ + public static final int ID_BITS = 8; + /** + * The bucket size + */ + public static final int BUCKET_SIZE = 2; + /** + * The first node is always spawned on port 50000 + */ + private static final int INITIAL_PORT = 50000; + private static final Identifier INITIAL_ID = Identifier.getStaticIdentifier(ID_BITS); + private static final int BUFFER_SIZE = 512; + /** + * The size of an IP address (in bytes) + */ + public static final int SIZE_IP_ADDRESS = 8; + + private InetSocketAddress address; + private DatagramChannel channel; + + private Map> rpcs = new HashMap>(); + + private Thread thread; + private UDPHandler udpListen; + + private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS); + private IRoutingTable routingTable = new RoutingTableImpl(BUCKET_SIZE, this); + + public Node() { + System.setProperty("java.net.preferIPv4Stack", "true"); + + try { + channel = DatagramChannel.open(); + + try { + address = new InetSocketAddress("localhost", INITIAL_PORT); + channel.socket().bind(address); + + this.nodeID = INITIAL_ID; + } catch (SocketException e) { + // The initial port is already bound -> let the system pick a + // port + channel.socket().bind(new InetSocketAddress("localhost", 0)); + address = (InetSocketAddress) channel.getLocalAddress(); + } + + channel.configureBlocking(false); + + udpListen = new UDPHandler(this); + thread = new Thread(udpListen); + thread.start(); + + LOGGER.log(Level.INFO, "{0}: Initialized node {1} on {2}", new Object[] { this.nodeID, getName(), address.toString() }); + + if (address.getPort() != INITIAL_PORT) { + // The port of this node is not the "INITIAL_PORT" (so it's not + // the first node in the network). So we try to join the network + // via the first node. + NodeIdentifier viaNode = new NodeIdentifier(ID_BITS, + INITIAL_ID.getBytes(), new InetSocketAddress( + "127.0.0.1", INITIAL_PORT)); + joinNetworkVia(viaNode); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void joinNetworkVia(NodeIdentifier viaNode) { + LOGGER.log(Level.INFO, "Trying to join network via node {0}", + new Object[] { viaNode }); + + routingTable.insert(viaNode); + sendFindNode(viaNode, this.nodeID); + } + + /** + * Creates and returns new ID (usually used as a RPC ID). This makes sure + * the ID is not yet used (in this node). + * + * @return an ID + */ + private Identifier createRPCID() { + Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS); + while (rpcs.containsKey(rpcID)) { + rpcID = Identifier.getRandomIdentifier(ID_BITS); + } + return rpcID; + } + + void sendFindNode(NodeIdentifier receiver, Identifier idToFind) { + boolean successful = send(receiver, MessageType.FIND_NODE, + idToFind.getBytes(), true, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}", + new Object[] { idToFind, receiver }); + } + } + + void sendFindValue(NodeIdentifier receiver, Identifier idToFind) { + boolean successful = send(receiver, MessageType.FIND_VALUE, + idToFind.getBytes(), true, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [FIND_VALUE {0}] to node {1}", + new Object[] { idToFind, receiver }); + } + } + + /** + * Gets all nodes of this nodes routing table, that a close to a given node + * and sends that list to a specific node. + * + * @param receiver + * The node to receive the list of nodes + * @param idToFind + * The ID to find close nodes of + * @param rpcID + * An RPC ID (because this is always an answer to a FIND_NODE + * RPC) + */ + void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID) { + //TODO modify to match FIND_VALUE + Set closeNodes = routingTable.getClosestNodesTo(idToFind); + int numNodes = closeNodes.size(); + + ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8) + + numNodes * SIZE_IP_ADDRESS); + + for (NodeIdentifier idToSend : closeNodes) { + // Don't send the node to itself + if (!receiver.equals(idToSend)) { + nodes.put(idToSend.getTripleAsBytes()); + } + } + + boolean successful = send(receiver, MessageType.NODES, rpcID, + nodes.array(), false, null); + + if (successful) { + LOGGER.log( + Level.INFO, + "Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})", + new Object[] { closeNodes.size(), receiver, idToFind, rpcID }); + } + } + + public void sendPing(NodeIdentifier receiver, MessageCallback cb) { + boolean successful = send(receiver, MessageType.PING, null, true, cb); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [PING] to node {0}", + new Object[] { receiver }); + } + } + + void sendPong(NodeIdentifier receiver, Identifier rpcID) { + boolean successful = send(receiver, MessageType.PONG, rpcID, null, + false, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [PONG] to {0} (rpcID={1})", + new Object[] { receiver, rpcID }); + } + } + + /** + * Send a message to a given ID (with a given RPC ID). You usually want to + * use this method when you know the RPC ID beforehand (e.g. if this is an + * ack or answer to a prior message). + * + * @param to + * the ID to send to + * @param messageType + * the message type + * @param data + * the data to send + * @param reliable + * flag, whether this has to be acked or not + * @param cb + * A callback that is executed when this message gets acked (or + * answered). This obviously is only of interest when the + * reliable flag is true + * @return true if the message was sent successfully + */ + private boolean send(NodeIdentifier to, byte messageType, byte[] data, + boolean reliable, MessageCallback cb) { + return send(to, messageType, createRPCID(), data, reliable, cb); + } + + /** + * Send a message to a given ID (with a given RPC ID). You usually want to + * use this method when you know the RPC ID beforehand (e.g. if this is an + * ack or answer to a prior message). + * + * @param to + * the ID to send to + * @param messageType + * the message type + * @param rpcID + * the RPC ID of this message (if you don't know this use + * {@link #send(NodeIdentifier, byte, byte[], boolean, MessageCallback)} + * and a new random ID will be created) + * @param data + * the data to send + * @param reliable + * flag, whether this has to be acked or not + * @param cb + * A callback that is executed when this message gets acked (or + * answered). This obviously is only of interest when the + * reliable flag is true + * @return true if the message was sent successfully + */ + private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID, + byte[] data, boolean reliable, MessageCallback cb) { + + boolean successful = true; + ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); + + buffer.put(messageType); + buffer.put(this.nodeID.getBytes()); + buffer.put(rpcID.getBytes()); + + if (data != null) { + buffer.put(data); + } + + buffer.flip(); + + try { + + channel.send(buffer, to.getAddress()); + + } catch (IOException e) { + + LOGGER.log(Level.SEVERE, "Failed to write to channel", e); + successful = false; + + } finally { + // Even if an exception occurred this should be reliable + if (reliable) { + + Ack newAck = new Ack(rpcID, to, channel, buffer, cb); + if (rpcs.containsKey(rpcID)) { + rpcs.get(rpcID).add(newAck); + } else { + rpcs.put(rpcID, new ArrayList()); + rpcs.get(rpcID).add(newAck); + } + } + } + return successful; + } + + public String getName() { + return nodeID.toString(); + } + + public boolean hasAcks() { + return !rpcs.isEmpty(); + } + + public DatagramChannel getChannel() { + return channel; + } + + public void updateBuckets(NodeIdentifier id) { + routingTable.insert(id); + } + + public Identifier getID() { + return nodeID; + } + + public Set getNeighbors() { + return routingTable.getEntries(); + } + + public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { + List rpcsFromID = rpcs.get(rpcID); + boolean removedAck = false; + + for (Ack ack : rpcsFromID) { + if (ack.check(fromID)) { + ack.setReceived(); + rpcsFromID.remove(ack); + removedAck = true; + + LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID); + + break; + } + } + + if (!removedAck) { + LOGGER.log(Level.WARNING, + "Received RPC ack {0}, but didn't expect that", + new Object[] { rpcID }); + } + + return removedAck; + } + + public void leave() { + for (NodeIdentifier n : getNeighbors()) { + sendLeave(n); + } + System.exit(0); + } + + private boolean sendLeave(NodeIdentifier n) { + return send(n, MessageType.LEAVE, null, false, null); + } +} diff --git a/ws2012/P2P/uebungen/11/src/node/NodeIdentifier.java b/ws2012/P2P/uebungen/11/src/node/NodeIdentifier.java new file mode 100644 index 00000000..37ca438f --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/node/NodeIdentifier.java @@ -0,0 +1,35 @@ +package node; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import util.BufferUtil; + +/** + * Same as a {@link Identifier}, but this also stores an IP address. + * + * @author jln + * + */ +public class NodeIdentifier extends Identifier { + + private InetSocketAddress address; + + public NodeIdentifier(int size, byte[] bytes, InetSocketAddress address) { + super(size, bytes); + this.address = address; + } + + public byte[] getTripleAsBytes() { + ByteBuffer result = ByteBuffer.allocate(Node.SIZE_IP_ADDRESS + + (Node.ID_BITS / 8)); + + result.put(BufferUtil.addrToBytes(address)); + result.put(bits.toByteArray()); + return result.array(); + } + + public InetSocketAddress getAddress() { + return address; + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java new file mode 100644 index 00000000..aac1b59a --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -0,0 +1,220 @@ +package node; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.MessageType; + +public class UDPHandler implements Runnable { + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName()); + + public static final int BUF_SIZE = 512; + + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + public UDPHandler(Node node) { + this.node = node; + } + + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress getIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } + + private Identifier getIDFromBuffer() { + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new Identifier(Node.ID_BITS, result); + } + + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ + private NodeIdentifier getNodeTripleFromBuffer() { + InetSocketAddress address = getIPFromBuffer(); + + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new NodeIdentifier(Node.ID_BITS, result, address); + } + + public void run() { + InetSocketAddress from = null; + + // Run until it gets killed, and all my Acks have been answered + while (running || node.hasAcks()) { + try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... + boolean updateRT = true; + + // The address of the node that sent this message + from = (InetSocketAddress) node.getChannel().receive(buffer); + + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); + + byte messageType = buffer.get(); + + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, + getIDFromBuffer().getBytes(), from); + + Identifier rpcID = getIDFromBuffer(); + + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(fromID, rpcID); + break; + case MessageType.NODES: + receiveNodes(fromID, rpcID); + break; + case MessageType.PING: + updateRT = false; + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + updateRT = false; + receivePong(fromID, rpcID); + break; + case MessageType.LEAVE: + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; + case MessageType.FIND_VALUE: + //TODO implement + LOGGER.log(Level.INFO, "Received FIND_VALUE from {0}", + new Object[] { from.toString() }); + break; + case MessageType.STORE: + //TODO implemnt + LOGGER.log(Level.INFO, "Received STORE from {0}", + new Object[] { from.toString() }); + break; + case MessageType.DATA: + //TODO implemnt + LOGGER.log(Level.INFO, "Received DATA from {0}", + new Object[] { from.toString() }); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } + + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } + + } else { + // If nothing has been read/received wait and read/receive + // again + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + buffer.clear(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private void receivePong(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PONG] from {0}", + new Object[] { fromID }); + + // This should be the answer to a prior PING -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + } + + private void receivePing(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PING] from {0}", + new Object[] { fromID }); + node.sendPong(fromID, rpcID); + } + + private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { + + int numReceived = 0; + + // This is just for the log message + StringBuilder nodes = new StringBuilder(); + + while (buffer.hasRemaining()) { + NodeIdentifier newID = getNodeTripleFromBuffer(); + node.updateBuckets(newID); + nodes.append(newID).append(", "); + numReceived++; + } + + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); + } + + private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) { + Identifier idToFind = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}", + new Object[] { idToFind, fromID }); + + node.sendClosestNodesTo(fromID, idToFind, rpc_id); + } + + public void terminate() { + running = false; + } +} diff --git a/ws2012/P2P/uebungen/11/src/routingtable/Bucket.java b/ws2012/P2P/uebungen/11/src/routingtable/Bucket.java new file mode 100644 index 00000000..271d063d --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/routingtable/Bucket.java @@ -0,0 +1,145 @@ +package routingtable; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.MessageCallback; +import node.Node; +import node.NodeIdentifier; + +public class Bucket { + private final static Logger LOGGER = Logger.getLogger(Bucket.class + .getName()); + + private Bucket left; + private Bucket right; + + private List entries; + + private int bucketSize; + private int level; + + private Node node; + + public Bucket(int bucketSize, int level, Node node) { + this.bucketSize = bucketSize; + this.level = level; + this.node = node; + entries = new ArrayList(); + } + + /** + * Returns the nodes of this very bucket. + * + * @return + */ + public List getNodes() { + return entries; + } + + public boolean contains(NodeIdentifier id) { + if (!isLeaf()) { + return left.contains(id) || right.contains(id); + } + return entries.contains(id); + } + + /** + * Tries to update the given node. + * + * @param id + * @return true if the node is still available, else false + */ + public void update(final NodeIdentifier id) { + if (!isLeaf()) { + if (id.isBitSetAt(level)) { + left.update(id); + } else { + right.update(id); + } + } else { + node.sendPing(id, new MessageCallback() { + @Override + public void onReceive() { + LOGGER.log(Level.INFO, + "Node answered in time, moving to top of list."); + entries.remove(id); + entries.add(0, id); + } + + @Override + public void onTimeout() { + LOGGER.log(Level.INFO, "Node didnt answer in time."); + // TODO: this should be propagated to the "upper" Routing + // Table, not just to this specific bucket + entries.remove(id); + } + }); + } + } + + public void insert(NodeIdentifier newId) { + insert(newId, ""); + } + + public void insert(NodeIdentifier newId, String path) { + if (isLeaf()) { + if (entries.size() < bucketSize) { + LOGGER.log(Level.INFO, + "Added node {0} to RT [{1}] on level {2}", + new Object[] { newId, path, level }); + entries.add(newId); + } else { + LOGGER.log(Level.INFO, "Split on level " + level + + " while adding " + newId); + + LOGGER.log(Level.INFO, + "Distributing present nodes to lower buckets"); + + Bucket newLeft = new Bucket(bucketSize, level + 1, node); + Bucket newRight = new Bucket(bucketSize, level + 1, node); + + // Add the new entry and in the following loop distribute all + // existing entries to left/right + entries.add(newId); + + for (NodeIdentifier id : entries) { + if (id.isBitSetAt(level)) { + newLeft.insert(id, path + "1"); + } else { + newRight.insert(id, path + "0"); + } + } + + this.entries = null; + this.left = newLeft; + this.right = newRight; + } + } else { + if (newId.isBitSetAt(level)) { + left.insert(newId, path + "1"); + } else { + right.insert(newId, path + "0"); + } + } + + } + + private boolean isLeaf() { + return left == null && right == null; + } + + public void remove(NodeIdentifier node) { + if (isLeaf()) { + entries.remove(node); + } else { + if (node.isBitSetAt(level)) { + left.remove(node); + } else { + right.remove(node); + } + } + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/routingtable/IRoutingTable.java b/ws2012/P2P/uebungen/11/src/routingtable/IRoutingTable.java new file mode 100644 index 00000000..8dd53f0a --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/routingtable/IRoutingTable.java @@ -0,0 +1,19 @@ +package routingtable; + +import java.util.Set; + +import node.Identifier; +import node.NodeIdentifier; + +public interface IRoutingTable { + + public void insert(NodeIdentifier id); + + public Set getClosestNodesTo(Identifier id); + + public boolean contains(NodeIdentifier node); + + public void remove(NodeIdentifier node); + + public Set getEntries(); +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/routingtable/RoutingTableImpl.java b/ws2012/P2P/uebungen/11/src/routingtable/RoutingTableImpl.java new file mode 100644 index 00000000..70b070ed --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/routingtable/RoutingTableImpl.java @@ -0,0 +1,79 @@ +package routingtable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import node.Identifier; +import node.Node; +import node.NodeIdentifier; + +public class RoutingTableImpl implements IRoutingTable { + private Set entries = new HashSet(); + + private Bucket root; + + private int bucketSize; + + public RoutingTableImpl(int bucketSize, Node node) { + this.bucketSize = bucketSize; + this.root = new Bucket(bucketSize, 0, node); + } + + @Override + public void insert(NodeIdentifier id) { + if (root.contains(id)) { + root.update(id); + } else { + entries.add(id); + root.insert(id); + } + } + + @Override + public Set getClosestNodesTo(final Identifier id) { + Set result = new HashSet(); + + if (entries.size() <= bucketSize) { + result.addAll(entries); + + } else { + List temp = new ArrayList(entries); + + Collections.sort(temp, new Comparator() { + @Override + public int compare(NodeIdentifier o1, NodeIdentifier o2) { + BigInteger dist1 = id.distanceTo(o1); + BigInteger dist2 = id.distanceTo(o2); + return dist1.compareTo(dist2); + } + }); + + for (int i = 0; i < bucketSize; i++) { + result.add(temp.get(i)); + } + result = new HashSet(temp.subList(0, + Node.BUCKET_SIZE)); + } + return result; + } + + @Override + public boolean contains(NodeIdentifier node) { + return root.contains(node); + } + + @Override + public void remove(NodeIdentifier node) { + + } + + @Override + public Set getEntries() { + return entries; + } +} \ No newline at end of file diff --git a/ws2012/P2P/uebungen/11/src/util/BufferUtil.java b/ws2012/P2P/uebungen/11/src/util/BufferUtil.java new file mode 100644 index 00000000..182ce91a --- /dev/null +++ b/ws2012/P2P/uebungen/11/src/util/BufferUtil.java @@ -0,0 +1,28 @@ +package util; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class BufferUtil { + + public static ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.capacity()); + + int oldPosition = original.position(); + original.rewind();// copy from the beginning + clone.put(original); + // original.rewind(); + original.position(oldPosition); + clone.flip(); + return clone; + } + + public static byte[] addrToBytes(InetSocketAddress addr) { + ByteBuffer buffer = ByteBuffer.allocate(8); + for (String part : addr.getHostString().split("\\.")) { + buffer.put(Byte.valueOf(part)); + } + buffer.putInt(addr.getPort()); + return buffer.array(); + } +}