From 3b8231cf1188e72ded9a418c0bfeffdb970fd474 Mon Sep 17 00:00:00 2001 From: Mark Nellemann Date: Sat, 27 Mar 2021 15:23:34 +0100 Subject: [PATCH] Move lokiclient into it's own thread, to not block/delay udp forwards. --- gradle.properties | 2 +- .../biz/nellemann/syslogd/Application.java | 64 ++++++++----------- .../nellemann/syslogd/LogForwardEvent.java | 35 ++++++++++ .../nellemann/syslogd/LogForwardListener.java | 20 ++++++ .../{LogEvent.java => LogReceiveEvent.java} | 6 +- ...gListener.java => LogReceiveListener.java} | 4 +- .../biz/nellemann/syslogd/net/GelfClient.java | 28 ++++++++ .../biz/nellemann/syslogd/net/LokiClient.java | 42 ++++++++++-- .../biz/nellemann/syslogd/net/TcpServer.java | 24 ++++--- .../biz/nellemann/syslogd/net/UdpClient.java | 16 ++++- .../biz/nellemann/syslogd/net/UdpServer.java | 22 ++++--- 11 files changed, 193 insertions(+), 70 deletions(-) create mode 100644 src/main/java/biz/nellemann/syslogd/LogForwardEvent.java create mode 100644 src/main/java/biz/nellemann/syslogd/LogForwardListener.java rename src/main/java/biz/nellemann/syslogd/{LogEvent.java => LogReceiveEvent.java} (83%) rename src/main/java/biz/nellemann/syslogd/{LogListener.java => LogReceiveListener.java} (87%) create mode 100644 src/main/java/biz/nellemann/syslogd/net/GelfClient.java diff --git a/gradle.properties b/gradle.properties index bcd09f8..4ee8126 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ id = syslogd group = biz.nellemann.syslogd -version = 1.2.3 +version = 1.2.4 diff --git a/src/main/java/biz/nellemann/syslogd/Application.java b/src/main/java/biz/nellemann/syslogd/Application.java index 71f4145..7a4d432 100644 --- a/src/main/java/biz/nellemann/syslogd/Application.java +++ b/src/main/java/biz/nellemann/syslogd/Application.java @@ -16,10 +16,7 @@ package biz.nellemann.syslogd; import biz.nellemann.syslogd.msg.SyslogMessage; -import biz.nellemann.syslogd.net.LokiClient; -import biz.nellemann.syslogd.net.TcpServer; -import biz.nellemann.syslogd.net.UdpClient; -import biz.nellemann.syslogd.net.UdpServer; +import biz.nellemann.syslogd.net.*; import biz.nellemann.syslogd.parser.SyslogParser; import biz.nellemann.syslogd.parser.SyslogParserRfc3164; import biz.nellemann.syslogd.parser.SyslogParserRfc5424; @@ -32,21 +29,18 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.concurrent.Callable; -import java.util.regex.Matcher; -import java.util.regex.Pattern; @Command(name = "syslogd", mixinStandardHelpOptions = true, versionProvider = biz.nellemann.syslogd.VersionProvider.class) -public class Application implements Callable, LogListener { +public class Application implements Callable, LogReceiveListener { - private boolean doForward = false; + private final List logForwardListeners = new ArrayList<>(); private SyslogParser syslogParser; - private UdpClient udpClient; - private UdpClient gelfClient; - private LokiClient lokiClient; @CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 514].", defaultValue = "514", paramLabel = "") @@ -83,6 +77,7 @@ public class Application implements Callable, LogListener { @Override public Integer call() throws IOException { + if(enableDebug) { System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "DEBUG"); } @@ -95,8 +90,8 @@ public class Application implements Callable, LogListener { if(syslog != null) { if(syslog.getScheme().toLowerCase(Locale.ROOT).equals("udp")) { - udpClient = new UdpClient(getInetSocketAddress(syslog)); - doForward = true; + UdpClient udpClient = new UdpClient(getInetSocketAddress(syslog)); + logForwardListeners.add(udpClient); } else { throw new UnsupportedOperationException("Forward protocol not implemented: " + syslog.getScheme()); } @@ -104,16 +99,18 @@ public class Application implements Callable, LogListener { if(gelf != null) { if(gelf.getScheme().toLowerCase(Locale.ROOT).equals("udp")) { - gelfClient = new UdpClient(getInetSocketAddress(gelf)); - doForward = true; + GelfClient gelfClient = new GelfClient(getInetSocketAddress(gelf)); + logForwardListeners.add(gelfClient); } else { throw new UnsupportedOperationException("Forward protocol not implemented: " + gelf.getScheme()); } } if(loki != null) { - lokiClient = new LokiClient(loki); - doForward = true; + LokiClient lokiClient = new LokiClient(loki); + logForwardListeners.add(lokiClient); + Thread t = new Thread(lokiClient); + t.start(); } if(udpServer) { @@ -133,7 +130,7 @@ public class Application implements Callable, LogListener { @Override - public void onLogEvent(LogEvent event) { + public void onLogEvent(LogReceiveEvent event) { // Parse message String message = event.getMessage(); @@ -146,6 +143,10 @@ public class Application implements Callable, LogListener { if(msg != null) { + if(logForwardListeners.size() > 0) { + sendForwardEvent(msg); + } + if(stdout) { if(ansiOutput) { System.out.println(SyslogPrinter.toAnsiString(msg)); @@ -154,30 +155,19 @@ public class Application implements Callable, LogListener { } } - if(doForward) { - try { - - if(udpClient != null) { - udpClient.send(SyslogPrinter.toRfc5424(msg)); - } - - if(gelfClient != null) { - gelfClient.send(SyslogPrinter.toGelf(msg)); - } - - if(lokiClient != null) { - lokiClient.send(SyslogPrinter.toLoki(msg)); - } - - } catch (Exception e) { - e.printStackTrace(); - } - } } } + private void sendForwardEvent(SyslogMessage message) { + LogForwardEvent event = new LogForwardEvent( this, message); + for (LogForwardListener listener : logForwardListeners) { + listener.onForwardEvent(event); + } + } + + private InetSocketAddress getInetSocketAddress(URI input) { InetSocketAddress inetSocketAddress = new InetSocketAddress(input.getHost(), input.getPort()); return inetSocketAddress; diff --git a/src/main/java/biz/nellemann/syslogd/LogForwardEvent.java b/src/main/java/biz/nellemann/syslogd/LogForwardEvent.java new file mode 100644 index 0000000..a05bae6 --- /dev/null +++ b/src/main/java/biz/nellemann/syslogd/LogForwardEvent.java @@ -0,0 +1,35 @@ +/* + Copyright 2021 mark.nellemann@gmail.com + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package biz.nellemann.syslogd; + +import biz.nellemann.syslogd.msg.SyslogMessage; +import java.util.EventObject; + +public class LogForwardEvent extends EventObject { + + private static final long serialVersionUID = 1L; + private final SyslogMessage message; + + public LogForwardEvent(final Object source, final SyslogMessage message ) { + super( source ); + this.message = message; + } + + public SyslogMessage getMessage() { + return message; + } + +} diff --git a/src/main/java/biz/nellemann/syslogd/LogForwardListener.java b/src/main/java/biz/nellemann/syslogd/LogForwardListener.java new file mode 100644 index 0000000..8328842 --- /dev/null +++ b/src/main/java/biz/nellemann/syslogd/LogForwardListener.java @@ -0,0 +1,20 @@ +/* + Copyright 2021 mark.nellemann@gmail.com + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +package biz.nellemann.syslogd; + +public interface LogForwardListener { + public void onForwardEvent(LogForwardEvent event); +} diff --git a/src/main/java/biz/nellemann/syslogd/LogEvent.java b/src/main/java/biz/nellemann/syslogd/LogReceiveEvent.java similarity index 83% rename from src/main/java/biz/nellemann/syslogd/LogEvent.java rename to src/main/java/biz/nellemann/syslogd/LogReceiveEvent.java index 76650c3..e771210 100644 --- a/src/main/java/biz/nellemann/syslogd/LogEvent.java +++ b/src/main/java/biz/nellemann/syslogd/LogReceiveEvent.java @@ -16,14 +16,16 @@ package biz.nellemann.syslogd; +import biz.nellemann.syslogd.msg.SyslogMessage; + import java.util.EventObject; -public class LogEvent extends EventObject { +public class LogReceiveEvent extends EventObject { private static final long serialVersionUID = 1L; private final String message; - public LogEvent(final Object source, final String message ) { + public LogReceiveEvent(final Object source, final String message ) { super( source ); this.message = message; } diff --git a/src/main/java/biz/nellemann/syslogd/LogListener.java b/src/main/java/biz/nellemann/syslogd/LogReceiveListener.java similarity index 87% rename from src/main/java/biz/nellemann/syslogd/LogListener.java rename to src/main/java/biz/nellemann/syslogd/LogReceiveListener.java index 380bd8d..11ca0a2 100644 --- a/src/main/java/biz/nellemann/syslogd/LogListener.java +++ b/src/main/java/biz/nellemann/syslogd/LogReceiveListener.java @@ -15,6 +15,6 @@ */ package biz.nellemann.syslogd; -public interface LogListener { - public void onLogEvent(LogEvent event); +public interface LogReceiveListener { + public void onLogEvent(LogReceiveEvent event); } diff --git a/src/main/java/biz/nellemann/syslogd/net/GelfClient.java b/src/main/java/biz/nellemann/syslogd/net/GelfClient.java new file mode 100644 index 0000000..7bbe3ef --- /dev/null +++ b/src/main/java/biz/nellemann/syslogd/net/GelfClient.java @@ -0,0 +1,28 @@ +package biz.nellemann.syslogd.net; + +import biz.nellemann.syslogd.LogForwardEvent; +import biz.nellemann.syslogd.SyslogPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.SocketException; + +public class GelfClient extends UdpClient { + + private final static Logger log = LoggerFactory.getLogger(GelfClient.class); + + public GelfClient(InetSocketAddress inetSocketAddress) throws SocketException { + super(inetSocketAddress); + } + + @Override + public void onForwardEvent(LogForwardEvent event) { + try { + send(SyslogPrinter.toGelf(event.getMessage())); + } catch (Exception e) { + log.warn("onForwardEvent() error", e); + } + + } +} diff --git a/src/main/java/biz/nellemann/syslogd/net/LokiClient.java b/src/main/java/biz/nellemann/syslogd/net/LokiClient.java index c27c7bc..1395706 100644 --- a/src/main/java/biz/nellemann/syslogd/net/LokiClient.java +++ b/src/main/java/biz/nellemann/syslogd/net/LokiClient.java @@ -15,18 +15,26 @@ */ package biz.nellemann.syslogd.net; +import biz.nellemann.syslogd.LogForwardEvent; +import biz.nellemann.syslogd.LogForwardListener; +import biz.nellemann.syslogd.SyslogPrinter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.*; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ArrayBlockingQueue; -public class LokiClient { +public class LokiClient implements LogForwardListener, Runnable { private final static Logger log = LoggerFactory.getLogger(LokiClient.class); + + private final ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(1024); private final URL url; + private boolean keepRunning = true; public LokiClient(URL url) { @@ -43,7 +51,7 @@ public class LokiClient { con.setRequestMethod("POST"); con.setRequestProperty("Content-Type", "application/json"); con.setConnectTimeout(500); - con.setReadTimeout(100); + con.setReadTimeout(150); con.setDoOutput(true); byte[] input = msg.getBytes(StandardCharsets.UTF_8); @@ -54,14 +62,14 @@ public class LokiClient { } int responseCode = con.getResponseCode(); - if(responseCode != 204) { - log.warn("send() - response: " + responseCode); - log.debug("send() - msg: " + msg); + try (InputStream ignored = con.getInputStream()) { + if(responseCode != 204) { + log.warn("send() - response: " + responseCode); + } } - } catch (IOException e) { - log.error("send() - " + e.getMessage()); + log.error("send() - error: " + e.getMessage()); } finally { if(con != null) { con.disconnect(); @@ -70,4 +78,24 @@ public class LokiClient { } + + @Override + public void run() { + + while (keepRunning) { + try { + send(blockingQueue.take()); + } catch (Exception e) { + log.warn(e.getMessage()); + } + } + + } + + + @Override + public void onForwardEvent(LogForwardEvent event) { + blockingQueue.offer(SyslogPrinter.toLoki(event.getMessage())); + } + } diff --git a/src/main/java/biz/nellemann/syslogd/net/TcpServer.java b/src/main/java/biz/nellemann/syslogd/net/TcpServer.java index ed66153..5f26a77 100644 --- a/src/main/java/biz/nellemann/syslogd/net/TcpServer.java +++ b/src/main/java/biz/nellemann/syslogd/net/TcpServer.java @@ -15,8 +15,8 @@ */ package biz.nellemann.syslogd.net; -import biz.nellemann.syslogd.LogEvent; -import biz.nellemann.syslogd.LogListener; +import biz.nellemann.syslogd.LogReceiveEvent; +import biz.nellemann.syslogd.LogReceiveListener; import java.io.BufferedReader; import java.io.IOException; @@ -54,13 +54,17 @@ public class TcpServer { * Event Listener Configuration */ - protected final List eventListeners = new ArrayList<>(); + protected final List eventListeners = new ArrayList<>(); - public synchronized void addEventListener( LogListener l ) { - eventListeners.add( l ); + public synchronized void addEventListener(LogReceiveListener listener ) { + eventListeners.add( listener ); } - public synchronized void removeEventListener( LogListener l ) { + public synchronized void addEventListener(List listeners ) { + eventListeners.addAll(listeners); + } + + public synchronized void removeEventListener( LogReceiveListener l ) { eventListeners.remove( l ); } @@ -68,12 +72,12 @@ public class TcpServer { private static class ClientHandler extends Thread { - protected final List eventListeners; + protected final List eventListeners; private final Socket clientSocket; private BufferedReader in; - public ClientHandler(Socket socket, List eventListeners) { + public ClientHandler(Socket socket, List eventListeners) { this.clientSocket = socket; this.eventListeners = eventListeners; } @@ -101,8 +105,8 @@ public class TcpServer { private synchronized void sendEvent(String message) { - LogEvent event = new LogEvent( this, message ); - for (LogListener eventListener : eventListeners) { + LogReceiveEvent event = new LogReceiveEvent( this, message ); + for (LogReceiveListener eventListener : eventListeners) { eventListener.onLogEvent(event); } } diff --git a/src/main/java/biz/nellemann/syslogd/net/UdpClient.java b/src/main/java/biz/nellemann/syslogd/net/UdpClient.java index 4f29ef9..c99610e 100644 --- a/src/main/java/biz/nellemann/syslogd/net/UdpClient.java +++ b/src/main/java/biz/nellemann/syslogd/net/UdpClient.java @@ -15,6 +15,9 @@ */ package biz.nellemann.syslogd.net; +import biz.nellemann.syslogd.LogForwardEvent; +import biz.nellemann.syslogd.LogForwardListener; +import biz.nellemann.syslogd.SyslogPrinter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +25,7 @@ import java.io.IOException; import java.net.*; import java.nio.charset.StandardCharsets; -public class UdpClient { +public class UdpClient implements LogForwardListener { private final static Logger log = LoggerFactory.getLogger(UdpClient.class); @@ -49,4 +52,15 @@ public class UdpClient { public void close() { socket.close(); } + + + @Override + public void onForwardEvent(LogForwardEvent event) { + try { + send(SyslogPrinter.toRfc5424(event.getMessage())); + } catch (Exception e) { + log.warn("onForwardEvent() error", e); + } + } + } diff --git a/src/main/java/biz/nellemann/syslogd/net/UdpServer.java b/src/main/java/biz/nellemann/syslogd/net/UdpServer.java index 850f23b..8439408 100644 --- a/src/main/java/biz/nellemann/syslogd/net/UdpServer.java +++ b/src/main/java/biz/nellemann/syslogd/net/UdpServer.java @@ -15,9 +15,8 @@ */ package biz.nellemann.syslogd.net; -import biz.nellemann.syslogd.Application; -import biz.nellemann.syslogd.LogEvent; -import biz.nellemann.syslogd.LogListener; +import biz.nellemann.syslogd.LogReceiveEvent; +import biz.nellemann.syslogd.LogReceiveListener; import java.io.IOException; import java.net.DatagramPacket; @@ -36,7 +35,6 @@ public class UdpServer extends Thread { } public UdpServer(int port) throws IOException { - super("SyslogServer"); socket = new DatagramSocket(port); } @@ -59,8 +57,8 @@ public class UdpServer extends Thread { } private synchronized void sendEvent(String message) { - LogEvent event = new LogEvent( this, message); - for (LogListener eventListener : eventListeners) { + LogReceiveEvent event = new LogReceiveEvent( this, message); + for (LogReceiveListener eventListener : eventListeners) { eventListener.onLogEvent(event); } } @@ -70,13 +68,17 @@ public class UdpServer extends Thread { * Event Listener Configuration */ - protected List eventListeners = new ArrayList<>(); + protected List eventListeners = new ArrayList<>(); - public synchronized void addEventListener(Application l ) { - eventListeners.add( l ); + public synchronized void addEventListener(LogReceiveListener listener ) { + eventListeners.add( listener ); } - public synchronized void removeEventListener( LogListener l ) { + public synchronized void addEventListener(List listeners ) { + eventListeners.addAll(listeners); + } + + public synchronized void removeEventListener( LogReceiveListener l ) { eventListeners.remove( l ); }