235 lines
5.2 KiB
Java

package node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
public class Node {
private final static Logger LOGGER = Logger.getLogger(Node.class.getName());
private static final int BUF_SIZE = 512;
private DatagramChannel channel;
private ByteBuffer buf;
private String name = "Not initialized";
private List<SocketAddress> neighbors = new ArrayList<SocketAddress>();
private volatile Thread thread;
private UDPListen udpListen;
public Node() {
System.setProperty("java.net.preferIPv4Stack", "true");
try {
channel = DatagramChannel.open();
channel.socket().bind(new InetSocketAddress("localhost", 0));
channel.configureBlocking(false);
buf = ByteBuffer.allocate(BUF_SIZE);
this.name = channel.socket().getLocalSocketAddress().toString();
udpListen = new UDPListen();
thread = new Thread(udpListen);
thread.start();
LOGGER.info("Initialized node " + name);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Create another peer, mutually link creator and spawn.
*
* @return
* @throws IOException
*/
public Node spawn() throws IOException {
LOGGER.info("Name: " + getName() + ", Spawning new node.");
// create a new node
Node newNode = new Node();
buf.clear();
buf.put(MessageType.INVITE);
buf.flip();
try {
channel.send(buf, newNode.getAddress());
} catch (IOException e) {
e.printStackTrace();
}
neighbors.add(newNode.getAddress());
return newNode;
}
public SocketAddress getAddress() {
return channel.socket().getLocalSocketAddress();
}
private void putAddrInBuf(ByteBuffer buf, SocketAddress addr) {
buf.clear();
InetSocketAddress a = (InetSocketAddress) addr;
buf.put(MessageType.NEW_NEIGHBOR);
for (String part : a.getHostString().split("\\.")) {
buf.put(Byte.valueOf(part));
}
buf.putInt(a.getPort());
buf.flip();
}
/**
* Circularly link all neighbors, remove itself form all neighbors and exit.
*/
public void leave() {
LOGGER.info("Name: " + getName()
+ ", Leaving... Announcing to my neighbors: " + neighbors);
// loop over each neighbor i
for (int i = 0; i < neighbors.size(); i++) {
for (int j = 0; j < neighbors.size(); j++) {
if(i != j){
try {
// send all neighbors j to neighbor i
putAddrInBuf(buf, neighbors.get(j));
channel.send(buf, neighbors.get(i));
} catch (IOException e) {
e.printStackTrace();
}
}
}
// send LEAVE to neighbor i
buf.clear();
buf.put(MessageType.LEAVE);
buf.flip();
try {
channel.send(buf, neighbors.get(i));
} catch (IOException e) {
e.printStackTrace();
}
}
// Destroy thread
try {
if (thread != null) {
udpListen.terminate();
thread.join();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public boolean hasNeighbor(SocketAddress adr) {
for (SocketAddress n : neighbors) {
if (n.toString().equals(adr.toString())) {
return true;
}
}
return false;
}
public String getName() {
return this.name;
}
public class UDPListen implements Runnable {
private volatile boolean running = true;
public void run() {
while (running) {
SocketAddress receivedFrom = null;
try {
receivedFrom = channel.receive(buf);
if (buf.remaining() != 512) {
buf.flip();
byte messageType = buf.get();
switch (messageType) {
case MessageType.INVITE:
LOGGER.info(name + " received invite from "
+ receivedFrom.toString());
neighbors.add(receivedFrom);
break;
case MessageType.LEAVE:
LOGGER.info(name + ": " + receivedFrom.toString()
+ " is leaving. Deleting...");
// search the neighbor in the list and remove him.
for (int i = 0; i < neighbors.size(); i++) {
if (((InetSocketAddress) neighbors.get(i))
.getPort() == ((InetSocketAddress) receivedFrom)
.getPort()) {
neighbors.remove(i);
}
}
break;
case MessageType.NEW_NEIGHBOR:
StringBuilder theAddr = new StringBuilder();
// Read 4 Bytes and 1 int
for (int i = 0; i < 4; i++) {
theAddr.append(buf.get());
if (i < 3)
theAddr.append(".");
}
int port = buf.getInt();
InetSocketAddress new_neighbor = new InetSocketAddress(
theAddr.toString(), port);
// check, if we have the neighbor already.
if (!hasNeighbor(new_neighbor)) {
neighbors.add(new_neighbor);
}
LOGGER.info(name + " from "
+ receivedFrom.toString()
+ " received new neighbor:"
+ new_neighbor.toString());
break;
default:
LOGGER.info(name
+ " received unknown command from "
+ receivedFrom.toString() + ": "
+ messageType + new String(buf.array()));
}
} else {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
buf.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void terminate() {
running = false;
}
}
}