Some cleanup. Agent still shows http status 500 even though it's sending data and it's recieved.

This commit is contained in:
Mark Nellemann 2021-05-06 13:26:53 +02:00
parent afdc293b43
commit d77e110387
10 changed files with 128 additions and 132 deletions

View file

@ -7,9 +7,43 @@
Runs on your host and collects metrics. Metrics are aggregated and sent to central *collector*. Runs on your host and collects metrics. Metrics are aggregated and sent to central *collector*.
Metrics are currently processor, memory and disk usage statistics.
## Collector
## TODO: Collector
Receives aggregated measurements from agents and saves metrics info InfluxDB. Receives aggregated measurements from agents and saves metrics info InfluxDB.
### Build & Test
Use the gradle build tool, which will download all required dependencies:
```shell
./gradlew clean build
```
### Local Testing
#### InfluxDB container
Start the InfluxDB container:
```shell
docker run --name=influxdb --rm -d -p 8086:8086 influxdb:1.8-alpine
```
To execute the Influx client from within the container:
```shell
docker exec -it influxdb influx
```
#### Grafana container
Start the Grafana container, linking it to the InfluxDB container:
```shell
docker run --name grafana --link influxdb:influxdb --rm -d -p 3000:3000 grafana/grafana:7.1.3
```
Setup Grafana to connect to the InfluxDB container by defining a new datasource on URL *http://influxdb:8086* named *sysmon*.

View file

@ -1,25 +0,0 @@
package org.sysmon.agent;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import java.util.ArrayList;
//simply combines Exchange body values into an ArrayList<Object>
class ArrayListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
} else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}

View file

@ -2,40 +2,23 @@ package org.sysmon.agent;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.MetricResult; import org.sysmon.shared.MetricResult;
import org.sysmon.shared.dto.MetricMessageDTO;
import java.util.concurrent.atomic.AtomicLong;
public class MetricProcessor implements Processor { public class MetricProcessor implements Processor {
private final static Logger log = LoggerFactory.getLogger(MetricProcessor.class);
private static final AtomicLong counter = new AtomicLong();
/*
public void process(Exchange exchange) throws Exception {
MetricResult reading = exchange.getIn().getBody(MetricResult.class);
log.debug(reading.toString());
// do something with the payload and/or exchange here
//exchange.getIn().setBody("Changed body");
// do something...
MetricMessageDTO payload = new MetricMessageDTO("event " + reading, counter.getAndIncrement());
exchange.getIn().setBody(payload, MetricMessageDTO.class);
}*/
public void process(Exchange exchange) throws Exception { public void process(Exchange exchange) throws Exception {
MetricResult result = exchange.getIn().getBody(MetricResult.class); MetricResult result = exchange.getIn().getBody(MetricResult.class);
result.setHostname("sauron"); if(result.getMeasurementList().size() < 1) {
exchange.getIn().setBody(result); exchange.setProperty("skip", true);
}
exchange.getIn().setHeader("component", result.getName()); exchange.getIn().setHeader("component", result.getName());
// TODO: Read hostname from configuration
result.setHostname("sauron");
exchange.getIn().setBody(result);
} }
} }

View file

@ -2,7 +2,6 @@ package org.sysmon.agent;
import org.apache.camel.Exchange; import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.model.dataformat.JsonLibrary; import org.apache.camel.model.dataformat.JsonLibrary;
import org.pf4j.JarPluginManager; import org.pf4j.JarPluginManager;
import org.pf4j.PluginManager; import org.pf4j.PluginManager;
@ -32,64 +31,31 @@ public class MyRouteBuilder extends RouteBuilder {
log.info(">>> Enabling extension: " + ext.getDescription()); log.info(">>> Enabling extension: " + ext.getDescription());
// Setup Camel route for this extension // Setup Camel route for this extension
from("timer:collect?period=5000") from("timer:collect?period=10000")
.bean(ext, "getMetrics") .bean(ext, "getMetrics")
.setHeader("ext", constant(ext.getName())) //.setHeader("ext", constant(ext.getName()))
.doTry()
.process(new MetricProcessor())
.choice()
.when(exchangeProperty("skip").isEqualTo(true))
.stop()
.otherwise()
.to("seda:metrics"); .to("seda:metrics");
} }
} }
/*
TODO: How to combine/wrap the individual metrics into a container which also contains
some extra information, such as our hostname, operating system, timestamp, etc.
Like one JSON or XML output with all metrics:
{
"hostname": "sauron",
"timestamp": "1322334343434",
"metrics": [
{ "processor": [
{ "cpu0":"10" },
{ "cpu1":"12" }
]},
{ "memory": [
{ "memUsed": "323434"},
{ "memFree": "4454545"}
]}
]
}
*/
/*
from("seda:metrics")
.aggregate(header("ext"), new ArrayListAggregationStrategy())
.completionInterval(5000)
.to("seda:aggregated");
from("seda:aggregated")
.log("${body}");
*/
// Send to collector when combined
from("seda:metrics") from("seda:metrics")
.setHeader(Exchange.HTTP_METHOD, constant("POST")) .setHeader(Exchange.HTTP_METHOD, constant("POST"))
.doTry() .doTry()
.process(new MetricProcessor()) //.process(new MetricProcessor())
.marshal().json(JsonLibrary.Jackson, MetricResult.class) .marshal().json(JsonLibrary.Jackson, MetricResult.class)
.to("http://127.0.0.1:9925/metrics") .to("http://127.0.0.1:9925/metrics")
.doCatch(Exception.class) .doCatch(Exception.class)
.log("Error sending metric to collector: ${exception}") .log("Error sending metric to collector: ${exception}")
.end(); .end();
} }
} }

