Allow for more details in metric measurements.

This commit is contained in:
Mark Nellemann 2021-05-08 18:55:37 +02:00
parent 0dcd02e2b5
commit ce896b479b
17 changed files with 204 additions and 103 deletions

View file

@ -7,15 +7,19 @@ import org.apache.camel.main.Main;
import picocli.CommandLine;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.util.concurrent.Callable;
@CommandLine.Command(name = "sysmon-client", mixinStandardHelpOptions = true)
public class Application implements Callable<Integer> {
@CommandLine.Option(names = { "-s", "--server-url" }, description = "Server URL [default: 'http://127.0.0.1:9925/metrics'].", defaultValue = "http://127.0.0.1:9925/metrics", paramLabel = "<url>")
@CommandLine.Option(names = { "-s", "--server-url" }, description = "Server URL (default: ${DEFAULT-VALUE}).", defaultValue = "http://127.0.0.1:9925/metrics", paramLabel = "<url>")
private URL serverUrl;
@CommandLine.Option(names = { "-n", "--hostname" }, description = "Client hostname.", paramLabel = "<name>")
private String hostname;
public static void main(String... args) {
int exitCode = new CommandLine(new Application()).execute(args);
@ -26,8 +30,13 @@ public class Application implements Callable<Integer> {
@Override
public Integer call() throws IOException {
if(hostname == null || hostname.isEmpty()) {
hostname = InetAddress.getLocalHost().getHostName();
}
Main main = new Main();
main.bind("myServerUrl", serverUrl.toString());
main.bind("myHostname", hostname);
main.configure().addRoutesBuilder(ClientRouteBuilder.class);
// now keep the application running until the JVM is terminated (ctrl + c or sigterm)

View file

@ -33,12 +33,12 @@ public class ClientRouteBuilder extends RouteBuilder {
log.info(">>> Enabling extension: " + ext.getDescription());
// Setup Camel route for this extension
from("timer:collect?period=5000")
from("timer:collect?period=15000")
.bean(ext, "getMetrics")
//.doTry()
.process(new MetricEnrichProcessor())
.process(new MetricEnrichProcessor(registry))
.choice().when(exchangeProperty("skip").isEqualTo(true))
.log("Skipping: ${body}")
.log("Skipping empty: ${body}")
.stop()
.otherwise()
.to("seda:metrics");

View file

@ -2,27 +2,34 @@ package org.sysmon.client;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.Registry;
import org.sysmon.shared.MetricResult;
public class MetricEnrichProcessor implements Processor {
// TODO: Read hostname from future configuration
private final static String hostname = "saruman";
private final Registry registry;
public MetricEnrichProcessor(Registry registry) {
this.registry = registry;
}
public void process(Exchange exchange) throws Exception {
MetricResult result = exchange.getIn().getBody(MetricResult.class);
MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
// We make sure MetricResults with no measurements are not sent further down the line
if(result == null || result.getMeasurements().size() < 1) {
if(metricResult == null || metricResult.getMeasurements().size() < 1) {
exchange.setProperty("skip", true);
return;
}
result.setHostname(hostname);
metricResult.setHostname((String)registry.lookupByName("myHostname"));
exchange.getIn().setHeader("component", result.getName());
exchange.getIn().setBody(result);
exchange.getIn().setHeader("hostname", metricResult.getHostname());
exchange.getIn().setHeader("metric", metricResult.getName());
exchange.getIn().setBody(metricResult);
}
}

View file

@ -3,13 +3,15 @@ package org.sysmon.plugins.sysmon_aix;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.Measurement;
import org.sysmon.shared.MetricExtension;
import org.sysmon.shared.MeasurementPair;
import org.sysmon.shared.MetricResult;
import org.sysmon.shared.PluginHelper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Extension
public class AixProcessorExtension implements MetricExtension {
@ -35,12 +37,23 @@ public class AixProcessorExtension implements MetricExtension {
public MetricResult getMetrics() {
MetricResult result = new MetricResult("processor");
List<Measurement> measurementList = new ArrayList<>();
List<String> mpstat = PluginHelper.executeCommand("mpstat", "-a");
List<AixProcessorStat> processorStats = processCommandOutput(mpstat);
for(AixProcessorStat stat : processorStats) {
result.addMeasurement(new MeasurementPair(String.format("cpu%d", stat.getCpuNum()), stat.getUtilizationPercentage()));
Map<String, String> tagsMap = new HashMap<>();
tagsMap.put("cpu", stat.getName());
// TODO: entitlements as tag or field ?
Map<String, Object> fieldsMap = new HashMap<>();
fieldsMap.put("utilization", stat.getUtilizationPercentage());
measurementList.add(new Measurement(tagsMap, fieldsMap));
}
result.addMeasurements(measurementList);
return result;
}

View file

@ -2,7 +2,7 @@ package org.sysmon.plugins.sysmon_aix;
public class AixProcessorStat {
private final Integer cpuNum;
private final String name;
private final Float userTime;
private final Float systemTime;
private final Float waitTime;
@ -16,7 +16,7 @@ public class AixProcessorStat {
throw new UnsupportedOperationException("AIX mpstat CPU string error: " + procString);
}
this.cpuNum = Integer.parseInt(splitStr[0]);
this.name = "cpu" + splitStr[0];
this.userTime = Float.parseFloat(splitStr[23]);
this.systemTime = Float.parseFloat(splitStr[24]);
this.waitTime = Float.parseFloat(splitStr[25]);
@ -25,8 +25,8 @@ public class AixProcessorStat {
}
public Integer getCpuNum() {
return cpuNum;
public String getName() {
return name;
}
public Float getUserTime() {

View file

@ -1,8 +1,8 @@
package org.sysmon.plugins.sysmon_linux;
import org.pf4j.Extension;
import org.sysmon.shared.Measurement;
import org.sysmon.shared.MetricExtension;
import org.sysmon.shared.MeasurementPair;
import org.sysmon.shared.MetricResult;
import java.io.IOException;
@ -10,6 +10,7 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -76,15 +77,14 @@ public class LinuxDiskExtension implements MetricExtension {
}
private List<MeasurementPair> calculate() {
private List<Measurement> calculate() {
List<MeasurementPair> measurementList = new ArrayList<>();
List<Measurement> measurementList = new ArrayList<>();
if(previousDiskStats == null || previousDiskStats.size() != currentDiskStats.size()) {
return measurementList;
}
for(int i = 0; i < currentDiskStats.size(); i++) {
LinuxDiskStat curStat = currentDiskStats.get(i);
@ -97,11 +97,19 @@ public class LinuxDiskExtension implements MetricExtension {
}
});
if(!ignore.get()) {
long timeSpendDoingIo = curStat.getTimeSpentOnIo() - preStat.getTimeSpentOnIo();
// TODO: Calculate differences for wanted disk io stats
measurementList.add(new MeasurementPair(curStat.getDevice() + "-iotime", timeSpendDoingIo));
HashMap<String, String> tagsMap = new HashMap<>();
tagsMap.put("device", curStat.getDevice());
HashMap<String, Object> fieldsMap = new HashMap<>();
fieldsMap.put("iotime", curStat.getTimeSpentOnIo() - preStat.getTimeSpentOnIo());
fieldsMap.put("readtime", curStat.getTimeSpentReading() - preStat.getTimeSpentReading());
fieldsMap.put("writetime", curStat.getTimeSpentWriting() - preStat.getTimeSpentWriting());
fieldsMap.put("reads", curStat.getSectorsRead() - preStat.getSectorsRead());
fieldsMap.put("writes", curStat.getSectorsWritten() - preStat.getSectorsWritten());
measurementList.add(new Measurement(tagsMap, fieldsMap));
}
}

View file

@ -113,5 +113,20 @@ public class LinuxDiskStat {
return timeSpentOnIo;
}
public Long getSectorsRead() {
return sectorsRead;
}
public Long getSectorsWritten() {
return sectorsWritten;
}
public Long getTimeSpentReading() {
return timeSpentReading;
}
public Long getTimeSpentWriting() {
return timeSpentWriting;
}
}

View file

@ -1,8 +1,8 @@
package org.sysmon.plugins.sysmon_linux;
import org.pf4j.Extension;
import org.sysmon.shared.Measurement;
import org.sysmon.shared.MetricExtension;
import org.sysmon.shared.MeasurementPair;
import org.sysmon.shared.MetricResult;
import java.io.IOException;
@ -10,7 +10,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -49,21 +51,29 @@ public class LinuxMemoryExtension implements MetricExtension {
}
private List<MeasurementPair> readProcFile() throws IOException {
private List<Measurement> readProcFile() throws IOException {
List<MeasurementPair> measurementList = new ArrayList<>();
List<Measurement> measurementList = new ArrayList<>();
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
List<String> allLines = Files.readAllLines(Paths.get("/proc/meminfo"), StandardCharsets.UTF_8);
for (String line : allLines) {
if (line.startsWith("Mem")) {
Matcher matcher = pattern.matcher(line);
if (matcher.find() && matcher.groupCount() == 2) {
measurementList.add(new MeasurementPair(matcher.group(1), matcher.group(2)));
String key = matcher.group(1).substring(3).toLowerCase(); // remove "Mem" and lowercase
Object value = matcher.group(2);
fieldsMap.put(key, value);
}
}
}
measurementList.add(new Measurement(tagsMap, fieldsMap));
return measurementList;
}

View file

@ -1,10 +1,9 @@
package org.sysmon.plugins.sysmon_linux;
import org.pf4j.Extension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.MeasurementPair;
import org.sysmon.shared.Measurement;
import org.sysmon.shared.MetricExtension;
import org.sysmon.shared.MetricResult;
@ -13,7 +12,9 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Extension
public class LinuxProcessorExtension implements MetricExtension {
@ -53,11 +54,20 @@ public class LinuxProcessorExtension implements MetricExtension {
return result;
}
List<Measurement> measurementList = new ArrayList<>();
for(int i = 0; i < currentProcessorProc.size(); i++) {
LinuxProcessorStat stat = new LinuxProcessorStat(currentProcessorProc.get(i), previousProcessorProc.get(i));
result.addMeasurement(stat.getMeasurements());
Map<String, String> tagsMap = new HashMap<>();
tagsMap.put("cpu", stat.getName());
Map<String, Object> fieldsMap = stat.getFields();
measurementList.add(new Measurement(tagsMap, fieldsMap));
}
result.addMeasurements(measurementList);
return result;
}

View file

@ -1,10 +1,7 @@
package org.sysmon.plugins.sysmon_linux;
import org.sysmon.shared.MeasurementPair;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
public class LinuxProcessorStat {
@ -22,8 +19,18 @@ public class LinuxProcessorStat {
}
public MeasurementPair getMeasurements() {
return new MeasurementPair(cpuName, utilizationPercentage);
public String getName() {
return cpuName;
}
public Map<String, Object> getFields() {
HashMap<String, Object> fieldsMap = new HashMap<>();
fieldsMap.put("utilization", utilizationPercentage);
return fieldsMap;
}

View file

@ -36,8 +36,8 @@ class LinuxProcessorTest extends Specification {
LinuxProcessorStat processorStat = new LinuxProcessorStat(processorProcLine1, processorProcLine2)
then:
processorStat.getMeasurements().getName() == "cpu0"
processorStat.getMeasurements().getValue() == 42.13362f
processorStat.getName() == "cpu0"
processorStat.getFields().get("utilization") == 42.13362f
}

View file

@ -12,16 +12,16 @@ import java.util.concurrent.Callable;
@CommandLine.Command(name = "sysmon-server", mixinStandardHelpOptions = true)
public class Application implements Callable<Integer> {
@CommandLine.Option(names = { "-i", "--influxdb-url" }, description = "InfluxDB URL [default: 'http://localhost:8086'].", defaultValue = "http://localhost:8086", paramLabel = "<url>")
@CommandLine.Option(names = { "-i", "--influxdb-url" }, description = "InfluxDB URL (default: ${DEFAULT-VALUE})].", defaultValue = "http://localhost:8086", paramLabel = "<url>")
private URL influxUrl;
@CommandLine.Option(names = { "-u", "--influxdb-user" }, description = "InfluxDB User [default: 'root'].", defaultValue = "root", paramLabel = "<user>")
@CommandLine.Option(names = { "-u", "--influxdb-user" }, description = "InfluxDB Username (default: ${DEFAULT-VALUE})].", defaultValue = "root", paramLabel = "<user>")
private String influxUser;
@CommandLine.Option(names = { "-p", "--influxdb-pass" }, description = "InfluxDB Password [default: ''].", defaultValue = "", paramLabel = "<pass>")
@CommandLine.Option(names = { "-p", "--influxdb-pass" }, description = "InfluxDB Password (default: ${DEFAULT-VALUE}).", defaultValue = "", paramLabel = "<pass>")
private String influxPass;
@CommandLine.Option(names = { "-l", "--listen-port" }, description = "Listening port [default: '9925'].", defaultValue = "9925", paramLabel = "<port>")
@CommandLine.Option(names = { "-s", "--server-port" }, description = "Server port (default: ${DEFAULT-VALUE}).", defaultValue = "9925", paramLabel = "<port>")
private String listenPort;

View file

@ -3,14 +3,19 @@ package org.sysmon.server;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.influxdb.dto.Point;
import org.sysmon.shared.MeasurementPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.Measurement;
import org.sysmon.shared.MetricResult;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class MetricResultToPointProcessor implements Processor {
private static final Logger log = LoggerFactory.getLogger(MetricResultToPointProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
@ -20,18 +25,30 @@ public class MetricResultToPointProcessor implements Processor {
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS)
.tag("hostname", metricResult.getHostname());
List<MeasurementPair> measurements = metricResult.getMeasurements();
for(MeasurementPair measurement : measurements) {
if(measurement.getValue() instanceof Number) {
Number num = (Number) measurement.getValue();
builder.addField(measurement.getName(), num);
} else if(measurement.getValue() instanceof Boolean) {
Boolean bol = (Boolean) measurement.getValue();
builder.addField(measurement.getName(), bol);
} else {
String str = (String) measurement.getValue();
builder.addField(measurement.getName(), str);
List<Measurement> measurements = metricResult.getMeasurements();
for(Measurement measurement : measurements) {
for (Map.Entry<String,String> entry : measurement.getTags().entrySet()) {
log.debug("process() - tag: " + entry.getKey() + "=" + entry.getValue());
builder.tag(entry.getKey(), entry.getValue());
}
for (Map.Entry<String,Object> entry : measurement.getFields().entrySet()) {
log.debug("process() - field: " + entry.getKey() + "=" + entry.getValue());
if(entry.getValue() instanceof Number) {
Number num = (Number) entry.getValue();
builder.addField(entry.getKey(), num);
} else if(entry.getValue() instanceof Boolean) {
Boolean bol = (Boolean) entry.getValue();
builder.addField(entry.getKey(), bol);
} else {
String str = (String) entry.getValue();
builder.addField(entry.getKey(), str);
}
}
}
exchange.getIn().setBody(builder.build());

View file

@ -36,7 +36,7 @@ public class ServerRouteBuilder extends RouteBuilder {
//from("seda:inbound").log("Got metric from: ${header.component}").to("mock:sink");
from("seda:inbound")
.log("Got metric from: ${header.component}")
.log(">>> metric: ${header.hostname} - ${header.metric}")
.doTry()
.process(new MetricResultToPointProcessor())
.log("${body}")

View file

@ -0,0 +1,36 @@
package org.sysmon.shared;
import java.util.HashMap;
import java.util.Map;
public class Measurement {
private Map<String, String> tags = new HashMap<>();
private Map<String, Object> fields = new HashMap<>();
public Measurement() {
}
public Measurement(Map<String, String> tags, Map<String, Object> fields) {
this.tags = tags;
this.fields = fields;
}
public Map<String, String> getTags() {
return tags;
}
public Map<String, Object> getFields() {
return fields;
}
public void setTags(Map<String, String> tags) {
this.tags = tags;
}
public void setFields(Map<String, Object> fields) {
this.fields = fields;
}
}

View file

@ -1,39 +0,0 @@
package org.sysmon.shared;
import java.io.Serializable;
public class MeasurementPair implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private Object value;
public MeasurementPair() { }
public MeasurementPair(String name, Object value) {
this.name = name;
this.value = value;
}
public String toString() {
return String.format("%s: %s", name, value);
}
public String getName() {
return name;
}
public Object getValue() {
return value;
}
public void setName(String name) {
this.name = name;
}
public void setValue(Object value) {
this.value = value;
}
}

View file

@ -4,6 +4,7 @@ import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class MetricResult implements Serializable {
@ -12,7 +13,7 @@ public class MetricResult implements Serializable {
private String name;
private Long timestamp; // epoch milli
private String hostname;
private List<MeasurementPair> measurements = new ArrayList<>();
private List<Measurement> measurements = new ArrayList<>();
public MetricResult() {
}
@ -22,11 +23,11 @@ public class MetricResult implements Serializable {
this.timestamp = Instant.now().toEpochMilli();
}
public void addMeasurements(List<MeasurementPair> measurementList) {
public void addMeasurements(List<Measurement> measurementList) {
this.measurements = measurementList;
}
public void addMeasurement(MeasurementPair measurement) {
public void addMeasurement(Measurement measurement) {
measurements.add(measurement);
}
@ -54,14 +55,21 @@ public class MetricResult implements Serializable {
return hostname;
}
public List<MeasurementPair> getMeasurements() {
public List<Measurement> getMeasurements() {
return measurements;
}
public String toString() {
StringBuilder sb = new StringBuilder(String.format("%s - %s\n", timestamp.toString(), name));
for(MeasurementPair mm : measurements) {
sb.append(mm.toString()).append("\n");
for(Measurement m : measurements) {
for (Map.Entry<String,String> entry : m.getTags().entrySet())
sb.append(entry.getKey() + " : " + entry.getValue());
for (Map.Entry<String,Object> entry : m.getFields().entrySet())
sb.append(entry.getKey() + " : " + entry.getValue());
sb.append(m.toString()).append("\n");
}
return sb.toString();