Added TimeoutThread for acks and resending of not-acknowledged msgs

This commit is contained in:
senft-desktop 2012-11-28 20:02:30 +01:00
parent ec37895f80
commit eea5415f2f
2 changed files with 389 additions and 267 deletions

View File

@ -1,15 +1,44 @@
package node;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Ack {
private final static Logger LOGGER = Logger.getLogger(Ack.class.getName());
// timeout in seconds
private final int TIMEOUT = 5000;
private int id;
private SocketAddress address;
private ByteBuffer buf;
public Ack(SocketAddress address) {
private TimeoutThread timeout;
private volatile Thread thread;
// The channel to re-send the message on
private DatagramChannel channel;
public Ack(int id, SocketAddress address, DatagramChannel channel) {
this.id = id;
this.address = address;
this.channel = channel;
startThread();
}
private void startThread() {
LOGGER.log(Level.INFO, "Starting timeout thread for ack #" + id);
timeout = new TimeoutThread();
thread = new Thread(timeout);
thread.start();
}
public int getId() {
return id;
}
public boolean check(SocketAddress address) {
@ -23,4 +52,69 @@ public class Ack {
public void setBuf(ByteBuffer buf) {
this.buf = buf;
}
public void setReceived() {
// Stop thread
try {
if (thread != null) {
timeout.terminate();
thread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private class TimeoutThread implements Runnable {
private volatile boolean notReceived = true;
// When do we stop expecting an ack
private long timeToStop = System.currentTimeMillis() + TIMEOUT;
@Override
public void run() {
while (notReceived && System.currentTimeMillis() < timeToStop) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// Timeout hit -> re-send
if (notReceived) {
try {
LOGGER.log(Level.INFO, "Absent ack #" + id
+ ". Resending to " + address.toString());
/**
* TODO: This would be the intuitive order (first re-send,
* then start the new TimeoutThread), right? Unfortunately
* this gives ugly log outputs, because the re-sent message
* arrives before the new thread is constructed, so we get:
*
* <pre>
* [2012-11-28 07:53:05 PM] node.Node INFO: Initialized node /127.0.0.1:37179
* a spawn b
* [2012-11-28 07:53:15 PM] node.Node INFO: Initialized node /127.0.0.1:35358
* [2012-11-28 07:53:15 PM] node.Ack INFO: Starting timeout thread for ack #-1276001492
* [2012-11-28 07:53:15 PM] node.Node INFO: /127.0.0.1:35358 received invite from /127.0.0.1:37179
* [2012-11-28 07:53:20 PM] node.Ack INFO: Absent ack #-1276001492). Resending to /127.0.0.1:35358
* </pre>
*
* No big deal, and we could just swap the statements, but
* meh...
*/
channel.send(buf, address);
startThread();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void terminate() {
notReceived = false;
}
}
}

View File

@ -13,322 +13,350 @@ import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import util.BufferUtil;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
private static final int BUF_SIZE = 512;
private static final int BUF_SIZE = 512;
private DatagramChannel channel;
private DatagramChannel channel;
private String name = "Not initialized";
private String name = "Not initialized";
private List<SocketAddress> neighbors = new ArrayList<>();
private List<SocketAddress> neighbors = new ArrayList<>();
private Map<Integer, Ack> acks = new HashMap<>();
private Map<Integer, Ack> acks = new HashMap<>();
private volatile Thread thread;
private UDPListen udpListen;
private volatile Thread thread;
private UDPListen udpListen;
private Random generator;
private Random generator;
public Node() {
//debug
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
channel.configureBlocking(false);
public Node() {
// debug
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
channel.configureBlocking(false);
this.name = channel.socket().getLocalSocketAddress().toString();
this.name = channel.socket().getLocalSocketAddress().toString();
udpListen = new UDPListen();
udpListen = new UDPListen();
thread = new Thread(udpListen);
thread.start();
LOGGER.log(Level.INFO, "Initialized node {0}", name);
} catch (IOException e) {
e.printStackTrace();
}
}
LOGGER.log(Level.INFO, "Initialized node {0}", name);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Create another peer, mutually link creator and spawn.
*
* @return the spawned Node
* @throws IOException
* if no connection could be established to the new node
*/
public Node spawn() throws IOException {
// LOGGER.info("Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
sendInvite(newNode);
neighbors.add(newNode.getAddress());
/**
* Create another peer, mutually link creator and spawn.
*
* @return the spawned Node
* @throws IOException
* if no connection could be established to the new node
*/
public Node spawn() throws IOException {
// LOGGER.info("Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
return newNode;
}
sendInvite(newNode);
neighbors.add(newNode.getAddress());
public boolean sendInvite(Node newNode){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(newNode.getAddress());
buffer.put(MessageType.INVITE);
buffer.putInt(ack_id);
buffer.flip();
return newNode;
}
try {
channel.send(buffer, newNode.getAddress());
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean sendInvite(Node newNode) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(newNode.getAddress());
buffer.put(MessageType.INVITE);
buffer.putInt(ack.getId());
buffer.flip();
ack.setBuf(BufferUtil.clone(buffer));
/**
* Adds a new ack, which this node is expecting to receive.
*
* @param addr
* the SocketAddress the ack should be received from
* @return the identifier for this ack
*/
private int generateAck(SocketAddress addr) {
int ack_id = generator.nextInt();
acks.put(ack_id, new Ack(addr));
return ack_id;
}
try {
channel.send(buffer, newNode.getAddress());
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();}
/**
* Adds a new ack, which this node is expecting to receive.
*
* @param addr
* the SocketAddress the ack should be received from
* @return the identifier for this ack
*/
private Ack generateAck(final SocketAddress addr) {
int ack_id = generator.nextInt();
Ack newAck = new Ack(ack_id, addr, channel);
acks.put(ack_id, newAck);
return newAck;
}
public boolean sendNewNeighbor(SocketAddress receiver, SocketAddress neighbor){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(neighbor);
buffer.put(MessageType.NEW_NEIGHBOR);
buffer.putInt(ack_id);
InetSocketAddress a = (InetSocketAddress) neighbor;
for (String part : a.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));}
buffer.putInt(a.getPort());
buffer.flip();
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();
}
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean sendNewNeighbor(SocketAddress receiver,
SocketAddress neighbor) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(neighbor);
buffer.put(MessageType.NEW_NEIGHBOR);
buffer.putInt(ack.getId());
InetSocketAddress a = (InetSocketAddress) neighbor;
for (String part : a.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));
}
buffer.putInt(a.getPort());
buffer.flip();
/**
* Sends an acknowledgment message to receiver (who hopefully is expecting
* it)
*
* @param receiver
* the node expecting an ack
* @param ack_id
* the id to identify the ack
*/
private boolean sendAck(SocketAddress receiver, int ack_id) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.ACK);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
ack.setBuf(BufferUtil.clone(buffer));
private boolean sendLeave(SocketAddress neighbor){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(neighbor);
buffer.put(MessageType.LEAVE);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
try {
channel.send(buffer, neighbor);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
*/
public void leave() {
LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName());
/**
* Sends an acknowledgment message to receiver (who hopefully is expecting
* it)
*
* @param receiver
* the node expecting an ack
* @param ack_id
* the id to identify the ack
*/
private boolean sendAck(SocketAddress receiver, int ack_id) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.ACK);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.size() > 2) {
int pred = ((i - 1) + neighbors.size()) % neighbors.size();
int succ = (i + 1) % neighbors.size();
sendNewNeighbor(neighbors.get(i), neighbors.get(succ));
sendNewNeighbor(neighbors.get(i), neighbors.get(pred));
} else if (neighbors.size() == 2) {
sendNewNeighbor(neighbors.get(i), neighbors.get(Math.abs(i - 1)));
}
private boolean sendLeave(SocketAddress neighbor) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(neighbor);
buffer.put(MessageType.LEAVE);
buffer.putInt(ack.getId());
buffer.flip();
sendLeave(neighbors.get(i));
}
ack.setBuf(BufferUtil.clone(buffer));
try {
if (thread != null) {
udpListen.terminate();
thread.join();}
} catch (InterruptedException e) {
e.printStackTrace();}
}
try {
channel.send(buffer, neighbor);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
public boolean hasNeighbor(SocketAddress addr) {
for (SocketAddress n : neighbors) {
if (n.toString().equals(addr.toString())) {
return true;
}
}
return false;
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
*/
public void leave() {
LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName());
public int getNeighborId(SocketAddress addr) {
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.get(i).toString().equals(addr.toString())) {
return i;
}
}
return -1;
}
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.size() > 2) {
int pred = ((i - 1) + neighbors.size()) % neighbors.size();
int succ = (i + 1) % neighbors.size();
sendNewNeighbor(neighbors.get(i), neighbors.get(succ));
sendNewNeighbor(neighbors.get(i), neighbors.get(pred));
} else if (neighbors.size() == 2) {
sendNewNeighbor(neighbors.get(i),
neighbors.get(Math.abs(i - 1)));
}
public String getName() {
return this.name;}
sendLeave(neighbors.get(i));
}
public String toString() {
StringBuilder result = new StringBuilder(256);
result.append("Node ");
result.append(getName()).append(", Neighbors: ");
result.append(neighbors);
return result.toString();
}
try {
if (thread != null) {
udpListen.terminate();
thread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private class UDPListen implements Runnable {
private volatile boolean running = true;
public boolean hasNeighbor(SocketAddress addr) {
for (SocketAddress n : neighbors) {
if (n.toString().equals(addr.toString())) {
return true;
}
}
return false;
}
public int getNeighborId(SocketAddress addr) {
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.get(i).toString().equals(addr.toString())) {
return i;
}
}
return -1;
}
public String getName() {
return this.name;
}
public String toString() {
StringBuilder result = new StringBuilder(256);
result.append("Node ");
result.append(getName()).append(", Neighbors: ");
result.append(neighbors);
return result.toString();
}
private class UDPListen implements Runnable {
private volatile boolean running = true;
private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
private void receiveInvite(SocketAddress from){
LOGGER.log(Level.INFO, "{0} received invite from {1}", new Object[]{name, from.toString()});
int ack_id = buf.getInt();
sendAck(from, ack_id);
neighbors.add(from);
}
private void receiveInvite(SocketAddress from) {
LOGGER.log(Level.INFO, "{0} received invite from {1}",
new Object[] { name, from.toString() });
int ack_id = buf.getInt();
sendAck(from, ack_id);
neighbors.add(from);
}
private void receiveAck(SocketAddress from){
LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[]{name, from.toString()});
int ack_id = buf.getInt();
if (!checkAck(from, ack_id)) {
LOGGER.log(Level.WARNING, "Received unexpected ack from: {0}", from.toString());}
}
private void receiveAck(SocketAddress from) {
LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[] {
name, from.toString() });
int ack_id = buf.getInt();
private void receiveLeave(SocketAddress from){
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", new Object[]{name, from.toString()});
if (acks.containsKey(ack_id)) {
Ack theAck = acks.get(ack_id);
if (theAck.check(from)) {
theAck.setReceived();
acks.remove(theAck);
}
}
}
int idToRemove = getNeighborId(from);
if (idToRemove != -1) {
neighbors.remove(idToRemove);
int ack_id = buf.getInt();
sendAck(from, ack_id);
}
// If we don't know that neighbor, we don't have to
// ack
}
private void receiveLeave(SocketAddress from) {
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
new Object[] { name, from.toString() });
private void receiveNewNeighbor(SocketAddress from){
int ack_id = buf.getInt();
StringBuilder theAddr = new StringBuilder();
int idToRemove = getNeighborId(from);
if (idToRemove != -1) {
neighbors.remove(idToRemove);
int ack_id = buf.getInt();
sendAck(from, ack_id);
}
// If we don't know that neighbor, we don't have to
// ack
}
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3){
theAddr.append(".");}
}
int port = buf.getInt();
private void receiveNewNeighbor(SocketAddress from) {
int ack_id = buf.getInt();
StringBuilder theAddr = new StringBuilder();
InetSocketAddress newNeighbor = new InetSocketAddress(theAddr.toString(), port);
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3) {
theAddr.append(".");
}
}
int port = buf.getInt();
if (!hasNeighbor(newNeighbor)) {
// Add this neighbor to my neighbor list if it
// was not present before
neighbors.add(newNeighbor);
InetSocketAddress newNeighbor = new InetSocketAddress(
theAddr.toString(), port);
LOGGER.log(Level.INFO, "{0} from {1} received new neighbor:{2}", new Object[]{name, from.toString(), newNeighbor.toString()});
}
if (!hasNeighbor(newNeighbor)) {
// Add this neighbor to my neighbor list if it
// was not present before
neighbors.add(newNeighbor);
sendAck(from, ack_id);
}
LOGGER.log(Level.INFO,
"{0} from {1} received new neighbor:{2}", new Object[] {
name, from.toString(), newNeighbor.toString() });
}
public void run() {
SocketAddress receivedFrom = null;
sendAck(from, ack_id);
}
while (running) {
try {
receivedFrom = channel.receive(buf);
public void run() {
SocketAddress receivedFrom = null;
// channel.receive() is non blocking. So we need to check if
// something actually has been written to the buffer
if (buf.remaining() != BUF_SIZE) {
buf.flip();
while (running) {
try {
receivedFrom = channel.receive(buf);
byte messageType = buf.get();
// channel.receive() is non blocking. So we need to check if
// something actually has been written to the buffer
if (buf.remaining() != BUF_SIZE) {
buf.flip();
switch (messageType) {
case MessageType.INVITE:
receiveInvite(receivedFrom);
break;
case MessageType.ACK:
receiveAck(receivedFrom);
break;
case MessageType.LEAVE:
receiveLeave(receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
receiveNewNeighbor(receivedFrom);
break;
default:
LOGGER.log(Level.INFO, "{0} received unknown command from {1}: [{2}]{3}", new Object[]{name, receivedFrom.toString(), messageType, new String(buf.array())});
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
byte messageType = buf.get();
private boolean checkAck(SocketAddress receivedFrom, int ack_id) {
if (acks.containsKey(ack_id)) {
Ack theAck = acks.get(ack_id);
if (theAck.check(receivedFrom)) {
acks.remove(ack_id);
return true;
}
}
return false;
}
switch (messageType) {
case MessageType.INVITE:
receiveInvite(receivedFrom);
break;
case MessageType.ACK:
receiveAck(receivedFrom);
break;
case MessageType.LEAVE:
receiveLeave(receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
receiveNewNeighbor(receivedFrom);
break;
default:
LOGGER.log(
Level.INFO,
"{0} received unknown command from {1}: [{2}]{3}",
new Object[] { name,
receivedFrom.toString(),
messageType,
new String(buf.array()) });
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void terminate() {
running = false;}
}
public void terminate() {
running = false;
}
}
}