diff --git a/README.md b/README.md index 77813f5..caf93f3 100644 --- a/README.md +++ b/README.md @@ -12,16 +12,17 @@ You need Java (JRE) version 8 or later to run jnetperf. - Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar** ```shell -Usage: jnetperf [-huV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) +Usage: jnetperf [-huV] [-l=NUM] [-n=NUM] [-p=NUM] [-t=SEC] (-c=SRV | -s) For more information visit https://git.data.coop/nellemann/jnetperf - -c, --connect=SERVER Connect to remote server. - -h, --help Show this help message and exit. - -l, --pkt-len=SIZE Packet size in bytes [default: 1432]. - -n, --pkt-num=NUM Number of packets to send [default: 150000]. - -p, --port=PORT Network port [default: 4445]. - -s, --server Run server and wait for client. - -u, --udp Use UDP network protocol [default: false]. - -V, --version Print version information and exit. + -c, --connect=SRV Connect to remote server (client). + -h, --help Show this help message and exit. + -l, --pkt-len=NUM Packet size in bytes (client) [default: 1432]. + -n, --pkt-num=NUM Number of packets to send (client) [default: 150000]. + -p, --port=NUM Network port [default: 4445]. + -s, --server Run server and wait for client (server). + -t, --runtime=SEC Time to run, supersedes pkt-num (client) [default: 0]. + -u, --udp Use UDP network protocol [default: false]. + -V, --version Print version information and exit. ``` diff --git a/gradle.properties b/gradle.properties index 337b92a..dee50d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ projectId = jnetperf projectGroup = biz.nellemann.jnetperf -projectVersion = 0.0.6 +projectVersion = 0.0.8 diff --git a/src/main/java/biz/nellemann/jnetperf/Application.java b/src/main/java/biz/nellemann/jnetperf/Application.java index f613750..4f705c7 100644 --- a/src/main/java/biz/nellemann/jnetperf/Application.java +++ b/src/main/java/biz/nellemann/jnetperf/Application.java @@ -20,6 +20,7 @@ import picocli.CommandLine; import picocli.CommandLine.Command; import java.io.IOException; +import java.util.Locale; import java.util.concurrent.Callable; @@ -32,20 +33,23 @@ public class Application implements Callable { RunMode runMode; static class RunMode { - @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "HOST") + @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server (client).", paramLabel = "SRV") String remoteServer; - @CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.") + @CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client (server).") boolean runServer = false; } - @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) + @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes (client) [default: ${DEFAULT-VALUE}].", converter = UnitSuffixConverter.class) int packetSize = Payload.DEFAULT_LENGTH; - @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) + @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send (client) [default: ${DEFAULT-VALUE}].", converter = UnitSuffixConverter.class) int packetCount = 150_000; - @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") + @CommandLine.Option(names = { "-t", "--runtime" }, paramLabel = "SEC", description = "Time to run, precedes pkt-num (client) [default: ${DEFAULT-VALUE}].", converter = TimeSuffixConverter.class) + int timeInSeconds = 0; + + @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "NUM", description = "Network port [default: ${DEFAULT-VALUE}].") int port = 4445; @CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].") @@ -54,12 +58,19 @@ public class Application implements Callable { @Override - public Integer call() throws Exception { + public Integer call() { - if(runMode.runServer) { - runServer(); - } else if(runMode.remoteServer != null) { - runClient(runMode.remoteServer); + // Set locale to en_US to ensure correct/identical number formatting + Locale.setDefault(new Locale("en", "US")); + + try { + if (runMode.runServer) { + runServer(); + } else if (runMode.remoteServer != null) { + runClient(runMode.remoteServer); + } + } catch (IOException | InterruptedException e) { + System.err.println(e.getMessage()); } return 0; @@ -81,13 +92,14 @@ public class Application implements Callable { if(useUdp) { if(packetSize > Payload.MAX_UDP_LENGTH) { + System.err.println("Packetsize > MAX UDP: " + packetSize); packetSize = Payload.MAX_UDP_LENGTH; } - UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, 0); + UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, timeInSeconds); udpClient.start(); } else { - TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, 0); + TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, timeInSeconds); tcpClient.start(); } } diff --git a/src/main/java/biz/nellemann/jnetperf/Payload.java b/src/main/java/biz/nellemann/jnetperf/Payload.java index 2e07e7e..046d733 100644 --- a/src/main/java/biz/nellemann/jnetperf/Payload.java +++ b/src/main/java/biz/nellemann/jnetperf/Payload.java @@ -34,7 +34,7 @@ import java.util.Arrays; public class Payload { public final static int MIN_LENGTH = 64; - public final static int MAX_UDP_LENGTH = 65507; + public final static int MAX_UDP_LENGTH = 64000; public final static int DEFAULT_LENGTH = 1432; public final static int HEADER_LENGTH = 32; diff --git a/src/main/java/biz/nellemann/jnetperf/Statistics.java b/src/main/java/biz/nellemann/jnetperf/Statistics.java index 17aa357..d5629bd 100644 --- a/src/main/java/biz/nellemann/jnetperf/Statistics.java +++ b/src/main/java/biz/nellemann/jnetperf/Statistics.java @@ -31,6 +31,7 @@ public class Statistics { private long packetsPerSec; private long packetsUnacked = 0; private int tickItererations = 0; + private int tickTotal = 0; private final long[] bytesPerSecAvgTmp = new long[MAX_TICKS_AVG]; private final long[] packetsPerSecAvgTmp = new long[MAX_TICKS_AVG]; @@ -67,6 +68,7 @@ public class Statistics { if(tickItererations % LOG_AVG_MODULO == 0) { printAverage(); } + tickTotal++; } @@ -121,6 +123,11 @@ public class Statistics { } + public int getRuntime() { + return tickTotal; + } + + private long getAverage(long[] array, long fallback) { long avg = getAverage(array); if(avg < 1) { diff --git a/src/main/java/biz/nellemann/jnetperf/TcpClient.java b/src/main/java/biz/nellemann/jnetperf/TcpClient.java index cefeb1f..5fbd797 100644 --- a/src/main/java/biz/nellemann/jnetperf/TcpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/TcpClient.java @@ -5,12 +5,13 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.net.*; +import java.util.concurrent.atomic.AtomicBoolean; public class TcpClient { final Logger log = LoggerFactory.getLogger(TcpClient.class); - private Statistics statistics; + private final Statistics statistics; private DataOutputStream out; private DataInputStream in; @@ -21,18 +22,18 @@ public class TcpClient { private Socket socket; private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; - private final int packetCount; - private final int packetSize; - private final int packetTime; + private final int packets; + private final int length; + private final int runtime; - public TcpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws IOException { + public TcpClient(String hostname, int port, int length, int packets, int runtime) throws IOException { log.info("TcpClient() - target: {}, port: {}", hostname, port); this.port = port; - this.packetSize = size; - this.packetCount = maxPackets; - this.packetTime = maxTime; + this.length = length; + this.packets = packets; + this.runtime = runtime; address = InetAddress.getByName(hostname); statistics = new Statistics(); @@ -61,39 +62,53 @@ public class TcpClient { public void start() throws IOException, InterruptedException { + AtomicBoolean keepRunning = new AtomicBoolean(true); + Thread shutdownHook = new Thread(() -> { + keepRunning.set(false); + System.out.println("Stopping jnetperf, please wait ..."); + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + + long sequence = 0; socket = new Socket(address, port); in = new DataInputStream(socket.getInputStream()); out = new DataOutputStream(socket.getOutputStream()); - long sequence = 0; // Send handshake - Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), length, sequence++, packets); send(payload); - payload = receive(); if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } - // Data datagrams ... - for(int i = 0; i < packetCount; i++) { - payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + // Send data + do { + payload = new Payload(PayloadType.DATA.getValue(), length, sequence++, packets); send(payload); payload = receive(); if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); } statistics.tick(); - } - // End datagram - //Thread.sleep(100); - payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + if (sequence > packets) { + System.out.println("Max packets reached"); + keepRunning.set(false);; + } + + if(runtime > 0 && statistics.getRuntime() > runtime) { + System.out.println("Max runtime reached"); + keepRunning.set(false); + } + + } while (keepRunning.get()); + + // Send end + payload = new Payload(PayloadType.END.getValue(), length, sequence++, packets); send(payload); - - // TODO: Wait for ACK payload = receive(); statistics.ack(); if(payload.getType() != PayloadType.ACK.getValue()) { diff --git a/src/main/java/biz/nellemann/jnetperf/TcpServer.java b/src/main/java/biz/nellemann/jnetperf/TcpServer.java index d9558eb..40d328c 100644 --- a/src/main/java/biz/nellemann/jnetperf/TcpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/TcpServer.java @@ -12,6 +12,7 @@ public class TcpServer extends Thread { final Logger log = LoggerFactory.getLogger(TcpServer.class); + private final int port; private ServerSocket socket; private DataInputStream in; private DataOutputStream out; @@ -20,22 +21,20 @@ public class TcpServer extends Thread { public TcpServer(int port) throws IOException { log.info("TcpServer()"); - - socket = new ServerSocket(port); - socket.setSoTimeout(10000); + this.port = port; } public void run() { - boolean running = true; - try { - while (running) { + while (true) { + socket = new ServerSocket(port); + socket.setSoTimeout(0); // Wait indefinitely inBuffer = new byte[Payload.DEFAULT_LENGTH]; session(); + socket.close(); } - socket.close(); } catch(IOException e) { log.error(e.getMessage()); } @@ -95,8 +94,7 @@ public class TcpServer extends Thread { private Payload receive() throws IOException { in.readFully(inBuffer); - Payload payload = new Payload(inBuffer); - return payload; + return new Payload(inBuffer); } } diff --git a/src/main/java/biz/nellemann/jnetperf/TimeSuffixConverter.java b/src/main/java/biz/nellemann/jnetperf/TimeSuffixConverter.java new file mode 100644 index 0000000..d518d04 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TimeSuffixConverter.java @@ -0,0 +1,42 @@ +package biz.nellemann.jnetperf; + +import picocli.CommandLine; + +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class TimeSuffixConverter implements CommandLine.ITypeConverter { + + final private Pattern pattern = Pattern.compile("(\\d+)([smh])?", Pattern.CASE_INSENSITIVE); + + public Integer convert(String value) { + int seconds = 0; + + Matcher matcher = pattern.matcher(value); + if (matcher.find()) { + int number = Integer.parseInt(matcher.group(1)); + if(matcher.group(2) != null) { // We got the second, minute or hour suffix + String suffix = matcher.group(2); + switch (suffix.toLowerCase(Locale.ROOT)) { + case "s": + seconds = number; + break; + case "m": + seconds = number * 60; + break; + case "h": + seconds = number * 60 * 60; + break; + default: + System.err.println("Unknown suffix: " + suffix); + seconds = number; + } + } else { + seconds = number; + } + } + return seconds; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/UdpClient.java b/src/main/java/biz/nellemann/jnetperf/UdpClient.java index 2e3cdd3..08ee081 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpClient.java @@ -21,6 +21,7 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,25 +30,25 @@ public class UdpClient { final Logger log = LoggerFactory.getLogger(UdpClient.class); - private Statistics statistics; + private final Statistics statistics; private final int port; private final InetAddress address; private final DatagramSocket socket; private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; - private final int packetCount; - private final int packetSize; - private final int packeTime; + private final int length; + private final int packets; + private final int runtime; - public UdpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws UnknownHostException, SocketException { + public UdpClient(String hostname, int port, int length, int packets, int runtime) throws UnknownHostException, SocketException { log.info("UdpClient() - target: {}, port: {}", hostname, port); this.port = port; - this.packetSize = size; - this.packetCount = maxPackets; - this.packeTime = maxTime; + this.length = length; + this.packets = packets; + this.runtime = runtime; socket = new DatagramSocket(); address = InetAddress.getByName(hostname); @@ -62,7 +63,7 @@ public class UdpClient { } private Payload receive() throws IOException { - DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); + DatagramPacket packet = new DatagramPacket(inBuffer, Payload.DEFAULT_LENGTH); socket.receive(packet); return new Payload(inBuffer); } @@ -75,35 +76,51 @@ public class UdpClient { public void start() throws IOException, InterruptedException { + AtomicBoolean keepRunning = new AtomicBoolean(true); + Thread shutdownHook = new Thread(() -> { + keepRunning.set(false); + System.out.println("Stopping jnetperf, please wait ..."); + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + long sequence = 0; // Send handshake - Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packetCount); + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packets); send(payload); - payload = receive(); if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } - // Data datagrams ... - for(int i = 0; i < packetCount; i++) { - payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + // Send data + do { + payload = new Payload(PayloadType.DATA.getValue(), length, sequence++, packets); send(payload); payload = receive(); if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); } statistics.tick(); - } - // End datagram + if (sequence > packets) { + System.out.println("Max packets reached"); + keepRunning.set(false); + } + + if(runtime > 0 && statistics.getRuntime() > runtime) { + System.out.println("Max runtime reached"); + keepRunning.set(false); + } + + } while (keepRunning.get()); + + + // Send end //Thread.sleep(100); - payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + payload = new Payload(PayloadType.END.getValue(), length, sequence++, packets); send(payload); - - // TODO: Wait for ACK payload = receive(); statistics.ack(); if(payload.getType() != PayloadType.ACK.getValue()) { diff --git a/src/main/java/biz/nellemann/jnetperf/UdpServer.java b/src/main/java/biz/nellemann/jnetperf/UdpServer.java index ba300a9..4febd26 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpServer.java @@ -28,25 +28,25 @@ public class UdpServer extends Thread { final Logger log = LoggerFactory.getLogger(UdpServer.class); - private final DatagramSocket socket; + private final int port; + private DatagramSocket socket; private byte[] inBuffer; - public UdpServer(int port) throws SocketException { + public UdpServer(int port) { log.info("UdpServer()"); - socket = new DatagramSocket(port); + this.port = port; } public void run() { - boolean running = true; - try { - while (running) { + while (true) { inBuffer = new byte[Payload.DEFAULT_LENGTH]; + socket = new DatagramSocket(port); session(); + socket.close(); } - socket.close(); } catch(IOException e) { log.error(e.getMessage()); } diff --git a/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java b/src/main/java/biz/nellemann/jnetperf/UnitSuffixConverter.java similarity index 93% rename from src/main/java/biz/nellemann/jnetperf/SuffixConverter.java rename to src/main/java/biz/nellemann/jnetperf/UnitSuffixConverter.java index 12d6016..f117433 100644 --- a/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java +++ b/src/main/java/biz/nellemann/jnetperf/UnitSuffixConverter.java @@ -6,7 +6,7 @@ import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class SuffixConverter implements CommandLine.ITypeConverter { +public class UnitSuffixConverter implements CommandLine.ITypeConverter { final private Pattern pattern = Pattern.compile("(\\d+)([kmg])?b?", Pattern.CASE_INSENSITIVE);