Suppoet for TCP #1

Merged
nellemann merged 3 commits from tcp into main 2023-07-14 05:42:12 +00:00
11 changed files with 133 additions and 86 deletions
Showing only changes of commit 7f4a5d28ac - Show all commits

View file

@ -1,3 +1,3 @@
projectId = jnetperf projectId = jnetperf
projectGroup = biz.nellemann.jnetperf projectGroup = biz.nellemann.jnetperf
projectVersion = 0.0.5 projectVersion = 0.0.6

View file

@ -20,7 +20,6 @@ import picocli.CommandLine;
import picocli.CommandLine.Command; import picocli.CommandLine.Command;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -33,17 +32,17 @@ public class Application implements Callable<Integer> {
RunMode runMode; RunMode runMode;
static class RunMode { static class RunMode {
@CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "SERVER") @CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "HOST")
String remoteServer; String remoteServer;
@CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.") @CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.")
boolean runServer = false; boolean runServer = false;
} }
@CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class)
int packetSize = Datagram.DEFAULT_LENGTH; int packetSize = Payload.DEFAULT_LENGTH;
@CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].", converter = SuffixConverter.class)
int packetCount = 150_000; int packetCount = 150_000;
@CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].")
@ -75,15 +74,20 @@ public class Application implements Callable<Integer> {
private void runClient(String remoteHost) throws InterruptedException, IOException { private void runClient(String remoteHost) throws InterruptedException, IOException {
if(packetSize < Payload.MIN_LENGTH) {
packetSize = Payload.MIN_LENGTH;
}
if(useUdp) { if(useUdp) {
if(packetSize > Datagram.MAX_UDP_LENGTH) { if(packetSize > Payload.MAX_UDP_LENGTH) {
System.err.println("Packet size not allowed for UDP"); packetSize = Payload.MAX_UDP_LENGTH;
return;
} }
UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, 0);
udpClient.start(); udpClient.start();
} else { } else {
TcpClient tcpClient = new TcpClient(remoteHost, port, packetCount, packetSize); TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, 0);
tcpClient.start(); tcpClient.start();
} }
} }

View file

@ -1,4 +0,0 @@
package biz.nellemann.jnetperf;
public interface Client {
}

View file

