2012-12-04 14:14:38 +01:00

372 lines
9.8 KiB
Java

package node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import util.BufferUtil;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
public static final int BUF_SIZE = 512;
private DatagramChannel channel;
private String name = "Not initialized";
private List<SocketAddress> neighbors = new ArrayList<SocketAddress>();
private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
/**
* Saves the neighbor of each node in the network
*/
private Map<String, List<String>> network = new HashMap<>();
private volatile Thread thread;
private UDPHandler udpListen;
private Random generator;
protected boolean receivedAckForLastInvite;
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
channel.configureBlocking(false);
this.name = channel.socket().getLocalSocketAddress().toString();
network.put(getName(), new LinkedList<String>());
udpListen = new UDPHandler(this);
thread = new Thread(udpListen);
thread.start();
LOGGER.log(Level.INFO, "Initialized node {0}", name);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Create another peer, mutually link creator and spawn.
*
* @return the spawned Node
* @throws IOException
* if no connection could be established to the new node
*/
public Node spawn() throws IOException {
LOGGER.log(Level.FINE, "Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
addNeighbor(newNode.getAddress());
receivedAckForLastInvite = false;
sendInvite(newNode);
return newNode;
}
private boolean sendInvite(final Node newNode) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(newNode.getAddress(), new OnAckReceive() {
@Override
public void onReceive() {
receivedAckForLastInvite = true;
}
});
buffer.put(MessageType.INVITE);
buffer.putInt(ack.getId());
buffer.flip();
ack.setBuf(BufferUtil.clone(buffer));
try {
channel.send(buffer, newNode.getAddress());
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Adds a new ack, which this node is expecting to receive.
*
* @param addr
* the SocketAddress the ack should be received from
* @return the identifier for this ack
*/
private Ack generateAck(final SocketAddress addr, OnAckReceive callback) {
int ack_id = generator.nextInt();
while (acks.containsKey(ack_id)) {
ack_id = generator.nextInt();
}
Ack newAck = new Ack(ack_id, addr, channel, callback);
acks.put(ack_id, newAck);
return newAck;
}
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();
}
/**
* Sends an acknowledgment message to receiver (who hopefully is expecting
* it)
*
* @param receiver
* the node expecting an ack
* @param ack_id
* the id to identify the ack
*/
boolean sendAck(SocketAddress receiver, int ack_id) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.ACK);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Shoot a <b>new</b> broadcast packet to the network.
*
* @param command
* the command of the broadcast packet (by now only *
* {@link node.MessageType#STATUS}
* @param data
* the (optional) data. Depends on the command
*/
private void sendBroadcast(byte command, byte[] data) {
// Random id?
int packet_id = generator.nextInt();
forwardBroadcast(packet_id, command, data, null);
}
protected void forwardBroadcast(int packet_id, byte command, byte[] data,
SocketAddress receivedFrom) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.BROADCAST);
buffer.putInt(packet_id);
buffer.put(command);
buffer.put(data);
buffer.flip();
// Needed because the buffer gets cleared after first send, so we save
// the current buffer in a byte[]
int bytesWrittenToBuffer = buffer.position();
byte[] packet = new byte[bytesWrittenToBuffer];
System.arraycopy(buffer.array(), 0, packet, 0, bytesWrittenToBuffer);
for (SocketAddress n : neighbors) {
if (!n.equals(receivedFrom)) {
try {
channel.send(buffer, n);
buffer.clear();
buffer.put(packet);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* Send my status (that is all my neighbors) to a given node.
*
* @param reveiver
* the node to receive my info
*/
protected void sendStatus(InetSocketAddress reveiver) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.STATUS);
for (SocketAddress n : neighbors) {
buffer.put(BufferUtil.addrToBytes((InetSocketAddress) n));
}
buffer.flip();
try {
channel.send(buffer, reveiver);
} catch (IOException e) {
e.printStackTrace();
}
}
public void gatherInformationOfNetwork() {
try {
byte[] myAddr = BufferUtil.addrToBytes(((InetSocketAddress) channel
.getLocalAddress()));
sendBroadcast(MessageType.STATUS, myAddr);
} catch (IOException e) {
}
}
protected void setNeighborsOfNode(String node, List<String> neighbors) {
network.put(node, neighbors);
}
protected boolean hasAck(int ack_id) {
return acks.containsKey(ack_id);
}
protected Ack getAck(int ack_id) {
return acks.get(ack_id);
}
protected Ack removeAck(int ack_id) {
return acks.remove(ack_id);
}
protected boolean addNeighbor(SocketAddress newNeighbor) {
if (!hasNeighbor(newNeighbor)) {
neighbors.add(newNeighbor);
network.get(getName()).add(newNeighbor.toString());
return true;
}
return false;
}
/**
* Remove the given address from my list of neighbors.
*
* @param node
* address to remove
* @return True if an address was removed, else false
*/
protected boolean removeNeighbor(SocketAddress node) {
SocketAddress removed = null;
int idToRemove = getNeighborId(node);
if (idToRemove != -1) {
removed = neighbors.remove(idToRemove);
network.get(getName()).remove(node.toString());
}
return removed != null;
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
*/
public void leave() {
if (!receivedAckForLastInvite || neighbors.isEmpty()) {
// This means the bootstrapping has not been finished. Either I have
// not been invited by a node, or the Node I invited didn't ack by
// now
return;
}
LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName());
for (int i = 0; i < neighbors.size(); i++) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
Ack ack = generateAck(neighbors.get(i), null);
buffer.put(MessageType.LEAVE);
buffer.putInt(ack.getId());
if (neighbors.size() > 2) {
int pred = ((i - 1) + neighbors.size()) % neighbors.size();
int succ = (i + 1) % neighbors.size();
buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors
.get(succ)));
buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors
.get(pred)));
} else if (neighbors.size() == 2) {
buffer.put(BufferUtil.addrToBytes((InetSocketAddress) neighbors
.get(Math.abs(i - 1))));
}
buffer.flip();
ack.setBuf(BufferUtil.clone(buffer));
try {
channel.send(buffer, neighbors.get(i));
} catch (IOException e) {
}
}
try {
if (thread != null) {
udpListen.terminate();
thread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean hasNeighbor(SocketAddress addr) {
for (SocketAddress n : neighbors) {
if (n.toString().equals(addr.toString())) {
return true;
}
}
return false;
}
public int getNeighborId(SocketAddress addr) {
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.get(i).toString().equals(addr.toString())) {
return i;
}
}
return -1;
}
public String getName() {
return this.name;
}
public DatagramChannel getChannel() {
return channel;
}
public String toString() {
StringBuilder result = new StringBuilder(256);
result.append("Node ");
result.append(getName()).append(", Neighbors: ");
result.append(neighbors);
return result.toString();
}
public Map<String, List<String>> getNetwork() {
return network;
}
public boolean hasAcks() {
return !acks.isEmpty();
}
}