From 5b5cf3f3727fea5f455cb3a870a14d600d56427a Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Thu, 22 Jun 2023 08:40:31 +0200 Subject: [PATCH] more wip --- LICENSE | 202 ++++++++++++++++++ README.md | 31 +++ build.gradle | 61 +++++- src/main/java/biz/nellemann/jperf/App.java | 117 ++++++---- .../java/biz/nellemann/jperf/Datagram.java | 39 ++-- .../java/biz/nellemann/jperf/UdpClient.java | 12 +- .../java/biz/nellemann/jperf/UdpServer.java | 147 ++++++++----- 7 files changed, 500 insertions(+), 109 deletions(-) create mode 100644 LICENSE create mode 100644 README.md diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d645695 --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..56fa3d2 --- /dev/null +++ b/README.md @@ -0,0 +1,31 @@ +# jPerf + +Small utility to measure network performance. + +## Requirements + +You need Java (JRE) version 11 or later to run jperf. + +## Usage Instructions + +- Install the jperf package (*.deb*, *.rpm* or *.jar*) from [downloads](https://bitbucket.org/mnellemann/jperf/downloads/) or compile from source. +- Run **/opt/jperf/bin/jperf**, if installed from package +- Or as **java -jar /path/to/jperf.jar** + +To change the temporary directory where disk-load files are written to, use the *-Djava.io.tmpdir=/mytempdir* option. + +```shell +Usage: ... +``` + +## Development Information + +You need Java (JDK) version 11 or later to build jperf. + +### Build & Test + +Use the gradle build tool, which will download all required dependencies: + +```shell +./gradlew clean build run +``` diff --git a/build.gradle b/build.gradle index a7de37e..8faa79e 100644 --- a/build.gradle +++ b/build.gradle @@ -7,15 +7,15 @@ */ plugins { - // Apply the groovy plugin to also add support for Groovy (needed for Spock) + id 'java' id 'groovy' - - // Apply the application plugin to add support for building a CLI application in Java. id 'application' + id "com.github.johnrengelman.shadow" version "7.1.2" + id "net.nemerosa.versioning" version "2.15.1" + id "com.netflix.nebula.ospackage" version "11.2.0" } repositories { - // Use Maven Central for resolving dependencies. mavenCentral() } @@ -46,3 +46,56 @@ tasks.named('test') { // Use JUnit Platform for unit tests. useJUnitPlatform() } + + +jar { + manifest { + attributes( + 'Created-By' : "Gradle ${gradle.gradleVersion}", + 'Build-OS' : "${System.properties['os.name']} ${System.properties['os.arch']} ${System.properties['os.version']}", + 'Build-Jdk' : "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})", + 'Build-User' : System.properties['user.name'], + 'Build-Version' : versioning.info.tag ?: (versioning.info.branch + "-" + versioning.info.build), + 'Build-Revision' : versioning.info.commit, + 'Build-Timestamp': new Date().format("yyyy-MM-dd'T'HH:mm:ss.SSSZ").toString(), + ) + } +} + +apply plugin: 'com.netflix.nebula.ospackage' +ospackage { + packageName = 'jperf' + release = '1' + user = 'root' + packager = "Mark Nellemann " + + into '/opt/jperf' + + from(shadowJar.outputs.files) { + into 'lib' + } + + from('build/scriptsShadow') { + into 'bin' + } + + from(['README.md', 'LICENSE']) { + into 'doc' + } + +} + +buildDeb { + dependsOn build, startShadowScripts +} + +buildRpm { + dependsOn build, startShadowScripts + os = org.redline_rpm.header.Os.LINUX +} + +tasks.register("packages") { + group "build" + dependsOn ":buildDeb" + dependsOn ":buildRpm" +} diff --git a/src/main/java/biz/nellemann/jperf/App.java b/src/main/java/biz/nellemann/jperf/App.java index adbca2f..26786ba 100644 --- a/src/main/java/biz/nellemann/jperf/App.java +++ b/src/main/java/biz/nellemann/jperf/App.java @@ -8,6 +8,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; import picocli.CommandLine.Command; + +import java.io.IOException; +import java.net.SocketException; import java.util.concurrent.Callable; @Command(name = "jperf", mixinStandardHelpOptions = true, version = "0.1", @@ -16,62 +19,45 @@ public class App implements Callable { final Logger log = LoggerFactory.getLogger(App.class); - @CommandLine.Option(names = { "-s", "--pkt-size" }, paramLabel = "SIZE", description = "datagram size") - int packetSize = 1500; - @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "datagrams to send") - int packetCount = 100; + @CommandLine.Option(names = { "-c", "--connect" }, paramLabel = "SERVER", description = "run client and connect to remote server") + String remoteServer; + + @CommandLine.Option(names = { "-s", "--server" }, description = "run server and wait for client") + boolean runServer = false; + + @CommandLine.Option(names = { "-l", "--pkt-size" }, paramLabel = "SIZE", description = "datagram size in bytes, max 65507") + //int packetSize = 16384; // Min: 256 Max: 65507 + int packetSize = 65507; // Min: 256 Max: 65507 + + @CommandLine.Option(names = { "-n", "--pkt-num" }, paramLabel = "NUM", description = "number of packets to send") + int packetCount = 5000; @CommandLine.Option(names = { "-p", "--port" }, paramLabel = "PORT", description = "network port") int port = 4445; + @CommandLine.Option(names = { "-w", "--send-wait" }, paramLabel = "MILLISEC", description = "delay in millis between sending packets") + long sendWait = 3; + @Override public Integer call() throws Exception { // your business logic goes here... - // Start server - UdpServer udpServer = new UdpServer(port); - udpServer.start(); + if(runServer) { + runServer(); + } - - long sequence = 0; - - - // Start client and send some messages - UdpClient udpClient = new UdpClient("localhost", port); - - // Start datagram - Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++); - udpClient.send(datagram); - Thread.sleep(100); - - // TODO: Wait for ACK - datagram = udpClient.receive(); - if(datagram.getType() != DataType.ACK.getValue()) { - log.warn("No ACK!"); - return -1; + if(remoteServer != null) { + runClient(remoteServer); } - // Data datagrams ... - for(int i = 0; i < packetCount; i++) { - datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++); - udpClient.send(datagram); - //Thread.sleep(50); - } - - // End datagram - Thread.sleep(500); - datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++); - udpClient.send(datagram); - - udpClient.close(); - Thread.sleep(1500); - + //udpServer.printSummary(); return 0; } + // this example implements Callable, so parsing, error handling and handling user // requests for usage help or version help can be done with one line of code. public static void main(String... args) { @@ -79,4 +65,57 @@ public class App implements Callable { System.exit(exitCode); } + + + private void runClient(String remoteHost) throws InterruptedException, IOException { + long sequence = 0; + + // Start client and send some messages + UdpClient udpClient = new UdpClient(remoteHost, port); + + // Start datagram + Datagram datagram = new Datagram(DataType.HANDSHAKE.getValue(), packetSize, sequence++, packetCount); + udpClient.send(datagram); + Thread.sleep(100); + + // TODO: Wait for ACK + datagram = udpClient.receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + // Data datagrams ... + for(int i = 0; i < packetCount; i++) { + datagram = new Datagram(DataType.DATA.getValue(), packetSize, sequence++, packetCount); + udpClient.send(datagram); + Thread.sleep(sendWait); + } + + // End datagram + Thread.sleep(100); + datagram = new Datagram(DataType.END.getValue(), packetSize, sequence++, packetCount); + udpClient.send(datagram); + + // TODO: Wait for ACK + datagram = udpClient.receive(); + if(datagram.getType() != DataType.ACK.getValue()) { + log.warn("No ACK!"); + return; + } + + + udpClient.close(); + Thread.sleep(1000); + + udpClient.printStatistics(); + } + + private void runServer() throws SocketException, InterruptedException { + // Start server + UdpServer udpServer = new UdpServer(port); + udpServer.start(); + udpServer.join(); + } + } diff --git a/src/main/java/biz/nellemann/jperf/Datagram.java b/src/main/java/biz/nellemann/jperf/Datagram.java index 9a88f1d..4ddc014 100644 --- a/src/main/java/biz/nellemann/jperf/Datagram.java +++ b/src/main/java/biz/nellemann/jperf/Datagram.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,7 +16,7 @@ import org.slf4j.LoggerFactory; * <------------------------- HEADER 32 bytes --------------> <---------- DATA min 32 bytes --------> * _long _int _int _long _long * 8_bytes 4_bytes 4_bytes 8_bytes 8_bytes - * MAGIC-ID TYPE LENGTH SEQUENCE TIMESTAMP + * MAGIC-ID TYPE LENGTH CUR_PKT MAX_PKT * */ @@ -23,6 +24,8 @@ public class Datagram { final Logger log = LoggerFactory.getLogger(Datagram.class); + private final Random random = new Random(); + private final int HEADER_LENGTH = 32; private final byte[] MAGIC_ID = "jPerfTok".getBytes(StandardCharsets.UTF_8); // Must be 8-bytes @@ -30,8 +33,8 @@ public class Datagram { private final int type; private final int length; private final int realLength; - private final long sequence; - private final long timestamp; + private final long curPkt; + private final long maxPkt; private final byte[] data; @@ -39,16 +42,16 @@ public class Datagram { * Create new empty datagram * @param type * @param length - * @param sequence + * @param currentPkt */ - public Datagram(int type, int length, long sequence) { + public Datagram(int type, int length, long currentPkt, long maxPkt) { - log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, sequence); + log.debug("Datagram() - of type: {}, length: {}, sequence: {}", type, length, currentPkt, maxPkt); this.type = type; this.length = length; - this.sequence = sequence; - this.timestamp = System.currentTimeMillis(); + this.curPkt = currentPkt; + this.maxPkt = maxPkt; if(type == DataType.DATA.getValue()) { realLength = length; @@ -57,6 +60,8 @@ public class Datagram { realLength = HEADER_LENGTH * 2; data = new byte[HEADER_LENGTH * 2]; } + + //random.nextBytes(data); } @@ -80,8 +85,8 @@ public class Datagram { // Order is importent when assembling header fields like this type = buffer.getInt(); length = buffer.getInt(); - sequence = buffer.getLong(); - timestamp = buffer.getLong(); + curPkt = buffer.getLong(); + maxPkt = buffer.getLong(); realLength = length; if(type == DataType.DATA.getValue()) { @@ -103,15 +108,15 @@ public class Datagram { public byte[] getPayload() throws IOException { - log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, sequence); + log.debug("getPayload() - with type: {}, length: {}, sequence: {}", type, length, curPkt); ByteBuffer buffer = ByteBuffer.allocate(data.length + HEADER_LENGTH); // Order is important buffer.put(MAGIC_ID); buffer.putInt(type); buffer.putInt(length); - buffer.putLong(sequence); - buffer.putLong(timestamp); + buffer.putLong(curPkt); + buffer.putLong(maxPkt); buffer.put(data); @@ -124,8 +129,12 @@ public class Datagram { } - public long getSequence() { - return sequence; + public long getCurPkt() { + return curPkt; + } + + public long getMaxPkt() { + return maxPkt; } diff --git a/src/main/java/biz/nellemann/jperf/UdpClient.java b/src/main/java/biz/nellemann/jperf/UdpClient.java index ba0370c..d854257 100644 --- a/src/main/java/biz/nellemann/jperf/UdpClient.java +++ b/src/main/java/biz/nellemann/jperf/UdpClient.java @@ -6,6 +6,7 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; +import java.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,10 +17,12 @@ public class UdpClient { private final int port; private final InetAddress address; - - private DatagramSocket socket; + private final DatagramSocket socket; private byte[] buf = new byte[256]; + private long packetsSent = 0; + private long bytesSent = 0; + public UdpClient(String hostname, int port) throws UnknownHostException, SocketException { log.info("UdpClient() - target: {}, port: {}", hostname, port); @@ -31,6 +34,8 @@ public class UdpClient { public void send(Datagram datagram) throws IOException { DatagramPacket packet = new DatagramPacket(datagram.getPayload(), datagram.getRealLength(), address, port); socket.send(packet); + packetsSent++; + bytesSent += datagram.getRealLength(); } public Datagram receive() throws IOException { @@ -54,4 +59,7 @@ public class UdpClient { socket.close(); } + public void printStatistics() { + System.out.printf("%s sent: %d pkts\t %d B\t %d KB\t %d MB\n", Instant.now().toString(), packetsSent, bytesSent, bytesSent/1000, bytesSent/1_000_000); + } } diff --git a/src/main/java/biz/nellemann/jperf/UdpServer.java b/src/main/java/biz/nellemann/jperf/UdpServer.java index 3c7b424..840c4c4 100644 --- a/src/main/java/biz/nellemann/jperf/UdpServer.java +++ b/src/main/java/biz/nellemann/jperf/UdpServer.java @@ -5,6 +5,8 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.SocketException; +import java.time.Duration; +import java.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,61 +17,25 @@ public class UdpServer extends Thread { private final DatagramSocket socket; - private boolean running; private byte[] buf = new byte[256]; + long pktsReceived, pktsReceivedTotal = 0; + long bytesReceived, bytesReceivedTotal = 0; + long bytesPerSec, pktsPerSec = 0; + public UdpServer(int port) throws SocketException { - log.info("UdpServer() - port: {}", port); + log.info("UdpServer()"); socket = new DatagramSocket(port); } public void run() { - running = true; - long thisSequence = 0; - long lastSequence = 0; + boolean running = true; try { while (running) { - - DatagramPacket packet = new DatagramPacket(buf, buf.length); - socket.receive(packet); - - InetAddress address = packet.getAddress(); - int port = packet.getPort(); - - log.debug("run() - buffer is: {}", buf.length); - - Datagram datagram = new Datagram(buf); - thisSequence = datagram.getSequence(); - - if(datagram.getType() == DataType.HANDSHAKE.getValue()) { - log.info("Handshake from ... {}", address); - - // Setup to receive larger datagrams - buf = new byte[datagram.getLength()]; - - // TODO: Send ACK - Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getSequence()); - packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); - socket.send(packet); - - } - - if(datagram.getType() == DataType.END.getValue()) { - running = false; - } - - if(datagram.getType() == DataType.DATA.getValue()) { - if(thisSequence == lastSequence + 1) { - log.info("Data .... size: {}, sequence: {}", datagram.getLength(), thisSequence); - } else { - log.warn("Data .... out of sequence: {} vs {}", thisSequence, lastSequence); - } - } - - lastSequence = thisSequence; + session(); } socket.close(); @@ -80,14 +46,97 @@ public class UdpServer extends Thread { } - private void ack() { + public void printStatistics() { + // Because we do this every second ... + bytesPerSec = bytesReceived; + pktsPerSec = pktsReceived; + + System.out.printf("%s recv: %d pkt/s\t %d B/s\t %d KB/s\t %d MB/s\n", Instant.now().toString(), pktsPerSec, bytesPerSec, bytesPerSec/1_000, bytesPerSec/1_000_000); + pktsReceived = 0; + bytesReceived = 0; + } + + public void printSummary() { + System.out.printf("%s recv: %d pkts\t %d B\t %d KB\t %d MB\n", Instant.now().toString(), pktsReceivedTotal, bytesReceivedTotal, bytesReceivedTotal/1_000, bytesReceivedTotal/1_000_000); + } + + + public void session() throws IOException { + + boolean running = true; + + boolean ackEnd = false; + long thisSequence, lastSequence = 0; + Instant startInstant = Instant.now(); + Instant checkInstant; + + while (running) { + + DatagramPacket packet = new DatagramPacket(buf, buf.length); + socket.receive(packet); + + InetAddress address = packet.getAddress(); + int port = packet.getPort(); + + Datagram datagram = new Datagram(buf); + thisSequence = datagram.getCurPkt(); + + + if(datagram.getType() == DataType.HANDSHAKE.getValue()) { + log.info("Handshake from ... {}, length: {}", address, datagram.getLength()); + + // Setup to receive larger datagrams + buf = new byte[datagram.getLength()]; + + // Send ACK + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); + packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); + socket.send(packet); + + } + + if(datagram.getType() == DataType.DATA.getValue()) { + bytesReceived += datagram.getLength(); + bytesReceivedTotal += datagram.getLength(); + + if(thisSequence == lastSequence + 1) { + //log.info("Data .... size: {}, sequence: {}", datagram.getLength(), thisSequence); + } else { + //log.warn("Data .... out of sequence: {} vs {}", thisSequence, lastSequence); + } + } + + + if(datagram.getType() == DataType.END.getValue()) { + ackEnd = true; + } + + + // Every second + checkInstant = Instant.now(); + if(Duration.between(startInstant, checkInstant).toSeconds() >= 1) { + printStatistics(); + startInstant = checkInstant; + } + + if(ackEnd && pktsReceivedTotal > datagram.getMaxPkt()) { + // Send ACK + Datagram responseDatagram = new Datagram(DataType.ACK.getValue(), 32, datagram.getCurPkt(), 1); + packet = new DatagramPacket(responseDatagram.getPayload(), responseDatagram.getLength(), address, port); + socket.send(packet); + + printSummary(); + running = false; + } + + + lastSequence = thisSequence; + pktsReceived++; + pktsReceivedTotal++; + + } } - private void receive(String msg) { - log.info("receive() - msg: {}", msg); - } - - }