From 36867f2813c7c9b2ff39d013208fa78f48bf98cb Mon Sep 17 00:00:00 2001 From: senft-desktop Date: Fri, 30 Nov 2012 16:32:49 +0100 Subject: [PATCH] Sending leave and new neighbors in just one single packet. --- .../P2P/uebungen/4/src/RandomGenerator2.java | 3 +- .../P2P/uebungen/4/src/node/MessageType.java | 1 - ws2012/P2P/uebungen/4/src/node/Node.java | 83 +++++++------------ .../P2P/uebungen/4/src/node/UDPHandler.java | 33 +++----- 4 files changed, 40 insertions(+), 80 deletions(-) diff --git a/ws2012/P2P/uebungen/4/src/RandomGenerator2.java b/ws2012/P2P/uebungen/4/src/RandomGenerator2.java index 5cf038f0..254c6c31 100644 --- a/ws2012/P2P/uebungen/4/src/RandomGenerator2.java +++ b/ws2012/P2P/uebungen/4/src/RandomGenerator2.java @@ -5,9 +5,9 @@ import java.util.List; import java.util.Random; import java.util.logging.LogManager; -import analysis.NetworkDumper; import node.Node; +import analysis.NetworkDumper; public class RandomGenerator2 { @@ -75,6 +75,7 @@ public class RandomGenerator2 { while ((randomNode = getRandomNode()) == firstNode) { // Dont kill first node } + nodes.remove(randomNode); randomNode.leave(); numKilled++; } diff --git a/ws2012/P2P/uebungen/4/src/node/MessageType.java b/ws2012/P2P/uebungen/4/src/node/MessageType.java index 20f55ab8..daa9ad27 100644 --- a/ws2012/P2P/uebungen/4/src/node/MessageType.java +++ b/ws2012/P2P/uebungen/4/src/node/MessageType.java @@ -3,7 +3,6 @@ package node; public class MessageType { public final static byte INVITE = 0; public final static byte LEAVE = 1; - public final static byte NEW_NEIGHBOR = 2; public final static byte ACK = 3; public final static byte BROADCAST = 4; public final static byte STATUS = 5; diff --git a/ws2012/P2P/uebungen/4/src/node/Node.java b/ws2012/P2P/uebungen/4/src/node/Node.java index f85fd85a..a2275e2a 100644 --- a/ws2012/P2P/uebungen/4/src/node/Node.java +++ b/ws2012/P2P/uebungen/4/src/node/Node.java @@ -70,7 +70,7 @@ public class Node { * if no connection could be established to the new node */ public Node spawn() throws IOException { - // LOGGER.info("Name: " + getName() + ", Spawning new node."); + LOGGER.log(Level.FINE, "Name: " + getName() + ", Spawning new node."); Node newNode = new Node(); sendInvite(newNode); @@ -119,28 +119,6 @@ public class Node { return channel.socket().getLocalSocketAddress(); } - private boolean sendNewNeighbor(SocketAddress receiver, - SocketAddress neighbor) { - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - Ack ack = generateAck(neighbor); - buffer.put(MessageType.NEW_NEIGHBOR); - buffer.putInt(ack.getId()); - - byte[] addr = BufferUtil.addrToBytes((InetSocketAddress) neighbor); - buffer.put(addr); - buffer.flip(); - - ack.setBuf(BufferUtil.clone(buffer)); - - try { - channel.send(buffer, receiver); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } - /** * Sends an acknowledgment message to receiver (who hopefully is expecting * it) @@ -165,24 +143,6 @@ public class Node { return true; } - private boolean sendLeave(SocketAddress neighbor) { - ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - Ack ack = generateAck(neighbor); - buffer.put(MessageType.LEAVE); - buffer.putInt(ack.getId()); - buffer.flip(); - - ack.setBuf(BufferUtil.clone(buffer)); - - try { - channel.send(buffer, neighbor); - } catch (IOException e) { - e.printStackTrace(); - return false; - } - return true; - } - /** * Shoot a new broadcast packet to the network. * @@ -258,13 +218,10 @@ public class Node { .getLocalAddress())); sendBroadcast(MessageType.STATUS, myAddr); } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); } } - // TODO: something to delete nodes from the overall network protected void setNeighborsOfNode(String node, List neighbors) { network.put(node, neighbors); } @@ -316,25 +273,37 @@ public class Node { LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName()); for (int i = 0; i < neighbors.size(); i++) { + ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + Ack ack = generateAck(neighbors.get(i)); + buffer.put(MessageType.LEAVE); + buffer.putInt(ack.getId()); + if (neighbors.size() > 2) { - int pred = ((i - 1) + neighbors.size()) % neighbors.size(); + + int pred = ((i - 1) + neighbors.size()) % neighbors.size(); int succ = (i + 1) % neighbors.size(); - sendNewNeighbor(neighbors.get(i), neighbors.get(succ)); - sendNewNeighbor(neighbors.get(i), neighbors.get(pred)); + + buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors + .get(succ))); + buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors + .get(pred))); + } else if (neighbors.size() == 2) { - sendNewNeighbor(neighbors.get(i), - neighbors.get(Math.abs(i - 1))); + buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors + .get(Math.abs(i - 1)))); } - sendLeave(neighbors.get(i)); + buffer.flip(); + + ack.setBuf(BufferUtil.clone(buffer)); + + try { + channel.send(buffer, neighbors.get(i)); + } catch (IOException e) { + } } try { - // Wait until all we received all acks - while (!acks.isEmpty()) { - Thread.sleep(10); - } - if (thread != null) { udpListen.terminate(); thread.join(); @@ -382,4 +351,8 @@ public class Node { return network; } + public boolean hasAcks() { + return !acks.isEmpty(); + } + } diff --git a/ws2012/P2P/uebungen/4/src/node/UDPHandler.java b/ws2012/P2P/uebungen/4/src/node/UDPHandler.java index 637f28a7..4aacbebe 100644 --- a/ws2012/P2P/uebungen/4/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/4/src/node/UDPHandler.java @@ -60,16 +60,21 @@ public class UDPHandler implements Runnable { } private void receiveLeave(SocketAddress from) { + int ack_id = buf.getInt(); + if (node.removeNeighbor(from)) { - int ack_id = buf.getInt(); + // Read all new neighbors that node gives to us only the node was + // one of my neighbors. + while (buf.hasRemaining()) { + InetSocketAddress newNeighbor = readIPFromBuffer(); + node.addNeighbor(newNeighbor); + } LOGGER.log(Level.INFO, "{0}: {1} is leaving. Sending ack #{2}", new Object[] { node.getName(), from.toString(), ack_id }); node.sendAck(from, ack_id); } - // If we don't know that neighbor, we don't have to - // ack } private void receiveBroadcast(SocketAddress from) { @@ -141,22 +146,6 @@ public class UDPHandler implements Runnable { return new InetSocketAddress(theAddr.toString(), port); } - private void receiveNewNeighbor(SocketAddress from) { - int ack_id = buf.getInt(); - - InetSocketAddress newNeighbor = readIPFromBuffer(); - - node.addNeighbor(newNeighbor); - - LOGGER.log( - Level.INFO, - "{0}: from {1} received new neighbor:{2}. Sending ack #{3}", - new Object[] { node.getName(), from.toString(), - newNeighbor.toString(), ack_id }); - - node.sendAck(from, ack_id); - } - private boolean isNewBroadcast(int id) { boolean isNew = true; if (broadcastPackets.containsKey(id)) { @@ -171,7 +160,8 @@ public class UDPHandler implements Runnable { public void run() { SocketAddress receivedFrom = null; - while (running) { + // Run until I get killed, and all my acks have been answered + while (running || node.hasAcks()) { try { receivedFrom = node.getChannel().receive(buf); @@ -192,9 +182,6 @@ public class UDPHandler implements Runnable { case MessageType.LEAVE: receiveLeave(receivedFrom); break; - case MessageType.NEW_NEIGHBOR: - receiveNewNeighbor(receivedFrom); - break; case MessageType.BROADCAST: receiveBroadcast(receivedFrom); break;