store and find should no work without any problems

This commit is contained in:
Denis 2013-02-10 18:27:38 +01:00
parent 72408bb79b
commit 42781fd21d
3 changed files with 62 additions and 40 deletions

View File

@ -14,4 +14,5 @@ public class MessageType {
public final static byte DATA = 6;
public final static byte VALUE_NODES = 7;
public final static byte FOUND_VALUE = 8;
public final static byte ACK = 9;
}

View File

@ -150,13 +150,14 @@ public class Node {
}
}
void sendFoundValue(NodeIdentifier receiver, Identifier idToFind, Identifier rpcID) {
boolean successful = send(receiver, MessageType.FOUND_VALUE, rpcID, values
.get(idToFind).getBytes(), true, null);
void sendFoundValue(NodeIdentifier receiver, Identifier idToFind,
Identifier rpcID) {
boolean successful = send(receiver, MessageType.FOUND_VALUE, rpcID,
values.get(idToFind).getBytes(), false, null);
if (successful) {
LOGGER.log(Level.INFO, "Sending [FOUND_VALUE {0}] to node {1}",
new Object[] { values.get(idToFind), receiver });
LOGGER.log(Level.INFO, "Sending [FOUND_VALUE {0} -> {1}] to node {2}",
new Object[] { idToFind, values.get(idToFind), receiver });
}
}
@ -218,6 +219,10 @@ public class Node {
}
}
public void sendAck(NodeIdentifier receiver, Identifier rpcID) {
send(receiver, MessageType.ACK, rpcID, null, false, null);
}
public void sendData(NodeIdentifier receiver, FileIdentifier fileID,
ChunkIdentifier chunckID) {
@ -387,16 +392,19 @@ public class Node {
public boolean receivedRPC(NodeIdentifier fromID, Identifier rpcID) {
List<Ack> rpcsFromID = rpcs.get(rpcID);
boolean removedAck = false;
// wohl unschön, hier auf != null zu prüfen, da Fehler wo anders ist.
if (rpcsFromID != null) {
for (Ack ack : rpcsFromID) {
if (ack.check(fromID)) {
ack.setReceived();
rpcsFromID.remove(ack);
removedAck = true;
for (Ack ack : rpcsFromID) {
if (ack.check(fromID)) {
ack.setReceived();
rpcsFromID.remove(ack);
removedAck = true;
LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID);
LOGGER.log(Level.FINEST, "Received RPC ack " + rpcID);
break;
break;
}
}
}
@ -419,34 +427,35 @@ public class Node {
private boolean sendLeave(NodeIdentifier n) {
return send(n, MessageType.LEAVE, null, false, null);
}
public void storeFile(File file){
files.add(file);
}
public void storeFile(File file) {
files.add(file);
}
private void sendFile(NodeIdentifier nodeID, File file) {
//calculate chunk size
int CHUNK_SIZE = BUFFER_SIZE - 15;
int FILE_SIZE = (int)file.length();
boolean eof = false;
int NUMBER_OF_CHUNKS = 0;
byte[] temp = null;
int totalBytesRead = 0;
Identifier fileID = new Identifier(10, file.getName().getBytes());
try {
InputStream inStream = new BufferedInputStream(new FileInputStream(file));
while(totalBytesRead < FILE_SIZE){
int bytesReamaining = FILE_SIZE-totalBytesRead;
if(bytesReamaining < CHUNK_SIZE){
CHUNK_SIZE = bytesReamaining+1;
// calculate chunk size
int CHUNK_SIZE = BUFFER_SIZE - 15;
int FILE_SIZE = (int) file.length();
boolean eof = false;
int NUMBER_OF_CHUNKS = 0;
byte[] temp = null;
int totalBytesRead = 0;
Identifier fileID = new Identifier(10, file.getName().getBytes());
try {
InputStream inStream = new BufferedInputStream(new FileInputStream(
file));
while (totalBytesRead < FILE_SIZE) {
int bytesReamaining = FILE_SIZE - totalBytesRead;
if (bytesReamaining < CHUNK_SIZE) {
CHUNK_SIZE = bytesReamaining + 1;
eof = true;
}
temp = new byte[CHUNK_SIZE];

View File

@ -145,6 +145,9 @@ public class UDPHandler implements Runnable {
LOGGER.log(Level.INFO, "Received DATA from {0}",
new Object[] { from.toString() });
break;
case MessageType.ACK:
receiveAck(fromID, rpcID);
break;
default:
LOGGER.log(Level.INFO,
"Received unknown command from {0}: [{1}]{2}",
@ -175,7 +178,14 @@ public class UDPHandler implements Runnable {
}
}
private void receiveAck(NodeIdentifier fromID, Identifier rpcID) {
// This should be the either answer to a prior STORE or FOUND_VALUE ->
// mark this RPC ID as received
node.receivedRPC(fromID, rpcID);
}
private void receiveFoundValue(NodeIdentifier fromID, Identifier rpcID) {
Identifier idToFind = getIDFromBuffer();
// TODO Auto-generated method stub
// Node kontaktieren, damit Datei gesendet werden kann.
@ -183,8 +193,8 @@ public class UDPHandler implements Runnable {
// as received
node.receivedRPC(fromID, rpcID);
LOGGER.log(Level.INFO, "Received [FOUND VALUE] from Node {0}",
new Object[] { fromID });
LOGGER.log(Level.INFO, "Received [FOUND VALUE on Node {0}] from Node {1}",
new Object[] { idToFind, fromID });
}
private void receiveValueNodes(NodeIdentifier fromID, Identifier rpcID) {
@ -315,6 +325,8 @@ public class UDPHandler implements Runnable {
new Object[] { fileID, fromID });
node.storePair(fileID, fromID);
node.sendAck(fromID, rpcID);
}
private void receiveFindValue(NodeIdentifier fromID, Identifier rpcID) {