package node;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

import util.BufferUtil;

/**
 * Receive loop for a {@link Node}'s UDP channel.
 *
 * <p>Each iteration reads one datagram into a shared buffer, takes the first
 * byte as the message type and dispatches to the matching {@code receive*}
 * handler. The handlers read their payload from the same buffer, so they must
 * only be called from {@link #run()} after the buffer has been flipped.
 *
 * <p>All mutable state except the {@code running} flag is touched only by the
 * thread executing {@link #run()}.
 */
public class UDPHandler implements Runnable {

    // NOTE(review): the logger is named after Node, not UDPHandler. Kept as-is
    // so existing logging configuration keeps applying; confirm it is intended.
    private final static Logger LOGGER = Logger.getLogger(Node.class.getName());

    /** Size of the receive buffer in bytes. */
    public static final int BUF_SIZE = 512;

    // How long do we remember broadcast packets (ms)?
    private static final int BROADCAST_TIMEOUT = 10000;

    // Pause between polls when no datagram was available (ms).
    private static final long IDLE_SLEEP_MS = 10;

    // Broadcast packet id -> time (ms since epoch) at which it was first seen.
    private final Map<Integer, Long> broadcastPackets = new HashMap<>();

    private volatile boolean running = true;

    private final ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

    private final Node node;

    /**
     * Creates a handler serving the given node.
     *
     * @param node the node whose channel is read and which is notified about
     *             received messages
     */
    public UDPHandler(Node node) {
        this.node = node;
    }

    /**
     * Handles an INVITE: acknowledges it and registers the sender as neighbor.
     * Payload: ack id (int).
     */
    private void receiveInvite(SocketAddress from) {
        LOGGER.log(Level.INFO, "{0}: received invite from {1}",
                new Object[] { node.getName(), from.toString() });

        int ackId = buf.getInt();
        node.sendAck(from, ackId);
        node.addNeighbor(from);
    }

    /**
     * Handles an ACK. If the node is waiting for this ack id and the ack
     * accepts the sender, the ack is removed from the node and marked as
     * received. Payload: ack id (int).
     */
    private void receiveAck(SocketAddress from) {
        int ackId = buf.getInt();

        LOGGER.log(Level.INFO, "{0}: received ack ({1}) from {2}",
                new Object[] { node.getName(), ackId, from.toString() });

        if (node.hasAck(ackId)) {
            Ack theAck = node.getAck(ackId);
            if (theAck.check(from)) {
                node.removeAck(ackId).setReceived();
            }
        }
    }

    /**
     * Handles a LEAVE: removes the sender from the neighbors and acknowledges
     * the message. Payload: ack id (int), only read when the sender was a
     * known neighbor.
     */
    private void receiveLeave(SocketAddress from) {
        LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
                new Object[] { node.getName(), from.toString() });

