diff --git a/ws2012/P2P/uebungen/4/src/node/MessageType.java b/ws2012/P2P/uebungen/4/src/node/MessageType.java index 8637a0bf..ea6fe7e1 100644 --- a/ws2012/P2P/uebungen/4/src/node/MessageType.java +++ b/ws2012/P2P/uebungen/4/src/node/MessageType.java @@ -1,11 +1,8 @@ package node; public class MessageType { - public final static byte INVITE = 7; + public final static byte INVITE = 0; public final static byte LEAVE = 1; - - public final static byte NEW_NEIGHBOR = 4; - - public final static byte PING = 2; - public final static byte PONG = 3; + public final static byte NEW_NEIGHBOR = 2; + public final static byte ACK = 3; } diff --git a/ws2012/P2P/uebungen/4/src/node/Node.java b/ws2012/P2P/uebungen/4/src/node/Node.java index 046b397a..f84f34af 100644 --- a/ws2012/P2P/uebungen/4/src/node/Node.java +++ b/ws2012/P2P/uebungen/4/src/node/Node.java @@ -6,9 +6,14 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.DatagramChannel; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.logging.Logger; +import util.BufferUtil; + public class Node { private final static Logger LOGGER = Logger.getLogger(Node.class.getName()); @@ -22,11 +27,16 @@ public class Node { private List neighbors = new ArrayList(); + private Map acks = new HashMap(); + private volatile Thread thread; private UDPListen udpListen; + private Random generator; + public Node() { System.setProperty("java.net.preferIPv4Stack", "true"); + generator = new Random(System.currentTimeMillis()); try { channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress("localhost", 0)); @@ -57,8 +67,10 @@ public class Node { // LOGGER.info("Name: " + getName() + ", Spawning new node."); Node newNode = new Node(); + int ack_id = generateAck(MessageType.INVITE, newNode.getAddress()); buf.clear(); buf.put(MessageType.INVITE); + buf.putInt(ack_id); buf.flip(); try { @@ -72,6 +84,12 @@ public class Node { return newNode; } + public int generateAck(byte type, SocketAddress addr) { + int ack_id = generator.nextInt(); + acks.put(ack_id, new Ack(addr)); + return ack_id; + } + public SocketAddress getAddress() { return channel.socket().getLocalSocketAddress(); } @@ -81,11 +99,16 @@ public class Node { InetSocketAddress a = (InetSocketAddress) addr; buf.put(MessageType.NEW_NEIGHBOR); + int ack_id = generateAck(MessageType.NEW_NEIGHBOR, addr); + buf.putInt(ack_id); for (String part : a.getHostString().split("\\.")) { buf.put(Byte.valueOf(part)); } buf.putInt(a.getPort()); + + // acks.get(ack_id).setBuf(BufferUtil.clone(buf)); + buf.flip(); } @@ -116,7 +139,9 @@ public class Node { } buf.clear(); + int ack_id = generateAck(MessageType.LEAVE, neighbors.get(i)); buf.put(MessageType.LEAVE); + buf.putInt(ack_id); buf.flip(); try { @@ -163,6 +188,7 @@ public class Node { while (running) { SocketAddress receivedFrom = null; + int ack_id; try { receivedFrom = channel.receive(buf); @@ -176,9 +202,40 @@ public class Node { case MessageType.INVITE: LOGGER.info(name + " received invite from " + receivedFrom.toString()); + + ack_id = buf.getInt(); + + buf.clear(); + buf.put(MessageType.ACK); + buf.putInt(ack_id); + buf.flip(); + channel.send(buf, receivedFrom); + neighbors.add(receivedFrom); break; + case MessageType.ACK: + ack_id = buf.getInt(); + + LOGGER.info(name + " received ack from " + + receivedFrom.toString()); + + if (acks.containsKey(ack_id)) { + Ack theAck = acks.get(ack_id); + if (theAck.check(receivedFrom)) { + acks.remove(theAck); + + } else { + LOGGER.info("Received unexpected ack from " + + receivedFrom.toString()); + } + } else { + LOGGER.info("Received unexpected ack from " + + receivedFrom.toString()); + } + + break; + case MessageType.LEAVE: LOGGER.info(name + ": " + receivedFrom.toString() + " is leaving. Deleting..."); @@ -191,9 +248,17 @@ public class Node { neighbors.remove(i); } } + ack_id = buf.getInt(); + + buf.clear(); + buf.put(MessageType.ACK); + buf.putInt(ack_id); + buf.flip(); + channel.send(buf, receivedFrom); break; case MessageType.NEW_NEIGHBOR: + ack_id = buf.getInt(); StringBuilder theAddr = new StringBuilder(); // Read 4 Bytes and 1 int @@ -217,13 +282,20 @@ public class Node { + new_neighbor.toString()); } + buf.clear(); + buf.put(MessageType.ACK); + buf.putInt(ack_id); + buf.flip(); + channel.send(buf, receivedFrom); + break; default: LOGGER.info(name + " received unknown command from " - + receivedFrom.toString() + ": " - + messageType + new String(buf.array())); + + receivedFrom.toString() + ": [" + + messageType + "]" + + new String(buf.array())); } } else { try {