From 92a467c478e1e61aaa3a8b29c85ed084887be94f Mon Sep 17 00:00:00 2001 From: Michael Scholz Date: Sun, 10 Feb 2013 15:05:22 +0100 Subject: [PATCH 1/3] update sendFile stuff --- ws2012/P2P/uebungen/11/src/node/Node.java | 5 +- .../P2P/uebungen/11/src/node/UDPHandler.java | 227 ++++++++++++++++++ 2 files changed, 231 insertions(+), 1 deletion(-) diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index c9cacc9a..44f5b9ba 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -414,6 +414,10 @@ public class Node { return send(n, MessageType.LEAVE, null, false, null); } + public void storeFile(File file){ + this.files.add(file); + } + private void sendFile(NodeIdentifier nodeID, File file){ @@ -450,7 +454,6 @@ public class Node { //send chunk - String data = ""; if(eof){ diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 0c71c475..9aed6d90 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -1,14 +1,20 @@ package node; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import message.MessageType; public class UDPHandler implements Runnable { +<<<<<<< HEAD private final static Logger LOGGER = Logger.getLogger(UDPHandler.class .getName()); @@ -217,6 +223,227 @@ public class UDPHandler implements Runnable { LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", new Object[] { data.toString(), fromID }); +======= + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName()); + + public static final int BUF_SIZE = 512; + + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + private String[] tempData; + + + public UDPHandler(Node node) { + this.node = node; + } + + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress getIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } + + private Identifier getIDFromBuffer() { + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new Identifier(Node.ID_BITS, result); + } + + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ + private NodeIdentifier getNodeTripleFromBuffer() { + InetSocketAddress address = getIPFromBuffer(); + + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new NodeIdentifier(Node.ID_BITS, result, address); + } + + public void run() { + InetSocketAddress from = null; + + // Run until it gets killed, and all my Acks have been answered + while (running || node.hasAcks()) { + try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... + boolean updateRT = true; + + // The address of the node that sent this message + from = (InetSocketAddress) node.getChannel().receive(buffer); + + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); + + byte messageType = buffer.get(); + + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, + getIDFromBuffer().getBytes(), from); + + Identifier rpcID = getIDFromBuffer(); + + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(fromID, rpcID); + break; + case MessageType.NODES: + receiveNodes(fromID, rpcID); + break; + case MessageType.PING: + updateRT = false; + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + updateRT = false; + receivePong(fromID, rpcID); + break; + case MessageType.LEAVE: + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; + case MessageType.FIND_VALUE: + receiveFindValue(fromID, rpcID); + break; + case MessageType.STORE: + receiveStore(fromID, rpcID); + break; + case MessageType.DATA: + receiveData(fromID, rpcID); + LOGGER.log(Level.INFO, "Received DATA from {0}", + new Object[] { from.toString() }); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } + + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } + + } else { + // If nothing has been read/received wait and read/receive + // again + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + buffer.clear(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + + + + private void receiveData(NodeIdentifier fromID, Identifier rpcID) { + + StringBuilder sb = new StringBuilder(); + + while (buffer.hasRemaining()) { + sb.append(buffer.get()); + } + + String data = sb.toString(); + + String parts[] = data.split("-"); + + String fileAndChunk[] = parts[0].split("|"); + + String fileID = fileAndChunk[0]; + int chunkID = Integer.parseInt(fileAndChunk[1]); + + data = parts[1]; + + if(data.charAt(data.length()) == '!'){ //last chunk + //file zusammensetzen und im zielnode speichern + File file = new File("fileID"); + FileWriter fw; + try { + + fw = new FileWriter(file); + for(int i = 0; i < tempData.length; i++){ + fw.write(tempData[i]); + } + fw.flush(); + + + //store file in node + node.storeFile(file); + + + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + + }else{ + tempData[chunkID] = data; + } + + + + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", + new Object[] { data.toString(), fromID }); + + + +>>>>>>> update sendFile stuff } private void receivePong(NodeIdentifier fromID, Identifier rpcID) { From 3166b7726adf5fbfb64ef6bbfb2f0ee139c08674 Mon Sep 17 00:00:00 2001 From: Michael Scholz Date: Sun, 10 Feb 2013 15:13:15 +0100 Subject: [PATCH 2/3] Revert "update sendFile stuff" This reverts commit 92a467c478e1e61aaa3a8b29c85ed084887be94f. --- ws2012/P2P/uebungen/11/src/node/Node.java | 5 +- .../P2P/uebungen/11/src/node/UDPHandler.java | 227 ------------------ 2 files changed, 1 insertion(+), 231 deletions(-) diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index 44f5b9ba..c9cacc9a 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -414,10 +414,6 @@ public class Node { return send(n, MessageType.LEAVE, null, false, null); } - public void storeFile(File file){ - this.files.add(file); - } - private void sendFile(NodeIdentifier nodeID, File file){ @@ -454,6 +450,7 @@ public class Node { //send chunk + String data = ""; if(eof){ diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 9aed6d90..0c71c475 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -1,20 +1,14 @@ package node; -import java.io.BufferedOutputStream; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import message.MessageType; public class UDPHandler implements Runnable { -<<<<<<< HEAD private final static Logger LOGGER = Logger.getLogger(UDPHandler.class .getName()); @@ -223,227 +217,6 @@ public class UDPHandler implements Runnable { LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", new Object[] { data.toString(), fromID }); -======= - private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName()); - - public static final int BUF_SIZE = 512; - - private volatile boolean running = true; - private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); - - private Node node; - - private String[] tempData; - - - public UDPHandler(Node node) { - this.node = node; - } - - /** - * Takes the buffer of this UDPHandler as is and tries to read an IP address - * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, - * this will fail. - * - * @return the address that has been read - */ - private InetSocketAddress getIPFromBuffer() { - StringBuilder theAddr = new StringBuilder(); - // Read 4 Bytes and 1 Integer = 1 IP address - for (int i = 0; i < 4; i++) { - theAddr.append(buffer.get()); - if (i < 3) { - theAddr.append("."); - } - } - int port = buffer.getInt(); - return new InetSocketAddress(theAddr.toString(), port); - } - - private Identifier getIDFromBuffer() { - int numBytes = Node.ID_BITS / 8; - byte[] result = new byte[numBytes]; - for (int i = 0; i < numBytes; i++) { - result[i] = buffer.get(); - } - return new Identifier(Node.ID_BITS, result); - } - - /** - * Reads a triple from the channel and returns a - * {@link node.NodeIdentifier}. - * - * @return the read node ID - */ - private NodeIdentifier getNodeTripleFromBuffer() { - InetSocketAddress address = getIPFromBuffer(); - - int numBytes = Node.ID_BITS / 8; - byte[] result = new byte[numBytes]; - for (int i = 0; i < numBytes; i++) { - result[i] = buffer.get(); - } - return new NodeIdentifier(Node.ID_BITS, result, address); - } - - public void run() { - InetSocketAddress from = null; - - // Run until it gets killed, and all my Acks have been answered - while (running || node.hasAcks()) { - try { - // Flag that indicates whether the routing table should be - // updated with the node we just received a message from. This - // needs to be done, because some messages trigger a direct - // answer. For example we send a PING to a node. That node - // answers with a PONG. Because we received a message from that - // node we will update our routing table and see that we already - // know this node. So we will PING that node... - boolean updateRT = true; - - // The address of the node that sent this message - from = (InetSocketAddress) node.getChannel().receive(buffer); - - // channel.receive() is non-blocking. So we need to check if - // something actually has been written to the buffer - if (buffer.remaining() != BUF_SIZE) { - buffer.flip(); - - byte messageType = buffer.get(); - - NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, - getIDFromBuffer().getBytes(), from); - - Identifier rpcID = getIDFromBuffer(); - - switch (messageType) { - case MessageType.FIND_NODE: - receiveFindNode(fromID, rpcID); - break; - case MessageType.NODES: - receiveNodes(fromID, rpcID); - break; - case MessageType.PING: - updateRT = false; - receivePing(fromID, rpcID); - break; - case MessageType.PONG: - updateRT = false; - receivePong(fromID, rpcID); - break; - case MessageType.LEAVE: - // We don't have to do anything here because, after this - // switch block we call node.updateBuckets(...) which - // will try to ping the node we received this leave - // message from. That node will not answered because it - // directly shut down after sending the leave message. - // So the node will be removed from this routing table. - LOGGER.log(Level.INFO, "Received leave from {0}", - new Object[] { from.toString() }); - break; - case MessageType.FIND_VALUE: - receiveFindValue(fromID, rpcID); - break; - case MessageType.STORE: - receiveStore(fromID, rpcID); - break; - case MessageType.DATA: - receiveData(fromID, rpcID); - LOGGER.log(Level.INFO, "Received DATA from {0}", - new Object[] { from.toString() }); - break; - default: - LOGGER.log(Level.INFO, - "Received unknown command from {0}: [{1}]{2}", - new Object[] { from.toString(), messageType, - new String(buffer.array()) }); - } - - if (updateRT) { - node.updateBuckets(new NodeIdentifier(Node.ID_BITS, - fromID.getBytes(), from)); - } - - } else { - // If nothing has been read/received wait and read/receive - // again - try { - Thread.sleep(10); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - buffer.clear(); - - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - - - - private void receiveData(NodeIdentifier fromID, Identifier rpcID) { - - StringBuilder sb = new StringBuilder(); - - while (buffer.hasRemaining()) { - sb.append(buffer.get()); - } - - String data = sb.toString(); - - String parts[] = data.split("-"); - - String fileAndChunk[] = parts[0].split("|"); - - String fileID = fileAndChunk[0]; - int chunkID = Integer.parseInt(fileAndChunk[1]); - - data = parts[1]; - - if(data.charAt(data.length()) == '!'){ //last chunk - //file zusammensetzen und im zielnode speichern - File file = new File("fileID"); - FileWriter fw; - try { - - fw = new FileWriter(file); - for(int i = 0; i < tempData.length; i++){ - fw.write(tempData[i]); - } - fw.flush(); - - - //store file in node - node.storeFile(file); - - - } catch (IOException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - - - }else{ - tempData[chunkID] = data; - } - - - - // This should be the answer to a prior FIND_NODE -> mark this RPC ID as - // received - node.receivedRPC(fromID, rpcID); - - LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", - new Object[] { data.toString(), fromID }); - - - ->>>>>>> update sendFile stuff } private void receivePong(NodeIdentifier fromID, Identifier rpcID) { From 99713dbd17ec3dd49ed55a2fc92f7aef91e054e7 Mon Sep 17 00:00:00 2001 From: Michael Scholz Date: Sun, 10 Feb 2013 15:14:37 +0100 Subject: [PATCH 3/3] merge --- ws2012/P2P/uebungen/11/src/node/Node.java | 4 + .../P2P/uebungen/11/src/node/UDPHandler.java | 73 +++++++++++++++---- 2 files changed, 63 insertions(+), 14 deletions(-) diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index c9cacc9a..6c78aa46 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -414,6 +414,10 @@ public class Node { return send(n, MessageType.LEAVE, null, false, null); } + public void storeFile(File file){ + files.add(file); + } + private void sendFile(NodeIdentifier nodeID, File file){ diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 0c71c475..b69363a0 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -1,5 +1,7 @@ package node; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -18,6 +20,9 @@ public class UDPHandler implements Runnable { private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); private Node node; + + + private String[] tempData; public UDPHandler(Node node) { this.node = node; @@ -200,25 +205,65 @@ public class UDPHandler implements Runnable { private void receiveData(NodeIdentifier fromID, Identifier rpcID) { - StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(); - while (buffer.hasRemaining()) { - sb.append(buffer.get()); - } + while (buffer.hasRemaining()) { + sb.append(buffer.get()); + } + + String data = sb.toString(); + + String parts[] = data.split("-"); + + String fileAndChunk[] = parts[0].split("|"); + + String fileID = fileAndChunk[0]; + int chunkID = Integer.parseInt(fileAndChunk[1]); + + data = parts[1]; + + if(data.charAt(data.length()) == '!'){ //last chunk + //file zusammensetzen und im zielnode speichern + File file = new File("fileID"); + FileWriter fw; + try { + + fw = new FileWriter(file); + for(int i = 0; i < tempData.length; i++){ + fw.write(tempData[i]); + } + fw.flush(); + + + //store file in node + node.storeFile(file); + + + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + + }else{ + tempData[chunkID] = data; + } + + - String data = sb.toString(); - - String parts[] = data.split("-"); - - // This should be the answer to a prior FIND_NODE -> mark this RPC ID as - // received - node.receivedRPC(fromID, rpcID); - - LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", - new Object[] { data.toString(), fromID }); + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", + new Object[] { data.toString(), fromID }); + + } + + private void receivePong(NodeIdentifier fromID, Identifier rpcID) { LOGGER.log(Level.INFO, "Received [PONG] from {0}", new Object[] { fromID });