Combined work from collector branch into this.

This commit is contained in:
Mark Nellemann 2021-05-04 14:23:34 +02:00
parent 15cf1963b7
commit 9b99e22b8b
32 changed files with 535 additions and 224 deletions

View file

@ -1,11 +1,3 @@
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java application project to get you started.
* For more details take a look at the 'Building Java & JVM projects' chapter in the Gradle
* User Manual available at https://docs.gradle.org/7.0/userguide/building_java_projects.html
*/
plugins {
id 'groovy'
id 'application'
@ -30,6 +22,13 @@ dependencies {
annotationProcessor(group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}")
implementation group: 'org.pf4j', name: 'pf4j', version: "${pf4jVersion}"
implementation group: 'org.apache.camel', name: 'camel-core', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-main', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-http', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-jackson', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-bean', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-timer', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-stream', version: '3.7.3'
}
application {

View file

@ -3,53 +3,50 @@
*/
package org.sysmon.agent;
import org.pf4j.*;
import org.apache.camel.main.Main;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.MetricExtension;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
// use Camels Main class
Main main = new Main();
// and add the routes (you can specify multiple classes)
main.configure().addRoutesBuilder(MyRouteBuilder.class);
// now keep the application running until the JVM is terminated (ctrl + c or sigterm)
try {
main.run(args);
} catch(Exception e) {
System.err.println(e.getMessage());
}
}
/*
public static void main(String[] args) throws InterruptedException {
// create the plugin manager
PluginManager pluginManager = new SysmonPluginManager(); // or "new ZipPluginManager() / new DefaultPluginManager()"
PluginManager pluginManager = new JarPluginManager(); // or "new ZipPluginManager() / new DefaultPluginManager()"
// start and load all plugins of application
pluginManager.loadPlugins();
pluginManager.startPlugins();
/*
final PluginManager pluginManager = new SysmonPluginManager() {
protected ExtensionFinder createExtensionFinder() {
DefaultExtensionFinder extensionFinder = (DefaultExtensionFinder) super.createExtensionFinder();
extensionFinder.addServiceProviderExtensionFinder();
return extensionFinder;
}
};
pluginManager.loadPlugins();
pluginManager.startPlugins();
*/
/*
List<PluginWrapper> plugins = pluginManager.getPlugins();
for(PluginWrapper wrapper : plugins) {
log.info(">>> Plugin Description: " + wrapper.getDescriptor().getPluginDescription());
}
*/
List<MetricExtension> metricExtensions = pluginManager.getExtensions(MetricExtension.class);
log.info(String.format("Found %d extensions for extension point '%s':", metricExtensions.size(), MetricExtension.class.getName()));
for (MetricExtension plugin : metricExtensions) {
log.info(">>> " + plugin.getGreeting());
for (MetricExtension ext : metricExtensions) {
if(ext.isSupported()) {
log.info(">>> " + ext.getGreeting());
// TODO: Setup camel
}
}
@ -62,24 +59,7 @@ public class Application {
Runtime.getRuntime().addShutdownHook(shutdownHook);
/*
do {
for (MetricExtension plugin : metricExtensions) {
// TODO: Find better way to avoid using plugins not working on runtime OS.
if(plugin.isSupported()) {
System.out.println(">>> " + plugin.getMetrics());
}
}
Thread.sleep(15000);
} while (keepRunning.get());
*/
}
}

View file

@ -0,0 +1,25 @@
package org.sysmon.agent;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import java.util.ArrayList;
//simply combines Exchange body values into an ArrayList<Object>
class ArrayListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Object newBody = newExchange.getIn().getBody();
ArrayList<Object> list = null;
if (oldExchange == null) {
list = new ArrayList<Object>();
list.add(newBody);
newExchange.getIn().setBody(list);
return newExchange;
} else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.add(newBody);
return oldExchange;
}
}
}

View file

@ -0,0 +1,32 @@
package org.sysmon.agent;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.MetricResult;
import org.sysmon.shared.dto.MetricMessageDTO;
import java.util.concurrent.atomic.AtomicLong;
public class MetricProcessor implements Processor {
private final static Logger log = LoggerFactory.getLogger(MetricProcessor.class);
private static final AtomicLong counter = new AtomicLong();
public void process(Exchange exchange) throws Exception {
MetricResult reading = exchange.getIn().getBody(MetricResult.class);
log.debug(reading.toString());
// do something with the payload and/or exchange here
//exchange.getIn().setBody("Changed body");
// do something...
MetricMessageDTO payload = new MetricMessageDTO("event " + reading, counter.getAndIncrement());
exchange.getIn().setBody(payload, MetricMessageDTO.class);
}
}

View file

@ -0,0 +1,18 @@
package org.sysmon.agent;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.sysmon.shared.MetricResult;
public class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Message newIn = newExchange.getIn();
MetricResult oldBody = oldExchange.getIn().getBody(MetricResult.class);
String newBody = newIn.getBody(String.class);
newIn.setBody(oldBody + newBody);
return newExchange;
}
}

View file

@ -0,0 +1,88 @@
package org.sysmon.agent;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.pf4j.JarPluginManager;
import org.pf4j.PluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.MetricExtension;
import java.util.List;
public class MyRouteBuilder extends RouteBuilder {
private static final Logger log = LoggerFactory.getLogger(MyRouteBuilder.class);
@Override
public void configure() throws Exception {
PluginManager pluginManager = new JarPluginManager();
pluginManager.loadPlugins();
pluginManager.startPlugins();
List<MetricExtension> metricExtensions = pluginManager.getExtensions(MetricExtension.class);
//log.info(String.format("Found %d extensions for extension point '%s':", metricExtensions.size(), MetricExtension.class.getName()));
for (MetricExtension ext : metricExtensions) {
if(ext.isSupported()) {
log.info(">>> Enabling extension: " + ext.getDescription());
// Setup Camel route for this extension
from("timer:collect?period=5000")
.bean(ext, "getMetrics")
.setHeader("ext", constant(ext.getName()))
.to("seda:metrics");
}
}
/*
TODO: How to combine/wrap the individual metrics into a container which also contains
some extra information, such as our hostname, operating system, timestamp, etc.
Like one JSON or XML output with all metrics:
{
"hostname": "sauron",
"timestamp": "1322334343434",
"metrics": [
{ "processor": [
{ "cpu0":"10" },
{ "cpu1":"12" }
]},
{ "memory": [
{ "memUsed": "323434"},
{ "memFree": "4454545"}
]}
]
}
*/
/*
from("seda:metrics")
.aggregate(header("ext"), new ArrayListAggregationStrategy())
.completionInterval(5000)
.to("seda:aggregated");
from("seda:aggregated")
.log("${body}");
*/
// Send to collector when combined
from("seda:metrics")
.process(new MetricProcessor())
.marshal().json()
.setHeader(Exchange.HTTP_METHOD, constant("POST"))
.to("http://127.0.0.1:9925/metrics");
}
}

View file

@ -0,0 +1,18 @@
package org.sysmon.agent;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
class StringAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldBody + "+" + newBody);
return oldExchange;
}
}

View file

@ -1,30 +0,0 @@
package org.sysmon.agent;
import org.pf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SysmonPluginManager extends DefaultPluginManager {
private static final Logger log = LoggerFactory.getLogger(SysmonPluginManager.class);
public SysmonPluginManager() {
super();
log.warn("SysmonPluginManager()");
}
@Override
protected PluginStatusProvider createPluginStatusProvider() {
log.warn("createPluginStatusProvider()");
return new SysmonPluginStatusProvider();
}
@Override
protected ExtensionFactory createExtensionFactory() {
log.warn("createExtensionFactory()");
return new SingletonExtensionFactory();
}
}

View file

@ -1,57 +0,0 @@
package org.sysmon.agent;
import org.pf4j.PluginStatusProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
public class SysmonPluginStatusProvider implements PluginStatusProvider {
private static final Logger log = LoggerFactory.getLogger(SysmonPluginStatusProvider.class);
private final List<String> enabledPlugins = new ArrayList<>();
private final List<String> disabledPlugins = new ArrayList<>();
public SysmonPluginStatusProvider() {
log.warn("SysmonPluginManager()");
}
@Override
public boolean isPluginDisabled(String pluginId) {
log.warn("isPluginDisabled() - " + pluginId);
if (disabledPlugins.contains(pluginId)) {
return true;
}
return !enabledPlugins.isEmpty() && !enabledPlugins.contains(pluginId);
}
@Override
public void disablePlugin(String pluginId) {
log.warn("disablePlugin() - " + pluginId);
if (isPluginDisabled(pluginId)) {
// do nothing
return;
}
}
@Override
public void enablePlugin(String pluginId) {
log.warn("enablePlugin() - " + pluginId);
if (!isPluginDisabled(pluginId)) {
// do nothing
return;
}
}
}

View file

@ -0,0 +1,42 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
# to configure camel main
# here you can configure options on camel main (see MainConfigurationProperties class)
camel.main.name = SysMon-Agent
# enable tracing
### camel.main.tracing = true
# bean introspection to log reflection based configuration
camel.main.beanIntrospectionExtendedStatistics=true
camel.main.beanIntrospectionLoggingLevel=INFO
# run in lightweight mode to be tiny as possible
camel.main.lightweight = true
# and eager load classes
#camel.main.eager-classloading = true
# use object pooling to reduce JVM garbage collection
#camel.main.exchange-factory = pooled
#camel.main.exchange-factory-statistics-enabled = true
# can be used to not start the route
# camel.main.auto-startup = false
# configure beans
camel.beans.metricProcessor = #class:org.sysmon.agent.MetricProcessor

View file

@ -1,4 +1,4 @@
org.slf4j.simpleLogger.logFile=System.err
org.slf4j.simpleLogger.logFile=System.out
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS

View file

@ -0,0 +1,6 @@
org.slf4j.simpleLogger.logFile=System.err
org.slf4j.simpleLogger.showDateTime=false
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
org.slf4j.simpleLogger.levelInBrackets=true
org.slf4j.simpleLogger.defaultLogLevel=debug

35
collector/build.gradle Normal file
View file

@ -0,0 +1,35 @@
plugins {
id 'groovy'
id 'application'
}
repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}
dependencies {
implementation project(':shared')
implementation "org.slf4j:slf4j-api:${slf4jVersion}"
implementation "org.slf4j:slf4j-simple:${slf4jVersion}"
implementation group: 'org.apache.camel', name: 'camel-core', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-rest', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-jetty', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-jackson', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-main', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-bean', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-timer', version: '3.7.3'
implementation group: 'org.apache.camel', name: 'camel-stream', version: '3.7.3'
}
application {
// Define the main class for the application.
mainClassName = 'org.sysmon.collector.Application'
}
tasks.named('test') {
// Use junit platform for unit tests.
useJUnitPlatform()
}

View file

@ -0,0 +1,19 @@
package org.sysmon.collector;
import org.apache.camel.main.Main;
public class Application {
public static void main(String[] args) {
Main main = new Main();
main.configure().addRoutesBuilder(CollectorRouteBuilder.class);
// now keep the application running until the JVM is terminated (ctrl + c or sigterm)
try {
main.run(args);
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
}

View file

@ -0,0 +1,34 @@
package org.sysmon.collector;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.rest.RestBindingMode;
import org.sysmon.shared.dto.MetricMessageDTO;
public class CollectorRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
restConfiguration().component("jetty")
.bindingMode(RestBindingMode.auto)
.host("127.0.0.1")
.port(9925);
rest()
.get("/")
.produces("text/html")
.route()
.to("log:stdout")
.endRest();
rest()
.post("/metrics")
.consumes("application/json")
.produces("text/html")
.type(MetricMessageDTO.class)
.route()
.to("bean:incomingMetricProcessor")
.endRest();
}
}

View file

@ -0,0 +1,21 @@
package org.sysmon.collector.bean;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
public class Hello implements Processor {
private final static Logger log = LoggerFactory.getLogger(Hello.class);
public void process(Exchange exchange) throws Exception {
String name = exchange.getIn().getHeader("name", String.class);
String msg = "Hello " + Objects.requireNonNull(name, "universe");
log.info(msg);
exchange.getMessage().setBody(msg);
}
}

View file

@ -0,0 +1,23 @@
package org.sysmon.collector.bean;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sysmon.shared.dto.MetricMessageDTO;
public class IncomingMetricProcessor implements Processor {
private final static Logger log = LoggerFactory.getLogger(IncomingMetricProcessor.class);
public void process(Exchange exchange) throws Exception {
MetricMessageDTO payload = exchange.getIn().getBody(MetricMessageDTO.class);
log.info("I am going to send this data to InfluxDB.");
log.info(payload.toString());
exchange.getMessage().setBody("OK");
}
}

View file

@ -0,0 +1,43 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
# to configure camel main
# here you can configure options on camel main (see MainConfigurationProperties class)
camel.main.name = SysMon-Collector
# enable tracing
camel.main.tracing = true
# bean introspection to log reflection based configuration
camel.main.beanIntrospectionExtendedStatistics=true
camel.main.beanIntrospectionLoggingLevel=INFO
# run in lightweight mode to be tiny as possible
camel.main.lightweight = true
# and eager load classes
#camel.main.eager-classloading = true
# use object pooling to reduce JVM garbage collection
#camel.main.exchange-factory = pooled
#camel.main.exchange-factory-statistics-enabled = true
# can be used to not start the route
# camel.main.auto-startup = false
# configure beans
camel.beans.incomingMetricProcessor = #class:org.sysmon.collector.bean.IncomingMetricProcessor
camel.beans.hello = #class:org.sysmon.collector.bean.Hello

View file

@ -0,0 +1,6 @@
org.slf4j.simpleLogger.logFile=System.out
org.slf4j.simpleLogger.showDateTime=true
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
org.slf4j.simpleLogger.levelInBrackets=true
org.slf4j.simpleLogger.defaultLogLevel=info

View file

@ -0,0 +1,6 @@
org.slf4j.simpleLogger.logFile=System.err
org.slf4j.simpleLogger.showDateTime=false
org.slf4j.simpleLogger.showShortLogName=true
org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
org.slf4j.simpleLogger.levelInBrackets=true
org.slf4j.simpleLogger.defaultLogLevel=debug

