Suppoet for TCP #1

Merged
nellemann merged 3 commits from tcp into main 2023-07-14 05:42:12 +00:00
8 changed files with 286 additions and 74 deletions
Showing only changes of commit 87c8c1f56e - Show all commits

View file

@ -12,14 +12,15 @@ You need Java (JRE) version 8 or later to run jnetperf.
- Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar** - Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar**
```shell ```shell
Usage: jnetperf [-hV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s) Usage: jnetperf [-huV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s)
For more information visit https://git.data.coop/nellemann/jnetperf For more information visit https://git.data.coop/nellemann/jnetperf
-c, --connect=SERVER Connect to remote server. -c, --connect=SERVER Connect to remote server.
-h, --help Show this help message and exit. -h, --help Show this help message and exit.
-l, --pkt-len=SIZE Datagram size in bytes, max 65507 [default: 65507]. -l, --pkt-len=SIZE Packet size in bytes [default: 1432].
-n, --pkt-num=NUM Number of packets to send [default: 150000]. -n, --pkt-num=NUM Number of packets to send [default: 150000].
-p, --port=PORT Network port [default: 4445]. -p, --port=PORT Network port [default: 4445].
-s, --server Run server and wait for client. -s, --server Run server and wait for client.
-u, --udp Use UDP network protocol [default: false].
-V, --version Print version information and exit. -V, --version Print version information and exit.
``` ```

View file

@ -40,8 +40,8 @@ public class Application implements Callable<Integer> {
boolean runServer = false; boolean runServer = false;
} }
@CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Datagram size in bytes, max 65507 [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Packet size in bytes [default: ${DEFAULT-VALUE}].")
int packetSize = 65507; // Min: 256 Max: 65507 int packetSize = Datagram.DEFAULT_LENGTH;
@CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].")
int packetCount = 150_000; int packetCount = 150_000;
@ -49,6 +49,9 @@ public class Application implements Callable<Integer> {
@CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].") @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].")
int port = 4445; int port = 4445;
@CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].")
boolean useUdp = false;
@Override @Override
@ -72,15 +75,30 @@ public class Application implements Callable<Integer> {
private void runClient(String remoteHost) throws InterruptedException, IOException { private void runClient(String remoteHost) throws InterruptedException, IOException {
if(useUdp) {
if(packetSize > Datagram.MAX_UDP_LENGTH) {
System.err.println("Packet size not allowed for UDP");
return;
}
UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize); UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize);
udpClient.start(); udpClient.start();
} else {
TcpClient tcpClient = new TcpClient(remoteHost, port, packetCount, packetSize);
tcpClient.start();
}
} }
private void runServer() throws SocketException, InterruptedException { private void runServer() throws IOException, InterruptedException {
if(useUdp) {
UdpServer udpServer = new UdpServer(port); UdpServer udpServer = new UdpServer(port);
udpServer.start(); udpServer.start();
udpServer.join(); udpServer.join();
} else {
TcpServer tcpServer = new TcpServer(port);
tcpServer.start();
tcpServer.join();
}
} }
} }

View file

@ -0,0 +1,4 @@
package biz.nellemann.jnetperf;
public interface Client {
}

View file