View file

@ -1,18 +0,0 @@
package org.sysmon.agent;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
class StringAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldBody + "+" + newBody);
return oldExchange;
}
}

View file

@ -7,13 +7,11 @@ dependencies {
implementation project(':shared') implementation project(':shared')
implementation group: 'org.apache.camel', name: 'camel-core', version: camelVersion implementation group: 'org.apache.camel', name: 'camel-core', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-main', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-rest', version: camelVersion implementation group: 'org.apache.camel', name: 'camel-rest', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-jetty', version: camelVersion implementation group: 'org.apache.camel', name: 'camel-jetty', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-jackson', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-main', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-bean', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-timer', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-stream', version: camelVersion implementation group: 'org.apache.camel', name: 'camel-stream', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-jackson', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-influxdb', version: camelVersion implementation group: 'org.apache.camel', name: 'camel-influxdb', version: camelVersion
} }

View file

@ -1,8 +1,11 @@
package org.sysmon.collector; package org.sysmon.collector;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder; import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.model.dataformat.JsonLibrary;
import org.apache.camel.model.rest.RestBindingMode; import org.apache.camel.model.rest.RestBindingMode;
import org.sysmon.collector.processor.MetricResultToPointProcessor;
import org.sysmon.shared.MetricResult; import org.sysmon.shared.MetricResult;
import org.sysmon.shared.dto.MetricMessageDTO; import org.sysmon.shared.dto.MetricMessageDTO;
@ -32,12 +35,18 @@ public class CollectorRouteBuilder extends RouteBuilder {
.to("seda:inbound") .to("seda:inbound")
.endRest(); .endRest();
from("seda:inbound").log("Got metric from: ${header.component}").to("mock:sink"); //from("seda:inbound").log("Got metric from: ${header.component}").to("mock:sink");
/*
from("seda:inbound") from("seda:inbound")
.to("influxdb://myInfluxConnection?databaseName=sysmon"); .log("Got metric from: ${header.component}")
*/ .doTry()
.process(new MetricResultToPointProcessor())
.log("${body}")
.to("influxdb://myInfluxConnection?databaseName=sysmon&retentionPolicy=autogen")
.doCatch(Exception.class)
.log("Error storing metric to InfluxDB: ${exception}")
.end();
} }

View file

@ -9,8 +9,6 @@ import org.sysmon.shared.dto.MetricMessageDTO;
public class IncomingMetricProcessor implements Processor { public class IncomingMetricProcessor implements Processor {
private final static Logger log = LoggerFactory.getLogger(IncomingMetricProcessor.class);
public void process(Exchange exchange) throws Exception { public void process(Exchange exchange) throws Exception {
MetricResult payload = exchange.getIn().getBody(MetricResult.class); MetricResult payload = exchange.getIn().getBody(MetricResult.class);

View file

@ -0,0 +1,40 @@
package org.sysmon.collector.processor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.influxdb.dto.Point;
import org.sysmon.shared.MetricMeasurement;
import org.sysmon.shared.MetricResult;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class MetricResultToPointProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
Point.Builder builder = Point.measurement(metricResult.getName())
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS)
.tag("hostname", metricResult.getHostname());
List<MetricMeasurement> measurements = metricResult.getMeasurementList();
for(MetricMeasurement measurement : measurements) {
if(measurement.getValue() instanceof Number) {
Number num = (Number) measurement.getValue();
builder.addField(measurement.getName(), num);
} else if(measurement.getValue() instanceof Boolean) {
Boolean bol = (Boolean) measurement.getValue();
builder.addField(measurement.getName(), bol);
} else {
String str = (String) measurement.getValue();
builder.addField(measurement.getName(), str);
}
}
exchange.getIn().setBody(builder.build());
}
}

View file

@ -11,10 +11,16 @@ import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@Extension @Extension
public class LinuxDiskExtension implements MetricExtension { public class LinuxDiskExtension implements MetricExtension {
private final static List<String> ignoreList = new ArrayList<String>() {{
add("dm-");
add("loop");
}};
private List<LinuxDiskStat> currentDiskStats; private List<LinuxDiskStat> currentDiskStats;
private List<LinuxDiskStat> previousDiskStats; private List<LinuxDiskStat> previousDiskStats;
@ -84,14 +90,19 @@ public class LinuxDiskExtension implements MetricExtension {
LinuxDiskStat curStat = currentDiskStats.get(i); LinuxDiskStat curStat = currentDiskStats.get(i);
LinuxDiskStat preStat = previousDiskStats.get(i); LinuxDiskStat preStat = previousDiskStats.get(i);
if(curStat.getDevice().startsWith("loop")) { AtomicBoolean ignore = new AtomicBoolean(false);
continue; ignoreList.forEach(str -> {
if(curStat.getDevice().startsWith(str)) {
ignore.set(true);
} }
});
if(!ignore.get()) {
long timeSpendDoingIo = curStat.getTimeSpentOnIo() - preStat.getTimeSpentOnIo(); long timeSpendDoingIo = curStat.getTimeSpentOnIo() - preStat.getTimeSpentOnIo();
// TODO: Calculate differences for wanted disk io stats // TODO: Calculate differences for wanted disk io stats
measurementList.add(new MetricMeasurement(curStat.getDevice() + "-iotime", timeSpendDoingIo)); measurementList.add(new MetricMeasurement(curStat.getDevice() + "-iotime", timeSpendDoingIo));
}
} }