From 92a467c478e1e61aaa3a8b29c85ed084887be94f Mon Sep 17 00:00:00 2001 From: Michael Scholz Date: Sun, 10 Feb 2013 15:05:22 +0100 Subject: [PATCH] update sendFile stuff --- ws2012/P2P/uebungen/11/src/node/Node.java | 5 +- .../P2P/uebungen/11/src/node/UDPHandler.java | 227 ++++++++++++++++++ 2 files changed, 231 insertions(+), 1 deletion(-) diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index c9cacc9a..44f5b9ba 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -414,6 +414,10 @@ public class Node { return send(n, MessageType.LEAVE, null, false, null); } + public void storeFile(File file){ + this.files.add(file); + } + private void sendFile(NodeIdentifier nodeID, File file){ @@ -450,7 +454,6 @@ public class Node { //send chunk - String data = ""; if(eof){ diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 0c71c475..9aed6d90 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -1,14 +1,20 @@ package node; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import message.MessageType; public class UDPHandler implements Runnable { +<<<<<<< HEAD private final static Logger LOGGER = Logger.getLogger(UDPHandler.class .getName()); @@ -217,6 +223,227 @@ public class UDPHandler implements Runnable { LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", new Object[] { data.toString(), fromID }); +======= + private final static Logger LOGGER = Logger.getLogger(UDPHandler.class.getName()); + + public static final int BUF_SIZE = 512; + + private volatile boolean running = true; + private ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE); + + private Node node; + + private String[] tempData; + + + public UDPHandler(Node node) { + this.node = node; + } + + /** + * Takes the buffer of this UDPHandler as is and tries to read an IP address + * (4 bytes and 1 int) from it. If there is no/incomplete or wrong data, + * this will fail. + * + * @return the address that has been read + */ + private InetSocketAddress getIPFromBuffer() { + StringBuilder theAddr = new StringBuilder(); + // Read 4 Bytes and 1 Integer = 1 IP address + for (int i = 0; i < 4; i++) { + theAddr.append(buffer.get()); + if (i < 3) { + theAddr.append("."); + } + } + int port = buffer.getInt(); + return new InetSocketAddress(theAddr.toString(), port); + } + + private Identifier getIDFromBuffer() { + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new Identifier(Node.ID_BITS, result); + } + + /** + * Reads a triple from the channel and returns a + * {@link node.NodeIdentifier}. + * + * @return the read node ID + */ + private NodeIdentifier getNodeTripleFromBuffer() { + InetSocketAddress address = getIPFromBuffer(); + + int numBytes = Node.ID_BITS / 8; + byte[] result = new byte[numBytes]; + for (int i = 0; i < numBytes; i++) { + result[i] = buffer.get(); + } + return new NodeIdentifier(Node.ID_BITS, result, address); + } + + public void run() { + InetSocketAddress from = null; + + // Run until it gets killed, and all my Acks have been answered + while (running || node.hasAcks()) { + try { + // Flag that indicates whether the routing table should be + // updated with the node we just received a message from. This + // needs to be done, because some messages trigger a direct + // answer. For example we send a PING to a node. That node + // answers with a PONG. Because we received a message from that + // node we will update our routing table and see that we already + // know this node. So we will PING that node... + boolean updateRT = true; + + // The address of the node that sent this message + from = (InetSocketAddress) node.getChannel().receive(buffer); + + // channel.receive() is non-blocking. So we need to check if + // something actually has been written to the buffer + if (buffer.remaining() != BUF_SIZE) { + buffer.flip(); + + byte messageType = buffer.get(); + + NodeIdentifier fromID = new NodeIdentifier(Node.ID_BITS, + getIDFromBuffer().getBytes(), from); + + Identifier rpcID = getIDFromBuffer(); + + switch (messageType) { + case MessageType.FIND_NODE: + receiveFindNode(fromID, rpcID); + break; + case MessageType.NODES: + receiveNodes(fromID, rpcID); + break; + case MessageType.PING: + updateRT = false; + receivePing(fromID, rpcID); + break; + case MessageType.PONG: + updateRT = false; + receivePong(fromID, rpcID); + break; + case MessageType.LEAVE: + // We don't have to do anything here because, after this + // switch block we call node.updateBuckets(...) which + // will try to ping the node we received this leave + // message from. That node will not answered because it + // directly shut down after sending the leave message. + // So the node will be removed from this routing table. + LOGGER.log(Level.INFO, "Received leave from {0}", + new Object[] { from.toString() }); + break; + case MessageType.FIND_VALUE: + receiveFindValue(fromID, rpcID); + break; + case MessageType.STORE: + receiveStore(fromID, rpcID); + break; + case MessageType.DATA: + receiveData(fromID, rpcID); + LOGGER.log(Level.INFO, "Received DATA from {0}", + new Object[] { from.toString() }); + break; + default: + LOGGER.log(Level.INFO, + "Received unknown command from {0}: [{1}]{2}", + new Object[] { from.toString(), messageType, + new String(buffer.array()) }); + } + + if (updateRT) { + node.updateBuckets(new NodeIdentifier(Node.ID_BITS, + fromID.getBytes(), from)); + } + + } else { + // If nothing has been read/received wait and read/receive + // again + try { + Thread.sleep(10); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + buffer.clear(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + + + + private void receiveData(NodeIdentifier fromID, Identifier rpcID) { + + StringBuilder sb = new StringBuilder(); + + while (buffer.hasRemaining()) { + sb.append(buffer.get()); + } + + String data = sb.toString(); + + String parts[] = data.split("-"); + + String fileAndChunk[] = parts[0].split("|"); + + String fileID = fileAndChunk[0]; + int chunkID = Integer.parseInt(fileAndChunk[1]); + + data = parts[1]; + + if(data.charAt(data.length()) == '!'){ //last chunk + //file zusammensetzen und im zielnode speichern + File file = new File("fileID"); + FileWriter fw; + try { + + fw = new FileWriter(file); + for(int i = 0; i < tempData.length; i++){ + fw.write(tempData[i]); + } + fw.flush(); + + + //store file in node + node.storeFile(file); + + + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + + + }else{ + tempData[chunkID] = data; + } + + + + // This should be the answer to a prior FIND_NODE -> mark this RPC ID as + // received + node.receivedRPC(fromID, rpcID); + + LOGGER.log(Level.INFO, "Received [DATA] [{0}] from Node {1})", + new Object[] { data.toString(), fromID }); + + + +>>>>>>> update sendFile stuff } private void receivePong(NodeIdentifier fromID, Identifier rpcID) {