From 612a42547a0241a80c5e0cd46ade725445a8a1ae Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Thu, 2 Sep 2021 21:40:22 +0200 Subject: [PATCH] Refactored to support multiple measurements in a result. --- .../sysmon/client/ClientRouteBuilder.java | 2 +- .../sysmon/client/MetricEnrichProcessor.java | 2 +- .../src/main/resources/application.properties | 2 + gradle.properties | 2 +- .../plugins/os_aix/AixNetstatExtension.java | 11 +- .../plugins/os_aix/AixNetstatParser.java | 6 +- .../plugins/os_aix/AixProcessorExtension.java | 11 +- .../plugins/os_aix/AixProcessorStat.java | 6 +- .../plugins/os_base/BaseDiskExtension.java | 8 +- .../plugins/os_base/BaseMemoryExtension.java | 8 +- .../plugins/os_base/BaseNetworkExtension.java | 8 +- .../sysmon/plugins/os_base/BasePlugin.java | 15 +++ .../plugins/os_base/BaseProcessExtension.java | 101 ++++++++++++++++++ .../os_base/BaseProcessorExtension.java | 8 +- .../os_linux/LinuxNetstatExtension.java | 11 +- .../plugins/os_linux/LinuxNetstatParser.java | 6 +- .../os_linux/LinuxNetworkSockStat.java | 6 +- .../os_linux/LinuxSockstatExtension.java | 11 +- server/README.md | 12 +++ .../server/MetricResultToPointProcessor.java | 57 ++++++---- .../sysmon/server/ServerRouteBuilder.java | 5 +- .../src/main/resources/application.properties | 2 + .../main/java/sysmon/shared/Measurement.java | 13 +-- .../main/java/sysmon/shared/MetricResult.java | 23 ++-- 24 files changed, 243 insertions(+), 93 deletions(-) create mode 100644 plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java diff --git a/client/src/main/java/sysmon/client/ClientRouteBuilder.java b/client/src/main/java/sysmon/client/ClientRouteBuilder.java index 51683aa..d5f15dd 100644 --- a/client/src/main/java/sysmon/client/ClientRouteBuilder.java +++ b/client/src/main/java/sysmon/client/ClientRouteBuilder.java @@ -54,12 +54,12 @@ public class ClientRouteBuilder extends RouteBuilder { from("timer:extensions?fixedRate=true&period=30s") .bean(ext, "getMetrics") //.doTry() + .outputType(MetricResult.class) .process(new MetricEnrichProcessor(registry)) .choice().when(exchangeProperty("skip").isEqualTo(true)) .log("Skipping empty measurement.") .stop() .otherwise() - .log(">>> ${body}") .to("seda:metrics?discardWhenFull=true"); } else { log.info(">>> Skipping extension (not supported here): " + ext.getDescription()); diff --git a/client/src/main/java/sysmon/client/MetricEnrichProcessor.java b/client/src/main/java/sysmon/client/MetricEnrichProcessor.java index 0ce6004..1a46d46 100644 --- a/client/src/main/java/sysmon/client/MetricEnrichProcessor.java +++ b/client/src/main/java/sysmon/client/MetricEnrichProcessor.java @@ -20,7 +20,7 @@ public class MetricEnrichProcessor implements Processor { MetricResult metricResult = exchange.getIn().getBody(MetricResult.class); // We make sure MetricResults with no measurements are not sent further down the line - if(metricResult == null || metricResult.getMeasurement() == null) { + if(metricResult == null || metricResult.getMeasurements() == null) { exchange.setProperty("skip", true); return; } diff --git a/client/src/main/resources/application.properties b/client/src/main/resources/application.properties index 3789f73..ef85751 100644 --- a/client/src/main/resources/application.properties +++ b/client/src/main/resources/application.properties @@ -40,3 +40,5 @@ camel.main.lightweight = true # configure beans #camel.beans.metricProcessor = #class:org.sysmon.client.MetricProcessor + +#camel.dataformat.json-jackson.use-list = true \ No newline at end of file diff --git a/gradle.properties b/gradle.properties index da8f1bf..73e8f63 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=0.0.6 +version=0.0.7 pf4jVersion=3.6.0 slf4jVersion=1.7.32 camelVersion=3.11.0 diff --git a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatExtension.java b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatExtension.java index 007410a..ae93a31 100644 --- a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatExtension.java +++ b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatExtension.java @@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.Map; // Disabled @@ -36,12 +37,12 @@ public class AixNetstatExtension implements MetricExtension { @Override public String getName() { - return "aix-network-netstat"; + return "aix_network_netstat"; } @Override public String getProvides() { - return "network-netstat"; + return "network_netstat"; } @Override @@ -52,8 +53,8 @@ public class AixNetstatExtension implements MetricExtension { @Override public MetricResult getMetrics() throws Exception { - Map tagsMap = null; - Map fieldsMap = null; + HashMap tagsMap = null; + HashMap fieldsMap = null; try (InputStream buf = PluginHelper.executeCommand("netstat -s -f inet")) { AixNetstatParser parser = processCommandOutput(buf); @@ -62,7 +63,7 @@ public class AixNetstatExtension implements MetricExtension { } log.debug(fieldsMap.toString()); - return new MetricResult("network_netstat", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } diff --git a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatParser.java b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatParser.java index fad950f..21c5ef3 100644 --- a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatParser.java +++ b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixNetstatParser.java @@ -130,12 +130,12 @@ public class AixNetstatParser { } - public Map getTags() { + public HashMap getTags() { return new HashMap<>(); } - public Map getFields() { - Map fields = new HashMap<>(); + public HashMap getFields() { + HashMap fields = new HashMap<>(); fields.put("ip_forwarded", ipForwarded); fields.put("ip_received", ipTotalPacketsReceived); diff --git a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorExtension.java b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorExtension.java index a9f12ca..5ee57ed 100644 --- a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorExtension.java +++ b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorExtension.java @@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -37,12 +38,12 @@ public class AixProcessorExtension implements MetricExtension { @Override public String getName() { - return "aix-processor"; + return "aix_processor"; } @Override public String getProvides() { - return "processor-lpar"; + return "processor_lpar"; } @Override @@ -53,8 +54,8 @@ public class AixProcessorExtension implements MetricExtension { @Override public MetricResult getMetrics() throws Exception { - Map tagsMap = null; - Map fieldsMap = null; + HashMap tagsMap = null; + HashMap fieldsMap = null; try (InputStream buf = PluginHelper.executeCommand("lparstat 5 1")) { AixProcessorStat processorStat = processCommandOutput(buf); @@ -62,7 +63,7 @@ public class AixProcessorExtension implements MetricExtension { fieldsMap = processorStat.getFields(); } - return new MetricResult("processor_lpar", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } diff --git a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorStat.java b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorStat.java index c1f62b3..c42d801 100644 --- a/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorStat.java +++ b/plugins/os-aix/src/main/java/sysmon/plugins/os_aix/AixProcessorStat.java @@ -140,12 +140,12 @@ public class AixProcessorStat { return 100 - idle; } - public Map getTags() { + public HashMap getTags() { return new HashMap<>(); } - public Map getFields() { - Map fields = new HashMap<>(); + public HashMap getFields() { + HashMap fields = new HashMap<>(); fields.put("lcpu", lcpu); fields.put("ent", ent); fields.put("user", user); diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseDiskExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseDiskExtension.java index 3f5fa2d..9443593 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseDiskExtension.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseDiskExtension.java @@ -30,7 +30,7 @@ public class BaseDiskExtension implements MetricExtension { @Override public String getName() { - return "base-disk"; + return "base_disk"; } @Override @@ -52,8 +52,8 @@ public class BaseDiskExtension implements MetricExtension { long transferTime = 0L; long queueLength = 0L; - Map tagsMap = new HashMap<>(); - Map fieldsMap = new HashMap<>(); + HashMap tagsMap = new HashMap<>(); + HashMap fieldsMap = new HashMap<>(); List diskStores = hardwareAbstractionLayer.getDiskStores(); for(HWDiskStore store : diskStores) { @@ -73,7 +73,7 @@ public class BaseDiskExtension implements MetricExtension { fieldsMap.put("queue", queueLength); log.debug(fieldsMap.toString()); - return new MetricResult("disk", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } } \ No newline at end of file diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseMemoryExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseMemoryExtension.java index 024c46a..5058faa 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseMemoryExtension.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseMemoryExtension.java @@ -27,7 +27,7 @@ public class BaseMemoryExtension implements MetricExtension { @Override public String getName() { - return "base-memory"; + return "base_memory"; } @Override @@ -43,8 +43,8 @@ public class BaseMemoryExtension implements MetricExtension { @Override public MetricResult getMetrics() { - Map tagsMap = new HashMap<>(); - Map fieldsMap = new HashMap<>(); + HashMap tagsMap = new HashMap<>(); + HashMap fieldsMap = new HashMap<>(); long total = hardwareAbstractionLayer.getMemory().getTotal(); long available = hardwareAbstractionLayer.getMemory().getAvailable(); @@ -57,7 +57,7 @@ public class BaseMemoryExtension implements MetricExtension { fieldsMap.put("virtual", hardwareAbstractionLayer.getMemory().getVirtualMemory().getVirtualInUse()); log.debug(fieldsMap.toString()); - return new MetricResult("memory", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseNetworkExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseNetworkExtension.java index a16efba..1c961f5 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseNetworkExtension.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseNetworkExtension.java @@ -29,7 +29,7 @@ public class BaseNetworkExtension implements MetricExtension { @Override public String getName() { - return "base-network"; + return "base_network"; } @Override @@ -52,8 +52,8 @@ public class BaseNetworkExtension implements MetricExtension { long txPackets = 0L; long txErrs = 0L; - Map tagsMap = new HashMap<>(); - Map fieldsMap = new HashMap<>(); + HashMap tagsMap = new HashMap<>(); + HashMap fieldsMap = new HashMap<>(); List interfaces = hardwareAbstractionLayer.getNetworkIFs(); for(NetworkIF netif : interfaces) { @@ -75,7 +75,7 @@ public class BaseNetworkExtension implements MetricExtension { fieldsMap.put("txErrors", txErrs); log.debug(fieldsMap.toString()); - return new MetricResult("network", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } } diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BasePlugin.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BasePlugin.java index 87f193e..4be658b 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BasePlugin.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BasePlugin.java @@ -36,4 +36,19 @@ public class BasePlugin extends Plugin { return hardwareAbstractionLayer; } + + public static SystemInfo getSystemInfo() { + + try { + if(systemInfo == null) { + systemInfo = new SystemInfo(); + } + + } catch (UnsupportedOperationException e) { + log.warn(e.getMessage()); + } + + return systemInfo; + } + } diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java new file mode 100644 index 0000000..94b05ea --- /dev/null +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessExtension.java @@ -0,0 +1,101 @@ +package sysmon.plugins.os_base; + +import org.pf4j.Extension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import oshi.SystemInfo; +import oshi.software.os.OSProcess; +import sysmon.shared.Measurement; +import sysmon.shared.MetricExtension; +import sysmon.shared.MetricResult; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +@Extension +public class BaseProcessExtension implements MetricExtension { + + private static final Logger log = LoggerFactory.getLogger(BaseProcessorExtension.class); + + private final List includeList = new ArrayList() {{ + add("java"); + add("influxd"); + add("grafana-server"); + }}; + + private SystemInfo systemInfo; + + @Override + public boolean isSupported() { + systemInfo = BasePlugin.getSystemInfo(); + return systemInfo != null; + } + + @Override + public String getName() { + return "base_process"; + } + + @Override + public String getProvides() { + return "process"; + } + + @Override + public String getDescription() { + return "Base Process Metrics"; + } + + + @Override + public MetricResult getMetrics() { + + // TODO: include-list and/or exclude-list of process names + + ArrayList measurementList = new ArrayList<>(); + + List processList = systemInfo.getOperatingSystem().getProcesses(); + for(OSProcess p : processList) { + + // Skip all the kernel processes + if(p.getResidentSetSize() < 1) { + continue; + } + + String name = p.getName(); + if(!includeList.contains(name)) { + continue; + } + log.info("pid: " + p.getProcessID() + ", name: " + name + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize()); + + //log.info(p.getProcessID() + " (" + p.getParentProcessID() + ") " + p.getName() + " " + p.getPath()); + + HashMap tagsMap = new HashMap<>(); + HashMap fieldsMap = new HashMap<>(); + + tagsMap.put("pid", String.valueOf(p.getProcessID())); + tagsMap.put("name", name); + + fieldsMap.put("path", p.getPath()); + fieldsMap.put("mem_rss", p.getResidentSetSize()); + fieldsMap.put("mem_virt", p.getVirtualSize()); + fieldsMap.put("kernel_time", p.getKernelTime()); + fieldsMap.put("user_time", p.getUserTime()); + fieldsMap.put("read_bytes", p.getBytesRead()); + fieldsMap.put("write_bytes", p.getBytesWritten()); + fieldsMap.put("files", p.getOpenFiles()); + fieldsMap.put("threads", p.getThreadCount()); + fieldsMap.put("user", p.getUser()); + fieldsMap.put("group", p.getGroup()); + + measurementList.add(new Measurement(tagsMap, fieldsMap)); + } + + //log.info("Size of measurements: " + measurementList.size()); + return new MetricResult(getName(), measurementList); + } + +} + + diff --git a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessorExtension.java b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessorExtension.java index c444fc6..76cf809 100644 --- a/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessorExtension.java +++ b/plugins/os-base/src/main/java/sysmon/plugins/os_base/BaseProcessorExtension.java @@ -35,7 +35,7 @@ public class BaseProcessorExtension implements MetricExtension { @Override public String getName() { - return "base-processor"; + return "base_processor"; } @Override @@ -52,8 +52,8 @@ public class BaseProcessorExtension implements MetricExtension { @Override public MetricResult getMetrics() { - Map tagsMap = new HashMap<>(); - Map fieldsMap = new HashMap<>(); + HashMap tagsMap = new HashMap<>(); + HashMap fieldsMap = new HashMap<>(); long[] ticks = hardwareAbstractionLayer.getProcessor().getSystemCpuLoadTicks(); if(oldTicks == null || oldTicks.length != ticks.length) { @@ -86,7 +86,7 @@ public class BaseProcessorExtension implements MetricExtension { oldTicks = ticks; log.debug(fieldsMap.toString()); - return new MetricResult("processor", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } } diff --git a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatExtension.java b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatExtension.java index 0a1d80b..1b561e1 100644 --- a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatExtension.java +++ b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatExtension.java @@ -10,6 +10,7 @@ import sysmon.shared.PluginHelper; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; import java.util.Map; // Disabled @@ -36,12 +37,12 @@ public class LinuxNetstatExtension implements MetricExtension { @Override public String getName() { - return "linux-network-netstat"; + return "linux_network_netstat"; } @Override public String getProvides() { - return "network-netstat"; + return "network_netstat"; } @Override @@ -52,8 +53,8 @@ public class LinuxNetstatExtension implements MetricExtension { @Override public MetricResult getMetrics() throws Exception { - Map tagsMap = null; - Map fieldsMap = null; + HashMap tagsMap = null; + HashMap fieldsMap = null; try (InputStream inputStream = PluginHelper.executeCommand("netstat -s")) { LinuxNetstatParser parser = processCommandOutput(inputStream); @@ -62,7 +63,7 @@ public class LinuxNetstatExtension implements MetricExtension { } log.debug(fieldsMap.toString()); - return new MetricResult("network_netstat", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } diff --git a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatParser.java b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatParser.java index a46883a..876c105 100644 --- a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatParser.java +++ b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetstatParser.java @@ -140,12 +140,12 @@ public class LinuxNetstatParser { } - public Map getTags() { + public HashMap getTags() { return new HashMap<>(); } - public Map getFields() { - Map fields = new HashMap<>(); + public HashMap getFields() { + HashMap fields = new HashMap<>(); fields.put("ip_forwarded", ipForwarded); fields.put("ip_received", ipTotalPacketsReceived); fields.put("ip_dropped", ipOutgoingPacketsDropped); diff --git a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetworkSockStat.java b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetworkSockStat.java index 91148fe..ae1f77e 100644 --- a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetworkSockStat.java +++ b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxNetworkSockStat.java @@ -76,13 +76,13 @@ public class LinuxNetworkSockStat { } - public Map getTags() { + public HashMap getTags() { return new HashMap<>(); } - public Map getFields() { - Map fields = new HashMap<>(); + public HashMap getFields() { + HashMap fields = new HashMap<>(); fields.put("sockets", sockets); fields.put("tcp_inuse", tcp_inuse); fields.put("tcp_alloc", tcp_alloc); diff --git a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxSockstatExtension.java b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxSockstatExtension.java index 772ff83..46e6dc9 100644 --- a/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxSockstatExtension.java +++ b/plugins/os-linux/src/main/java/sysmon/plugins/os_linux/LinuxSockstatExtension.java @@ -8,6 +8,7 @@ import sysmon.shared.MetricExtension; import sysmon.shared.MetricResult; import sysmon.shared.PluginHelper; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,12 +30,12 @@ public class LinuxSockstatExtension implements MetricExtension { @Override public String getName() { - return "linux-network-sockets"; + return "linux_network_sockets"; } @Override public String getProvides() { - return "network-sockets"; + return "network_sockets"; } @Override @@ -47,11 +48,11 @@ public class LinuxSockstatExtension implements MetricExtension { LinuxNetworkSockStat sockStat = processSockOutput(PluginHelper.readFile("/proc/net/sockstat")); - Map tagsMap = sockStat.getTags(); - Map fieldsMap = sockStat.getFields(); + HashMap tagsMap = sockStat.getTags(); + HashMap fieldsMap = sockStat.getFields(); log.debug(fieldsMap.toString()); - return new MetricResult("network_sockets", new Measurement(tagsMap, fieldsMap)); + return new MetricResult(getName(), new Measurement(tagsMap, fieldsMap)); } diff --git a/server/README.md b/server/README.md index b9f5a98..4546314 100644 --- a/server/README.md +++ b/server/README.md @@ -2,6 +2,18 @@ Server component. +## Installation + +TODO. + +### Influx Database + +Create a database for the metrics: + +```text +CREATE DATABASE "sysmon" WITH DURATION 90d REPLICATION 1; +``` + ## Development diff --git a/server/src/main/java/sysmon/server/MetricResultToPointProcessor.java b/server/src/main/java/sysmon/server/MetricResultToPointProcessor.java index 26eff05..7afafd0 100644 --- a/server/src/main/java/sysmon/server/MetricResultToPointProcessor.java +++ b/server/src/main/java/sysmon/server/MetricResultToPointProcessor.java @@ -2,49 +2,66 @@ package sysmon.server; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import sysmon.shared.Measurement; import sysmon.shared.MetricResult; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class MetricResultToPointProcessor implements Processor { private static final Logger log = LoggerFactory.getLogger(MetricResultToPointProcessor.class); + private static String influxDbName; + + MetricResultToPointProcessor(String influxDbName) { + MetricResultToPointProcessor.influxDbName = influxDbName; + } @Override public void process(Exchange exchange) throws Exception { MetricResult metricResult = exchange.getIn().getBody(MetricResult.class); - Measurement measurement = metricResult.getMeasurement(); + List measurementList = metricResult.getMeasurements(); - Point.Builder builder = Point.measurement(metricResult.getName()) - .time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS) - .tag("hostname", metricResult.getHostname()); + //log.info("Size of measurements: " + measurementList.size()); - for (Map.Entry entry : measurement.getTags().entrySet()) { - log.debug("process() - tag: " + entry.getKey() + "=" + entry.getValue()); - builder.tag(entry.getKey(), entry.getValue()); - } + BatchPoints.Builder batchPoints = BatchPoints + .database(MetricResultToPointProcessor.influxDbName) + .precision(TimeUnit.MILLISECONDS) + .tag("hostname", metricResult.getHostname()); - for (Map.Entry entry : measurement.getFields().entrySet()) { - log.debug("process() - field: " + entry.getKey() + "=" + entry.getValue()); - if(entry.getValue() instanceof Number) { - Number num = (Number) entry.getValue(); - builder.addField(entry.getKey(), num); - } else if(entry.getValue() instanceof Boolean) { - Boolean bol = (Boolean) entry.getValue(); - builder.addField(entry.getKey(), bol); - } else { - String str = (String) entry.getValue(); - builder.addField(entry.getKey(), str); + for(Measurement measurement : measurementList) { + + Point.Builder point = Point.measurement(metricResult.getName()) + .time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS); + + for (Map.Entry entry : measurement.getTags().entrySet()) { + //log.info("process() - tag: " + entry.getKey() + "=" + entry.getValue()); + point.tag(entry.getKey(), entry.getValue()); } + + for (Map.Entry entry : measurement.getFields().entrySet()) { + //log.info("process() - field: " + entry.getKey() + "=" + entry.getValue()); + if(entry.getValue() instanceof Number) { + Number num = (Number) entry.getValue(); + point.addField(entry.getKey(), num); + } else if(entry.getValue() instanceof Boolean) { + Boolean bol = (Boolean) entry.getValue(); + point.addField(entry.getKey(), bol); + } else { + String str = (String) entry.getValue(); + point.addField(entry.getKey(), str); + } + } + batchPoints.point(point.build()); } - exchange.getIn().setBody(builder.build()); + exchange.getIn().setBody(batchPoints.build()); } diff --git a/server/src/main/java/sysmon/server/ServerRouteBuilder.java b/server/src/main/java/sysmon/server/ServerRouteBuilder.java index 0cc0f39..a51e849 100644 --- a/server/src/main/java/sysmon/server/ServerRouteBuilder.java +++ b/server/src/main/java/sysmon/server/ServerRouteBuilder.java @@ -3,6 +3,7 @@ package sysmon.server; import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.influxdb.InfluxDbConstants; +import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.model.rest.RestBindingMode; import org.apache.camel.spi.Registry; import sysmon.shared.MetricResult; @@ -44,8 +45,8 @@ public class ServerRouteBuilder extends RouteBuilder { fromF("seda:inbound?concurrentConsumers=%s", threads) .log(">>> metric: ${header.hostname} - ${body}") .doTry() - .process(new MetricResultToPointProcessor()) - .toF("influxdb://ref.myInfluxConnection?databaseName=%s&retentionPolicy=autogen", dbname) + .process(new MetricResultToPointProcessor(dbname)) + .toF("influxdb://ref.myInfluxConnection?batch=true") //&retentionPolicy=autogen .doCatch(Exception.class) .log("Error storing metric to InfluxDB: ${exception}") .end(); diff --git a/server/src/main/resources/application.properties b/server/src/main/resources/application.properties index c6e2b8f..f9f82ea 100644 --- a/server/src/main/resources/application.properties +++ b/server/src/main/resources/application.properties @@ -41,3 +41,5 @@ camel.main.lightweight = true # configure beans #camel.beans.incomingMetricProcessor = #class:IncomingMetricProcessor #camel.beans.hello = #class:Hello + +#camel.dataformat.json-jackson.use-list = true \ No newline at end of file diff --git a/shared/src/main/java/sysmon/shared/Measurement.java b/shared/src/main/java/sysmon/shared/Measurement.java index 35ce34d..07c9290 100644 --- a/shared/src/main/java/sysmon/shared/Measurement.java +++ b/shared/src/main/java/sysmon/shared/Measurement.java @@ -1,19 +1,20 @@ package sysmon.shared; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public class Measurement { +public class Measurement implements Serializable { - private Map tags = new HashMap<>(); - private Map fields = new HashMap<>(); + private HashMap tags = new HashMap<>(); + private HashMap fields = new HashMap<>(); public Measurement() { } - public Measurement(Map tags, Map fields) { + public Measurement(HashMap tags, HashMap fields) { this.tags = Objects.requireNonNull(tags); this.fields = Objects.requireNonNull(fields); } @@ -26,12 +27,12 @@ public class Measurement { return fields; } - public void setTags(Map tags) { + public void setTags(HashMap tags) { Objects.requireNonNull(tags); this.tags = tags; } - public void setFields(Map fields) { + public void setFields(HashMap fields) { Objects.requireNonNull(fields); this.fields = fields; } diff --git a/shared/src/main/java/sysmon/shared/MetricResult.java b/shared/src/main/java/sysmon/shared/MetricResult.java index d031743..a4c3b35 100644 --- a/shared/src/main/java/sysmon/shared/MetricResult.java +++ b/shared/src/main/java/sysmon/shared/MetricResult.java @@ -13,7 +13,7 @@ public class MetricResult implements Serializable { private String name; private String hostname; private Long timestamp; // epoch milli - private List measurements; + private ArrayList measurements; public MetricResult() { } @@ -31,7 +31,7 @@ public class MetricResult implements Serializable { }}; } - public MetricResult(String name, List measurements) { + public MetricResult(String name, ArrayList measurements) { this.name = name; this.timestamp = Instant.now().toEpochMilli(); this.measurements = measurements; @@ -43,7 +43,7 @@ public class MetricResult implements Serializable { }}; } - public void setMeasurements(List measurements) { + public void setMeasurements(ArrayList measurements) { this.measurements = measurements; } @@ -71,27 +71,23 @@ public class MetricResult implements Serializable { return hostname; } + /* public Measurement getMeasurement() { - if(measurements != null && measurements.size() > 0) { + if(measurements != null && !measurements.isEmpty()) { return measurements.get(0); } return null; } + */ - public List getMeasurements() { - if(measurements != null && measurements.size() > 0) { - return measurements; - } - return null; - + public ArrayList getMeasurements() { + return measurements; } - - public String toString() { StringBuilder sb = new StringBuilder(String.format("%s - %s => ", timestamp.toString(), name)); - if(measurements != null && measurements.size() > 0) { + if(measurements != null && !measurements.isEmpty()) { sb.append("{"); for(Measurement m : measurements) { @@ -109,7 +105,6 @@ public class MetricResult implements Serializable { sb.append("},"); } - return sb.append(" }").toString(); }