@ -31,8 +31,9 @@ import java.util.Arrays;
* *
*/ */
public class Datagram { public class Payload {
public final static int MIN_LENGTH = 64;
public final static int MAX_UDP_LENGTH = 65507; public final static int MAX_UDP_LENGTH = 65507;
public final static int DEFAULT_LENGTH = 1432; public final static int DEFAULT_LENGTH = 1432;
public final static int HEADER_LENGTH = 32; public final static int HEADER_LENGTH = 32;
@ -52,13 +53,13 @@ public class Datagram {
* @param length * @param length
* @param currentPkt * @param currentPkt
*/ */
public Datagram(int type, int length, long currentPkt, long maxPkt) { public Payload(int type, int length, long currentPkt, long maxPkt) {
this.type = type; this.type = type;
this.curPkt = currentPkt; this.curPkt = currentPkt;
this.maxPkt = maxPkt; this.maxPkt = maxPkt;
this.length = length; this.length = length;
if (type == DataType.HANDSHAKE.getValue()) { if (type == PayloadType.HANDSHAKE.getValue()) {
data = new byte[DEFAULT_LENGTH - HEADER_LENGTH]; data = new byte[DEFAULT_LENGTH - HEADER_LENGTH];
} else { } else {
data = new byte[length - HEADER_LENGTH]; data = new byte[length - HEADER_LENGTH];
@ -69,7 +70,7 @@ public class Datagram {
* Assemble datagram from byte[] payload * Assemble datagram from byte[] payload
* @param payload * @param payload
*/ */
public Datagram(byte[] payload) { public Payload(byte[] payload) {
this(ByteBuffer.wrap(payload)); this(ByteBuffer.wrap(payload));
} }
@ -78,7 +79,7 @@ public class Datagram {
* Assemble datagram from ByteBuffer payload * Assemble datagram from ByteBuffer payload
* @param payload * @param payload
*/ */
public Datagram(ByteBuffer payload) { public Payload(ByteBuffer payload) {
byte[] id = new byte[8]; byte[] id = new byte[8];
payload.get(id); payload.get(id);

View file

@ -15,13 +15,13 @@
*/ */
package biz.nellemann.jnetperf; package biz.nellemann.jnetperf;
public enum DataType { public enum PayloadType {
HANDSHAKE(1), DATA(2), ACK(4), END(9); HANDSHAKE(1), DATA(2), ACK(4), END(9);
private final int value; private final int value;
private DataType(int value) { private PayloadType(int value) {
this.value = value; this.value = value;
} }

View file

@ -0,0 +1,42 @@
package biz.nellemann.jnetperf;
import picocli.CommandLine;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SuffixConverter implements CommandLine.ITypeConverter<Integer> {
final private Pattern pattern = Pattern.compile("(\\d+)([kmg])?b?", Pattern.CASE_INSENSITIVE);
public Integer convert(String value) {
int bytes = 0;
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
int number = Integer.parseInt(matcher.group(1));
if(matcher.group(2) != null) { // We got the kilo, mega og giga suffix
String suffix = matcher.group(2);
switch (suffix.toLowerCase(Locale.ROOT)) {
case "k":
bytes = number * 1024;
break;
case "m":
bytes = number * 1024 * 1024;
break;
case "g":
bytes = number * 1024 * 1024 * 1024;
break;
default:
System.err.println("Unknown suffix: " + suffix);
bytes = number;
}
} else {
bytes = number;
}
}
return bytes;
}
}

View file

@ -20,33 +20,35 @@ public class TcpClient {
private final InetAddress address; private final InetAddress address;
private Socket socket; private Socket socket;
private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH];
private final int packetCount; private final int packetCount;
private final int packetSize; private final int packetSize;
private final int packetTime;
public TcpClient(String hostname, int port, int packets, int size) throws IOException { public TcpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws IOException {
log.info("TcpClient() - target: {}, port: {}", hostname, port); log.info("TcpClient() - target: {}, port: {}", hostname, port);
this.port = port; this.port = port;
this.packetCount = packets;
this.packetSize = size; this.packetSize = size;
this.packetCount = maxPackets;
this.packetTime = maxTime;
address = InetAddress.getByName(hostname); address = InetAddress.getByName(hostname);
statistics = new Statistics(); statistics = new Statistics();
} }
private void send(Datagram datagram) throws IOException { private void send(Payload payload) throws IOException {
out.write(datagram.getPayload()); out.write(payload.getPayload());
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getLength()); statistics.transferBytes(payload.getLength());
} }
private Datagram receive() throws IOException { private Payload receive() throws IOException {
in.readFully(inBuffer); in.readFully(inBuffer);
return new Datagram(inBuffer); return new Payload(inBuffer);
} }
@ -66,21 +68,21 @@ public class TcpClient {
long sequence = 0; long sequence = 0;
// Send handshake // Send handshake
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount);
send(datagram); send(payload);
datagram = receive(); payload = receive();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
return; return;
} }
// Data datagrams ... // Data datagrams ...
for(int i = 0; i < packetCount; i++) { for(int i = 0; i < packetCount; i++) {
datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount);
send(datagram); send(payload);
datagram = receive(); payload = receive();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
} }
statistics.tick(); statistics.tick();
@ -88,14 +90,13 @@ public class TcpClient {
// End datagram // End datagram
//Thread.sleep(100); //Thread.sleep(100);
datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount);
send(datagram); send(payload);
System.out.println("Sending END datagram");
// TODO: Wait for ACK // TODO: Wait for ACK
datagram = receive(); payload = receive();
statistics.ack(); statistics.ack();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
return; return;
} }

View file

@ -7,7 +7,6 @@ import java.io.*;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.nio.ByteBuffer;
public class TcpServer extends Thread { public class TcpServer extends Thread {
@ -33,7 +32,7 @@ public class TcpServer extends Thread {
try { try {
while (running) { while (running) {
inBuffer = new byte[Datagram.DEFAULT_LENGTH]; inBuffer = new byte[Payload.DEFAULT_LENGTH];
session(); session();
} }
socket.close(); socket.close();
@ -58,28 +57,28 @@ public class TcpServer extends Thread {
while (running) { while (running) {
Datagram datagram = receive(); Payload payload = receive();
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getLength()); statistics.transferBytes(payload.getLength());
if(datagram.getType() == DataType.HANDSHAKE.getValue()) { if(payload.getType() == PayloadType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}", address); log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams // Setup to receive larger datagrams
inBuffer = new byte[datagram.getLength()]; inBuffer = new byte[payload.getLength()];
statistics.reset(); statistics.reset();
} }
if(datagram.getType() == DataType.END.getValue()) { if(payload.getType() == PayloadType.END.getValue()) {
ackEnd = true; ackEnd = true;
} }
// Send ACK // Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1);
out.write(responseDatagram.getPayload()); out.write(responsePayload.getPayload());
statistics.ack(); statistics.ack();
statistics.tick(); statistics.tick();
if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { if(ackEnd) {
running = false; running = false;
statistics.printAverage(); statistics.printAverage();
statistics.printSummary(); statistics.printSummary();
@ -94,10 +93,10 @@ public class TcpServer extends Thread {
} }
private Datagram receive() throws IOException { private Payload receive() throws IOException {
in.readFully(inBuffer); in.readFully(inBuffer);
Datagram datagram = new Datagram(inBuffer); Payload payload = new Payload(inBuffer);
return datagram; return payload;
} }
} }

View file

@ -35,34 +35,36 @@ public class UdpClient {
private final InetAddress address; private final InetAddress address;
private final DatagramSocket socket; private final DatagramSocket socket;
private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH]; private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH];
private final int packetCount; private final int packetCount;
private final int packetSize; private final int packetSize;
private final int packeTime;
public UdpClient(String hostname, int port, int packets, int size) throws UnknownHostException, SocketException { public UdpClient(String hostname, int port, int size, int maxPackets, int maxTime) throws UnknownHostException, SocketException {
log.info("UdpClient() - target: {}, port: {}", hostname, port); log.info("UdpClient() - target: {}, port: {}", hostname, port);
this.port = port; this.port = port;
this.packetCount = packets;
this.packetSize = size; this.packetSize = size;
this.packetCount = maxPackets;
this.packeTime = maxTime;
socket = new DatagramSocket(); socket = new DatagramSocket();
address = InetAddress.getByName(hostname); address = InetAddress.getByName(hostname);
statistics = new Statistics(); statistics = new Statistics();
} }
private void send(Datagram datagram) throws IOException { private void send(Payload payload) throws IOException {
DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, port); DatagramPacket packet = new DatagramPacket(payload.getPayload(), payload.getLength(), address, port);
socket.send(packet); socket.send(packet);
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getLength()); statistics.transferBytes(payload.getLength());
} }
private Datagram receive() throws IOException { private Payload receive() throws IOException {
DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length); DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length);
socket.receive(packet); socket.receive(packet);
return new Datagram(inBuffer); return new Payload(inBuffer);
} }
@ -76,21 +78,21 @@ public class UdpClient {
long sequence = 0; long sequence = 0;
// Send handshake // Send handshake
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), Datagram.DEFAULT_LENGTH, sequence++, packetCount); Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packetCount);
send(datagram); send(payload);
datagram = receive(); payload = receive();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
return; return;
} }
// Data datagrams ... // Data datagrams ...
for(int i = 0; i < packetCount; i++) { for(int i = 0; i < packetCount; i++) {
datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); payload = new Payload(PayloadType.DATA.getValue(), packetSize, sequence++, packetCount);
send(datagram); send(payload);
datagram = receive(); payload = receive();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
} }
statistics.tick(); statistics.tick();
@ -98,13 +100,13 @@ public class UdpClient {
// End datagram // End datagram
//Thread.sleep(100); //Thread.sleep(100);
datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); payload = new Payload(PayloadType.END.getValue(), packetSize, sequence++, packetCount);
send(datagram); send(payload);
// TODO: Wait for ACK // TODO: Wait for ACK
datagram = receive(); payload = receive();
statistics.ack(); statistics.ack();
if(datagram.getType() != DataType.ACK.getValue()) { if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!"); log.warn("No ACK!");
return; return;
} }

