hmci/src/main/java/biz/nellemann/hmci/InfluxClient.java

151 lines
5.0 KiB
Java

/*
* Copyright 2020 Mark Nellemann <mark.nellemann@gmail.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package biz.nellemann.hmci;
import biz.nellemann.hmci.dto.toml.InfluxConfiguration;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public final class InfluxClient {
private final static Logger log = LoggerFactory.getLogger(InfluxClient.class);
final private String url;
final private String username;
final private String password;
final private String database;
private InfluxDB influxDB;
InfluxClient(InfluxConfiguration config) {
this.url = config.url;
this.username = config.username;
this.password = config.password;
this.database = config.database;
}
synchronized void login() throws RuntimeException, InterruptedException {
if(influxDB != null) {
return;
}
boolean connected = false;
int loginErrors = 0;
do {
try {
log.debug("Connecting to InfluxDB - {}", url);
influxDB = InfluxDBFactory.connect(url, username, password).setDatabase(database);
influxDB.version(); // This ensures that we actually try to connect to the db
influxDB.enableBatch(
BatchOptions.DEFAULTS
.threadFactory(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
})
);
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
connected = true;
} catch(Exception e) {
sleep(15 * 1000);
if(loginErrors++ > 3) {
log.error("login() - error, giving up: {}", e.getMessage());
throw new RuntimeException(e);
} else {
log.warn("login() - error, retrying: {}", e.getMessage());
}
}
} while(!connected);
}
synchronized void logoff() {
if(influxDB != null) {
influxDB.close();
}
influxDB = null;
}
public void write(List<Measurement> measurements, Instant timestamp, String measurement) {
log.debug("write() - measurement: {} {}", measurement, measurements.size());
processMeasurementMap(measurements, timestamp, measurement).forEach( (point) -> { influxDB.write(point); });
}
private List<Point> processMeasurementMap(List<Measurement> measurements, Instant timestamp, String measurement) {
List<Point> listOfPoints = new ArrayList<>();
measurements.forEach( (m) -> {
Point.Builder builder = Point.measurement(measurement)
.time(timestamp.toEpochMilli(), TimeUnit.MILLISECONDS)
.tag(m.tags)
.fields(m.fields);
/*
// Iterate fields
m.fields.forEach((fieldName, fieldValue) -> {
log.info("processMeasurementMap() {} - fieldName: {}, fieldValue: {}", measurement, fieldName, fieldValue);
if(fieldValue instanceof Number) {
Number num = (Number) fieldValue;
builder.addField(fieldName, num);
} else if(fieldValue instanceof Boolean) {
Boolean bol = (Boolean) fieldValue;
builder.addField(fieldName, bol);
} else {
String str = (String) fieldValue;
builder.addField(fieldName, str);
}
});
// Iterate sorted tags
Map<String, String> sortedTags = new TreeMap<>(m.tags);
sortedTags.forEach((tagName, tagValue) -> {
log.info("processMeasurementMap() {} - tagName: {}, tagValue: {}", measurement, tagName, tagValue);
builder.tag(tagName, tagValue);
});
*/
/*
if(m.fields.size() > 0 && m.tags.size() > 0) {
listOfPoints.add(builderbuilder.build());
}*/
listOfPoints.add(builder.build());
});
return listOfPoints;
}
}