Everything should work (except for the RoutingTable)
This commit is contained in:
parent
036b77cf2d
commit
7967603f75
@ -1,105 +1,109 @@
|
||||
package message;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import node.Identifier;
|
||||
import node.NodeIdentifier;
|
||||
import util.BufferUtil;
|
||||
|
||||
public class Ack {
|
||||
private final static Logger LOGGER = Logger.getLogger(Ack.class.getName());
|
||||
private final static Logger LOGGER = Logger.getLogger(Ack.class.getName());
|
||||
|
||||
// timeout in seconds
|
||||
private final int TIMEOUT = 1000;
|
||||
// timeout in seconds
|
||||
private final int TIMEOUT = 1000;
|
||||
|
||||
private int id;
|
||||
private SocketAddress address;
|
||||
private ByteBuffer buf;
|
||||
private Identifier id;
|
||||
private NodeIdentifier receiver;
|
||||
private ByteBuffer buffer;
|
||||
|
||||
private TimeoutThread timeout;
|
||||
private volatile Thread thread;
|
||||
private TimeoutThread timeout;
|
||||
private Thread thread;
|
||||
|
||||
// The channel to re-send the message on
|
||||
private DatagramChannel channel;
|
||||
// The channel to re-send the message on
|
||||
private DatagramChannel channel;
|
||||
|
||||
public Ack(int id, SocketAddress address, DatagramChannel channel) {
|
||||
this.id = id;
|
||||
this.address = address;
|
||||
this.channel = channel;
|
||||
startThread();
|
||||
}
|
||||
public Ack(Identifier id, NodeIdentifier receiver, DatagramChannel channel,
|
||||
ByteBuffer buffer) {
|
||||
// TODO: this needs a max. number of retries (so that a ping to a failed
|
||||
// host doesn't re-ping forever)
|
||||
this.id = id;
|
||||
this.receiver = receiver;
|
||||
this.channel = channel;
|
||||
this.buffer = BufferUtil.clone(buffer);
|
||||
startThread();
|
||||
}
|
||||
|
||||
private void startThread() {
|
||||
LOGGER.log(Level.FINE, "Starting timeout thread for ack #" + id);
|
||||
timeout = new TimeoutThread();
|
||||
thread = new Thread(timeout);
|
||||
thread.start();
|
||||
}
|
||||
private void startThread() {
|
||||
LOGGER.log(Level.FINE, "Starting timeout thread for RPC " + id);
|
||||
timeout = new TimeoutThread();
|
||||
thread = new Thread(timeout);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
public int getID() {
|
||||
return id;
|
||||
}
|
||||
public Identifier getID() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public boolean check(SocketAddress address) {
|
||||
return this.address.toString().equals(address.toString());
|
||||
}
|
||||
public boolean check(NodeIdentifier fromID) {
|
||||
return fromID.equals(receiver);
|
||||
}
|
||||
|
||||
public ByteBuffer getBuf() {
|
||||
return buf;
|
||||
}
|
||||
public ByteBuffer getBuf() {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public void setBuf(ByteBuffer buf) {
|
||||
this.buf = buf;
|
||||
}
|
||||
public void setBuf(ByteBuffer buf) {
|
||||
this.buffer = buf;
|
||||
}
|
||||
|
||||
public void setReceived() {
|
||||
// Stop thread
|
||||
try {
|
||||
if (thread != null) {
|
||||
timeout.terminate();
|
||||
thread.join();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
public void setReceived() {
|
||||
// Stop thread
|
||||
try {
|
||||
if (thread != null) {
|
||||
timeout.terminate();
|
||||
thread.join();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
private class TimeoutThread implements Runnable {
|
||||
private volatile boolean notReceived = true;
|
||||
private class TimeoutThread implements Runnable {
|
||||
private volatile boolean notReceived = true;
|
||||
|
||||
// When do we stop expecting an ack
|
||||
private long timeToStop = System.currentTimeMillis() + TIMEOUT;
|
||||
// When do we stop expecting an ack
|
||||
private long timeToStop = System.currentTimeMillis() + TIMEOUT;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (notReceived && System.currentTimeMillis() < timeToStop) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void run() {
|
||||
while (notReceived && System.currentTimeMillis() < timeToStop) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
// Timeout hit -> re-send
|
||||
if (notReceived) {
|
||||
try {
|
||||
LOGGER.log(Level.INFO, "Absent ack #{0}. Resending to {1}",
|
||||
new Object[] { id, address.toString() });
|
||||
// Timeout hit -> re-send
|
||||
if (notReceived) {
|
||||
try {
|
||||
LOGGER.log(Level.INFO,
|
||||
"Absent RPC ack {0}. Resending to {1}",
|
||||
new Object[] { id, receiver.toString() });
|
||||
channel.send(buffer, receiver.getAddress());
|
||||
startThread();
|
||||
} catch (IOException e) {
|
||||
System.out.println("ASDASD");
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
channel.send(buf, address);
|
||||
startThread();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
notReceived = false;
|
||||
}
|
||||
}
|
||||
|
||||
public SocketAddress getAddresse() {
|
||||
return address;
|
||||
}
|
||||
public void terminate() {
|
||||
notReceived = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,12 +1,9 @@
|
||||
package message;
|
||||
|
||||
|
||||
public class MessageType {
|
||||
public final static byte FIND_NODE = 48; // 0
|
||||
public final static byte PING = 49;
|
||||
public final static byte PONG = 50;
|
||||
public final static byte FIND_NODE = 0;
|
||||
public final static byte NODES = 1;
|
||||
|
||||
public final static byte ACK = 51;
|
||||
|
||||
public final static byte NODES = 52;
|
||||
public final static byte PING = 10;
|
||||
public final static byte PONG = 11;
|
||||
}
|
||||
|
||||
@ -5,11 +5,12 @@ import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.DatagramChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.LogManager;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import message.Ack;
|
||||
@ -34,14 +35,19 @@ public class Node {
|
||||
private static final Identifier INITIAL_ID = Identifier
|
||||
.getStaticIdentifier();
|
||||
|
||||
private static final int _BUFFER_SIZE = 512;
|
||||
private static final int BUFFER_SIZE = 512;
|
||||
|
||||
/**
|
||||
* The size of an IP address (in bytes)
|
||||
*/
|
||||
private static final int SIZE_IP_ADDRESS = 8;
|
||||
|
||||
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
|
||||
|
||||
private InetSocketAddress address;
|
||||
private DatagramChannel channel;
|
||||
|
||||
private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
|
||||
private Map<Identifier, List<Ack>> rpcs = new HashMap<Identifier, List<Ack>>();
|
||||
|
||||
private Thread thread;
|
||||
private UDPHandler udpListen;
|
||||
@ -77,8 +83,20 @@ public class Node {
|
||||
getName(), address.toString() });
|
||||
|
||||
if (address.getPort() != INITIAL_PORT) {
|
||||
joinNetworkVia(new NodeIdentifier(INITIAL_ID.getBytes(),
|
||||
new InetSocketAddress("127.0.0.1", INITIAL_PORT)));
|
||||
// The port of this node is not the "INITIAL_PORT" (so it's not
|
||||
// the first node in the network). So we try to join the network
|
||||
// via the first node.
|
||||
NodeIdentifier viaNode = new NodeIdentifier(
|
||||
INITIAL_ID.getBytes(), new InetSocketAddress(
|
||||
"127.0.0.1", INITIAL_PORT));
|
||||
|
||||
LOGGER.log(Level.INFO, "Trying to join network via node {0}",
|
||||
new Object[] { viaNode });
|
||||
|
||||
rt.update(viaNode);
|
||||
sendFindNode(viaNode, this.nodeID);
|
||||
|
||||
sendPing(viaNode);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
@ -86,82 +104,122 @@ public class Node {
|
||||
}
|
||||
}
|
||||
|
||||
private void joinNetworkVia(NodeIdentifier receiver) {
|
||||
LOGGER.log(Level.INFO, "Trying to join network via node {0}",
|
||||
new Object[] { receiver });
|
||||
|
||||
rt.update(receiver);
|
||||
// Send a FIND_NODE. The node to find is my own id (bootstrapping...)
|
||||
sendFindNode(receiver, nodeID);
|
||||
private Identifier createRPCID() {
|
||||
Identifier rpcID = Identifier.getRandomIdentifier();
|
||||
while (rpcs.containsKey(rpcID)) {
|
||||
rpcID = Identifier.getRandomIdentifier();
|
||||
}
|
||||
return rpcID;
|
||||
}
|
||||
|
||||
void sendFindNode(NodeIdentifier receiver, Identifier idToFind) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(_BUFFER_SIZE);
|
||||
boolean successful = send(receiver, MessageType.FIND_NODE,
|
||||
idToFind.getBytes(), true);
|
||||
|
||||
Identifier rpc_id = Identifier.getRandomIdentifier();
|
||||
|
||||
buffer.put(MessageType.FIND_NODE);
|
||||
buffer.put(nodeID.getBytes());
|
||||
buffer.put(rpc_id.getBytes());
|
||||
buffer.put(idToFind.getBytes());
|
||||
|
||||
if (send(buffer, receiver.getAddress())) {
|
||||
if (successful) {
|
||||
LOGGER.log(Level.INFO, "Sending [FIND_NODE {0}] to node {1}",
|
||||
new Object[] { idToFind, receiver });
|
||||
}
|
||||
}
|
||||
|
||||
void sendClosestNodesTo(NodeIdentifier receiver, Identifier idToFind,
|
||||
Identifier rpc_id) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(_BUFFER_SIZE);
|
||||
|
||||
buffer.put(MessageType.NODES);
|
||||
buffer.put(nodeID.getBytes());
|
||||
buffer.put(rpc_id.getBytes());
|
||||
Identifier rpcID) {
|
||||
|
||||
Set<NodeIdentifier> entries = rt.getClosestNodesTo(idToFind);
|
||||
int numNodes = entries.size();
|
||||
|
||||
ByteBuffer nodes = ByteBuffer.allocate(numNodes * (ID_BITS / 8)
|
||||
+ numNodes * SIZE_IP_ADDRESS);
|
||||
|
||||
for (NodeIdentifier id : entries) {
|
||||
if (!receiver.equals(id)) {
|
||||
buffer.put(id.getTripleAsBytes());
|
||||
nodes.put(id.getTripleAsBytes());
|
||||
}
|
||||
}
|
||||
|
||||
if (send(buffer, receiver.getAddress())) {
|
||||
LOGGER.log(Level.INFO,
|
||||
"Sending {0} nodes to to node {1} [FIND_NODE {2}].",
|
||||
new Object[] { entries.size(), receiver, idToFind });
|
||||
boolean successful = send(receiver, MessageType.NODES, rpcID,
|
||||
nodes.array(), false);
|
||||
|
||||
if (successful) {
|
||||
LOGGER.log(
|
||||
Level.INFO,
|
||||
"Sending {0} nodes to to node {1} [FIND_NODE {2}] (rpcID={3})",
|
||||
new Object[] { entries.size(), receiver, idToFind, rpcID });
|
||||
}
|
||||
}
|
||||
|
||||
private boolean send(ByteBuffer buffer, InetSocketAddress to) {
|
||||
buffer.flip();
|
||||
try {
|
||||
channel.send(buffer, to);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return false;
|
||||
boolean sendPing(NodeIdentifier receiver) {
|
||||
return send(receiver, MessageType.PING, null, true);
|
||||
}
|
||||
|
||||
void sendPong(NodeIdentifier receiver, Identifier rpcID) {
|
||||
boolean successful = send(receiver, MessageType.PONG, rpcID, null,
|
||||
false);
|
||||
|
||||
if (successful) {
|
||||
LOGGER.log(Level.INFO, "Sending [PONG] to {0} (rpcID={1})",
|
||||
new Object[] { receiver, rpcID });
|
||||
}
|
||||
}
|
||||
|
||||
private boolean send(NodeIdentifier to, byte messageType, byte[] data,
|
||||
boolean reliable) {
|
||||
return send(to, messageType, createRPCID(), data, reliable);
|
||||
}
|
||||
|
||||
private boolean send(NodeIdentifier to, byte messageType, Identifier rpcID,
|
||||
byte[] data, boolean reliable) {
|
||||
boolean successful = true;
|
||||
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
|
||||
|
||||
buffer.put(messageType);
|
||||
buffer.put(this.nodeID.getBytes());
|
||||
buffer.put(rpcID.getBytes());
|
||||
|
||||
if (data != null) {
|
||||
buffer.put(data);
|
||||
}
|
||||
|
||||
buffer.flip();
|
||||
|
||||
try {
|
||||
|
||||
channel.send(buffer, to.getAddress());
|
||||
|
||||
} catch (IOException e) {
|
||||
|
||||
LOGGER.log(Level.SEVERE, "Failed to write to channel", e);
|
||||
successful = false;
|
||||
|
||||
} finally {
|
||||
// Even if an exception occurred this should be reliable
|
||||
if (reliable) {
|
||||
// This message should be reliable (acked)
|
||||
|
||||
Ack newAck = new Ack(rpcID, to, channel, buffer);
|
||||
if (rpcs.containsKey(rpcID)) {
|
||||
rpcs.get(rpcID).add(newAck);
|
||||
} else {
|
||||
rpcs.put(rpcID, new ArrayList<Ack>());
|
||||
rpcs.get(rpcID).add(newAck);
|
||||
}
|
||||
}
|
||||
}
|
||||
return successful;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return nodeID.toString();
|
||||
}
|
||||
|
||||
public boolean hasAcks() {
|
||||
return !acks.isEmpty();
|
||||
return !rpcs.isEmpty();
|
||||
}
|
||||
|
||||
protected boolean hasAck(int ack_id) {
|
||||
return acks.containsKey(ack_id);
|
||||
}
|
||||
|
||||
protected Ack getAck(int ack_id) {
|
||||
return acks.get(ack_id);
|
||||
}
|
||||
|
||||
protected void removeAck(int ack_id) {
|
||||
acks.remove(ack_id).setReceived();
|
||||
protected boolean hasAck(Identifier rpcID) {
|
||||
// TODO
|
||||
// return rpcs.containsKey(ack_id);
|
||||
return false;
|
||||
}
|
||||
|
||||
public DatagramChannel getChannel() {
|
||||
@ -180,16 +238,28 @@ public class Node {
|
||||
return rt.getEntries();
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
System.setProperty("java.util.logging.config.file",
|
||||
"logging.properties");
|
||||
public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) {
|
||||
List<Ack> rpcsFromID = rpcs.get(rpcID);
|
||||
boolean removedAck = false;
|
||||
|
||||
try {
|
||||
LogManager.getLogManager().readConfiguration();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
for (Ack ack : rpcsFromID) {
|
||||
if (ack.check(fromID)) {
|
||||
ack.setReceived();
|
||||
rpcsFromID.remove(ack);
|
||||
removedAck = true;
|
||||
|
||||
LOGGER.log(Level.FINER, "Received rpc ack " + rpcID);
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
new Node();
|
||||
if (!removedAck) {
|
||||
LOGGER.log(Level.WARNING,
|
||||
"Received rpc ack {0}, but didn't expect that",
|
||||
new Object[] { rpcID });
|
||||
}
|
||||
|
||||
return removedAck;
|
||||
}
|
||||
}
|
||||
|
||||
@ -80,18 +80,21 @@ public class UDPHandler implements Runnable {
|
||||
|
||||
NodeIdentifier fromID = new NodeIdentifier(
|
||||
getIDFromBuffer().getBytes(), from);
|
||||
Identifier rpc_id = getIDFromBuffer();
|
||||
|
||||
Identifier rpcID = getIDFromBuffer();
|
||||
|
||||
switch (messageType) {
|
||||
case MessageType.FIND_NODE:
|
||||
receiveFindNode(fromID, rpc_id);
|
||||
receiveFindNode(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.NODES:
|
||||
receiveNodes(fromID, rpc_id);
|
||||
receiveNodes(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PING:
|
||||
LOGGER.log(Level.INFO, "Received [ping] from {0}",
|
||||
new Object[] { from.toString() });
|
||||
receivePing(fromID, rpcID);
|
||||
break;
|
||||
case MessageType.PONG:
|
||||
receivePong(fromID, rpcID);
|
||||
break;
|
||||
default:
|
||||
LOGGER.log(Level.INFO,
|
||||
@ -104,6 +107,8 @@ public class UDPHandler implements Runnable {
|
||||
from));
|
||||
|
||||
} else {
|
||||
// If nothing has been read/received wait and read/receive
|
||||
// again
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
@ -119,18 +124,34 @@ public class UDPHandler implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveNodes(NodeIdentifier fromID, Identifier rpc_id) {
|
||||
private void receivePong(NodeIdentifier fromID, Identifier rpcID) {
|
||||
LOGGER.log(Level.INFO, "Received [PONG] from {0}",
|
||||
new Object[] { fromID });
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receivePing(NodeIdentifier fromID, Identifier rpcID) {
|
||||
LOGGER.log(Level.INFO, "Received [PING] from {0}",
|
||||
new Object[] { fromID });
|
||||
node.sendPong(fromID, rpcID);
|
||||
}
|
||||
|
||||
private void receiveNodes(NodeIdentifier fromID, Identifier rpcID) {
|
||||
|
||||
int numReceived = 0;
|
||||
StringBuilder nodes = new StringBuilder();
|
||||
|
||||
while (buffer.hasRemaining()) {
|
||||
NodeIdentifier newID = getNodeTripleFromBuffer();
|
||||
node.updateBuckets(newID);
|
||||
nodes.append(newID).append(", ");
|
||||
numReceived++;
|
||||
}
|
||||
|
||||
LOGGER.log(Level.INFO, "Received {0} [NODES] from Node {1})",
|
||||
new Object[] { numReceived, fromID });
|
||||
node.receivedRPC(fromID, rpcID);
|
||||
|
||||
LOGGER.log(Level.INFO, "Received {0} [NODES] [{1}] from Node {2})",
|
||||
new Object[] { numReceived, nodes.toString(), fromID });
|
||||
}
|
||||
|
||||
private void receiveFindNode(NodeIdentifier fromID, Identifier rpc_id) {
|
||||
|
||||
@ -40,26 +40,33 @@ public class Bucket {
|
||||
|
||||
}
|
||||
} else if (bucket.hasSpace()) {
|
||||
System.out.println(entries == null);
|
||||
if (entries.contains(newID)) {
|
||||
if (bucket.entries.contains(newID)) {
|
||||
// Move to beginning
|
||||
LOGGER.log(
|
||||
Level.FINE,
|
||||
"Node {0} already in routing table. Move to end of bucket.",
|
||||
new Object[] { newID });
|
||||
if (entries.size() > 1) {
|
||||
entries.remove(newID);
|
||||
entries.add(newID);
|
||||
|
||||
if (bucket.entries.size() > 1) {
|
||||
bucket.entries.remove(newID);
|
||||
bucket.entries.add(newID);
|
||||
}
|
||||
} else {
|
||||
LOGGER.log(Level.INFO, "Added new node {0} to routing table.",
|
||||
new Object[] { newID });
|
||||
entries.add(newID);
|
||||
bucket.entries.add(newID);
|
||||
}
|
||||
|
||||
} else { // Bucket is full
|
||||
System.out.println(entries == null);
|
||||
if (!entries.contains(newID)) {
|
||||
if (bucket.entries.contains(newID)) {
|
||||
// Node is already present -> check if [last contacted node of
|
||||
// this buffer] is still alive
|
||||
LOGGER.log(
|
||||
Level.FINE,
|
||||
"Node {0} already in routing table. Check if still alive...",
|
||||
new Object[] { newID });
|
||||
|
||||
} else {
|
||||
// TODO: only split if necessary
|
||||
|
||||
Bucket newLeft = new Bucket(bucketSize, level + 1);
|
||||
@ -67,24 +74,18 @@ public class Bucket {
|
||||
|
||||
// Add the new entry and in the following loop distribute all
|
||||
// existing entries to left/right
|
||||
entries.add(newID);
|
||||
bucket.entries.add(newID);
|
||||
|
||||
for (NodeIdentifier entry : entries) {
|
||||
if (entry.isBitSetAt(level)) {
|
||||
newLeft.entries.add(entry);
|
||||
for (NodeIdentifier id : entries) {
|
||||
if (id.isBitSetAt(level)) {
|
||||
newLeft.entries.add(id);
|
||||
} else {
|
||||
newRight.entries.add(entry);
|
||||
newRight.entries.add(id);
|
||||
}
|
||||
}
|
||||
bucket.entries = null;
|
||||
bucket.left = newLeft;
|
||||
bucket.right = newRight;
|
||||
} else {
|
||||
// Node is already present -> check if it's still alive
|
||||
LOGGER.log(
|
||||
Level.FINE,
|
||||
"Node {0} already in routing table. Check if still alive...",
|
||||
new Object[] { newID });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -48,12 +48,13 @@ public class RoutingTable {
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
for (int i = 0; i < Node.BUCKET_SIZE; i++) {
|
||||
result.add(temp.get(i));
|
||||
}
|
||||
|
||||
result = new HashSet<NodeIdentifier>(temp.subList(0,
|
||||
Node.BUCKET_SIZE));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user