package node;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

import message.MessageType;
import routingtable.BucketEntry;

/**
 * Receive loop for the UDP side of a {@link Node}.
 *
 * <p>
 * Each datagram is read into one reusable buffer and parsed as: 1 byte message
 * type, the sender's identifier ({@code Node.ID_BITS / 8} bytes), an RPC
 * identifier of the same size, and then a payload that depends on the message
 * type.
 *
 * <p>
 * The handler keeps a single receive buffer as instance state, so one instance
 * is meant to be driven by one thread. {@link #terminate()} may be called from
 * any thread.
 */
public class UDPHandler implements Runnable {

	private static final Logger LOGGER = Logger.getLogger(UDPHandler.class
			.getName());

	public static final int BUF_SIZE = 512;

	/** Pause between polls while the channel has nothing to deliver. */
	private static final long IDLE_SLEEP_MILLIS = 10;

	private volatile boolean running = true;
	private final ByteBuffer buffer = ByteBuffer.allocate(BUF_SIZE);
	private final Node node;

	public UDPHandler(Node node) {
		this.node = node;
	}

	/**
	 * Reads one socket address from the current buffer position: 4 address
	 * bytes followed by the port as a 4-byte int.
	 *
	 * @return the address that has been read
	 * @throws BufferUnderflowException
	 *             if fewer than 8 bytes remain in the buffer
	 * @throws IllegalArgumentException
	 *             if the port read from the buffer is outside the valid range
	 */
	private InetSocketAddress getIPFromBuffer() {
		StringBuilder theAddr = new StringBuilder();
		for (int i = 0; i < 4; i++) {
			// Java bytes are signed; mask so that octets >= 128 are rendered
			// as 128..255 instead of negative numbers.
			theAddr.append(buffer.get() & 0xFF);
			if (i < 3) {
				theAddr.append(".");
			}
		}
		int port = buffer.getInt();
		return new InetSocketAddress(theAddr.toString(), port);
	}

	/**
	 * Reads one identifier ({@code Node.ID_BITS / 8} bytes) from the current
	 * buffer position.
	 *
	 * @return the identifier that has been read
	 * @throws BufferUnderflowException
	 *             if the buffer holds fewer bytes than an identifier needs
	 */
	private Identifier getIDFromBuffer() {
		byte[] result = new byte[Node.ID_BITS / 8];
		buffer.get(result);
		return new Identifier(result);
	}

	/**
	 * Polls the node's channel and dispatches every received datagram until
	 * {@link #terminate()} has been called and the node reports no pending
	 * acks. Interrupting the running thread also ends the loop, regardless of
	 * pending acks.
	 */
	@Override
	public void run() {
		while (running || node.hasAcks()) {
			try {
				InetSocketAddress from = (InetSocketAddress) node.getChannel()
						.receive(buffer);
				// The channel is polled without blocking, so the absence of a
				// sender or of any written byte means nothing has arrived.
				if (from == null || buffer.position() == 0) {
					if (!sleepWhileIdle()) {
						return;
					}
					continue;
				}
				buffer.flip();
				handleDatagram(from);
			} catch (IOException e) {
				LOGGER.log(Level.WARNING,
						"I/O error while receiving a datagram", e);
			} finally {
				// Always reset the buffer, otherwise a failed iteration would
				// leave a stale position/limit for the next receive.
				buffer.clear();
			}
		}
	}

	/**
	 * Sleeps for a short moment because no datagram was available.
	 *
	 * @return {@code false} if the thread was interrupted and the receive loop
	 *         should stop, {@code true} otherwise
	 */
	private boolean sleepWhileIdle() {
		try {
			Thread.sleep(IDLE_SLEEP_MILLIS);
			return true;
		} catch (InterruptedException e) {
			// Keep the interrupt visible to whoever runs this handler and
			// treat it as a request to stop.
			Thread.currentThread().interrupt();
			LOGGER.log(Level.INFO, "UDP handler interrupted, stopping");
			return false;
		}
	}

	/**
	 * Parses the datagram currently held in the (already flipped) buffer and
	 * dispatches it by message type. Afterwards the sender is reported to the
	 * node's buckets. Datagrams that are too short or carry invalid values are
	 * logged and dropped, so a single bad packet cannot stop the handler.
	 *
	 * @param from
	 *            the address the datagram was received from
	 */
	private void handleDatagram(InetSocketAddress from) {
		try {
			byte messageType = buffer.get();
			Identifier fromID = getIDFromBuffer();
			Identifier rpcId = getIDFromBuffer();

			switch (messageType) {
			case MessageType.FIND_NODE:
				receiveFindNode(from, fromID, rpcId);
				break;
			case MessageType.NODES:
				receiveNodes(from, fromID, rpcId);
				break;
			case MessageType.PING:
				LOGGER.log(Level.INFO, "Received [ping] from {0}",
						new Object[] { from.toString() });
				break;
			default:
				// Only decode the bytes of this datagram; the rest of the
				// backing array may still hold data from earlier packets.
				LOGGER.log(Level.INFO,
						"Received unknown command from {0}: [{1}]{2}",
						new Object[] {
								from.toString(),
								messageType,
								new String(buffer.array(), 0, buffer.limit()) });
			}

			node.updateBuckets(fromID, from);
		} catch (BufferUnderflowException e) {
			logMalformed(from, e);
		} catch (IllegalArgumentException e) {
			logMalformed(from, e);
		}
	}

	/** Logs a datagram that could not be parsed and is therefore dropped. */
	private void logMalformed(InetSocketAddress from, RuntimeException cause) {
		LOGGER.log(Level.WARNING, "Dropping malformed datagram from " + from,
				cause);
	}

	/**
	 * Handles a NODES message: reads (address, identifier) entries until the
	 * buffer is exhausted and reports each one to the node's buckets.
	 *
	 * @param from
	 *            the address the datagram was received from
	 * @param fromID
	 *            the identifier of the sender
	 * @param rpc_id
	 *            the RPC identifier of the message (currently unused here)
	 */
	private void receiveNodes(InetSocketAddress from, Identifier fromID,
			Identifier rpc_id) {
		Set<BucketEntry> readEntries = new HashSet<BucketEntry>();

		while (buffer.hasRemaining()) {
			InetSocketAddress address = getIPFromBuffer();
			Identifier id = getIDFromBuffer();
			readEntries.add(new BucketEntry(id, address));
			// TODO: Just add all nodes?!
			node.updateBuckets(id, address);
		}

		LOGGER.log(Level.INFO, "Received {0} [NODES] from Node {1} ({2})",
				new Object[] { readEntries.size(), fromID, from.toString() });
	}

	/**
	 * Handles a FIND_NODE message: reads the identifier to look up and asks
	 * the node to send its closest known nodes back to the sender.
	 *
	 * @param from
	 *            the address the datagram was received from
	 * @param fromID
	 *            the identifier of the sender
	 * @param rpc_id
	 *            the RPC identifier of the request, passed on with the answer
	 */
	private void receiveFindNode(InetSocketAddress from, Identifier fromID,
			Identifier rpc_id) {
		Identifier idToFind = getIDFromBuffer();

		LOGGER.log(Level.INFO, "Received [FIND_NODE {0}] from Node {1} ({2})",
				new Object[] { idToFind, fromID, from.toString() });

		node.sendClosestNodesTo(from, fromID, idToFind, rpc_id);
	}

	/**
	 * Asks the receive loop to stop. The loop keeps running until the node
	 * reports no more pending acks.
	 */
	public void terminate() {
		running = false;
	}
}