From 87c8c1f56e63535436f20d4c65f9ce57b9c3fb2a Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Thu, 13 Jul 2023 21:59:38 +0200 Subject: [PATCH 1/2] Initial support for TCP packets. --- README.md | 5 +- .../biz/nellemann/jnetperf/Application.java | 34 ++++-- .../java/biz/nellemann/jnetperf/Client.java | 4 + .../java/biz/nellemann/jnetperf/Datagram.java | 69 +++++------ .../biz/nellemann/jnetperf/TcpClient.java | 109 ++++++++++++++++++ .../biz/nellemann/jnetperf/TcpServer.java | 103 +++++++++++++++++ .../biz/nellemann/jnetperf/UdpClient.java | 12 +- .../biz/nellemann/jnetperf/UdpServer.java | 24 ++-- 8 files changed, 286 insertions(+), 74 deletions(-) create mode 100644 src/main/java/biz/nellemann/jnetperf/Client.java create mode 100644 src/main/java/biz/nellemann/jnetperf/TcpClient.java create mode 100644 src/main/java/biz/nellemann/jnetperf/TcpServer.java diff --git a/README.md b/README.md index d0f13c7..77813f5 100644 --- a/README.md +++ b/README.md @@ -12,14 +12,15 @@ You need Java (JRE) version 8 or later to run jnetperf. - Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar** ```shell -Usage: jnetperf [-hV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) +Usage: jnetperf [-huV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) For more information visit https://git.data.coop/nellemann/jnetperf -c, --connect=SERVER Connect to remote server. -h, --help Show this help message and exit. - -l, --pkt-len=SIZE Datagram size in bytes, max 65507 [default: 65507]. + -l, --pkt-len=SIZE Packet size in bytes [default: 1432]. -n, --pkt-num=NUM Number of packets to send [default: 150000]. -p, --port=PORT Network port [default: 4445]. -s, --server Run server and wait for client. + -u, --udp Use UDP network protocol [default: false]. -V, --version Print version information and exit. ``` diff --git a/src/main/java/biz/nellemann/jnetperf/Application.java b/src/main/java/biz/nellemann/jnetperf/Application.java index a2744ee..26ec379 100644 --- a/src/main/java/biz/nellemann/jnetperf/Application.java +++ b/src/main/java/biz/nellemann/jnetperf/Application.java @@ -40,8 +40,8 @@ public class Application implements Callable { boolean runServer = false; } - @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Datagram size in bytes, max 65507 [default: ${DEFAULT-VALUE}].") - int packetSize = 65507; // Min: 256 Max: 65507 + @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].") + int packetSize = Datagram.DEFAULT_LENGTH; @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") int packetCount = 150_000; @@ -49,6 +49,9 @@ public class Application implements Callable { @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") int port = 4445; + @CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].") + boolean useUdp = false; + @Override @@ -72,15 +75,30 @@ public class Application implements Callable { private void runClient(String remoteHost) throws InterruptedException, IOException { - UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); - udpClient.start(); + if(useUdp) { + if(packetSize > Datagram.MAX_UDP_LENGTH) { + System.err.println("Packet size not allowed for UDP"); + return; + } + UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); + udpClient.start(); + } else { + TcpClient tcpClient = new TcpClient(remoteHost, port, packetCount, packetSize); + tcpClient.start(); + } } - private void runServer() throws SocketException, InterruptedException { - UdpServer udpServer = new UdpServer(port); - udpServer.start(); - udpServer.join(); + private void runServer() throws IOException, InterruptedException { + if(useUdp) { + UdpServer udpServer = new UdpServer(port); + udpServer.start(); + udpServer.join(); + } else { + TcpServer tcpServer = new TcpServer(port); + tcpServer.start(); + tcpServer.join(); + } } } diff --git a/src/main/java/biz/nellemann/jnetperf/Client.java b/src/main/java/biz/nellemann/jnetperf/Client.java new file mode 100644 index 0000000..7fd4562 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/Client.java @@ -0,0 +1,4 @@ +package biz.nellemann.jnetperf; + +public interface Client { +} diff --git a/src/main/java/biz/nellemann/jnetperf/Datagram.java b/src/main/java/biz/nellemann/jnetperf/Datagram.java index d1233aa..485bac1 100644 --- a/src/main/java/biz/nellemann/jnetperf/Datagram.java +++ b/src/main/java/biz/nellemann/jnetperf/Datagram.java @@ -15,13 +15,10 @@ */ package biz.nellemann.jnetperf; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @@ -36,14 +33,14 @@ import org.slf4j.LoggerFactory; public class Datagram { - final Logger log = LoggerFactory.getLogger(Datagram.class); + public final static int MAX_UDP_LENGTH = 65507; + public final static int DEFAULT_LENGTH = 1432; + public final static int HEADER_LENGTH = 32; - private final int HEADER_LENGTH = 32; private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes private final int type; private final int length; - private final int realLength; private final long curPkt; private final long maxPkt; private final byte[] data; @@ -56,70 +53,58 @@ public class Datagram { * @param currentPkt */ public Datagram(int type, int length, long currentPkt, long maxPkt) { - - log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt); - this.type = type; - this.length = length; this.curPkt = currentPkt; this.maxPkt = maxPkt; + this.length = length; - if(type == DataType.DATA.getValue()) { - realLength = length; - data = new byte[length - HEADER_LENGTH]; + if (type == DataType.HANDSHAKE.getValue()) { + data = new byte[DEFAULT_LENGTH - HEADER_LENGTH]; } else { - realLength = HEADER_LENGTH * 2; - data = new byte[HEADER_LENGTH * 2]; + data = new byte[length - HEADER_LENGTH]; } - - //random.nextBytes(data); } + /** + * Assemble datagram from byte[] payload + * @param payload + */ + public Datagram(byte[] payload) { + this(ByteBuffer.wrap(payload)); + } /** - * Assemble datagram from payload + * Assemble datagram from ByteBuffer payload * @param payload */ - public Datagram(byte[] payload) throws IOException { + public Datagram(ByteBuffer payload) { - log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString()); - - ByteBuffer buffer = ByteBuffer.wrap(payload); byte[] id = new byte[8]; - buffer.get(id); + payload.get(id); if(!Arrays.equals(id, MAGIC_ID)) { - log.warn("Datagram() - magic ID does not match!"); - throw new IOException(); + System.out.println(Arrays.toString(id)); + System.out.println(Arrays.toString(MAGIC_ID)); + throw new RuntimeException("Datagram magic ID does not match: " + MAGIC_ID); } // Order is importent when assembling header fields like this - type = buffer.getInt(); - length = buffer.getInt(); - curPkt = buffer.getLong(); - maxPkt = buffer.getLong(); + type = payload.getInt(); + length = payload.getInt(); + curPkt = payload.getLong(); + maxPkt = payload.getLong(); - realLength = length; - if(type == DataType.DATA.getValue()) { - data = new byte[length - HEADER_LENGTH]; - buffer.get(data, 0, data.length); - } else { - data = new byte[HEADER_LENGTH * 2]; - } + data = new byte[payload.limit() - payload.position()]; + payload.get(data); } - public int getLength() { return length; } - public int getRealLength() { - return realLength; - } - public byte[] getPayload() throws IOException { + public byte[] getPayload() { - log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt); ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); // Order is important diff --git a/src/main/java/biz/nellemann/jnetperf/TcpClient.java b/src/main/java/biz/nellemann/jnetperf/TcpClient.java new file mode 100644 index 0000000..43da15a --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpClient.java @@ -0,0 +1,109 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.*; + +public class TcpClient { + + final Logger log = LoggerFactory.getLogger(TcpClient.class); + + private Statistics statistics; + + private DataOutputStream out; + private DataInputStream in; + + + private final int port; + private final InetAddress address; + private Socket socket; + + private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + private final int packetCount; + private final int packetSize; + + + public TcpClient(String hostname, int port, int packets, int size) throws IOException { + log.info("TcpClient() - target: {}, port: {}", hostname, port); + + this.port = port; + this.packetCount = packets; + this.packetSize = size; + + address = InetAddress.getByName(hostname); + statistics = new Statistics(); + } + + + private void send(Datagram datagram) throws IOException { + out.write(datagram.getPayload()); + statistics.transferPacket(); + statistics.transferBytes(datagram.getLength()); + } + + + private Datagram receive() throws IOException { + in.readFully(inBuffer); + return new Datagram(inBuffer); + } + + + private void close() throws IOException { + in.close(); + out.close(); + socket.close(); + } + + + public void start() throws IOException, InterruptedException { + + socket = new Socket(address, port); + in = new DataInputStream(socket.getInputStream()); + out = new DataOutputStream(socket.getOutputStream()); + + long sequence = 0; + + // Send handshake + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + send(datagram); + + datagram = receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + // Data datagrams ... + for(int i = 0; i < packetCount; i++) { + datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); + send(datagram); + datagram = receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + } + statistics.tick(); + } + + // End datagram + //Thread.sleep(100); + datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); + send(datagram); + System.out.println("Sending END datagram"); + + // TODO: Wait for ACK + datagram = receive(); + statistics.ack(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + Thread.sleep(100); + close(); + statistics.printAverage(); + statistics.printSummary(); + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/TcpServer.java b/src/main/java/biz/nellemann/jnetperf/TcpServer.java new file mode 100644 index 0000000..d56ca29 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpServer.java @@ -0,0 +1,103 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; + +public class TcpServer extends Thread { + + final Logger log = LoggerFactory.getLogger(TcpServer.class); + + private ServerSocket socket; + private DataInputStream in; + private DataOutputStream out; + private byte[] inBuffer; + + + public TcpServer(int port) throws IOException { + log.info("TcpServer()"); + + socket = new ServerSocket(port); + socket.setSoTimeout(10000); + } + + + public void run() { + + boolean running = true; + + try { + while (running) { + inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + session(); + } + socket.close(); + } catch(IOException e) { + log.error(e.getMessage()); + } + + } + + + public void session() throws IOException { + + Statistics statistics = new Statistics(); + boolean running = true; + boolean ackEnd = false; + + Socket server = socket.accept(); + InetAddress address = socket.getInetAddress(); + + in = new DataInputStream(server.getInputStream()); + out = new DataOutputStream(server.getOutputStream()); + + while (running) { + + Datagram datagram = receive(); + statistics.transferPacket(); + statistics.transferBytes(datagram.getLength()); + + if(datagram.getType() == DataType.HANDSHAKE.getValue()) { + log.info("Handshake from ... {}", address); + // Setup to receive larger datagrams + inBuffer = new byte[datagram.getLength()]; + statistics.reset(); + } + + if(datagram.getType() == DataType.END.getValue()) { + ackEnd = true; + } + + // Send ACK + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); + out.write(responseDatagram.getPayload()); + statistics.ack(); + + statistics.tick(); + if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { + running = false; + statistics.printAverage(); + statistics.printSummary(); + } + + } + + in.close(); + out.close(); + server.close(); + + } + + + private Datagram receive() throws IOException { + in.readFully(inBuffer); + Datagram datagram = new Datagram(inBuffer); + return datagram; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/UdpClient.java b/src/main/java/biz/nellemann/jnetperf/UdpClient.java index edead01..ce4b52c 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpClient.java @@ -35,7 +35,7 @@ public class UdpClient { private final InetAddress address; private final DatagramSocket socket; - private final byte[] buf = new byte[256]; + private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; private final int packetCount; private final int packetSize; @@ -53,16 +53,16 @@ public class UdpClient { } private void send(Datagram datagram) throws IOException { - DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port); + DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, port); socket.send(packet); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(datagram.getLength()); } private Datagram receive() throws IOException { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); - return new Datagram(buf); + return new Datagram(inBuffer); } @@ -76,7 +76,7 @@ public class UdpClient { long sequence = 0; // Send handshake - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), Datagram.DEFAULT_LENGTH, sequence++, packetCount); send(datagram); datagram = receive(); diff --git a/src/main/java/biz/nellemann/jnetperf/UdpServer.java b/src/main/java/biz/nellemann/jnetperf/UdpServer.java index c5901f2..775b0fc 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpServer.java @@ -29,7 +29,7 @@ public class UdpServer extends Thread { final Logger log = LoggerFactory.getLogger(UdpServer.class); private final DatagramSocket socket; - private byte[] buf = new byte[256]; + private byte[] inBuffer; public UdpServer(int port) throws SocketException { @@ -42,11 +42,10 @@ public class UdpServer extends Thread { boolean running = true; try { - while (running) { + inBuffer = new byte[Datagram.DEFAULT_LENGTH]; session(); } - socket.close(); } catch(IOException e) { log.error(e.getMessage()); @@ -63,35 +62,29 @@ public class UdpServer extends Thread { while (running) { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); InetAddress address = packet.getAddress(); int port = packet.getPort(); - Datagram datagram = new Datagram(buf); + Datagram datagram = new Datagram(packet.getData()); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(datagram.getLength()); if(datagram.getType() == DataType.HANDSHAKE.getValue()) { - log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); + log.info("Handshake from ... {}", address); // Setup to receive larger datagrams - buf = new byte[datagram.getLength()]; + inBuffer = new byte[datagram.getLength()]; statistics.reset(); } - /* - if(datagram.getType() == DataType.DATA.getValue()) { - bytesReceived += datagram.getRealLength(); - bytesReceivedTotal += datagram.getRealLength(); - }*/ - if(datagram.getType() == DataType.END.getValue()) { ackEnd = true; } // Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); socket.send(packet); statistics.ack(); @@ -103,7 +96,6 @@ public class UdpServer extends Thread { statistics.printSummary(); } - } From 7f4a5d28aceea7e9016ffe7b5955035113bca4b7 Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Fri, 14 Jul 2023 07:40:34 +0200 Subject: [PATCH 2/2] Support k,m,g suffix and some cleanup. --- gradle.properties | 2 +- .../biz/nellemann/jnetperf/Application.java | 24 ++++++----- .../java/biz/nellemann/jnetperf/Client.java | 4 -- .../jnetperf/{Datagram.java => Payload.java} | 11 ++--- .../{DataType.java => PayloadType.java} | 4 +- .../nellemann/jnetperf/SuffixConverter.java | 42 ++++++++++++++++++ .../biz/nellemann/jnetperf/TcpClient.java | 43 ++++++++++--------- .../biz/nellemann/jnetperf/TcpServer.java | 25 ++++++----- .../biz/nellemann/jnetperf/UdpClient.java | 42 +++++++++--------- .../biz/nellemann/jnetperf/UdpServer.java | 18 ++++---- .../nellemann/jnetperf/VersionProvider.java | 4 +- 11 files changed, 133 insertions(+), 86 deletions(-) delete mode 100644 src/main/java/biz/nellemann/jnetperf/Client.java rename src/main/java/biz/nellemann/jnetperf/{Datagram.java => Payload.java} (92%) rename src/main/java/biz/nellemann/jnetperf/{DataType.java => PayloadType.java} (92%) create mode 100644 src/main/java/biz/nellemann/jnetperf/SuffixConverter.java diff --git a/gradle.properties b/gradle.properties index 56ae239..337b92a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ projectId = jnetperf projectGroup = biz.nellemann.jnetperf -projectVersion = 0.0.5 +projectVersion = 0.0.6 diff --git a/src/main/java/biz/nellemann/jnetperf/Application.java b/src/main/java/biz/nellemann/jnetperf/Application.java index 26ec379..f613750 100644 --- a/src/main/java/biz/nellemann/jnetperf/Application.java +++ b/src/main/java/biz/nellemann/jnetperf/Application.java @@ -20,7 +20,6 @@ import picocli.CommandLine; import picocli.CommandLine.Command; import java.io.IOException; -import java.net.SocketException; import java.util.concurrent.Callable; @@ -33,17 +32,17 @@ public class Application implements Callable { RunMode runMode; static class RunMode { - @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "SERVER") + @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "HOST") String remoteServer; @CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.") boolean runServer = false; } - @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].") - int packetSize = Datagram.DEFAULT_LENGTH; + @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) + int packetSize = Payload.DEFAULT_LENGTH; - @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") + @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) int packetCount = 150_000; @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") @@ -75,15 +74,20 @@ public class Application implements Callable { private void runClient(String remoteHost) throws InterruptedException, IOException { + + if(packetSize < Payload.MIN_LENGTH) { + packetSize = Payload.MIN_LENGTH; + } + if(useUdp) { - if(packetSize > Datagram.MAX_UDP_LENGTH) { - System.err.println("Packet size not allowed for UDP"); - return; + if(packetSize > Payload.MAX_UDP_LENGTH) { + packetSize = Payload.MAX_UDP_LENGTH; } - UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); + UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, 0); udpClient.start(); + } else { - TcpClient tcpClient = new TcpClient(remoteHost, port, packetCount, packetSize); + TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, 0); tcpClient.start(); } } diff --git a/src/main/java/biz/nellemann/jnetperf/Client.java b/src/main/java/biz/nellemann/jnetperf/Client.java deleted file mode 100644 index 7fd4562..0000000 --- a/src/main/java/biz/nellemann/jnetperf/Client.java +++ /dev/null @@ -1,4 +0,0 @@ -package biz.nellemann.jnetperf; - -public interface Client { -} diff --git a/src/main/java/biz/nellemann/jnetperf/Datagram.java b/src/main/java/biz/nellemann/jnetperf/Payload.java similarity index 92% rename from src/main/java/biz/nellemann/jnetperf/Datagram.java rename to src/main/java/biz/nellemann/jnetperf/Payload.java index 485bac1..2e07e7e 100644 --- a/src/main/java/biz/nellemann/jnetperf/Datagram.java +++ b/src/main/java/biz/nellemann/jnetperf/Payload.java @@ -31,8 +31,9 @@ import java.util.Arrays; * */ -public class Datagram { +public class Payload { + public final static int MIN_LENGTH = 64; public final static int MAX_UDP_LENGTH = 65507; public final static int DEFAULT_LENGTH = 1432; public final static int HEADER_LENGTH = 32; @@ -52,13 +53,13 @@ public class Datagram { * @param length * @param currentPkt */ - public Datagram(int type, int length, long currentPkt, long maxPkt) { + public Payload(int type, int length, long currentPkt, long maxPkt) { this.type = type; this.curPkt = currentPkt; this.maxPkt = maxPkt; this.length = length; - if (type == DataType.HANDSHAKE.getValue()) { + if (type == PayloadType.HANDSHAKE.getValue()) { data = new byte[DEFAULT_LENGTH - HEADER_LENGTH]; } else { data = new byte[length - HEADER_LENGTH]; @@ -69,7 +70,7 @@ public class Datagram { * Assemble datagram from byte[] payload * @param payload */ - public Datagram(byte[] payload) { + public Payload(byte[] payload) { this(ByteBuffer.wrap(payload)); } @@ -78,7 +79,7 @@ public class Datagram { * Assemble datagram from ByteBuffer payload * @param payload */ - public Datagram(ByteBuffer payload) { + public Payload(ByteBuffer payload) { byte[] id = new byte[8]; payload.get(id); diff --git a/src/main/java/biz/nellemann/jnetperf/DataType.java b/src/main/java/biz/nellemann/jnetperf/PayloadType.java similarity index 92% rename from src/main/java/biz/nellemann/jnetperf/DataType.java rename to src/main/java/biz/nellemann/jnetperf/PayloadType.java index 973b938..80bcc6c 100644 --- a/src/main/java/biz/nellemann/jnetperf/DataType.java +++ b/src/main/java/biz/nellemann/jnetperf/PayloadType.java @@ -15,13 +15,13 @@ */ package biz.nellemann.jnetperf; -public enum DataType { +public enum PayloadType { HANDSHAKE(1), DATA(2), ACK(4), END(9); private final int value; - private DataType(int value) { + private PayloadType(int value) { this.value = value; } diff --git a/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java b/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java new file mode 100644 index 0000000..12d6016 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java @@ -0,0 +1,42 @@ +package biz.nellemann.jnetperf; + +import picocli.CommandLine; + +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SuffixConverter implements CommandLine.ITypeConverter { + + final private Pattern pattern = Pattern.compile("(\\d+)([kmg])?b?", Pattern.CASE_INSENSITIVE); + + public Integer convert(String value) { + int bytes = 0; + + Matcher matcher = pattern.matcher(value); + if (matcher.find()) { + int number = Integer.parseInt(matcher.group(1)); + if(matcher.group(2) != null) { // We got the kilo, mega og giga suffix + String suffix = matcher.group(2); + switch (suffix.toLowerCase(Locale.ROOT)) { + case "k": + bytes = number * 1024; + break; + case "m": + bytes = number * 1024 * 1024; + break; + case "g": + bytes = number * 1024 * 1024 * 1024; + break; + default: + System.err.println("Unknown suffix: " + suffix); + bytes = number; + } + } else { + bytes = number; + } + } + return bytes; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/TcpClient.java b/src/main/java/biz/nellemann/jnetperf/TcpClient.java index 43da15a..cefeb1f 100644 --- a/src/main/java/biz/nellemann/jnetperf/TcpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/TcpClient.java @@ -20,33 +20,35 @@ public class TcpClient { private final InetAddress address; private Socket socket; - private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; private final int packetCount; private final int packetSize; + private final int packetTime; - public TcpClient(String hostname, int port, int packets, int size) throws IOException { + public TcpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws IOException { log.info("TcpClient() - target: {}, port: {}", hostname, port); this.port = port; - this.packetCount = packets; this.packetSize = size; + this.packetCount = maxPackets; + this.packetTime = maxTime; address = InetAddress.getByName(hostname); statistics = new Statistics(); } - private void send(Datagram datagram) throws IOException { - out.write(datagram.getPayload()); + private void send(Payload payload) throws IOException { + out.write(payload.getPayload()); statistics.transferPacket(); - statistics.transferBytes(datagram.getLength()); + statistics.transferBytes(payload.getLength()); } - private Datagram receive() throws IOException { + private Payload receive() throws IOException { in.readFully(inBuffer); - return new Datagram(inBuffer); + return new Payload(inBuffer); } @@ -66,21 +68,21 @@ public class TcpClient { long sequence = 0; // Send handshake - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); - send(datagram); + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + send(payload); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } // Data datagrams ... for(int i = 0; i < packetCount; i++) { - datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); - send(datagram); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + send(payload); + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); } statistics.tick(); @@ -88,14 +90,13 @@ public class TcpClient { // End datagram //Thread.sleep(100); - datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); - send(datagram); - System.out.println("Sending END datagram"); + payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + send(payload); // TODO: Wait for ACK - datagram = receive(); + payload = receive(); statistics.ack(); - if(datagram.getType() != DataType.ACK.getValue()) { + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } diff --git a/src/main/java/biz/nellemann/jnetperf/TcpServer.java b/src/main/java/biz/nellemann/jnetperf/TcpServer.java index d56ca29..d9558eb 100644 --- a/src/main/java/biz/nellemann/jnetperf/TcpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/TcpServer.java @@ -7,7 +7,6 @@ import java.io.*; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; -import java.nio.ByteBuffer; public class TcpServer extends Thread { @@ -33,7 +32,7 @@ public class TcpServer extends Thread { try { while (running) { - inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + inBuffer = new byte[Payload.DEFAULT_LENGTH]; session(); } socket.close(); @@ -58,28 +57,28 @@ public class TcpServer extends Thread { while (running) { - Datagram datagram = receive(); + Payload payload = receive(); statistics.transferPacket(); - statistics.transferBytes(datagram.getLength()); + statistics.transferBytes(payload.getLength()); - if(datagram.getType() == DataType.HANDSHAKE.getValue()) { + if(payload.getType() == PayloadType.HANDSHAKE.getValue()) { log.info("Handshake from ... {}", address); // Setup to receive larger datagrams - inBuffer = new byte[datagram.getLength()]; + inBuffer = new byte[payload.getLength()]; statistics.reset(); } - if(datagram.getType() == DataType.END.getValue()) { + if(payload.getType() == PayloadType.END.getValue()) { ackEnd = true; } // Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); - out.write(responseDatagram.getPayload()); + Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1); + out.write(responsePayload.getPayload()); statistics.ack(); statistics.tick(); - if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { + if(ackEnd) { running = false; statistics.printAverage(); statistics.printSummary(); @@ -94,10 +93,10 @@ public class TcpServer extends Thread { } - private Datagram receive() throws IOException { + private Payload receive() throws IOException { in.readFully(inBuffer); - Datagram datagram = new Datagram(inBuffer); - return datagram; + Payload payload = new Payload(inBuffer); + return payload; } } diff --git a/src/main/java/biz/nellemann/jnetperf/UdpClient.java b/src/main/java/biz/nellemann/jnetperf/UdpClient.java index ce4b52c..2e3cdd3 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpClient.java @@ -35,34 +35,36 @@ public class UdpClient { private final InetAddress address; private final DatagramSocket socket; - private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; private final int packetCount; private final int packetSize; + private final int packeTime; - public UdpClient(String hostname, int port, int packets, int size) throws UnknownHostException, SocketException { + public UdpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws UnknownHostException, SocketException { log.info("UdpClient() - target: {}, port: {}", hostname, port); this.port = port; - this.packetCount = packets; this.packetSize = size; + this.packetCount = maxPackets; + this.packeTime = maxTime; socket = new DatagramSocket(); address = InetAddress.getByName(hostname); statistics = new Statistics(); } - private void send(Datagram datagram) throws IOException { - DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, port); + private void send(Payload payload) throws IOException { + DatagramPacket packet = new DatagramPacket(payload.getPayload(), payload.getLength(), address, port); socket.send(packet); statistics.transferPacket(); - statistics.transferBytes(datagram.getLength()); + statistics.transferBytes(payload.getLength()); } - private Datagram receive() throws IOException { + private Payload receive() throws IOException { DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); - return new Datagram(inBuffer); + return new Payload(inBuffer); } @@ -76,21 +78,21 @@ public class UdpClient { long sequence = 0; // Send handshake - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), Datagram.DEFAULT_LENGTH, sequence++, packetCount); - send(datagram); + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packetCount); + send(payload); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } // Data datagrams ... for(int i = 0; i < packetCount; i++) { - datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); - send(datagram); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + send(payload); + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); } statistics.tick(); @@ -98,13 +100,13 @@ public class UdpClient { // End datagram //Thread.sleep(100); - datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); - send(datagram); + payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + send(payload); // TODO: Wait for ACK - datagram = receive(); + payload = receive(); statistics.ack(); - if(datagram.getType() != DataType.ACK.getValue()) { + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } diff --git a/src/main/java/biz/nellemann/jnetperf/UdpServer.java b/src/main/java/biz/nellemann/jnetperf/UdpServer.java index 775b0fc..ba300a9 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpServer.java @@ -43,7 +43,7 @@ public class UdpServer extends Thread { try { while (running) { - inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + inBuffer = new byte[Payload.DEFAULT_LENGTH]; session(); } socket.close(); @@ -68,29 +68,29 @@ public class UdpServer extends Thread { InetAddress address = packet.getAddress(); int port = packet.getPort(); - Datagram datagram = new Datagram(packet.getData()); + Payload payload = new Payload(packet.getData()); statistics.transferPacket(); - statistics.transferBytes(datagram.getLength()); + statistics.transferBytes(payload.getLength()); - if(datagram.getType() == DataType.HANDSHAKE.getValue()) { + if(payload.getType() == PayloadType.HANDSHAKE.getValue()) { log.info("Handshake from ... {}", address); // Setup to receive larger datagrams - inBuffer = new byte[datagram.getLength()]; + inBuffer = new byte[payload.getLength()]; statistics.reset(); } - if(datagram.getType() == DataType.END.getValue()) { + if(payload.getType() == PayloadType.END.getValue()) { ackEnd = true; } // Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); - packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); + Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1); + packet = new DatagramPacket(responsePayload.getPayload(), responsePayload.getLength(), address, port); socket.send(packet); statistics.ack(); statistics.tick(); - if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { + if(ackEnd) { running = false; statistics.printAverage(); statistics.printSummary(); diff --git a/src/main/java/biz/nellemann/jnetperf/VersionProvider.java b/src/main/java/biz/nellemann/jnetperf/VersionProvider.java index 7cb3ee3..9e7abda 100644 --- a/src/main/java/biz/nellemann/jnetperf/VersionProvider.java +++ b/src/main/java/biz/nellemann/jnetperf/VersionProvider.java @@ -28,7 +28,9 @@ class VersionProvider implements CommandLine.IVersionProvider { Manifest manifest = new Manifest(getClass().getResourceAsStream("/META-INF/MANIFEST.MF")); Attributes attrs = manifest.getMainAttributes(); - return new String[] { "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") }; + return new String[] { + "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") + " (on ${os.name} ${os.version} ${os.arch})", + "JVM: ${java.version} (${java.vendor} ${java.vm.name} ${java.vm.version})" }; } } \ No newline at end of file