Sort of working poc.

This commit is contained in:
Mark Nellemann 2023-06-20 19:10:21 +02:00
parent 48f7699192
commit 9b974eb22c
5 changed files with 115 additions and 39 deletions

View file

@ -4,6 +4,8 @@
package biz.nellemann.jperf; package biz.nellemann.jperf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine; import picocli.CommandLine;
import picocli.CommandLine.Command; import picocli.CommandLine.Command;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -12,6 +14,10 @@ import java.util.concurrent.Callable;
description = "Network performance measurement tool.") description = "Network performance measurement tool.")
public class App implements Callable<Integer> { public class App implements Callable<Integer> {
final Logger log = LoggerFactory.getLogger(App.class);
@CommandLine.Option(names = { "-s", "--size" }, paramLabel = "SIZE", description = "the datagram size")
int size = 1500;
@Override @Override
public Integer call() throws Exception { // your business logic goes here... public Integer call() throws Exception { // your business logic goes here...
@ -20,31 +26,39 @@ public class App implements Callable<Integer> {
UdpServer udpServer = new UdpServer(); UdpServer udpServer = new UdpServer();
udpServer.start(); udpServer.start();
int sequence = 0;
long sequence = 0;
// Start client and send some messages // Start client and send some messages
UdpClient udpClient = new UdpClient(); UdpClient udpClient = new UdpClient();
// Start datagram // Start datagram
Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), 64, sequence++); Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), size, sequence++);
udpClient.send(datagram); udpClient.send(datagram);
Thread.sleep(100);
// TODO: Wait for ACK // TODO: Wait for ACK
datagram = udpClient.receive();
if(datagram.getType() != DataType.ACK.getValue()) {
log.warn("No ACK!");
return -1;
}
// Data datagrams ... // Data datagrams ...
for(int i = 0; i < 10; i++) { for(int i = 0; i < 100; i++) {
Thread.sleep(1000); datagram = new Datagram(DataType.DATA.getValue(), size, sequence++);
datagram = new Datagram(DataType.DATA.getValue(), 64, sequence++);
udpClient.send(datagram); udpClient.send(datagram);
//Thread.sleep(50);
} }
// End datagram // End datagram
datagram = new Datagram(DataType.END.getValue(), 64, sequence++); Thread.sleep(500);
datagram = new Datagram(DataType.END.getValue(), size, sequence++);
udpClient.send(datagram); udpClient.send(datagram);
udpClient.close(); udpClient.close();
Thread.sleep(1000);
return 0; return 0;

View file

@ -2,17 +2,20 @@ package biz.nellemann.jperf;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* *
* A datagram consists of * Datagram consists of the following
* * <p>
* <-------------------- HEADER 32 bytes --------------> <---------- DATA --------> * <------------------------- HEADER 32 bytes -------------------> <---------- DATA min 32 bytes -------->
* int-4bytes int-4bytes long-8bytes long-8bytes * _long _int _int _long _long
* TYPE SIZE SEQUENCE TIMESTAMP * 8_bytes 4_bytes 4_bytes 8_bytes 8_bytes
* MAGIC-ID TYPE LENGTH SEQUENCE TIMESTAMP
* *
*/ */
@ -20,29 +23,40 @@ public class Datagram {
final Logger log = LoggerFactory.getLogger(Datagram.class); final Logger log = LoggerFactory.getLogger(Datagram.class);
private final int HEADER_LENGTH = 24; private final int HEADER_LENGTH = 32;
private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes
private final int type; private final int type;
private final int length; private final int length;
private final int realLength;
private final long sequence; private final long sequence;
private final long timestamp; private final long timestamp;
private final byte[] data; private final byte[] data;
/** /**
* Create new empty datagram * Create new empty datagram
* @param type * @param type
* @param lenght * @param length
* @param sequence * @param sequence
*/ */
public Datagram(int type, int lenght, long sequence) { public Datagram(int type, int length, long sequence) {
log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, sequence);
this.type = type; this.type = type;
this.length = lenght; this.length = length;
this.sequence = sequence; this.sequence = sequence;
this.timestamp = System.currentTimeMillis(); this.timestamp = System.currentTimeMillis();
this.data = new byte[lenght - HEADER_LENGTH]; if(type == DataType.DATA.getValue()) {
realLength = length;
data = new byte[length - HEADER_LENGTH];
} else {
realLength = HEADER_LENGTH * 2;
data = new byte[HEADER_LENGTH * 2];
}
} }
@ -51,11 +65,17 @@ public class Datagram {
* Assemble datagram from payload * Assemble datagram from payload
* @param payload * @param payload
*/ */
public Datagram(byte[] payload) { public Datagram(byte[] payload) throws IOException {
log.info("Datagram() 1"); log.debug("Datagram() magic ID is: {} bytes long and contains: {}", MAGIC_ID.length, MAGIC_ID.toString());
ByteBuffer buffer = ByteBuffer.wrap(payload); ByteBuffer buffer = ByteBuffer.wrap(payload);
byte[] id = new byte[8];
buffer.get(id);
if(!Arrays.equals(id, MAGIC_ID)) {
log.warn("Datagram() - magic ID does not match!");
throw new IOException();
}
// Order is importent when assembling header fields like this // Order is importent when assembling header fields like this
type = buffer.getInt(); type = buffer.getInt();
@ -63,14 +83,13 @@ public class Datagram {
sequence = buffer.getLong(); sequence = buffer.getLong();
timestamp = buffer.getLong(); timestamp = buffer.getLong();
log.info("Datagram() 2 "); realLength = length;
if(type == DataType.DATA.getValue()) {
log.info("Datagram() 3 "); data = new byte[length - HEADER_LENGTH];
buffer.get(data, 0, data.length);
data = new byte[length - HEADER_LENGTH]; } else {
buffer.get(data); data = new byte[HEADER_LENGTH * 2];
}
log.info("Datagram() 4");
} }
@ -78,11 +97,17 @@ public class Datagram {
return length; return length;
} }
public int getRealLength() {
return realLength;
}
public byte[] getPayload() throws IOException { public byte[] getPayload() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(length);
log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, sequence);
ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH);
// Order is important // Order is important
buffer.put(MAGIC_ID);
buffer.putInt(type); buffer.putInt(type);
buffer.putInt(length); buffer.putInt(length);
buffer.putLong(sequence); buffer.putLong(sequence);

