Moved UDPListen to own thread

This commit is contained in:
senft-desktop 2012-11-29 18:05:45 +01:00
parent 1a61fdab0e
commit b8656a57b6
2 changed files with 248 additions and 191 deletions

View File

@ -19,23 +19,18 @@ public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
private static final int BUF_SIZE = 512;
// How long do we remember broadcast packets (ms)?
private static final int BROADCAST_TIMEOUT = 10000;
public static final int BUF_SIZE = 512;
private DatagramChannel channel;
private String name = "Not initialized";
private String name = "Not initialized";
private List<SocketAddress> neighbors = new ArrayList<>();
private Map<Integer, Ack> acks = new HashMap<>();
private Map<Integer, Long> broadcastPackets = new HashMap<>();
private volatile Thread thread;
private UDPListen udpListen;
private UDPHandler udpListen;
private Random generator;
@ -49,7 +44,7 @@ public class Node {
this.name = channel.socket().getLocalSocketAddress().toString();
udpListen = new UDPListen();
udpListen = new UDPHandler(this);
thread = new Thread(udpListen);
thread.start();
@ -144,7 +139,7 @@ public class Node {
* @param ack_id
* the id to identify the ack
*/
private boolean sendAck(SocketAddress receiver, int ack_id) {
boolean sendAck(SocketAddress receiver, int ack_id) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.ACK);
buffer.putInt(ack_id);
@ -183,7 +178,7 @@ public class Node {
sendBroadcast(packet_id, command, data);
}
private void sendBroadcast(int packet_id, byte command, byte[] data) {
void sendBroadcast(int packet_id, byte command, byte[] data) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.BROADCAST);
@ -219,6 +214,42 @@ public class Node {
}
boolean hasAck(int ack_id) {
return acks.containsKey(ack_id);
}
Ack getAck(int ack_id) {
return acks.get(ack_id);
}
Ack removeAck(int ack_id) {
return acks.remove(ack_id);
}
boolean addNeighbor(SocketAddress node) {
if (!hasNeighbor(node)) {
neighbors.add(node);
return true;
}
return false;
}
/**
* Remove the given address from my list of neighbors.
*
* @param node
* address to remove
* @return True if an address was removed, else false
*/
boolean removeNeighbor(SocketAddress node) {
SocketAddress removed = null;
int idToRemove = getNeighborId(node);
if (idToRemove != -1) {
removed = neighbors.remove(idToRemove);
}
return removed != null;
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
@ -264,7 +295,7 @@ public class Node {
return false;
}
private byte[] addrToBytes(InetSocketAddress addr) {
byte[] addrToBytes(InetSocketAddress addr) {
ByteBuffer buffer = ByteBuffer.allocate(8);
for (String part : addr.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));
@ -286,6 +317,10 @@ public class Node {
return this.name;
}
public DatagramChannel getChannel() {
return channel;
}
public String toString() {
StringBuilder result = new StringBuilder(256);
result.append("Node ");
@ -294,183 +329,4 @@ public class Node {
return result.toString();
}
private class UDPListen implements Runnable {
private volatile boolean running = true;
private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
private void receiveInvite(SocketAddress from) {
LOGGER.log(Level.INFO, "{0} received invite from {1}",
new Object[] { name, from.toString() });
int ack_id = buf.getInt();
sendAck(from, ack_id);
neighbors.add(from);
}
private void receiveAck(SocketAddress from) {
LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[] {
name, from.toString() });
int ack_id = buf.getInt();
if (acks.containsKey(ack_id)) {
Ack theAck = acks.get(ack_id);
if (theAck.check(from)) {
theAck.setReceived();
acks.remove(theAck.getId());
}
}
}
private boolean isNewBroadcast(int id) {
boolean isNew = true;
if (broadcastPackets.containsKey(id)) {
long receivedAt = broadcastPackets.get(id);
if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) {
isNew = false;
}
}
return isNew;
}
private void receiveLeave(SocketAddress from) {
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
new Object[] { name, from.toString() });
int idToRemove = getNeighborId(from);
if (idToRemove != -1) {
neighbors.remove(idToRemove);
int ack_id = buf.getInt();
sendAck(from, ack_id);
}
// If we don't know that neighbor, we don't have to
// ack
}
private void receiveBroadcast(SocketAddress from) {
int packet_id = buf.getInt();
byte command = buf.get();
if (isNewBroadcast(packet_id)) {
broadcastPackets.put(packet_id, System.currentTimeMillis());
switch (command) {
case MessageType.STATUS:
InetSocketAddress originalSender = readIPFromBuffer();
// Broadcast to my neighbors
sendBroadcast(packet_id, command,
addrToBytes(originalSender));
LOGGER.log(
Level.INFO,
"{0}: received status broadcast packet from {1}. original sender: {2}",
new Object[] { name, from.toString(),
originalSender.toString() });
// TODO: Answer with 'my' neighbors
break;
default:
LOGGER.log(
Level.INFO,
"{0}: received unknown broadcast packet from {1}. id: {2} type: {3}",
new Object[] { name, from.toString(), packet_id,
command });
break;
}
} else {
LOGGER.log(Level.INFO,
"Received duplicate broadcast packet ({0}). Discarding...",
new Object[] { packet_id });
}
}
private InetSocketAddress readIPFromBuffer() {
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3) {
theAddr.append(".");
}
}
int port = buf.getInt();
return new InetSocketAddress(
theAddr.toString(), port);
}
private void receiveNewNeighbor(SocketAddress from) {
int ack_id = buf.getInt();
InetSocketAddress newNeighbor = readIPFromBuffer();
if (!hasNeighbor(newNeighbor)) {
// Add this neighbor to my neighbor list if it
// was not present before
neighbors.add(newNeighbor);
LOGGER.log(Level.INFO,
"{0} from {1} received new neighbor:{2}", new Object[] {
name, from.toString(), newNeighbor.toString() });
}
sendAck(from, ack_id);
}
public void run() {
SocketAddress receivedFrom = null;
while (running) {
try {
receivedFrom = channel.receive(buf);
// channel.receive() is non blocking. So we need to check if
// something actually has been written to the buffer
if (buf.remaining() != BUF_SIZE) {
buf.flip();
byte messageType = buf.get();
switch (messageType) {
case MessageType.INVITE:
receiveInvite(receivedFrom);
break;
case MessageType.ACK:
receiveAck(receivedFrom);
break;
case MessageType.LEAVE:
receiveLeave(receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
receiveNewNeighbor(receivedFrom);
break;
case MessageType.BROADCAST:
receiveBroadcast(receivedFrom);
break;
default:
LOGGER.log(
Level.INFO,
"{0} received unknown command from {1}: [{2}]{3}",
new Object[] { name,
receivedFrom.toString(),
messageType,
new String(buf.array()) });
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void terminate() {
running = false;
}
}
}

View File

@ -0,0 +1,201 @@
package node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class UDPHandler implements Runnable {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
public static final int BUF_SIZE = 512;
// How long do we remember broadcast packets (ms)?
private static final int BROADCAST_TIMEOUT = 10000;
private Map<Integer, Long> broadcastPackets = new HashMap<>();
private volatile boolean running = true;
private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);
private Node node;
public UDPHandler(Node node) {
this.node = node;
}
private void receiveInvite(SocketAddress from) {
LOGGER.log(Level.INFO, "{0} received invite from {1}", new Object[] {
node.getName(), from.toString() });
int ack_id = buf.getInt();
node.sendAck(from, ack_id);
node.addNeighbor(from);
}
private void receiveAck(SocketAddress from) {
int ack_id = buf.getInt();
LOGGER.log(Level.INFO, "{0} received ack ({1}) from {2}", new Object[] {
node.getName(), ack_id, from.toString() });
if (node.hasAck(ack_id)) {
Ack theAck = node.getAck(ack_id);
if (theAck.check(from)) {
// theAck.setReceived();
// node.removeAck(theAck.getId());
node.removeAck(ack_id).setReceived();
}
}
}
private boolean isNewBroadcast(int id) {
boolean isNew = true;
if (broadcastPackets.containsKey(id)) {
long receivedAt = broadcastPackets.get(id);
if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) {
isNew = false;
}
}
return isNew;
}
private void receiveLeave(SocketAddress from) {
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
new Object[] { node.getName(), from.toString() });
if (node.removeNeighbor(from)) {
int ack_id = buf.getInt();
node.sendAck(from, ack_id);
}
// If we don't know that neighbor, we don't have to
// ack
}
private void receiveBroadcast(SocketAddress from) {
int packet_id = buf.getInt();
byte command = buf.get();
if (isNewBroadcast(packet_id)) {
broadcastPackets.put(packet_id, System.currentTimeMillis());
switch (command) {
case MessageType.STATUS:
InetSocketAddress originalSender = readIPFromBuffer();
// Broadcast to my neighbors
node.sendBroadcast(packet_id, command,
node.addrToBytes(originalSender));
LOGGER.log(
Level.INFO,
"{0}: received status broadcast packet from {1}. original sender: {2}",
new Object[] { node.getName(), from.toString(),
originalSender.toString() });
// TODO: Answer with 'my' neighbors
break;
default:
LOGGER.log(
Level.INFO,
"{0}: received unknown broadcast packet from {1}. id: {2} type: {3}",
new Object[] { node.getName(), from.toString(),
packet_id, command });
break;
}
} else {
LOGGER.log(Level.INFO,
"Received duplicate broadcast packet ({0}). Discarding...",
new Object[] { packet_id });
}
}
private InetSocketAddress readIPFromBuffer() {
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3) {
theAddr.append(".");
}
}
int port = buf.getInt();
return new InetSocketAddress(theAddr.toString(), port);
}
private void receiveNewNeighbor(SocketAddress from) {
int ack_id = buf.getInt();
InetSocketAddress newNeighbor = readIPFromBuffer();
node.addNeighbor(newNeighbor);
LOGGER.log(
Level.INFO,
"{0} from {1} received new neighbor:{2}",
new Object[] { node.getName(), from.toString(),
newNeighbor.toString() });
node.sendAck(from, ack_id);
}
public void run() {
SocketAddress receivedFrom = null;
while (running) {
try {
receivedFrom = node.getChannel().receive(buf);
// channel.receive() is non blocking. So we need to check if
// something actually has been written to the buffer
if (buf.remaining() != BUF_SIZE) {
buf.flip();
byte messageType = buf.get();
switch (messageType) {
case MessageType.INVITE:
receiveInvite(receivedFrom);
break;
case MessageType.ACK:
receiveAck(receivedFrom);
break;
case MessageType.LEAVE:
receiveLeave(receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
receiveNewNeighbor(receivedFrom);
break;
case MessageType.BROADCAST:
receiveBroadcast(receivedFrom);
break;
default:
LOGGER.log(
Level.INFO,
"{0} received unknown command from {1}: [{2}]{3}",
new Object[] { node.getName(),
receivedFrom.toString(), messageType,
new String(buf.array()) });
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void terminate() {
running = false;
}
}