View file

@ -43,7 +43,7 @@ public class UdpServer extends Thread {
try { try {
while (running) { while (running) {
inBuffer = new byte[Datagram.DEFAULT_LENGTH]; inBuffer = new byte[Payload.DEFAULT_LENGTH];
session(); session();
} }
socket.close(); socket.close();
@ -68,29 +68,29 @@ public class UdpServer extends Thread {
InetAddress address = packet.getAddress(); InetAddress address = packet.getAddress();
int port = packet.getPort(); int port = packet.getPort();
Datagram datagram = new Datagram(packet.getData()); Payload payload = new Payload(packet.getData());
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getLength()); statistics.transferBytes(payload.getLength());
if(datagram.getType() == DataType.HANDSHAKE.getValue()) { if(payload.getType() == PayloadType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}", address); log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams // Setup to receive larger datagrams
inBuffer = new byte[datagram.getLength()]; inBuffer = new byte[payload.getLength()];
statistics.reset(); statistics.reset();
} }
if(datagram.getType() == DataType.END.getValue()) { if(payload.getType() == PayloadType.END.getValue()) {
ackEnd = true; ackEnd = true;
} }
// Send ACK // Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1); Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1);
packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); packet = new DatagramPacket(responsePayload.getPayload(), responsePayload.getLength(), address, port);
socket.send(packet); socket.send(packet);
statistics.ack(); statistics.ack();
statistics.tick(); statistics.tick();
if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) { if(ackEnd) {
running = false; running = false;
statistics.printAverage(); statistics.printAverage();
statistics.printSummary(); statistics.printSummary();

View file

@ -28,7 +28,9 @@ class VersionProvider implements CommandLine.IVersionProvider {
Manifest manifest = new Manifest(getClass().getResourceAsStream("/META-INF/MANIFEST.MF")); Manifest manifest = new Manifest(getClass().getResourceAsStream("/META-INF/MANIFEST.MF"));
Attributes attrs = manifest.getMainAttributes(); Attributes attrs = manifest.getMainAttributes();
return new String[] { "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") }; return new String[] {
"${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") + " (on ${os.name} ${os.version} ${os.arch})",
"JVM: ${java.version} (${java.vendor} ${java.vm.name} ${java.vm.version})" };
} }
} }