View file

@ -17,7 +17,7 @@ public class UdpClient {
private DatagramSocket socket; private DatagramSocket socket;
private InetAddress address; private InetAddress address;
private byte[] buf; private byte[] buf = new byte[256];
public UdpClient() throws UnknownHostException, SocketException { public UdpClient() throws UnknownHostException, SocketException {
log.info("UdpClient()"); log.info("UdpClient()");
@ -26,10 +26,16 @@ public class UdpClient {
} }
public void send(Datagram datagram) throws IOException { public void send(Datagram datagram) throws IOException {
DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getLength(), address, 4445); DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, 4445);
socket.send(packet); socket.send(packet);
} }
public Datagram receive() throws IOException {
DatagramPacket packet = new DatagramPacket(buf, buf.length);
socket.receive(packet);
Datagram datagram = new Datagram(buf);
return datagram;
}
public String sendEcho(String msg) throws IOException { public String sendEcho(String msg) throws IOException {
log.info("send() - msg: {}", msg); log.info("send() - msg: {}", msg);

View file

@ -14,7 +14,7 @@ public class UdpServer extends Thread {
final Logger log = LoggerFactory.getLogger(UdpServer.class); final Logger log = LoggerFactory.getLogger(UdpServer.class);
private DatagramSocket socket; private final DatagramSocket socket;
private boolean running; private boolean running;
private byte[] buf = new byte[256]; private byte[] buf = new byte[256];
@ -24,9 +24,13 @@ public class UdpServer extends Thread {
} }
public void run() { public void run() {
running = true; running = true;
long thisSequence = 0;
long lastSequence = 0;
try { try {
while (running) { while (running) {
DatagramPacket packet = new DatagramPacket(buf, buf.length); DatagramPacket packet = new DatagramPacket(buf, buf.length);
@ -35,34 +39,47 @@ public class UdpServer extends Thread {
InetAddress address = packet.getAddress(); InetAddress address = packet.getAddress();
int port = packet.getPort(); int port = packet.getPort();
log.debug("run() - buffer is: {}", buf.length);
Datagram datagram = new Datagram(buf); Datagram datagram = new Datagram(buf);
thisSequence = datagram.getSequence();
if(datagram.getType() == DataType.HANDSHAKE.getValue()) { if(datagram.getType() == DataType.HANDSHAKE.getValue()) {
log.info("Handshake from ..."); log.info("Handshake from ... {}, length: {}", address, datagram.getLength());
// Setup to receive larger datagrams
buf = new byte[datagram.getLength()];
// TODO: Send ACK
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getSequence());
packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port);
socket.send(packet);
} }
if(datagram.getType() == DataType.END.getValue()) { if(datagram.getType() == DataType.END.getValue()) {
running = false; running = false;
log.info("Stopping ...."); log.info("Stopping ....");
// TODO: Reset ?
} }
if(datagram.getType() == DataType.DATA.getValue()) { if(datagram.getType() == DataType.DATA.getValue()) {
log.info("Data .... size: {}", datagram.getLength()); if(thisSequence == lastSequence + 1) {
log.info("Data .... size: {}, sequence: {}", datagram.getLength(), thisSequence);
} else {
log.warn("Data .... out of sequence: {} vs {}", thisSequence, lastSequence);
}
} }
// Send response ACK lastSequence = thisSequence;
Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getSequence());
packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port);
socket.send(packet);
} }
socket.close(); socket.close();
} catch(IOException e) { } catch(IOException e) {
log.error(e.getMessage());
} }
} }

View file

@ -0,0 +1,14 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<withJansi>false</withJansi>
<encoder>
<pattern>%cyan(%d{HH:mm:ss.SSS}) %gray([%thread]) %highlight(%-5level) %magenta(%logger{32}) - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>