@ -15,13 +15,10 @@
*/ */
package biz.nellemann.jnetperf; package biz.nellemann.jnetperf;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* *
@ -36,14 +33,14 @@ import org.slf4j.LoggerFactory;
public class Datagram { public class Datagram {
final Logger log = LoggerFactory.getLogger(Datagram.class); public final static int MAX_UDP_LENGTH = 65507;
public final static int DEFAULT_LENGTH = 1432;
public final static int HEADER_LENGTH = 32;
private final int HEADER_LENGTH = 32;
private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes
private final int type; private final int type;
private final int length; private final int length;
private final int realLength;
private final long curPkt; private final long curPkt;
private final long maxPkt; private final long maxPkt;
private final byte[] data; private final byte[] data;
@ -56,70 +53,58 @@ public class Datagram {
* @param currentPkt * @param currentPkt
*/ */
public Datagram(int type, int length, long currentPkt, long maxPkt) { public Datagram(int type, int length, long currentPkt, long maxPkt) {
log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt);
this.type = type; this.type = type;
this.length = length;
this.curPkt = currentPkt; this.curPkt = currentPkt;
this.maxPkt = maxPkt; this.maxPkt = maxPkt;
this.length = length;
if(type == DataType.DATA.getValue()) { if (type == DataType.HANDSHAKE.getValue()) {
realLength = length; data = new byte[DEFAULT_LENGTH - HEADER_LENGTH];
data = new byte[length - HEADER_LENGTH];
} else { } else {
realLength = HEADER_LENGTH * 2; data = new byte[length - HEADER_LENGTH];
data = new byte[HEADER_LENGTH * 2]; }
} }
//random.nextBytes(data); /**
* Assemble datagram from byte[] payload
* @param payload
*/
public Datagram(byte[] payload) {
this(ByteBuffer.wrap(payload));
} }
/** /**
* Assemble datagram from payload * Assemble datagram from ByteBuffer payload
* @param payload * @param payload
*/ */
public Datagram(byte[] payload) throws IOException { public Datagram(ByteBuffer payload) {
log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString());
ByteBuffer buffer = ByteBuffer.wrap(payload);
byte[] id = new byte[8]; byte[] id = new byte[8];
buffer.get(id); payload.get(id);
if(!Arrays.equals(id, MAGIC_ID)) { if(!Arrays.equals(id, MAGIC_ID)) {
log.warn("Datagram() - magic ID does not match!"); System.out.println(Arrays.toString(id));
throw new IOException(); System.out.println(Arrays.toString(MAGIC_ID));
throw new RuntimeException("Datagram magic ID does not match: " + MAGIC_ID);
} }
// Order is importent when assembling header fields like this // Order is importent when assembling header fields like this
type = buffer.getInt(); type = payload.getInt();
length = buffer.getInt(); length = payload.getInt();
curPkt = buffer.getLong(); curPkt = payload.getLong();
maxPkt = buffer.getLong(); maxPkt = payload.getLong();
realLength = length; data = new byte[payload.limit() - payload.position()];
if(type == DataType.DATA.getValue()) { payload.get(data);
data = new byte[length - HEADER_LENGTH];
buffer.get(data, 0, data.length);
} else {
data = new byte[HEADER_LENGTH * 2];
} }
}
public int getLength() { public int getLength() {
return length; return length;
} }
public int getRealLength() {
return realLength;
}
public byte[] getPayload() throws IOException { public byte[] getPayload() {
log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt);
ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH);
// Order is important // Order is important

View file

@ -0,0 +1,109 @@
package biz.nellemann.jnetperf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.*;
public class TcpClient {
final Logger log = LoggerFactory.getLogger(TcpClient.class);
private Statistics statistics;
private DataOutputStream out;
private DataInputStream in;
private final int port;
private final InetAddress address;
private Socket socket;
private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH];
private final int packetCount;
private final int packetSize;
public TcpClient(String hostname, int port, int packets, int size) throws IOException {
log.info("TcpClient() - target: {}, port: {}", hostname, port);
this.port = port;
this.packetCount = packets;
this.packetSize = size;
address = InetAddress.getByName(hostname);
statistics = new Statistics();
}
private void send(Datagram datagram) throws IOException {
out.write(datagram.getPayload());
statistics.transferPacket();
statistics.transferBytes(datagram.getLength());
}
private Datagram receive() throws IOException {
in.readFully(inBuffer);
return new Datagram(inBuffer);
}
private void close() throws IOException {
in.close();
out.close();
socket.close();
}
public void start() throws IOException, InterruptedException {
socket = new Socket(address, port);
in = new DataInputStream(socket.getInputStream());
out = new DataOutputStream(socket.getOutputStream());
long sequence = 0;
// Send handshake
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount);
send(datagram);
datagram = receive();
if(datagram.getType() != DataType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
// Data datagrams ...
for(int i = 0; i < packetCount; i++) {
datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount);
send(datagram);
datagram = receive();
if(datagram.getType() != DataType.ACK.getValue()) {
log.warn("No ACK!");
}
statistics.tick();
}
// End datagram
//Thread.sleep(100);
datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount);
send(datagram);
System.out.println("Sending END datagram");
// TODO: Wait for ACK
datagram = receive();
statistics.ack();
if(datagram.getType() != DataType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
Thread.sleep(100);
close();
statistics.printAverage();
statistics.printSummary();
}
}

View file

