Compare commits

...

15 Commits
v0.0.5 ... main

Author SHA1 Message Date
Mark Nellemann 49ca2c186c Update README.md 2024-05-17 07:53:49 +00:00
Mark Nellemann 3deb336a46 Merge branch 'time'
continuous-integration/drone/push Build is passing Details
2023-07-18 15:24:45 +02:00
Mark Nellemann b757541053 Merge branch 'main' of git.data.coop:nellemann/jnetperf 2023-07-18 15:22:19 +02:00
Mark Nellemann 5b7fec6033 Add some tests of conversion and client-server comm.
continuous-integration/drone/push Build is passing Details
2023-07-18 15:21:52 +02:00
Mark Nellemann df69b2e65c Merge pull request 'New option to specify how long test should run.' (#2) from time into main
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/tag Build is passing Details
Reviewed-on: #2
2023-07-14 09:19:21 +00:00
Mark Nellemann 8d6c7f8140 Remove print statement.
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details
2023-07-14 11:18:44 +02:00
Mark Nellemann df951d6808 Merge branch 'main' into time
# Conflicts:
#	gradle.properties
#	src/main/java/biz/nellemann/jnetperf/Application.java
#	src/main/java/biz/nellemann/jnetperf/TcpServer.java
2023-07-14 11:18:22 +02:00
Mark Nellemann 0ca08be582 New option to specify how long test should run.
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is failing Details
2023-07-14 11:15:39 +02:00
Mark Nellemann 77b7984517 Wait indefinitely on server for connections.
continuous-integration/drone/push Build is passing Details
2023-07-14 08:15:15 +02:00
Mark Nellemann 9f67f98fec Merge branch 'main' of git.data.coop:nellemann/jnetperf 2023-07-14 07:42:40 +02:00
Mark Nellemann 660996a133 Merge pull request 'Suppoet for TCP' (#1) from tcp into main
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/tag Build is passing Details
Reviewed-on: #1
2023-07-14 05:42:11 +00:00
Mark Nellemann 7f4a5d28ac Support k,m,g suffix and some cleanup.
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details
2023-07-14 07:40:34 +02:00
Mark Nellemann 87c8c1f56e Initial support for TCP packets.
continuous-integration/drone/push Build is passing Details
2023-07-13 21:59:38 +02:00
Mark Nellemann 0062763439 Merge branch 'main' of git.data.coop:nellemann/jnetperf 2023-07-13 11:29:24 +02:00
Mark Nellemann 5f521322c7 Fix download link in README
continuous-integration/drone/push Build is passing Details
2023-07-05 16:06:40 +02:00
18 changed files with 714 additions and 202 deletions

View File

@ -1,55 +1,3 @@
# jnetperf
# Repository moved
Small utility to measure (single threaded) network performance between two hosts.
## Requirements
You need Java (JRE) version 8 or later to run jnetperf.
## Usage Instructions
- Install the jnetperf package (*.deb*, *.rpm* or *.jar*) from [downloads](https://bitbucket.org/mnellemann/jnetperf/downloads/) or compile from source.
- Run **/opt/jnetperf/bin/jperf**, if installed from package, or as **java -jar /path/to/jnetperf.jar**
```shell
Usage: jnetperf [-hV] [-l=SIZE] [-n=NUM] [-p=PORT] (-c=SERVER | -s)
For more information visit https://git.data.coop/nellemann/jnetperf
-c, --connect=SERVER Connect to remote server.
-h, --help Show this help message and exit.
-l, --pkt-len=SIZE Datagram size in bytes, max 65507 [default: 65507].
-n, --pkt-num=NUM Number of packets to send [default: 150000].
-p, --port=PORT Network port [default: 4445].
-s, --server Run server and wait for client.
-V, --version Print version information and exit.
```
## Examples
On *host A* run jnetperf as a server waiting for a connection from a client:
```shell
java -jar jnetperf-x.y.z-all.jar -s
```
On *host B* run jnetperf as a client connecting to the server and sending data:
```shell
java -jar jnetperf-x.y.z-all.jar -c server-ip
```
## Development Information
You need Java (JDK) version 8 or later to build jnetperf.
### Build & Test
Use the gradle build tool, which will download all required dependencies:
```shell
./gradlew clean build
```
Please visit [github.com/mnellemann/jnetperf](https://github.com/mnellemann/jnetperf)

View File

@ -1,3 +1,3 @@
projectId = jnetperf
projectGroup = biz.nellemann.jnetperf
projectVersion = 0.0.5
projectVersion = 0.0.8

View File

@ -20,7 +20,7 @@ import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.io.IOException;
import java.net.SocketException;
import java.util.Locale;
import java.util.concurrent.Callable;
@ -33,31 +33,44 @@ public class Application implements Callable<Integer> {
RunMode runMode;
static class RunMode {
@CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server.", paramLabel = "SERVER")
@CommandLine.Option(names = { "-c", "--connect" }, required = true, description = "Connect to remote server (client).", paramLabel = "SRV")
String remoteServer;
@CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client.")
@CommandLine.Option(names = { "-s", "--server" }, required = true, description = "Run server and wait for client (server).")
boolean runServer = false;
}
@CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "SIZE", description = "Datagram size in bytes, max 65507 [default: ${DEFAULT-VALUE}].")
int packetSize = 65507; // Min: 256 Max: 65507
@CommandLine.Option(names = { "-l", "--pkt-len" }, paramLabel = "NUM", description = "Packet size in bytes (client) [default: ${DEFAULT-VALUE}].", converter = UnitSuffixConverter.class)
int packetSize = Payload.DEFAULT_LENGTH;
@CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send [default: ${DEFAULT-VALUE}].")
@CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "Number of packets to send (client) [default: ${DEFAULT-VALUE}].", converter = UnitSuffixConverter.class)
int packetCount = 150_000;
@CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "Network port [default: ${DEFAULT-VALUE}].")
@CommandLine.Option(names = { "-t", "--runtime" }, paramLabel = "SEC", description = "Time to run, precedes pkt-num (client) [default: ${DEFAULT-VALUE}].", converter = TimeSuffixConverter.class)
int timeInSeconds = 0;
@CommandLine.Option(names = { "-p", "--port" }, paramLabel = "NUM", description = "Network port [default: ${DEFAULT-VALUE}].")
int port = 4445;
@CommandLine.Option(names = { "-u", "--udp" }, description = "Use UDP network protocol [default: ${DEFAULT-VALUE}].")
boolean useUdp = false;
@Override
public Integer call() throws Exception {
public Integer call() {
if(runMode.runServer) {
runServer();
} else if(runMode.remoteServer != null) {
runClient(runMode.remoteServer);
// Set locale to en_US to ensure correct/identical number formatting
Locale.setDefault(new Locale("en", "US"));
try {
if (runMode.runServer) {
runServer();
} else if (runMode.remoteServer != null) {
runClient(runMode.remoteServer);
}
} catch (IOException | InterruptedException e) {
System.err.println(e.getMessage());
}
return 0;
@ -72,15 +85,35 @@ public class Application implements Callable<Integer> {
private void runClient(String remoteHost) throws InterruptedException, IOException {
UdpClient udpClient = new UdpClient(remoteHost, port, packetCount, packetSize);
udpClient.start();
if(packetSize < Payload.MIN_LENGTH) {
packetSize = Payload.MIN_LENGTH;
}
if(useUdp) {
if(packetSize > Payload.MAX_UDP_LENGTH) {
packetSize = Payload.MAX_UDP_LENGTH;
}
UdpClient udpClient = new UdpClient(remoteHost, port, packetSize, packetCount, timeInSeconds);
udpClient.start();
} else {
TcpClient tcpClient = new TcpClient(remoteHost, port, packetSize, packetCount, timeInSeconds);
tcpClient.start();
}
}
private void runServer() throws SocketException, InterruptedException {
UdpServer udpServer = new UdpServer(port);
udpServer.start();
udpServer.join();
private void runServer() throws IOException, InterruptedException {
if(useUdp) {
UdpServer udpServer = new UdpServer(port);
udpServer.start();
udpServer.join();
} else {
TcpServer tcpServer = new TcpServer(port);
tcpServer.start();
tcpServer.join();
}
}
}

View File

@ -15,13 +15,10 @@
*/
package biz.nellemann.jnetperf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
@ -34,16 +31,17 @@ import org.slf4j.LoggerFactory;
*
*/
public class Datagram {
public class Payload {
final Logger log = LoggerFactory.getLogger(Datagram.class);
public final static int MIN_LENGTH = 64;
public final static int MAX_UDP_LENGTH = 64000;
public final static int DEFAULT_LENGTH = 1432;
public final static int HEADER_LENGTH = 32;
private final int HEADER_LENGTH = 32;
private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes
private final int type;
private final int length;
private final int realLength;
private final long curPkt;
private final long maxPkt;
private final byte[] data;
@ -55,71 +53,59 @@ public class Datagram {
* @param length
* @param currentPkt
*/
public Datagram(int type, int length, long currentPkt, long maxPkt) {
log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt);
public Payload(int type, int length, long currentPkt, long maxPkt) {
this.type = type;
this.length = length;
this.curPkt = currentPkt;
this.maxPkt = maxPkt;
this.length = length;
if(type == DataType.DATA.getValue()) {
realLength = length;
data = new byte[length - HEADER_LENGTH];
if (type == PayloadType.HANDSHAKE.getValue()) {
data = new byte[DEFAULT_LENGTH - HEADER_LENGTH];
} else {
realLength = HEADER_LENGTH * 2;
data = new byte[HEADER_LENGTH * 2];
data = new byte[length - HEADER_LENGTH];
}
//random.nextBytes(data);
}
/**
* Assemble datagram from byte[] payload
* @param payload
*/
public Payload(byte[] payload) {
this(ByteBuffer.wrap(payload));
}
/**
* Assemble datagram from payload
* Assemble datagram from ByteBuffer payload
* @param payload
*/
public Datagram(byte[] payload) throws IOException {
public Payload(ByteBuffer payload) {
log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString());
ByteBuffer buffer = ByteBuffer.wrap(payload);
byte[] id = new byte[8];
buffer.get(id);
payload.get(id);
if(!Arrays.equals(id, MAGIC_ID)) {
log.warn("Datagram() - magic ID does not match!");
throw new IOException();
System.out.println(Arrays.toString(id));
System.out.println(Arrays.toString(MAGIC_ID));
throw new RuntimeException("Datagram magic ID does not match: " + MAGIC_ID);
}
// Order is importent when assembling header fields like this
type = buffer.getInt();
length = buffer.getInt();
curPkt = buffer.getLong();
maxPkt = buffer.getLong();
type = payload.getInt();
length = payload.getInt();
curPkt = payload.getLong();
maxPkt = payload.getLong();
realLength = length;
if(type == DataType.DATA.getValue()) {
data = new byte[length - HEADER_LENGTH];
buffer.get(data, 0, data.length);
} else {
data = new byte[HEADER_LENGTH * 2];
}
data = new byte[payload.limit() - payload.position()];
payload.get(data);
}
public int getLength() {
return length;
}
public int getRealLength() {
return realLength;
}
public byte[] getPayload() throws IOException {
public byte[] getPayload() {
log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt);
ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH);
// Order is important

View File

@ -15,13 +15,13 @@
*/
package biz.nellemann.jnetperf;
public enum DataType {
public enum PayloadType {
HANDSHAKE(1), DATA(2), ACK(4), END(9);
private final int value;
private DataType(int value) {
private PayloadType(int value) {
this.value = value;
}

View File

@ -25,12 +25,14 @@ public class Statistics {
private final int MAX_TICKS_AVG = 300;
private final int LOG_AVG_MODULO = 30;
private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault());
private long packetsTransferred, packetsTransferredTotal = 0;
private long packetsTransferred;
private long packetsTransferredTotal = 0;
private long bytesTransferred, bytesTransferredTotal = 0;
private long bytesPerSec;
private long packetsPerSec;
private long packetsUnacked = 0;
private int tickItererations = 0;
private int tickIterations = 0;
private int tickTotal = 0;
private final long[] bytesPerSecAvgTmp = new long[MAX_TICKS_AVG];
private final long[] packetsPerSecAvgTmp = new long[MAX_TICKS_AVG];
@ -51,8 +53,8 @@ public class Statistics {
// Because we do this every second ...
bytesPerSec = bytesTransferred;
packetsPerSec = packetsTransferred;
bytesPerSecAvgTmp[tickItererations] = bytesTransferred;
packetsPerSecAvgTmp[tickItererations] = packetsTransferred;
bytesPerSecAvgTmp[tickIterations] = bytesTransferred;
packetsPerSecAvgTmp[tickIterations] = packetsTransferred;
timestamp1 = timestamp2;
printStatus();
@ -60,13 +62,14 @@ public class Statistics {
bytesTransferred = 0;
packetsTransferred = 0;
if(++tickItererations >= MAX_TICKS_AVG) {
tickItererations = 0;
if(++tickIterations >= MAX_TICKS_AVG) {
tickIterations = 0;
}
if(tickItererations % LOG_AVG_MODULO == 0) {
if(tickIterations % LOG_AVG_MODULO == 0) {
printAverage();
}
tickTotal++;
}
@ -76,7 +79,6 @@ public class Statistics {
public void printStatus() {
System.out.printf("%-19s - Status: %10d pkt/s %14d B/s %12d KB/s %8d MB/s\n", formatter.format(Instant.now()), packetsPerSec, bytesPerSec, bytesPerSec/1_000, bytesPerSec/1_000_000);
}
@ -120,6 +122,14 @@ public class Statistics {
return packetsTransferredTotal;
}
public long getBytesTransferredTotal() {
return bytesTransferredTotal;
}
public int getRuntime() {
return tickTotal;
}
private long getAverage(long[] array, long fallback) {
long avg = getAverage(array);
@ -150,5 +160,4 @@ public class Statistics {
return avg;
}
}

View File

@ -0,0 +1,130 @@
package biz.nellemann.jnetperf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class TcpClient {
final Logger log = LoggerFactory.getLogger(TcpClient.class);
private final Statistics statistics;
private DataOutputStream out;
private DataInputStream in;
private final int port;
private final InetAddress address;
private Socket socket;
private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH];
private final int packets;
private final int length;
private final int runtime;
public TcpClient(String hostname, int port, int length, int packets, int runtime) throws IOException {
log.info("TcpClient() - target: {}, port: {}", hostname, port);
this.port = port;
this.length = length;
this.packets = packets;
this.runtime = runtime;
address = InetAddress.getByName(hostname);
statistics = new Statistics();
}
private void send(Payload payload) throws IOException {
out.write(payload.getPayload());
statistics.transferPacket();
statistics.transferBytes(payload.getLength());
}
private Payload receive() throws IOException {
in.readFully(inBuffer);
return new Payload(inBuffer);
}
private void close() throws IOException {
in.close();
out.close();
socket.close();
}
public void start() throws IOException, InterruptedException {
AtomicBoolean keepRunning = new AtomicBoolean(true);
Thread shutdownHook = new Thread(() -> {
keepRunning.set(false);
System.out.println("Stopping jnetperf, please wait ...");
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
long sequence = 0;
socket = new Socket(address, port);
in = new DataInputStream(socket.getInputStream());
out = new DataOutputStream(socket.getOutputStream());
// Send handshake
Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), length, sequence++, packets);
send(payload);
payload = receive();
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
// Send data
do {
payload = new Payload(PayloadType.DATA.getValue(), length, sequence++, packets);
send(payload);
payload = receive();
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
}
statistics.tick();
if (sequence > packets) {
System.out.println("Max packets reached");
keepRunning.set(false);;
}
if(runtime > 0 && statistics.getRuntime() > runtime) {
System.out.println("Max runtime reached");
keepRunning.set(false);
}
} while (keepRunning.get());
// Send end
payload = new Payload(PayloadType.END.getValue(), length, sequence++, packets);
send(payload);
payload = receive();
statistics.ack();
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
Thread.sleep(100);
close();
statistics.printAverage();
statistics.printSummary();
}
public Statistics getStatistics() {
return statistics;
}
}

View File

@ -0,0 +1,108 @@
package biz.nellemann.jnetperf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
public class TcpServer extends Thread {
final Logger log = LoggerFactory.getLogger(TcpServer.class);
private final int port;
private ServerSocket socket;
private DataInputStream in;
private DataOutputStream out;
private byte[] inBuffer;
private boolean runThread = true;
private boolean runSession = true;
public TcpServer(int port) throws IOException {
log.info("TcpServer()");
this.port = port;
}
public void run() {
try {
while (runThread) {
socket = new ServerSocket(port);
socket.setSoTimeout(0); // Wait indefinitely
inBuffer = new byte[Payload.DEFAULT_LENGTH];
session();
socket.close();
}
} catch(IOException e) {
log.error(e.getMessage());
}
}
public void session() throws IOException {
Statistics statistics = new Statistics();
boolean ackEnd = false;
runSession = true;
Socket server = socket.accept();
InetAddress address = socket.getInetAddress();
in = new DataInputStream(server.getInputStream());
out = new DataOutputStream(server.getOutputStream());
while (runSession) {
Payload payload = receive();
statistics.transferPacket();
statistics.transferBytes(payload.getLength());
if(payload.getType() == PayloadType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams
inBuffer = new byte[payload.getLength()];
statistics.reset();
}
if(payload.getType() == PayloadType.END.getValue()) {
ackEnd = true;
}
// Send ACK
Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1);
out.write(responsePayload.getPayload());
statistics.ack();
statistics.tick();
if(ackEnd) {
runSession = false;
statistics.printAverage();
statistics.printSummary();
}
}
in.close();
out.close();
server.close();
}
private Payload receive() throws IOException {
in.readFully(inBuffer);
return new Payload(inBuffer);
}
public void finish() {
runThread = false;
runSession = false;
}
}

View File

@ -0,0 +1,41 @@
package biz.nellemann.jnetperf;
import picocli.CommandLine;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TimeSuffixConverter implements CommandLine.ITypeConverter<Integer> {
final private Pattern pattern = Pattern.compile("(\\d+)([smh])?", Pattern.CASE_INSENSITIVE);
public Integer convert(String value) {
int seconds = 0;
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
int number = Integer.parseInt(matcher.group(1));
if(matcher.group(2) != null) { // We got the second, minute or hour suffix
String suffix = matcher.group(2);
switch (suffix.toLowerCase(Locale.ROOT)) {
case "s":
seconds = number;
break;
case "m":
seconds = number * 60;
break;
case "h":
seconds = number * 60 * 60;
break;
default:
throw new IllegalArgumentException("Unknown suffix: " + suffix);
}
} else {
seconds = number;
}
}
return seconds;
}
}

View File

@ -21,6 +21,7 @@ import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,40 +30,42 @@ public class UdpClient {
final Logger log = LoggerFactory.getLogger(UdpClient.class);
private Statistics statistics;
private final Statistics statistics;
private final int port;
private final InetAddress address;
private final DatagramSocket socket;
private final byte[] buf = new byte[256];
private final int packetCount;
private final int packetSize;
private final byte[] inBuffer = new byte[Payload.DEFAULT_LENGTH];
private final int length;
private final int packets;
private final int runtime;
public UdpClient(String hostname, int port, int packets, int size) throws UnknownHostException, SocketException {
public UdpClient(String hostname, int port, int length, int packets, int runtime) throws UnknownHostException, SocketException {
log.info("UdpClient() - target: {}, port: {}", hostname, port);
this.port = port;
this.packetCount = packets;
this.packetSize = size;
this.length = length;
this.packets = packets;
this.runtime = runtime;
socket = new DatagramSocket();
address = InetAddress.getByName(hostname);
statistics = new Statistics();
}
private void send(Datagram datagram) throws IOException {
DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port);
private void send(Payload payload) throws IOException {
DatagramPacket packet = new DatagramPacket(payload.getPayload(), payload.getLength(), address, port);
socket.send(packet);
statistics.transferPacket();
statistics.transferBytes(datagram.getRealLength());
statistics.transferBytes(payload.getLength());
}
private Datagram receive() throws IOException {
DatagramPacket packet = new DatagramPacket(buf, buf.length);
private Payload receive() throws IOException {
DatagramPacket packet = new DatagramPacket(inBuffer, Payload.DEFAULT_LENGTH);
socket.receive(packet);
return new Datagram(buf);
return new Payload(inBuffer);
}
@ -73,38 +76,53 @@ public class UdpClient {
public void start() throws IOException, InterruptedException {
AtomicBoolean keepRunning = new AtomicBoolean(true);
Thread shutdownHook = new Thread(() -> {
keepRunning.set(false);
System.out.println("Stopping jnetperf, please wait ...");
});
Runtime.getRuntime().addShutdownHook(shutdownHook);
long sequence = 0;
// Send handshake
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount);
send(datagram);
datagram = receive();
if(datagram.getType() != DataType.ACK.getValue()) {
Payload payload = new Payload(PayloadType.HANDSHAKE.getValue(), Payload.DEFAULT_LENGTH, sequence++, packets);
send(payload);
payload = receive();
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
// Data datagrams ...
for(int i = 0; i < packetCount; i++) {
datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount);
send(datagram);
datagram = receive();
if(datagram.getType() != DataType.ACK.getValue()) {
// Send data
do {
payload = new Payload(PayloadType.DATA.getValue(), length, sequence++, packets);
send(payload);
payload = receive();
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
}
statistics.tick();
}
// End datagram
//Thread.sleep(100);
datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount);
send(datagram);
if (sequence > packets) {
System.out.println("Max packets reached");
keepRunning.set(false);
}
// TODO: Wait for ACK
datagram = receive();
if(runtime > 0 && statistics.getRuntime() > runtime) {
System.out.println("Max runtime reached");
keepRunning.set(false);
}
} while (keepRunning.get());
// Send end
payload = new Payload(PayloadType.END.getValue(), length, sequence++, packets);
send(payload);
payload = receive();
statistics.ack();
if(datagram.getType() != DataType.ACK.getValue()) {
if(payload.getType() != PayloadType.ACK.getValue()) {
log.warn("No ACK!");
return;
}
@ -115,4 +133,9 @@ public class UdpClient {
statistics.printSummary();
}
public Statistics getStatistics() {
return statistics;
}
}

View File

@ -28,26 +28,28 @@ public class UdpServer extends Thread {
final Logger log = LoggerFactory.getLogger(UdpServer.class);
private final DatagramSocket socket;
private byte[] buf = new byte[256];
private final int port;
private DatagramSocket socket;
private byte[] inBuffer;
private boolean runThread = true;
private boolean runSession = true;
public UdpServer(int port) throws SocketException {
public UdpServer(int port) {
log.info("UdpServer()");
socket = new DatagramSocket(port);
this.port = port;
}
public void run() {
boolean running = true;
try {
while (running) {
while (runThread) {
inBuffer = new byte[Payload.DEFAULT_LENGTH];
socket = new DatagramSocket(port);
session();
socket.close();
}
socket.close();
} catch(IOException e) {
log.error(e.getMessage());
}
@ -58,55 +60,55 @@ public class UdpServer extends Thread {
public void session() throws IOException {
Statistics statistics = new Statistics();
boolean running = true;
boolean ackEnd = false;
runSession = true;
while (running) {
while (runSession) {
DatagramPacket packet = new DatagramPacket(buf, buf.length);
DatagramPacket packet = new DatagramPacket(inBuffer, inBuffer.length);
socket.receive(packet);
InetAddress address = packet.getAddress();
int port = packet.getPort();
Datagram datagram = new Datagram(buf);
Payload payload = new Payload(packet.getData());
statistics.transferPacket();
statistics.transferBytes(datagram.getRealLength());
statistics.transferBytes(payload.getLength());
if(datagram.getType() == DataType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}, length: {}", address, datagram.getLength());
if(payload.getType() == PayloadType.HANDSHAKE.getValue()) {
log.info("Handshake from ... {}", address);
// Setup to receive larger datagrams
buf = new byte[datagram.getLength()];
inBuffer = new byte[payload.getLength()];
statistics.reset();
}
/*
if(datagram.getType() == DataType.DATA.getValue()) {
bytesReceived += datagram.getRealLength();
bytesReceivedTotal += datagram.getRealLength();
}*/
if(datagram.getType() == DataType.END.getValue()) {
if(payload.getType() == PayloadType.END.getValue()) {
ackEnd = true;
}
// Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1);
packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port);
Payload responsePayload = new Payload(PayloadType.ACK.getValue(), Payload.DEFAULT_LENGTH, payload.getCurPkt(), 1);
packet = new DatagramPacket(responsePayload.getPayload(), responsePayload.getLength(), address, port);
socket.send(packet);
statistics.ack();
statistics.tick();
if(ackEnd && statistics.getPacketsTransferredTotal() > datagram.getMaxPkt()) {
running = false;
if(ackEnd) {
runSession = false;
statistics.printAverage();
statistics.printSummary();
}
}
}
public void finish() {
runThread = false;
runSession = false;
}
}

View File

@ -0,0 +1,42 @@
package biz.nellemann.jnetperf;
import picocli.CommandLine;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class UnitSuffixConverter implements CommandLine.ITypeConverter<Long> {
final private Pattern pattern = Pattern.compile("(\\d+)([kmg])?b?", Pattern.CASE_INSENSITIVE);
public Long convert(String value) {
long bytes = 0L;
Matcher matcher = pattern.matcher(value);
if (matcher.find()) {
long number = Long.parseLong(matcher.group(1));
if(matcher.group(2) != null) { // We got the kilo, mega og giga suffix
String suffix = matcher.group(2);
switch (suffix.toLowerCase(Locale.ROOT)) {
case "k":
bytes = number * 1024;
break;
case "m":
bytes = number * 1024 * 1024;
break;
case "g":
bytes = number * 1024 * 1024 * 1024;
break;
default:
throw new IllegalArgumentException("Unknown suffix: " + suffix);
}
} else {
bytes = number;
}
}
return bytes;
}
}

View File

@ -28,7 +28,9 @@ class VersionProvider implements CommandLine.IVersionProvider {
Manifest manifest = new Manifest(getClass().getResourceAsStream("/META-INF/MANIFEST.MF"));
Attributes attrs = manifest.getMainAttributes();
return new String[] { "${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") };
return new String[] {
"${COMMAND-FULL-NAME} " + attrs.getValue("Build-Version") + " (on ${os.name} ${os.version} ${os.arch})",
"JVM: ${java.version} (${java.vendor} ${java.vm.name} ${java.vm.version})" };
}
}

View File

@ -1,11 +0,0 @@
/*
* This Spock specification was generated by the Gradle 'init' task.
*/
package biz.nellemann.jnetperf
import spock.lang.Specification
class ApplicationTest extends Specification {
}

View File

@ -0,0 +1,45 @@
package biz.nellemann.jnetperf
import spock.lang.Shared
import spock.lang.Specification
class TcpClientServerTest extends Specification {
static final int port = 9876;
@Shared
TcpServer tcpServer = new TcpServer(port)
// run before every feature method
def setup() {
tcpServer.start();
}
// run after every feature method
def cleanup() {
tcpServer.finish()
}
// run before the first feature method
def setupSpec() {
}
// run after the last feature method
def cleanupSpec() {
}
def "test client to server communication"() {
setup:
TcpClient client = new TcpClient("localhost", port, 512, 100, 60)
when:
client.start()
then:
client.getStatistics().getPacketsTransferredTotal() == 102 // packets + handshake + end
client.getStatistics().getBytesTransferredTotal() == 52224
}
}

View File

@ -0,0 +1,38 @@
package biz.nellemann.jnetperf
import spock.lang.Shared
import spock.lang.Specification
class TimeSuffixConverterTest extends Specification {
@Shared
TimeSuffixConverter timeSuffixConverter = new TimeSuffixConverter();
def "test second to seconds"() {
when:
int seconds = timeSuffixConverter.convert("12s")
then:
seconds == 12;
}
def "test minute to seconds"() {
when:
int seconds = timeSuffixConverter.convert("120m")
then:
seconds == 7200;
}
def "test hour to seconds"() {
when:
int seconds = timeSuffixConverter.convert("48h")
then:
seconds == 172800;
}
}

View File

@ -0,0 +1,45 @@
package biz.nellemann.jnetperf
import spock.lang.Shared
import spock.lang.Specification
class UdpClientServerTest extends Specification {
static final int port = 9876;
@Shared
UdpServer udpServer = new UdpServer(port)
// run before every feature method
def setup() {
udpServer.start();
}
// run after every feature method
def cleanup() {
udpServer.finish()
}
// run before the first feature method
def setupSpec() {
}
// run after the last feature method
def cleanupSpec() {
}
def "test client to server communication"() {
setup:
UdpClient client = new UdpClient("localhost", port, 512, 100, 60)
when:
client.start()
then:
client.getStatistics().getPacketsTransferredTotal() == 102 // packets + handshake + end
client.getStatistics().getBytesTransferredTotal() == 53144 // TODO: Why is this larger than the TCP test ?
}
}

View File

@ -0,0 +1,71 @@
package biz.nellemann.jnetperf
import spock.lang.Shared
import spock.lang.Specification
class UnitSuffixConverterTest extends Specification {
@Shared
UnitSuffixConverter unitSuffixConverter = new UnitSuffixConverter();
def "test byte (b) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("16b")
then:
bytes == 16;
}
def "test kilo (k) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("2048k")
then:
bytes == 2097152;
}
def "test kilo (kb) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("2048kb")
then:
bytes == 2097152;
}
def "test mega (m) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("2m")
then:
bytes == 2097152;
}
def "test mega (mb) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("2mb")
then:
bytes == 2097152;
}
def "test giga (g) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("1g")
then:
bytes == 1073741824;
}
def "test giga (gb) to bytes"() {
when:
long bytes = unitSuffixConverter.convert("1gb")
then:
bytes == 1073741824;
}
}