Work on forwarding syslog messaged to Grafana Loki.

This commit is contained in:
Mark Nellemann 2021-03-16 22:01:11 +01:00
parent 058198003b
commit 883963b033
6 changed files with 183 additions and 42 deletions

View file

@ -63,3 +63,13 @@ If you don't want any output locally (only forwarding), you can use the ```--no-
Syslog messages from AIX (and IBM Power Virtual I/O Servers) can be troublesome with some logging solutions. These can be received with
syslogd and optionally forwarded on to Graylog, Splunk or other logging solutions.
## Development
### Test Grafana Loki
```shell
docker run --rm -d --name=loki -p 3100:3100 grafana/loki
```

View file

@ -16,6 +16,7 @@
package biz.nellemann.syslogd;
import biz.nellemann.syslogd.msg.SyslogMessage;
import biz.nellemann.syslogd.net.LokiClient;
import biz.nellemann.syslogd.net.TcpServer;
import biz.nellemann.syslogd.net.UdpClient;
import biz.nellemann.syslogd.net.UdpServer;
@ -28,6 +29,7 @@ import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -40,6 +42,8 @@ public class Application implements Callable<Integer>, LogListener {
private boolean doForward = false;
private SyslogParser syslogParser;
private UdpClient udpClient;
private UdpClient gelfClient;
private LokiClient lokiClient;
@CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 514].", defaultValue = "514")
@ -60,11 +64,14 @@ public class Application implements Callable<Integer>, LogListener {
@CommandLine.Option(names = "--rfc5424", description = "Parse RFC-5424 messages [default: RFC-3164].", defaultValue = "false")
private boolean rfc5424;
@CommandLine.Option(names = { "-f", "--forward"}, description = "Forward to UDP host[:port] (RFC-5424).", paramLabel = "<host>")
private String forward;
@CommandLine.Option(names = { "-s", "--syslog"}, description = "Forward to Syslog UDP host[:port] (RFC-5424).", paramLabel = "<host>")
private String syslog;
@CommandLine.Option(names = { "-g", "--gelf"}, description = "Forward in Graylog (GELF) JSON format.", defaultValue = "false")
private boolean gelf;
@CommandLine.Option(names = { "-g", "--gelf"}, description = "Forward to Graylog host[:port] (GELF).", paramLabel = "<host>")
private String gelf;
@CommandLine.Option(names = { "-l", "--loki"}, description = "Forward to Grafana Loki.", paramLabel = "<url>")
private String loki;
@CommandLine.Option(names = { "-d", "--debug" }, description = "Enable debugging [default: 'false'].")
private boolean enableDebug = false;
@ -83,23 +90,18 @@ public class Application implements Callable<Integer>, LogListener {
syslogParser = new SyslogParserRfc3164();
}
if(forward != null && !forward.isEmpty()) {
String fHost, fPort;
Pattern pattern = Pattern.compile("^([^:]+)(?::([0-9]+))?$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(forward);
if(matcher.find()) {
fHost = matcher.group(1);
if(matcher.groupCount() == 2 && matcher.group(2) != null) {
fPort = matcher.group(2);
} else {
fPort = "514";
}
} else {
fHost = "localhost";
fPort = "514";
}
if(syslog != null && !syslog.isEmpty()) {
udpClient = new UdpClient(getInetSocketAddress(syslog));
doForward = true;
}
udpClient = new UdpClient(fHost, Integer.parseInt(fPort));
if(gelf != null && !gelf.isEmpty()) {
gelfClient = new UdpClient(getInetSocketAddress(gelf));
doForward = true;
}
if(loki != null && !loki.isEmpty()) {
lokiClient = new LokiClient(loki);
doForward = true;
}
@ -143,11 +145,19 @@ public class Application implements Callable<Integer>, LogListener {
if(doForward) {
try {
if(gelf) {
udpClient.send(SyslogPrinter.toGelf(msg));
} else {
if(udpClient != null) {
udpClient.send(SyslogPrinter.toRfc5424(msg));
}
if(gelfClient != null) {
gelfClient.send(SyslogPrinter.toGelf(msg));
}
if(lokiClient != null) {
lokiClient.send(SyslogPrinter.toLoki(msg));
}
} catch (Exception e) {
e.printStackTrace();
}
@ -157,6 +167,28 @@ public class Application implements Callable<Integer>, LogListener {
}
private InetSocketAddress getInetSocketAddress(String input) {
String dstHost;
int dstPort;
InetSocketAddress inetSocketAddress = null;
Pattern pattern = Pattern.compile("^([^:]+)(?::([0-9]+))?$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(input);
if(matcher.find()) {
dstHost = matcher.group(1);
if(matcher.groupCount() == 2 && matcher.group(2) != null) {
dstPort = Integer.parseInt(matcher.group(2));
} else {
dstPort = 514;
}
inetSocketAddress = new InetSocketAddress(dstHost, dstPort);
}
return inetSocketAddress;
}
public static void main(String... args) {
int exitCode = new CommandLine(new Application()).execute(args);
System.exit(exitCode);

View file

@ -81,7 +81,7 @@ public class SyslogPrinter {
/**
* Return a GELF formatted string of the SyslogMessage.
* Return a GELF JSON formatted string of the SyslogMessage.
* https://www.graylog.org/features/gelf
* @param msg
* @return
@ -104,6 +104,32 @@ public class SyslogPrinter {
}
/**
* Return a Loki JSON formatted string of the SyslogMessage.
* https://grafana.com/docs/loki/latest/api/
* @param msg
* @return
*/
/*
{ "streams": [ { "stream": { "label": "value" }, "values": [ [ "<unix epoch in nanoseconds>", "<log line>" ], [ "<unix epoch in nanoseconds>", "<log line>" ] ] } ] }
{ "streams": [ { "stream": { "host": "hyperion", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1615823598000000000", "Test 2345534343434" ] ] } ] }
{ "streams": [ { "stream": { "host": "hyperion", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1615842165000000000", "Test" ] ] } ] }
*/
public static String toLoki(SyslogMessage msg) {
StringBuilder sb = new StringBuilder("{ \"streams\": [ { \"stream\": {");
sb.append(String.format(" \"host\": \"%s\",", msg.hostname));
sb.append(String.format(" \"facility\": \"%s\",", msg.facility));
sb.append(String.format(" \"severity\": \"%s\",", msg.severity));
sb.append(String.format(" \"application\": \"%s\"", msg.application));
sb.append("}, \"values\": [ ");
sb.append(String.format("[ \"%d\", \"%s\" ]", msg.timestamp.getEpochSecond() * 1000000000l, msg.message));
sb.append(" ] } ] }");
return sb.toString();
}
static private String getPri(Facility facility, Severity severity) {
int pri = (facility.toNumber() * 8) + severity.toNumber();
return String.format("%c%d%c", '<', pri, '>');

View file

@ -0,0 +1,73 @@
package biz.nellemann.syslogd.net;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.*;
import java.nio.charset.StandardCharsets;
public class LokiClient {
private final static Logger log = LoggerFactory.getLogger(LokiClient.class);
private final URL url;
public LokiClient(String url) throws MalformedURLException {
this.url = new URL(url);
}
public void send(String msg) throws MalformedURLException {
URL pushUrl = new URL(url, "/loki/api/v1/push");
log.warn("send() - URL: " + pushUrl.toString());
HttpURLConnection con = null;
try {
con = (HttpURLConnection)pushUrl.openConnection();
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type", "application/json");
//con.setRequestProperty("Content-Type", "application/json; utf-8");
//con.setRequestProperty("Accept", "application/json");
con.setDoOutput(true);
byte[] input = msg.getBytes(StandardCharsets.US_ASCII);
try(OutputStream os = con.getOutputStream()) {
os.write(input, 0, input.length);
log.warn("send() - Data: " + msg);
} catch (IOException e) {
e.printStackTrace();
}
StringBuilder content;
try (BufferedReader br = new BufferedReader(
new InputStreamReader(con.getInputStream()))) {
String line;
content = new StringBuilder();
while ((line = br.readLine()) != null) {
content.append(line);
content.append(System.lineSeparator());
}
}
System.out.println(content.toString());
} catch (IOException e) {
log.warn("send() - " + e.getMessage());
} finally {
if(con != null) {
con.disconnect();
}
}
}
}

View file

@ -11,30 +11,17 @@ public class UdpClient {
private final static Logger log = LoggerFactory.getLogger(UdpClient.class);
private InetSocketAddress inetSocketAddress;
private DatagramSocket socket;
private InetAddress address;
private final Integer port;
public UdpClient(String host, Integer port) {
try {
this.address = InetAddress.getByName(host);
} catch (UnknownHostException e) {
log.error("UdpClient() - UnknownHostException: " + e.getMessage());
}
try {
this.socket = new DatagramSocket();
} catch (SocketException e) {
log.error("UdpClient() - Could not instantiate DatagramSocket: " + e.getMessage());
}
this.port = port;
public UdpClient(InetSocketAddress inetSocketAddress) throws SocketException {
this.inetSocketAddress = inetSocketAddress;
this.socket = new DatagramSocket();
}
public void send(String msg) {
byte[] buf = msg.getBytes(StandardCharsets.US_ASCII);
DatagramPacket packet = new DatagramPacket(buf, buf.length, address, port);
DatagramPacket packet = new DatagramPacket(buf, buf.length, inetSocketAddress.getAddress(), inetSocketAddress.getPort());
if(this.socket != null) {
try {
socket.send(packet);

View file

@ -24,6 +24,19 @@ class SyslogPrinterTest extends Specification {
output.contains("_structured-data")
}
void "test toLoki"() {
setup:
SyslogParser syslogParser = new SyslogParserRfc5424();
String input = '<13>1 2020-09-23T08:57:30.950699+02:00 xps13 mark - - [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] adfdfdf3432434565656'
SyslogMessage msg = syslogParser.parse(input)
when:
String output = SyslogPrinter.toLoki(msg)
then:
output == '{ "streams": [ { "stream": { "host": "xps13", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1600845200000000000", "adfdfdf3432434565656" ] ] } ] }'
}
}