Added callback for ack messages

This commit is contained in:
senft-desktop 2012-11-30 17:43:12 +01:00
parent 36867f2813
commit 2d9fad0b8d
5 changed files with 41 additions and 16 deletions

View File

@ -5,7 +5,6 @@ import java.util.List;
import java.util.Random;
import java.util.logging.LogManager;
import node.Node;
import analysis.NetworkDumper;

View File

@ -23,10 +23,14 @@ public class Ack {
// The channel to re-send the message on
private DatagramChannel channel;
public Ack(int id, SocketAddress address, DatagramChannel channel) {
OnAckReceive callback;
public Ack(int id, SocketAddress address, DatagramChannel channel,
OnAckReceive onReceive) {
this.id = id;
this.address = address;
this.channel = channel;
this.callback = onReceive;
startThread();
}
@ -54,6 +58,10 @@ public class Ack {
}
public void setReceived() {
if (callback != null) {
callback.onReceive();
}
// Stop thread
try {
if (thread != null) {
@ -61,7 +69,6 @@ public class Ack {
thread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

View File

@ -40,6 +40,8 @@ public class Node {
private Random generator;
protected boolean receivedAckForLastInvite;
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
@ -72,16 +74,23 @@ public class Node {
public Node spawn() throws IOException {
LOGGER.log(Level.FINE, "Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
sendInvite(newNode);
addNeighbor(newNode.getAddress());
receivedAckForLastInvite = false;
sendInvite(newNode);
return newNode;
}
private boolean sendInvite(Node newNode) {
private boolean sendInvite(final Node newNode) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(newNode.getAddress());
Ack ack = generateAck(newNode.getAddress(), new OnAckReceive() {
@Override
public void onReceive() {
receivedAckForLastInvite = true;
}
});
buffer.put(MessageType.INVITE);
buffer.putInt(ack.getId());
buffer.flip();
@ -104,12 +113,12 @@ public class Node {
* the SocketAddress the ack should be received from
* @return the identifier for this ack
*/
private Ack generateAck(final SocketAddress addr) {
private Ack generateAck(final SocketAddress addr, OnAckReceive callback) {
int ack_id = generator.nextInt();
while (acks.containsKey(ack_id)) {
ack_id = generator.nextInt();
}
Ack newAck = new Ack(ack_id, addr, channel);
Ack newAck = new Ack(ack_id, addr, channel, callback);
acks.put(ack_id, newAck);
return newAck;
@ -270,13 +279,19 @@ public class Node {
* from the network.
*/
public void leave() {
if (!receivedAckForLastInvite || neighbors.isEmpty()) {
// This means the bootstrapping has not been finished. Either I have
// not been invited by a node, or the Node I invited didn't ack by
// now
return;
}
LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName());
for (int i = 0; i < neighbors.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(neighbors.get(i));
buffer.put(MessageType.LEAVE);
buffer.putInt(ack.getId());
for (int i = 0; i < neighbors.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(neighbors.get(i), null);
buffer.put(MessageType.LEAVE);
buffer.putInt(ack.getId());
if (neighbors.size() > 2) {

View File

@ -0,0 +1,5 @@
package node;
public interface OnAckReceive {
public void onReceive();
}

View File

@ -72,9 +72,8 @@ public class UDPHandler implements Runnable {
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Sending ack #{2}",
new Object[] { node.getName(), from.toString(), ack_id });
node.sendAck(from, ack_id);
}
node.sendAck(from, ack_id);
}
private void receiveBroadcast(SocketAddress from) {