This commit is contained in:
senft-desktop 2013-01-17 20:22:58 +01:00
parent 6aa66896ab
commit 2b47b6fb9c
11 changed files with 292 additions and 180 deletions

View File

@ -33,6 +33,9 @@ public class CLI {
System.out.println(id);
}
break;
case "leave":
node.leave();
break;
default:
System.out.println("Unknown command.");
break;

View File

@ -1,6 +1,5 @@
package message;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.logging.Level;
@ -26,14 +25,15 @@ public class Ack {
// The channel to re-send the message on
private DatagramChannel channel;
private MessageCallback callback;
public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel,
ByteBuffer buffer) {
// TODO: this needs a max. number of retries (so that a ping to a failed
// host doesn't re-ping forever)
ByteBuffer buffer, MessageCallback cb) {
this.id = id;
this.receiver = receiver;
this.channel = channel;
this.buffer = BufferUtil.clone(buffer);
this.callback = cb;
startThread();
}
@ -89,15 +89,16 @@ public class Ack {
// Timeout hit -> re-send
if (notReceived) {
try {
LOGGER.log(Level.INFO,
"Absent RPC ack {0}. Resending to {1}",
new Object[] { id, receiver.toString() });
channel.send(buffer, receiver.getAddress());
startThread();
} catch (IOException e) {
System.out.println("ASDASD");
e.printStackTrace();
LOGGER.log(Level.INFO, "Absent RPC ack {0}.",
new Object[] { id });
if (callback != null) {
callback.onTimeout();
}
} else {
// has been received
if (callback != null) {
callback.onReceive();
}
}
}

View File

@ -0,0 +1,8 @@
package message;
public interface MessageCallback {
public void onReceive();
public void onTimeout();
}

View File

@ -6,4 +6,5 @@ public class MessageType {
public final static byte PING = 10;
public final static byte PONG = 11;
public static final byte LEAVE = 2;
}

View File

@ -9,31 +9,35 @@ public class Identifier {
protected BitSet bits;
public Identifier(byte[] bytes) {
private int size;
public Identifier(int size, byte[] bytes) {
this.size = size;
this.bits = BitSet.valueOf(bytes);
}
private Identifier(BitSet bits) {
private Identifier(int size, BitSet bits) {
this.size = size;
this.bits = bits;
}
public static Identifier getStaticIdentifier() {
BitSet middle = new BitSet(Node.ID_BITS);
middle.set(Node.ID_BITS - 1);
return new Identifier(middle);
public static Identifier getStaticIdentifier(int size) {
BitSet middle = new BitSet(size);
middle.set(size - 1);
return new Identifier(size, middle);
}
public static Identifier getRandomIdentifier() {
BitSet bits = new BitSet(Node.ID_BITS);
public static Identifier getRandomIdentifier(int size) {
BitSet bits = new BitSet(size);
for (int i = 0; i < Node.ID_BITS; i++) {
for (int i = 0; i < size; i++) {
double threshold = random.nextGaussian();
if (threshold > 0) {
bits.set(i);
}
}
return new Identifier(bits);
return new Identifier(size, bits);
}
public BigInteger distanceTo(Identifier otherID) {
@ -42,8 +46,38 @@ public class Identifier {
return new BigInteger(1, distance.toByteArray());
}
/**
* Returns whether the bit at the given position is set or not. The MSB is
* at position 0.
*
* @param index
* the index to check
* @return true if the bit is set
*/
public boolean isBitSetAt(int index) {
return bits.get(index);
// System.out.println("---------------------------------");
BigInteger intValue = new BigInteger(1, bits.toByteArray());
int numOfTrimmedZeros = size - intValue.bitLength();
if (index < numOfTrimmedZeros) {
// System.out.println("trimm-Zweig: " + index);
return false;
}
// System.out
// .println(bits + " " + bits.length() + ", "
// + numOfTrimmedZeros);
// System.out.println(bits + " " + toString() + "isSet? " + index + " ("
// + ((bits.length() - size) + index) + ") "
// + bits.get((bits.length() - size) + index));
// return bits.get((bits.length() - size) + index);
// System.out
// .println(bits.get(bits.length() - (index + numOfTrimmedZeros)
// - 1)
// + " " + index);
return bits.get(bits.length() - (index + numOfTrimmedZeros) - 1);
}
public byte[] getBytes() {

View File

@ -14,8 +14,10 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import message.Ack;
import message.MessageCallback;
import message.MessageType;
import routingtable.RoutingTable;
import routingtable.RoutingTableImpl;
public class Node {
/**
@ -33,7 +35,7 @@ public class Node {
*/
private static final int INITIAL_PORT = 50000;
private static final Identifier INITIAL_ID = Identifier
.getStaticIdentifier();
.getStaticIdentifier(ID_BITS);
private static final int BUFFER_SIZE = 512;
@ -52,9 +54,9 @@ public class Node {
private Thread thread;
private UDPHandler udpListen;
private Identifier nodeID = Identifier.getRandomIdentifier();
private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS);
private RoutingTable rt = new RoutingTable(BUCKET_SIZE);
private RoutingTable rt = new RoutingTableImpl(BUCKET_SIZE, this);
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
@ -86,17 +88,17 @@ public class Node {
// The port of this node is not the "INITIAL_PORT" (so it's not
// the first node in the network). So we try to join the network
// via the first node.
NodeIdentifier viaNode = new NodeIdentifier(
NodeIdentifier viaNode = new NodeIdentifier(ID_BITS,
INITIAL_ID.getBytes(), new InetSocketAddress(
"127.0.0.1", INITIAL_PORT));
LOGGER.log(Level.INFO, "Trying to join network via node {0}",
new Object[] { viaNode });
rt.update(viaNode);
rt.insert(viaNode);
sendFindNode(viaNode, this.nodeID);
sendPing(viaNode);
// sendPing(viaNode);
}
} catch (IOException e) {
@ -105,16 +107,16 @@ public class Node {
}
private Identifier createRPCID() {
Identifier rpcID = Identifier.getRandomIdentifier();
Identifier rpcID = Identifier.getRandomIdentifier(ID_BITS);
while (rpcs.containsKey(rpcID)) {
rpcID = Identifier.getRandomIdentifier();
rpcID = Identifier.getRandomIdentifier(ID_BITS);
}
return rpcID;
}
void sendFindNode(NodeIdentifier receiver, Identifier idToFind) {
boolean successful = send(receiver, MessageType.FIND_NODE,
idToFind.getBytes(), true);
idToFind.getBytes(), true, null);
if (successful) {
LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}",
@ -138,7 +140,7 @@ public class Node {
}
boolean successful = send(receiver, MessageType.NODES, rpcID,
nodes.array(), false);
nodes.array(), false, null);
if (successful) {
LOGGER.log(
@ -148,13 +150,13 @@ public class Node {
}
}
boolean sendPing(NodeIdentifier receiver) {
return send(receiver, MessageType.PING, null, true);
public boolean sendPing(NodeIdentifier receiver, MessageCallback cb) {
return send(receiver, MessageType.PING, null, true, cb);
}
void sendPong(NodeIdentifier receiver, Identifier rpcID) {
boolean successful = send(receiver, MessageType.PONG, rpcID, null,
false);
false, null);
if (successful) {
LOGGER.log(Level.INFO, "Sending [PONG] to {0} (rpcID={1})",
@ -163,12 +165,13 @@ public class Node {
}
private boolean send(NodeIdentifier to, byte messageType, byte[] data,
boolean reliable) {
return send(to, messageType, createRPCID(), data, reliable);
boolean reliable, MessageCallback cb) {
return send(to, messageType, createRPCID(), data, reliable, cb);
}
private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID,
byte[] data, boolean reliable) {
byte[] data, boolean reliable, MessageCallback cb) {
boolean successful = true;
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
@ -196,7 +199,7 @@ public class Node {
if (reliable) {
// This message should be reliable (acked)
Ack newAck = new Ack(rpcID, to, channel, buffer);
Ack newAck = new Ack(rpcID, to, channel, buffer, cb);
if (rpcs.containsKey(rpcID)) {
rpcs.get(rpcID).add(newAck);
} else {
@ -227,7 +230,7 @@ public class Node {
}
public void updateBuckets(NodeIdentifier id) {
rt.update(id);
rt.insert(id);
}
public Identifier getID() {
@ -262,4 +265,15 @@ public class Node {
return removedAck;
}
public void leave() {
for (NodeIdentifier n : getNeighbors()) {
sendLeave(n);
}
System.exit(0);
}
private boolean sendLeave(NodeIdentifier n) {
return send(n, MessageType.LEAVE, null, false, null);
}
}

View File

@ -1,6 +1,5 @@
package node;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -10,8 +9,8 @@ public class NodeIdentifier extends Identifier {
private InetSocketAddress address;
public NodeIdentifier(byte[] bytes, InetSocketAddress address) {
super(bytes);
public NodeIdentifier(int size, byte[] bytes, InetSocketAddress address) {
super(size, bytes);
this.address = address;
}
@ -28,8 +27,8 @@ public class NodeIdentifier extends Identifier {
}
public String toString() {
return new BigInteger(1, bits.toByteArray()).toString() + " ("
+ address.toString() + ")";
return super.toString() + " (" + address.toString() + ")";
// return bits.toString();
}
}

View File

@ -49,7 +49,7 @@ public class UDPHandler implements Runnable {
for (int i = 0; i < numBytes; i++) {
result[i] = buffer.get();
}
return new Identifier(result);
return new Identifier(Node.ID_BITS, result);
}
private NodeIdentifier getNodeTripleFromBuffer() {
@ -60,7 +60,7 @@ public class UDPHandler implements Runnable {
for (int i = 0; i < numBytes; i++) {
result[i] = buffer.get();
}
return new NodeIdentifier(result, address);
return new NodeIdentifier(Node.ID_BITS, result, address);
}
public void run() {
@ -69,6 +69,7 @@ public class UDPHandler implements Runnable {
// Run until I get killed, and all my acks have been answered
while (running || node.hasAcks()) {
try {
boolean updateRT = true;
from = (InetSocketAddress) node.getChannel().receive(buffer);
// channel.receive() is non-blocking. So we need to check if
@ -78,7 +79,7 @@ public class UDPHandler implements Runnable {
byte messageType = buffer.get();
NodeIdentifier fromID = new NodeIdentifier(
NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS,
getIDFromBuffer().getBytes(), from);
Identifier rpcID = getIDFromBuffer();
@ -91,11 +92,20 @@ public class UDPHandler implements Runnable {
receiveNodes(fromID, rpcID);
break;
case MessageType.PING:
updateRT = false;
receivePing(fromID, rpcID);
break;
case MessageType.PONG:
updateRT = false;
receivePong(fromID, rpcID);
break;
case MessageType.LEAVE:
// updateRT = false;
// receivePong(fromID, rpcID);
// do nothing, update buckets later
LOGGER.log(Level.INFO, "Received leave from {0}",
new Object[] { from.toString() });
break;
default:
LOGGER.log(Level.INFO,
"Received unknown command from {0}: [{1}]{2}",
@ -103,8 +113,10 @@ public class UDPHandler implements Runnable {
new String(buffer.array()) });
}
node.updateBuckets(new NodeIdentifier(fromID.getBytes(),
from));
if (updateRT) {
node.updateBuckets(new NodeIdentifier(Node.ID_BITS,
fromID.getBytes(), from));
}
} else {
// If nothing has been read/received wait and read/receive

View File

@ -5,117 +5,127 @@ import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import message.MessageCallback;
import node.Node;
import node.NodeIdentifier;
public class Bucket {
private final static Logger LOGGER = Logger.getLogger(Bucket.class
.getName());
private Bucket left;
private Bucket right;
private List<NodeIdentifier> entries;
private int bucketSize;
private int level;
private List<NodeIdentifier> entries = new ArrayList<NodeIdentifier>();
private Bucket left = null;
private Bucket right = null;
private Node node;
private int level = 0;
public Bucket(int size, int level) {
public Bucket(int bucketSize, int level, Node node) {
this.bucketSize = bucketSize;
this.level = level;
this.bucketSize = size;
this.node = node;
entries = new ArrayList<NodeIdentifier>();
}
public void update(NodeIdentifier newID) {
update(this, newID);
/**
* Returns the nodes of this very bucket.
*
* @return
*/
public List<NodeIdentifier> getNodes() {
return entries;
}
private void update(Bucket bucket, NodeIdentifier newID) {
if (!bucket.isLeaf()) {
if (newID.isBitSetAt(level)) {
update(bucket.right, newID);
public boolean contains(NodeIdentifier id) {
if (!isLeaf()) {
return left.contains(id) || right.contains(id);
}
return entries.contains(id);
}
/**
* Tries to update the given node.
*
* @param id
* @return true if the node is still available, else false
*/
public void update(final NodeIdentifier id) {
if (!isLeaf()) {
if (id.isBitSetAt(level)) {
left.update(id);
} else {
update(bucket.left, newID);
right.update(id);
}
} else if (bucket.hasSpace()) {
if (bucket.entries.contains(newID)) {
// Move to beginning
LOGGER.log(
Level.FINE,
"Node {0} already in routing table. Move to end of bucket.",
new Object[] { newID });
if (bucket.entries.size() > 1) {
bucket.entries.remove(newID);
bucket.entries.add(newID);
} else {
node.sendPing(id, new MessageCallback() {
@Override
public void onReceive() {
LOGGER.log(Level.INFO,
"Node answered in time, moving to top of list.");
entries.remove(id);
entries.add(0, id);
}
@Override
public void onTimeout() {
LOGGER.log(Level.INFO, "Node didnt answer in time.");
entries.remove(id);
}
});
}
}
public void insert(NodeIdentifier newId) {
insert(newId, "");
}
public void insert(NodeIdentifier newId, String path) {
if (isLeaf()) {
if (entries.size() < bucketSize) {
LOGGER.log(Level.INFO,
"Added node {0} to RT [{1}] on level {2}",
new Object[] { newId, path, level });
entries.add(newId);
} else {
LOGGER.log(Level.INFO, "Added new node {0} to routing table.",
new Object[] { newID });
bucket.entries.add(newID);
}
LOGGER.log(Level.INFO, "Split on level " + level
+ " while adding " + newId);
} else { // Bucket is full
if (bucket.entries.contains(newID)) {
// Node is already present -> check if [last contacted node of
// this buffer] is still alive
LOGGER.log(
Level.FINE,
"Node {0} already in routing table. Check if still alive...",
new Object[] { newID });
LOGGER.log(Level.INFO,
"Distributing present nodes to lower buckets");
} else {
// TODO: only split if necessary
Bucket newLeft = new Bucket(bucketSize, level + 1);
Bucket newRight = new Bucket(bucketSize, level + 1);
Bucket newLeft = new Bucket(bucketSize, level + 1, node);
Bucket newRight = new Bucket(bucketSize, level + 1, node);
// Add the new entry and in the following loop distribute all
// existing entries to left/right
bucket.entries.add(newID);
entries.add(newId);
for (NodeIdentifier id : entries) {
if (id.isBitSetAt(level)) {
newLeft.entries.add(id);
newLeft.insert(id, path + "1");
} else {
newRight.entries.add(id);
newRight.insert(id, path + "0");
}
}
bucket.entries = null;
bucket.left = newLeft;
bucket.right = newRight;
}
}
}
public boolean hasNode(NodeIdentifier id) {
return hasNode(this, id);
}
private boolean hasNode(Bucket bucket, NodeIdentifier idToFind) {
if (bucket.isLeaf()) {
for (NodeIdentifier id : bucket.entries) {
if (id.equals(idToFind)) {
return true;
}
this.entries = null;
this.left = newLeft;
this.right = newRight;
}
return false;
} else {
if (idToFind.isBitSetAt(level)) {
return bucket.hasNode(bucket.left, idToFind);
if (newId.isBitSetAt(level)) {
left.insert(newId, path + "1");
} else {
return bucket.hasNode(bucket.right, idToFind);
right.insert(newId, path + "0");
}
}
}
private boolean isLeaf() {
return left == null && right == null && entries != null;
}
private boolean hasSpace() {
return entries.size() < bucketSize;
return left == null && right == null;
}
}

View File

@ -1,68 +1,19 @@
package routingtable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import node.Identifier;
import node.Node;
import node.NodeIdentifier;
public class RoutingTable {
public interface RoutingTable {
private Bucket tree;
public void insert(NodeIdentifier id);
private Set<NodeIdentifier> entries = new HashSet<NodeIdentifier>();
public Set<NodeIdentifier> getClosestNodesTo(Identifier id);
private int bucketSize;
public boolean contains(NodeIdentifier node);
public RoutingTable(int bucketSize) {
this.tree = new Bucket(bucketSize, 0);
this.bucketSize = bucketSize;
}
public void remove(NodeIdentifier node);
public void update(NodeIdentifier id) {
entries.add(id);
tree.update(id);
}
public Set<NodeIdentifier> getClosestNodesTo(final Identifier id) {
Set<NodeIdentifier> result = new HashSet<NodeIdentifier>();
if (entries.size() <= bucketSize) {
result.addAll(entries);
} else {
List<NodeIdentifier> temp = new ArrayList<NodeIdentifier>(entries);
Collections.sort(temp, new Comparator<NodeIdentifier>() {
@Override
public int compare(NodeIdentifier o1, NodeIdentifier o2) {
BigInteger dist1 = id.distanceTo(o1);
BigInteger dist2 = id.distanceTo(o2);
return dist1.compareTo(dist2);
}
});
for (int i = 0; i < Node.BUCKET_SIZE; i++) {
result.add(temp.get(i));
}
result = new HashSet<NodeIdentifier>(temp.subList(0,
Node.BUCKET_SIZE));
}
return result;
}
public void remove(Identifier node) {
// TODO
}
public Set<NodeIdentifier> getEntries() {
return entries;
}
public Set<NodeIdentifier> getEntries();
}

View File

@ -0,0 +1,79 @@
package routingtable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import node.Identifier;
import node.Node;
import node.NodeIdentifier;
public class RoutingTableImpl implements RoutingTable {
private Set<NodeIdentifier> entries = new HashSet<NodeIdentifier>();
private Bucket root;
private int bucketSize;
public RoutingTableImpl(int bucketSize, Node node) {
this.bucketSize = bucketSize;
this.root = new Bucket(bucketSize, 0, node);
}
@Override
public void insert(NodeIdentifier id) {
if (root.contains(id)) {
root.update(id);
} else {
entries.add(id);
root.insert(id);
}
}
@Override
public Set<NodeIdentifier> getClosestNodesTo(final Identifier id) {
Set<NodeIdentifier> result = new HashSet<NodeIdentifier>();
if (entries.size() <= bucketSize) {
result.addAll(entries);
} else {
List<NodeIdentifier> temp = new ArrayList<NodeIdentifier>(entries);
Collections.sort(temp, new Comparator<NodeIdentifier>() {
@Override
public int compare(NodeIdentifier o1, NodeIdentifier o2) {
BigInteger dist1 = id.distanceTo(o1);
BigInteger dist2 = id.distanceTo(o2);
return dist1.compareTo(dist2);
}
});
for (int i = 0; i < bucketSize; i++) {
result.add(temp.get(i));
}
result = new HashSet<NodeIdentifier>(temp.subList(0,
Node.BUCKET_SIZE));
}
return result;
}
@Override
public boolean contains(NodeIdentifier node) {
return root.contains(node);
}
@Override
public void remove(NodeIdentifier node) {
}
@Override
public Set<NodeIdentifier> getEntries() {
return entries;
}
}