Added acks

This commit is contained in:
senft-lap 2012-11-27 15:50:50 +01:00
parent 1eb1055dcf
commit cc035d910a
2 changed files with 77 additions and 8 deletions

View File

@ -1,11 +1,8 @@
package node;
public class MessageType {
public final static byte INVITE = 7;
public final static byte INVITE = 0;
public final static byte LEAVE = 1;
public final static byte NEW_NEIGHBOR = 4;
public final static byte PING = 2;
public final static byte PONG = 3;
public final static byte NEW_NEIGHBOR = 2;
public final static byte ACK = 3;
}

View File

@ -6,9 +6,14 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.logging.Logger;
import util.BufferUtil;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
@ -22,11 +27,16 @@ public class Node {
private List<SocketAddress> neighbors = new ArrayList<SocketAddress>();
private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
private volatile Thread thread;
private UDPListen udpListen;
private Random generator;
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
@ -57,8 +67,10 @@ public class Node {
// LOGGER.info("Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
int ack_id = generateAck(MessageType.INVITE, newNode.getAddress());
buf.clear();
buf.put(MessageType.INVITE);
buf.putInt(ack_id);
buf.flip();
try {
@ -72,6 +84,12 @@ public class Node {
return newNode;
}
public int generateAck(byte type, SocketAddress addr) {
int ack_id = generator.nextInt();
acks.put(ack_id, new Ack(addr));
return ack_id;
}
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();
}
@ -81,11 +99,16 @@ public class Node {
InetSocketAddress a = (InetSocketAddress) addr;
buf.put(MessageType.NEW_NEIGHBOR);
int ack_id = generateAck(MessageType.NEW_NEIGHBOR, addr);
buf.putInt(ack_id);
for (String part : a.getHostString().split("\\.")) {
buf.put(Byte.valueOf(part));
}
buf.putInt(a.getPort());
// acks.get(ack_id).setBuf(BufferUtil.clone(buf));
buf.flip();
}
@ -116,7 +139,9 @@ public class Node {
}
buf.clear();
int ack_id = generateAck(MessageType.LEAVE, neighbors.get(i));
buf.put(MessageType.LEAVE);
buf.putInt(ack_id);
buf.flip();
try {
@ -163,6 +188,7 @@ public class Node {
while (running) {
SocketAddress receivedFrom = null;
int ack_id;
try {
receivedFrom = channel.receive(buf);
@ -176,9 +202,40 @@ public class Node {
case MessageType.INVITE:
LOGGER.info(name + " received invite from "
+ receivedFrom.toString());
ack_id = buf.getInt();
buf.clear();
buf.put(MessageType.ACK);
buf.putInt(ack_id);
buf.flip();
channel.send(buf, receivedFrom);
neighbors.add(receivedFrom);
break;
case MessageType.ACK:
ack_id = buf.getInt();
LOGGER.info(name + " received ack from "
+ receivedFrom.toString());
if (acks.containsKey(ack_id)) {
Ack theAck = acks.get(ack_id);
if (theAck.check(receivedFrom)) {
acks.remove(theAck);
} else {
LOGGER.info("Received unexpected ack from "
+ receivedFrom.toString());
}
} else {
LOGGER.info("Received unexpected ack from "
+ receivedFrom.toString());
}
break;
case MessageType.LEAVE:
LOGGER.info(name + ": " + receivedFrom.toString()
+ " is leaving. Deleting...");
@ -191,9 +248,17 @@ public class Node {
neighbors.remove(i);
}
}
ack_id = buf.getInt();
buf.clear();
buf.put(MessageType.ACK);
buf.putInt(ack_id);
buf.flip();
channel.send(buf, receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
ack_id = buf.getInt();
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 int
@ -217,13 +282,20 @@ public class Node {
+ new_neighbor.toString());
}
buf.clear();
buf.put(MessageType.ACK);
buf.putInt(ack_id);
buf.flip();
channel.send(buf, receivedFrom);
break;
default:
LOGGER.info(name
+ " received unknown command from "
+ receivedFrom.toString() + ": "
+ messageType + new String(buf.array()));
+ receivedFrom.toString() + ": ["
+ messageType + "]"
+ new String(buf.array()));
}
} else {
try {