Move lokiclient into it's own thread, to not block/delay udp forwards.

This commit is contained in:
Mark Nellemann 2021-03-27 15:23:34 +01:00
parent 76ee8abe91
commit 3b8231cf11
11 changed files with 193 additions and 70 deletions

View file

@ -1,3 +1,3 @@
id = syslogd
group = biz.nellemann.syslogd
version = 1.2.3
version = 1.2.4

View file

@ -16,10 +16,7 @@
package biz.nellemann.syslogd;
import biz.nellemann.syslogd.msg.SyslogMessage;
import biz.nellemann.syslogd.net.LokiClient;
import biz.nellemann.syslogd.net.TcpServer;
import biz.nellemann.syslogd.net.UdpClient;
import biz.nellemann.syslogd.net.UdpServer;
import biz.nellemann.syslogd.net.*;
import biz.nellemann.syslogd.parser.SyslogParser;
import biz.nellemann.syslogd.parser.SyslogParserRfc3164;
import biz.nellemann.syslogd.parser.SyslogParserRfc5424;
@ -32,21 +29,18 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Command(name = "syslogd",
mixinStandardHelpOptions = true,
versionProvider = biz.nellemann.syslogd.VersionProvider.class)
public class Application implements Callable<Integer>, LogListener {
public class Application implements Callable<Integer>, LogReceiveListener {
private boolean doForward = false;
private final List<LogForwardListener> logForwardListeners = new ArrayList<>();
private SyslogParser syslogParser;
private UdpClient udpClient;
private UdpClient gelfClient;
private LokiClient lokiClient;
@CommandLine.Option(names = {"-p", "--port"}, description = "Listening port [default: 514].", defaultValue = "514", paramLabel = "<num>")
@ -83,6 +77,7 @@ public class Application implements Callable<Integer>, LogListener {
@Override
public Integer call() throws IOException {
if(enableDebug) {
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "DEBUG");
}
@ -95,8 +90,8 @@ public class Application implements Callable<Integer>, LogListener {
if(syslog != null) {
if(syslog.getScheme().toLowerCase(Locale.ROOT).equals("udp")) {
udpClient = new UdpClient(getInetSocketAddress(syslog));
doForward = true;
UdpClient udpClient = new UdpClient(getInetSocketAddress(syslog));
logForwardListeners.add(udpClient);
} else {
throw new UnsupportedOperationException("Forward protocol not implemented: " + syslog.getScheme());
}
@ -104,16 +99,18 @@ public class Application implements Callable<Integer>, LogListener {
if(gelf != null) {
if(gelf.getScheme().toLowerCase(Locale.ROOT).equals("udp")) {
gelfClient = new UdpClient(getInetSocketAddress(gelf));
doForward = true;
GelfClient gelfClient = new GelfClient(getInetSocketAddress(gelf));
logForwardListeners.add(gelfClient);
} else {
throw new UnsupportedOperationException("Forward protocol not implemented: " + gelf.getScheme());
}
}
if(loki != null) {
lokiClient = new LokiClient(loki);
doForward = true;
LokiClient lokiClient = new LokiClient(loki);
logForwardListeners.add(lokiClient);
Thread t = new Thread(lokiClient);
t.start();
}
if(udpServer) {
@ -133,7 +130,7 @@ public class Application implements Callable<Integer>, LogListener {
@Override
public void onLogEvent(LogEvent event) {
public void onLogEvent(LogReceiveEvent event) {
// Parse message
String message = event.getMessage();
@ -146,6 +143,10 @@ public class Application implements Callable<Integer>, LogListener {
if(msg != null) {
if(logForwardListeners.size() > 0) {
sendForwardEvent(msg);
}
if(stdout) {
if(ansiOutput) {
System.out.println(SyslogPrinter.toAnsiString(msg));
@ -154,30 +155,19 @@ public class Application implements Callable<Integer>, LogListener {
}
}
if(doForward) {
try {
if(udpClient != null) {
udpClient.send(SyslogPrinter.toRfc5424(msg));
}
if(gelfClient != null) {
gelfClient.send(SyslogPrinter.toGelf(msg));
}
if(lokiClient != null) {
lokiClient.send(SyslogPrinter.toLoki(msg));
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
private void sendForwardEvent(SyslogMessage message) {
LogForwardEvent event = new LogForwardEvent( this, message);
for (LogForwardListener listener : logForwardListeners) {
listener.onForwardEvent(event);
}
}
private InetSocketAddress getInetSocketAddress(URI input) {
InetSocketAddress inetSocketAddress = new InetSocketAddress(input.getHost(), input.getPort());
return inetSocketAddress;

View file

@ -0,0 +1,35 @@
/*
Copyright 2021 mark.nellemann@gmail.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package biz.nellemann.syslogd;
import biz.nellemann.syslogd.msg.SyslogMessage;
import java.util.EventObject;
public class LogForwardEvent extends EventObject {
private static final long serialVersionUID = 1L;
private final SyslogMessage message;
public LogForwardEvent(final Object source, final SyslogMessage message ) {
super( source );
this.message = message;
}
public SyslogMessage getMessage() {
return message;
}
}

View file

@ -0,0 +1,20 @@
/*
Copyright 2021 mark.nellemann@gmail.com
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package biz.nellemann.syslogd;
public interface LogForwardListener {
public void onForwardEvent(LogForwardEvent event);
}

View file

@ -16,14 +16,16 @@
package biz.nellemann.syslogd;
import biz.nellemann.syslogd.msg.SyslogMessage;
import java.util.EventObject;
public class LogEvent extends EventObject {
public class LogReceiveEvent extends EventObject {
private static final long serialVersionUID = 1L;
private final String message;
public LogEvent(final Object source, final String message ) {
public LogReceiveEvent(final Object source, final String message ) {
super( source );
this.message = message;
}

View file

@ -15,6 +15,6 @@
*/
package biz.nellemann.syslogd;
public interface LogListener {
public void onLogEvent(LogEvent event);
public interface LogReceiveListener {
public void onLogEvent(LogReceiveEvent event);
}

View file

@ -0,0 +1,28 @@
package biz.nellemann.syslogd.net;
import biz.nellemann.syslogd.LogForwardEvent;
import biz.nellemann.syslogd.SyslogPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.SocketException;
public class GelfClient extends UdpClient {
private final static Logger log = LoggerFactory.getLogger(GelfClient.class);
public GelfClient(InetSocketAddress inetSocketAddress) throws SocketException {
super(inetSocketAddress);
}
@Override
public void onForwardEvent(LogForwardEvent event) {
try {
send(SyslogPrinter.toGelf(event.getMessage()));
} catch (Exception e) {
log.warn("onForwardEvent() error", e);
}
}
}

View file

@ -15,18 +15,26 @@
*/
package biz.nellemann.syslogd.net;
import biz.nellemann.syslogd.LogForwardEvent;
import biz.nellemann.syslogd.LogForwardListener;
import biz.nellemann.syslogd.SyslogPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
public class LokiClient {
public class LokiClient implements LogForwardListener, Runnable {
private final static Logger log = LoggerFactory.getLogger(LokiClient.class);
private final ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(1024);
private final URL url;
private boolean keepRunning = true;
public LokiClient(URL url) {
@ -43,7 +51,7 @@ public class LokiClient {
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type", "application/json");
con.setConnectTimeout(500);
con.setReadTimeout(100);
con.setReadTimeout(150);
con.setDoOutput(true);
byte[] input = msg.getBytes(StandardCharsets.UTF_8);
@ -54,14 +62,14 @@ public class LokiClient {
}
int responseCode = con.getResponseCode();
if(responseCode != 204) {
log.warn("send() - response: " + responseCode);
log.debug("send() - msg: " + msg);
try (InputStream ignored = con.getInputStream()) {
if(responseCode != 204) {
log.warn("send() - response: " + responseCode);
}
}
} catch (IOException e) {
log.error("send() - " + e.getMessage());
log.error("send() - error: " + e.getMessage());
} finally {
if(con != null) {
con.disconnect();
@ -70,4 +78,24 @@ public class LokiClient {
}
@Override
public void run() {
while (keepRunning) {
try {
send(blockingQueue.take());
} catch (Exception e) {
log.warn(e.getMessage());
}
}
}
@Override
public void onForwardEvent(LogForwardEvent event) {
blockingQueue.offer(SyslogPrinter.toLoki(event.getMessage()));
}
}

View file

@ -15,8 +15,8 @@
*/
package biz.nellemann.syslogd.net;
import biz.nellemann.syslogd.LogEvent;
import biz.nellemann.syslogd.LogListener;
import biz.nellemann.syslogd.LogReceiveEvent;
import biz.nellemann.syslogd.LogReceiveListener;
import java.io.BufferedReader;
import java.io.IOException;
@ -54,13 +54,17 @@ public class TcpServer {
* Event Listener Configuration
*/
protected final List<LogListener> eventListeners = new ArrayList<>();
protected final List<LogReceiveListener> eventListeners = new ArrayList<>();
public synchronized void addEventListener( LogListener l ) {
eventListeners.add( l );
public synchronized void addEventListener(LogReceiveListener listener ) {
eventListeners.add( listener );
}
public synchronized void removeEventListener( LogListener l ) {
public synchronized void addEventListener(List<LogReceiveListener> listeners ) {
eventListeners.addAll(listeners);
}
public synchronized void removeEventListener( LogReceiveListener l ) {
eventListeners.remove( l );
}
@ -68,12 +72,12 @@ public class TcpServer {
private static class ClientHandler extends Thread {
protected final List<LogListener> eventListeners;
protected final List<LogReceiveListener> eventListeners;
private final Socket clientSocket;
private BufferedReader in;
public ClientHandler(Socket socket, List<LogListener> eventListeners) {
public ClientHandler(Socket socket, List<LogReceiveListener> eventListeners) {
this.clientSocket = socket;
this.eventListeners = eventListeners;
}
@ -101,8 +105,8 @@ public class TcpServer {
private synchronized void sendEvent(String message) {
LogEvent event = new LogEvent( this, message );
for (LogListener eventListener : eventListeners) {
LogReceiveEvent event = new LogReceiveEvent( this, message );
for (LogReceiveListener eventListener : eventListeners) {
eventListener.onLogEvent(event);
}
}

View file

@ -15,6 +15,9 @@
*/
package biz.nellemann.syslogd.net;
import biz.nellemann.syslogd.LogForwardEvent;
import biz.nellemann.syslogd.LogForwardListener;
import biz.nellemann.syslogd.SyslogPrinter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,7 +25,7 @@ import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;
public class UdpClient {
public class UdpClient implements LogForwardListener {
private final static Logger log = LoggerFactory.getLogger(UdpClient.class);
@ -49,4 +52,15 @@ public class UdpClient {
public void close() {
socket.close();
}
@Override
public void onForwardEvent(LogForwardEvent event) {
try {
send(SyslogPrinter.toRfc5424(event.getMessage()));
} catch (Exception e) {
log.warn("onForwardEvent() error", e);
}
}
}

View file

@ -15,9 +15,8 @@
*/
package biz.nellemann.syslogd.net;
import biz.nellemann.syslogd.Application;
import biz.nellemann.syslogd.LogEvent;
import biz.nellemann.syslogd.LogListener;
import biz.nellemann.syslogd.LogReceiveEvent;
import biz.nellemann.syslogd.LogReceiveListener;
import java.io.IOException;
import java.net.DatagramPacket;
@ -36,7 +35,6 @@ public class UdpServer extends Thread {
}
public UdpServer(int port) throws IOException {
super("SyslogServer");
socket = new DatagramSocket(port);
}
@ -59,8 +57,8 @@ public class UdpServer extends Thread {
}
private synchronized void sendEvent(String message) {
LogEvent event = new LogEvent( this, message);
for (LogListener eventListener : eventListeners) {
LogReceiveEvent event = new LogReceiveEvent( this, message);
for (LogReceiveListener eventListener : eventListeners) {
eventListener.onLogEvent(event);
}
}
@ -70,13 +68,17 @@ public class UdpServer extends Thread {
* Event Listener Configuration
*/
protected List<LogListener> eventListeners = new ArrayList<>();
protected List<LogReceiveListener> eventListeners = new ArrayList<>();
public synchronized void addEventListener(Application l ) {
eventListeners.add( l );
public synchronized void addEventListener(LogReceiveListener listener ) {
eventListeners.add( listener );
}
public synchronized void removeEventListener( LogListener l ) {
public synchronized void addEventListener(List<LogReceiveListener> listeners ) {
eventListeners.addAll(listeners);
}
public synchronized void removeEventListener( LogReceiveListener l ) {
eventListeners.remove( l );
}