Merged in gelf (pull request #8)

Support incoming messages in GELF format (also compressed UDP).
This commit is contained in:
Mark Nellemann 2022-12-08 12:57:48 +00:00
commit 4a05014dbc
19 changed files with 341 additions and 73 deletions

View file

@ -1,64 +1,68 @@
# Syslog Server # Syslog Director
All received messages are written to *stdout* and/or forwarded to a remote logging destination. All received messages are written to *stdout* and/or forwarded to remote logging destinations.
The syslog server is able to listen on both UDP and TCP and parses syslog messages in either RFC5424 or RFC3164 (BSD) format. ![architecture](doc/syslogd.png)
This software is free to use and is licensed under the [Apache 2.0 License](https://bitbucket.org/mnellemann/syslogd/src/master/LICENSE). Supported incoming message formats are:
- RFC5424 UDP and TCP
![architecture](https://bitbucket.org/mnellemann/syslogd/downloads/syslogd.svg) - RFC3164 (BSD) UDP and TCP
- GELF format UDP and TCP (also compressed on UDP)
The default syslog port (514) requires you to run syslogd as root / administrator.
If you do not wish to do so, you can choose any port number (with the *-p* or *--port* flag) above 1024.
Supported remote logging destinations are: Supported remote logging destinations are:
- Syslog (RFC5424 over UDP) - Syslog (RFC5424 over UDP)
- Graylog (GELF over UDP) - Graylog (GELF over UDP)
- and Grafana Loki (HTTP over TCP). - and Grafana Loki (HTTP over TCP).
This software is free to use and is licensed under the [Apache 2.0 License](https://bitbucket.org/mnellemann/syslogd/src/master/LICENSE).
## Usage Instructions ## Usage Instructions
- Install the syslogd package (*.deb* or *.rpm*) from [downloads](https://bitbucket.org/mnellemann/syslogd/downloads/) or build from source. - Install the syslogd package (*.deb* or *.rpm*) from [downloads](https://bitbucket.org/mnellemann/syslogd/downloads/) or build from source.
- Run *bin/syslogd*, use the *-h* option for help :)
```text ```text
Usage: syslogd [-dhV] [--[no-]ansi] [--[no-]stdout] [--[no-]tcp] [--[no-]udp] Usage: syslogd [-dhV] [--[no-]ansi] [--[no-]stdout] [--[no-]tcp] [--[no-]udp]
[--rfc5424] [-g=<uri>] [-l=<url>] [-p=<num>] [-s=<uri>] [-f=<protocol>] [-p=<num>] [--to-gelf=<uri>] [--to-loki=<url>]
-d, --debug Enable debugging [default: 'false']. [--to-syslog=<uri>]
-g, --gelf=<uri> Forward to Graylog <udp://host:port>. -d, --debug Enable debugging [default: 'false'].
-h, --help Show this help message and exit. -f, --format=<protocol> Input format: RFC-5424, RFC-3164 or GELF [default:
-l, --loki=<url> Forward to Grafana Loki <http://host:port>. RFC-3164].
--[no-]ansi Output ANSI colors [default: true]. -h, --help Show this help message and exit.
--[no-]stdout Output messages to stdout [default: true]. --[no-]ansi Output in ANSI colors [default: true].
--[no-]tcp Listen on TCP [default: true]. --[no-]stdout Output messages to stdout [default: true].
--[no-]udp Listen on UDP [default: true]. --[no-]tcp Listen on TCP [default: true].
-p, --port=<num> Listening port [default: 514]. --[no-]udp Listen on UDP [default: true].
--rfc5424 Parse RFC-5424 messages [default: RFC-3164]. -p, --port=<num> Listening port [default: 1514].
-s, --syslog=<uri> Forward to Syslog <udp://host:port> (RFC-5424). --to-gelf=<uri> Forward to Graylog <udp://host:port>.
-V, --version Print version information and exit. --to-loki=<url> Forward to Grafana Loki <http://host:port>.
--to-syslog=<uri> Forward to Syslog <udp://host:port> (RFC-5424).
-V, --version Print version information and exit.
``` ```
The default syslog port (514) requires you to run syslogd as root / administrator.
Any port number above 1024 does not require privileges and can be selected with the *-p* or *--port* option.
### Examples ### Examples
Listening on a non-standard syslog port: Listening on the default syslog port:
``` ```
java -jar /path/to/syslogd-x.y.z-all.jar --port 1514 java -jar /path/to/syslogd-x.y.z-all.jar --port 514
``` ```
or, if installed as a *deb* or *rpm* package: or, if installed as a *deb* or *rpm* package:
``` ```
/opt/syslogd/bin/syslogd --port 1514 /opt/syslogd/bin/syslogd --port 514
``` ```
Listening on the standard syslog port (requires root privileges) and forwarding messages on to another log-system on a non-standard port. Forwarding messages on to another log-system on a non-standard port.
``` ```
java -jar /path/to/syslogd-x.y.z-all.jar --syslog udp://remotehost:514 java -jar /path/to/syslogd-x.y.z-all.jar --syslog udp://remotehost:514
``` ```
Forwarding to a Graylog server in GELF format. Forwarding messages to a Graylog server in GELF format.
``` ```
java -jar /path/to/syslogd-x.y.z-all.jar --gelf udp://remotehost:12201 java -jar /path/to/syslogd-x.y.z-all.jar --gelf udp://remotehost:12201

View file

@ -2,6 +2,7 @@ plugins {
id 'java' id 'java'
id 'groovy' id 'groovy'
id 'application' id 'application'
id 'jacoco'
id "com.github.johnrengelman.shadow" version "7.1.2" id "com.github.johnrengelman.shadow" version "7.1.2"
id "net.nemerosa.versioning" version "2.15.1" id "net.nemerosa.versioning" version "2.15.1"
id "nebula.ospackage" version "9.1.1" id "nebula.ospackage" version "9.1.1"
@ -16,20 +17,61 @@ dependencies {
implementation 'info.picocli:picocli:4.7.0' implementation 'info.picocli:picocli:4.7.0'
implementation 'org.slf4j:slf4j-api:2.0.5' implementation 'org.slf4j:slf4j-api:2.0.5'
implementation 'org.slf4j:slf4j-simple:2.0.5' implementation 'org.slf4j:slf4j-simple:2.0.5'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.1'
testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0' testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0'
testImplementation 'org.slf4j:slf4j-api:2.0.5' testImplementation 'org.slf4j:slf4j-api:2.0.5'
testRuntimeOnly 'org.slf4j:slf4j-simple:2.0.5'
} }
application { application {
getMainClass().set('biz.nellemann.syslogd.Application') getMainClass().set('biz.nellemann.syslogd.Application')
} }
java {
sourceCompatibility = 1.8
targetCompatibility = 1.8
}
test { test {
useJUnitPlatform() useJUnitPlatform()
} }
jacoco {
toolVersion = "0.8.8"
}
jacocoTestReport {
group = "verification"
reports {
xml.required = false
csv.required = false
html.destination file("${buildDir}/reports/coverage")
}
}
test.finalizedBy jacocoTestReport
jacocoTestCoverageVerification {
violationRules {
rule {
limit {
counter = 'LINE'
minimum = 0.3
}
limit {
counter = 'BRANCH'
minimum = 0.2
}
limit {
counter = 'CLASS'
minimum = 0.4
}
}
}
}
check.dependsOn jacocoTestCoverageVerification
apply plugin: 'nebula.ospackage' apply plugin: 'nebula.ospackage'
ospackage { ospackage {
packageName = 'syslogd' packageName = 'syslogd'
@ -80,7 +122,9 @@ jar {
} }
} }
java { tasks.create("packages") {
sourceCompatibility = 1.8 group "build"
targetCompatibility = 1.8 dependsOn ":build"
dependsOn ":buildDeb"
dependsOn ":buildRpm"
} }

View file

@ -7,10 +7,14 @@ Edit the **syslogd.service** and configure required options.
To install as a systemd service, copy the **syslogd.service** To install as a systemd service, copy the **syslogd.service**
file into */etc/systemd/system/* and enable the service: file into */etc/systemd/system/* and enable the service:
systemctl daemon-reload ```shell
systemctl enable syslogd.service systemctl daemon-reload
systemctl restart syslogd.service systemctl enable syslogd.service
systemctl restart syslogd.service
```
To read log output from the service, use: To read log output from the service, use:
journalctl -f -u syslogd.service ```shell
journalctl -f -u syslogd.service
```

1
doc/syslogd.drawio Normal file
View file

@ -0,0 +1 @@
<mxfile host="drawio-plugin" modified="2022-12-07T07:21:50.025Z" agent="5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/104.0.5112.102 Safari/537.36" etag="0NRdR40T7b5zyLZSsQHk" version="20.5.3" type="embed"><diagram id="23iRSUPoRavnBvh4doch" name="Page-1"><mxGraphModel dx="1115" dy="620" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1169" pageHeight="827" math="0" shadow="0"><root><mxCell id="0"/><mxCell id="1" parent="0"/><mxCell id="10" style="edgeStyle=orthogonalEdgeStyle;orthogonalLoop=1;jettySize=auto;html=1;curved=1;sketch=1;shadow=1;" parent="1" source="2" target="3" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="2" value="RFC--3164" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#ffe6cc;strokeColor=#d79b00;" parent="1" vertex="1"><mxGeometry x="100" y="40" width="120" height="60" as="geometry"/></mxCell><mxCell id="13" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="3" target="9" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="14" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="3" target="6" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="15" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="3" target="7" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="16" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="3" target="8" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="3" value="syslogd" style="shape=parallelogram;perimeter=parallelogramPerimeter;whiteSpace=wrap;html=1;fixedSize=1;sketch=1;rounded=1;shadow=1;fillColor=#dae8fc;strokeColor=#6c8ebf;" parent="1" vertex="1"><mxGeometry x="280" y="130" width="120" height="60" as="geometry"/></mxCell><mxCell id="11" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="4" target="3" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="4" value="RFC--5424" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1"><mxGeometry x="20" y="130" width="120" height="60" as="geometry"/></mxCell><mxCell id="12" style="edgeStyle=orthogonalEdgeStyle;curved=1;sketch=1;orthogonalLoop=1;jettySize=auto;html=1;shadow=1;" parent="1" source="5" target="3" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="5" value="GELF" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#f8cecc;strokeColor=#b85450;" parent="1" vertex="1"><mxGeometry x="130" y="223.5" width="120" height="60" as="geometry"/></mxCell><mxCell id="6" value="Syslog&lt;br&gt;RFC-5424" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#d5e8d4;strokeColor=#82b366;" parent="1" vertex="1"><mxGeometry x="570" y="80" width="120" height="60" as="geometry"/></mxCell><mxCell id="7" value="Grafana Loki" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#e1d5e7;strokeColor=#9673a6;" parent="1" vertex="1"><mxGeometry x="550" y="170" width="120" height="60" as="geometry"/></mxCell><mxCell id="8" value="Graylog" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#fff2cc;strokeColor=#d6b656;" parent="1" vertex="1"><mxGeometry x="420" y="250" width="120" height="60" as="geometry"/></mxCell><mxCell id="9" value="Standard&lt;br&gt;Output" style="rounded=1;whiteSpace=wrap;html=1;sketch=1;shadow=1;fillColor=#f5f5f5;strokeColor=#666666;fontColor=#333333;" parent="1" vertex="1"><mxGeometry x="410" y="30" width="120" height="60" as="geometry"/></mxCell></root></mxGraphModel></diagram></mxfile>

BIN
doc/syslogd.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 172 KiB

View file

@ -1,10 +1,10 @@
[Unit] [Unit]
Description=Simple Syslog Service Description=Syslog Director
[Service] [Service]
TimeoutStartSec=0 TimeoutStartSec=0
Restart=always Restart=always
ExecStart=/opt/syslogd/bin/syslogd --no-stdout --syslog=udp://localhost:1514 ExecStart=/opt/syslogd/bin/syslogd --port 514 --no-stdout --syslog=udp://localhost:1514
[Install] [Install]
WantedBy=default.target WantedBy=default.target

View file

@ -1,3 +1,5 @@
id = syslogd id = syslogd
name = syslogd
group = biz.nellemann.syslogd group = biz.nellemann.syslogd
version = 1.2.8 version = 1.3.0
description = "Syslog Director"

View file

@ -17,6 +17,7 @@ package biz.nellemann.syslogd;
import biz.nellemann.syslogd.msg.SyslogMessage; import biz.nellemann.syslogd.msg.SyslogMessage;
import biz.nellemann.syslogd.net.*; import biz.nellemann.syslogd.net.*;
import biz.nellemann.syslogd.parser.GelfParser;
import biz.nellemann.syslogd.parser.SyslogParser; import biz.nellemann.syslogd.parser.SyslogParser;
import biz.nellemann.syslogd.parser.SyslogParserRfc3164; import biz.nellemann.syslogd.parser.SyslogParserRfc3164;
import biz.nellemann.syslogd.parser.SyslogParserRfc5424; import biz.nellemann.syslogd.parser.SyslogParserRfc5424;
@ -42,7 +43,7 @@ public class Application implements Callable<Integer>, LogReceiveListener {
private SyslogParser syslogParser; private SyslogParser syslogParser;
@CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 514].", defaultValue = "514", paramLabel = "<num>") @CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 1514].", defaultValue = "1514", paramLabel = "<num>")
private int port; private int port;
@CommandLine.Option(names = "--no-udp", negatable = true, description = "Listen on UDP [default: true].", defaultValue = "true") @CommandLine.Option(names = "--no-udp", negatable = true, description = "Listen on UDP [default: true].", defaultValue = "true")
@ -51,22 +52,22 @@ public class Application implements Callable<Integer>, LogReceiveListener {
@CommandLine.Option(names = "--no-tcp", negatable = true, description = "Listen on TCP [default: true].", defaultValue = "true") @CommandLine.Option(names = "--no-tcp", negatable = true, description = "Listen on TCP [default: true].", defaultValue = "true")
private boolean tcpServer; private boolean tcpServer;
@CommandLine.Option(names = "--no-ansi", negatable = true, description = "Output ANSI colors [default: true].", defaultValue = "true") @CommandLine.Option(names = "--no-ansi", negatable = true, description = "Output in ANSI colors [default: true].", defaultValue = "true")
private boolean ansiOutput; private boolean ansiOutput;
@CommandLine.Option(names = "--no-stdout", negatable = true, description = "Output messages to stdout [default: true].", defaultValue = "true") @CommandLine.Option(names = "--no-stdout", negatable = true, description = "Output messages to stdout [default: true].", defaultValue = "true")
private boolean stdout; private boolean stdout;
@CommandLine.Option(names = "--rfc5424", description = "Parse RFC-5424 messages [default: RFC-3164].", defaultValue = "false") @CommandLine.Option(names = {"-f", "--format"}, description = "Input format: RFC-5424, RFC-3164 or GELF [default: RFC-3164].", defaultValue = "RFC-3164")
private boolean rfc5424; private String protocol;
@CommandLine.Option(names = { "-s", "--syslog"}, description = "Forward to Syslog <udp://host:port> (RFC-5424).", paramLabel = "<uri>") @CommandLine.Option(names = { "--to-syslog"}, description = "Forward to Syslog <udp://host:port> (RFC-5424).", paramLabel = "<uri>")
private URI syslog; private URI syslog;
@CommandLine.Option(names = { "-g", "--gelf"}, description = "Forward to Graylog <udp://host:port>.", paramLabel = "<uri>") @CommandLine.Option(names = { "--to-gelf"}, description = "Forward to Graylog <udp://host:port>.", paramLabel = "<uri>")
private URI gelf; private URI gelf;
@CommandLine.Option(names = { "-l", "--loki"}, description = "Forward to Grafana Loki <http://host:port>.", paramLabel = "<url>") @CommandLine.Option(names = { "--to-loki"}, description = "Forward to Grafana Loki <http://host:port>.", paramLabel = "<url>")
private URL loki; private URL loki;
@CommandLine.Option(names = { "-d", "--debug" }, description = "Enable debugging [default: 'false'].") @CommandLine.Option(names = { "-d", "--debug" }, description = "Enable debugging [default: 'false'].")
@ -81,7 +82,9 @@ public class Application implements Callable<Integer>, LogReceiveListener {
System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG");
} }
if(rfc5424) { if(protocol.equalsIgnoreCase("GELF"))
syslogParser = new GelfParser();
else if (protocol.equalsIgnoreCase("RFC-5424")) {
syslogParser = new SyslogParserRfc5424(); syslogParser = new SyslogParserRfc5424();
} else { } else {
syslogParser = new SyslogParserRfc3164(); syslogParser = new SyslogParserRfc3164();
@ -132,10 +135,9 @@ public class Application implements Callable<Integer>, LogReceiveListener {
public void onLogEvent(LogReceiveEvent event) { public void onLogEvent(LogReceiveEvent event) {
// Parse message // Parse message
String message = event.getMessage();
SyslogMessage msg = null; SyslogMessage msg = null;
try { try {
msg = syslogParser.parse(message); msg = syslogParser.parse(event.getBytes());
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); e.printStackTrace();
} }

View file

@ -15,20 +15,34 @@
*/ */
package biz.nellemann.syslogd; package biz.nellemann.syslogd;
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;
import java.util.EventObject; import java.util.EventObject;
public class LogReceiveEvent extends EventObject { public class LogReceiveEvent extends EventObject {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final String message; //private final String message;
private final DatagramPacket packet;
/*
public LogReceiveEvent(final Object source, final String message ) { public LogReceiveEvent(final Object source, final String message ) {
super( source ); super( source );
this.message = message; this.message = message;
} }
*/
public String getMessage() { public LogReceiveEvent(final Object source, final DatagramPacket packet) {
return message; super( source );
this.packet = packet;
}
public String getText() {
return new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
}
public byte[] getBytes() {
return packet.getData();
} }
} }

View file

@ -106,7 +106,7 @@ public class SyslogPrinter {
StringBuilder sb = new StringBuilder("{ \"version\": \"1.1\","); StringBuilder sb = new StringBuilder("{ \"version\": \"1.1\",");
sb.append(String.format("\"host\": \"%s\",", msg.hostname)); sb.append(String.format("\"host\": \"%s\",", msg.hostname));
sb.append(String.format("\"short_message\": \"%s\",", JsonUtil.encode(msg.message))); sb.append(String.format("\"short_message\": \"%s\",", JsonUtil.encode(msg.message)));
//sb.append(String.format("\"full_message\": \"%s\",", msg.message)); sb.append(String.format("\"full_message\": \"%s\",", msg.structuredData));
sb.append(String.format("\"timestamp\": %d,", msg.timestamp.getEpochSecond())); sb.append(String.format("\"timestamp\": %d,", msg.timestamp.getEpochSecond()));
sb.append(String.format("\"level\": %d,", msg.severity.toNumber())); sb.append(String.format("\"level\": %d,", msg.severity.toNumber()));
sb.append(String.format("\"_facility\": \"%s\",", msg.facility)); sb.append(String.format("\"_facility\": \"%s\",", msg.facility));

View file

@ -15,38 +15,55 @@
*/ */
package biz.nellemann.syslogd.msg; package biz.nellemann.syslogd.msg;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant; import java.time.Instant;
@JsonIgnoreProperties(ignoreUnknown = true)
public class SyslogMessage { public class SyslogMessage {
public Facility facility; @JsonIgnore
public Severity severity; public Facility facility = Facility.user;
@JsonProperty("level")
public Severity severity = Severity.info;
// The VERSION field denotes the version of the syslog protocol specification. // The VERSION field denotes the version of the syslog protocol specification.
public Integer version; public String version;
// The TIMESTAMP field is a formalized timestamp derived from [RFC3339]. // The TIMESTAMP field is a formalized timestamp derived from [RFC3339].
@JsonProperty("timestamp") // 1670357783.694 - in GELF: seconds since UNIX epoch with optional decimal places for milliseconds
public Instant timestamp; public Instant timestamp;
// The HOSTNAME field identifies the machine that originally sent the syslog message. // The HOSTNAME field identifies the machine that originally sent the syslog message.
@JsonProperty("host")
public String hostname; public String hostname;
// The APP-NAME field SHOULD identify the device or application that originated the message. // The APP-NAME field SHOULD identify the device or application that originated the message.
@JsonProperty("_logger_name")
public String application; public String application;
// The PROCID field is often used to provide the process name or process ID associated with a syslog system. // The PROCID field is often used to provide the process name or process ID associated with a syslog system.
@JsonProperty("_thread_name")
public String processId; public String processId;
// The MSGID SHOULD identify the type of message. // The MSGID SHOULD identify the type of message.
@JsonIgnore
public String messageId; public String messageId;
// STRUCTURED-DATA provides a mechanism to express information in a well defined, easily parseable and interpretable data format. // STRUCTURED-DATA provides a mechanism to express information in a well defined, easily parseable and interpretable data format.
@JsonProperty("full_message")
public String structuredData; public String structuredData;
// The MSG part contains a free-form message that provides information about the event. // The MSG part contains a free-form message that provides information about the event.
public final String message; @JsonProperty("short_message")
public String message;
public SyslogMessage(final String message) { @JsonCreator
public SyslogMessage(@JsonProperty("short_message") final String message) {
this.message = message; this.message = message;
} }

View file

@ -34,7 +34,6 @@ public class LokiClient implements LogForwardListener, Runnable {
private final ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1024); private final ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1024);
private final URL url; private final URL url;
private boolean keepRunning = true;
public LokiClient(URL url) { public LokiClient(URL url) {
@ -82,7 +81,7 @@ public class LokiClient implements LogForwardListener, Runnable {
@Override @Override
public void run() { public void run() {
while (keepRunning) { while (true) {
try { try {
send(blockingQueue.take()); send(blockingQueue.take());
} catch (Exception e) { } catch (Exception e) {

View file

@ -21,8 +21,10 @@ import biz.nellemann.syslogd.LogReceiveListener;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
@ -31,10 +33,6 @@ public class TcpServer {
private final int port; private final int port;
private ServerSocket serverSocket; private ServerSocket serverSocket;
public TcpServer() {
this(514);
}
public TcpServer(int port) { public TcpServer(int port) {
this.port = port; this.port = port;
} }
@ -84,6 +82,8 @@ public class TcpServer {
public void run() { public void run() {
// GELF TCP does not support compression due to the use of the null byte (\0) as frame delimiter.
// Is \0 also used as frame delimiter for regular syslog messages ?
try { try {
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
String inputLine; String inputLine;
@ -105,7 +105,8 @@ public class TcpServer {
private synchronized void sendEvent(String message) { private synchronized void sendEvent(String message) {
LogReceiveEvent event = new LogReceiveEvent( this, message ); DatagramPacket packet = new DatagramPacket(message.getBytes(StandardCharsets.UTF_8), message.length());
LogReceiveEvent event = new LogReceiveEvent( this, packet);
for (LogReceiveListener eventListener : eventListeners) { for (LogReceiveListener eventListener : eventListeners) {
eventListener.onLogEvent(event); eventListener.onLogEvent(event);
} }

View file

@ -30,24 +30,20 @@ public class UdpServer extends Thread {
protected DatagramSocket socket; protected DatagramSocket socket;
protected boolean listen = true; protected boolean listen = true;
public UdpServer() throws IOException {
this(514);
}
public UdpServer(int port) throws IOException { public UdpServer(int port) throws IOException {
socket = new DatagramSocket(port); socket = new DatagramSocket(port);
} }
public void run() { public void run() {
byte[] buf = new byte[4096]; byte[] buf = new byte[8192];
DatagramPacket packet = new DatagramPacket(buf, buf.length); DatagramPacket packet = new DatagramPacket(buf, buf.length);
while (listen) { while (listen) {
try { try {
socket.receive(packet); socket.receive(packet);
String packetData = new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8); //String packetData = new String(packet.getData(), packet.getOffset(), packet.getLength(), StandardCharsets.UTF_8);
sendEvent(packetData); sendEvent(packet);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
listen = false; listen = false;
@ -56,13 +52,21 @@ public class UdpServer extends Thread {
socket.close(); socket.close();
} }
/*
private synchronized void sendEvent(String message) { private synchronized void sendEvent(String message) {
LogReceiveEvent event = new LogReceiveEvent( this, message); LogReceiveEvent event = new LogReceiveEvent( this, message);
for (LogReceiveListener eventListener : eventListeners) { for (LogReceiveListener eventListener : eventListeners) {
eventListener.onLogEvent(event); eventListener.onLogEvent(event);
} }
} }
*/
private synchronized void sendEvent(DatagramPacket packet) {
LogReceiveEvent event = new LogReceiveEvent( this, packet);
for (LogReceiveListener eventListener : eventListeners) {
eventListener.onLogEvent(event);
}
}
/** /**
* Event Listener Configuration * Event Listener Configuration

View file

@ -0,0 +1,77 @@
package biz.nellemann.syslogd.parser;
import biz.nellemann.syslogd.msg.SyslogMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.time.Instant;
import java.util.zip.DataFormatException;
public class GelfParser extends SyslogParser {
private final static Logger log = LoggerFactory.getLogger(GelfParser.class);
private final ObjectMapper objectMapper;
public GelfParser() {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
}
@Override
public SyslogMessage parse(String input) {
SyslogMessage message = null;
try {
message = objectMapper.readValue(input, SyslogMessage.class);
} catch (JsonProcessingException e) {
log.warn("parse() - error: {}", e.getMessage());
}
return message;
}
/*
https://go2docs.graylog.org/5-0/getting_in_log_data/gelf.html
zlib signatures at offset 0
78 01 : No Compression (no preset dictionary)
78 5E : Best speed (no preset dictionary)
78 9C : Default Compression (no preset dictionary)
78 DA : Best Compression (no preset dictionary)
78 20 : No Compression (with preset dictionary)
78 7D : Best speed (with preset dictionary)
78 BB : Default Compression (with preset dictionary)
78 F9 : Best Compression (with preset dictionary)
gzip signature at offset 0
1F 8B : GZIP compressed
*/
@Override
public SyslogMessage parse(byte[] input) {
String text = null;
if(input[0] == (byte)0x78 && input[1] == (byte)0x9c) { // Compressed data - TODO: better detection ?
try {
text = decompress(input);
} catch (DataFormatException | UnsupportedEncodingException e) {
log.error("parse() - error: {}", e.getMessage());
}
} else {
text = byteArrayToString(input);
}
return parse(text);
}
@Override
public Instant parseTimestamp(String dateString) {
return null;
}
}

View file

@ -16,11 +16,17 @@
package biz.nellemann.syslogd.parser; package biz.nellemann.syslogd.parser;
import biz.nellemann.syslogd.msg.SyslogMessage; import biz.nellemann.syslogd.msg.SyslogMessage;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.time.Instant; import java.time.Instant;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
public abstract class SyslogParser { public abstract class SyslogParser {
public abstract SyslogMessage parse(final String input); public abstract SyslogMessage parse(final String input);
public abstract SyslogMessage parse(final byte[] input);
public abstract Instant parseTimestamp(final String dateString); public abstract Instant parseTimestamp(final String dateString);
@ -50,4 +56,23 @@ public abstract class SyslogParser {
return severity; return severity;
} }
protected String byteArrayToString(byte[] input) {
return new String(input, 0, input.length, StandardCharsets.UTF_8);
}
protected String decompress(byte[] data) throws UnsupportedEncodingException, DataFormatException {
// Decompress the bytes
Inflater decompressor = new Inflater();
decompressor.setInput(data, 0, data.length);
byte[] result = new byte[data.length * 2];
int resultLength = decompressor.inflate(result);
decompressor.end();
// Decode the bytes into a String
return new String(result, 0, resultLength, StandardCharsets.UTF_8);
}
} }

View file

@ -15,6 +15,7 @@
*/ */
package biz.nellemann.syslogd.parser; package biz.nellemann.syslogd.parser;
import biz.nellemann.syslogd.LogReceiveEvent;
import biz.nellemann.syslogd.msg.Facility; import biz.nellemann.syslogd.msg.Facility;
import biz.nellemann.syslogd.msg.Severity; import biz.nellemann.syslogd.msg.Severity;
import biz.nellemann.syslogd.msg.SyslogMessage; import biz.nellemann.syslogd.msg.SyslogMessage;
@ -77,6 +78,11 @@ public class SyslogParserRfc3164 extends SyslogParser {
return syslogMessage; return syslogMessage;
} }
@Override
public SyslogMessage parse(byte[] input) {
return parse(byteArrayToString(input));
}
/** /**
* Parse rfc3164 TIMESTAMP field into Instant. * Parse rfc3164 TIMESTAMP field into Instant.

View file

@ -15,6 +15,7 @@
*/ */
package biz.nellemann.syslogd.parser; package biz.nellemann.syslogd.parser;
import biz.nellemann.syslogd.LogReceiveEvent;
import biz.nellemann.syslogd.msg.Severity; import biz.nellemann.syslogd.msg.Severity;
import biz.nellemann.syslogd.msg.Facility; import biz.nellemann.syslogd.msg.Facility;
import biz.nellemann.syslogd.msg.SyslogMessage; import biz.nellemann.syslogd.msg.SyslogMessage;
@ -71,7 +72,7 @@ public class SyslogParserRfc5424 extends SyslogParser {
SyslogMessage syslogMessage = new SyslogMessage(msg.trim()); SyslogMessage syslogMessage = new SyslogMessage(msg.trim());
syslogMessage.facility = Facility.getByNumber(facility); syslogMessage.facility = Facility.getByNumber(facility);
syslogMessage.severity = Severity.getByNumber(severity); syslogMessage.severity = Severity.getByNumber(severity);
syslogMessage.version = Integer.parseInt(ver); syslogMessage.version = ver;
syslogMessage.timestamp = parseTimestamp(date); syslogMessage.timestamp = parseTimestamp(date);
syslogMessage.hostname = host; syslogMessage.hostname = host;
if(app != null && !app.equals("-")) if(app != null && !app.equals("-"))
@ -86,6 +87,11 @@ public class SyslogParserRfc5424 extends SyslogParser {
return syslogMessage; return syslogMessage;
} }
@Override
public SyslogMessage parse(byte[] input) {
return parse(byteArrayToString(input));
}
/** /**
* Parse rfc5424 TIMESTAMP field into Instant. * Parse rfc5424 TIMESTAMP field into Instant.

View file

@ -0,0 +1,62 @@
package biz.nellemann.syslogd
import biz.nellemann.syslogd.msg.Facility
import biz.nellemann.syslogd.msg.Severity
import biz.nellemann.syslogd.msg.SyslogMessage
import biz.nellemann.syslogd.parser.GelfParser
import biz.nellemann.syslogd.parser.SyslogParser
import spock.lang.Specification
class GelfParserTest extends Specification {
SyslogParser syslogParser;
void setup() {
syslogParser = new GelfParser();
}
void "uncompressed GELF message"() {
/*
0x7b 0x220x760x650x720x730x690x6f0x6e0x220x3a0x220x310x2e0x310x220x2c0x220x680x6f0x730x740x220x3a0x220x700x6f0x700x2d0x6f0x730x2e0x6c0x6f0x630x610x6c0x640x6f0x6d0x610x690x6e0x220x2c0x220x730x680x6f0x720x740x5f0x6d0x650x730x730x610x670x650x220x3a0x220x6d0x610x690x6e0x280x290x200x2d0x200x530x740x610x720x740x690x6e0x670x200x560x540x440x2d0x430x610x6d0x650x720x610x220x2c0x220x660x750x6c0x6c0x5f0x6d0x650x730x730x610x670x650x220x3a0x220x6d0x610x690x6e0x280x290x200x2d0x200x530x740x610x720x740x690x6e0x670x200x560x540x440x2d0x430x610x6d0x650x720x610x5c0x5c0x6e0x220x2c0x220x740x690x6d0x650x730x740x610x6d0x700x220x3a0x310x360x370x300x330x350x370x370x380x330x2e0x360x390x340x2c0x220x6c0x650x760x650x6c0x220x3a0x340x2c0x220x5f0x740x680x720x650x610x640x5f0x6e0x610x6d0x650x220x3a0x220x6d0x610x690x6e0x220x2c0x220x5f0x6c0x6f0x670x670x650x720x5f0x6e0x610x6d0x650x220x3a0x220x760x740x640x2e0x630x610x6d0x650x720x610x2e0x410x700x700x6c0x690x630x610x740x690x6f0x6e0x220x7d0x0a0x560x1f0x210x470x710x2c0xeb0x090x740x8e0x060x6f0x7f0xe70x7a0x330x890xb80xe10x6e0xa40x090xe90x9f0x870x1c0x290x900xcd0x880x4b0xea0xce0x9b0xf30x660xbc0x880x7b0x6f0x410x610x8f0x4c0xf00x570xaa0x590x620x180x8b0x820x730x040x350x3e0x120x8b0x930x7b0x130x780x5b0xb50x150x870x4f0xd00x5c0xb60xec0xfb0x070x9a0x690xc70xe8
*/
setup:
def input = '{"version":"1.1","host":"pop-os.localdomain","short_message":"main() - Starting VTD-Camera","full_message":"main() - Starting VTD-Camera\\n","timestamp":1670357783.694,"level":4,"_thread_name":"main","_logger_name":"vtd.camera.Application"}'
when:
SyslogMessage msg = syslogParser.parse(input)
then:
msg.version == "1.1"
msg.message == "main() - Starting VTD-Camera"
msg.hostname == "pop-os.localdomain"
msg.application == "vtd.camera.Application"
msg.processId == "main"
msg.timestamp.toString() == "2022-12-06T20:16:23.694Z"
}
void "compressed GELF message"() {
// GELF Magic Bytes: 0x1e, 0x0f
setup:
byte[] input = [ 120, -100, -51, 81, 77, 107, -61, 48, 12, -3, 43, -63, -25, -38, -115, -77, -75, -51, 2, -127, -11, -80, -61, 96, -73, 94, 11, -63, 56, 90, -30, -59, 31, -63, -106, -53, -54, -40, 127, -97, 92, 86, -40, 79, -40, 69, -78, -97, -11, -12, -98, -28, 47, 118, -127, -104, 76, -16, -84, 99, 82, 72, -74, 97, 115, 72, 72, 23, -77, 114, 89, 115, 73, 65, -14, 102, 87, 11, -56, 92, -125, -57, -88, 44, -105, 66, 7, -73, 102, 4, 97, 60, 66, -12, -54, 18, 45, -51, 33, -30, -32, 32, 37, 53, 1, -15, -31, 66, -43, -3, 49, -29, 76, -39, 104, -123, -92, 113, -54, 90, 83, -63, 75, 121, -86, 114, 42, 84, 7, -67, 83, 113, 17, 30, -84, 5, -89, -68, 127, -98, -100, 50, -74, 40, 84, 8, 94, 81, -113, -90, -118, -32, 2, -62, 113, 28, 35, -79, 123, -39, -18, -124, 108, -91, 104, -102, 7, 33, -27, -95, 74, 4, 82, -13, -41, -79, -9, -39, 22, 43, -17, -108, -2, -127, -109, -77, 39, 47, 104, -56, 8, 42, -73, -78, 78, -18, 15, -11, -82, -106, -121, -89, 86, -44, -113, -5, 13, -77, 100, -52, -78, -114, 78, -125, 90, -41, 65, -121, 76, -21, -67, -110, -31, 113, 97, -65, 88, 113, 69, -128, -93, 61, -57, -126, -39, 48, 77, 16, -17, -80, -6, 8, 57, -118, 99, -119, 39, -48, 57, 26, -68, -2, -99, -21, -51, 36, -14, 13, 55, 34, 77, 72, -1, 60, -28, 72, -126, 108, 70, 92, 83, 119, -34, -98, -73, -29, 34, 110, -67, 5, -119, -35, 53, -63, 95, -88, 102, -115, 97, 44, 8, -50, 17, -44, 120, 87, 44, 76, -18, 77, -32, 109, -35, -42, 28, 62, 65, -13, -122, 125, -1, 0, -40, 60, -57, -72 ];
when:
SyslogMessage msg = syslogParser.parse(input)
then:
msg.version == "1.1"
msg.facility == Facility.user;
msg.severity == Severity.info;
msg.message == "event=AuthenticationSuccessEvent username=mark.nellemann@gmail.com tenant=2 remoteAddress=185.181.223.117 sessionId=null"
msg.hostname == "ip-10-1-101-250.eu-central-1.compute.internal"
msg.application == "ajour.AjourSecuritySuccessEventListener"
msg.processId == "http-nio-8080-exec-2"
msg.timestamp.toString() == "2022-12-08T12:16:38.046Z"
}
}