diff --git a/ws2012/P2P/uebungen/4/src/node/Ack.java b/ws2012/P2P/uebungen/4/src/node/Ack.java index 29bb167d..5ba41210 100644 --- a/ws2012/P2P/uebungen/4/src/node/Ack.java +++ b/ws2012/P2P/uebungen/4/src/node/Ack.java @@ -1,15 +1,44 @@ package node; +import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.DatagramChannel; +import java.util.logging.Level; +import java.util.logging.Logger; public class Ack { + private final static Logger LOGGER = Logger.getLogger(Ack.class.getName()); + // timeout in seconds + private final int TIMEOUT = 5000; + + private int id; private SocketAddress address; private ByteBuffer buf; - public Ack(SocketAddress address) { + private TimeoutThread timeout; + private volatile Thread thread; + + // The channel to re-send the message on + private DatagramChannel channel; + + public Ack(int id, SocketAddress address, DatagramChannel channel) { + this.id = id; this.address = address; + this.channel = channel; + startThread(); + } + + private void startThread() { + LOGGER.log(Level.INFO, "Starting timeout thread for ack #" + id); + timeout = new TimeoutThread(); + thread = new Thread(timeout); + thread.start(); + } + + public int getId() { + return id; } public boolean check(SocketAddress address) { @@ -23,4 +52,69 @@ public class Ack { public void setBuf(ByteBuffer buf) { this.buf = buf; } + + public void setReceived() { + // Stop thread + try { + if (thread != null) { + timeout.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private class TimeoutThread implements Runnable { + private volatile boolean notReceived = true; + + // When do we stop expecting an ack + private long timeToStop = System.currentTimeMillis() + TIMEOUT; + + @Override + public void run() { + while (notReceived && System.currentTimeMillis() < timeToStop) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + // Timeout hit -> re-send + if (notReceived) { + try { + LOGGER.log(Level.INFO, "Absent ack #" + id + + ". Resending to " + address.toString()); + + /** + * TODO: This would be the intuitive order (first re-send, + * then start the new TimeoutThread), right? Unfortunately + * this gives ugly log outputs, because the re-sent message + * arrives before the new thread is constructed, so we get: + * + *
+					 * [2012-11-28 07:53:05 PM] node.Node INFO:  Initialized node /127.0.0.1:37179 
+					 * a spawn b
+					 * [2012-11-28 07:53:15 PM] node.Node INFO:  Initialized node /127.0.0.1:35358
+					 * [2012-11-28 07:53:15 PM] node.Ack INFO:  Starting timeout thread for ack #-1276001492 
+					 * [2012-11-28 07:53:15 PM] node.Node INFO:  /127.0.0.1:35358 received invite from /127.0.0.1:37179 
+					 * [2012-11-28 07:53:20 PM] node.Ack INFO:  Absent ack #-1276001492). Resending to /127.0.0.1:35358
+					 * 
+ * + * No big deal, and we could just swap the statements, but + * meh... + */ + channel.send(buf, address); + startThread(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public void terminate() { + notReceived = false; + } + } } \ No newline at end of file diff --git a/ws2012/P2P/uebungen/4/src/node/Node.java b/ws2012/P2P/uebungen/4/src/node/Node.java index c291b6ce..6cba241e 100644 --- a/ws2012/P2P/uebungen/4/src/node/Node.java +++ b/ws2012/P2P/uebungen/4/src/node/Node.java @@ -13,322 +13,350 @@ import java.util.Random; import java.util.logging.Level; import java.util.logging.Logger; +import util.BufferUtil; + public class Node { - private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); + private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); - private static final int BUF_SIZE = 512; + private static final int BUF_SIZE = 512; - private DatagramChannel channel; + private DatagramChannel channel; - private String name = "Not initialized"; + private String name = "Not initialized"; - private List neighbors = new ArrayList<>(); + private List neighbors = new ArrayList<>(); - private Map acks = new HashMap<>(); + private Map acks = new HashMap<>(); - private volatile Thread thread; - private UDPListen udpListen; + private volatile Thread thread; + private UDPListen udpListen; - private Random generator; + private Random generator; - public Node() { - //debug - System.setProperty("java.net.preferIPv4Stack", "true"); - generator = new Random(System.currentTimeMillis()); - try { - channel = DatagramChannel.open(); - channel.socket().bind(new InetSocketAddress("localhost", 0)); - channel.configureBlocking(false); + public Node() { + // debug + System.setProperty("java.net.preferIPv4Stack", "true"); + generator = new Random(System.currentTimeMillis()); + try { + channel = DatagramChannel.open(); + channel.socket().bind(new InetSocketAddress("localhost", 0)); + channel.configureBlocking(false); - this.name = channel.socket().getLocalSocketAddress().toString(); + this.name = channel.socket().getLocalSocketAddress().toString(); - udpListen = new UDPListen(); + udpListen = new UDPListen(); thread = new Thread(udpListen); thread.start(); - LOGGER.log(Level.INFO, "Initialized node {0}", name); - } catch (IOException e) { - e.printStackTrace(); - } - } + LOGGER.log(Level.INFO, "Initialized node {0}", name); + } catch (IOException e) { + e.printStackTrace(); + } + } - /** - * Create another peer, mutually link creator and spawn. - * - * @return the spawned Node - * @throws IOException - * if no connection could be established to the new node - */ - public Node spawn() throws IOException { - // LOGGER.info("Name: " + getName() + ", Spawning new node."); - Node newNode = new Node(); - - sendInvite(newNode); - neighbors.add(newNode.getAddress()); + /** + * Create another peer, mutually link creator and spawn. + * + * @return the spawned Node + * @throws IOException + * if no connection could be established to the new node + */ + public Node spawn() throws IOException { + // LOGGER.info("Name: " + getName() + ", Spawning new node."); + Node newNode = new Node(); - return newNode; - } + sendInvite(newNode); + neighbors.add(newNode.getAddress()); - public boolean sendInvite(Node newNode){ - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - int ack_id = generateAck(newNode.getAddress()); - buffer.put(MessageType.INVITE); - buffer.putInt(ack_id); - buffer.flip(); + return newNode; + } - try { - channel.send(buffer, newNode.getAddress()); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } + public boolean sendInvite(Node newNode) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + Ack ack = generateAck(newNode.getAddress()); + buffer.put(MessageType.INVITE); + buffer.putInt(ack.getId()); + buffer.flip(); + + ack.setBuf(BufferUtil.clone(buffer)); - /** - * Adds a new ack, which this node is expecting to receive. - * - * @param addr - * the SocketAddress the ack should be received from - * @return the identifier for this ack - */ - private int generateAck(SocketAddress addr) { - int ack_id = generator.nextInt(); - acks.put(ack_id, new Ack(addr)); - return ack_id; - } + try { + channel.send(buffer, newNode.getAddress()); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + return true; + } - public SocketAddress getAddress() { - return channel.socket().getLocalSocketAddress();} + /** + * Adds a new ack, which this node is expecting to receive. + * + * @param addr + * the SocketAddress the ack should be received from + * @return the identifier for this ack + */ + private Ack generateAck(final SocketAddress addr) { + int ack_id = generator.nextInt(); + Ack newAck = new Ack(ack_id, addr, channel); + + acks.put(ack_id, newAck); + return newAck; + } - public boolean sendNewNeighbor(SocketAddress receiver, SocketAddress neighbor){ - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - int ack_id = generateAck(neighbor); - buffer.put(MessageType.NEW_NEIGHBOR); - buffer.putInt(ack_id); - InetSocketAddress a = (InetSocketAddress) neighbor; - for (String part : a.getHostString().split("\\.")) { - buffer.put(Byte.valueOf(part));} - buffer.putInt(a.getPort()); - buffer.flip(); + public SocketAddress getAddress() { + return channel.socket().getLocalSocketAddress(); + } - try { - channel.send(buffer, receiver); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } + public boolean sendNewNeighbor(SocketAddress receiver, + SocketAddress neighbor) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + Ack ack = generateAck(neighbor); + buffer.put(MessageType.NEW_NEIGHBOR); + buffer.putInt(ack.getId()); + InetSocketAddress a = (InetSocketAddress) neighbor; + for (String part : a.getHostString().split("\\.")) { + buffer.put(Byte.valueOf(part)); + } + buffer.putInt(a.getPort()); + buffer.flip(); - /** - * Sends an acknowledgment message to receiver (who hopefully is expecting - * it) - * - * @param receiver - * the node expecting an ack - * @param ack_id - * the id to identify the ack - */ - private boolean sendAck(SocketAddress receiver, int ack_id) { - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - buffer.put(MessageType.ACK); - buffer.putInt(ack_id); - buffer.flip(); - try { - channel.send(buffer, receiver); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } + ack.setBuf(BufferUtil.clone(buffer)); - private boolean sendLeave(SocketAddress neighbor){ - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - int ack_id = generateAck(neighbor); - buffer.put(MessageType.LEAVE); - buffer.putInt(ack_id); - buffer.flip(); + try { + channel.send(buffer, receiver); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + return true; + } - try { - channel.send(buffer, neighbor); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } - /** - * This node circularly links all neighbors (no mesh!) and removes itself - * from the network. - */ - public void leave() { - LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName()); + /** + * Sends an acknowledgment message to receiver (who hopefully is expecting + * it) + * + * @param receiver + * the node expecting an ack + * @param ack_id + * the id to identify the ack + */ + private boolean sendAck(SocketAddress receiver, int ack_id) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + buffer.put(MessageType.ACK); + buffer.putInt(ack_id); + buffer.flip(); + + try { + channel.send(buffer, receiver); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + return true; + } - for (int i = 0; i < neighbors.size(); i++) { - if (neighbors.size() > 2) { - int pred = ((i - 1) + neighbors.size()) % neighbors.size(); - int succ = (i + 1) % neighbors.size(); - sendNewNeighbor(neighbors.get(i), neighbors.get(succ)); - sendNewNeighbor(neighbors.get(i), neighbors.get(pred)); - } else if (neighbors.size() == 2) { - sendNewNeighbor(neighbors.get(i), neighbors.get(Math.abs(i - 1))); - } + private boolean sendLeave(SocketAddress neighbor) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + Ack ack = generateAck(neighbor); + buffer.put(MessageType.LEAVE); + buffer.putInt(ack.getId()); + buffer.flip(); - sendLeave(neighbors.get(i)); - } + ack.setBuf(BufferUtil.clone(buffer)); - try { - if (thread != null) { - udpListen.terminate(); - thread.join();} - } catch (InterruptedException e) { - e.printStackTrace();} - } + try { + channel.send(buffer, neighbor); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + return true; + } - public boolean hasNeighbor(SocketAddress addr) { - for (SocketAddress n : neighbors) { - if (n.toString().equals(addr.toString())) { - return true; - } - } - return false; - } + /** + * This node circularly links all neighbors (no mesh!) and removes itself + * from the network. + */ + public void leave() { + LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName()); - public int getNeighborId(SocketAddress addr) { - for (int i = 0; i < neighbors.size(); i++) { - if (neighbors.get(i).toString().equals(addr.toString())) { - return i; - } - } - return -1; - } + for (int i = 0; i < neighbors.size(); i++) { + if (neighbors.size() > 2) { + int pred = ((i - 1) + neighbors.size()) % neighbors.size(); + int succ = (i + 1) % neighbors.size(); + sendNewNeighbor(neighbors.get(i), neighbors.get(succ)); + sendNewNeighbor(neighbors.get(i), neighbors.get(pred)); + } else if (neighbors.size() == 2) { + sendNewNeighbor(neighbors.get(i), + neighbors.get(Math.abs(i - 1))); + } - public String getName() { - return this.name;} + sendLeave(neighbors.get(i)); + } - public String toString() { - StringBuilder result = new StringBuilder(256); - result.append("Node "); - result.append(getName()).append(", Neighbors: "); - result.append(neighbors); - return result.toString(); - } + try { + if (thread != null) { + udpListen.terminate(); + thread.join(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } - private class UDPListen implements Runnable { - private volatile boolean running = true; + public boolean hasNeighbor(SocketAddress addr) { + for (SocketAddress n : neighbors) { + if (n.toString().equals(addr.toString())) { + return true; + } + } + return false; + } + + public int getNeighborId(SocketAddress addr) { + for (int i = 0; i < neighbors.size(); i++) { + if (neighbors.get(i).toString().equals(addr.toString())) { + return i; + } + } + return -1; + } + + public String getName() { + return this.name; + } + + public String toString() { + StringBuilder result = new StringBuilder(256); + result.append("Node "); + result.append(getName()).append(", Neighbors: "); + result.append(neighbors); + return result.toString(); + } + + private class UDPListen implements Runnable { + private volatile boolean running = true; private ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE); - private void receiveInvite(SocketAddress from){ - LOGGER.log(Level.INFO, "{0} received invite from {1}", new Object[]{name, from.toString()}); - int ack_id = buf.getInt(); - sendAck(from, ack_id); - neighbors.add(from); - } + private void receiveInvite(SocketAddress from) { + LOGGER.log(Level.INFO, "{0} received invite from {1}", + new Object[] { name, from.toString() }); + int ack_id = buf.getInt(); + sendAck(from, ack_id); + neighbors.add(from); + } - private void receiveAck(SocketAddress from){ - LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[]{name, from.toString()}); - int ack_id = buf.getInt(); - if (!checkAck(from, ack_id)) { - LOGGER.log(Level.WARNING, "Received unexpected ack from: {0}", from.toString());} - } + private void receiveAck(SocketAddress from) { + LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[] { + name, from.toString() }); + int ack_id = buf.getInt(); - private void receiveLeave(SocketAddress from){ - LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", new Object[]{name, from.toString()}); + if (acks.containsKey(ack_id)) { + Ack theAck = acks.get(ack_id); + if (theAck.check(from)) { + theAck.setReceived(); + acks.remove(theAck); + } + } + } - int idToRemove = getNeighborId(from); - if (idToRemove != -1) { - neighbors.remove(idToRemove); - int ack_id = buf.getInt(); - sendAck(from, ack_id); - } - // If we don't know that neighbor, we don't have to - // ack - } + private void receiveLeave(SocketAddress from) { + LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", + new Object[] { name, from.toString() }); - private void receiveNewNeighbor(SocketAddress from){ - int ack_id = buf.getInt(); - StringBuilder theAddr = new StringBuilder(); + int idToRemove = getNeighborId(from); + if (idToRemove != -1) { + neighbors.remove(idToRemove); + int ack_id = buf.getInt(); + sendAck(from, ack_id); + } + // If we don't know that neighbor, we don't have to + // ack + } - // Read 4 Bytes and 1 Integer = 1 IP address - for (int i = 0; i < 4; i++) { - theAddr.append(buf.get()); - if (i < 3){ - theAddr.append(".");} - } - int port = buf.getInt(); + private void receiveNewNeighbor(SocketAddress from) { + int ack_id = buf.getInt(); + StringBuilder theAddr = new StringBuilder(); - InetSocketAddress newNeighbor = new InetSocketAddress(theAddr.toString(), port); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buf.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buf.getInt(); - if (!hasNeighbor(newNeighbor)) { - // Add this neighbor to my neighbor list if it - // was not present before - neighbors.add(newNeighbor); + InetSocketAddress newNeighbor = new InetSocketAddress( + theAddr.toString(), port); - LOGGER.log(Level.INFO, "{0} from {1} received new neighbor:{2}", new Object[]{name, from.toString(), newNeighbor.toString()}); - } + if (!hasNeighbor(newNeighbor)) { + // Add this neighbor to my neighbor list if it + // was not present before + neighbors.add(newNeighbor); - sendAck(from, ack_id); - } + LOGGER.log(Level.INFO, + "{0} from {1} received new neighbor:{2}", new Object[] { + name, from.toString(), newNeighbor.toString() }); + } - public void run() { - SocketAddress receivedFrom = null; + sendAck(from, ack_id); + } - while (running) { - try { - receivedFrom = channel.receive(buf); + public void run() { + SocketAddress receivedFrom = null; - // channel.receive() is non blocking. So we need to check if - // something actually has been written to the buffer - if (buf.remaining() != BUF_SIZE) { - buf.flip(); + while (running) { + try { + receivedFrom = channel.receive(buf); - byte messageType = buf.get(); + // channel.receive() is non blocking. So we need to check if + // something actually has been written to the buffer + if (buf.remaining() != BUF_SIZE) { + buf.flip(); - switch (messageType) { - case MessageType.INVITE: - receiveInvite(receivedFrom); - break; - case MessageType.ACK: - receiveAck(receivedFrom); - break; - case MessageType.LEAVE: - receiveLeave(receivedFrom); - break; - case MessageType.NEW_NEIGHBOR: - receiveNewNeighbor(receivedFrom); - break; - default: - LOGGER.log(Level.INFO, "{0} received unknown command from {1}: [{2}]{3}", new Object[]{name, receivedFrom.toString(), messageType, new String(buf.array())}); - } - } else { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - buf.clear(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } + byte messageType = buf.get(); - private boolean checkAck(SocketAddress receivedFrom, int ack_id) { - if (acks.containsKey(ack_id)) { - Ack theAck = acks.get(ack_id); - if (theAck.check(receivedFrom)) { - acks.remove(ack_id); - return true; - } - } - return false; - } + switch (messageType) { + case MessageType.INVITE: + receiveInvite(receivedFrom); + break; + case MessageType.ACK: + receiveAck(receivedFrom); + break; + case MessageType.LEAVE: + receiveLeave(receivedFrom); + break; + case MessageType.NEW_NEIGHBOR: + receiveNewNeighbor(receivedFrom); + break; + default: + LOGGER.log( + Level.INFO, + "{0} received unknown command from {1}: [{2}]{3}", + new Object[] { name, + receivedFrom.toString(), + messageType, + new String(buf.array()) }); + } + } else { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + buf.clear(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } - public void terminate() { - running = false;} - } + public void terminate() { + running = false; + } + } }