diff --git a/src/main/java/biz/nellemann/jperf/App.java b/src/main/java/biz/nellemann/jperf/App.java index 8dfa504..324e7d9 100644 --- a/src/main/java/biz/nellemann/jperf/App.java +++ b/src/main/java/biz/nellemann/jperf/App.java @@ -4,6 +4,8 @@ package biz.nellemann.jperf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import picocli.CommandLine; import picocli.CommandLine.Command; import java.util.concurrent.Callable; @@ -12,6 +14,10 @@ import java.util.concurrent.Callable; description = "Network performance measurement tool.") public class App implements Callable { + final Logger log = LoggerFactory.getLogger(App.class); + + @CommandLine.Option(names = { "-s", "--size" }, paramLabel = "SIZE", description = "the datagram size") + int size = 1500; @Override public Integer call() throws Exception { // your business logic goes here... @@ -20,31 +26,39 @@ public class App implements Callable { UdpServer udpServer = new UdpServer(); udpServer.start(); - int sequence = 0; + + long sequence = 0; + // Start client and send some messages UdpClient udpClient = new UdpClient(); // Start datagram - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), 64, sequence++); + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), size, sequence++); udpClient.send(datagram); + Thread.sleep(100); // TODO: Wait for ACK + datagram = udpClient.receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return -1; + } // Data datagrams ... - for(int i = 0; i < 10; i++) { - Thread.sleep(1000); - datagram = new Datagram(DataType.DATA.getValue(), 64, sequence++); + for(int i = 0; i < 100; i++) { + datagram = new Datagram(DataType.DATA.getValue(), size, sequence++); udpClient.send(datagram); + //Thread.sleep(50); } // End datagram - datagram = new Datagram(DataType.END.getValue(), 64, sequence++); + Thread.sleep(500); + datagram = new Datagram(DataType.END.getValue(), size, sequence++); udpClient.send(datagram); udpClient.close(); - Thread.sleep(1000); return 0; diff --git a/src/main/java/biz/nellemann/jperf/Datagram.java b/src/main/java/biz/nellemann/jperf/Datagram.java index ccbb466..bf34589 100644 --- a/src/main/java/biz/nellemann/jperf/Datagram.java +++ b/src/main/java/biz/nellemann/jperf/Datagram.java @@ -2,17 +2,20 @@ package biz.nellemann.jperf; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * - * A datagram consists of - * - * <-------------------- HEADER 32 bytes --------------> <---------- DATA --------> - * int-4bytes int-4bytes long-8bytes long-8bytes - * TYPE SIZE SEQUENCE TIMESTAMP + * Datagram consists of the following + *

+ * <------------------------- HEADER 32 bytes -------------------> <---------- DATA min 32 bytes --------> + * _long _int _int _long _long + * 8_bytes 4_bytes 4_bytes 8_bytes 8_bytes + * MAGIC-ID TYPE LENGTH SEQUENCE TIMESTAMP * */ @@ -20,29 +23,40 @@ public class Datagram { final Logger log = LoggerFactory.getLogger(Datagram.class); - private final int HEADER_LENGTH = 24; + private final int HEADER_LENGTH = 32; + + private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes private final int type; private final int length; + private final int realLength; private final long sequence; private final long timestamp; - private final byte[] data; /** * Create new empty datagram * @param type - * @param lenght + * @param length * @param sequence */ - public Datagram(int type, int lenght, long sequence) { + public Datagram(int type, int length, long sequence) { + + log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, sequence); + this.type = type; - this.length = lenght; + this.length = length; this.sequence = sequence; this.timestamp = System.currentTimeMillis(); - this.data = new byte[lenght - HEADER_LENGTH]; + if(type == DataType.DATA.getValue()) { + realLength = length; + data = new byte[length - HEADER_LENGTH]; + } else { + realLength = HEADER_LENGTH * 2; + data = new byte[HEADER_LENGTH * 2]; + } } @@ -51,11 +65,17 @@ public class Datagram { * Assemble datagram from payload * @param payload */ - public Datagram(byte[] payload) { + public Datagram(byte[] payload) throws IOException { - log.info("Datagram() 1"); + log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString()); ByteBuffer buffer = ByteBuffer.wrap(payload); + byte[] id = new byte[8]; + buffer.get(id); + if(!Arrays.equals(id, MAGIC_ID)) { + log.warn("Datagram() - magic ID does not match!"); + throw new IOException(); + } // Order is importent when assembling header fields like this type = buffer.getInt(); @@ -63,14 +83,13 @@ public class Datagram { sequence = buffer.getLong(); timestamp = buffer.getLong(); - log.info("Datagram() 2 "); - - log.info("Datagram() 3 "); - - data = new byte[length - HEADER_LENGTH]; - buffer.get(data); - - log.info("Datagram() 4"); + realLength = length; + if(type == DataType.DATA.getValue()) { + data = new byte[length - HEADER_LENGTH]; + buffer.get(data, 0, data.length); + } else { + data = new byte[HEADER_LENGTH * 2]; + } } @@ -78,11 +97,17 @@ public class Datagram { return length; } + public int getRealLength() { + return realLength; + } public byte[] getPayload() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(length); + + log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, sequence); + ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); // Order is important + buffer.put(MAGIC_ID); buffer.putInt(type); buffer.putInt(length); buffer.putLong(sequence); diff --git a/src/main/java/biz/nellemann/jperf/UdpClient.java b/src/main/java/biz/nellemann/jperf/UdpClient.java index b5f0189..a1fd4ee 100644 --- a/src/main/java/biz/nellemann/jperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jperf/UdpClient.java @@ -17,7 +17,7 @@ public class UdpClient { private DatagramSocket socket; private InetAddress address; - private byte[] buf; + private byte[] buf = new byte[256]; public UdpClient() throws UnknownHostException, SocketException { log.info("UdpClient()"); @@ -26,10 +26,16 @@ public class UdpClient { } public void send(Datagram datagram) throws IOException { - DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, 4445); + DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, 4445); socket.send(packet); } + public Datagram receive() throws IOException { + DatagramPacket packet = new DatagramPacket(buf, buf.length); + socket.receive(packet); + Datagram datagram = new Datagram(buf); + return datagram; + } public String sendEcho(String msg) throws IOException { log.info("send() - msg: {}", msg); diff --git a/src/main/java/biz/nellemann/jperf/UdpServer.java b/src/main/java/biz/nellemann/jperf/UdpServer.java index ba73fb1..df4123c 100644 --- a/src/main/java/biz/nellemann/jperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jperf/UdpServer.java @@ -14,7 +14,7 @@ public class UdpServer extends Thread { final Logger log = LoggerFactory.getLogger(UdpServer.class); - private DatagramSocket socket; + private final DatagramSocket socket; private boolean running; private byte[] buf = new byte[256]; @@ -24,9 +24,13 @@ public class UdpServer extends Thread { } public void run() { + running = true; + long thisSequence = 0; + long lastSequence = 0; try { + while (running) { DatagramPacket packet = new DatagramPacket(buf, buf.length); @@ -35,34 +39,47 @@ public class UdpServer extends Thread { InetAddress address = packet.getAddress(); int port = packet.getPort(); + log.debug("run() - buffer is: {}", buf.length); Datagram datagram = new Datagram(buf); + thisSequence = datagram.getSequence(); if(datagram.getType() == DataType.HANDSHAKE.getValue()) { - log.info("Handshake from ..."); + log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); + + // Setup to receive larger datagrams + buf = new byte[datagram.getLength()]; + + // TODO: Send ACK + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getSequence()); + packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); + socket.send(packet); } if(datagram.getType() == DataType.END.getValue()) { running = false; log.info("Stopping ...."); + // TODO: Reset ? } if(datagram.getType() == DataType.DATA.getValue()) { - log.info("Data .... size: {}", datagram.getLength()); + if(thisSequence == lastSequence + 1) { + log.info("Data .... size: {}, sequence: {}", datagram.getLength(), thisSequence); + } else { + log.warn("Data .... out of sequence: {} vs {}", thisSequence, lastSequence); + } } - // Send response ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getSequence()); - packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); - socket.send(packet); + lastSequence = thisSequence; + } socket.close(); } catch(IOException e) { - + log.error(e.getMessage()); } } diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..18a7a14 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,14 @@ + + + + false + + %cyan(%d{HH:mm:ss.SSS}) %gray([%thread]) %highlight(%-5level) %magenta(%logger{32}) - %msg%n + + + + + + + + \ No newline at end of file