From 2b47b6fb9c47dcaf98291b08d0ae249818ee9b4a Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Thu, 17 Jan 2013 20:22:58 +0100 Subject: [PATCH] Kill me --- ws2012/P2P/uebungen/8/src/CLI.java | 3 + ws2012/P2P/uebungen/8/src/message/Ack.java | 27 +-- .../8/src/message/MessageCallback.java | 8 + .../uebungen/8/src/message/MessageType.java | 1 + .../P2P/uebungen/8/src/node/Identifier.java | 56 +++++-- ws2012/P2P/uebungen/8/src/node/Node.java | 50 ++++-- .../uebungen/8/src/node/NodeIdentifier.java | 9 +- .../P2P/uebungen/8/src/node/UDPHandler.java | 22 ++- .../uebungen/8/src/routingtable/Bucket.java | 156 ++++++++++-------- .../8/src/routingtable/RoutingTable.java | 61 +------ .../8/src/routingtable/RoutingTableImpl.java | 79 +++++++++ 11 files changed, 292 insertions(+), 180 deletions(-) create mode 100644 ws2012/P2P/uebungen/8/src/message/MessageCallback.java create mode 100644 ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java diff --git a/ws2012/P2P/uebungen/8/src/CLI.java b/ws2012/P2P/uebungen/8/src/CLI.java index d7c61fe2..5e7b9512 100644 --- a/ws2012/P2P/uebungen/8/src/CLI.java +++ b/ws2012/P2P/uebungen/8/src/CLI.java @@ -33,6 +33,9 @@ public class CLI { System.out.println(id); } break; + case "leave": + node.leave(); + break; default: System.out.println("Unknown command."); break; diff --git a/ws2012/P2P/uebungen/8/src/message/Ack.java b/ws2012/P2P/uebungen/8/src/message/Ack.java index b7b24eac..ead9b8fb 100644 --- a/ws2012/P2P/uebungen/8/src/message/Ack.java +++ b/ws2012/P2P/uebungen/8/src/message/Ack.java @@ -1,6 +1,5 @@ package message; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.logging.Level; @@ -26,14 +25,15 @@ public class Ack { // The channel to re-send the message on private DatagramChannel channel; + private MessageCallback callback; + public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel, - ByteBuffer buffer) { - // TODO: this needs a max. number of retries (so that a ping to a failed - // host doesn't re-ping forever) + ByteBuffer buffer, MessageCallback cb) { this.id = id; this.receiver = receiver; this.channel = channel; this.buffer = BufferUtil.clone(buffer); + this.callback = cb; startThread(); } @@ -89,15 +89,16 @@ public class Ack { // Timeout hit -> re-send if (notReceived) { - try { - LOGGER.log(Level.INFO, - "Absent RPC ack {0}. Resending to {1}", - new Object[] { id, receiver.toString() }); - channel.send(buffer, receiver.getAddress()); - startThread(); - } catch (IOException e) { - System.out.println("ASDASD"); - e.printStackTrace(); + + LOGGER.log(Level.INFO, "Absent RPC ack {0}.", + new Object[] { id }); + if (callback != null) { + callback.onTimeout(); + } + } else { + // has been received + if (callback != null) { + callback.onReceive(); } } } diff --git a/ws2012/P2P/uebungen/8/src/message/MessageCallback.java b/ws2012/P2P/uebungen/8/src/message/MessageCallback.java new file mode 100644 index 00000000..9791a302 --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/message/MessageCallback.java @@ -0,0 +1,8 @@ +package message; + +public interface MessageCallback { + + public void onReceive(); + + public void onTimeout(); +} diff --git a/ws2012/P2P/uebungen/8/src/message/MessageType.java b/ws2012/P2P/uebungen/8/src/message/MessageType.java index 7cf981e9..14d82565 100644 --- a/ws2012/P2P/uebungen/8/src/message/MessageType.java +++ b/ws2012/P2P/uebungen/8/src/message/MessageType.java @@ -6,4 +6,5 @@ public class MessageType { public final static byte PING = 10; public final static byte PONG = 11; + public static final byte LEAVE = 2; } diff --git a/ws2012/P2P/uebungen/8/src/node/Identifier.java b/ws2012/P2P/uebungen/8/src/node/Identifier.java index b7c74584..1999e1ba 100644 --- a/ws2012/P2P/uebungen/8/src/node/Identifier.java +++ b/ws2012/P2P/uebungen/8/src/node/Identifier.java @@ -9,31 +9,35 @@ public class Identifier { protected BitSet bits; - public Identifier(byte[] bytes) { + private int size; + + public Identifier(int size, byte[] bytes) { + this.size = size; this.bits = BitSet.valueOf(bytes); } - private Identifier(BitSet bits) { + private Identifier(int size, BitSet bits) { + this.size = size; this.bits = bits; } - public static Identifier getStaticIdentifier() { - BitSet middle = new BitSet(Node.ID_BITS); - middle.set(Node.ID_BITS - 1); - return new Identifier(middle); + public static Identifier getStaticIdentifier(int size) { + BitSet middle = new BitSet(size); + middle.set(size - 1); + return new Identifier(size, middle); } - public static Identifier getRandomIdentifier() { - BitSet bits = new BitSet(Node.ID_BITS); + public static Identifier getRandomIdentifier(int size) { + BitSet bits = new BitSet(size); - for (int i = 0; i < Node.ID_BITS; i++) { + for (int i = 0; i < size; i++) { double threshold = random.nextGaussian(); if (threshold > 0) { bits.set(i); } } - return new Identifier(bits); + return new Identifier(size, bits); } public BigInteger distanceTo(Identifier otherID) { @@ -42,8 +46,38 @@ public class Identifier { return new BigInteger(1, distance.toByteArray()); } + /** + * Returns whether the bit at the given position is set or not. The MSB is + * at position 0. + * + * @param index + * the index to check + * @return true if the bit is set + */ public boolean isBitSetAt(int index) { - return bits.get(index); + // System.out.println("---------------------------------"); + BigInteger intValue = new BigInteger(1, bits.toByteArray()); + int numOfTrimmedZeros = size - intValue.bitLength(); + + if (index < numOfTrimmedZeros) { + // System.out.println("trimm-Zweig: " + index); + return false; + } + + // System.out + // .println(bits + " " + bits.length() + ", " + // + numOfTrimmedZeros); + // System.out.println(bits + " " + toString() + "isSet? " + index + " (" + // + ((bits.length() - size) + index) + ") " + // + bits.get((bits.length() - size) + index)); + + // return bits.get((bits.length() - size) + index); + + // System.out + // .println(bits.get(bits.length() - (index + numOfTrimmedZeros) + // - 1) + // + " " + index); + return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1); } public byte[] getBytes() { diff --git a/ws2012/P2P/uebungen/8/src/node/Node.java b/ws2012/P2P/uebungen/8/src/node/Node.java index 17c06d3b..a5e21a91 100644 --- a/ws2012/P2P/uebungen/8/src/node/Node.java +++ b/ws2012/P2P/uebungen/8/src/node/Node.java @@ -14,8 +14,10 @@ import java.util.logging.Level; import java.util.logging.Logger; import message.Ack; +import message.MessageCallback; import message.MessageType; import routingtable.RoutingTable; +import routingtable.RoutingTableImpl; public class Node { /** @@ -33,7 +35,7 @@ public class Node { */ private static final int INITIAL_PORT = 50000; private static final Identifier INITIAL_ID = Identifier - .getStaticIdentifier(); + .getStaticIdentifier(ID_BITS); private static final int BUFFER_SIZE = 512; @@ -52,9 +54,9 @@ public class Node { private Thread thread; private UDPHandler udpListen; - private Identifier nodeID = Identifier.getRandomIdentifier(); + private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS); - private RoutingTable rt = new RoutingTable(BUCKET_SIZE); + private RoutingTable rt = new RoutingTableImpl(BUCKET_SIZE, this); public Node() { System.setProperty("java.net.preferIPv4Stack", "true"); @@ -86,17 +88,17 @@ public class Node { // The port of this node is not the "INITIAL_PORT" (so it's not // the first node in the network). So we try to join the network // via the first node. - NodeIdentifier viaNode = new NodeIdentifier( + NodeIdentifier viaNode = new NodeIdentifier(ID_BITS, INITIAL_ID.getBytes(), new InetSocketAddress( "127.0.0.1", INITIAL_PORT)); LOGGER.log(Level.INFO, "Trying to join network via node {0}", new Object[] { viaNode }); - rt.update(viaNode); + rt.insert(viaNode); sendFindNode(viaNode, this.nodeID); - sendPing(viaNode); + // sendPing(viaNode); } } catch (IOException e) { @@ -105,16 +107,16 @@ public class Node { } private Identifier createRPCID() { - Identifier rpcID = Identifier.getRandomIdentifier(); + Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS); while (rpcs.containsKey(rpcID)) { - rpcID = Identifier.getRandomIdentifier(); + rpcID = Identifier.getRandomIdentifier(ID_BITS); } return rpcID; } void sendFindNode(NodeIdentifier receiver, Identifier idToFind) { boolean successful = send(receiver, MessageType.FIND_NODE, - idToFind.getBytes(), true); + idToFind.getBytes(), true, null); if (successful) { LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}", @@ -138,7 +140,7 @@ public class Node { } boolean successful = send(receiver, MessageType.NODES, rpcID, - nodes.array(), false); + nodes.array(), false, null); if (successful) { LOGGER.log( @@ -148,13 +150,13 @@ public class Node { } } - boolean sendPing(NodeIdentifier receiver) { - return send(receiver, MessageType.PING, null, true); + public boolean sendPing(NodeIdentifier receiver, MessageCallback cb) { + return send(receiver, MessageType.PING, null, true, cb); } void sendPong(NodeIdentifier receiver, Identifier rpcID) { boolean successful = send(receiver, MessageType.PONG, rpcID, null, - false); + false, null); if (successful) { LOGGER.log(Level.INFO, "Sending [PONG] to {0} (rpcID={1})", @@ -163,12 +165,13 @@ public class Node { } private boolean send(NodeIdentifier to, byte messageType, byte[] data, - boolean reliable) { - return send(to, messageType, createRPCID(), data, reliable); + boolean reliable, MessageCallback cb) { + return send(to, messageType, createRPCID(), data, reliable, cb); } private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID, - byte[] data, boolean reliable) { + byte[] data, boolean reliable, MessageCallback cb) { + boolean successful = true; ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); @@ -196,7 +199,7 @@ public class Node { if (reliable) { // This message should be reliable (acked) - Ack newAck = new Ack(rpcID, to, channel, buffer); + Ack newAck = new Ack(rpcID, to, channel, buffer, cb); if (rpcs.containsKey(rpcID)) { rpcs.get(rpcID).add(newAck); } else { @@ -227,7 +230,7 @@ public class Node { } public void updateBuckets(NodeIdentifier id) { - rt.update(id); + rt.insert(id); } public Identifier getID() { @@ -262,4 +265,15 @@ public class Node { return removedAck; } + + public void leave() { + for (NodeIdentifier n : getNeighbors()) { + sendLeave(n); + } + System.exit(0); + } + + private boolean sendLeave(NodeIdentifier n) { + return send(n, MessageType.LEAVE, null, false, null); + } } diff --git a/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java b/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java index 60adecb5..51f014e0 100644 --- a/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java +++ b/ws2012/P2P/uebungen/8/src/node/NodeIdentifier.java @@ -1,6 +1,5 @@ package node; -import java.math.BigInteger; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -10,8 +9,8 @@ public class NodeIdentifier extends Identifier { private InetSocketAddress address; - public NodeIdentifier(byte[] bytes, InetSocketAddress address) { - super(bytes); + public NodeIdentifier(int size, byte[] bytes, InetSocketAddress address) { + super(size, bytes); this.address = address; } @@ -28,8 +27,8 @@ public class NodeIdentifier extends Identifier { } public String toString() { - return new BigInteger(1, bits.toByteArray()).toString() + " (" - + address.toString() + ")"; + return super.toString() + " (" + address.toString() + ")"; + // return bits.toString(); } } diff --git a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java index 82814fb0..71034b58 100644 --- a/ws2012/P2P/uebungen/8/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/8/src/node/UDPHandler.java @@ -49,7 +49,7 @@ public class UDPHandler implements Runnable { for (int i = 0; i < numBytes; i++) { result[i] = buffer.get(); } - return new Identifier(result); + return new Identifier(Node.ID_BITS, result); } private NodeIdentifier getNodeTripleFromBuffer() { @@ -60,7 +60,7 @@ public class UDPHandler implements Runnable { for (int i = 0; i < numBytes; i++) { result[i] = buffer.get(); } - return new NodeIdentifier(result, address); + return new NodeIdentifier(Node.ID_BITS, result, address); } public void run() { @@ -69,6 +69,7 @@ public class UDPHandler implements Runnable { // Run until I get killed, and all my acks have been answered while (running || node.hasAcks()) { try { + boolean updateRT = true; from = (InetSocketAddress) node.getChannel().receive(buffer); // channel.receive() is non-blocking. So we need to check if @@ -78,7 +79,7 @@ public class UDPHandler implements Runnable { byte messageType = buffer.get(); - NodeIdentifier fromID = new NodeIdentifier( + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, getIDFromBuffer().getBytes(), from); Identifier rpcID = getIDFromBuffer(); @@ -91,11 +92,20 @@ public class UDPHandler implements Runnable { receiveNodes(fromID, rpcID); break; case MessageType.PING: + updateRT = false; receivePing(fromID, rpcID); break; case MessageType.PONG: + updateRT = false; receivePong(fromID, rpcID); break; + case MessageType.LEAVE: + // updateRT = false; + // receivePong(fromID, rpcID); + // do nothing, update buckets later + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; default: LOGGER.log(Level.INFO, "Received unknown command from {0}: [{1}]{2}", @@ -103,8 +113,10 @@ public class UDPHandler implements Runnable { new String(buffer.array()) }); } - node.updateBuckets(new NodeIdentifier(fromID.getBytes(), - from)); + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } } else { // If nothing has been read/received wait and read/receive diff --git a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java index 9b1c3012..a8fa4463 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/Bucket.java @@ -5,117 +5,127 @@ import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; +import message.MessageCallback; +import node.Node; import node.NodeIdentifier; public class Bucket { private final static Logger LOGGER = Logger.getLogger(Bucket.class .getName()); + private Bucket left; + private Bucket right; + + private List entries; + private int bucketSize; + private int level; - private List entries = new ArrayList(); - private Bucket left = null; - private Bucket right = null; + private Node node; - private int level = 0; - - public Bucket(int size, int level) { + public Bucket(int bucketSize, int level, Node node) { + this.bucketSize = bucketSize; this.level = level; - this.bucketSize = size; + this.node = node; + entries = new ArrayList(); } - public void update(NodeIdentifier newID) { - update(this, newID); + /** + * Returns the nodes of this very bucket. + * + * @return + */ + public List getNodes() { + return entries; } - private void update(Bucket bucket, NodeIdentifier newID) { - if (!bucket.isLeaf()) { - if (newID.isBitSetAt(level)) { - - update(bucket.right, newID); + public boolean contains(NodeIdentifier id) { + if (!isLeaf()) { + return left.contains(id) || right.contains(id); + } + return entries.contains(id); + } + /** + * Tries to update the given node. + * + * @param id + * @return true if the node is still available, else false + */ + public void update(final NodeIdentifier id) { + if (!isLeaf()) { + if (id.isBitSetAt(level)) { + left.update(id); } else { - - update(bucket.left, newID); - + right.update(id); } - } else if (bucket.hasSpace()) { - if (bucket.entries.contains(newID)) { - // Move to beginning - LOGGER.log( - Level.FINE, - "Node {0} already in routing table. Move to end of bucket.", - new Object[] { newID }); - - if (bucket.entries.size() > 1) { - bucket.entries.remove(newID); - bucket.entries.add(newID); + } else { + node.sendPing(id, new MessageCallback() { + @Override + public void onReceive() { + LOGGER.log(Level.INFO, + "Node answered in time, moving to top of list."); + entries.remove(id); + entries.add(0, id); } + + @Override + public void onTimeout() { + LOGGER.log(Level.INFO, "Node didnt answer in time."); + entries.remove(id); + } + }); + } + } + + public void insert(NodeIdentifier newId) { + insert(newId, ""); + } + + public void insert(NodeIdentifier newId, String path) { + if (isLeaf()) { + if (entries.size() < bucketSize) { + LOGGER.log(Level.INFO, + "Added node {0} to RT [{1}] on level {2}", + new Object[] { newId, path, level }); + entries.add(newId); } else { - LOGGER.log(Level.INFO, "Added new node {0} to routing table.", - new Object[] { newID }); - bucket.entries.add(newID); - } + LOGGER.log(Level.INFO, "Split on level " + level + + " while adding " + newId); - } else { // Bucket is full - if (bucket.entries.contains(newID)) { - // Node is already present -> check if [last contacted node of - // this buffer] is still alive - LOGGER.log( - Level.FINE, - "Node {0} already in routing table. Check if still alive...", - new Object[] { newID }); + LOGGER.log(Level.INFO, + "Distributing present nodes to lower buckets"); - } else { - // TODO: only split if necessary - - Bucket newLeft = new Bucket(bucketSize, level + 1); - Bucket newRight = new Bucket(bucketSize, level + 1); + Bucket newLeft = new Bucket(bucketSize, level + 1, node); + Bucket newRight = new Bucket(bucketSize, level + 1, node); // Add the new entry and in the following loop distribute all // existing entries to left/right - bucket.entries.add(newID); + entries.add(newId); for (NodeIdentifier id : entries) { if (id.isBitSetAt(level)) { - newLeft.entries.add(id); + newLeft.insert(id, path + "1"); } else { - newRight.entries.add(id); + newRight.insert(id, path + "0"); } } - bucket.entries = null; - bucket.left = newLeft; - bucket.right = newRight; - } - } - } - public boolean hasNode(NodeIdentifier id) { - return hasNode(this, id); - } - - private boolean hasNode(Bucket bucket, NodeIdentifier idToFind) { - if (bucket.isLeaf()) { - for (NodeIdentifier id : bucket.entries) { - if (id.equals(idToFind)) { - return true; - } + this.entries = null; + this.left = newLeft; + this.right = newRight; } - return false; } else { - if (idToFind.isBitSetAt(level)) { - return bucket.hasNode(bucket.left, idToFind); + if (newId.isBitSetAt(level)) { + left.insert(newId, path + "1"); } else { - return bucket.hasNode(bucket.right, idToFind); + right.insert(newId, path + "0"); } } + } private boolean isLeaf() { - return left == null && right == null && entries != null; - } - - private boolean hasSpace() { - return entries.size() < bucketSize; + return left == null && right == null; } } \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java index 7882f917..c7a995ec 100644 --- a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java +++ b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTable.java @@ -1,68 +1,19 @@ package routingtable; -import java.math.BigInteger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; import java.util.Set; import node.Identifier; -import node.Node; import node.NodeIdentifier; -public class RoutingTable { +public interface RoutingTable { - private Bucket tree; + public void insert(NodeIdentifier id); - private Set entries = new HashSet(); + public Set getClosestNodesTo(Identifier id); - private int bucketSize; + public boolean contains(NodeIdentifier node); - public RoutingTable(int bucketSize) { - this.tree = new Bucket(bucketSize, 0); - this.bucketSize = bucketSize; - } + public void remove(NodeIdentifier node); - public void update(NodeIdentifier id) { - entries.add(id); - tree.update(id); - } - - public Set getClosestNodesTo(final Identifier id) { - Set result = new HashSet(); - - if (entries.size() <= bucketSize) { - result.addAll(entries); - - } else { - List temp = new ArrayList(entries); - - Collections.sort(temp, new Comparator() { - @Override - public int compare(NodeIdentifier o1, NodeIdentifier o2) { - BigInteger dist1 = id.distanceTo(o1); - BigInteger dist2 = id.distanceTo(o2); - return dist1.compareTo(dist2); - } - }); - - - for (int i = 0; i < Node.BUCKET_SIZE; i++) { - result.add(temp.get(i)); - } - result = new HashSet(temp.subList(0, - Node.BUCKET_SIZE)); - } - return result; - } - - public void remove(Identifier node) { - // TODO - } - - public Set getEntries() { - return entries; - } + public Set getEntries(); } \ No newline at end of file diff --git a/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java new file mode 100644 index 00000000..1900191d --- /dev/null +++ b/ws2012/P2P/uebungen/8/src/routingtable/RoutingTableImpl.java @@ -0,0 +1,79 @@ +package routingtable; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import node.Identifier; +import node.Node; +import node.NodeIdentifier; + +public class RoutingTableImpl implements RoutingTable { + private Set entries = new HashSet(); + + private Bucket root; + + private int bucketSize; + + public RoutingTableImpl(int bucketSize, Node node) { + this.bucketSize = bucketSize; + this.root = new Bucket(bucketSize, 0, node); + } + + @Override + public void insert(NodeIdentifier id) { + if (root.contains(id)) { + root.update(id); + } else { + entries.add(id); + root.insert(id); + } + } + + @Override + public Set getClosestNodesTo(final Identifier id) { + Set result = new HashSet(); + + if (entries.size() <= bucketSize) { + result.addAll(entries); + + } else { + List temp = new ArrayList(entries); + + Collections.sort(temp, new Comparator() { + @Override + public int compare(NodeIdentifier o1, NodeIdentifier o2) { + BigInteger dist1 = id.distanceTo(o1); + BigInteger dist2 = id.distanceTo(o2); + return dist1.compareTo(dist2); + } + }); + + for (int i = 0; i < bucketSize; i++) { + result.add(temp.get(i)); + } + result = new HashSet(temp.subList(0, + Node.BUCKET_SIZE)); + } + return result; + } + + @Override + public boolean contains(NodeIdentifier node) { + return root.contains(node); + } + + @Override + public void remove(NodeIdentifier node) { + + } + + @Override + public Set getEntries() { + return entries; + } +} \ No newline at end of file