        // If we don't know that neighbor, we don't have to ack.
        if (node.removeNeighbor(from)) {
            int ackId = buf.getInt();
            node.sendAck(from, ackId);
        }
    }

    /**
     * Handles a BROADCAST. Payload: packet id (int), command (byte), then
     * command-specific data. Packets whose id was already seen within
     * {@link #BROADCAST_TIMEOUT} are discarded so they are not forwarded twice.
     */
    private void receiveBroadcast(SocketAddress from) {
        int packetId = buf.getInt();
        byte command = buf.get();

        if (!isNewBroadcast(packetId)) {
            LOGGER.log(Level.INFO,
                    "{0}: Received duplicate broadcast packet ({1}). Discarding...",
                    new Object[] { node.getName(), packetId });
            return;
        }

        // Save packet id so we don't forward a packet multiple times.
        rememberBroadcast(packetId, System.currentTimeMillis());

        switch (command) {
        case MessageType.STATUS:
            InetSocketAddress originalSender = readIPFromBuffer();

            // Broadcast to my neighbors
            node.sendBroadcast(packetId, command,
                    BufferUtil.addrToBytes(originalSender));

            LOGGER.log(
                    Level.INFO,
                    "{0}: received status broadcast packet from {1}. original sender: {2}",
                    new Object[] { node.getName(), from.toString(),
                            originalSender.toString() });

            node.sendStatus(originalSender);
            break;
        default:
            LOGGER.log(
                    Level.INFO,
                    "{0}: received unknown broadcast packet from {1}. id: {2} type: {3}",
                    new Object[] { node.getName(), from.toString(), packetId,
                            command });
            break;
        }
    }

    /**
     * Handles a STATUS message by reading addresses until the buffer is
     * exhausted. The parsed list is currently not used any further.
     */
    private void receiveStatus(SocketAddress receivedFrom) {
        List<InetSocketAddress> neighbors = new ArrayList<>();

        while (buf.hasRemaining()) {
            neighbors.add(readIPFromBuffer());
        }

        // TODO: Keep track of all neighbors..
    }

    /**
     * Reads one socket address from the buffer: 4 address bytes followed by
     * the port as an int.
     *
     * @return the decoded address
     * @throws BufferUnderflowException if fewer than 8 bytes remain
     * @throws IllegalArgumentException if the port is outside 0..65535
     */
    private InetSocketAddress readIPFromBuffer() {
        StringBuilder theAddr = new StringBuilder();

        // Read 4 Bytes and 1 Integer = 1 IP address
        for (int i = 0; i < 4; i++) {
            // Java bytes are signed; mask so octets >= 128 are not rendered
            // as negative numbers.
            theAddr.append(buf.get() & 0xFF);
            if (i < 3) {
                theAddr.append(".");
            }
        }

        int port = buf.getInt();

        return new InetSocketAddress(theAddr.toString(), port);
    }

    /**
     * Handles a NEW_NEIGHBOR message: registers the announced address as a
     * neighbor and acknowledges the message. Payload: ack id (int), address.
     */
    private void receiveNewNeighbor(SocketAddress from) {
        int ackId = buf.getInt();

        InetSocketAddress newNeighbor = readIPFromBuffer();
        node.addNeighbor(newNeighbor);

        LOGGER.log(
                Level.INFO,
                "{0}: from {1} received new neighbor:{2}",
                new Object[] { node.getName(), from.toString(),
                        newNeighbor.toString() });

        node.sendAck(from, ackId);
    }

    /**
     * Tells whether a broadcast id has not been seen within the last
     * {@link #BROADCAST_TIMEOUT} milliseconds.
     */
    private boolean isNewBroadcast(int id) {
        Long receivedAt = broadcastPackets.get(id);
        if (receivedAt == null) {
            return true;
        }
        return System.currentTimeMillis() - receivedAt >= BROADCAST_TIMEOUT;
    }

    /**
     * Records a broadcast id as seen at {@code now} and drops every entry
     * older than {@link #BROADCAST_TIMEOUT}, so the map cannot grow without
     * bound.
     */
    private void rememberBroadcast(int packetId, long now) {
        for (Iterator<Long> it = broadcastPackets.values().iterator(); it.hasNext();) {
            if (now - it.next() >= BROADCAST_TIMEOUT) {
                it.remove();
            }
        }
        broadcastPackets.put(packetId, now);
    }

    /** Reads the message type byte and calls the matching handler. */
    private void dispatch(SocketAddress receivedFrom) {
        byte messageType = buf.get();

        switch (messageType) {
        case MessageType.INVITE:
            receiveInvite(receivedFrom);
            break;
        case MessageType.ACK:
            receiveAck(receivedFrom);
            break;
        case MessageType.LEAVE:
            receiveLeave(receivedFrom);
            break;
        case MessageType.NEW_NEIGHBOR:
            receiveNewNeighbor(receivedFrom);
            break;
        case MessageType.BROADCAST:
            receiveBroadcast(receivedFrom);
            break;
        case MessageType.STATUS:
            receiveStatus(receivedFrom);
            break;
        default:
            // Only dump the bytes of this datagram, not the whole backing
            // array (which may still hold data from earlier packets).
            LOGGER.log(
                    Level.INFO,
                    "{0}: received unknown command from {1}: [{2}]{3}",
                    new Object[] { node.getName(), receivedFrom.toString(),
                            messageType,
                            new String(buf.array(), buf.arrayOffset(), buf.limit()) });
        }
    }

    /**
     * Polls the node's channel until {@link #terminate()} is called or the
     * thread is interrupted.
     *
     * <p>The original code treats {@code receive} as non-blocking, so an empty
     * buffer after the call means nothing arrived and the loop sleeps briefly.
     * A packet that is truncated or carries an invalid port is logged and
     * dropped instead of ending the loop.
     */
    @Override
    public void run() {
        while (running) {
            try {
                SocketAddress receivedFrom = node.getChannel().receive(buf);

                // Check if something actually has been written to the buffer.
                if (receivedFrom != null && buf.position() > 0) {
                    buf.flip();
                    dispatch(receivedFrom);
                } else {
                    sleepWhileIdle();
                }
            } catch (IOException e) {
                LOGGER.log(Level.WARNING,
                        node.getName() + ": I/O error while handling packet", e);
            } catch (BufferUnderflowException | IllegalArgumentException e) {
                LOGGER.log(Level.WARNING,
                        node.getName() + ": failed to process packet, discarding", e);
            } finally {
                // Always reset the buffer so the next receive starts clean.
                buf.clear();
            }
        }
    }

    /**
     * Sleeps for one poll interval. An interrupt is treated as a request to
     * stop: the interrupt flag is restored and the loop is ended.
     */
    private void sleepWhileIdle() {
        try {
            Thread.sleep(IDLE_SLEEP_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
    }

    /** Asks the receive loop to stop after its current iteration. */
    public void terminate() {
        running = false;
    }
}