From 87c8c1f56e63535436f20d4c65f9ce57b9c3fb2a Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Thu, 13 Jul 2023 21:59:38 +0200 Subject: [PATCH] Initial support for TCP packets. --- README.md | 5 +- .../biz/nellemann/jnetperf/Application.java | 34 ++++-- .../java/biz/nellemann/jnetperf/Client.java | 4 + .../java/biz/nellemann/jnetperf/Datagram.java | 69 +++++------ .../biz/nellemann/jnetperf/TcpClient.java | 109 ++++++++++++++++++ .../biz/nellemann/jnetperf/TcpServer.java | 103 +++++++++++++++++ .../biz/nellemann/jnetperf/UdpClient.java | 12 +- .../biz/nellemann/jnetperf/UdpServer.java | 24 ++-- 8 files changed, 286 insertions(+), 74 deletions(-) create mode 100644 src/main/java/biz/nellemann/jnetperf/Client.java create mode 100644 src/main/java/biz/nellemann/jnetperf/TcpClient.java create mode 100644 src/main/java/biz/nellemann/jnetperf/TcpServer.java diff --git a/README.md b/README.md index d0f13c7..77813f5 100644 --- a/README.md +++ b/README.md @@ -12,14 +12,15 @@ You need Java (JRE) version 8 or later to run jnetperf. - Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar** ```shell -Usage: jnetperf [-hV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) +Usage: jnetperf [-huV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) For more information visit https://git.data.coop/nellemann/jnetperf -c, --connect=SERVER Connect to remote server. -h, --help Show this help message and exit. - -l, --pkt-len=SIZE Datagram size in bytes, max 65507 [default: 65507]. + -l, --pkt-len=SIZE Packet size in bytes [default: 1432]. -n, --pkt-num=NUM Number of packets to send [default: 150000]. -p, --port=PORT Network port [default: 4445]. -s, --server Run server and wait for client. + -u, --udp Use UDP network protocol [default: false]. -V, --version Print version information and exit. ``` diff --git a/src/main/java/biz/nellemann/jnetperf/Application.java b/src/main/java/biz/nellemann/jnetperf/Application.java index a2744ee..26ec379 100644 --- a/src/main/java/biz/nellemann/jnetperf/Application.java +++ b/src/main/java/biz/nellemann/jnetperf/Application.java @@ -40,8 +40,8 @@ public class Application implements Callable { boolean runServer = false; } - @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Datagram size in bytes, max 65507 [default: ${DEFAULT-VALUE}].") - int packetSize = 65507; // Min: 256 Max: 65507 + @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].") + int packetSize = Datagram.DEFAULT_LENGTH; @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") int packetCount = 150_000; @@ -49,6 +49,9 @@ public class Application implements Callable { @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") int port = 4445; + @CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].") + boolean useUdp = false; + @Override @@ -72,15 +75,30 @@ public class Application implements Callable { private void runClient(String remoteHost) throws InterruptedException, IOException { - UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); - udpClient.start(); + if(useUdp) { + if(packetSize > Datagram.MAX_UDP_LENGTH) { + System.err.println("Packet size not allowed for UDP"); + return; + } + UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); + udpClient.start(); + } else { + TcpClient tcpClient = new TcpClient(remoteHost, port, packetCount, packetSize); + tcpClient.start(); + } } - private void runServer() throws SocketException, InterruptedException { - UdpServer udpServer = new UdpServer(port); - udpServer.start(); - udpServer.join(); + private void runServer() throws IOException, InterruptedException { + if(useUdp) { + UdpServer udpServer = new UdpServer(port); + udpServer.start(); + udpServer.join(); + } else { + TcpServer tcpServer = new TcpServer(port); + tcpServer.start(); + tcpServer.join(); + } } } diff --git a/src/main/java/biz/nellemann/jnetperf/Client.java b/src/main/java/biz/nellemann/jnetperf/Client.java new file mode 100644 index 0000000..7fd4562 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/Client.java @@ -0,0 +1,4 @@ +package biz.nellemann.jnetperf; + +public interface Client { +} diff --git a/src/main/java/biz/nellemann/jnetperf/Datagram.java b/src/main/java/biz/nellemann/jnetperf/Datagram.java index d1233aa..485bac1 100644 --- a/src/main/java/biz/nellemann/jnetperf/Datagram.java +++ b/src/main/java/biz/nellemann/jnetperf/Datagram.java @@ -15,13 +15,10 @@ */ package biz.nellemann.jnetperf; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @@ -36,14 +33,14 @@ import org.slf4j.LoggerFactory; public class Datagram { - final Logger log = LoggerFactory.getLogger(Datagram.class); + public final static int MAX_UDP_LENGTH = 65507; + public final static int DEFAULT_LENGTH = 1432; + public final static int HEADER_LENGTH = 32; - private final int HEADER_LENGTH = 32; private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes private final int type; private final int length; - private final int realLength; private final long curPkt; private final long maxPkt; private final byte[] data; @@ -56,70 +53,58 @@ public class Datagram { * @param currentPkt */ public Datagram(int type, int length, long currentPkt, long maxPkt) { - - log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt); - this.type = type; - this.length = length; this.curPkt = currentPkt; this.maxPkt = maxPkt; + this.length = length; - if(type == DataType.DATA.getValue()) { - realLength = length; - data = new byte[length - HEADER_LENGTH]; + if (type == DataType.HANDSHAKE.getValue()) { + data = new byte[DEFAULT_LENGTH - HEADER_LENGTH]; } else { - realLength = HEADER_LENGTH * 2; - data = new byte[HEADER_LENGTH * 2]; + data = new byte[length - HEADER_LENGTH]; } - - //random.nextBytes(data); } + /** + * Assemble datagram from byte[] payload + * @param payload + */ + public Datagram(byte[] payload) { + this(ByteBuffer.wrap(payload)); + } /** - * Assemble datagram from payload + * Assemble datagram from ByteBuffer payload * @param payload */ - public Datagram(byte[] payload) throws IOException { + public Datagram(ByteBuffer payload) { - log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString()); - - ByteBuffer buffer = ByteBuffer.wrap(payload); byte[] id = new byte[8]; - buffer.get(id); + payload.get(id); if(!Arrays.equals(id, MAGIC_ID)) { - log.warn("Datagram() - magic ID does not match!"); - throw new IOException(); + System.out.println(Arrays.toString(id)); + System.out.println(Arrays.toString(MAGIC_ID)); + throw new RuntimeException("Datagram magic ID does not match: " + MAGIC_ID); } // Order is importent when assembling header fields like this - type = buffer.getInt(); - length = buffer.getInt(); - curPkt = buffer.getLong(); - maxPkt = buffer.getLong(); + type = payload.getInt(); + length = payload.getInt(); + curPkt = payload.getLong(); + maxPkt = payload.getLong(); - realLength = length; - if(type == DataType.DATA.getValue()) { - data = new byte[length - HEADER_LENGTH]; - buffer.get(data, 0, data.length); - } else { - data = new byte[HEADER_LENGTH * 2]; - } + data = new byte[payload.limit() - payload.position()]; + payload.get(data); } - public int getLength() { return length; } - public int getRealLength() { - return realLength; - } - public byte[] getPayload() throws IOException { + public byte[] getPayload() { - log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt); ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); // Order is important diff --git a/src/main/java/biz/nellemann/jnetperf/TcpClient.java b/src/main/java/biz/nellemann/jnetperf/TcpClient.java new file mode 100644 index 0000000..43da15a --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpClient.java @@ -0,0 +1,109 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.*; + +public class TcpClient { + + final Logger log = LoggerFactory.getLogger(TcpClient.class); + + private Statistics statistics; + + private DataOutputStream out; + private DataInputStream in; + + + private final int port; + private final InetAddress address; + private Socket socket; + + private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + private final int packetCount; + private final int packetSize; + + + public TcpClient(String hostname, int port, int packets, int size) throws IOException { + log.info("TcpClient() - target: {}, port: {}", hostname, port); + + this.port = port; + this.packetCount = packets; + this.packetSize = size; + + address = InetAddress.getByName(hostname); + statistics = new Statistics(); + } + + + private void send(Datagram datagram) throws IOException { + out.write(datagram.getPayload()); + statistics.transferPacket(); + statistics.transferBytes(datagram.getLength()); + } + + + private Datagram receive() throws IOException { + in.readFully(inBuffer); + return new Datagram(inBuffer); + } + + + private void close() throws IOException { + in.close(); + out.close(); + socket.close(); + } + + + public void start() throws IOException, InterruptedException { + + socket = new Socket(address, port); + in = new DataInputStream(socket.getInputStream()); + out = new DataOutputStream(socket.getOutputStream()); + + long sequence = 0; + + // Send handshake + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + send(datagram); + + datagram = receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + // Data datagrams ... + for(int i = 0; i < packetCount; i++) { + datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); + send(datagram); + datagram = receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + } + statistics.tick(); + } + + // End datagram + //Thread.sleep(100); + datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); + send(datagram); + System.out.println("Sending END datagram"); + + // TODO: Wait for ACK + datagram = receive(); + statistics.ack(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + Thread.sleep(100); + close(); + statistics.printAverage(); + statistics.printSummary(); + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/TcpServer.java b/src/main/java/biz/nellemann/jnetperf/TcpServer.java new file mode 100644 index 0000000..d56ca29 --- /dev/null +++ b/src/main/java/biz/nellemann/jnetperf/TcpServer.java @@ -0,0 +1,103 @@ +package biz.nellemann.jnetperf; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.ByteBuffer; + +public class TcpServer extends Thread { + + final Logger log = LoggerFactory.getLogger(TcpServer.class); + + private ServerSocket socket; + private DataInputStream in; + private DataOutputStream out; + private byte[] inBuffer; + + + public TcpServer(int port) throws IOException { + log.info("TcpServer()"); + + socket = new ServerSocket(port); + socket.setSoTimeout(10000); + } + + + public void run() { + + boolean running = true; + + try { + while (running) { + inBuffer = new byte[Datagram.DEFAULT_LENGTH]; + session(); + } + socket.close(); + } catch(IOException e) { + log.error(e.getMessage()); + } + + } + + + public void session() throws IOException { + + Statistics statistics = new Statistics(); + boolean running = true; + boolean ackEnd = false; + + Socket server = socket.accept(); + InetAddress address = socket.getInetAddress(); + + in = new DataInputStream(server.getInputStream()); + out = new DataOutputStream(server.getOutputStream()); + + while (running) { + + Datagram datagram = receive(); + statistics.transferPacket(); + statistics.transferBytes(datagram.getLength()); + + if(datagram.getType() == DataType.HANDSHAKE.getValue()) { + log.info("Handshake from ... {}", address); + // Setup to receive larger datagrams + inBuffer = new byte[datagram.getLength()]; + statistics.reset(); + } + + if(datagram.getType() == DataType.END.getValue()) { + ackEnd = true; + } + + // Send ACK + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); + out.write(responseDatagram.getPayload()); + statistics.ack(); + + statistics.tick(); + if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { + running = false; + statistics.printAverage(); + statistics.printSummary(); + } + + } + + in.close(); + out.close(); + server.close(); + + } + + + private Datagram receive() throws IOException { + in.readFully(inBuffer); + Datagram datagram = new Datagram(inBuffer); + return datagram; + } + +} diff --git a/src/main/java/biz/nellemann/jnetperf/UdpClient.java b/src/main/java/biz/nellemann/jnetperf/UdpClient.java index edead01..ce4b52c 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpClient.java @@ -35,7 +35,7 @@ public class UdpClient { private final InetAddress address; private final DatagramSocket socket; - private final byte[] buf = new byte[256]; + private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; private final int packetCount; private final int packetSize; @@ -53,16 +53,16 @@ public class UdpClient { } private void send(Datagram datagram) throws IOException { - DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port); + DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, port); socket.send(packet); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(datagram.getLength()); } private Datagram receive() throws IOException { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); - return new Datagram(buf); + return new Datagram(inBuffer); } @@ -76,7 +76,7 @@ public class UdpClient { long sequence = 0; // Send handshake - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), Datagram.DEFAULT_LENGTH, sequence++, packetCount); send(datagram); datagram = receive(); diff --git a/src/main/java/biz/nellemann/jnetperf/UdpServer.java b/src/main/java/biz/nellemann/jnetperf/UdpServer.java index c5901f2..775b0fc 100644 --- a/src/main/java/biz/nellemann/jnetperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jnetperf/UdpServer.java @@ -29,7 +29,7 @@ public class UdpServer extends Thread { final Logger log = LoggerFactory.getLogger(UdpServer.class); private final DatagramSocket socket; - private byte[] buf = new byte[256]; + private byte[] inBuffer; public UdpServer(int port) throws SocketException { @@ -42,11 +42,10 @@ public class UdpServer extends Thread { boolean running = true; try { - while (running) { + inBuffer = new byte[Datagram.DEFAULT_LENGTH]; session(); } - socket.close(); } catch(IOException e) { log.error(e.getMessage()); @@ -63,35 +62,29 @@ public class UdpServer extends Thread { while (running) { - DatagramPacket packet = new DatagramPacket(buf, buf.length); + DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); socket.receive(packet); InetAddress address = packet.getAddress(); int port = packet.getPort(); - Datagram datagram = new Datagram(buf); + Datagram datagram = new Datagram(packet.getData()); statistics.transferPacket(); - statistics.transferBytes(datagram.getRealLength()); + statistics.transferBytes(datagram.getLength()); if(datagram.getType() == DataType.HANDSHAKE.getValue()) { - log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); + log.info("Handshake from ... {}", address); // Setup to receive larger datagrams - buf = new byte[datagram.getLength()]; + inBuffer = new byte[datagram.getLength()]; statistics.reset(); } - /* - if(datagram.getType() == DataType.DATA.getValue()) { - bytesReceived += datagram.getRealLength(); - bytesReceivedTotal += datagram.getRealLength(); - }*/ - if(datagram.getType() == DataType.END.getValue()) { ackEnd = true; } // Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); socket.send(packet); statistics.ack(); @@ -103,7 +96,6 @@ public class UdpServer extends Thread { statistics.printSummary(); } - }