Combine results in one combo package that can be sent from client to server.

This commit is contained in:
Mark Nellemann 2021-09-17 11:53:57 +02:00
parent 1a77edfe81
commit db2f31b346
16 changed files with 194 additions and 39 deletions

View File

@ -41,8 +41,9 @@ application {
}
run {
systemProperty 'pf4j.pluginsDir', '../plugins/output/'
systemProperty 'sysmon.pluginsDir', '../plugins/output/'
systemProperty 'sysmon.cfgFile', 'doc/sysmon-client.toml'
systemProperty 'sysmon.debug', '1'
}
tasks.named('test') {

View File

@ -42,11 +42,20 @@ public class Application implements Callable<Integer> {
@Override
public Integer call() throws IOException {
if(enableDebug) {
String sysmonDebug = System.getProperty("sysmon.debug");
if(sysmonDebug != null || enableDebug) {
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO");
}
Configuration configuration = new Configuration();
String sysmonCfgFile = System.getProperty("sysmon.cfgFile");
if(sysmonCfgFile != null) {
configurationFile = new File(sysmonCfgFile);
}
String sysmonPluginsDir = System.getProperty("sysmon.pluginsDir");
if(sysmonPluginsDir != null) {
pluginPath = sysmonPluginsDir;
}
if(hostname == null || hostname.isEmpty()) {
try {
@ -57,16 +66,7 @@ public class Application implements Callable<Integer> {
}
}
String pf4jPluginsDir = System.getProperty("pf4j.pluginsDir");
if(pf4jPluginsDir != null) {
pluginPath = pf4jPluginsDir;
}
String sysmonCfgFile = System.getProperty("sysmon.cfgFile");
if(sysmonCfgFile != null) {
configurationFile = new File(sysmonCfgFile);
}
Configuration configuration = new Configuration();
if(configurationFile.exists()) {
try {

View File

@ -2,13 +2,15 @@ package sysmon.client;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.AggregationStrategies;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.spi.Registry;
import org.pf4j.JarPluginManager;
import org.pf4j.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sysmon.shared.ComboResult;
import sysmon.shared.MetricExtension;
import sysmon.shared.MetricResult;
@ -61,13 +63,13 @@ public class ClientRouteBuilder extends RouteBuilder {
//from("timer:extensions?fixedRate=true&period=30s")
from("timer:"+provides+"?fixedRate=true&period=30s")
.bean(ext, "getMetrics")
//.doTry()
.outputType(MetricResult.class)
.process(new MetricEnrichProcessor(registry))
.choice().when(exchangeProperty("skip").isEqualTo(true))
.log(LoggingLevel.WARN,"Skipping empty measurement.")
.stop()
.otherwise()
.log("${body}")
.to("seda:metrics?discardWhenFull=true");
} else {
log.info(">>> Skipping extension (not supported or disabled): " + ext.getDescription());
@ -75,18 +77,20 @@ public class ClientRouteBuilder extends RouteBuilder {
}
from("seda:metrics")
.aggregate(constant(true), AggregationStrategies.beanAllowNull(ComboAppender.class, "append"))
//.aggregate(new GroupedExchangeAggregationStrategy()).constant(true)
//.aggregate(constant(true), new ListOfResultsStrategy())
// wait for 5 seconds to aggregate
.completionTimeout(5000L).to("seda:outbound");
// TODO: Make 'concurrentConsumers' configurable
from("seda:metrics?concurrentConsumers=1")
from("seda:outbound")
.setHeader(Exchange.HTTP_METHOD, constant("POST"))
//.setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
.doTry()
//.process(new MetricProcessor())
.marshal().json(JsonLibrary.Jackson, MetricResult.class)
.marshal(new JacksonDataFormat(ComboResult.class))
.to((String)registry.lookupByName("myServerUrl"))
.doCatch(Exception.class)
.log(LoggingLevel.WARN,"Error: ${exception.message}")
//.log("Error sending metric to collector: ${body}")
.end();
}

View File

@ -0,0 +1,18 @@
package sysmon.client;
import sysmon.shared.ComboResult;
import sysmon.shared.MetricResult;
public class ComboAppender {
public ComboResult append(ComboResult comboResult, MetricResult metricResult) {
if (comboResult == null) {
comboResult = new ComboResult();
}
comboResult.getMetricResults().add(metricResult);
return comboResult;
}
}

View File

@ -0,0 +1,13 @@
package sysmon.client;
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
import sysmon.shared.MetricResult;
public class ListOfResultsStrategy extends AbstractListAggregationStrategy<MetricResult> {
@Override
public MetricResult getValue(Exchange exchange) {
return exchange.getIn().getBody(MetricResult.class);
}
}

View File

@ -1,4 +1,4 @@
version=0.0.12
version=0.1.2
pf4jVersion=3.6.0
slf4jVersion=1.7.32
camelVersion=3.11.2

View File

@ -6,7 +6,7 @@ class AixNetstatTest extends Specification {
void "test netstat parsing"() {
setup:
InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt');
InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt')
when:
AixNetstatParser parser = new AixNetstatParser(inputStream)

View File

@ -7,7 +7,7 @@ class AixProcessorTest extends Specification {
void "test AIX lparstat shared output processing"() {
setup:
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt');
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt')
when:
AixProcessorExtension extension = new AixProcessorExtension()
@ -26,7 +26,7 @@ class AixProcessorTest extends Specification {
void "test AIX lparstat dedicated output processing"() {
setup:
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt');
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt')
when:
AixProcessorExtension extension = new AixProcessorExtension()
@ -45,7 +45,7 @@ class AixProcessorTest extends Specification {
void "test Linux lparstat output processing"() {
setup:
InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt');
InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt')
when:
AixProcessorExtension extension = new AixProcessorExtension()

View File

@ -89,15 +89,15 @@ public class BaseProcessExtension implements MetricExtension {
continue;
}
String name = p.getName();
if(!includeList.contains(name)) {
// Skip process names not found in our includeList, only if the list is not empty or null
if(includeList != null && !includeList.isEmpty() && !includeList.contains(p.getName())) {
continue;
}
log.info("pid: " + p.getProcessID() + ", name: " + name + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize());
log.debug("pid: " + p.getProcessID() + ", name: " + p.getName() + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize());
HashMap<String, String> tagsMap = new HashMap<String, String>() {{
put("pid", String.valueOf(p.getProcessID()));
put("name", name);
put("name", p.getName());
}};
HashMap<String, Object> fieldsMap = new HashMap<String, Object>() {{

View File

@ -6,7 +6,7 @@ class LinuxNetstatTest extends Specification {
void "test netstat parsing"() {
setup:
InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt');
InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt')
when:
LinuxNetstatParser parser = new LinuxNetstatParser(inputStream)

View File

@ -31,6 +31,10 @@ application {
applicationDefaultJvmArgs = [ "-server", "-Xmx128m", "-XX:+UseG1GC" ]
}
run {
systemProperty 'sysmon.debug', '1'
}
tasks.named('test') {
// Use junit platform for unit tests.
useJUnitPlatform()

View File

@ -47,7 +47,8 @@ public class Application implements Callable<Integer> {
@Override
public Integer call() throws IOException {
if(enableDebug) {
String sysmonDebug = System.getProperty("sysmon.debug");
if(sysmonDebug != null || enableDebug) {
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO");
}

View File

@ -0,0 +1,74 @@
package sysmon.server;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sysmon.shared.ComboResult;
import sysmon.shared.Measurement;
import sysmon.shared.MetricResult;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ComboResultToPointProcessor implements Processor {
private static final Logger log = LoggerFactory.getLogger(ComboResultToPointProcessor.class);
private static String influxDbName;
ComboResultToPointProcessor(String influxDbName) {
ComboResultToPointProcessor.influxDbName = influxDbName;
}
@Override
public void process(Exchange exchange) throws Exception {
ComboResult comboResult = exchange.getIn().getBody(ComboResult.class);
//MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
//log.info("Size of measurements: " + measurementList.size());
BatchPoints.Builder batchPoints = BatchPoints
.database(ComboResultToPointProcessor.influxDbName)
.precision(TimeUnit.MILLISECONDS);
List<MetricResult> results = comboResult.getMetricResults();
for(MetricResult metricResult : results) {
List<Measurement> measurementList = metricResult.getMeasurements();
for(Measurement measurement : measurementList) {
Point.Builder point = Point.measurement(metricResult.getName())
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS)
.tag("hostname", metricResult.getHostname());
for (Map.Entry<String,String> entry : measurement.getTags().entrySet()) {
//log.info("process() - tag: " + entry.getKey() + "=" + entry.getValue());
point.tag(entry.getKey(), entry.getValue());
}
for (Map.Entry<String,Object> entry : measurement.getFields().entrySet()) {
//log.info("process() - field: " + entry.getKey() + "=" + entry.getValue());
if(entry.getValue() instanceof Number) {
Number num = (Number) entry.getValue();
point.addField(entry.getKey(), num);
} else if(entry.getValue() instanceof Boolean) {
Boolean bol = (Boolean) entry.getValue();
point.addField(entry.getKey(), bol);
} else {
String str = (String) entry.getValue();
point.addField(entry.getKey(), str);
}
}
batchPoints.point(point.build());
}
}
exchange.getIn().setBody(batchPoints.build());
}
}

View File

@ -5,7 +5,7 @@ import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.rest.RestBindingMode;
import org.apache.camel.spi.Registry;
import sysmon.shared.MetricResult;
import sysmon.shared.ComboResult;
public class ServerRouteBuilder extends RouteBuilder {
@ -34,20 +34,24 @@ public class ServerRouteBuilder extends RouteBuilder {
.post("/metrics")
.consumes("application/json")
.produces("text/html")
.type(MetricResult.class)
.type(ComboResult.class)
.route()
.setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202))
.setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
.to("seda:inbound?discardWhenFull=true")
.doTry()
.setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202))
.setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
.to("seda:inbound?discardWhenFull=true")
.doCatch(Exception.class)
.log(LoggingLevel.WARN, "Error: ${exception.message}")
.end()
.endRest();
fromF("seda:inbound?concurrentConsumers=%s", threads)
.log(">>> metric: ${header.hostname} - ${body}")
.doTry()
.process(new MetricResultToPointProcessor(dbname))
.process(new ComboResultToPointProcessor(dbname))
.toF("influxdb://ref.myInfluxConnection?batch=true") //&retentionPolicy=autogen
.doCatch(Exception.class)
.log(LoggingLevel.WARN, "Error: ${exception}")
.log(LoggingLevel.WARN, "Error: ${exception.message}")
.end();
}

View File

@ -0,0 +1,34 @@
package sysmon.shared;
import java.io.Serializable;
import java.util.ArrayList;
public class ComboResult implements Serializable {
private static final long serialVersionUID = 1L;
private ArrayList<MetricResult> metricResults;
public ComboResult() {
metricResults = new ArrayList<>();
}
public ComboResult(ArrayList<MetricResult> metricResults) {
this.metricResults = metricResults;
}
public ArrayList<MetricResult> getMetricResults() {
return metricResults;
}
public void setMetricResults(ArrayList<MetricResult> metricResults) {
this.metricResults = metricResults;
}
@Override
public String toString() {
return "ComboResult of size: " + metricResults.size();
}
}

View File

@ -7,6 +7,8 @@ import java.util.Objects;
public class Measurement implements Serializable {
private static final long serialVersionUID = 1L;
private HashMap<String, String> tags = new HashMap<>();
private HashMap<String, Object> fields = new HashMap<>();