basic transfere and chunking

problem with file ids -> data is not found Node:236
Can send request only to root node! Node:229
Data is not in right order! UdpHandler:254
Data is recieved, but Acks do lot of shit still!
do following:
root: store 123 abcdefghijklmnop
(client2: lookup 123)
(root: lookup 123)
clients2: request 123
This commit is contained in:
Ulf Gebhardt 2013-02-11 01:49:49 +01:00
parent d0880f7038
commit 554e8eff4c
7 changed files with 101 additions and 131 deletions

View File

@ -7,6 +7,7 @@ import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.MappedByteBuffer;
import java.util.logging.LogManager;
import node.FileIdentifier;
import node.Identifier;
import node.Node;
@ -31,42 +32,42 @@ public class CLI {
while ((s = in.readLine()) != null && s.length() != 0) {
String[] splitted = s.split(" ");
String cmd = splitted[0];
String nodeID = splitted[1];
String cmd = splitted[0];
switch (cmd) {
//status
case "status":
for (NodeIdentifier id : node.getNeighbors()) {
System.out.println(id);
}
break;
//lookup fileID
case "lookup":
String fileID = splitted[1];
// TODO not implemented
// Zum testen:
Identifier fileIDToFind = new Identifier(8, Identifier.getStaticIdentifier(8).getBytes());
Identifier fileIDToFind = new Identifier(8, fileID.getBytes());
node.findValue(fileIDToFind);
break;
case "data":
File file = new File("/Users/Michael/Uni/allgemeiner Git/college/ws2012/P2P/uebungen/11/testfile.txt");
FileWriter fw = new FileWriter(file, true);
fw.write("ichbininhalteinesfiles");
fw.flush();
fw.close();
//node.storeFile(file);
NodeIdentifier nodeIdenti = new NodeIdentifier(8,
nodeID.getBytes(), new InetSocketAddress(
"127.0.0.1", 50000));
node.sendFile(nodeIdenti, file);
//request fileID
case "request":
String fileID3 = splitted[1];
FileIdentifier fileIDToFind2 = new FileIdentifier(8, fileID3.getBytes());
node.sendDataReq(fileIDToFind2);
break;
//leave
case "leave":
node.leave();
break;
//store fileID data
case "store":
String fileID2 = splitted[1];
String data = splitted[2];
// TODO not implemented
// Zum testen:
Identifier fileIDToStore = new Identifier(8, Identifier.getStaticIdentifier(8).getBytes());
FileIdentifier fileIDToStore = new FileIdentifier(8, fileID2.getBytes());
node.store(fileIDToStore);
node.storeData(fileIDToStore,data);
break;
default:
System.out.println("Unknown command.");

View File

@ -110,7 +110,9 @@ public class Ack {
"Didn't receive RPC Ack {0} by now. Resending... ",
new Object[] { rpcId });
LOGGER.log(Level.INFO, receiver.getAddress().toString());
channel.send(buffer, receiver.getAddress());
if(!receiver.getAddress().toString().equals("/0.0.0.0:0")){
//TODO
channel.send(buffer, receiver.getAddress());}
} catch (IOException e) {
e.printStackTrace();
}

View File

@ -4,15 +4,16 @@ public class MessageType {
public final static byte FIND_NODE = 0;
public final static byte NODES = 1;
public final static byte PING = 10;
public final static byte PONG = 11;
public final static byte PING = 11;
public final static byte PONG = 12;
public final static byte LEAVE = 2;
public final static byte FIND_VALUE = 4;
public final static byte STORE = 5;
public final static byte DATA = 6;
public final static byte VALUE_NODES = 7;
public final static byte FOUND_VALUE = 8;
public final static byte ACK = 9;
public final static byte DATA_REQ = 7;
public final static byte VALUE_NODES = 8;
public final static byte FOUND_VALUE = 9;
public final static byte ACK = 10;
}

View File

@ -7,10 +7,10 @@ public class FileIdentifier extends Identifier {
private String fileID;
public FileIdentifier(int size, byte[] bytes, String fileID) {
super(size, bytes);
public FileIdentifier(int size, byte[] fileID) {
super(size, fileID);
//calculate SHA-256 Hash of key
/*//calculate SHA-256 Hash of key
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
md.update(fileID.getBytes());
@ -18,7 +18,7 @@ public class FileIdentifier extends Identifier {
} catch (NoSuchAlgorithmException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}*/
}

View File

@ -43,6 +43,7 @@ public class Node {
private static final Identifier INITIAL_ID = Identifier
.getStaticIdentifier(ID_BITS);
private static final int BUFFER_SIZE = 512;
private static final int CHUNK_SIZE = 4;
/**
* The size of an IP address (in bytes)
*/
@ -62,7 +63,7 @@ public class Node {
private Identifier nodeID = Identifier.getRandomIdentifier(ID_BITS);
private IRoutingTable routingTable = new RoutingTableImpl(BUCKET_SIZE, this);
private ArrayList<File> files = new ArrayList<File>();
private Map<Identifier, String> data = new HashMap<Identifier, String>();;
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
@ -224,11 +225,31 @@ public class Node {
send(receiver, MessageType.ACK, rpcID, null, false, null);
}
public void sendData(NodeIdentifier receiver, FileIdentifier fileID,
ChunkIdentifier chunckID) {
public void sendDataReq(FileIdentifier fileID){
//TODO
String id = "128";
NodeIdentifier receiver = new NodeIdentifier(8, id.getBytes(), new InetSocketAddress("localhost", INITIAL_PORT));
send(receiver, MessageType.DATA_REQ, fileID.getBytes(), true, null);
}
// TODO: implement
public void sendData(NodeIdentifier receiver, FileIdentifier fileID) {
String data = "abcdefghijklmnop"; //TODO this shit does not work for some reason // this.data.get(fileID);
//String data = this.data.get(fileID);
if(data == null){
//TODO We dont have that data. -> DOES NOT WORK PROPERLY!
new Exception().printStackTrace();
return;
}
int CHUNK_COUNT = data.length()/CHUNK_SIZE;
for(int i = 0; i<CHUNK_COUNT; i++){
String chunk = fileID.toString() + "-" +
CHUNK_COUNT + "-" +
i + "-" +
data.substring(i*CHUNK_SIZE, (i+1)*CHUNK_SIZE);
send(receiver, MessageType.DATA, chunk.getBytes(), true, null);
}
}
public void sendPing(NodeIdentifier receiver, MessageCallback cb) {
@ -432,71 +453,15 @@ public class Node {
return send(n, MessageType.LEAVE, null, false, null);
}
public void storeFile(File file) {
files.add(file);
LOGGER.log(Level.INFO, "Stored FILE [{0}])",
new Object[] { file.getPath()});
public void storeData(Identifier id, String data) {
this.data.put(id, data);
LOGGER.log(Level.INFO, "Stored Data [{0}] as [{1}])",
new Object[] { data, id});
}
public void sendFile(NodeIdentifier nodeID, File file) {
// calculate chunk size = BUFFER - ID_BITS - fileID - eofChar
int CHUNK_SIZE = BUFFER_SIZE - ID_BITS - 10 - 1;
int FILE_SIZE = (int) file.length();
boolean eof = false;
int NUMBER_OF_CHUNKS = -1;
byte[] temp = null;
int totalBytesRead = 0;
Identifier fileID = new Identifier(8, file.getName().getBytes());
try {
InputStream inStream = new BufferedInputStream(new FileInputStream(
file));
while (totalBytesRead < FILE_SIZE) {
int bytesReamaining = FILE_SIZE - totalBytesRead;
if (bytesReamaining < CHUNK_SIZE) {
CHUNK_SIZE = bytesReamaining + 1;
eof = true;
}
temp = new byte[CHUNK_SIZE];
int bytesRead = inStream.read(temp, 0, CHUNK_SIZE);
if (bytesRead > 0) {
totalBytesRead += bytesRead;
NUMBER_OF_CHUNKS++;
}
// send chunk
String data = "";
if (eof) {
data = fileID.toString() + "-" + NUMBER_OF_CHUNKS + "-"
+ temp.toString() + "!";
} else {
data = fileID.toString() + "-" + NUMBER_OF_CHUNKS + "-"
+ temp.toString();
}
send(nodeID, MessageType.DATA, data.getBytes(), true, null);
}
inStream.close();
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}

View File

@ -7,6 +7,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -23,8 +25,8 @@ public class UDPHandler implements Runnable {
private Node node;
private ArrayList<String> tempData = new ArrayList<String>();
Map<FileIdentifier, ArrayList<String>> chunklist = new HashMap<FileIdentifier, ArrayList<String>>();
public UDPHandler(Node node) {
this.node = node;
}
@ -44,7 +46,7 @@ public class UDPHandler implements Runnable {
if (i < 3) {
theAddr.append(".");
}
}
}
int port = buffer.getInt();
return new InetSocketAddress(theAddr.toString(), port);
}
@ -71,7 +73,8 @@ public class UDPHandler implements Runnable {
byte[] result = new byte[numBytes];
for (int i = 0; i < numBytes; i++) {
result[i] = buffer.get();
}
}
LOGGER.log(Level.INFO,"Read Buffer id: "+Node.ID_BITS+" result: "+result+" addr: "+address);
return new NodeIdentifier(Node.ID_BITS, result, address);
}
@ -147,6 +150,11 @@ public class UDPHandler implements Runnable {
LOGGER.log(Level.INFO, "Received DATA from {0}",
new Object[] { from.toString() });
break;
case MessageType.DATA_REQ:
receiveDataReq(fromID,rpcID);
LOGGER.log(Level.INFO, "Received DATA_REQ from {0}",
new Object[] { from.toString() });
break;
case MessageType.ACK:
receiveAck(fromID, rpcID);
break;
@ -206,7 +214,7 @@ public class UDPHandler implements Runnable {
StringBuilder nodes = new StringBuilder();
while (buffer.hasRemaining()) {
NodeIdentifier newID = getNodeTripleFromBuffer();
NodeIdentifier newID = getNodeTripleFromBuffer();
node.sendFindValue(newID, node.getSearchID());
nodes.append(newID).append(", ");
numReceived++;
@ -223,46 +231,32 @@ public class UDPHandler implements Runnable {
}
private void receiveData(NodeIdentifier fromID, Identifier rpcID) {
String data = new String(buffer.array());
String data = new String(buffer.array());
String parts[] = data.split("-");
String fileID = parts[0];
int chunkID = Integer.parseInt(parts[1]);
int chunkCount = Integer.parseInt(parts[1]);
int chunkID = Integer.parseInt(parts[2]);
String chunkContent = parts[3];
LOGGER.log(Level.INFO,"recieved Chunk file: "+fileID+" count: "+chunkCount+" id: "+chunkID);
String chunkContent = parts[2];
FileIdentifier fid = new FileIdentifier(8,fileID.getBytes());
if(chunklist.get(fid) == null){
chunklist.put(fid, new ArrayList<String>());
}
chunklist.get(fid).add(chunkContent);
tempData.add(chunkContent);
if (chunkContent.charAt(chunkContent.length()-1) == '!' || true) { // last chunk
// store file in node
//TODO: how to store chunks as FILE?
//tempData);
//PrintWriter out = new PrintWriter(new FileWriter(file));
//out.print(tempData);
BufferedWriter bw;
try {
File file = File.createTempFile(fileID, ".tmp");
bw = new BufferedWriter(new FileWriter(file));
for (int i = 0; i < tempData.size(); i++) {
bw.write(tempData.get(i));
}
bw.close();
node.storeFile(file);
} catch (IOException ex) {
ex.printStackTrace();
}
}
if(chunklist.get(fid).size() >= chunkCount){
LOGGER.log(Level.INFO,"FILE complete file: "+fileID+" count: "+chunkCount+" id: "+chunkID);
String file = "";
for(int i=0; i<chunklist.get(fid).size();i++){
//TODO Reihenfolge!!!
file += chunklist.get(fid).get(i);
}
node.storeData(fid, file);
chunklist.remove(fid);
}
// This should be the answer to a prior FIND_NODE -> mark this RPC ID as
// received
@ -273,6 +267,13 @@ public class UDPHandler implements Runnable {
}
private void receiveDataReq(NodeIdentifier fromID, Identifier rpcID) {
FileIdentifier fid = new FileIdentifier(8, buffer.array());
node.sendData(fromID, fid);
//TODO ?
node.receivedRPC(fromID, rpcID);
}
private void receivePong(NodeIdentifier fromID, Identifier rpcID) {
LOGGER.log(Level.INFO, "Received [PONG] from {0}",
new Object[] { fromID });

View File

@ -1 +1 @@
ichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfiles
ichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfilesichbininhalteinesfiles