Switch to updated influxdb client.
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Mark Nellemann 2023-04-04 22:22:10 +02:00
parent 2967f6ef75
commit 6b9b78f32c
4 changed files with 39 additions and 43 deletions

View file

@ -2,6 +2,9 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## 1.4.4 - 2023-04-xx
- Initial support for InfluxDB v2
## 1.4.3 - 2023-03-21 ## 1.4.3 - 2023-03-21
- Fix and improve processor utilization dashboards. - Fix and improve processor utilization dashboards.
- Minor code cleanup. - Minor code cleanup.

View file

@ -1,11 +1,8 @@
plugins { plugins {
id 'java' id 'java'
id 'jacoco'
id 'groovy' id 'groovy'
id 'application' id 'application'
// Code coverage of tests
id 'jacoco'
id "net.nemerosa.versioning" version "2.15.1" id "net.nemerosa.versioning" version "2.15.1"
id "com.netflix.nebula.ospackage" version "10.0.0" id "com.netflix.nebula.ospackage" version "10.0.0"
id "com.github.johnrengelman.shadow" version "7.1.2" id "com.github.johnrengelman.shadow" version "7.1.2"
@ -22,17 +19,16 @@ version = projectVersion
dependencies { dependencies {
annotationProcessor 'info.picocli:picocli-codegen:4.7.1' annotationProcessor 'info.picocli:picocli-codegen:4.7.1'
implementation 'info.picocli:picocli:4.7.1' implementation 'info.picocli:picocli:4.7.1'
implementation 'org.influxdb:influxdb-java:2.23'
//implementation 'com.influxdb:influxdb-client-java:6.7.0'
implementation 'org.slf4j:slf4j-api:2.0.6' implementation 'org.slf4j:slf4j-api:2.0.6'
implementation 'org.slf4j:slf4j-simple:2.0.6' implementation 'org.slf4j:slf4j-simple:2.0.6'
implementation 'com.squareup.okhttp3:okhttp:4.10.0' // Also used by InfluxDB Client implementation 'com.squareup.okhttp3:okhttp:4.10.0' // Also used by InfluxDB Client
implementation 'com.influxdb:influxdb-client-java:6.8.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2' implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-toml:2.14.2' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-toml:2.14.2'
testImplementation 'junit:junit:4.13.2' testImplementation 'junit:junit:4.13.2'
testImplementation 'org.spockframework:spock-core:2.3-groovy-3.0' testImplementation 'org.spockframework:spock-core:2.3-groovy-4.0'
testImplementation "org.mock-server:mockserver-netty-no-dependencies:5.14.0" testImplementation "org.mock-server:mockserver-netty-no-dependencies:5.14.0"
} }
@ -87,7 +83,7 @@ buildDeb {
} }
jacoco { jacoco {
toolVersion = "0.8.8" toolVersion = "0.8.9"
} }
jacocoTestReport { jacocoTestReport {

View file

@ -1,3 +1,3 @@
projectId = hmci projectId = hmci
projectGroup = biz.nellemann.hmci projectGroup = biz.nellemann.hmci
projectVersion = 1.4.3 projectVersion = 1.4.4

View file

@ -16,64 +16,61 @@
package biz.nellemann.hmci; package biz.nellemann.hmci;
import biz.nellemann.hmci.dto.toml.InfluxConfiguration; import biz.nellemann.hmci.dto.toml.InfluxConfiguration;
import org.influxdb.BatchOptions; import com.influxdb.client.InfluxDBClient;
import org.influxdb.InfluxDB; import com.influxdb.client.InfluxDBClientFactory;
import org.influxdb.InfluxDBFactory; import com.influxdb.client.WriteApi;
import org.influxdb.dto.Point; import com.influxdb.client.write.Point;
import com.influxdb.client.domain.WritePrecision;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
public final class InfluxClient { public final class InfluxClient {
private final static Logger log = LoggerFactory.getLogger(InfluxClient.class); private final static Logger log = LoggerFactory.getLogger(InfluxClient.class);
final private String url; final private String url;
final private String username; final private String token;
final private String password; final private String org;
final private String database; final private String database; // Bucket in v2
private InfluxDBClient influxDBClient;
private WriteApi writeApi;
private InfluxDB influxDB;
InfluxClient(InfluxConfiguration config) { InfluxClient(InfluxConfiguration config) {
this.url = config.url; this.url = config.url;
this.username = config.username; this.token = config.username + ":" + config.password;
this.password = config.password; this.org = "hmci"; // In InfluxDB 1.x, there is no concept of organization.
this.database = config.database; this.database = config.database;
} }
synchronized void login() throws RuntimeException, InterruptedException { synchronized void login() throws RuntimeException, InterruptedException {
if(influxDB != null) { if(influxDBClient != null) {
return; return;
} }
boolean connected = false; boolean connected = false;
int loginErrors = 0; int loginErrors = 0;
do { do {
try { try {
log.debug("Connecting to InfluxDB - {}", url); log.debug("Connecting to InfluxDB - {}", url);
influxDB = InfluxDBFactory.connect(url, username, password).setDatabase(database); influxDBClient = InfluxDBClientFactory.create(url, token.toCharArray(), org, database);
influxDB.version(); // This ensures that we actually try to connect to the db influxDBClient.version(); // This ensures that we actually try to connect to the db
Runtime.getRuntime().addShutdownHook(new Thread(influxDBClient::close));
influxDB.enableBatch(
BatchOptions.DEFAULTS
.flushDuration(5000)
.threadFactory(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
})
);
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
// Todo: Handle events - https://github.com/influxdata/influxdb-client-java/tree/master/client#handle-the-events
writeApi = influxDBClient.makeWriteApi();
connected = true; connected = true;
} catch(Exception e) { } catch(Exception e) {
sleep(15 * 1000); sleep(15 * 1000);
@ -90,10 +87,10 @@ public final class InfluxClient {
synchronized void logoff() { synchronized void logoff() {
if(influxDB != null) { if(influxDBClient != null) {
influxDB.close(); influxDBClient.close();
} }
influxDB = null; influxDBClient = null;
} }
@ -101,7 +98,7 @@ public final class InfluxClient {
log.debug("write() - measurement: {} {}", name, measurements.size()); log.debug("write() - measurement: {} {}", name, measurements.size());
if(!measurements.isEmpty()) { if(!measurements.isEmpty()) {
processMeasurementMap(measurements, name).forEach((point) -> { processMeasurementMap(measurements, name).forEach((point) -> {
influxDB.write(point); writeApi.writePoint(point);
}); });
} }
} }
@ -111,11 +108,11 @@ public final class InfluxClient {
List<Point> listOfPoints = new ArrayList<>(); List<Point> listOfPoints = new ArrayList<>();
measurements.forEach( (m) -> { measurements.forEach( (m) -> {
log.trace("processMeasurementMap() - timestamp: {}, tags: {}, fields: {}", m.timestamp, m.tags, m.fields); log.trace("processMeasurementMap() - timestamp: {}, tags: {}, fields: {}", m.timestamp, m.tags, m.fields);
Point.Builder builder = Point.measurement(name) Point point = new Point(name)
.time(m.timestamp.getEpochSecond(), TimeUnit.SECONDS) .time(m.timestamp.getEpochSecond(), WritePrecision.S)
.tag(m.tags) .addTags(m.tags)
.fields(m.fields); .addFields(m.fields);
listOfPoints.add(builder.build()); listOfPoints.add(point);
}); });
return listOfPoints; return listOfPoints;
} }