Make threads configurable.

Switch from jetty to netty-http.
This commit is contained in:
Mark Nellemann 2021-06-16 10:20:17 +02:00
parent 3a89ab42f9
commit 126f256c6c
4 changed files with 19 additions and 13 deletions

View file

@ -1,4 +1,4 @@
version=0.0.4
version=0.0.5
pf4jVersion=3.6.0
slf4jVersion=1.7.30
camelVersion=3.10.0

View file

@ -18,10 +18,11 @@ dependencies {
implementation group: 'org.apache.camel', name: 'camel-core', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-main', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-rest', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-jetty', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-stream', version: camelVersion
//implementation group: 'org.apache.camel', name: 'camel-jetty', version: camelVersion
//implementation group: 'org.apache.camel', name: 'camel-stream', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-jackson', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-influxdb', version: camelVersion
implementation group: 'org.apache.camel', name: 'camel-netty-http', version: camelVersion
}
def projectName = "sysmon-server"

View file

@ -7,7 +7,6 @@ import picocli.CommandLine;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.Callable;
@CommandLine.Command(name = "sysmon-server", mixinStandardHelpOptions = true)
@ -22,12 +21,17 @@ public class Application implements Callable<Integer> {
@CommandLine.Option(names = { "-p", "--influxdb-pass" }, description = "InfluxDB Password (default: ${DEFAULT-VALUE}).", defaultValue = "", paramLabel = "<pass>")
private String influxPass;
//@CommandLine.Option(names = { "-d", "--influxdb-db" }, description = "InfluxDB Database (default: ${DEFAULT-VALUE}).", defaultValue = "", paramLabel = "<name>")
//private String influxName = "sysmon";
@CommandLine.Option(names = { "-H", "--server-host" }, description = "Server listening address (default: ${DEFAULT-VALUE}).", paramLabel = "<addr>")
private String listenHost = "0.0.0.0";
@CommandLine.Option(names = { "-P", "--server-port" }, description = "Server listening port (default: ${DEFAULT-VALUE}).", paramLabel = "<port>")
private Integer listenPort = 9925;
@CommandLine.Option(names = { "-t", "--threads" }, description = "Threads for processing inbound metrics(default: ${DEFAULT-VALUE}).", paramLabel = "<num>")
private Integer threads = 5;
public static void main(String... args) {
int exitCode = new CommandLine(new Application()).execute(args);
@ -38,17 +42,20 @@ public class Application implements Callable<Integer> {
@Override
public Integer call() throws IOException {
/*
Properties properties = new Properties();
properties.put("http.host", listenHost);
properties.put("http.port", listenPort);
*/
InfluxDB influxConnectionBean = InfluxDBFactory.connect(influxUrl.toString(), influxUser, influxPass);
Main main = new Main();
main.bind("myInfluxConnection", influxConnectionBean);
main.bind("http.host", listenHost);
main.bind("http.port", listenPort);
main.bind("properties", properties);
//main.bind("properties", properties);
main.bind("threads", threads);
//main.bind("influxdb_name", influxName);
main.configure().addRoutesBuilder(ServerRouteBuilder.class);
// now keep the application running until the JVM is terminated (ctrl + c or sigterm)

View file

@ -13,17 +13,19 @@ public class ServerRouteBuilder extends RouteBuilder {
Registry registry = getContext().getRegistry();
restConfiguration().component("jetty")
restConfiguration().component("netty-http")
.bindingMode(RestBindingMode.auto)
.host(registry.lookupByNameAndType("http.host", String.class))
.port(registry.lookupByNameAndType("http.port", Integer.class));
/*
rest()
.get("/")
.produces("text/html")
.route()
.to("log:stdout")
.endRest();
*/
rest()
.post("/metrics")
@ -36,15 +38,11 @@ public class ServerRouteBuilder extends RouteBuilder {
.to("seda:inbound")
.endRest();
//from("seda:inbound").log("Got metric from: ${header.component}").to("mock:sink");
// TODO: Make 'concurrentConsumers' configurable
from("seda:inbound?concurrentConsumers=5")
fromF("seda:inbound?concurrentConsumers=%s", registry.lookupByNameAndType("threads", Integer.class))
.log(">>> metric: ${header.hostname} - ${body}")
.doTry()
.process(new MetricResultToPointProcessor())
.to("influxdb://ref.myInfluxConnection?databaseName=sysmon&retentionPolicy=autogen")
.toF("influxdb://ref.myInfluxConnection?databaseName=%s&retentionPolicy=autogen", "sysmon")
.doCatch(Exception.class)
.log("Error storing metric to InfluxDB: ${exception}")
.end();