diff --git a/README.md b/README.md index d0f13c7..77813f5 100644 --- a/README.md +++ b/README.md @@ -12,14 +12,15 @@ You need Java (JRE) version 8 or later to run jnetperf. - Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar** ```shell -Usage: jnetperf [-hV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) +Usage: jnetperf [-huV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) For more information visit https://git.data.coop/nellemann/jnetperf -c, --connect=SERVER Connect to remote server. -h, --help Show this help message and exit. - -l, --pkt-len=SIZE Datagram size in bytes, max 65507 [default: 65507]. + -l, --pkt-len=SIZE Packet size in bytes [default: 1432]. -n, --pkt-num=NUM Number of packets to send [default: 150000]. -p, --port=PORT Network port [default: 4445]. -s, --server Run server and wait for client. + -u, --udp Use UDP network protocol [default: false]. -V, --version Print version information and exit. ``` diff --git a/gradle.properties b/gradle.properties index 56ae239..337b92a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ projectId = jnetperf projectGroup = biz.nellemann.jnetperf -projectVersion = 0.0.5 +projectVersion = 0.0.6 diff --git a/src/main/java/biz/nellemann/jnetperf/Application.java b/src/main/java/biz/nellemann/jnetperf/Application.java index a2744ee..f613750 100644 --- a/src/main/java/biz/nellemann/jnetperf/Application.java +++ b/src/main/java/biz/nellemann/jnetperf/Application.java @@ -20,7 +20,6 @@ import picocli.CommandLine; import picocli.CommandLine.Command; import java.io.IOException; -import java.net.SocketException; import java.util.concurrent.Callable; @@ -33,22 +32,25 @@ public class Application implements Callable { RunMode runMode; static class RunMode { - @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "SERVER") + @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "HOST") String remoteServer; @CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.") boolean runServer = false; } - @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Datagram size in bytes, max 65507 [default: ${DEFAULT-VALUE}].") - int packetSize = 65507; // Min: 256 Max: 65507 + @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) + int packetSize = Payload.DEFAULT_LENGTH; - @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") + @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class) int packetCount = 150_000; @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") int port = 4445; + @CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].") + boolean useUdp = false; + @Override @@ -72,15 +74,35 @@ public class Application implements Callable { private void runClient(String remoteHost) throws InterruptedException, IOException { - UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); - udpClient.start(); + + if(packetSize < Payload.MIN_LENGTH) { + packetSize = Payload.MIN_LENGTH; + } + + if(useUdp) { + if(packetSize > Payload.MAX_UDP_LENGTH) { + packetSize = Payload.MAX_UDP_LENGTH; + } + UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, 0); + udpClient.start(); + + } else { + TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, 0); + tcpClient.start(); + } } - private void runServer() throws SocketException, InterruptedException { - UdpServer udpServer = new UdpServer(port); - udpServer.start(); - udpServer.join(); + private void runServer() throws IOException, InterruptedException { + if(useUdp) { + UdpServer udpServer = new UdpServer(port); + udpServer.start(); + udpServer.join(); + } else { + TcpServer tcpServer = new TcpServer(port); + tcpServer.start(); + tcpServer.join(); + } } } diff --git a/src/main/java/biz/nellemann/jnetperf/Datagram.java b/src/main/java/biz/nellemann/jnetperf/Payload.java similarity index 59% rename from src/main/java/biz/nellemann/jnetperf/Datagram.java rename to src/main/java/biz/nellemann/jnetperf/Payload.java index d1233aa..2e07e7e 100644 --- a/src/main/java/biz/nellemann/jnetperf/Datagram.java +++ b/src/main/java/biz/nellemann/jnetperf/Payload.java @@ -15,13 +15,10 @@ */ package biz.nellemann.jnetperf; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @@ -34,16 +31,17 @@ import org.slf4j.LoggerFactory; * */ -public class Datagram { +public class Payload { - final Logger log = LoggerFactory.getLogger(Datagram.class); + public final static int MIN_LENGTH = 64; + public final static int MAX_UDP_LENGTH = 65507; + public final static int DEFAULT_LENGTH = 1432; + public final static int HEADER_LENGTH = 32; - private final int HEADER_LENGTH = 32; private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes private final int type; private final int length; - private final int realLength; private final long curPkt; private final long maxPkt; private final byte[] data; @@ -55,71 +53,59 @@ public class Datagram { * @param length * @param currentPkt */ - public Datagram(int type, int length, long currentPkt, long maxPkt) { - - log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt); - + public Payload(int type, int length, long currentPkt, long maxPkt) { this.type = type; - this.length = length; this.curPkt = currentPkt; this.maxPkt = maxPkt; + this.length = length; - if(type == DataType.DATA.getValue()) { - realLength = length; - data = new byte[length - HEADER_LENGTH]; + if (type == PayloadType.HANDSHAKE.getValue()) { + data = new byte[DEFAULT_LENGTH - HEADER_LENGTH]; } else { - realLength = HEADER_LENGTH * 2; - data = new byte[HEADER_LENGTH * 2]; + data = new byte[length - HEADER_LENGTH]; } - - //random.nextBytes(data); } + /** + * Assemble datagram from byte[] payload + * @param payload + */ + public Payload(byte[] payload) { + this(ByteBuffer.wrap(payload)); + } /** - * Assemble datagram from payload + * Assemble datagram from ByteBuffer payload * @param payload */ - public Datagram(byte[] payload) throws IOException { + public Payload(ByteBuffer payload) { - log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString()); - - ByteBuffer buffer = ByteBuffer.wrap(payload); byte[] id = new byte[8]; - buffer.get(id); + payload.get(id); if(!Arrays.equals(id, MAGIC_ID)) { - log.warn("Datagram() - magic ID does not match!"); - throw new IOException(); + System.out.println(Arrays.toString(id)); + System.out.println(Arrays.toString(MAGIC_ID)); + throw new RuntimeException("Datagram magic ID does not match: " + MAGIC_ID); } // Order is importent when assembling header fields like this - type = buffer.getInt(); - length = buffer.getInt(); - curPkt = buffer.getLong(); - maxPkt = buffer.getLong(); + type = payload.getInt(); + length = payload.getInt(); + curPkt = payload.getLong(); + maxPkt = payload.getLong(); - realLength = length; - if(type == DataType.DATA.getValue()) { - data = new byte[length - HEADER_LENGTH]; - buffer.get(data, 0, data.length); - } else { - data = new byte[HEADER_LENGTH * 2]; - } + data = new byte[payload.limit() - payload.position()]; + payload.get(data); } - public int getLength() { return length; } - public int getRealLength() { - return realLength; - } - public byte[] getPayload() throws IOException { + public byte[] getPayload() { - log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt); ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); // Order is important diff --git a/src/main/java/biz/nellemann/jnetperf/DataType.java b/src/main/java/biz/nellemann/jnetperf/PayloadType.java similarity index 92% rename from src/main/java/biz/nellemann/jnetperf/DataType.java rename to src/main/java/biz/nellemann/jnetperf/PayloadType.java index 973b938..80bcc6c 100644 --- a/src/main/java/biz/nellemann/jnetperf/DataType.java +++ b/src/main/java/biz/nellemann/jnetperf/PayloadType.java @@ -15,13 +15,13 @@ */ package biz.nellemann.jnetperf; -public enum DataType { +public enum PayloadType { HANDSHAKE(1), DATA(2), ACK(4), END(9); private final int value; - private DataType(int value) { + private PayloadType(int value) { this.value = value; } diff --git a/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java b/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java new file mode 100644 index 0000000..12d6016 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/SuffixConverter.java @@ -0,0 +1,42 @@ +package biz.nellemann.jnetperf; + +import picocli.CommandLine; + +import java.util.Locale; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SuffixConverter implements CommandLine.ITypeConverter { + + final private Pattern pattern = Pattern.compile("(\\d+)([kmg])?b?", Pattern.CASE_INSENSITIVE); + + public Integer convert(String value) { + int bytes = 0; + + Matcher matcher = pattern.matcher(value); + if (matcher.find()) { + int number = Integer.parseInt(matcher.group(1)); + if(matcher.group(2) != null) { // We got the kilo, mega og giga suffix + String suffix = matcher.group(2); + switch (suffix.toLowerCase(Locale.ROOT)) { + case "k": + bytes = number * 1024; + break; + case "m": + bytes = number * 1024 * 1024; + break; + case "g": + bytes = number * 1024 * 1024 * 1024; + break; + default: + System.err.println("Unknown suffix: " + suffix); + bytes = number; + } + } else { + bytes = number; + } + } + return bytes; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/TcpClient.java b/src/main/java/biz/nellemann/jnetperf/TcpClient.java new file mode 100644 index 0000000..cefeb1f --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpClient.java @@ -0,0 +1,110 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.*; + +public class TcpClient { + + final Logger log = LoggerFactory.getLogger(TcpClient.class); + + private Statistics statistics; + + private DataOutputStream out; + private DataInputStream in; + + + private final int port; + private final InetAddress address; + private Socket socket; + + private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; + private final int packetCount; + private final int packetSize; + private final int packetTime; + + + public TcpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws IOException { + log.info("TcpClient() - target: {}, port: {}", hostname, port); + + this.port = port; + this.packetSize = size; + this.packetCount = maxPackets; + this.packetTime = maxTime; + + address = InetAddress.getByName(hostname); + statistics = new Statistics(); + } + + + private void send(Payload payload) throws IOException { + out.write(payload.getPayload()); + statistics.transferPacket(); + statistics.transferBytes(payload.getLength()); + } + + + private Payload receive() throws IOException { + in.readFully(inBuffer); + return new Payload(inBuffer); + } + + + private void close() throws IOException { + in.close(); + out.close(); + socket.close(); + } + + + public void start() throws IOException, InterruptedException { + + socket = new Socket(address, port); + in = new DataInputStream(socket.getInputStream()); + out = new DataOutputStream(socket.getOutputStream()); + + long sequence = 0; + + // Send handshake + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + send(payload); + + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + // Data datagrams ... + for(int i = 0; i < packetCount; i++) { + payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + send(payload); + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { + log.warn("No ACK!"); + } + statistics.tick(); + } + + // End datagram + //Thread.sleep(100); + payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + send(payload); + + // TODO: Wait for ACK + payload = receive(); + statistics.ack(); + if(payload.getType() != PayloadType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + Thread.sleep(100); + close(); + statistics.printAverage(); + statistics.printSummary(); + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/TcpServer.java b/src/main/java/biz/nellemann/jnetperf/TcpServer.java new file mode 100644 index 0000000..d9558eb --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpServer.java @@ -0,0 +1,102 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; + +public class TcpServer extends Thread { + + final Logger log = LoggerFactory.getLogger(TcpServer.class); + + private ServerSocket socket; + private DataInputStream in; + private DataOutputStream out; + private byte[] inBuffer; + + + public TcpServer(int port) throws IOException { + log.info("TcpServer()"); + + socket = new ServerSocket(port); + socket.setSoTimeout(10000); + } + + + public void run() { + + boolean running = true; + + try { + while (running) { + inBuffer = new byte[Payload.DEFAULT_LENGTH]; + session(); + } + socket.close(); + } catch(IOException e) { + log.error(e.getMessage()); + } + + } + + + public void session() throws IOException { + + Statistics statistics = new Statistics(); + boolean running = true; + boolean ackEnd = false; + + Socket server = socket.accept(); + InetAddress address = socket.getInetAddress(); + + in = new DataInputStream(server.getInputStream()); + out = new DataOutputStream(server.getOutputStream()); + + while (running) { + + Payload payload = receive(); + statistics.transferPacket(); + statistics.transferBytes(payload.getLength()); + + if(payload.getType() == PayloadType.HANDSHAKE.getValue()) { + log.info("Handshake from ... {}", address); + // Setup to receive larger datagrams + inBuffer = new byte[payload.getLength()]; + statistics.reset(); + } + + if(payload.getType() == PayloadType.END.getValue()) { + ackEnd = true; + } + + // Send ACK + Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1); + out.write(responsePayload.getPayload()); + statistics.ack(); + + statistics.tick(); + if(ackEnd) { + running = false; + statistics.printAverage(); + statistics.printSummary(); + } + + } + + in.close(); + out.close(); + server.close(); + + } + + + private Payload receive() throws IOException { + in.readFully(inBuffer); + Payload payload = new Payload(inBuffer); + return payload; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/UdpClient.java b/src/main/java/biz/nellemann/jnetperf/UdpClient.java index edead01..2e3cdd3 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpClient.java @@ -35,34 +35,36 @@ public class UdpClient { private final InetAddress address; private final DatagramSocket socket; - private final byte[] buf = new byte[256]; + private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH]; private final int packetCount; private final int packetSize; + private final int packeTime; - public UdpClient(String hostname, int port, int packets, int size) throws UnknownHostException, SocketException { + public UdpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws UnknownHostException, SocketException { log.info("UdpClient() - target: {}, port: {}", hostname, port); this.port = port; - this.packetCount = packets; this.packetSize = size; + this.packetCount = maxPackets; + this.packeTime = maxTime; socket = new DatagramSocket(); address = InetAddress.getByName(hostname); statistics = new Statistics(); } - private void send(Datagram datagram) throws IOException { - DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port); + private void send(Payload payload) throws IOException { + DatagramPacket packet = new DatagramPacket(payload.getPayload(), payload.getLength(), address, port); socket.send(packet); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(payload.getLength()); } - private Datagram receive() throws IOException { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + private Payload receive() throws IOException { + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); - return new Datagram(buf); + return new Payload(inBuffer); } @@ -76,21 +78,21 @@ public class UdpClient { long sequence = 0; // Send handshake - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); - send(datagram); + Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packetCount); + send(payload); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } // Data datagrams ... for(int i = 0; i < packetCount; i++) { - datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); - send(datagram); - datagram = receive(); - if(datagram.getType() != DataType.ACK.getValue()) { + payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount); + send(payload); + payload = receive(); + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); } statistics.tick(); @@ -98,13 +100,13 @@ public class UdpClient { // End datagram //Thread.sleep(100); - datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); - send(datagram); + payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount); + send(payload); // TODO: Wait for ACK - datagram = receive(); + payload = receive(); statistics.ack(); - if(datagram.getType() != DataType.ACK.getValue()) { + if(payload.getType() != PayloadType.ACK.getValue()) { log.warn("No ACK!"); return; } diff --git a/src/main/java/biz/nellemann/jnetperf/UdpServer.java b/src/main/java/biz/nellemann/jnetperf/UdpServer.java index c5901f2..ba300a9 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpServer.java @@ -29,7 +29,7 @@ public class UdpServer extends Thread { final Logger log = LoggerFactory.getLogger(UdpServer.class); private final DatagramSocket socket; - private byte[] buf = new byte[256]; + private byte[] inBuffer; public UdpServer(int port) throws SocketException { @@ -42,11 +42,10 @@ public class UdpServer extends Thread { boolean running = true; try { - while (running) { + inBuffer = new byte[Payload.DEFAULT_LENGTH]; session(); } - socket.close(); } catch(IOException e) { log.error(e.getMessage()); @@ -63,47 +62,40 @@ public class UdpServer extends Thread { while (running) { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); InetAddress address = packet.getAddress(); int port = packet.getPort(); - Datagram datagram = new Datagram(buf); + Payload payload = new Payload(packet.getData()); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(payload.getLength()); - if(datagram.getType() == DataType.HANDSHAKE.getValue()) { - log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); + if(payload.getType() == PayloadType.HANDSHAKE.getValue()) { + log.info("Handshake from ... {}", address); // Setup to receive larger datagrams - buf = new byte[datagram.getLength()]; + inBuffer = new byte[payload.getLength()]; statistics.reset(); } - /* - if(datagram.getType() == DataType.DATA.getValue()) { - bytesReceived += datagram.getRealLength(); - bytesReceivedTotal += datagram.getRealLength(); - }*/ - - if(datagram.getType() == DataType.END.getValue()) { + if(payload.getType() == PayloadType.END.getValue()) { ackEnd = true; } // Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); - packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); + Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1); + packet = new DatagramPacket(responsePayload.getPayload(), responsePayload.getLength(), address, port); socket.send(packet); statistics.ack(); statistics.tick(); - if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { + if(ackEnd) { running = false; statistics.printAverage(); statistics.printSummary(); } - } diff --git a/src/main/java/biz/nellemann/jnetperf/VersionProvider.java b/src/main/java/biz/nellemann/jnetperf/VersionProvider.java index 7cb3ee3..9e7abda 100644 --- a/src/main/java/biz/nellemann/jnetperf/VersionProvider.java +++ b/src/main/java/biz/nellemann/jnetperf/VersionProvider.java @@ -28,7 +28,9 @@ class VersionProvider implements CommandLine.IVersionProvider { Manifest manifest = new Manifest(getClass().getResourceAsStream("/META-INF/MANIFEST.MF")); Attributes attrs = manifest.getMainAttributes(); - return new String[] { "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") }; + return new String[] { + "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") + " (on ${os.name} ${os.version} ${os.arch})", + "JVM: ${java.version} (${java.vendor} ${java.vm.name} ${java.vm.version})" }; } } \ No newline at end of file