Added broadcast sending [no answer so far]

This commit is contained in:
senft-desktop 2012-11-29 17:12:41 +01:00
parent 82326cad9a
commit aff1784327
3 changed files with 121 additions and 14 deletions

View File

@ -40,6 +40,9 @@ public class Main {
if (nodes.containsKey(node)) {
switch (cmd) {
case "br":
nodes.get(node).gatherInformationOfNetwork();
break;
case "spawn":
if (splitted.length > 2) {
Node newNode = nodes.get(node).spawn();

View File

@ -6,5 +6,5 @@ public class MessageType {
public final static byte NEW_NEIGHBOR = 2;
public final static byte ACK = 3;
public final static byte BROADCAST = 4;
public final static byte STATUS = 4;
public final static byte STATUS = 5;
}

View File

@ -21,6 +21,9 @@ public class Node {
private static final int BUF_SIZE = 512;
// How long do we remember broadcast packets (ms)?
private static final int BROADCAST_TIMEOUT = 10000;
private DatagramChannel channel;
private String name = "Not initialized";
@ -29,13 +32,14 @@ public class Node {
private Map<Integer, Ack> acks = new HashMap<>();
private Map<Integer, Long> broadcastPackets = new HashMap<>();
private volatile Thread thread;
private UDPListen udpListen;
private Random generator;
public Node() {
// debug
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
@ -116,9 +120,11 @@ public class Node {
buffer.put(MessageType.NEW_NEIGHBOR);
buffer.putInt(ack.getId());
InetSocketAddress a = (InetSocketAddress) neighbor;
for (String part : a.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));
}
buffer.putInt(a.getPort());
buffer.flip();
@ -175,6 +181,43 @@ public class Node {
return true;
}
private void sendBroadcast(byte command, byte[] data) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
// Random id?
int packet_id = generator.nextInt();
buffer.put(MessageType.BROADCAST);
buffer.putInt(packet_id);
buffer.put(command);
buffer.put(data);
buffer.flip();
// Needed because the buffer gets cleared after first send
byte[] packet = buffer.array();
for (SocketAddress n : neighbors) {
try {
channel.send(ByteBuffer.wrap(packet), n);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void gatherInformationOfNetwork() {
byte[] myAddr;
try {
myAddr = addrToBytes(((InetSocketAddress) channel.getLocalAddress()));
sendBroadcast(MessageType.STATUS, myAddr);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
@ -220,6 +263,15 @@ public class Node {
return false;
}
private byte[] addrToBytes(InetSocketAddress addr) {
ByteBuffer buffer = ByteBuffer.allocate(8);
for (String part : addr.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));
}
buffer.putInt(addr.getPort());
return buffer.array();
}
public int getNeighborId(SocketAddress addr) {
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.get(i).toString().equals(addr.toString())) {
@ -267,6 +319,17 @@ public class Node {
}
}
private boolean isNewBroadcast(int id) {
boolean isNew = true;
if (broadcastPackets.containsKey(id)) {
long receivedAt = broadcastPackets.get(id);
if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) {
isNew = false;
}
}
return isNew;
}
private void receiveLeave(SocketAddress from) {
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
new Object[] { name, from.toString() });
@ -281,21 +344,59 @@ public class Node {
// ack
}
private void receiveBroadcast(SocketAddress from) {
int packet_id = buf.getInt();
byte command = buf.get();
if (isNewBroadcast(packet_id)) {
broadcastPackets.put(packet_id, System.currentTimeMillis());
switch (command) {
case MessageType.STATUS:
InetSocketAddress originalSender = readIPFromBuffer();
LOGGER.log(
Level.INFO,
"{0}: received status broadcast packet from {1}. original sender: {2}",
new Object[] { name, from.toString(),
originalSender.toString() });
// TODO: Answer with 'my' neighbors
break;
default:
LOGGER.log(
Level.INFO,
"{0}: received unknown broadcast packet from {1}. id: {2} type: {3}",
new Object[] { name, from.toString(), packet_id,
command });
break;
}
} else {
LOGGER.log(Level.INFO,
"Received duplicate broadcast packet ({0}). Discarding...",
new Object[] { packet_id });
}
}
private InetSocketAddress readIPFromBuffer() {
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3) {
theAddr.append(".");
}
}
int port = buf.getInt();
return new InetSocketAddress(
theAddr.toString(), port);
}
private void receiveNewNeighbor(SocketAddress from) {
int ack_id = buf.getInt();
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3) {
theAddr.append(".");
}
}
int port = buf.getInt();
InetSocketAddress newNeighbor = new InetSocketAddress(
theAddr.toString(), port);
InetSocketAddress newNeighbor = readIPFromBuffer();
if (!hasNeighbor(newNeighbor)) {
// Add this neighbor to my neighbor list if it
@ -337,6 +438,9 @@ public class Node {
case MessageType.NEW_NEIGHBOR:
receiveNewNeighbor(receivedFrom);
break;
case MessageType.BROADCAST:
receiveBroadcast(receivedFrom);
break;
default:
LOGGER.log(
Level.INFO,