336 lines
11 KiB
Java

package node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
private static final int BUF_SIZE = 512;
private DatagramChannel channel;
private ByteBuffer buf;
private String name = "Not initialized";
private List<SocketAddress> neighbors = new ArrayList<>();
private Map<Integer, Ack> acks = new HashMap<>();
private volatile Thread thread;
private UDPListen udpListen;
private Random generator;
public Node() {
//debug
System.setProperty("java.net.preferIPv4Stack", "true");
generator = new Random(System.currentTimeMillis());
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
channel.configureBlocking(false);
buf = ByteBuffer.allocate(BUF_SIZE);
this.name = channel.socket().getLocalSocketAddress().toString();
udpListen = new UDPListen();
thread = new Thread(udpListen);
LOGGER.log(Level.INFO, "Initialized node {0}", name);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Create another peer, mutually link creator and spawn.
*
* @return the spawned Node
* @throws IOException
* if no connection could be established to the new node
*/
public Node spawn() throws IOException {
// LOGGER.info("Name: " + getName() + ", Spawning new node.");
Node newNode = new Node();
Send_Invite(newNode);
neighbors.add(newNode.getAddress());
return newNode;
}
public boolean Send_Invite(Node newNode){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(newNode.getAddress());
buffer.put(MessageType.INVITE);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, newNode.getAddress());
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Adds a new ack, which this node is expecting to receive.
*
* @param addr
* the SocketAddress the ack should be received from
* @return the identifier for this ack
*/
private int generateAck(SocketAddress addr) {
int ack_id = generator.nextInt();
acks.put(ack_id, new Ack(addr));
return ack_id;
}
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();}
public boolean Send_NewNeighbor(SocketAddress receiver, SocketAddress neighbor){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(neighbor);
buffer.put(MessageType.NEW_NEIGHBOR);
buffer.putInt(ack_id);
InetSocketAddress a = (InetSocketAddress) neighbor;
for (String part : a.getHostString().split("\\.")) {
buffer.put(Byte.valueOf(part));}
buffer.putInt(a.getPort());
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* Sends an acknowledgment message to receiver (who hopefully is expecting
* it)
*
* @param receiver
* the node expecting an ack
* @param ack_id
* the id to identify the ack
*/
private boolean Send_Ack(SocketAddress receiver, int ack_id) {
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
buffer.put(MessageType.ACK);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, receiver);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
private boolean Send_Leave(SocketAddress neighbor){
ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
int ack_id = generateAck(neighbor);
buffer.put(MessageType.LEAVE);
buffer.putInt(ack_id);
buffer.flip();
try {
channel.send(buffer, neighbor);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* This node circularly links all neighbors (no mesh!) and removes itself
* from the network.
*/
public void leave() {
LOGGER.log(Level.INFO, "Name: {0}, Leaving...", getName());
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.size() > 2) {
int pred = ((i - 1) + neighbors.size()) % neighbors.size();
int succ = (i + 1) % neighbors.size();
Send_NewNeighbor(neighbors.get(i), neighbors.get(succ));
Send_NewNeighbor(neighbors.get(i), neighbors.get(pred));
} else if (neighbors.size() == 2) {
Send_NewNeighbor(neighbors.get(i), neighbors.get(Math.abs(i - 1)));
}
Send_Leave(neighbors.get(i));
}
try {
if (thread != null) {
udpListen.terminate();
thread.join();}
} catch (InterruptedException e) {
e.printStackTrace();}
}
public boolean hasNeighbor(SocketAddress addr) {
for (SocketAddress n : neighbors) {
if (n.toString().equals(addr.toString())) {
return true;
}
}
return false;
}
public int getNeighborId(SocketAddress addr) {
for (int i = 0; i < neighbors.size(); i++) {
if (neighbors.get(i).toString().equals(addr.toString())) {
return i;
}
}
return -1;
}
public String getName() {
return this.name;}
public String toString() {
StringBuilder result = new StringBuilder(256);
result.append("Node ");
result.append(getName()).append(", Neighbors: ");
result.append(neighbors);
return result.toString();
}
private class UDPListen implements Runnable {
private volatile boolean running = true;
private void Receive_Invite(SocketAddress from){
LOGGER.log(Level.INFO, "{0} received invite from {1}", new Object[]{name, from.toString()});
int ack_id = buf.getInt();
Send_Ack(from, ack_id);
neighbors.add(from);
}
private void Receive_Ack(SocketAddress from){
LOGGER.log(Level.INFO, "{0} received ack from {1}", new Object[]{name, from.toString()});
int ack_id = buf.getInt();
if (!checkAck(from, ack_id)) {
LOGGER.log(Level.WARNING, "Received unexpected ack from: {0}", from.toString());}
}
private void Receive_Leave(SocketAddress from){
LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...", new Object[]{name, from.toString()});
int idToRemove = getNeighborId(from);
if (idToRemove != -1) {
neighbors.remove(idToRemove);
int ack_id = buf.getInt();
Send_Ack(from, ack_id);
}
// If we don't know that neighbor, we don't have to
// ack
}
private void Receive_NewNeighbor(SocketAddress from){
int ack_id = buf.getInt();
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 Integer = 1 IP address
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3){
theAddr.append(".");}
}
int port = buf.getInt();
InetSocketAddress new_neighbor = new InetSocketAddress(theAddr.toString(), port);
if (!hasNeighbor(new_neighbor)) {
// Add this neighbor to my neighbor list if it
// was not present before
neighbors.add(new_neighbor);
LOGGER.log(Level.INFO, "{0} from {1} received new neighbor:{2}", new Object[]{name, from.toString(), new_neighbor.toString()});
}
Send_Ack(from, ack_id);
}
public void run() {
SocketAddress receivedFrom = null;
while (running) {
try {
receivedFrom = channel.receive(buf);
// channel.receive() is non blocking. So we need to check if
// something actually has been written to the buffer
if (buf.remaining() != BUF_SIZE) {
buf.flip();
byte messageType = buf.get();
switch (messageType) {
case MessageType.INVITE:
Receive_Invite(receivedFrom);
break;
case MessageType.ACK:
Receive_Ack(receivedFrom);
break;
case MessageType.LEAVE:
Receive_Leave(receivedFrom);
break;
case MessageType.NEW_NEIGHBOR:
Receive_NewNeighbor(receivedFrom);
break;
default:
LOGGER.log(Level.INFO, "{0} received unknown command from {1}: [{2}]{3}", new Object[]{name, receivedFrom.toString(), messageType, new String(buf.array())});
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private boolean checkAck(SocketAddress receivedFrom, int ack_id) {
if (acks.containsKey(ack_id)) {
Ack theAck = acks.get(ack_id);
if (theAck.check(receivedFrom)) {
acks.remove(ack_id);
return true;
}
}
return false;
}
public void terminate() {
running = false;}
}
}