diff --git a/ws2012/P2P/uebungen/11/src/message/MessageType.java b/ws2012/P2P/uebungen/11/src/message/MessageType.java index 31816d33..6d535b02 100644 --- a/ws2012/P2P/uebungen/11/src/message/MessageType.java +++ b/ws2012/P2P/uebungen/11/src/message/MessageType.java @@ -14,4 +14,5 @@ public class MessageType { public final static byte DATA = 6; public final static byte VALUE_NODES = 7; public final static byte FOUND_VALUE = 8; + public final static byte ACK = 9; } diff --git a/ws2012/P2P/uebungen/11/src/node/Node.java b/ws2012/P2P/uebungen/11/src/node/Node.java index 465cd917..9cfd8b00 100644 --- a/ws2012/P2P/uebungen/11/src/node/Node.java +++ b/ws2012/P2P/uebungen/11/src/node/Node.java @@ -150,13 +150,14 @@ public class Node { } } - void sendFoundValue(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID) { - boolean successful = send(receiver, MessageType.FOUND_VALUE, rpcID, values - .get(idToFind).getBytes(), true, null); + void sendFoundValue(NodeIdentifier receiver, Identifier idToFind, + Identifier rpcID) { + boolean successful = send(receiver, MessageType.FOUND_VALUE, rpcID, + values.get(idToFind).getBytes(), false, null); if (successful) { - LOGGER.log(Level.INFO, "Sending [FOUND_VALUE {0}] to node {1}", - new Object[] { values.get(idToFind), receiver }); + LOGGER.log(Level.INFO, "Sending [FOUND_VALUE {0} -> {1}] to node {2}", + new Object[] { idToFind, values.get(idToFind), receiver }); } } @@ -218,6 +219,10 @@ public class Node { } } + public void sendAck(NodeIdentifier receiver, Identifier rpcID) { + send(receiver, MessageType.ACK, rpcID, null, false, null); + } + public void sendData(NodeIdentifier receiver, FileIdentifier fileID, ChunkIdentifier chunckID) { @@ -387,16 +392,19 @@ public class Node { public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) { List rpcsFromID = rpcs.get(rpcID); boolean removedAck = false; + + // wohl unschön, hier auf != null zu prüfen, da Fehler wo anders ist. + if (rpcsFromID != null) { + for (Ack ack : rpcsFromID) { + if (ack.check(fromID)) { + ack.setReceived(); + rpcsFromID.remove(ack); + removedAck = true; - for (Ack ack : rpcsFromID) { - if (ack.check(fromID)) { - ack.setReceived(); - rpcsFromID.remove(ack); - removedAck = true; + LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID); - LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID); - - break; + break; + } } } @@ -419,34 +427,35 @@ public class Node { private boolean sendLeave(NodeIdentifier n) { return send(n, MessageType.LEAVE, null, false, null); } - - public void storeFile(File file){ - files.add(file); - } + + public void storeFile(File file) { + files.add(file); + } private void sendFile(NodeIdentifier nodeID, File file) { - //calculate chunk size - int CHUNK_SIZE = BUFFER_SIZE - 15; - - int FILE_SIZE = (int)file.length(); - - boolean eof = false; - - int NUMBER_OF_CHUNKS = 0; - byte[] temp = null; - - int totalBytesRead = 0; - - Identifier fileID = new Identifier(10, file.getName().getBytes()); - - try { - InputStream inStream = new BufferedInputStream(new FileInputStream(file)); - - while(totalBytesRead < FILE_SIZE){ - int bytesReamaining = FILE_SIZE-totalBytesRead; - if(bytesReamaining < CHUNK_SIZE){ - CHUNK_SIZE = bytesReamaining+1; + // calculate chunk size + int CHUNK_SIZE = BUFFER_SIZE - 15; + + int FILE_SIZE = (int) file.length(); + + boolean eof = false; + + int NUMBER_OF_CHUNKS = 0; + byte[] temp = null; + + int totalBytesRead = 0; + + Identifier fileID = new Identifier(10, file.getName().getBytes()); + + try { + InputStream inStream = new BufferedInputStream(new FileInputStream( + file)); + + while (totalBytesRead < FILE_SIZE) { + int bytesReamaining = FILE_SIZE - totalBytesRead; + if (bytesReamaining < CHUNK_SIZE) { + CHUNK_SIZE = bytesReamaining + 1; eof = true; } temp = new byte[CHUNK_SIZE]; diff --git a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java index 6ca668bb..6e8c1c4c 100644 --- a/ws2012/P2P/uebungen/11/src/node/UDPHandler.java +++ b/ws2012/P2P/uebungen/11/src/node/UDPHandler.java @@ -145,6 +145,9 @@ public class UDPHandler implements Runnable { LOGGER.log(Level.INFO, "Received DATA from {0}", new Object[] { from.toString() }); break; + case MessageType.ACK: + receiveAck(fromID, rpcID); + break; default: LOGGER.log(Level.INFO, "Received unknown command from {0}: [{1}]{2}", @@ -175,7 +178,14 @@ public class UDPHandler implements Runnable { } } + private void receiveAck(NodeIdentifier fromID, Identifier rpcID) { + // This should be the either answer to a prior STORE or FOUND_VALUE -> + // mark this RPC ID as received + node.receivedRPC(fromID, rpcID); + } + private void receiveFoundValue(NodeIdentifier fromID, Identifier rpcID) { + Identifier idToFind = getIDFromBuffer(); // TODO Auto-generated method stub // Node kontaktieren, damit Datei gesendet werden kann. @@ -183,8 +193,8 @@ public class UDPHandler implements Runnable { // as received node.receivedRPC(fromID, rpcID); - LOGGER.log(Level.INFO, "Received [FOUND VALUE] from Node {0}", - new Object[] { fromID }); + LOGGER.log(Level.INFO, "Received [FOUND VALUE on Node {0}] from Node {1}", + new Object[] { idToFind, fromID }); } private void receiveValueNodes(NodeIdentifier fromID, Identifier rpcID) { @@ -315,6 +325,8 @@ public class UDPHandler implements Runnable { new Object[] { fileID, fromID }); node.storePair(fileID, fromID); + + node.sendAck(fromID, rpcID); } private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) {