college/ws2012/P2P/uebungen/4/src/node/UDPHandler.java

227 lines
7.6 KiB
Java

package node;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import util.BufferUtil;
/**
 * Receive loop for a node's UDP channel.
 *
 * <p>Polls the channel returned by {@code node.getChannel()}, reads the first
 * byte of every datagram as the message type and dispatches to the matching
 * {@code receive*} method. All parsing happens on the single shared
 * {@link #buf}, so the {@code receive*} methods must only be called from the
 * thread executing {@link #run()}.
 */
public class UDPHandler implements Runnable {
    // NOTE(review): the logger is registered under Node's class name, not
    // UDPHandler's. Kept as-is because changing the name could change which
    // handlers/levels apply -- confirm whether this is intentional.
    private static final Logger LOGGER = Logger.getLogger(Node.class.getName());

    /** Size in bytes of the receive buffer; larger datagrams do not fit. */
    public static final int BUF_SIZE = 512;

    // How long do we remember broadcast packets (ms)?
    private static final int BROADCAST_TIMEOUT = 10000;

    // Pause between polls when the non-blocking receive returned nothing (ms).
    private static final int POLL_INTERVAL_MS = 10;

    // Broadcast packet id -> time (ms since epoch) at which it was first seen.
    private final Map<Integer, Long> broadcastPackets = new HashMap<>();

    private volatile boolean running = true;

    private final ByteBuffer buf = ByteBuffer.allocate(BUF_SIZE);

    private final Node node;

    /**
     * @param node the node whose channel is read and on which all received
     *             messages are applied
     */
    public UDPHandler(Node node) {
        this.node = node;
    }

    /**
     * Handles an INVITE: payload is one int (ack id). Acknowledges the sender
     * and adds it as a neighbor.
     */
    private void receiveInvite(SocketAddress from) {
        LOGGER.log(Level.INFO, "{0}: received invite from {1}", new Object[] {
                node.getName(), from.toString() });

        int ackId = buf.getInt();
        node.sendAck(from, ackId);
        node.addNeighbor(from);
    }

    /**
     * Handles an ACK: payload is one int (ack id). If the node is waiting for
     * that ack and it checks out against the sender, the ack is removed from
     * the node and marked as received.
     */
    private void receiveAck(SocketAddress from) {
        int ackId = buf.getInt();

        LOGGER.log(Level.INFO, "{0}: received ack ({1}) from {2}",
                new Object[] {
                        node.getName(), ackId, from.toString() });

        if (node.hasAck(ackId)) {
            Ack theAck = node.getAck(ackId);
            if (theAck.check(from)) {
                node.removeAck(ackId).setReceived();
            }
        }
    }

    /**
     * Handles a LEAVE: removes the sender from the neighbors. Only if it was
     * actually removed is the ack id (one int) read and acknowledged.
     */
    private void receiveLeave(SocketAddress from) {
        LOGGER.log(Level.INFO, "{0}: {1} is leaving. Deleting...",
                new Object[] { node.getName(), from.toString() });

        if (node.removeNeighbor(from)) {
            int ackId = buf.getInt();
            node.sendAck(from, ackId);
        }
        // If we don't know that neighbor, we don't have to ack
    }

    /**
     * Handles a BROADCAST: payload is one int (packet id), one byte (command)
     * and command-specific data. Packets whose id was already seen within
     * {@link #BROADCAST_TIMEOUT} are discarded; new ones are remembered and
     * processed.
     */
    private void receiveBroadcast(SocketAddress from) {
        int packetId = buf.getInt();
        byte command = buf.get();

        if (!isNewBroadcast(packetId)) {
            LOGGER.log(Level.INFO,
                    "{0}: Received duplicate broadcast packet ({1}). Discarding...",
                    new Object[] { node.getName(), packetId });
            return;
        }

        // Save packet id so we don't forward a packet multiple times. Expired
        // ids are dropped first so the map cannot grow without bound.
        long now = System.currentTimeMillis();
        pruneExpiredBroadcasts(now);
        broadcastPackets.put(packetId, now);

        switch (command) {
        case MessageType.STATUS:
            InetSocketAddress originalSender = readIPFromBuffer();

            // Broadcast to my neighbors (in case of a STATUS broadcast
            // "originalSender" and "data" are the same, because the only
            // data of a STATUS broadcast is the original sender
            node.forwardBroadcast(packetId, command,
                    BufferUtil.addrToBytes(originalSender), from);

            LOGGER.log(
                    Level.INFO,
                    "{0}: received status broadcast packet from {1}. original sender: {2}",
                    new Object[] { node.getName(), from.toString(),
                            originalSender.toString() });

            node.sendStatus(originalSender);
            break;
        default:
            LOGGER.log(
                    Level.INFO,
                    "{0}: received unknown broadcast packet from {1}. id: {2} type: {3}",
                    new Object[] { node.getName(), from.toString(),
                            packetId, command });
            break;
        }
    }

    /**
     * Handles a STATUS: the remaining payload is a sequence of addresses
     * (see {@link #readIPFromBuffer()}), stored on the node as the neighbors
     * of the sender.
     */
    private void receiveStatus(SocketAddress receivedFrom) {
        LOGGER.log(Level.INFO, "{0}: Received status from {1}",
                new Object[] { node.getName(), receivedFrom.toString() });

        List<String> neighbors = new ArrayList<>();
        while (buf.hasRemaining()) {
            // Local deliberately not called "node": that would shadow the field.
            String neighbor = readIPFromBuffer().toString();
            neighbors.add(neighbor);
        }

        // Keep track of all neighbors..
        node.setNeighborsOfNode(receivedFrom.toString(), neighbors);
    }

