diff --git a/build.xml b/build.xml new file mode 100644 index 0000000..793fefc --- /dev/null +++ b/build.xml @@ -0,0 +1,73 @@ + + + + + + + + + + + Builds, tests, and runs the project Kademlia. + + + diff --git a/build/.gitignore b/build/.gitignore new file mode 100644 index 0000000..0a00d70 --- /dev/null +++ b/build/.gitignore @@ -0,0 +1 @@ +*/ \ No newline at end of file diff --git a/logging.properties b/logging.properties new file mode 100644 index 0000000..2157f57 --- /dev/null +++ b/logging.properties @@ -0,0 +1,7 @@ +handlers=java.util.logging.ConsoleHandler + +.level=FINEST + +java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n +java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter +java.util.logging.ConsoleHandler.level = FINEST diff --git a/manifest.mf b/manifest.mf new file mode 100644 index 0000000..328e8e5 --- /dev/null +++ b/manifest.mf @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +X-COMMENT: Main-Class will be added automatically by build + diff --git a/nbproject/.gitignore b/nbproject/.gitignore new file mode 100644 index 0000000..623f09e --- /dev/null +++ b/nbproject/.gitignore @@ -0,0 +1 @@ +/private \ No newline at end of file diff --git a/nbproject/build-impl.xml b/nbproject/build-impl.xml new file mode 100644 index 0000000..a4a9efa --- /dev/null +++ b/nbproject/build-impl.xml @@ -0,0 +1,1403 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set src.dir + Must set build.dir + Must set dist.dir + Must set build.classes.dir + Must set dist.javadoc.dir + Must set build.test.classes.dir + Must set build.test.results.dir + Must set build.classes.excludes + Must set dist.jar + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + No tests executed. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must set JVM to use for profiling in profiler.info.jvm + Must set profiler agent JVM arguments in profiler.info.jvmargs.agent + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + To run this application from the command line without Ant, try: + + java -jar "${dist.jar.resolved}" + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set run.class + + + + Must select one file in the IDE or set run.class + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set debug.class + + + + + Must select one file in the IDE or set debug.class + + + + + Must set fix.includes + + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + Must select one file in the IDE or set profile.class + This target only works when run from inside the NetBeans IDE. + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + + + + + This target only works when run from inside the NetBeans IDE. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select one file in the IDE or set run.class + + + + + + Must select some files in the IDE or set test.includes + + + + + Must select one file in the IDE or set run.class + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Must select some files in the IDE or set javac.includes + + + + + + + + + + + + + + + + + + Some tests failed; see details above. + + + + + + + + + Must select some files in the IDE or set test.includes + + + + Some tests failed; see details above. + + + + Must select some files in the IDE or set test.class + Must select some method in the IDE or set test.method + + + + Some tests failed; see details above. + + + + + Must select one file in the IDE or set test.class + + + + Must select one file in the IDE or set test.class + Must select some method in the IDE or set test.method + + + + + + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + Must select one file in the IDE or set applet.url + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/nbproject/genfiles.properties b/nbproject/genfiles.properties new file mode 100644 index 0000000..3a2424f --- /dev/null +++ b/nbproject/genfiles.properties @@ -0,0 +1,8 @@ +build.xml.data.CRC32=4d688e4d +build.xml.script.CRC32=c3cd04bd +build.xml.stylesheet.CRC32=8064a381@1.78.1.48 +# This file is used by a NetBeans-based IDE to track changes in generated files such as build-impl.xml. +# Do not edit this file. You may delete it but then the IDE will never regenerate such files for you. +nbproject/build-impl.xml.data.CRC32=4d688e4d +nbproject/build-impl.xml.script.CRC32=269076b4 +nbproject/build-impl.xml.stylesheet.CRC32=830a3534@1.80.1.48 diff --git a/nbproject/project.properties b/nbproject/project.properties new file mode 100644 index 0000000..019f285 --- /dev/null +++ b/nbproject/project.properties @@ -0,0 +1,84 @@ +annotation.processing.enabled=true +annotation.processing.enabled.in.editor=false +annotation.processing.processors.list= +annotation.processing.run.all.processors=true +annotation.processing.source.output=${build.generated.sources.dir}/ap-source-output +application.title=Kademlia +application.vendor=rylon +build.classes.dir=${build.dir}/classes +build.classes.excludes=**/*.java,**/*.form +# This directory is removed when the project is cleaned: +build.dir=build +build.generated.dir=${build.dir}/generated +build.generated.sources.dir=${build.dir}/generated-sources +# Only compile against the classpath explicitly listed here: +build.sysclasspath=ignore +build.test.classes.dir=${build.dir}/test/classes +build.test.results.dir=${build.dir}/test/results +# Uncomment to specify the preferred debugger connection transport: +#debug.transport=dt_socket +debug.classpath=\ + ${run.classpath} +debug.test.classpath=\ + ${run.test.classpath} +# This directory is removed when the project is cleaned: +dist.dir=dist +dist.jar=${dist.dir}/Kademlia.jar +dist.javadoc.dir=${dist.dir}/javadoc +endorsed.classpath= +excludes= +file.reference.11-src=src +includes=** +jar.archive.disabled=${jnlp.enabled} +jar.compress=false +jar.index=${jnlp.enabled} +javac.classpath= +# Space-separated list of extra javac options +javac.compilerargs= +javac.deprecation=false +javac.external.vm=false +javac.processorpath=\ + ${javac.classpath} +javac.source=1.8 +javac.target=1.8 +javac.test.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir}:\ + ${libs.junit.classpath}:\ + ${libs.junit_4.classpath} +javac.test.processorpath=\ + ${javac.test.classpath} +javadoc.additionalparam= +javadoc.author=false +javadoc.encoding=${source.encoding} +javadoc.noindex=false +javadoc.nonavbar=false +javadoc.notree=false +javadoc.private=false +javadoc.splitindex=true +javadoc.use=true +javadoc.version=false +javadoc.windowtitle= +jnlp.codebase.type=no.codebase +jnlp.descriptor=application +jnlp.enabled=false +jnlp.mixed.code=defaut +jnlp.offline-allowed=false +jnlp.signed=false +main.class=CLI +manifest.file=manifest.mf +meta.inf.dir=${src.dir}/META-INF +mkdist.disabled=false +platform.active=default_platform +run.classpath=\ + ${javac.classpath}:\ + ${build.classes.dir} +# Space-separated list of JVM arguments used when running the project +# (you may also define separate properties like run-sys-prop.name=value instead of -Dname=value +# or test-sys-prop.name=value to set system properties for unit tests): +run.jvmargs= +run.test.classpath=\ + ${javac.test.classpath}:\ + ${build.test.classes.dir} +source.encoding=UTF-8 +src.dir=${file.reference.11-src} diff --git a/nbproject/project.xml b/nbproject/project.xml new file mode 100644 index 0000000..a87d938 --- /dev/null +++ b/nbproject/project.xml @@ -0,0 +1,13 @@ + + + org.netbeans.modules.java.j2seproject + + + Kademlia + + + + + + + diff --git a/src/CLI.java b/src/CLI.java new file mode 100644 index 0000000..40de48c --- /dev/null +++ b/src/CLI.java @@ -0,0 +1,78 @@ +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.InetSocketAddress; +import java.nio.MappedByteBuffer; +import java.util.logging.LogManager; +import node.FileIdentifier; + +import node.Identifier; +import node.Node; +import node.NodeIdentifier; + +public class CLI { + + public static void main(String[] args) throws IOException { + System.setProperty("java.util.logging.config.file", + "logging.properties"); + + try { + LogManager.getLogManager().readConfiguration(); + } catch (Exception e) { + e.printStackTrace(); + } + + Node node = new Node(); + + BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); + String s; + while ((s = in.readLine()) != null && s.length() != 0) { + String[] splitted = s.split(" "); + + String cmd = splitted[0]; + + switch (cmd) { + //status + case "status": + for (NodeIdentifier id : node.getNeighbors()) { + System.out.println(id); + } + break; + //lookup fileID + case "lookup": + String fileID = splitted[1]; + // TODO not implemented + // Zum testen: + FileIdentifier fileIDToFind = new FileIdentifier(1, fileID.getBytes()); + node.findValue(fileIDToFind); + break; + //request fileID + case "request": + String fileID3 = splitted[1]; + FileIdentifier fileIDToFind2 = new FileIdentifier(1, fileID3.getBytes()); + node.sendDataReq(fileIDToFind2); + break; + //leave + case "leave": + node.leave(); + break; + //store fileID data + case "store": + String fileID2 = splitted[1]; + String data = splitted[2]; + // TODO not implemented + // Zum testen: + FileIdentifier fileIDToStore = new FileIdentifier(1,fileID2.getBytes()); + node.store(fileIDToStore); + node.storeData(fileIDToStore,data); + break; + default: + System.out.println("Unknown command."); + break; + } + } + } +} \ No newline at end of file diff --git a/src/message/Ack.java b/src/message/Ack.java new file mode 100644 index 0000000..f1426e6 --- /dev/null +++ b/src/message/Ack.java @@ -0,0 +1,141 @@ +package message; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.logging.Level; +import java.util.logging.Logger; + +import node.Identifier; +import node.NodeIdentifier; +import util.BufferUtil; + +public class Ack { + private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); + + /** + * timeout in seconds + */ + private static final int TIMEOUT = 1000; + + /** + * Maximum number of retries + */ + private static final int MAX_RETRIES = 3; + + private Identifier rpcId; + + private NodeIdentifier receiver; + + private ByteBuffer buffer; + + private int numRetries = 0; + + private TimeoutThread timeout; + private Thread thread; + + // The channel to re-send the message on + private DatagramChannel channel; + + private MessageCallback callback; + + public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel, + ByteBuffer buffer, MessageCallback cb) { + this.rpcId = id; + this.receiver = receiver; + this.channel = channel; + this.buffer = BufferUtil.clone(buffer); + this.callback = cb; + startThread(); + } + + private void startThread() { + LOGGER.log(Level.FINEST, "Starting timeout thread for RPC " + rpcId); + timeout = new TimeoutThread(); + thread = new Thread(timeout); + thread.start(); + } + + public Identifier getID() { + return rpcId; + } + + public boolean check(NodeIdentifier fromID) { + return fromID.equals(receiver); + } + + public ByteBuffer getBuf() { + return buffer; + } + + public void setBuf(ByteBuffer buf) { + this.buffer = buf; + } + + public void setReceived() { + // Stop thread + try { + if (thread != null) { + timeout.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + } + } + + private class TimeoutThread implements Runnable { + private volatile boolean notReceived = true; + + // When do we stop expecting the ack + private long timeToStop = System.currentTimeMillis() + TIMEOUT; + + @Override + public void run() { + while (notReceived && System.currentTimeMillis() < timeToStop) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // Timeout hit! + + if (notReceived) { + + if (numRetries < MAX_RETRIES) { + try { + LOGGER.log( + Level.FINE, + "Didn't receive RPC Ack {0} by now. Resending... ", + new Object[] { rpcId }); + LOGGER.log(Level.INFO, receiver.getAddress().toString()); + channel.send(buffer, receiver.getAddress()); + } catch (IOException e) { + e.printStackTrace(); + } + + startThread(); + numRetries++; + } else { + + LOGGER.log(Level.INFO, "Absent RPC ack {0}.", + new Object[] { rpcId }); + + if (callback != null) { + callback.onTimeout(); + } + } + } else { + // Message has been received in time + if (callback != null) { + callback.onReceive(); + } + } + } + + public void terminate() { + notReceived = false; + } + } +} \ No newline at end of file diff --git a/src/message/MessageCallback.java b/src/message/MessageCallback.java new file mode 100644 index 0000000..d88b392 --- /dev/null +++ b/src/message/MessageCallback.java @@ -0,0 +1,22 @@ +package message; + +/** + * A callback to create asynchronous events that get triggered when a message + * (ack/answer) is received. + * + * @author jln + * + */ +public interface MessageCallback { + + /** + * Called when the awaited message arrives. + */ + public void onReceive(); + + /** + * Called when the awaited message doesn't arrive (even after possible + * retries). + */ + public void onTimeout(); +} diff --git a/src/message/MessageType.java b/src/message/MessageType.java new file mode 100644 index 0000000..40cb09c --- /dev/null +++ b/src/message/MessageType.java @@ -0,0 +1,19 @@ +package message; + +public class MessageType { + public final static byte FIND_NODE = 0; + public final static byte NODES = 1; + + public final static byte PING = 11; + public final static byte PONG = 12; + + public final static byte LEAVE = 2; + + public final static byte FIND_VALUE = 4; + public final static byte STORE = 5; + public final static byte DATA = 6; + public final static byte DATA_REQ = 7; + public final static byte VALUE_NODES = 8; + public final static byte FOUND_VALUE = 9; + public final static byte ACK = 10; +} diff --git a/src/node/ChunkIdentifier.java b/src/node/ChunkIdentifier.java new file mode 100644 index 0000000..3bc9143 --- /dev/null +++ b/src/node/ChunkIdentifier.java @@ -0,0 +1,36 @@ +package node; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class ChunkIdentifier extends Identifier { + + private String chunkID; + private FileIdentifier fileID; + + public ChunkIdentifier(int size, byte[] bytes, FileIdentifier fileID, String chunkID) { + super(size, bytes); + + this.fileID = fileID; + + //calculate SHA-256 Hash of chunckID + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + md.update(chunkID.getBytes()); + this.chunkID = md.digest().toString(); + } catch (NoSuchAlgorithmException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + public String getChunkID() { + return this.chunkID; + } + + public FileIdentifier getFileID(){ + return this.fileID; + } + +} diff --git a/src/node/FileIdentifier.java b/src/node/FileIdentifier.java new file mode 100644 index 0000000..0cd699e --- /dev/null +++ b/src/node/FileIdentifier.java @@ -0,0 +1,28 @@ +package node; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +public class FileIdentifier extends Identifier { + + private String fileID; + + public FileIdentifier(int size, byte[] fileID) { + super(size, fileID); + + /*//calculate SHA-256 Hash of key + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + md.update(fileID.getBytes()); + this.fileID = md.digest().toString(); + } catch (NoSuchAlgorithmException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + }*/ + + } + + public String getKey() { + return this.fileID; + } +} diff --git a/src/node/Identifier.java b/src/node/Identifier.java new file mode 100644 index 0000000..fb44ff8 --- /dev/null +++ b/src/node/Identifier.java @@ -0,0 +1,111 @@ +package node; + +import java.math.BigInteger; +import java.util.BitSet; +import java.util.Random; + +/** + * A Kademlia identifier. Can be used for identifying files as well as nodes + * (but for nodes check {@see NodeIdentifier}). + * + * @author jln + * + */ +public class Identifier { + private static Random random = new Random(System.currentTimeMillis()); + + protected BitSet bits; + + private int size; + + public Identifier(int size, byte[] bytes) { + this.size = size; + this.bits = BitSet.valueOf(bytes); + } + + private Identifier(int size, BitSet bits) { + this.size = size; + this.bits = bits; + } + + /** + * Creates an ID exactly "in the middle" of the ID space. (If the ID space + * is 8 bit wide, this returns an ID valued 128). + * + * @param size + * the size of the id space + * @return an Identifier + */ + public static Identifier getStaticIdentifier(int size) { + BitSet middle = new BitSet(size); + middle.set(size - 1); + return new Identifier(size, middle); + } + + /** + * Creates a random ID for the given id space size. + * + * @param size + * the size of the id space + * @return a random Identifier + */ + public static Identifier getRandomIdentifier(int size) { + BitSet bits = new BitSet(size); + + for (int i = 0; i < size; i++) { + double threshold = random.nextGaussian(); + if (threshold > 0) { + bits.set(i); + } + } + + return new Identifier(size, bits); + } + + public BigInteger distanceTo(Identifier otherID) { + BitSet distance = (BitSet) bits.clone(); + distance.xor(otherID.bits); + return new BigInteger(1, distance.toByteArray()); + } + + /** + * Returns whether the bit at the given position is set or not. The MSB is + * at position 0. + * + * @param index + * the index to check + * @return true if the bit is set + */ + public boolean isBitSetAt(int index) { + BigInteger intValue = new BigInteger(1, bits.toByteArray()); + int numOfTrimmedZeros = size - intValue.bitLength(); + + if (index < numOfTrimmedZeros) { + return false; + } + + return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1); + } + + public byte[] getBytes() { + return bits.toByteArray(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Identifier)) { + return false; + } else { + return bits.equals(((Identifier) o).bits); + } + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + public String toString() { + return new BigInteger(1, bits.toByteArray()).toString(); + } +} diff --git a/src/node/Node.java b/src/node/Node.java new file mode 100644 index 0000000..7436792 --- /dev/null +++ b/src/node/Node.java @@ -0,0 +1,476 @@ +package node; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.Ack; +import message.MessageCallback; +import message.MessageType; +import routingtable.IRoutingTable; +import routingtable.RoutingTableImpl; + +public class Node { + + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + + /** + * Size of ID space (has to be a multiple of 8) + */ + public static final int ID_BITS = 8; + /** + * The bucket size + */ + public static final int BUCKET_SIZE = 2; + /** + * The first node is always spawned on port 50000 + */ + private static final int INITIAL_PORT = 50000; + private static final Identifier INITIAL_ID = Identifier + .getStaticIdentifier(ID_BITS); + private static final int BUFFER_SIZE = 512; + private static final int CHUNK_SIZE = 4; + /** + * The size of an IP address (in bytes) + */ + public static final int SIZE_IP_ADDRESS = 8; + + private InetSocketAddress address; + private DatagramChannel channel; + + public NodeIdentifier lastlookup = null; + + private Map> rpcs = new HashMap>(); + private Map values = new HashMap(); + + private Identifier searchID = null; + + private Thread thread; + private UDPHandler udpListen; + + private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS); + private IRoutingTable routingTable = new RoutingTableImpl(BUCKET_SIZE, this); + + private Map data = new HashMap();; + + public Node() { + System.setProperty("java.net.preferIPv4Stack", "true"); + + try { + channel = DatagramChannel.open(); + + try { + address = new InetSocketAddress("localhost", INITIAL_PORT); + channel.socket().bind(address); + + this.nodeID = INITIAL_ID; + } catch (SocketException e) { + // The initial port is already bound -> let the system pick a + // port + channel.socket().bind(new InetSocketAddress("localhost", 0)); + address = (InetSocketAddress) channel.getLocalAddress(); + } + + channel.configureBlocking(false); + + udpListen = new UDPHandler(this); + thread = new Thread(udpListen); + thread.start(); + + LOGGER.log(Level.INFO, "{0}: Initialized node {1} on {2}", + new Object[] { this.nodeID, getName(), address.toString() }); + + if (address.getPort() != INITIAL_PORT) { + // The port of this node is not the "INITIAL_PORT" (so it's not + // the first node in the network). So we try to join the network + // via the first node. + NodeIdentifier viaNode = new NodeIdentifier(ID_BITS, + INITIAL_ID.getBytes(), new InetSocketAddress( + "127.0.0.1", INITIAL_PORT)); + joinNetworkVia(viaNode); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void joinNetworkVia(NodeIdentifier viaNode) { + LOGGER.log(Level.INFO, "Trying to join network via node {0}", + new Object[] { viaNode }); + + routingTable.insert(viaNode); + sendFindNode(viaNode, this.nodeID); + } + + /** + * Creates and returns new ID (usually used as a RPC ID). This makes sure + * the ID is not yet used (in this node). + * + * @return an ID + */ + private Identifier createRPCID() { + Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS); + while (rpcs.containsKey(rpcID)) { + rpcID = Identifier.getRandomIdentifier(ID_BITS); + } + return rpcID; + } + + void sendFindNode(NodeIdentifier receiver, Identifier idToFind) { + boolean successful = send(receiver, MessageType.FIND_NODE, + idToFind.getBytes(), true, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}", + new Object[] { idToFind, receiver }); + } + } + + void sendFindValue(NodeIdentifier receiver, Identifier idToFind) { + // need to save the fileID because we need it for future searches + this.searchID = idToFind; + + LOGGER.log(Level.INFO, "Sending [FIND_VALUE {0}] to node {1}",new Object[] { idToFind, receiver }); + boolean successful = send(receiver, MessageType.FIND_VALUE, + idToFind.getBytes(), true, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sent [FIND_VALUE {0}] to node {1}", + new Object[] { idToFind, receiver }); + } + } + + void sendFoundValue(NodeIdentifier receiver, Identifier idToFind, + Identifier rpcID) { + boolean successful = send(receiver, MessageType.FOUND_VALUE, rpcID, + values.get(idToFind).getBytes(), false, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [FOUND_VALUE {0} -> {1}] to node {2}", + new Object[] { idToFind, values.get(idToFind), receiver }); + } + } + + /** + * Gets all nodes of this nodes routing table, that are close to a given + * node/fileID and sends that list to a specific node. + * + * @param receiver + * The node to receive the list of nodes + * @param idToFind + * The ID to find close nodes of + * @param rpcID + * An RPC ID (because this is always an answer to a FIND_NODE + * RPC) + * @param nodeType + * If true, we search a specific node, else a fileID + */ + void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind, + Identifier rpcID, boolean nodeType) { + byte msgtype = 0; + if (nodeType) { + msgtype = MessageType.NODES; + } else { + msgtype = MessageType.VALUE_NODES; + } + + Set closeNodes = routingTable + .getClosestNodesTo(idToFind); + int numNodes = closeNodes.size(); + + ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8) + + numNodes * SIZE_IP_ADDRESS); + + for (NodeIdentifier idToSend : closeNodes) { + // Don't send the node to itself + if (!receiver.equals(idToSend)) { + nodes.put(idToSend.getTripleAsBytes()); + } + } + + boolean successful = send(receiver, msgtype, rpcID, nodes.array(), + false, null); + + if (successful) { + LOGGER.log( + Level.INFO, + "Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})", + new Object[] { closeNodes.size(), receiver, idToFind, rpcID }); + } + } + + public void sendStore(NodeIdentifier receiver, Identifier fileID) { + boolean successful = send(receiver, MessageType.STORE, + fileID.getBytes(), true, null); + + if (successful) { + LOGGER.log(Level.INFO, "Sending [STORE {0}] to node {1}", + new Object[] { fileID, receiver }); + } + } + + public void sendAck(NodeIdentifier receiver, Identifier rpcID) { + send(receiver, MessageType.ACK, rpcID, null, false, null); + } + + public void sendDataReq(FileIdentifier fileID){ + //TODO + if(lastlookup == null){ + new Exception("lookup first!").printStackTrace(); + return;} + //String id = "128"; + //NodeIdentifier receiver = new NodeIdentifier(8, id.getBytes(), new InetSocketAddress("localhost", INITIAL_PORT)); + send(lastlookup, MessageType.DATA_REQ, fileID.getBytes(), true, null); + } + + public void sendData(NodeIdentifier receiver, Identifier fileID) { + + String data = this.data.get(fileID); + if(data == null){ + //TODO We dont have that data. -> DOES NOT WORK PROPERLY! + new Exception().printStackTrace(); + return; + } + int CHUNK_COUNT = data.length()/CHUNK_SIZE; + + for(int i = 0; i()); + rpcs.get(rpcID).add(newAck); + } + } + } + return successful; + } + + public String getName() { + return nodeID.toString(); + } + + public boolean hasAcks() { + return !rpcs.isEmpty(); + } + + public DatagramChannel getChannel() { + return channel; + } + + public void updateBuckets(NodeIdentifier id) { + routingTable.insert(id); + } + + public Identifier getID() { + return nodeID; + } + + public Set getNeighbors() { + return routingTable.getEntries(); + } + + public void storePair(Identifier key, Identifier nodeid) { + values.put(key, nodeid); + } + + public void store(Identifier key) { + + storePair(key,this.nodeID); + + Set nodes = routingTable.getClosestNodesTo(key); + + + for (NodeIdentifier node : nodes) { + sendStore(node, key); + } + } + + public void findValue(Identifier key) { + Set nodes = routingTable.getClosestNodesTo(key); + + + for (NodeIdentifier node : nodes) { + sendFindValue(node, key); + } + } + + public boolean hasKey(Identifier key) { + return values.containsKey(key); + } + + public Identifier getSearchID() { + return this.searchID; + } + + public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { + List rpcsFromID = rpcs.get(rpcID); + boolean removedAck = false; + + // wohl unschön, hier auf != null zu prüfen, da Fehler wo anders ist. + if (rpcsFromID != null) { + for (Ack ack : rpcsFromID) { + if (ack.check(fromID)) { + ack.setReceived(); + rpcsFromID.remove(ack); + removedAck = true; + + LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID); + + break; + } + } + } + + if (!removedAck) { + LOGGER.log(Level.WARNING, + "Received RPC ack {0}, but didn't expect that", + new Object[] { rpcID }); + } + + return removedAck; + } + + public void leave() { + for (NodeIdentifier n : getNeighbors()) { + sendLeave(n); + } + System.exit(0); + } + + private boolean sendLeave(NodeIdentifier n) { + return send(n, MessageType.LEAVE, null, false, null); + } + + public void storeData(FileIdentifier id, String data) { + this.data.put(id, data); + LOGGER.log(Level.INFO, "Stored Data [{0}] as [{1}])", + new Object[] { data, id}); + } + + + public void sendFile(NodeIdentifier nodeID, File file) { + + + } +} \ No newline at end of file diff --git a/src/node/NodeIdentifier.java b/src/node/NodeIdentifier.java new file mode 100644 index 0000000..37ca438 --- /dev/null +++ b/src/node/NodeIdentifier.java @@ -0,0 +1,35 @@ +package node; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +import util.BufferUtil; + +/** + * Same as a {@link Identifier}, but this also stores an IP address. + * + * @author jln + * + */ +public class NodeIdentifier extends Identifier { + + private InetSocketAddress address; + + public NodeIdentifier(int size, byte[] bytes, InetSocketAddress address) { + super(size, bytes); + this.address = address; + } + + public byte[] getTripleAsBytes() { + ByteBuffer result = ByteBuffer.allocate(Node.SIZE_IP_ADDRESS + + (Node.ID_BITS / 8)); + + result.put(BufferUtil.addrToBytes(address)); + result.put(bits.toByteArray()); + return result.array(); + } + + public InetSocketAddress getAddress() { + return address; + } +} \ No newline at end of file diff --git a/src/node/UDPHandler.java b/src/node/UDPHandler.java new file mode 100644 index 0000000..ef592fa --- /dev/null +++ b/src/node/UDPHandler.java @@ -0,0 +1,353 @@ +package node; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.MessageType; + +public class UDPHandler implements Runnable { + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class + .getName()); + + public static final int BUF_SIZE = 512; + + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + HashMap > chunklist = new HashMap >(); + + public UDPHandler(Node node) { + this.node = node; + } + + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress getIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } + + private Identifier getIDFromBuffer() { + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new Identifier(Node.ID_BITS, result); + } + + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ + private NodeIdentifier getNodeTripleFromBuffer() { + InetSocketAddress address = getIPFromBuffer(); + + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + LOGGER.log(Level.INFO,"Read Buffer id: "+Node.ID_BITS+" result: "+result+" addr: "+address); + return new NodeIdentifier(Node.ID_BITS, result, address); + } + + public void run() { + InetSocketAddress from = null; + + // Run until it gets killed, and all my Acks have been answered + while (running || node.hasAcks()) { + try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... + boolean updateRT = true; + + // The address of the node that sent this message + from = (InetSocketAddress) node.getChannel().receive(buffer); + + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); + + byte messageType = buffer.get(); + + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, + getIDFromBuffer().getBytes(), from); + + Identifier rpcID = getIDFromBuffer(); + + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(fromID, rpcID); + break; + case MessageType.NODES: + receiveNodes(fromID, rpcID); + break; + case MessageType.PING: + updateRT = false; + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + updateRT = false; + receivePong(fromID, rpcID); + break; + case MessageType.LEAVE: + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; + case MessageType.FIND_VALUE: + receiveFindValue(fromID, rpcID); + break; + case MessageType.VALUE_NODES: + receiveValueNodes(fromID, rpcID); + break; + case MessageType.FOUND_VALUE: + receiveFoundValue(fromID, rpcID); + break; + case MessageType.STORE: + receiveStore(fromID, rpcID); + break; + case MessageType.DATA: + receiveData(fromID, rpcID); + LOGGER.log(Level.INFO, "Received DATA from {0}", + new Object[] { from.toString() }); + break; + case MessageType.DATA_REQ: + receiveDataReq(fromID,rpcID); + LOGGER.log(Level.INFO, "Received DATA_REQ from {0}", + new Object[] { from.toString() }); + break; + case MessageType.ACK: + receiveAck(fromID, rpcID); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } + + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } + + } else { + // If nothing has been read/received wait and read/receive + // again + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + buffer.clear(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + private void receiveAck(NodeIdentifier fromID, Identifier rpcID) { + // This should be the either answer to a prior STORE or FOUND_VALUE -> + // mark this RPC ID as received + node.receivedRPC(fromID, rpcID); + } + + private void receiveFoundValue(NodeIdentifier fromID, Identifier rpcID) { + Identifier idToFind = getIDFromBuffer(); + node.lastlookup = fromID; + // TODO Auto-generated method stub + // Node kontaktieren, damit Datei gesendet werden kann. + + // This should be the answer to a prior FIND_VALUE -> mark this RPC ID + // as received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received [FOUND VALUE on Node {0}] from Node {1}", + new Object[] { idToFind, fromID }); + } + + private void receiveValueNodes(NodeIdentifier fromID, Identifier rpcID) { + int numReceived = 0; + + // This is just for the log message + StringBuilder nodes = new StringBuilder(); + + while (buffer.hasRemaining()) { + NodeIdentifier newID = getNodeTripleFromBuffer(); + node.sendFindValue(newID, node.getSearchID()); + nodes.append(newID).append(", "); + numReceived++; + } + + // This should be the answer to a prior FIND_VALUE -> mark this RPC ID + // as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, + "Received {0} [VALUE NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); + } + + private void receiveData(NodeIdentifier fromID, Identifier rpcID) { + + String data = new String(buffer.array()); + String parts[] = data.split("-"); + + String fileID = parts[0]; + int chunkCount = Integer.parseInt(parts[1]); + int chunkID = Integer.parseInt(parts[2]); + String chunkContent = parts[3]; + LOGGER.log(Level.INFO,"recieved Chunk file: "+fileID+" count: "+chunkCount+" id: "+chunkID); + + FileIdentifier fid = new FileIdentifier(1,fileID.getBytes()); + if(chunklist.get(fid) == null){ + chunklist.put(fid, new HashMap()); + } + + if(chunklist.get(fid).get(chunkID) == null){ + chunklist.get(fid).put(chunkID, chunkContent); + } + + if(chunklist.get(fid).size() >= chunkCount){ + LOGGER.log(Level.INFO,"FILE complete file: "+fileID+" count: "+chunkCount+" id: "+chunkID); + String file = ""; + for(int i=0; i mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + } + + private void receivePing(NodeIdentifier fromID, Identifier rpcID) { + LOGGER.log(Level.INFO, "Received [PING] from {0}", + new Object[] { fromID }); + node.sendPong(fromID, rpcID); + } + + private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) { + + int numReceived = 0; + + // This is just for the log message + StringBuilder nodes = new StringBuilder(); + + while (buffer.hasRemaining()) { + NodeIdentifier newID = getNodeTripleFromBuffer(); + node.updateBuckets(newID); + nodes.append(newID).append(", "); + numReceived++; + } + + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})", + new Object[] { numReceived, nodes.toString(), fromID }); + } + + private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) { + Identifier idToFind = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1}", + new Object[] { idToFind, fromID }); + + node.sendClosestNodesTo(fromID, idToFind, rpc_id, true); + } + + private void receiveStore(NodeIdentifier fromID, Identifier rpcID) { + Identifier fileID = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [STORE {0}] from Node {1}", + new Object[] { fileID, fromID }); + + node.storePair(fileID, fromID); + + node.sendAck(fromID, rpcID); + } + + private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) { + Identifier fileID = getIDFromBuffer(); + + LOGGER.log(Level.INFO, "Received [FIND VALUE {0}] from Node {1}", + new Object[] { fileID, fromID }); + + if (node.hasKey(fileID)) { + node.sendFoundValue(fromID, fileID, rpcID); + } else { + node.sendClosestNodesTo(fromID, fileID, rpcID, false); + } + } + + public void terminate() { + running = false; + } +} diff --git a/src/routingtable/Bucket.java b/src/routingtable/Bucket.java new file mode 100644 index 0000000..271d063 --- /dev/null +++ b/src/routingtable/Bucket.java @@ -0,0 +1,145 @@ +package routingtable; + +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +import message.MessageCallback; +import node.Node; +import node.NodeIdentifier; + +public class Bucket { + private final static Logger LOGGER = Logger.getLogger(Bucket.class + .getName()); + + private Bucket left; + private Bucket right; + + private List entries; + + private int bucketSize; + private int level; + + private Node node; + + public Bucket(int bucketSize, int level, Node node) { + this.bucketSize = bucketSize; + this.level = level; + this.node = node; + entries = new ArrayList(); + } + + /** + * Returns the nodes of this very bucket. + * + * @return + */ + public List getNodes() { + return entries; + } + + public boolean contains(NodeIdentifier id) { + if (!isLeaf()) { + return left.contains(id) || right.contains(id); + } + return entries.contains(id); + } + + /** + * Tries to update the given node. + * + * @param id + * @return true if the node is still available, else false + */ + public void update(final NodeIdentifier id) { + if (!isLeaf()) { + if (id.isBitSetAt(level)) { + left.update(id); + } else { + right.update(id); + } + } else { + node.sendPing(id, new MessageCallback() { + @Override + public void onReceive() { + LOGGER.log(Level.INFO, + "Node answered in time, moving to top of list."); + entries.remove(id); + entries.add(0, id); + } + + @Override + public void onTimeout() { + LOGGER.log(Level.INFO, "Node didnt answer in time."); + // TODO: this should be propagated to the "upper" Routing + // Table, not just to this specific bucket + entries.remove(id); + } + }); + } + } + + public void insert(NodeIdentifier newId) { + insert(newId, ""); + } + + public void insert(NodeIdentifier newId, String path) { + if (isLeaf()) { + if (entries.size() < bucketSize) { + LOGGER.log(Level.INFO, + "Added node {0} to RT [{1}] on level {2}", + new Object[] { newId, path, level }); + entries.add(newId); + } else { + LOGGER.log(Level.INFO, "Split on level " + level + + " while adding " + newId); + + LOGGER.log(Level.INFO, + "Distributing present nodes to lower buckets"); + + Bucket newLeft = new Bucket(bucketSize, level + 1, node); + Bucket newRight = new Bucket(bucketSize, level + 1, node); + + // Add the new entry and in the following loop distribute all + // existing entries to left/right + entries.add(newId); + + for (NodeIdentifier id : entries) { + if (id.isBitSetAt(level)) { + newLeft.insert(id, path + "1"); + } else { + newRight.insert(id, path + "0"); + } + } + + this.entries = null; + this.left = newLeft; + this.right = newRight; + } + } else { + if (newId.isBitSetAt(level)) { + left.insert(newId, path + "1"); + } else { + right.insert(newId, path + "0"); + } + } + + } + + private boolean isLeaf() { + return left == null && right == null; + } + + public void remove(NodeIdentifier node) { + if (isLeaf()) { + entries.remove(node); + } else { + if (node.isBitSetAt(level)) { + left.remove(node); + } else { + right.remove(node); + } + } + } +} \ No newline at end of file diff --git a/src/routingtable/IRoutingTable.java b/src/routingtable/IRoutingTable.java new file mode 100644 index 0000000..8dd53f0 --- /dev/null +++ b/src/routingtable/IRoutingTable.java @@ -0,0 +1,19 @@ +package routingtable; + +import java.util.Set; + +import node.Identifier; +import node.NodeIdentifier; + +public interface IRoutingTable { + + public void insert(NodeIdentifier id); + + public Set getClosestNodesTo(Identifier id); + + public boolean contains(NodeIdentifier node); + + public void remove(NodeIdentifier node); + + public Set getEntries(); +} \ No newline at end of file diff --git a/src/routingtable/RoutingTableImpl.java b/src/routingtable/RoutingTableImpl.java new file mode 100644 index 0000000..5a36c49 --- /dev/null +++ b/src/routingtable/RoutingTableImpl.java @@ -0,0 +1,79 @@ +package routingtable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import node.Identifier; +import node.Node; +import node.NodeIdentifier; + +public class RoutingTableImpl implements IRoutingTable { + private Set entries = new HashSet(); + + private Bucket root; + + private int bucketSize; + + public RoutingTableImpl(int bucketSize, Node node) { + this.bucketSize = bucketSize; + this.root = new Bucket(bucketSize, 0, node); + } + + @Override + public void insert(NodeIdentifier id) { + if (root.contains(id)) { + root.update(id); + } else { + entries.add(id); + root.insert(id); + } + } + + @Override + public Set getClosestNodesTo(final Identifier id) { + Set result = new HashSet(); + + if (entries.size() <= bucketSize) { + result.addAll(entries); + + } else { + List temp = new ArrayList(entries); + + Collections.sort(temp, new Comparator() { + @Override + public int compare(NodeIdentifier o1, NodeIdentifier o2) { + BigInteger dist1 = id.distanceTo(o1); + BigInteger dist2 = id.distanceTo(o2); + return dist1.compareTo(dist2); + } + }); + + for (int i = 0; i < bucketSize; i++) { + result.add(temp.get(i)); + } + result = new HashSet(temp.subList(0, + Node.BUCKET_SIZE)); + } + return result; + } + + @Override + public boolean contains(NodeIdentifier node) { + return root.contains(node); + } + + @Override + public void remove(NodeIdentifier node) { + + } + + @Override + public Set getEntries() { + return entries; + } +} \ No newline at end of file diff --git a/src/util/BufferUtil.java b/src/util/BufferUtil.java new file mode 100644 index 0000000..182ce91 --- /dev/null +++ b/src/util/BufferUtil.java @@ -0,0 +1,28 @@ +package util; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; + +public class BufferUtil { + + public static ByteBuffer clone(ByteBuffer original) { + ByteBuffer clone = ByteBuffer.allocate(original.capacity()); + + int oldPosition = original.position(); + original.rewind();// copy from the beginning + clone.put(original); + // original.rewind(); + original.position(oldPosition); + clone.flip(); + return clone; + } + + public static byte[] addrToBytes(InetSocketAddress addr) { + ByteBuffer buffer = ByteBuffer.allocate(8); + for (String part : addr.getHostString().split("\\.")) { + buffer.put(Byte.valueOf(part)); + } + buffer.putInt(addr.getPort()); + return buffer.array(); + } +} diff --git a/u5-1.pdf b/u5-1.pdf new file mode 100644 index 0000000..ce6dd92 Binary files /dev/null and b/u5-1.pdf differ