From b8656a57b60f29edc8ad79f7f6b9dd8b2029c26c Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Thu, 29 Nov 2012 18:05:45 +0100 Subject: [PATCH] Moved UDPListen to own thread --- ws2012/P2P/uebungen/4/src/node/Node.java | 238 ++++-------------- .../P2P/uebungen/4/src/node/UDPHandler.java | 201 +++++++++++++++ 2 files changed, 248 insertions(+), 191 deletions(-) create mode 100644 ws2012/P2P/uebungen/4/src/node/UDPHandler.java diff --git a/ws2012/P2P/uebungen/4/src/node/Node.java b/ws2012/P2P/uebungen/4/src/node/Node.java index 86c9ed1a..f0fc3cf7 100644 --- a/ws2012/P2P/uebungen/4/src/node/Node.java +++ b/ws2012/P2P/uebungen/4/src/node/Node.java @@ -19,23 +19,18 @@ public class Node { private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); - private static final int BUF_SIZE = 512; - - // How long do we remember broadcast packets (ms)? - private static final int BROADCAST_TIMEOUT = 10000; + public static final int BUF_SIZE = 512; private DatagramChannel channel; - private String name = "Not initialized"; + private String name = "Not initialized"; private List neighbors = new ArrayList<>(); private Map acks = new HashMap<>(); - private Map broadcastPackets = new HashMap<>(); - private volatile Thread thread; - private UDPListen udpListen; + private UDPHandler udpListen; private Random generator; @@ -49,7 +44,7 @@ public class Node { this.name = channel.socket().getLocalSocketAddress().toString(); - udpListen = new UDPListen(); + udpListen = new UDPHandler(this); thread = new Thread(udpListen); thread.start(); @@ -144,7 +139,7 @@ public class Node { * @param ack_id * the id to identify the ack */ - private boolean sendAck(SocketAddress receiver, int ack_id) { + boolean sendAck(SocketAddress receiver, int ack_id) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); buffer.put(MessageType.ACK); buffer.putInt(ack_id); @@ -183,7 +178,7 @@ public class Node { sendBroadcast(packet_id, command, data); } - private void sendBroadcast(int packet_id, byte command, byte[] data) { + void sendBroadcast(int packet_id, byte command, byte[] data) { ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); buffer.put(MessageType.BROADCAST); @@ -219,6 +214,42 @@ public class Node { } + boolean hasAck(int ack_id) { + return acks.containsKey(ack_id); + } + + Ack getAck(int ack_id) { + return acks.get(ack_id); + } + + Ack removeAck(int ack_id) { + return acks.remove(ack_id); + } + + boolean addNeighbor(SocketAddress node) { + if (!hasNeighbor(node)) { + neighbors.add(node); + return true; + } + return false; + } + + /** + * Remove the given address from my list of neighbors. + * + * @param node + * address to remove + * @return True if an address was removed, else false + */ + boolean removeNeighbor(SocketAddress node) { + SocketAddress removed = null; + int idToRemove = getNeighborId(node); + if (idToRemove != -1) { + removed = neighbors.remove(idToRemove); + } + return removed != null; + } + /** * This node circularly links all neighbors (no mesh!) and removes itself * from the network. @@ -264,7 +295,7 @@ public class Node { return false; } - private byte[] addrToBytes(InetSocketAddress addr) { + byte[] addrToBytes(InetSocketAddress addr) { ByteBuffer buffer = ByteBuffer.allocate(8); for (String part : addr.getHostString().split("\\.")) { buffer.put(Byte.valueOf(part)); @@ -286,6 +317,10 @@ public class Node { return this.name; } + public DatagramChannel getChannel() { + return channel; + } + public String toString() { StringBuilder result = new StringBuilder(256); result.append("Node "); @@ -294,183 +329,4 @@ public class Node { return result.toString(); } - private class UDPListen implements Runnable { - private volatile boolean running = true; - private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); - - private void receiveInvite(SocketAddress from) { - LOGGER.log(Level.INFO, "{0} received invite from {1}", - new Object[] { name, from.toString() }); - int ack_id = buf.getInt(); - sendAck(from, ack_id); - neighbors.add(from); - } - - private void receiveAck(SocketAddress from) { - LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[] { - name, from.toString() }); - int ack_id = buf.getInt(); - - if (acks.containsKey(ack_id)) { - Ack theAck = acks.get(ack_id); - if (theAck.check(from)) { - theAck.setReceived(); - acks.remove(theAck.getId()); - } - } - } - - private boolean isNewBroadcast(int id) { - boolean isNew = true; - if (broadcastPackets.containsKey(id)) { - long receivedAt = broadcastPackets.get(id); - if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) { - isNew = false; - } - } - return isNew; - } - - private void receiveLeave(SocketAddress from) { - LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", - new Object[] { name, from.toString() }); - - int idToRemove = getNeighborId(from); - if (idToRemove != -1) { - neighbors.remove(idToRemove); - int ack_id = buf.getInt(); - sendAck(from, ack_id); - } - // If we don't know that neighbor, we don't have to - // ack - } - - private void receiveBroadcast(SocketAddress from) { - int packet_id = buf.getInt(); - byte command = buf.get(); - if (isNewBroadcast(packet_id)) { - broadcastPackets.put(packet_id, System.currentTimeMillis()); - - switch (command) { - case MessageType.STATUS: - InetSocketAddress originalSender = readIPFromBuffer(); - - // Broadcast to my neighbors - sendBroadcast(packet_id, command, - addrToBytes(originalSender)); - - LOGGER.log( - Level.INFO, - "{0}: received status broadcast packet from {1}. original sender: {2}", - new Object[] { name, from.toString(), - originalSender.toString() }); - - // TODO: Answer with 'my' neighbors - - break; - default: - LOGGER.log( - Level.INFO, - "{0}: received unknown broadcast packet from {1}. id: {2} type: {3}", - new Object[] { name, from.toString(), packet_id, - command }); - break; - } - } else { - LOGGER.log(Level.INFO, - "Received duplicate broadcast packet ({0}). Discarding...", - new Object[] { packet_id }); - } - } - - private InetSocketAddress readIPFromBuffer() { - StringBuilder theAddr = new StringBuilder(); - // Read 4 Bytes and 1 Integer = 1 IP address - for (int i = 0; i < 4; i++) { - theAddr.append(buf.get()); - if (i < 3) { - theAddr.append("."); - } - } - int port = buf.getInt(); - - return new InetSocketAddress( - theAddr.toString(), port); - } - - private void receiveNewNeighbor(SocketAddress from) { - int ack_id = buf.getInt(); - - InetSocketAddress newNeighbor = readIPFromBuffer(); - - if (!hasNeighbor(newNeighbor)) { - // Add this neighbor to my neighbor list if it - // was not present before - neighbors.add(newNeighbor); - - LOGGER.log(Level.INFO, - "{0} from {1} received new neighbor:{2}", new Object[] { - name, from.toString(), newNeighbor.toString() }); - } - - sendAck(from, ack_id); - } - - public void run() { - SocketAddress receivedFrom = null; - - while (running) { - try { - receivedFrom = channel.receive(buf); - - // channel.receive() is non blocking. So we need to check if - // something actually has been written to the buffer - if (buf.remaining() != BUF_SIZE) { - buf.flip(); - - byte messageType = buf.get(); - - switch (messageType) { - case MessageType.INVITE: - receiveInvite(receivedFrom); - break; - case MessageType.ACK: - receiveAck(receivedFrom); - break; - case MessageType.LEAVE: - receiveLeave(receivedFrom); - break; - case MessageType.NEW_NEIGHBOR: - receiveNewNeighbor(receivedFrom); - break; - case MessageType.BROADCAST: - receiveBroadcast(receivedFrom); - break; - default: - LOGGER.log( - Level.INFO, - "{0} received unknown command from {1}: [{2}]{3}", - new Object[] { name, - receivedFrom.toString(), - messageType, - new String(buf.array()) }); - } - } else { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - buf.clear(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - public void terminate() { - running = false; - } } -} diff --git a/ws2012/P2P/uebungen/4/src/node/UDPHandler.java b/ws2012/P2P/uebungen/4/src/node/UDPHandler.java new file mode 100644 index 00000000..0da1cb9a --- /dev/null +++ b/ws2012/P2P/uebungen/4/src/node/UDPHandler.java @@ -0,0 +1,201 @@ +package node; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class UDPHandler implements Runnable { + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + + public static final int BUF_SIZE = 512; + + // How long do we remember broadcast packets (ms)? + private static final int BROADCAST_TIMEOUT = 10000; + + private Map broadcastPackets = new HashMap<>(); + + private volatile boolean running = true; + private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + public UDPHandler(Node node) { + this.node = node; + } + + private void receiveInvite(SocketAddress from) { + LOGGER.log(Level.INFO, "{0} received invite from {1}", new Object[] { + node.getName(), from.toString() }); + int ack_id = buf.getInt(); + node.sendAck(from, ack_id); + node.addNeighbor(from); + } + + private void receiveAck(SocketAddress from) { + int ack_id = buf.getInt(); + + LOGGER.log(Level.INFO, "{0} received ack ({1}) from {2}", new Object[] { + node.getName(), ack_id, from.toString() }); + + if (node.hasAck(ack_id)) { + Ack theAck = node.getAck(ack_id); + if (theAck.check(from)) { + // theAck.setReceived(); + // node.removeAck(theAck.getId()); + node.removeAck(ack_id).setReceived(); + } + } + } + + private boolean isNewBroadcast(int id) { + boolean isNew = true; + if (broadcastPackets.containsKey(id)) { + long receivedAt = broadcastPackets.get(id); + if (receivedAt < System.currentTimeMillis() + BROADCAST_TIMEOUT) { + isNew = false; + } + } + return isNew; + } + + private void receiveLeave(SocketAddress from) { + LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", + new Object[] { node.getName(), from.toString() }); + + if (node.removeNeighbor(from)) { + int ack_id = buf.getInt(); + node.sendAck(from, ack_id); + } + // If we don't know that neighbor, we don't have to + // ack + } + + private void receiveBroadcast(SocketAddress from) { + int packet_id = buf.getInt(); + byte command = buf.get(); + if (isNewBroadcast(packet_id)) { + broadcastPackets.put(packet_id, System.currentTimeMillis()); + + switch (command) { + case MessageType.STATUS: + InetSocketAddress originalSender = readIPFromBuffer(); + + // Broadcast to my neighbors + node.sendBroadcast(packet_id, command, + node.addrToBytes(originalSender)); + + LOGGER.log( + Level.INFO, + "{0}: received status broadcast packet from {1}. original sender: {2}", + new Object[] { node.getName(), from.toString(), + originalSender.toString() }); + + // TODO: Answer with 'my' neighbors + + break; + default: + LOGGER.log( + Level.INFO, + "{0}: received unknown broadcast packet from {1}. id: {2} type: {3}", + new Object[] { node.getName(), from.toString(), + packet_id, command }); + break; + } + } else { + LOGGER.log(Level.INFO, + "Received duplicate broadcast packet ({0}). Discarding...", + new Object[] { packet_id }); + } + } + + private InetSocketAddress readIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buf.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buf.getInt(); + + return new InetSocketAddress(theAddr.toString(), port); + } + + private void receiveNewNeighbor(SocketAddress from) { + int ack_id = buf.getInt(); + + InetSocketAddress newNeighbor = readIPFromBuffer(); + + node.addNeighbor(newNeighbor); + + LOGGER.log( + Level.INFO, + "{0} from {1} received new neighbor:{2}", + new Object[] { node.getName(), from.toString(), + newNeighbor.toString() }); + + node.sendAck(from, ack_id); + } + + public void run() { + SocketAddress receivedFrom = null; + + while (running) { + try { + receivedFrom = node.getChannel().receive(buf); + + // channel.receive() is non blocking. So we need to check if + // something actually has been written to the buffer + if (buf.remaining() != BUF_SIZE) { + buf.flip(); + + byte messageType = buf.get(); + + switch (messageType) { + case MessageType.INVITE: + receiveInvite(receivedFrom); + break; + case MessageType.ACK: + receiveAck(receivedFrom); + break; + case MessageType.LEAVE: + receiveLeave(receivedFrom); + break; + case MessageType.NEW_NEIGHBOR: + receiveNewNeighbor(receivedFrom); + break; + case MessageType.BROADCAST: + receiveBroadcast(receivedFrom); + break; + default: + LOGGER.log( + Level.INFO, + "{0} received unknown command from {1}: [{2}]{3}", + new Object[] { node.getName(), + receivedFrom.toString(), messageType, + new String(buf.array()) }); + } + } else { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + buf.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void terminate() { + running = false; + } +}