    /**
     * Reads one address from the buffer: 4 bytes (IPv4 octets) followed by one
     * int (port).
     *
     * @return the decoded socket address
     * @throws BufferUnderflowException if fewer than 8 bytes remain
     * @throws IllegalArgumentException if the port is outside 0..65535
     */
    private InetSocketAddress readIPFromBuffer() {
        StringBuilder theAddr = new StringBuilder();

        for (int i = 0; i < 4; i++) {
            // Java bytes are signed; mask so octets >= 128 are not rendered
            // as negative numbers (which would not be a valid IP literal).
            theAddr.append(buf.get() & 0xFF);
            if (i < 3) {
                theAddr.append(".");
            }
        }

        int port = buf.getInt();
        return new InetSocketAddress(theAddr.toString(), port);
    }

    /**
     * Handles a NEW_NEIGHBOR: payload is one int (ack id) followed by one
     * address. Adds that address as a neighbor and acknowledges the sender.
     */
    private void receiveNewNeighbor(SocketAddress from) {
        int ackId = buf.getInt();
        InetSocketAddress newNeighbor = readIPFromBuffer();

        node.addNeighbor(newNeighbor);

        LOGGER.log(
                Level.INFO,
                "{0}: from {1} received new neighbor:{2}",
                new Object[] { node.getName(), from.toString(),
                        newNeighbor.toString() });

        node.sendAck(from, ackId);
    }

    /**
     * @return {@code true} if {@code id} has not been seen, or was last seen
     *         at least {@link #BROADCAST_TIMEOUT} ms ago
     */
    private boolean isNewBroadcast(int id) {
        Long receivedAt = broadcastPackets.get(id);
        if (receivedAt == null) {
            return true;
        }
        return System.currentTimeMillis() - receivedAt >= BROADCAST_TIMEOUT;
    }

    /** Removes every remembered broadcast id that is older than the timeout. */
    private void pruneExpiredBroadcasts(long now) {
        Iterator<Map.Entry<Integer, Long>> it = broadcastPackets.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() >= BROADCAST_TIMEOUT) {
                it.remove();
            }
        }
    }

    /**
     * Reads the message type byte from the (already flipped) buffer and calls
     * the matching handler.
     */
    private void dispatch(SocketAddress receivedFrom) {
        byte messageType = buf.get();

        switch (messageType) {
        case MessageType.INVITE:
            receiveInvite(receivedFrom);
            break;
        case MessageType.ACK:
            receiveAck(receivedFrom);
            break;
        case MessageType.LEAVE:
            receiveLeave(receivedFrom);
            break;
        case MessageType.NEW_NEIGHBOR:
            receiveNewNeighbor(receivedFrom);
            break;
        case MessageType.BROADCAST:
            receiveBroadcast(receivedFrom);
            break;
        case MessageType.STATUS:
            receiveStatus(receivedFrom);
            break;
        default:
            // Only the bytes of this datagram (0..limit) are logged; the rest
            // of the backing array may hold data from earlier packets.
            LOGGER.log(
                    Level.INFO,
                    "{0}: received unknown command from {1}: [{2}]{3}",
                    new Object[] { node.getName(),
                            receivedFrom.toString(), messageType,
                            new String(buf.array(), 0, buf.limit(),
                                    StandardCharsets.UTF_8) });
            break;
        }
    }

    /**
     * Polls the channel until {@link #terminate()} is called, the thread is
     * interrupted, or the channel is closed. Malformed packets are logged and
     * discarded instead of ending the loop.
     */
    @Override
    public void run() {
        while (running) {
            SocketAddress receivedFrom = null;
            try {
                receivedFrom = node.getChannel().receive(buf);

                // channel.receive() is non blocking. So we need to check if
                // something actually has been written to the buffer
                if (buf.remaining() != BUF_SIZE) {
                    buf.flip();
                    dispatch(receivedFrom);
                } else {
                    try {
                        Thread.sleep(POLL_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt for whoever owns this thread
                        // and stop polling.
                        Thread.currentThread().interrupt();
                        running = false;
                    }
                }
            } catch (BufferUnderflowException | IllegalArgumentException e) {
                // Truncated payload or invalid port in a received packet: drop
                // the packet but keep the receive loop alive.
                LOGGER.log(Level.WARNING, node.getName()
                        + ": discarding malformed packet from " + receivedFrom, e);
            } catch (ClosedChannelException e) {
                // A closed channel can never deliver again; retrying would
                // only spin.
                LOGGER.log(Level.INFO, node.getName()
                        + ": channel closed, stopping UDP handler", e);
                running = false;
            } catch (IOException e) {
                LOGGER.log(Level.WARNING, node.getName()
                        + ": I/O error while receiving", e);
            } finally {
                // Always reset the buffer, even after a failure, so the next
                // receive starts from an empty buffer.
                buf.clear();
            }
        }
    }

    /** Asks the receive loop to stop after the current iteration. */
    public void terminate() {
        running = false;
    }
}