Work on chunked GELF messages.

This commit is contained in:
Mark Nellemann 2022-12-13 16:59:28 +01:00
parent 4cc11b0587
commit 5104bd0750
5 changed files with 108 additions and 43 deletions

View file

@ -19,9 +19,9 @@ dependencies {
implementation 'org.slf4j:slf4j-simple:2.0.5'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.1'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.1'
implementation 'org.apache.commons:commons-collections4:4.4'
testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0'
testImplementation 'org.slf4j:slf4j-api:2.0.5'
}
application {

View file

@ -17,6 +17,7 @@ package biz.nellemann.syslogd;
import java.net.DatagramPacket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.EventObject;
public class LogReceiveEvent extends EventObject {
@ -42,7 +43,7 @@ public class LogReceiveEvent extends EventObject {
}
public byte[] getBytes() {
return packet.getData();
return Arrays.copyOfRange(packet.getData(), packet.getOffset(), packet.getLength());
}
}

View file

@ -4,13 +4,12 @@ import biz.nellemann.syslogd.msg.SyslogMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.commons.collections4.map.PassiveExpiringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.time.Instant;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.*;
public class GelfParser extends SyslogParser {
@ -18,12 +17,63 @@ public class GelfParser extends SyslogParser {
private final ObjectMapper objectMapper;
private final int expiryInMills = 10_000;
private final PassiveExpiringMap<Integer, TreeMap<Integer, byte[]>> expiringMap = new PassiveExpiringMap<>(expiryInMills);
public GelfParser() {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
}
/*
Magic Bytes - 2 bytes: 0x1e 0x0f
Message ID - 8 bytes: Must be the same for every chunk of this message.
Identifies the whole message and is used to reassemble the chunks later.
Generate from millisecond timestamp + hostname, for example.
Sequence number - 1 byte: The sequence number of this chunk starts at 0 and is always less than the sequence count.
Sequence count - 1 byte: Total number of chunks this message has.
All chunks MUST arrive within 5 seconds or the server will discard all chunks that have arrived or are in the process of arriving. A message MUST NOT consist of more than 128 chunks.
*/
private SyslogMessage parseChunked(byte[] input) {
if(input.length < 12) return null;
byte[] messageId = { input[2], input[3], input[4], input[5], input[6], input[7], input[8], input[9] };
byte seqNumber = input[10];
byte seqTotal = input[11];
byte[] payload = Arrays.copyOfRange(input, 12, input.length);
log.debug("parseChunked() - msgId: {}, seqNo: {}, seqTot: {}, payload: {}", messageId, seqNumber, seqTotal, byteArrayToString(payload));
// messageId byte[] to int
int id = 0;
for (byte b : messageId) {
id = (id << 8) + (b & 0xFF);
}
TreeMap<Integer, byte[]> integerTreeMap;
if(expiringMap.containsKey(id)) {
integerTreeMap = expiringMap.get(id);
} else {
integerTreeMap = new TreeMap<>();
}
integerTreeMap.put((int)seqNumber, payload);
expiringMap.put(id, integerTreeMap);
if(seqNumber+1 >= seqTotal) {
StringBuilder sb = new StringBuilder();
integerTreeMap.forEach( (i, p) -> {
sb.append(byteArrayToString(p));
});
return parse(sb.toString());
}
return null;
}
@Override
public SyslogMessage parse(String input) {
SyslogMessage message = null;
@ -56,34 +106,25 @@ public class GelfParser extends SyslogParser {
@Override
public SyslogMessage parse(byte[] input) {
if(input.length < 8) return null; // TODO: Find proper minimum input length ?
String text;
// GELF Magic Bytes: 0x1e 0x0f
if(input[0] == (byte)0x1e && input[1] == (byte)0x0f) {
/*
Message ID - 8 bytes: Must be the same for every chunk of this message.
Identifies the whole message and is used to reassemble the chunks later.
Generate from millisecond timestamp + hostname, for example.
Sequence number - 1 byte: The sequence number of this chunk starts at 0 and is always less than the sequence count.
Sequence count - 1 byte: Total number of chunks this message has.
All chunks MUST arrive within 5 seconds or the server will discard all chunks that have arrived or are in the process of arriving. A message MUST NOT consist of more than 128 chunks.
*/
log.warn("parse() - Found Magic Bytes, can't parse yet.");
byte[] newInput = Arrays.copyOfRange(input, 12, input.length);
//return parse(byteArrayToString(newInput));
return null;
for(byte b : input) {
if(b > 0x0) {
System.out.printf("%d, ", (b & 0xff));
}
}
System.out.println();
if(input.length < 8) return null; // TODO: Find proper minimum input length ?
// Compressed data: 0x78 0x9c
if(input[0] == (byte)0x78 && input[1] == (byte)0x9c) { // TODO: better detection ?
try {
return parse(decompress(input));
} catch (DataFormatException | UnsupportedEncodingException e) {
log.error("parse() - error: {}", e.getMessage());
return null;
}
if(input[0] == (byte)0x78 && input[1] == (byte)0x9c) {
input = decompress(input);
}
// Magic Bytes: 0x1e 0x0f
if(input[0] == (byte)0x1e && input[1] == (byte)0x0f) {
return parseChunked(input);
}
return parse(byteArrayToString(input));

View file

@ -16,6 +16,8 @@
package biz.nellemann.syslogd.parser;
import biz.nellemann.syslogd.msg.SyslogMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
@ -25,6 +27,8 @@ import java.util.zip.Inflater;
public abstract class SyslogParser {
private final static Logger log = LoggerFactory.getLogger(SyslogParser.class);
public abstract SyslogMessage parse(final String input);
public abstract SyslogMessage parse(final byte[] input);
@ -62,17 +66,24 @@ public abstract class SyslogParser {
}
protected String decompress(byte[] data) throws UnsupportedEncodingException, DataFormatException {
protected byte[] decompress(byte[] data) {
// Decompress the bytes
Inflater decompressor = new Inflater();
decompressor.setInput(data, 0, data.length);
byte[] result = new byte[data.length * 2];
int resultLength = decompressor.inflate(result);
decompressor.end();
try {
// Decompress the bytes
Inflater decompressor = new Inflater();
decompressor.setInput(data, 0, data.length);
//byte[] result = new byte[data.length * 2];
int resultLength = decompressor.inflate(result);
decompressor.end();
// Decode the bytes into a String
return new String(result, 0, resultLength, StandardCharsets.UTF_8);
// Decode the bytes into a String
//uncompressed = new String(result, 0, resultLength, StandardCharsets.UTF_8);
} catch (DataFormatException e) {
log.error("decompress() - error: {}", e.getMessage());
}
return result;
}
}

View file

@ -5,6 +5,7 @@ import biz.nellemann.syslogd.msg.Severity
import biz.nellemann.syslogd.msg.SyslogMessage
import biz.nellemann.syslogd.parser.GelfParser
import biz.nellemann.syslogd.parser.SyslogParser
import spock.lang.Ignore
import spock.lang.Specification
@ -18,9 +19,6 @@ class GelfParserTest extends Specification {
void "uncompressed GELF message"() {
/*
0x7b 0x220x760x650x720x730x690x6f0x6e0x220x3a0x220x310x2e0x310x220x2c0x220x680x6f0x730x740x220x3a0x220x700x6f0x700x2d0x6f0x730x2e0x6c0x6f0x630x610x6c0x640x6f0x6d0x610x690x6e0x220x2c0x220x730x680x6f0x720x740x5f0x6d0x650x730x730x610x670x650x220x3a0x220x6d0x610x690x6e0x280x290x200x2d0x200x530x740x610x720x740x690x6e0x670x200x560x540x440x2d0x430x610x6d0x650x720x610x220x2c0x220x660x750x6c0x6c0x5f0x6d0x650x730x730x610x670x650x220x3a0x220x6d0x610x690x6e0x280x290x200x2d0x200x530x740x610x720x740x690x6e0x670x200x560x540x440x2d0x430x610x6d0x650x720x610x5c0x5c0x6e0x220x2c0x220x740x690x6d0x650x730x740x610x6d0x700x220x3a0x310x360x370x300x330x350x370x370x380x330x2e0x360x390x340x2c0x220x6c0x650x760x650x6c0x220x3a0x340x2c0x220x5f0x740x680x720x650x610x640x5f0x6e0x610x6d0x650x220x3a0x220x6d0x610x690x6e0x220x2c0x220x5f0x6c0x6f0x670x670x650x720x5f0x6e0x610x6d0x650x220x3a0x220x760x740x640x2e0x630x610x6d0x650x720x610x2e0x410x700x700x6c0x690x630x610x740x690x6f0x6e0x220x7d0x0a0x560x1f0x210x470x710x2c0xeb0x090x740x8e0x060x6f0x7f0xe70x7a0x330x890xb80xe10x6e0xa40x090xe90x9f0x870x1c0x290x900xcd0x880x4b0xea0xce0x9b0xf30x660xbc0x880x7b0x6f0x410x610x8f0x4c0xf00x570xaa0x590x620x180x8b0x820x730x040x350x3e0x120x8b0x930x7b0x130x780x5b0xb50x150x870x4f0xd00x5c0xb60xec0xfb0x070x9a0x690xc70xe8
*/
setup:
def input = '{"version":"1.1","host":"pop-os.localdomain","short_message":"main() - Starting VTD-Camera","full_message":"main() - Starting VTD-Camera\\n","timestamp":1670357783.694,"level":4,"_thread_name":"main","_logger_name":"vtd.camera.Application"}'
@ -39,9 +37,6 @@ class GelfParserTest extends Specification {
void "compressed GELF message"() {
// GELF Magic Bytes: 0x1e, 0x0f
setup:
byte[] input = [ 120, -100, -51, 81, 77, 107, -61, 48, 12, -3, 43, -63, -25, -38, -115, -77, -75, -51, 2, -127, -11, -80, -61, 96, -73, 94, 11, -63, 56, 90, -30, -59, 31, -63, -106, -53, -54, -40, 127, -97, 92, 86, -40, 79, -40, 69, -78, -97, -11, -12, -98, -28, 47, 118, -127, -104, 76, -16, -84, 99, 82, 72, -74, 97, 115, 72, 72, 23, -77, 114, 89, 115, 73, 65, -14, 102, 87, 11, -56, 92, -125, -57, -88, 44, -105, 66, 7, -73, 102, 4, 97, 60, 66, -12, -54, 18, 45, -51, 33, -30, -32, 32, 37, 53, 1, -15, -31, 66, -43, -3, 49, -29, 76, -39, 104, -123, -92, 113, -54, 90, 83, -63, 75, 121, -86, 114, 42, 84, 7, -67, 83, 113, 17, 30, -84, 5, -89, -68, 127, -98, -100, 50, -74, 40, 84, 8, 94, 81, -113, -90, -118, -32, 2, -62, 113, 28, 35, -79, 123, -39, -18, -124, 108, -91, 104, -102, 7, 33, -27, -95, 74, 4, 82, -13, -41, -79, -9, -39, 22, 43, -17, -108, -2, -127, -109, -77, 39, 47, 104, -56, 8, 42, -73, -78, 78, -18, 15, -11, -82, -106, -121, -89, 86, -44, -113, -5, 13, -77, 100, -52, -78, -114, 78, -125, 90, -41, 65, -121, 76, -21, -67, -110, -31, 113, 97, -65, 88, 113, 69, -128, -93, 61, -57, -126, -39, 48, 77, 16, -17, -80, -6, 8, 57, -118, 99, -119, 39, -48, 57, 26, -68, -2, -99, -21, -51, 36, -14, 13, 55, 34, 77, 72, -1, 60, -28, 72, -126, 108, 70, 92, 83, 119, -34, -98, -73, -29, 34, 110, -67, 5, -119, -35, 53, -63, 95, -88, 102, -115, 97, 44, 8, -50, 17, -44, 120, 87, 44, 76, -18, 77, -32, 109, -35, -42, 28, 62, 65, -13, -122, 125, -1, 0, -40, 60, -57, -72 ];
@ -59,4 +54,21 @@ class GelfParserTest extends Specification {
msg.timestamp.toString() == "2022-12-08T12:16:38.046Z"
}
@Ignore
void "chunked GELF message"() {
setup:
byte[] chunk1 = [ 0x1E, 0x0F, 0x5A, 0x6D, 0x46, 0x28, 0x20, 0x02, 0x7B, 0x22, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6F, 0x6E, 0x22, 0x3A, 0x22, 0x31, 0x2E, 0x31, 0x22, 0x2C, 0x22, 0x68, 0x6F, 0x73, 0x74, 0x22, 0x3A, 0x22, 0x69, 0x70, 0x2D, 0x31, 0x30, 0x2D, 0x31, 0x2D, 0x31, 0x30, 0x32, 0x2D, 0x37, 0x35, 0x2E, 0x65, 0x75, 0x2D, 0x63, 0x65, 0x6E, 0x74, 0x72, 0x61, 0x6C, 0x2D, 0x31, 0x2E, 0x63, 0x6F, 0x6D, 0x70, 0x75, 0x74, 0x65, 0x2E, 0x69, 0x6E, 0x74, 0x65, 0x72, 0x6E, 0x61, 0x6C, 0x22, 0x2C, 0x22, 0x73, 0x68, 0x6F, 0x72, 0x74, 0x5F, 0x6D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x3A, 0x22, 0x65, 0x76, 0x65, 0x6E, 0x74, 0x3D, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6E, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x45, 0x76, 0x65, 0x6E, 0x74, 0x20, 0x75, 0x73, 0x65, 0x72, 0x6E, 0x61, 0x6D, 0x65, 0x3D, 0x6D, 0x61, 0x72, 0x6B, 0x2E, 0x6E, 0x65, 0x6C, 0x6C, 0x65, 0x6D, 0x61, 0x6E, 0x6E, 0x40, 0x67, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x63, 0x6F, 0x6D, 0x20, 0x74, 0x65, 0x6E, 0x61, 0x6E, 0x74, 0x3D, 0x33, 0x20, 0x72, 0x65, 0x6D, 0x6F, 0x74, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x3D, 0x31, 0x32, 0x39, 0x2E, 0x34, 0x31, 0x2E, 0x34, 0x36, 0x2E, 0x37, 0x20, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6F, 0x6E, 0x49, 0x64, 0x3D, 0x32, 0x37, 0x38, 0x33, 0x35, 0x38, 0x43, 0x33, 0x33, 0x31, 0x45, 0x31, 0x41, 0x44, 0x30, 0x36, 0x36, 0x32, 0x42, 0x38, 0x37, 0x38, 0x43, 0x34, 0x37, 0x32, 0x46, 0x30, 0x32, 0x39, 0x30, 0x41, 0x22, 0x2C, 0x22, 0x66, 0x75, 0x6C, 0x6C, 0x5F, 0x6D, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x3A, 0x22, 0x65, 0x76, 0x65, 0x6E, 0x74, 0x3D, 0x41, 0x75, 0x74, 0x68, 0x65, 0x6E, 0x74, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x45, 0x76, 0x65, 0x6E, 0x74, 0x20, 0x75, 0x73, 0x65, 0x72, 0x6E, 0x61, 0x6D, 0x65, 0x3D, 0x6D, 0x61, 0x72, 0x6B, 0x2E, 0x6E, 0x65, 0x6C, 0x6C, 0x65, 0x6D, 0x61, 0x6E, 0x6E, 0x40, 0x67, 0x6D, 0x61, 0x69, 0x6C, 0x2E, 0x63, 0x6F, 0x6D, 0x20, 0x74, 0x65, 0x6E, 0x61, 0x6E, 0x74, 0x3D, 0x33, 0x20, 0x72, 0x65, 0x6D, 0x6F, 0x74, 0x65, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x3D, 0x31, 0x32, 0x39, 0x2E, 0x34, 0x31, 0x2E, 0x34, 0x36, 0x2E, 0x37, 0x20, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6F, 0x6E, 0x49, 0x64, 0x3D, 0x32, 0x37, 0x38, 0x33, 0x35, 0x38, 0x43, 0x33, 0x33, 0x31, 0x45, 0x31, 0x41, 0x44, 0x30, 0x36, 0x36, 0x32, 0x42, 0x38, 0x37, 0x38, 0x43, 0x34, 0x37, 0x32, 0x46, 0x30, 0x32, 0x39, 0x30, 0x41, 0x5C, 0x6E, 0x22, 0x2C, 0x22, 0x74, 0x69, 0x6D, 0x65, 0x73, 0x74, 0x61, 0x6D, 0x70, 0x22, 0x3A, 0x31, 0x36, 0x37, 0x30, 0x39, 0x34, 0x36, 0x32, 0x32, 0x33, 0x2E, 0x36, 0x36, 0x30, 0x2C, 0x22, 0x6C, 0x65, 0x76, 0x65, 0x6C, 0x22, 0x3A, 0x36, 0x2C, 0x22, 0x5F, 0x61, 0x70, 0x70, 0x5F, 0x63, 0x6F, 0x75, 0x6E, 0x74, 0x72, 0x79, 0x22, 0x3A, 0x22, 0x64, 0x6B, 0x22, 0x2C, 0x22, 0x5F, 0x61, 0x70, 0x70, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x22, 0x3A, 0x22, 0x6D, 0x69, 0x6E, 0x74, 0x72, 0x22, 0x2C, 0x22, 0x5F, 0x6C, 0x6F, 0x67, 0x67, 0x65, 0x72, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x22, 0x3A, 0x22, 0x61, 0x6A, 0x6F, 0x75, 0x72 ]
byte[] chunk2 = [ 0x1E, 0x0F, 0x5A, 0x6D, 0x46, 0x28, 0x20, 0x01, 0x02, 0x2E, 0x41, 0x6A, 0x6F, 0x75, 0x72, 0x53, 0x65, 0x63, 0x75, 0x72, 0x69, 0x74, 0x79, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x45, 0x76, 0x65, 0x6E, 0x74, 0x4C, 0x69, 0x73, 0x74, 0x65, 0x6E, 0x65, 0x72, 0x22, 0x2C, 0x22, 0x5F, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5F, 0x75, 0x72, 0x6C, 0x22, 0x3A, 0x22, 0x68, 0x74, 0x74, 0x70, 0x73, 0x3A, 0x5C, 0x2F, 0x5C, 0x2F, 0x74, 0x65, 0x73, 0x74, 0x2E, 0x6D, 0x69, 0x6E, 0x74, 0x72, 0x2E, 0x61, 0x70, 0x70, 0x22, 0x2C, 0x22, 0x5F, 0x61, 0x70, 0x70, 0x5F, 0x65, 0x6E, 0x76, 0x22, 0x3A, 0x22, 0x74, 0x65, 0x73, 0x74, 0x22, 0x2C, 0x22, 0x5F, 0x74, 0x68, 0x72, 0x65, 0x61, 0x64, 0x5F, 0x6E, 0x61, 0x6D, 0x65, 0x22, 0x3A, 0x22, 0x68, 0x74, 0x74, 0x70, 0x2D, 0x6E, 0x69, 0x6F, 0x2D, 0x38, 0x30, 0x38, 0x30, 0x2D, 0x65, 0x78, 0x65, 0x63, 0x2D, 0x38, 0x22, 0x7D ]
when:
syslogParser.parse(chunk1)
SyslogMessage msg = syslogParser.parse(chunk2)
then:
msg.message == "fobar"
}
}