From 883963b03391d0fbdf5dbc91e86e98b4b5cb12c2 Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Tue, 16 Mar 2021 22:01:11 +0100 Subject: [PATCH] Work on forwarding syslog messaged to Grafana Loki. --- README.md | 10 +++ .../biz/nellemann/syslogd/Application.java | 78 +++++++++++++------ .../biz/nellemann/syslogd/SyslogPrinter.java | 28 ++++++- .../biz/nellemann/syslogd/net/LokiClient.java | 73 +++++++++++++++++ .../biz/nellemann/syslogd/net/UdpClient.java | 23 ++---- .../syslogd/SyslogPrinterTest.groovy | 13 ++++ 6 files changed, 183 insertions(+), 42 deletions(-) create mode 100644 src/main/java/biz/nellemann/syslogd/net/LokiClient.java diff --git a/README.md b/README.md index c8bc71d..e8d84d2 100644 --- a/README.md +++ b/README.md @@ -63,3 +63,13 @@ If you don't want any output locally (only forwarding), you can use the ```--no- Syslog messages from AIX (and IBM Power Virtual I/O Servers) can be troublesome with some logging solutions. These can be received with syslogd and optionally forwarded on to Graylog, Splunk or other logging solutions. + + +## Development + + +### Test Grafana Loki + +```shell +docker run --rm -d --name=loki -p 3100:3100 grafana/loki +``` diff --git a/src/main/java/biz/nellemann/syslogd/Application.java b/src/main/java/biz/nellemann/syslogd/Application.java index 248d9b0..3b92d13 100644 --- a/src/main/java/biz/nellemann/syslogd/Application.java +++ b/src/main/java/biz/nellemann/syslogd/Application.java @@ -16,6 +16,7 @@ package biz.nellemann.syslogd; import biz.nellemann.syslogd.msg.SyslogMessage; +import biz.nellemann.syslogd.net.LokiClient; import biz.nellemann.syslogd.net.TcpServer; import biz.nellemann.syslogd.net.UdpClient; import biz.nellemann.syslogd.net.UdpServer; @@ -28,6 +29,7 @@ import picocli.CommandLine; import picocli.CommandLine.Command; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.concurrent.Callable; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -40,6 +42,8 @@ public class Application implements Callable, LogListener { private boolean doForward = false; private SyslogParser syslogParser; private UdpClient udpClient; + private UdpClient gelfClient; + private LokiClient lokiClient; @CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 514].", defaultValue = "514") @@ -60,11 +64,14 @@ public class Application implements Callable, LogListener { @CommandLine.Option(names = "--rfc5424", description = "Parse RFC-5424 messages [default: RFC-3164].", defaultValue = "false") private boolean rfc5424; - @CommandLine.Option(names = { "-f", "--forward"}, description = "Forward to UDP host[:port] (RFC-5424).", paramLabel = "") - private String forward; + @CommandLine.Option(names = { "-s", "--syslog"}, description = "Forward to Syslog UDP host[:port] (RFC-5424).", paramLabel = "") + private String syslog; - @CommandLine.Option(names = { "-g", "--gelf"}, description = "Forward in Graylog (GELF) JSON format.", defaultValue = "false") - private boolean gelf; + @CommandLine.Option(names = { "-g", "--gelf"}, description = "Forward to Graylog host[:port] (GELF).", paramLabel = "") + private String gelf; + + @CommandLine.Option(names = { "-l", "--loki"}, description = "Forward to Grafana Loki.", paramLabel = "") + private String loki; @CommandLine.Option(names = { "-d", "--debug" }, description = "Enable debugging [default: 'false'].") private boolean enableDebug = false; @@ -83,23 +90,18 @@ public class Application implements Callable, LogListener { syslogParser = new SyslogParserRfc3164(); } - if(forward != null && !forward.isEmpty()) { - String fHost, fPort; - Pattern pattern = Pattern.compile("^([^:]+)(?::([0-9]+))?$", Pattern.CASE_INSENSITIVE); - Matcher matcher = pattern.matcher(forward); - if(matcher.find()) { - fHost = matcher.group(1); - if(matcher.groupCount() == 2 && matcher.group(2) != null) { - fPort = matcher.group(2); - } else { - fPort = "514"; - } - } else { - fHost = "localhost"; - fPort = "514"; - } + if(syslog != null && !syslog.isEmpty()) { + udpClient = new UdpClient(getInetSocketAddress(syslog)); + doForward = true; + } - udpClient = new UdpClient(fHost, Integer.parseInt(fPort)); + if(gelf != null && !gelf.isEmpty()) { + gelfClient = new UdpClient(getInetSocketAddress(gelf)); + doForward = true; + } + + if(loki != null && !loki.isEmpty()) { + lokiClient = new LokiClient(loki); doForward = true; } @@ -143,11 +145,19 @@ public class Application implements Callable, LogListener { if(doForward) { try { - if(gelf) { - udpClient.send(SyslogPrinter.toGelf(msg)); - } else { + + if(udpClient != null) { udpClient.send(SyslogPrinter.toRfc5424(msg)); } + + if(gelfClient != null) { + gelfClient.send(SyslogPrinter.toGelf(msg)); + } + + if(lokiClient != null) { + lokiClient.send(SyslogPrinter.toLoki(msg)); + } + } catch (Exception e) { e.printStackTrace(); } @@ -157,6 +167,28 @@ public class Application implements Callable, LogListener { } + private InetSocketAddress getInetSocketAddress(String input) { + + String dstHost; + int dstPort; + InetSocketAddress inetSocketAddress = null; + + Pattern pattern = Pattern.compile("^([^:]+)(?::([0-9]+))?$", Pattern.CASE_INSENSITIVE); + Matcher matcher = pattern.matcher(input); + if(matcher.find()) { + dstHost = matcher.group(1); + if(matcher.groupCount() == 2 && matcher.group(2) != null) { + dstPort = Integer.parseInt(matcher.group(2)); + } else { + dstPort = 514; + } + inetSocketAddress = new InetSocketAddress(dstHost, dstPort); + } + + return inetSocketAddress; + } + + public static void main(String... args) { int exitCode = new CommandLine(new Application()).execute(args); System.exit(exitCode); diff --git a/src/main/java/biz/nellemann/syslogd/SyslogPrinter.java b/src/main/java/biz/nellemann/syslogd/SyslogPrinter.java index 7d478fb..0efb53c 100644 --- a/src/main/java/biz/nellemann/syslogd/SyslogPrinter.java +++ b/src/main/java/biz/nellemann/syslogd/SyslogPrinter.java @@ -81,7 +81,7 @@ public class SyslogPrinter { /** - * Return a GELF formatted string of the SyslogMessage. + * Return a GELF JSON formatted string of the SyslogMessage. * https://www.graylog.org/features/gelf * @param msg * @return @@ -104,6 +104,32 @@ public class SyslogPrinter { } + /** + * Return a Loki JSON formatted string of the SyslogMessage. + * https://grafana.com/docs/loki/latest/api/ + * @param msg + * @return + */ + +/* +{ "streams": [ { "stream": { "label": "value" }, "values": [ [ "", "" ], [ "", "" ] ] } ] } +{ "streams": [ { "stream": { "host": "hyperion", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1615823598000000000", "Test 2345534343434" ] ] } ] } +{ "streams": [ { "stream": { "host": "hyperion", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1615842165000000000", "Test" ] ] } ] } +*/ + + public static String toLoki(SyslogMessage msg) { + StringBuilder sb = new StringBuilder("{ \"streams\": [ { \"stream\": {"); + sb.append(String.format(" \"host\": \"%s\",", msg.hostname)); + sb.append(String.format(" \"facility\": \"%s\",", msg.facility)); + sb.append(String.format(" \"severity\": \"%s\",", msg.severity)); + sb.append(String.format(" \"application\": \"%s\"", msg.application)); + sb.append("}, \"values\": [ "); + sb.append(String.format("[ \"%d\", \"%s\" ]", msg.timestamp.getEpochSecond() * 1000000000l, msg.message)); + sb.append(" ] } ] }"); + return sb.toString(); + } + + static private String getPri(Facility facility, Severity severity) { int pri = (facility.toNumber() * 8) + severity.toNumber(); return String.format("%c%d%c", '<', pri, '>'); diff --git a/src/main/java/biz/nellemann/syslogd/net/LokiClient.java b/src/main/java/biz/nellemann/syslogd/net/LokiClient.java new file mode 100644 index 0000000..800f022 --- /dev/null +++ b/src/main/java/biz/nellemann/syslogd/net/LokiClient.java @@ -0,0 +1,73 @@ +package biz.nellemann.syslogd.net; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.*; +import java.nio.charset.StandardCharsets; + +public class LokiClient { + + private final static Logger log = LoggerFactory.getLogger(LokiClient.class); + + private final URL url; + + + public LokiClient(String url) throws MalformedURLException { + this.url = new URL(url); + } + + + public void send(String msg) throws MalformedURLException { + + URL pushUrl = new URL(url, "/loki/api/v1/push"); + log.warn("send() - URL: " + pushUrl.toString()); + + HttpURLConnection con = null; + try { + con = (HttpURLConnection)pushUrl.openConnection(); + con.setRequestMethod("POST"); + con.setRequestProperty("Content-Type", "application/json"); + //con.setRequestProperty("Content-Type", "application/json; utf-8"); + //con.setRequestProperty("Accept", "application/json"); + con.setDoOutput(true); + + byte[] input = msg.getBytes(StandardCharsets.US_ASCII); + try(OutputStream os = con.getOutputStream()) { + os.write(input, 0, input.length); + log.warn("send() - Data: " + msg); + } catch (IOException e) { + e.printStackTrace(); + } + + StringBuilder content; + + try (BufferedReader br = new BufferedReader( + new InputStreamReader(con.getInputStream()))) { + + String line; + content = new StringBuilder(); + + while ((line = br.readLine()) != null) { + content.append(line); + content.append(System.lineSeparator()); + } + } + + System.out.println(content.toString()); + + } catch (IOException e) { + log.warn("send() - " + e.getMessage()); + } finally { + if(con != null) { + con.disconnect(); + } + } + + } + +} diff --git a/src/main/java/biz/nellemann/syslogd/net/UdpClient.java b/src/main/java/biz/nellemann/syslogd/net/UdpClient.java index 69659c5..76e1a03 100644 --- a/src/main/java/biz/nellemann/syslogd/net/UdpClient.java +++ b/src/main/java/biz/nellemann/syslogd/net/UdpClient.java @@ -11,30 +11,17 @@ public class UdpClient { private final static Logger log = LoggerFactory.getLogger(UdpClient.class); + private InetSocketAddress inetSocketAddress; private DatagramSocket socket; - private InetAddress address; - private final Integer port; - public UdpClient(String host, Integer port) { - - try { - this.address = InetAddress.getByName(host); - } catch (UnknownHostException e) { - log.error("UdpClient() - UnknownHostException: " + e.getMessage()); - } - - try { - this.socket = new DatagramSocket(); - } catch (SocketException e) { - log.error("UdpClient() - Could not instantiate DatagramSocket: " + e.getMessage()); - } - - this.port = port; + public UdpClient(InetSocketAddress inetSocketAddress) throws SocketException { + this.inetSocketAddress = inetSocketAddress; + this.socket = new DatagramSocket(); } public void send(String msg) { byte[] buf = msg.getBytes(StandardCharsets.US_ASCII); - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, port); + DatagramPacket packet = new DatagramPacket(buf, buf.length, inetSocketAddress.getAddress(), inetSocketAddress.getPort()); if(this.socket != null) { try { socket.send(packet); diff --git a/src/test/groovy/biz/nellemann/syslogd/SyslogPrinterTest.groovy b/src/test/groovy/biz/nellemann/syslogd/SyslogPrinterTest.groovy index 0b16366..adc03fc 100644 --- a/src/test/groovy/biz/nellemann/syslogd/SyslogPrinterTest.groovy +++ b/src/test/groovy/biz/nellemann/syslogd/SyslogPrinterTest.groovy @@ -24,6 +24,19 @@ class SyslogPrinterTest extends Specification { output.contains("_structured-data") } + void "test toLoki"() { + setup: + SyslogParser syslogParser = new SyslogParserRfc5424(); + String input = '<13>1 2020-09-23T08:57:30.950699+02:00 xps13 mark - - [exampleSDID@32473 iut="3" eventSource="Application" eventID="1011"] adfdfdf3432434565656' + SyslogMessage msg = syslogParser.parse(input) + + when: + String output = SyslogPrinter.toLoki(msg) + + then: + output == '{ "streams": [ { "stream": { "host": "xps13", "facility": "USER", "severity": "NOTICE", "application": "mark"}, "values": [ [ "1600845200000000000", "adfdfdf3432434565656" ] ] } ] }' + } + }