Refactored to support multiple measurements in a result.

This commit is contained in:
Mark Nellemann 2021-09-02 21:40:22 +02:00
parent d102d2f9fc
commit 612a42547a
24 changed files with 243 additions and 93 deletions

View file

@ -54,12 +54,12 @@ public class ClientRouteBuilder extends RouteBuilder {
from("timer:extensions?fixedRate=true&period=30s") from("timer:extensions?fixedRate=true&period=30s")
.bean(ext, "getMetrics") .bean(ext, "getMetrics")
//.doTry() //.doTry()
.outputType(MetricResult.class)
.process(new MetricEnrichProcessor(registry)) .process(new MetricEnrichProcessor(registry))
.choice().when(exchangeProperty("skip").isEqualTo(true)) .choice().when(exchangeProperty("skip").isEqualTo(true))
.log("Skipping empty measurement.") .log("Skipping empty measurement.")
.stop() .stop()
.otherwise() .otherwise()
.log(">>> ${body}")
.to("seda:metrics?discardWhenFull=true"); .to("seda:metrics?discardWhenFull=true");
} else { } else {
log.info(">>> Skipping extension (not supported here): " + ext.getDescription()); log.info(">>> Skipping extension (not supported here): " + ext.getDescription());

View file

@ -20,7 +20,7 @@ public class MetricEnrichProcessor implements Processor {
MetricResult metricResult = exchange.getIn().getBody(MetricResult.class); MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
// We make sure MetricResults with no measurements are not sent further down the line // We make sure MetricResults with no measurements are not sent further down the line
if(metricResult == null || metricResult.getMeasurement() == null) { if(metricResult == null || metricResult.getMeasurements() == null) {
exchange.setProperty("skip", true); exchange.setProperty("skip", true);
return; return;
} }

View file

@ -40,3 +40,5 @@ camel.main.lightweight = true
# configure beans # configure beans
#camel.beans.metricProcessor = #class:org.sysmon.client.MetricProcessor #camel.beans.metricProcessor = #class:org.sysmon.client.MetricProcessor
#camel.dataformat.json-jackson.use-list = true

View file

@ -1,4 +1,4 @@
version=0.0.6 version=0.0.7
pf4jVersion=3.6.0 pf4jVersion=3.6.0
slf4jVersion=1.7.32 slf4jVersion=1.7.32
camelVersion=3.11.0 camelVersion=3.11.0

View file

@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
// Disabled // Disabled
@ -36,12 +37,12 @@ public class AixNetstatExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "aix-network-netstat"; return "aix_network_netstat";
} }
@Override @Override
public String getProvides() { public String getProvides() {
return "network-netstat"; return "network_netstat";
} }
@Override @Override
@ -52,8 +53,8 @@ public class AixNetstatExtension implements MetricExtension {
@Override @Override
public MetricResult getMetrics() throws Exception { public MetricResult getMetrics() throws Exception {
Map<String, String> tagsMap = null; HashMap<String, String> tagsMap = null;
Map<String, Object> fieldsMap = null; HashMap<String, Object> fieldsMap = null;
try (InputStream buf = PluginHelper.executeCommand("netstat -s -f inet")) { try (InputStream buf = PluginHelper.executeCommand("netstat -s -f inet")) {
AixNetstatParser parser = processCommandOutput(buf); AixNetstatParser parser = processCommandOutput(buf);
@ -62,7 +63,7 @@ public class AixNetstatExtension implements MetricExtension {
} }
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("network_netstat", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }

View file

@ -130,12 +130,12 @@ public class AixNetstatParser {
} }
public Map<String, String> getTags() { public HashMap<String, String> getTags() {
return new HashMap<>(); return new HashMap<>();
} }
public Map<String, Object> getFields() { public HashMap<String, Object> getFields() {
Map<String, Object> fields = new HashMap<>(); HashMap<String, Object> fields = new HashMap<>();
fields.put("ip_forwarded", ipForwarded); fields.put("ip_forwarded", ipForwarded);
fields.put("ip_received", ipTotalPacketsReceived); fields.put("ip_received", ipTotalPacketsReceived);

View file

@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -37,12 +38,12 @@ public class AixProcessorExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "aix-processor"; return "aix_processor";
} }
@Override @Override
public String getProvides() { public String getProvides() {
return "processor-lpar"; return "processor_lpar";
} }
@Override @Override
@ -53,8 +54,8 @@ public class AixProcessorExtension implements MetricExtension {
@Override @Override
public MetricResult getMetrics() throws Exception { public MetricResult getMetrics() throws Exception {
Map<String, String> tagsMap = null; HashMap<String, String> tagsMap = null;
Map<String, Object> fieldsMap = null; HashMap<String, Object> fieldsMap = null;
try (InputStream buf = PluginHelper.executeCommand("lparstat 5 1")) { try (InputStream buf = PluginHelper.executeCommand("lparstat 5 1")) {
AixProcessorStat processorStat = processCommandOutput(buf); AixProcessorStat processorStat = processCommandOutput(buf);
@ -62,7 +63,7 @@ public class AixProcessorExtension implements MetricExtension {
fieldsMap = processorStat.getFields(); fieldsMap = processorStat.getFields();
} }
return new MetricResult("processor_lpar", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }

View file

@ -140,12 +140,12 @@ public class AixProcessorStat {
return 100 - idle; return 100 - idle;
} }
public Map<String, String> getTags() { public HashMap<String, String> getTags() {
return new HashMap<>(); return new HashMap<>();
} }
public Map<String, Object> getFields() { public HashMap<String, Object> getFields() {
Map<String, Object> fields = new HashMap<>(); HashMap<String, Object> fields = new HashMap<>();
fields.put("lcpu", lcpu); fields.put("lcpu", lcpu);
fields.put("ent", ent); fields.put("ent", ent);
fields.put("user", user); fields.put("user", user);

View file

@ -30,7 +30,7 @@ public class BaseDiskExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "base-disk"; return "base_disk";
} }
@Override @Override
@ -52,8 +52,8 @@ public class BaseDiskExtension implements MetricExtension {
long transferTime = 0L; long transferTime = 0L;
long queueLength = 0L; long queueLength = 0L;
Map<String, String> tagsMap = new HashMap<>(); HashMap<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>(); HashMap<String, Object> fieldsMap = new HashMap<>();
List<HWDiskStore> diskStores = hardwareAbstractionLayer.getDiskStores(); List<HWDiskStore> diskStores = hardwareAbstractionLayer.getDiskStores();
for(HWDiskStore store : diskStores) { for(HWDiskStore store : diskStores) {
@ -73,7 +73,7 @@ public class BaseDiskExtension implements MetricExtension {
fieldsMap.put("queue", queueLength); fieldsMap.put("queue", queueLength);
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("disk", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }
} }

View file

@ -27,7 +27,7 @@ public class BaseMemoryExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "base-memory"; return "base_memory";
} }
@Override @Override
@ -43,8 +43,8 @@ public class BaseMemoryExtension implements MetricExtension {
@Override @Override
public MetricResult getMetrics() { public MetricResult getMetrics() {
Map<String, String> tagsMap = new HashMap<>(); HashMap<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>(); HashMap<String, Object> fieldsMap = new HashMap<>();
long total = hardwareAbstractionLayer.getMemory().getTotal(); long total = hardwareAbstractionLayer.getMemory().getTotal();
long available = hardwareAbstractionLayer.getMemory().getAvailable(); long available = hardwareAbstractionLayer.getMemory().getAvailable();
@ -57,7 +57,7 @@ public class BaseMemoryExtension implements MetricExtension {
fieldsMap.put("virtual", hardwareAbstractionLayer.getMemory().getVirtualMemory().getVirtualInUse()); fieldsMap.put("virtual", hardwareAbstractionLayer.getMemory().getVirtualMemory().getVirtualInUse());
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("memory", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }

View file

@ -29,7 +29,7 @@ public class BaseNetworkExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "base-network"; return "base_network";
} }
@Override @Override
@ -52,8 +52,8 @@ public class BaseNetworkExtension implements MetricExtension {
long txPackets = 0L; long txPackets = 0L;
long txErrs = 0L; long txErrs = 0L;
Map<String, String> tagsMap = new HashMap<>(); HashMap<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>(); HashMap<String, Object> fieldsMap = new HashMap<>();
List<NetworkIF> interfaces = hardwareAbstractionLayer.getNetworkIFs(); List<NetworkIF> interfaces = hardwareAbstractionLayer.getNetworkIFs();
for(NetworkIF netif : interfaces) { for(NetworkIF netif : interfaces) {
@ -75,7 +75,7 @@ public class BaseNetworkExtension implements MetricExtension {
fieldsMap.put("txErrors", txErrs); fieldsMap.put("txErrors", txErrs);
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("network", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }
} }

View file

@ -36,4 +36,19 @@ public class BasePlugin extends Plugin {
return hardwareAbstractionLayer; return hardwareAbstractionLayer;
} }
public static SystemInfo getSystemInfo() {
try {
if(systemInfo == null) {
systemInfo = new SystemInfo();
}
} catch (UnsupportedOperationException e) {
log.warn(e.getMessage());
}
return systemInfo;
}
} }

View file

@ -0,0 +1,101 @@
package sysmon.plugins.os_base;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.software.os.OSProcess;
import sysmon.shared.Measurement;
import sysmon.shared.MetricExtension;
import sysmon.shared.MetricResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Extension
public class BaseProcessExtension implements MetricExtension {
private static final Logger log = LoggerFactory.getLogger(BaseProcessorExtension.class);
private final List<String> includeList = new ArrayList<String>() {{
add("java");
add("influxd");
add("grafana-server");
}};
private SystemInfo systemInfo;
@Override
public boolean isSupported() {
systemInfo = BasePlugin.getSystemInfo();
return systemInfo != null;
}
@Override
public String getName() {
return "base_process";
}
@Override
public String getProvides() {
return "process";
}
@Override
public String getDescription() {
return "Base Process Metrics";
}
@Override
public MetricResult getMetrics() {
// TODO: include-list and/or exclude-list of process names
ArrayList<Measurement> measurementList = new ArrayList<>();
List<OSProcess> processList = systemInfo.getOperatingSystem().getProcesses();
for(OSProcess p : processList) {
// Skip all the kernel processes
if(p.getResidentSetSize() < 1) {
continue;
}
String name = p.getName();
if(!includeList.contains(name)) {
continue;
}
log.info("pid: " + p.getProcessID() + ", name: " + name + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize());
//log.info(p.getProcessID() + " (" + p.getParentProcessID() + ") " + p.getName() + " " + p.getPath());
HashMap<String, String> tagsMap = new HashMap<>();
HashMap<String, Object> fieldsMap = new HashMap<>();
tagsMap.put("pid", String.valueOf(p.getProcessID()));
tagsMap.put("name", name);
fieldsMap.put("path", p.getPath());
fieldsMap.put("mem_rss", p.getResidentSetSize());
fieldsMap.put("mem_virt", p.getVirtualSize());
fieldsMap.put("kernel_time", p.getKernelTime());
fieldsMap.put("user_time", p.getUserTime());
fieldsMap.put("read_bytes", p.getBytesRead());
fieldsMap.put("write_bytes", p.getBytesWritten());
fieldsMap.put("files", p.getOpenFiles());
fieldsMap.put("threads", p.getThreadCount());
fieldsMap.put("user", p.getUser());
fieldsMap.put("group", p.getGroup());
measurementList.add(new Measurement(tagsMap, fieldsMap));
}
//log.info("Size of measurements: " + measurementList.size());
return new MetricResult(getName(), measurementList);
}
}

View file

@ -35,7 +35,7 @@ public class BaseProcessorExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "base-processor"; return "base_processor";
} }
@Override @Override
@ -52,8 +52,8 @@ public class BaseProcessorExtension implements MetricExtension {
@Override @Override
public MetricResult getMetrics() { public MetricResult getMetrics() {
Map<String, String> tagsMap = new HashMap<>(); HashMap<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>(); HashMap<String, Object> fieldsMap = new HashMap<>();
long[] ticks = hardwareAbstractionLayer.getProcessor().getSystemCpuLoadTicks(); long[] ticks = hardwareAbstractionLayer.getProcessor().getSystemCpuLoadTicks();
if(oldTicks == null || oldTicks.length != ticks.length) { if(oldTicks == null || oldTicks.length != ticks.length) {
@ -86,7 +86,7 @@ public class BaseProcessorExtension implements MetricExtension {
oldTicks = ticks; oldTicks = ticks;
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("processor", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }
} }

View file

@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
// Disabled // Disabled
@ -36,12 +37,12 @@ public class LinuxNetstatExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "linux-network-netstat"; return "linux_network_netstat";
} }
@Override @Override
public String getProvides() { public String getProvides() {
return "network-netstat"; return "network_netstat";
} }
@Override @Override
@ -52,8 +53,8 @@ public class LinuxNetstatExtension implements MetricExtension {
@Override @Override
public MetricResult getMetrics() throws Exception { public MetricResult getMetrics() throws Exception {
Map<String, String> tagsMap = null; HashMap<String, String> tagsMap = null;
Map<String, Object> fieldsMap = null; HashMap<String, Object> fieldsMap = null;
try (InputStream inputStream = PluginHelper.executeCommand("netstat -s")) { try (InputStream inputStream = PluginHelper.executeCommand("netstat -s")) {
LinuxNetstatParser parser = processCommandOutput(inputStream); LinuxNetstatParser parser = processCommandOutput(inputStream);
@ -62,7 +63,7 @@ public class LinuxNetstatExtension implements MetricExtension {
} }
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("network_netstat", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }

View file

@ -140,12 +140,12 @@ public class LinuxNetstatParser {
} }
public Map<String, String> getTags() { public HashMap<String, String> getTags() {
return new HashMap<>(); return new HashMap<>();
} }
public Map<String, Object> getFields() { public HashMap<String, Object> getFields() {
Map<String, Object> fields = new HashMap<>(); HashMap<String, Object> fields = new HashMap<>();
fields.put("ip_forwarded", ipForwarded); fields.put("ip_forwarded", ipForwarded);
fields.put("ip_received", ipTotalPacketsReceived); fields.put("ip_received", ipTotalPacketsReceived);
fields.put("ip_dropped", ipOutgoingPacketsDropped); fields.put("ip_dropped", ipOutgoingPacketsDropped);

View file

@ -76,13 +76,13 @@ public class LinuxNetworkSockStat {
} }
public Map<String, String> getTags() { public HashMap<String, String> getTags() {
return new HashMap<>(); return new HashMap<>();
} }
public Map<String, Object> getFields() { public HashMap<String, Object> getFields() {
Map<String, Object> fields = new HashMap<>(); HashMap<String, Object> fields = new HashMap<>();
fields.put("sockets", sockets); fields.put("sockets", sockets);
fields.put("tcp_inuse", tcp_inuse); fields.put("tcp_inuse", tcp_inuse);
fields.put("tcp_alloc", tcp_alloc); fields.put("tcp_alloc", tcp_alloc);

View file

@ -8,6 +8,7 @@ import sysmon.shared.MetricExtension;
import sysmon.shared.MetricResult; import sysmon.shared.MetricResult;
import sysmon.shared.PluginHelper; import sysmon.shared.PluginHelper;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -29,12 +30,12 @@ public class LinuxSockstatExtension implements MetricExtension {
@Override @Override
public String getName() { public String getName() {
return "linux-network-sockets"; return "linux_network_sockets";
} }
@Override @Override
public String getProvides() { public String getProvides() {
return "network-sockets"; return "network_sockets";
} }
@Override @Override
@ -47,11 +48,11 @@ public class LinuxSockstatExtension implements MetricExtension {
LinuxNetworkSockStat sockStat = processSockOutput(PluginHelper.readFile("/proc/net/sockstat")); LinuxNetworkSockStat sockStat = processSockOutput(PluginHelper.readFile("/proc/net/sockstat"));
Map<String, String> tagsMap = sockStat.getTags(); HashMap<String, String> tagsMap = sockStat.getTags();
Map<String, Object> fieldsMap = sockStat.getFields(); HashMap<String, Object> fieldsMap = sockStat.getFields();
log.debug(fieldsMap.toString()); log.debug(fieldsMap.toString());
return new MetricResult("network_sockets", new Measurement(tagsMap, fieldsMap)); return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap));
} }

View file

@ -2,6 +2,18 @@
Server component. Server component.
## Installation
TODO.
### Influx Database
Create a database for the metrics:
```text
CREATE DATABASE "sysmon" WITH DURATION 90d REPLICATION 1;
```
## Development ## Development

View file

@ -2,49 +2,66 @@ package sysmon.server;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point; import org.influxdb.dto.Point;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import sysmon.shared.Measurement; import sysmon.shared.Measurement;
import sysmon.shared.MetricResult; import sysmon.shared.MetricResult;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class MetricResultToPointProcessor implements Processor { public class MetricResultToPointProcessor implements Processor {
private static final Logger log = LoggerFactory.getLogger(MetricResultToPointProcessor.class); private static final Logger log = LoggerFactory.getLogger(MetricResultToPointProcessor.class);
private static String influxDbName;
MetricResultToPointProcessor(String influxDbName) {
MetricResultToPointProcessor.influxDbName = influxDbName;
}
@Override @Override
public void process(Exchange exchange) throws Exception { public void process(Exchange exchange) throws Exception {
MetricResult metricResult = exchange.getIn().getBody(MetricResult.class); MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
Measurement measurement = metricResult.getMeasurement(); List<Measurement> measurementList = metricResult.getMeasurements();
Point.Builder builder = Point.measurement(metricResult.getName()) //log.info("Size of measurements: " + measurementList.size());
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS)
BatchPoints.Builder batchPoints = BatchPoints
.database(MetricResultToPointProcessor.influxDbName)
.precision(TimeUnit.MILLISECONDS)
.tag("hostname", metricResult.getHostname()); .tag("hostname", metricResult.getHostname());
for(Measurement measurement : measurementList) {
Point.Builder point = Point.measurement(metricResult.getName())
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS);
for (Map.Entry<String,String> entry : measurement.getTags().entrySet()) { for (Map.Entry<String,String> entry : measurement.getTags().entrySet()) {
log.debug("process() - tag: " + entry.getKey() + "=" + entry.getValue()); //log.info("process() - tag: " + entry.getKey() + "=" + entry.getValue());
builder.tag(entry.getKey(), entry.getValue()); point.tag(entry.getKey(), entry.getValue());
} }
for (Map.Entry<String,Object> entry : measurement.getFields().entrySet()) { for (Map.Entry<String,Object> entry : measurement.getFields().entrySet()) {
log.debug("process() - field: " + entry.getKey() + "=" + entry.getValue()); //log.info("process() - field: " + entry.getKey() + "=" + entry.getValue());
if(entry.getValue() instanceof Number) { if(entry.getValue() instanceof Number) {
Number num = (Number) entry.getValue(); Number num = (Number) entry.getValue();
builder.addField(entry.getKey(), num); point.addField(entry.getKey(), num);
} else if(entry.getValue() instanceof Boolean) { } else if(entry.getValue() instanceof Boolean) {
Boolean bol = (Boolean) entry.getValue(); Boolean bol = (Boolean) entry.getValue();
builder.addField(entry.getKey(), bol); point.addField(entry.getKey(), bol);
} else { } else {
String str = (String) entry.getValue(); String str = (String) entry.getValue();
builder.addField(entry.getKey(), str); point.addField(entry.getKey(), str);
} }
} }
batchPoints.point(point.build());
}
exchange.getIn().setBody(builder.build()); exchange.getIn().setBody(batchPoints.build());
} }

View file

@ -3,6 +3,7 @@ package sysmon.server;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.influxdb.InfluxDbConstants; import org.apache.camel.component.influxdb.InfluxDbConstants;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.model.rest.RestBindingMode; import org.apache.camel.model.rest.RestBindingMode;
import org.apache.camel.spi.Registry; import org.apache.camel.spi.Registry;
import sysmon.shared.MetricResult; import sysmon.shared.MetricResult;
@ -44,8 +45,8 @@ public class ServerRouteBuilder extends RouteBuilder {
fromF("seda:inbound?concurrentConsumers=%s", threads) fromF("seda:inbound?concurrentConsumers=%s", threads)
.log(">>> metric: ${header.hostname} - ${body}") .log(">>> metric: ${header.hostname} - ${body}")
.doTry() .doTry()
.process(new MetricResultToPointProcessor()) .process(new MetricResultToPointProcessor(dbname))
.toF("influxdb://ref.myInfluxConnection?databaseName=%s&retentionPolicy=autogen", dbname) .toF("influxdb://ref.myInfluxConnection?batch=true") //&retentionPolicy=autogen
.doCatch(Exception.class) .doCatch(Exception.class)
.log("Error storing metric to InfluxDB: ${exception}") .log("Error storing metric to InfluxDB: ${exception}")
.end(); .end();

View file

@ -41,3 +41,5 @@ camel.main.lightweight = true
# configure beans # configure beans
#camel.beans.incomingMetricProcessor = #class:IncomingMetricProcessor #camel.beans.incomingMetricProcessor = #class:IncomingMetricProcessor
#camel.beans.hello = #class:Hello #camel.beans.hello = #class:Hello
#camel.dataformat.json-jackson.use-list = true

View file

@ -1,19 +1,20 @@
package sysmon.shared; package sysmon.shared;
import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
public class Measurement { public class Measurement implements Serializable {
private Map<String, String> tags = new HashMap<>(); private HashMap<String, String> tags = new HashMap<>();
private Map<String, Object> fields = new HashMap<>(); private HashMap<String, Object> fields = new HashMap<>();
public Measurement() { public Measurement() {
} }
public Measurement(Map<String, String> tags, Map<String, Object> fields) { public Measurement(HashMap<String, String> tags, HashMap<String, Object> fields) {
this.tags = Objects.requireNonNull(tags); this.tags = Objects.requireNonNull(tags);
this.fields = Objects.requireNonNull(fields); this.fields = Objects.requireNonNull(fields);
} }
@ -26,12 +27,12 @@ public class Measurement {
return fields; return fields;
} }
public void setTags(Map<String, String> tags) { public void setTags(HashMap<String, String> tags) {
Objects.requireNonNull(tags); Objects.requireNonNull(tags);
this.tags = tags; this.tags = tags;
} }
public void setFields(Map<String, Object> fields) { public void setFields(HashMap<String, Object> fields) {
Objects.requireNonNull(fields); Objects.requireNonNull(fields);
this.fields = fields; this.fields = fields;
} }

View file

@ -13,7 +13,7 @@ public class MetricResult implements Serializable {
private String name; private String name;
private String hostname; private String hostname;
private Long timestamp; // epoch milli private Long timestamp; // epoch milli
private List<Measurement> measurements; private ArrayList<Measurement> measurements;
public MetricResult() { public MetricResult() {
} }
@ -31,7 +31,7 @@ public class MetricResult implements Serializable {
}}; }};
} }
public MetricResult(String name, List<Measurement> measurements) { public MetricResult(String name, ArrayList<Measurement> measurements) {
this.name = name; this.name = name;
this.timestamp = Instant.now().toEpochMilli(); this.timestamp = Instant.now().toEpochMilli();
this.measurements = measurements; this.measurements = measurements;
@ -43,7 +43,7 @@ public class MetricResult implements Serializable {
}}; }};
} }
public void setMeasurements(List<Measurement> measurements) { public void setMeasurements(ArrayList<Measurement> measurements) {
this.measurements = measurements; this.measurements = measurements;
} }
@ -71,27 +71,23 @@ public class MetricResult implements Serializable {
return hostname; return hostname;
} }
/*
public Measurement getMeasurement() { public Measurement getMeasurement() {
if(measurements != null && measurements.size() > 0) { if(measurements != null && !measurements.isEmpty()) {
return measurements.get(0); return measurements.get(0);
} }
return null; return null;
} }
*/
public List<Measurement> getMeasurements() { public ArrayList<Measurement> getMeasurements() {
if(measurements != null && measurements.size() > 0) {
return measurements; return measurements;
} }
return null;
}
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(String.format("%s - %s => ", timestamp.toString(), name)); StringBuilder sb = new StringBuilder(String.format("%s - %s => ", timestamp.toString(), name));
if(measurements != null && measurements.size() > 0) { if(measurements != null && !measurements.isEmpty()) {
sb.append("{"); sb.append("{");
for(Measurement m : measurements) { for(Measurement m : measurements) {
@ -109,7 +105,6 @@ public class MetricResult implements Serializable {
sb.append("},"); sb.append("},");
} }
return sb.append(" }").toString(); return sb.append(" }").toString();
} }