From aff17843272d629a4d6c9902cb4d85d545eb16be Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Thu, 29 Nov 2012 17:12:41 +0100 Subject: [PATCH] Added broadcast sending [no answer so far] --- ws2012/P2P/uebungen/4/src/Main.java | 3 + .../P2P/uebungen/4/src/node/MessageType.java | 2 +- ws2012/P2P/uebungen/4/src/node/Node.java | 130 ++++++++++++++++-- 3 files changed, 121 insertions(+), 14 deletions(-) diff --git a/ws2012/P2P/uebungen/4/src/Main.java b/ws2012/P2P/uebungen/4/src/Main.java index b6806d5f..fa0fd23f 100644 --- a/ws2012/P2P/uebungen/4/src/Main.java +++ b/ws2012/P2P/uebungen/4/src/Main.java @@ -40,6 +40,9 @@ public class Main { if (nodes.containsKey(node)) { switch (cmd) { + case "br": + nodes.get(node).gatherInformationOfNetwork(); + break; case "spawn": if (splitted.length > 2) { Node newNode = nodes.get(node).spawn(); diff --git a/ws2012/P2P/uebungen/4/src/node/MessageType.java b/ws2012/P2P/uebungen/4/src/node/MessageType.java index 7421d26d..20f55ab8 100644 --- a/ws2012/P2P/uebungen/4/src/node/MessageType.java +++ b/ws2012/P2P/uebungen/4/src/node/MessageType.java @@ -6,5 +6,5 @@ public class MessageType { public final static byte NEW_NEIGHBOR = 2; public final static byte ACK = 3; public final static byte BROADCAST = 4; - public final static byte STATUS = 4; + public final static byte STATUS = 5; } diff --git a/ws2012/P2P/uebungen/4/src/node/Node.java b/ws2012/P2P/uebungen/4/src/node/Node.java index 18a09983..4648f3e8 100644 --- a/ws2012/P2P/uebungen/4/src/node/Node.java +++ b/ws2012/P2P/uebungen/4/src/node/Node.java @@ -21,6 +21,9 @@ public class Node { private static final int BUF_SIZE = 512; + // How long do we remember broadcast packets (ms)? + private static final int BROADCAST_TIMEOUT = 10000; + private DatagramChannel channel; private String name = "Not initialized"; @@ -29,13 +32,14 @@ public class Node { private Map acks = new HashMap<>(); + private Map broadcastPackets = new HashMap<>(); + private volatile Thread thread; private UDPListen udpListen; private Random generator; public Node() { - // debug System.setProperty("java.net.preferIPv4Stack", "true"); generator = new Random(System.currentTimeMillis()); try { @@ -116,9 +120,11 @@ public class Node { buffer.put(MessageType.NEW_NEIGHBOR); buffer.putInt(ack.getId()); InetSocketAddress a = (InetSocketAddress) neighbor; + for (String part : a.getHostString().split("\\.")) { buffer.put(Byte.valueOf(part)); } + buffer.putInt(a.getPort()); buffer.flip(); @@ -175,6 +181,43 @@ public class Node { return true; } + private void sendBroadcast(byte command, byte[] data) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + // Random id? + int packet_id = generator.nextInt(); + + buffer.put(MessageType.BROADCAST); + buffer.putInt(packet_id); + buffer.put(command); + buffer.put(data); + + buffer.flip(); + + // Needed because the buffer gets cleared after first send + byte[] packet = buffer.array(); + + for (SocketAddress n : neighbors) { + try { + channel.send(ByteBuffer.wrap(packet), n); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void gatherInformationOfNetwork() { + byte[] myAddr; + try { + myAddr = addrToBytes(((InetSocketAddress) channel.getLocalAddress())); + sendBroadcast(MessageType.STATUS, myAddr); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + /** * This node circularly links all neighbors (no mesh!) and removes itself * from the network. @@ -220,6 +263,15 @@ public class Node { return false; } + private byte[] addrToBytes(InetSocketAddress addr) { + ByteBuffer buffer = ByteBuffer.allocate(8); + for (String part : addr.getHostString().split("\\.")) { + buffer.put(Byte.valueOf(part)); + } + buffer.putInt(addr.getPort()); + return buffer.array(); + } + public int getNeighborId(SocketAddress addr) { for (int i = 0; i < neighbors.size(); i++) { if (neighbors.get(i).toString().equals(addr.toString())) { @@ -267,6 +319,17 @@ public class Node { } } + private boolean isNewBroadcast(int id) { + boolean isNew = true; + if (broadcastPackets.containsKey(id)) { + long receivedAt = broadcastPackets.get(id); + if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) { + isNew = false; + } + } + return isNew; + } + private void receiveLeave(SocketAddress from) { LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", new Object[] { name, from.toString() }); @@ -281,21 +344,59 @@ public class Node { // ack } + private void receiveBroadcast(SocketAddress from) { + int packet_id = buf.getInt(); + byte command = buf.get(); + if (isNewBroadcast(packet_id)) { + broadcastPackets.put(packet_id, System.currentTimeMillis()); + + switch (command) { + case MessageType.STATUS: + InetSocketAddress originalSender = readIPFromBuffer(); + + LOGGER.log( + Level.INFO, + "{0}: received status broadcast packet from {1}. original sender: {2}", + new Object[] { name, from.toString(), + originalSender.toString() }); + + // TODO: Answer with 'my' neighbors + + break; + default: + LOGGER.log( + Level.INFO, + "{0}: received unknown broadcast packet from {1}. id: {2} type: {3}", + new Object[] { name, from.toString(), packet_id, + command }); + break; + } + } else { + LOGGER.log(Level.INFO, + "Received duplicate broadcast packet ({0}). Discarding...", + new Object[] { packet_id }); + } + } + + private InetSocketAddress readIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buf.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buf.getInt(); + + return new InetSocketAddress( + theAddr.toString(), port); + } + private void receiveNewNeighbor(SocketAddress from) { int ack_id = buf.getInt(); - StringBuilder theAddr = new StringBuilder(); - // Read 4 Bytes and 1 Integer = 1 IP address - for (int i = 0; i < 4; i++) { - theAddr.append(buf.get()); - if (i < 3) { - theAddr.append("."); - } - } - int port = buf.getInt(); - - InetSocketAddress newNeighbor = new InetSocketAddress( - theAddr.toString(), port); + InetSocketAddress newNeighbor = readIPFromBuffer(); if (!hasNeighbor(newNeighbor)) { // Add this neighbor to my neighbor list if it @@ -337,6 +438,9 @@ public class Node { case MessageType.NEW_NEIGHBOR: receiveNewNeighbor(receivedFrom); break; + case MessageType.BROADCAST: + receiveBroadcast(receivedFrom); + break; default: LOGGER.log( Level.INFO,