Merged master into development
This commit is contained in:
commit
ab938338b8
|
@ -41,8 +41,9 @@ application {
|
|||
}
|
||||
|
||||
run {
|
||||
systemProperty 'pf4j.pluginsDir', '../plugins/output/'
|
||||
systemProperty 'sysmon.pluginsDir', '../plugins/output/'
|
||||
systemProperty 'sysmon.cfgFile', 'doc/sysmon-client.toml'
|
||||
systemProperty 'sysmon.debug', '1'
|
||||
}
|
||||
|
||||
tasks.named('test') {
|
||||
|
|
|
@ -42,11 +42,20 @@ public class Application implements Callable<Integer> {
|
|||
@Override
|
||||
public Integer call() throws IOException {
|
||||
|
||||
if(enableDebug) {
|
||||
String sysmonDebug = System.getProperty("sysmon.debug");
|
||||
if(sysmonDebug != null || enableDebug) {
|
||||
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO");
|
||||
}
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
String sysmonCfgFile = System.getProperty("sysmon.cfgFile");
|
||||
if(sysmonCfgFile != null) {
|
||||
configurationFile = new File(sysmonCfgFile);
|
||||
}
|
||||
|
||||
String sysmonPluginsDir = System.getProperty("sysmon.pluginsDir");
|
||||
if(sysmonPluginsDir != null) {
|
||||
pluginPath = sysmonPluginsDir;
|
||||
}
|
||||
|
||||
if(hostname == null || hostname.isEmpty()) {
|
||||
try {
|
||||
|
@ -57,16 +66,7 @@ public class Application implements Callable<Integer> {
|
|||
}
|
||||
}
|
||||
|
||||
String pf4jPluginsDir = System.getProperty("pf4j.pluginsDir");
|
||||
if(pf4jPluginsDir != null) {
|
||||
pluginPath = pf4jPluginsDir;
|
||||
}
|
||||
|
||||
String sysmonCfgFile = System.getProperty("sysmon.cfgFile");
|
||||
if(sysmonCfgFile != null) {
|
||||
configurationFile = new File(sysmonCfgFile);
|
||||
}
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
|
||||
if(configurationFile.exists()) {
|
||||
try {
|
||||
|
|
|
@ -2,13 +2,15 @@ package sysmon.client;
|
|||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.LoggingLevel;
|
||||
import org.apache.camel.builder.AggregationStrategies;
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
import org.apache.camel.model.dataformat.JsonLibrary;
|
||||
import org.apache.camel.component.jackson.JacksonDataFormat;
|
||||
import org.apache.camel.spi.Registry;
|
||||
import org.pf4j.JarPluginManager;
|
||||
import org.pf4j.PluginManager;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import sysmon.shared.ComboResult;
|
||||
import sysmon.shared.MetricExtension;
|
||||
import sysmon.shared.MetricResult;
|
||||
|
||||
|
@ -61,13 +63,13 @@ public class ClientRouteBuilder extends RouteBuilder {
|
|||
//from("timer:extensions?fixedRate=true&period=30s")
|
||||
from("timer:"+provides+"?fixedRate=true&period=30s")
|
||||
.bean(ext, "getMetrics")
|
||||
//.doTry()
|
||||
.outputType(MetricResult.class)
|
||||
.process(new MetricEnrichProcessor(registry))
|
||||
.choice().when(exchangeProperty("skip").isEqualTo(true))
|
||||
.log(LoggingLevel.WARN,"Skipping empty measurement.")
|
||||
.stop()
|
||||
.otherwise()
|
||||
.log("${body}")
|
||||
.to("seda:metrics?discardWhenFull=true");
|
||||
} else {
|
||||
log.info(">>> Skipping extension (not supported or disabled): " + ext.getDescription());
|
||||
|
@ -75,18 +77,20 @@ public class ClientRouteBuilder extends RouteBuilder {
|
|||
|
||||
}
|
||||
|
||||
from("seda:metrics")
|
||||
.aggregate(constant(true), AggregationStrategies.beanAllowNull(ComboAppender.class, "append"))
|
||||
//.aggregate(new GroupedExchangeAggregationStrategy()).constant(true)
|
||||
//.aggregate(constant(true), new ListOfResultsStrategy())
|
||||
// wait for 5 seconds to aggregate
|
||||
.completionTimeout(5000L).to("seda:outbound");
|
||||
|
||||
// TODO: Make 'concurrentConsumers' configurable
|
||||
from("seda:metrics?concurrentConsumers=1")
|
||||
from("seda:outbound")
|
||||
.setHeader(Exchange.HTTP_METHOD, constant("POST"))
|
||||
//.setHeader(Exchange.CONTENT_TYPE, constant("application/json"))
|
||||
.doTry()
|
||||
//.process(new MetricProcessor())
|
||||
.marshal().json(JsonLibrary.Jackson, MetricResult.class)
|
||||
.marshal(new JacksonDataFormat(ComboResult.class))
|
||||
.to((String)registry.lookupByName("myServerUrl"))
|
||||
.doCatch(Exception.class)
|
||||
.log(LoggingLevel.WARN,"Error: ${exception.message}")
|
||||
//.log("Error sending metric to collector: ${body}")
|
||||
.end();
|
||||
|
||||
}
|
||||
|
|
18
client/src/main/java/sysmon/client/ComboAppender.java
Normal file
18
client/src/main/java/sysmon/client/ComboAppender.java
Normal file
|
@ -0,0 +1,18 @@
|
|||
package sysmon.client;
|
||||
|
||||
import sysmon.shared.ComboResult;
|
||||
import sysmon.shared.MetricResult;
|
||||
|
||||
public class ComboAppender {
|
||||
|
||||
public ComboResult append(ComboResult comboResult, MetricResult metricResult) {
|
||||
|
||||
if (comboResult == null) {
|
||||
comboResult = new ComboResult();
|
||||
}
|
||||
|
||||
comboResult.getMetricResults().add(metricResult);
|
||||
return comboResult;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
package sysmon.client;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.processor.aggregate.AbstractListAggregationStrategy;
|
||||
import sysmon.shared.MetricResult;
|
||||
|
||||
public class ListOfResultsStrategy extends AbstractListAggregationStrategy<MetricResult> {
|
||||
|
||||
@Override
|
||||
public MetricResult getValue(Exchange exchange) {
|
||||
return exchange.getIn().getBody(MetricResult.class);
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
version=0.0.12
|
||||
version=0.1.2
|
||||
pf4jVersion=3.6.0
|
||||
slf4jVersion=1.7.32
|
||||
camelVersion=3.11.2
|
||||
|
|
|
@ -6,7 +6,7 @@ class AixNetstatTest extends Specification {
|
|||
void "test netstat parsing"() {
|
||||
|
||||
setup:
|
||||
InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt');
|
||||
InputStream inputStream = getClass().getResourceAsStream('/netstat-aix.txt')
|
||||
|
||||
when:
|
||||
AixNetstatParser parser = new AixNetstatParser(inputStream)
|
||||
|
|
|
@ -7,7 +7,7 @@ class AixProcessorTest extends Specification {
|
|||
void "test AIX lparstat shared output processing"() {
|
||||
|
||||
setup:
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt');
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-shared.txt')
|
||||
|
||||
when:
|
||||
AixProcessorExtension extension = new AixProcessorExtension()
|
||||
|
@ -26,7 +26,7 @@ class AixProcessorTest extends Specification {
|
|||
void "test AIX lparstat dedicated output processing"() {
|
||||
|
||||
setup:
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt');
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-aix-dedicated.txt')
|
||||
|
||||
when:
|
||||
AixProcessorExtension extension = new AixProcessorExtension()
|
||||
|
@ -45,7 +45,7 @@ class AixProcessorTest extends Specification {
|
|||
void "test Linux lparstat output processing"() {
|
||||
|
||||
setup:
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt');
|
||||
InputStream inputStream = getClass().getResourceAsStream('/lparstat-linux.txt')
|
||||
|
||||
when:
|
||||
AixProcessorExtension extension = new AixProcessorExtension()
|
||||
|
|
|
@ -89,15 +89,15 @@ public class BaseProcessExtension implements MetricExtension {
|
|||
continue;
|
||||
}
|
||||
|
||||
String name = p.getName();
|
||||
if(!includeList.contains(name)) {
|
||||
// Skip process names not found in our includeList, only if the list is not empty or null
|
||||
if(includeList != null && !includeList.isEmpty() && !includeList.contains(p.getName())) {
|
||||
continue;
|
||||
}
|
||||
log.info("pid: " + p.getProcessID() + ", name: " + name + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize());
|
||||
log.debug("pid: " + p.getProcessID() + ", name: " + p.getName() + ", virt: " + p.getVirtualSize() + " rss: " + p.getResidentSetSize());
|
||||
|
||||
HashMap<String, String> tagsMap = new HashMap<String, String>() {{
|
||||
put("pid", String.valueOf(p.getProcessID()));
|
||||
put("name", name);
|
||||
put("name", p.getName());
|
||||
}};
|
||||
|
||||
HashMap<String, Object> fieldsMap = new HashMap<String, Object>() {{
|
||||
|
|
|
@ -6,7 +6,7 @@ class LinuxNetstatTest extends Specification {
|
|||
void "test netstat parsing"() {
|
||||
|
||||
setup:
|
||||
InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt');
|
||||
InputStream inputStream = getClass().getResourceAsStream('/netstat-linux.txt')
|
||||
|
||||
when:
|
||||
LinuxNetstatParser parser = new LinuxNetstatParser(inputStream)
|
||||
|
|
|
@ -31,6 +31,10 @@ application {
|
|||
applicationDefaultJvmArgs = [ "-server", "-Xmx128m", "-XX:+UseG1GC" ]
|
||||
}
|
||||
|
||||
run {
|
||||
systemProperty 'sysmon.debug', '1'
|
||||
}
|
||||
|
||||
tasks.named('test') {
|
||||
// Use junit platform for unit tests.
|
||||
useJUnitPlatform()
|
||||
|
|
|
@ -47,7 +47,8 @@ public class Application implements Callable<Integer> {
|
|||
@Override
|
||||
public Integer call() throws IOException {
|
||||
|
||||
if(enableDebug) {
|
||||
String sysmonDebug = System.getProperty("sysmon.debug");
|
||||
if(sysmonDebug != null || enableDebug) {
|
||||
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "INFO");
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,74 @@
|
|||
package sysmon.server;
|
||||
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
import org.influxdb.dto.BatchPoints;
|
||||
import org.influxdb.dto.Point;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import sysmon.shared.ComboResult;
|
||||
import sysmon.shared.Measurement;
|
||||
import sysmon.shared.MetricResult;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ComboResultToPointProcessor implements Processor {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(ComboResultToPointProcessor.class);
|
||||
private static String influxDbName;
|
||||
|
||||
ComboResultToPointProcessor(String influxDbName) {
|
||||
ComboResultToPointProcessor.influxDbName = influxDbName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
|
||||
ComboResult comboResult = exchange.getIn().getBody(ComboResult.class);
|
||||
//MetricResult metricResult = exchange.getIn().getBody(MetricResult.class);
|
||||
|
||||
//log.info("Size of measurements: " + measurementList.size());
|
||||
|
||||
BatchPoints.Builder batchPoints = BatchPoints
|
||||
.database(ComboResultToPointProcessor.influxDbName)
|
||||
.precision(TimeUnit.MILLISECONDS);
|
||||
|
||||
List<MetricResult> results = comboResult.getMetricResults();
|
||||
for(MetricResult metricResult : results) {
|
||||
|
||||
List<Measurement> measurementList = metricResult.getMeasurements();
|
||||
for(Measurement measurement : measurementList) {
|
||||
|
||||
Point.Builder point = Point.measurement(metricResult.getName())
|
||||
.time(metricResult.getTimestamp(), TimeUnit.MILLISECONDS)
|
||||
.tag("hostname", metricResult.getHostname());
|
||||
|
||||
for (Map.Entry<String,String> entry : measurement.getTags().entrySet()) {
|
||||
//log.info("process() - tag: " + entry.getKey() + "=" + entry.getValue());
|
||||
point.tag(entry.getKey(), entry.getValue());
|
||||
}
|
||||
|
||||
for (Map.Entry<String,Object> entry : measurement.getFields().entrySet()) {
|
||||
//log.info("process() - field: " + entry.getKey() + "=" + entry.getValue());
|
||||
if(entry.getValue() instanceof Number) {
|
||||
Number num = (Number) entry.getValue();
|
||||
point.addField(entry.getKey(), num);
|
||||
} else if(entry.getValue() instanceof Boolean) {
|
||||
Boolean bol = (Boolean) entry.getValue();
|
||||
point.addField(entry.getKey(), bol);
|
||||
} else {
|
||||
String str = (String) entry.getValue();
|
||||
point.addField(entry.getKey(), str);
|
||||
}
|
||||
}
|
||||
batchPoints.point(point.build());
|
||||
}
|
||||
}
|
||||
|
||||
exchange.getIn().setBody(batchPoints.build());
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -5,7 +5,7 @@ import org.apache.camel.LoggingLevel;
|
|||
import org.apache.camel.builder.RouteBuilder;
|
||||
import org.apache.camel.model.rest.RestBindingMode;
|
||||
import org.apache.camel.spi.Registry;
|
||||
import sysmon.shared.MetricResult;
|
||||
import sysmon.shared.ComboResult;
|
||||
|
||||
public class ServerRouteBuilder extends RouteBuilder {
|
||||
|
||||
|
@ -34,20 +34,24 @@ public class ServerRouteBuilder extends RouteBuilder {
|
|||
.post("/metrics")
|
||||
.consumes("application/json")
|
||||
.produces("text/html")
|
||||
.type(MetricResult.class)
|
||||
.type(ComboResult.class)
|
||||
.route()
|
||||
.setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202))
|
||||
.setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
|
||||
.to("seda:inbound?discardWhenFull=true")
|
||||
.doTry()
|
||||
.setHeader(Exchange.HTTP_RESPONSE_CODE, constant(202))
|
||||
.setHeader("Content-Type", constant("application/x-www-form-urlencoded"))
|
||||
.to("seda:inbound?discardWhenFull=true")
|
||||
.doCatch(Exception.class)
|
||||
.log(LoggingLevel.WARN, "Error: ${exception.message}")
|
||||
.end()
|
||||
.endRest();
|
||||
|
||||
fromF("seda:inbound?concurrentConsumers=%s", threads)
|
||||
.log(">>> metric: ${header.hostname} - ${body}")
|
||||
.doTry()
|
||||
.process(new MetricResultToPointProcessor(dbname))
|
||||
.process(new ComboResultToPointProcessor(dbname))
|
||||
.toF("influxdb://ref.myInfluxConnection?batch=true") //&retentionPolicy=autogen
|
||||
.doCatch(Exception.class)
|
||||
.log(LoggingLevel.WARN, "Error: ${exception}")
|
||||
.log(LoggingLevel.WARN, "Error: ${exception.message}")
|
||||
.end();
|
||||
|
||||
}
|
||||
|
|
34
shared/src/main/java/sysmon/shared/ComboResult.java
Normal file
34
shared/src/main/java/sysmon/shared/ComboResult.java
Normal file
|
@ -0,0 +1,34 @@
|
|||
package sysmon.shared;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class ComboResult implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private ArrayList<MetricResult> metricResults;
|
||||
|
||||
|
||||
public ComboResult() {
|
||||
metricResults = new ArrayList<>();
|
||||
}
|
||||
|
||||
public ComboResult(ArrayList<MetricResult> metricResults) {
|
||||
this.metricResults = metricResults;
|
||||
}
|
||||
|
||||
public ArrayList<MetricResult> getMetricResults() {
|
||||
return metricResults;
|
||||
}
|
||||
|
||||
public void setMetricResults(ArrayList<MetricResult> metricResults) {
|
||||
this.metricResults = metricResults;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ComboResult of size: " + metricResults.size();
|
||||
}
|
||||
|
||||
}
|
|
@ -7,6 +7,8 @@ import java.util.Objects;
|
|||
|
||||
public class Measurement implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private HashMap<String, String> tags = new HashMap<>();
|
||||
private HashMap<String, Object> fields = new HashMap<>();
|
||||
|
||||
|
|
Loading…
Reference in a new issue