From db2f31b346a4956fbbf570659ea1a30e5647f4c5 Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Fri, 17 Sep 2021 11:53:57 +0200 Subject: [PATCH] Combine results in one combo package that can be sent from client to server. --- client/build.gradle | 3 +- .../main/java/sysmon/client/Application.java | 24 +++--- .../sysmon/client/ClientRouteBuilder.java | 20 +++-- .../java/sysmon/client/ComboAppender.java | 18 +++++ .../sysmon/client/ListOfResultsStrategy.java | 13 ++++ gradle.properties | 2 +- .../src/test/groovy/AixNetstatTest.groovy | 2 +- .../src/test/groovy/AixProcessorTest.groovy | 6 +- .../plugins/os_base/BaseProcessExtension.java | 8 +- .../src/test/groovy/LinuxNetstatTest.groovy | 2 +- server/build.gradle | 4 + .../main/java/sysmon/server/Application.java | 3 +- .../server/ComboResultToPointProcessor.java | 74 +++++++++++++++++++ .../sysmon/server/ServerRouteBuilder.java | 18 +++-- .../main/java/sysmon/shared/ComboResult.java | 34 +++++++++ .../main/java/sysmon/shared/Measurement.java | 2 + 16 files changed, 194 insertions(+), 39 deletions(-) create mode 100644 client/src/main/java/sysmon/client/ComboAppender.java create mode 100644 client/src/main/java/sysmon/client/ListOfResultsStrategy.java create mode 100644 server/src/main/java/sysmon/server/ComboResultToPointProcessor.java create mode 100644 shared/src/main/java/sysmon/shared/ComboResult.java diff --git a/client/build.gradle b/client/build.gradle index 8e7a2ad..d838cd0 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -41,8 +41,9 @@ application { } run { - systemProperty 'pf4j.pluginsDir', '../plugins/output/' + systemProperty 'sysmon.pluginsDir', '../plugins/output/' systemProperty 'sysmon.cfgFile', 'doc/sysmon-client.toml' + systemProperty 'sysmon.debug', '1' } tasks.named('test') { diff --git a/client/src/main/java/sysmon/client/Application.java b/client/src/main/java/sysmon/client/Application.java index 45fe046..22ee7ec 100644 --- a/client/src/main/java/sysmon/client/Application.java +++ b/client/src/main/java/sysmon/client/Application.java @@ -42,11 +42,20 @@ public class Application implements Callable { @Override public Integer call() throws IOException { - if(enableDebug) { + String sysmonDebug = System.getProperty("sysmon.debug"); + if(sysmonDebug != null || enableDebug) { System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO"); } - Configuration configuration = new Configuration(); + String sysmonCfgFile = System.getProperty("sysmon.cfgFile"); + if(sysmonCfgFile != null) { + configurationFile = new File(sysmonCfgFile); + } + + String sysmonPluginsDir = System.getProperty("sysmon.pluginsDir"); + if(sysmonPluginsDir != null) { + pluginPath = sysmonPluginsDir; + } if(hostname == null || hostname.isEmpty()) { try { @@ -57,16 +66,7 @@ public class Application implements Callable { } } - String pf4jPluginsDir = System.getProperty("pf4j.pluginsDir"); - if(pf4jPluginsDir != null) { - pluginPath = pf4jPluginsDir; - } - - String sysmonCfgFile = System.getProperty("sysmon.cfgFile"); - if(sysmonCfgFile != null) { - configurationFile = new File(sysmonCfgFile); - } - + Configuration configuration = new Configuration(); if(configurationFile.exists()) { try { diff --git a/client/src/main/java/sysmon/client/ClientRouteBuilder.java b/client/src/main/java/sysmon/client/ClientRouteBuilder.java index f586c76..6adf7fe 100644 --- a/client/src/main/java/sysmon/client/ClientRouteBuilder.java +++ b/client/src/main/java/sysmon/client/ClientRouteBuilder.java @@ -2,13 +2,15 @@ package sysmon.client; import org.apache.camel.Exchange; import org.apache.camel.LoggingLevel; +import org.apache.camel.builder.AggregationStrategies; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.model.dataformat.JsonLibrary; +import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.spi.Registry; import org.pf4j.JarPluginManager; import org.pf4j.PluginManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sysmon.shared.ComboResult; import sysmon.shared.MetricExtension; import sysmon.shared.MetricResult; @@ -61,13 +63,13 @@ public class ClientRouteBuilder extends RouteBuilder { //from("timer:extensions?fixedRate=true&period=30s") from("timer:"+provides+"?fixedRate=true&period=30s") .bean(ext, "getMetrics") - //.doTry() .outputType(MetricResult.class) .process(new MetricEnrichProcessor(registry)) .choice().when(exchangeProperty("skip").isEqualTo(true)) .log(LoggingLevel.WARN,"Skipping empty measurement.") .stop() .otherwise() + .log("${body}") .to("seda:metrics?discardWhenFull=true"); } else { log.info(">>> Skipping extension (not supported or disabled): " + ext.getDescription()); @@ -75,18 +77,20 @@ public class ClientRouteBuilder extends RouteBuilder { } + from("seda:metrics") + .aggregate(constant(true), AggregationStrategies.beanAllowNull(ComboAppender.class, "append")) + //.aggregate(new GroupedExchangeAggregationStrategy()).constant(true) + //.aggregate(constant(true), new ListOfResultsStrategy()) + // wait for 5 seconds to aggregate + .completionTimeout(5000L).to("seda:outbound"); - // TODO: Make 'concurrentConsumers' configurable - from("seda:metrics?concurrentConsumers=1") + from("seda:outbound") .setHeader(Exchange.HTTP_METHOD, constant("POST")) - //.setHeader(Exchange.CONTENT_TYPE, constant("application/json")) .doTry() - //.process(new MetricProcessor()) - .marshal().json(JsonLibrary.Jackson, MetricResult.class) + .marshal(new JacksonDataFormat(ComboResult.class)) .to((String)registry.lookupByName("myServerUrl")) .doCatch(Exception.class) .log(LoggingLevel.WARN,"Error: ${exception.message}") - //.log("Error sending metric to collector: ${body}") .end(); } diff --git a/client/src/main/java/sysmon/client/ComboAppender.java b/client/src/main/java/sysmon/client/ComboAppender.java new file mode 100644 index 0000000..5cc5b92 --- /dev/null +++ b/client/src/main/java/sysmon/client/ComboAppender.java @@ -0,0 +1,18 @@ +package sysmon.client; + +import sysmon.shared.ComboResult; +import sysmon.shared.MetricResult; + +public class ComboAppender { + + public ComboResult append(ComboResult comboResult, MetricResult metricResult) { + + if (comboResult == null) { + comboResult = new ComboResult(); + } + + comboResult.getMetricResults().add(metricResult); + return comboResult; + } + +} diff --git a/client/src/main/java/sysmon/client/ListOfResultsStrategy.java b/client/src/main/java/sysmon/client/ListOfResultsStrategy.java new file mode 100644 index 0000000..edc33f7 --- /dev/null +++ b/client/src/main/java/sysmon/client/ListOfResultsStrategy.java @@ -0,0 +1,13 @@ +package sysmon.client; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy; +import sysmon.shared.MetricResult; + +public class ListOfResultsStrategy extends AbstractListAggregationStrategy { + + @Override + public MetricResult getValue(Exchange exchange) { + return exchange.getIn().getBody(MetricResult.class); + } +} \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index 260fb63..03f3c8e 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.0.12 +version=0.1.2 pf4jVersion=3.6.0 slf4jVersion=1.7.32 camelVersion=3.11.2 diff --git a/plugins/os-aix/src/test/groovy/AixNetstatTest.groovy b/plugins/os-aix/src/test/groovy/AixNetstatTest.groovy index 716786b..52ad111 100644 --- a/plugins/os-aix/src/test/groovy/AixNetstatTest.groovy +++ b/plugins/os-aix/src/test/groovy/AixNetstatTest.groovy @@ -6,7 +6,7 @@ class AixNetstatTest extends Specification { void "test netstat parsing"() { setup: - InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt'); + InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt') when: AixNetstatParser parser = new AixNetstatParser(inputStream) diff --git a/plugins/os-aix/src/test/groovy/AixProcessorTest.groovy b/plugins/os-aix/src/test/groovy/AixProcessorTest.groovy index 5b21c5b..a71ce58 100644 --- a/plugins/os-aix/src/test/groovy/AixProcessorTest.groovy +++ b/plugins/os-aix/src/test/groovy/AixProcessorTest.groovy @@ -7,7 +7,7 @@ class AixProcessorTest extends Specification { void "test AIX lparstat shared output processing"() { setup: - InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt'); + InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt') when: AixProcessorExtension extension = new AixProcessorExtension() @@ -26,7 +26,7 @@ class AixProcessorTest extends Specification { void "test AIX lparstat dedicated output processing"() { setup: - InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt'); + InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt') when: AixProcessorExtension extension = new AixProcessorExtension() @@ -45,7 +45,7 @@ class AixProcessorTest extends Specification { void "test Linux lparstat output processing"() { setup: - InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt'); + InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt') when: AixProcessorExtension extension = new AixProcessorExtension() diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java index 7b0d331..4ada212 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java @@ -89,15 +89,15 @@ public class BaseProcessExtension implements MetricExtension { continue; } - String name = p.getName(); - if(!includeList.contains(name)) { + // Skip process names not found in our includeList, only if the list is not empty or null + if(includeList != null && !includeList.isEmpty() && !includeList.contains(p.getName())) { continue; } - log.info("pid: " + p.getProcessID() + ", name: " + name + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize()); + log.debug("pid: " + p.getProcessID() + ", name: " + p.getName() + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize()); HashMap tagsMap = new HashMap() {{ put("pid", String.valueOf(p.getProcessID())); - put("name", name); + put("name", p.getName()); }}; HashMap fieldsMap = new HashMap() {{ diff --git a/plugins/os-linux/src/test/groovy/LinuxNetstatTest.groovy b/plugins/os-linux/src/test/groovy/LinuxNetstatTest.groovy index 4d3e91d..4f0e416 100644 --- a/plugins/os-linux/src/test/groovy/LinuxNetstatTest.groovy +++ b/plugins/os-linux/src/test/groovy/LinuxNetstatTest.groovy @@ -6,7 +6,7 @@ class LinuxNetstatTest extends Specification { void "test netstat parsing"() { setup: - InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt'); + InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt') when: LinuxNetstatParser parser = new LinuxNetstatParser(inputStream) diff --git a/server/build.gradle b/server/build.gradle index fd13fff..e0f95b3 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -31,6 +31,10 @@ application { applicationDefaultJvmArgs = [ "-server", "-Xmx128m", "-XX:+UseG1GC" ] } +run { + systemProperty 'sysmon.debug', '1' +} + tasks.named('test') { // Use junit platform for unit tests. useJUnitPlatform() diff --git a/server/src/main/java/sysmon/server/Application.java b/server/src/main/java/sysmon/server/Application.java index b8b0c16..1b4c8e7 100644 --- a/server/src/main/java/sysmon/server/Application.java +++ b/server/src/main/java/sysmon/server/Application.java @@ -47,7 +47,8 @@ public class Application implements Callable { @Override public Integer call() throws IOException { - if(enableDebug) { + String sysmonDebug = System.getProperty("sysmon.debug"); + if(sysmonDebug != null || enableDebug) { System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO"); } diff --git a/server/src/main/java/sysmon/server/ComboResultToPointProcessor.java b/server/src/main/java/sysmon/server/ComboResultToPointProcessor.java new file mode 100644 index 0000000..5cbfbd6 --- /dev/null +++ b/server/src/main/java/sysmon/server/ComboResultToPointProcessor.java @@ -0,0 +1,74 @@ +package sysmon.server; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.influxdb.dto.BatchPoints; +import org.influxdb.dto.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sysmon.shared.ComboResult; +import sysmon.shared.Measurement; +import sysmon.shared.MetricResult; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ComboResultToPointProcessor implements Processor { + + private static final Logger log = LoggerFactory.getLogger(ComboResultToPointProcessor.class); + private static String influxDbName; + + ComboResultToPointProcessor(String influxDbName) { + ComboResultToPointProcessor.influxDbName = influxDbName; + } + + @Override + public void process(Exchange exchange) throws Exception { + + ComboResult comboResult = exchange.getIn().getBody(ComboResult.class); + //MetricResult metricResult = exchange.getIn().getBody(MetricResult.class); + + //log.info("Size of measurements: " + measurementList.size()); + + BatchPoints.Builder batchPoints = BatchPoints + .database(ComboResultToPointProcessor.influxDbName) + .precision(TimeUnit.MILLISECONDS); + + List results = comboResult.getMetricResults(); + for(MetricResult metricResult : results) { + + List measurementList = metricResult.getMeasurements(); + for(Measurement measurement : measurementList) { + + Point.Builder point = Point.measurement(metricResult.getName()) + .time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS) + .tag("hostname", metricResult.getHostname()); + + for (Map.Entry entry : measurement.getTags().entrySet()) { + //log.info("process() - tag: " + entry.getKey() + "=" + entry.getValue()); + point.tag(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : measurement.getFields().entrySet()) { + //log.info("process() - field: " + entry.getKey() + "=" + entry.getValue()); + if(entry.getValue() instanceof Number) { + Number num = (Number) entry.getValue(); + point.addField(entry.getKey(), num); + } else if(entry.getValue() instanceof Boolean) { + Boolean bol = (Boolean) entry.getValue(); + point.addField(entry.getKey(), bol); + } else { + String str = (String) entry.getValue(); + point.addField(entry.getKey(), str); + } + } + batchPoints.point(point.build()); + } + } + + exchange.getIn().setBody(batchPoints.build()); + + } + +} diff --git a/server/src/main/java/sysmon/server/ServerRouteBuilder.java b/server/src/main/java/sysmon/server/ServerRouteBuilder.java index 46bcbeb..631f9e7 100644 --- a/server/src/main/java/sysmon/server/ServerRouteBuilder.java +++ b/server/src/main/java/sysmon/server/ServerRouteBuilder.java @@ -5,7 +5,7 @@ import org.apache.camel.LoggingLevel; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.rest.RestBindingMode; import org.apache.camel.spi.Registry; -import sysmon.shared.MetricResult; +import sysmon.shared.ComboResult; public class ServerRouteBuilder extends RouteBuilder { @@ -34,20 +34,24 @@ public class ServerRouteBuilder extends RouteBuilder { .post("/metrics") .consumes("application/json") .produces("text/html") - .type(MetricResult.class) + .type(ComboResult.class) .route() - .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202)) - .setHeader("Content-Type", constant("application/x-www-form-urlencoded")) - .to("seda:inbound?discardWhenFull=true") + .doTry() + .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202)) + .setHeader("Content-Type", constant("application/x-www-form-urlencoded")) + .to("seda:inbound?discardWhenFull=true") + .doCatch(Exception.class) + .log(LoggingLevel.WARN, "Error: ${exception.message}") + .end() .endRest(); fromF("seda:inbound?concurrentConsumers=%s", threads) .log(">>> metric: ${header.hostname} - ${body}") .doTry() - .process(new MetricResultToPointProcessor(dbname)) + .process(new ComboResultToPointProcessor(dbname)) .toF("influxdb://ref.myInfluxConnection?batch=true") //&retentionPolicy=autogen .doCatch(Exception.class) - .log(LoggingLevel.WARN, "Error: ${exception}") + .log(LoggingLevel.WARN, "Error: ${exception.message}") .end(); } diff --git a/shared/src/main/java/sysmon/shared/ComboResult.java b/shared/src/main/java/sysmon/shared/ComboResult.java new file mode 100644 index 0000000..014dad2 --- /dev/null +++ b/shared/src/main/java/sysmon/shared/ComboResult.java @@ -0,0 +1,34 @@ +package sysmon.shared; + +import java.io.Serializable; +import java.util.ArrayList; + +public class ComboResult implements Serializable { + + private static final long serialVersionUID = 1L; + + private ArrayList metricResults; + + + public ComboResult() { + metricResults = new ArrayList<>(); + } + + public ComboResult(ArrayList metricResults) { + this.metricResults = metricResults; + } + + public ArrayList getMetricResults() { + return metricResults; + } + + public void setMetricResults(ArrayList metricResults) { + this.metricResults = metricResults; + } + + @Override + public String toString() { + return "ComboResult of size: " + metricResults.size(); + } + +} diff --git a/shared/src/main/java/sysmon/shared/Measurement.java b/shared/src/main/java/sysmon/shared/Measurement.java index 07c9290..3e33715 100644 --- a/shared/src/main/java/sysmon/shared/Measurement.java +++ b/shared/src/main/java/sysmon/shared/Measurement.java @@ -7,6 +7,8 @@ import java.util.Objects; public class Measurement implements Serializable { + private static final long serialVersionUID = 1L; + private HashMap tags = new HashMap<>(); private HashMap fields = new HashMap<>();