View file

@ -13,8 +13,13 @@ public class AixDiskExtension implements MetricExtension {
}
@Override
public String getGreeting() {
return "Welcome from AIX DiskMetric";
public String getName() {
return "aix-disk";
}
@Override
public String getDescription() {
return "AIX Disk Metrics (TODO)";
}
@Override

View file

@ -1,6 +1,5 @@
package org.sysmon.plugins.sysmon_aix;
import org.pf4j.PluginState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.pf4j.Plugin;
@ -14,23 +13,6 @@ public class AixPlugin extends Plugin {
super(wrapper);
}
@Override
public void start() {
if(!System.getProperty("os.name").toLowerCase().contains("aix")) {
log.warn("start() - Plugin not supported here.");
wrapper.setPluginState(PluginState.DISABLED);
wrapper.getPlugin().stop();
} else {
log.info("start() - Good to go.");
}
}
@Override
public void stop() {
log.debug("stop()");
}
}

View file

@ -22,8 +22,13 @@ public class AixProcessorExtension implements MetricExtension {
}
@Override
public String getGreeting() {
return "Welcome from AIX ProcessorMetric";
public String getName() {
return "aix-processor";
}
@Override
public String getDescription() {
return "AIX Processor Metrics";
}
@Override

View file

@ -24,8 +24,13 @@ public class LinuxDiskExtension implements MetricExtension {
}
@Override
public String getGreeting() {
return "Welcome from Linux DiskMetric";
public String getName() {
return "linux-disk";
}
@Override
public String getDescription() {
return "Linux Disk Metrics";
}
@Override

View file

@ -25,8 +25,13 @@ public class LinuxMemoryExtension implements MetricExtension {
}
@Override
public String getGreeting() {
return "Welcome from Linux MemoryMetric";
public String getName() {
return "linux-memory";
}
@Override
public String getDescription() {
return "Linux Memory Metrics";
}

View file

@ -1,6 +1,5 @@
package org.sysmon.plugins.sysmon_linux;
import org.pf4j.PluginState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.pf4j.Plugin;
@ -15,21 +14,4 @@ public class LinuxPlugin extends Plugin {
super(wrapper);
}
@Override
public void start() {
if(!System.getProperty("os.name").toLowerCase().contains("linux")) {
log.warn("start() - Plugin not supported here.");
wrapper.setPluginState(PluginState.DISABLED);
} else {
log.info("start() - Good to go.");
}
}
@Override
public void stop() {
log.debug("stop()");
}
}

View file

@ -29,10 +29,14 @@ public class LinuxProcessorExtension implements MetricExtension {
return System.getProperty("os.name").toLowerCase().contains("linux");
}
@Override
public String getName() {
return "linux-processor";
}
@Override
public String getGreeting() {
return "Welcome from Linux ProcessorMetric";
public String getDescription() {
return "Linux Processor Metrics";
}

View file

@ -8,7 +8,7 @@
*/
rootProject.name = 'sysmon'
include('shared', 'agent', 'plugins')
include('shared', 'agent', 'collector', 'plugins')
new File(rootDir, "plugins").listFiles().each {

View file

@ -1,9 +0,0 @@
package org.sysmon.shared;
import java.util.concurrent.Callable;
public interface MetricBean {
public MetricResult getMetrics();
}

View file

@ -5,7 +5,10 @@ import org.pf4j.ExtensionPoint;
public interface MetricExtension extends ExtensionPoint {
boolean isSupported();
String getGreeting();
String getName();
String getDescription();
MetricResult getMetrics();
}

View file

@ -1,24 +0,0 @@
package org.sysmon.shared;
import org.pf4j.Plugin;
import org.pf4j.PluginWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SysmonPlugin extends Plugin {
private static final Logger log = LoggerFactory.getLogger(SysmonPlugin.class);
public SysmonPlugin(PluginWrapper wrapper) {
super(wrapper);
log.warn("SysmonPlugin");
}
@Override
public void start() {
log.warn("start();");
}
}

View file

@ -0,0 +1,45 @@
package org.sysmon.shared.dto;
import java.util.Objects;
public class MetricMessageDTO {
private String msg;
private long id;
public MetricMessageDTO() {
// empty constructor is required bu Jackson for deserialization
}
public MetricMessageDTO(String msg, long id) {
Objects.requireNonNull(msg);
this.msg = msg;
this.id = id;
}
public String getMsg() {
return msg;
}
public long getId() {
return id;
}
public void setMsg(String msg) {
this.msg = msg;
}
public void setId(long id) {
this.id = id;
}
@Override
public String toString() {
return "MetricMessageDTO{" +
"msg='" + msg + '\'' +
", id=" + id +
'}';
}
}