@ -0,0 +1,103 @@
package biz.nellemann.jnetperf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
public class TcpServer extends Thread {
final Logger log = LoggerFactory.getLogger(TcpServer.class);
private ServerSocket socket;
private DataInputStream in;
private DataOutputStream out;
private byte[] inBuffer;
public TcpServer(int port) throws IOException {
log.info("TcpServer()");
socket = new ServerSocket(port);
socket.setSoTimeout(10000);
}
public void run() {
boolean running = true;
try {
while (running) {
inBuffer = new byte[Datagram.DEFAULT_LENGTH];
session();
}
socket.close();
} catch(IOException e) {
log.error(e.getMessage());
}
}
public void session() throws IOException {
Statistics statistics = new Statistics();
boolean running = true;
boolean ackEnd = false;
Socket server = socket.accept();
InetAddress address = socket.getInetAddress();
in = new DataInputStream(server.getInputStream());
out = new DataOutputStream(server.getOutputStream());
while (running) {
Datagram datagram = receive();
statistics.transferPacket();
statistics.transferBytes(datagram.getLength());
if(datagram.getType() == DataType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams
inBuffer = new byte[datagram.getLength()];
statistics.reset();
}
if(datagram.getType() == DataType.END.getValue()) {
ackEnd = true;
}
// Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1);
out.write(responseDatagram.getPayload());
statistics.ack();
statistics.tick();
if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) {
running = false;
statistics.printAverage();
statistics.printSummary();
}
}
in.close();
out.close();
server.close();
}
private Datagram receive() throws IOException {
in.readFully(inBuffer);
Datagram datagram = new Datagram(inBuffer);
return datagram;
}
}

View file

@ -35,7 +35,7 @@ public class UdpClient {
private final InetAddress address; private final InetAddress address;
private final DatagramSocket socket; private final DatagramSocket socket;
private final byte[] buf = new byte[256]; private final byte[] inBuffer = new byte[Datagram.DEFAULT_LENGTH];
private final int packetCount; private final int packetCount;
private final int packetSize; private final int packetSize;
@ -53,16 +53,16 @@ public class UdpClient {
} }
private void send(Datagram datagram) throws IOException { private void send(Datagram datagram) throws IOException {
DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port); DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, port);
socket.send(packet); socket.send(packet);
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getRealLength()); statistics.transferBytes(datagram.getLength());
} }
private Datagram receive() throws IOException { private Datagram receive() throws IOException {
DatagramPacket packet = new DatagramPacket(buf, buf.length); DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length);
socket.receive(packet); socket.receive(packet);
return new Datagram(buf); return new Datagram(inBuffer);
} }
@ -76,7 +76,7 @@ public class UdpClient {
long sequence = 0; long sequence = 0;
// Send handshake // Send handshake
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), Datagram.DEFAULT_LENGTH, sequence++, packetCount);
send(datagram); send(datagram);
datagram = receive(); datagram = receive();

View file

@ -29,7 +29,7 @@ public class UdpServer extends Thread {
final Logger log = LoggerFactory.getLogger(UdpServer.class); final Logger log = LoggerFactory.getLogger(UdpServer.class);
private final DatagramSocket socket; private final DatagramSocket socket;
private byte[] buf = new byte[256]; private byte[] inBuffer;
public UdpServer(int port) throws SocketException { public UdpServer(int port) throws SocketException {
@ -42,11 +42,10 @@ public class UdpServer extends Thread {
boolean running = true; boolean running = true;
try { try {
while (running) { while (running) {
inBuffer = new byte[Datagram.DEFAULT_LENGTH];
session(); session();
} }
socket.close(); socket.close();
} catch(IOException e) { } catch(IOException e) {
log.error(e.getMessage()); log.error(e.getMessage());
@ -63,35 +62,29 @@ public class UdpServer extends Thread {
while (running) { while (running) {
DatagramPacket packet = new DatagramPacket(buf, buf.length); DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length);
socket.receive(packet); socket.receive(packet);
InetAddress address = packet.getAddress(); InetAddress address = packet.getAddress();
int port = packet.getPort(); int port = packet.getPort();
Datagram datagram = new Datagram(buf); Datagram datagram = new Datagram(packet.getData());
statistics.transferPacket(); statistics.transferPacket();
statistics.transferBytes(datagram.getRealLength()); statistics.transferBytes(datagram.getLength());
if(datagram.getType() == DataType.HANDSHAKE.getValue()) { if(datagram.getType() == DataType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams // Setup to receive larger datagrams
buf = new byte[datagram.getLength()]; inBuffer = new byte[datagram.getLength()];
statistics.reset(); statistics.reset();
} }
/*
if(datagram.getType() == DataType.DATA.getValue()) {
bytesReceived += datagram.getRealLength();
bytesReceivedTotal += datagram.getRealLength();
}*/
if(datagram.getType() == DataType.END.getValue()) { if(datagram.getType() == DataType.END.getValue()) {
ackEnd = true; ackEnd = true;
} }
// Send ACK // Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), Datagram.DEFAULT_LENGTH, datagram.getCurPkt(), 1);
packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port);
socket.send(packet); socket.send(packet);
statistics.ack(); statistics.ack();
@ -103,7 +96,6 @@ public class UdpServer extends Thread {
statistics.printSummary(); statistics.printSummary();
} }
} }