Correct use of InfluxDB batch writing.

This commit is contained in:
Mark Nellemann 2022-03-04 18:51:29 +01:00
parent 28b4cdbea8
commit 1bbe713fc2
5 changed files with 92 additions and 39 deletions

View File

@ -2,6 +2,10 @@
All notable changes to this project will be documented in this file. All notable changes to this project will be documented in this file.
## [1.3.0] - 2022-02-xx
### Changed
- Correct use of InfluxDB batch writing.
## [1.2.8] - 2022-02-28 ## [1.2.8] - 2022-02-28
### Changed ### Changed
- Sort measurement tags before writing to InfluxDB. - Sort measurement tags before writing to InfluxDB.
@ -12,6 +16,7 @@ All notable changes to this project will be documented in this file.
### Added ### Added
- Options to include/exclude Managed Systems and/or Logical Partitions. - Options to include/exclude Managed Systems and/or Logical Partitions.
[1.3.0]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.3.0%0Dv1.2.8
[1.2.8]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.8%0Dv1.2.7 [1.2.8]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.8%0Dv1.2.7
[1.2.7]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.7%0Dv1.2.6 [1.2.7]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.7%0Dv1.2.6
[1.2.6]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.6%0Dv1.2.5 [1.2.6]: https://bitbucket.org/mnellemann/hmci/branches/compare/v1.2.6%0Dv1.2.5

View File

@ -84,10 +84,10 @@ This is most likely due to timezone, date and/or NTP not being configured correc
Example showing how you configure related settings through the HMC CLI: Example showing how you configure related settings through the HMC CLI:
```shell ```shell
chhmc -c date -s modify --datetime MMDDhhmm # Set current date/time: MMDDhhmm[[CC]YY][.ss]
chhmc -c date -s modify --timezone Europe/Copenhagen # Configure your timezone
chhmc -c xntp -s enable # Enable the NTP service chhmc -c xntp -s enable # Enable the NTP service
chhmc -c xntp -s add -a IP_Addr # Add a remote NTP server chhmc -c xntp -s add -a IP_Addr # Add a remote NTP server
chhmc -c date -s modify --timezone Europe/Copenhagen # Configure your timezone
chhmc -c date -s modify --datetime 01301615 # Set current date/time: MMDDhhmm[[CC]YY][.ss]
``` ```
Remember to reboot your HMC after changing the timezone. Remember to reboot your HMC after changing the timezone.

View File

@ -1,4 +1,4 @@
projectId = hmci projectId = hmci
projectGroup = biz.nellemann.hmci projectGroup = biz.nellemann.hmci
projectVersion = 1.2.8 projectVersion = 1.3.0
projectJavaVersion = 1.8 projectJavaVersion = 1.8

View File

@ -109,7 +109,7 @@ class HmcInstance implements Runnable {
writeMetricsForSystemEnergy(); writeMetricsForSystemEnergy();
writeMetricsForManagedSystems(); writeMetricsForManagedSystems();
writeMetricsForLogicalPartitions(); writeMetricsForLogicalPartitions();
influxClient.writeBatchPoints(); //influxClient.writeBatchPoints();
// Refresh // Refresh
if (++executions > rescanValue) { if (++executions > rescanValue) {

View File

@ -16,6 +16,7 @@
package biz.nellemann.hmci; package biz.nellemann.hmci;
import biz.nellemann.hmci.Configuration.InfluxObject; import biz.nellemann.hmci.Configuration.InfluxObject;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB; import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException; import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBFactory; import org.influxdb.InfluxDBFactory;
@ -24,6 +25,7 @@ import org.influxdb.dto.Point;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.SocketException;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -33,7 +35,7 @@ import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep; import static java.lang.Thread.sleep;
class InfluxClient { public final class InfluxClient {
private final static Logger log = LoggerFactory.getLogger(InfluxClient.class); private final static Logger log = LoggerFactory.getLogger(InfluxClient.class);
@ -43,8 +45,6 @@ class InfluxClient {
final private String database; final private String database;
private InfluxDB influxDB; private InfluxDB influxDB;
private BatchPoints batchPoints;
private int errorCounter = 0;
InfluxClient(InfluxObject config) { InfluxClient(InfluxObject config) {
@ -69,7 +69,18 @@ class InfluxClient {
log.debug("Connecting to InfluxDB - {}", url); log.debug("Connecting to InfluxDB - {}", url);
influxDB = InfluxDBFactory.connect(url, username, password).setDatabase(database); influxDB = InfluxDBFactory.connect(url, username, password).setDatabase(database);
influxDB.version(); // This ensures that we actually try to connect to the db influxDB.version(); // This ensures that we actually try to connect to the db
batchPoints = BatchPoints.database(database).precision(TimeUnit.SECONDS).build();
influxDB.enableBatch(
BatchOptions.DEFAULTS
.threadFactory(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
})
); // (4)
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));
connected = true; connected = true;
} catch(Exception e) { } catch(Exception e) {
sleep(15 * 1000); sleep(15 * 1000);
@ -92,28 +103,41 @@ class InfluxClient {
influxDB = null; influxDB = null;
} }
/*
synchronized void writeBatchPoints() throws Exception { synchronized void writeBatchPoints() throws Exception {
log.trace("writeBatchPoints()"); log.trace("writeBatchPoints()");
try { try {
influxDB.write(batchPoints); influxDB.write(batchPoints);
batchPoints = BatchPoints.database(database).precision(TimeUnit.SECONDS).build();
errorCounter = 0; errorCounter = 0;
} catch (InfluxDBException.DatabaseNotFoundException e) { } catch (InfluxDBException.DatabaseNotFoundException e) {
log.error("writeBatchPoints() - database \"{}\" not found/created: can't write data", database); log.error("writeBatchPoints() - database \"{}\" not found/created: can't write data", database);
if(++errorCounter > 3) { if (++errorCounter > 3) {
throw new RuntimeException(e);
}
} catch (org.influxdb.InfluxDBIOException e) {
log.warn("writeBatchPoints() - io exception: {}", e.getMessage());
if(++errorCounter < 3) {
log.warn("writeBatchPoints() - reconnecting to InfluxDB due to io exception.");
logoff();
login();
writeBatchPoints();
} else {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} catch(Exception e) { } catch(Exception e) {
e.printStackTrace(); log.warn("writeBatchPoints() - general exception: {}", e.getMessage());
log.warn("writeBatchPoints() {}", e.getMessage()); if(++errorCounter < 3) {
if(++errorCounter > 5) { log.warn("writeBatchPoints() - reconnecting to InfluxDB due to general exception.");
errorCounter = 0;
logoff(); logoff();
login(); login();
writeBatchPoints();
} else {
throw new RuntimeException(e);
} }
} }
} }
*/
/* /*
@ -134,25 +158,41 @@ class InfluxClient {
return; return;
} }
getSystemDetails(system, timestamp).forEach( it -> batchPoints.point(it) ); //getSystemDetails(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemProcessor(system, timestamp).forEach( it -> batchPoints.point(it) ); getSystemDetails(system, timestamp).forEach( it -> influxDB.write(it));
getSystemPhysicalProcessorPool(system, timestamp).forEach( it -> batchPoints.point(it) ); //getSystemProcessor(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemSharedProcessorPools(system, timestamp).forEach( it -> batchPoints.point(it) ); getSystemProcessor(system, timestamp).forEach( it -> influxDB.write(it) );
getSystemMemory(system, timestamp).forEach( it -> batchPoints.point(it) ); //getSystemPhysicalProcessorPool(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemPhysicalProcessorPool(system, timestamp).forEach( it -> influxDB.write(it) );
//getSystemSharedProcessorPools(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemSharedProcessorPools(system, timestamp).forEach( it -> influxDB.write(it) );
//getSystemMemory(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemMemory(system, timestamp).forEach( it -> influxDB.write(it) );
getSystemViosDetails(system, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemViosDetails(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosProcessor(system, timestamp).forEach( it -> batchPoints.point(it) ); getSystemViosDetails(system, timestamp).forEach(it -> influxDB.write(it) );
getSystemViosMemory(system, timestamp).forEach( it -> batchPoints.point(it) ); //getSystemViosProcessor(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemViosProcessor(system, timestamp).forEach( it -> influxDB.write(it) );
//getSystemViosMemory(system, timestamp).forEach( it -> batchPoints.point(it) );
getSystemViosMemory(system, timestamp).forEach( it -> influxDB.write(it) );
getSystemViosNetworkLpars(system, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemViosNetworkLpars(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosNetworkGenericAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); getSystemViosNetworkLpars(system, timestamp).forEach(it -> influxDB.write(it) );
getSystemViosNetworkSharedAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemViosNetworkGenericAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosNetworkVirtualAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); getSystemViosNetworkGenericAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
//getSystemViosNetworkSharedAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosNetworkSharedAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
//getSystemViosNetworkVirtualAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosNetworkVirtualAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
getSystemViosStorageLpars(system, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemViosStorageLpars(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosFiberChannelAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); getSystemViosStorageLpars(system, timestamp).forEach(it -> influxDB.write(it) );
getSystemViosStoragePhysicalAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemViosFiberChannelAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosStorageVirtualAdapters(system, timestamp).forEach(it -> batchPoints.point(it) ); getSystemViosFiberChannelAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
//getSystemViosStoragePhysicalAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosStoragePhysicalAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
//getSystemViosStorageVirtualAdapters(system, timestamp).forEach(it -> batchPoints.point(it) );
getSystemViosStorageVirtualAdapters(system, timestamp).forEach(it -> influxDB.write(it) );
} }
@ -265,12 +305,18 @@ class InfluxClient {
return; return;
} }
getPartitionDetails(partition, timestamp).forEach( it -> batchPoints.point(it)); //getPartitionDetails(partition, timestamp).forEach( it -> batchPoints.point(it));
getPartitionMemory(partition, timestamp).forEach( it -> batchPoints.point(it)); getPartitionDetails(partition, timestamp).forEach( it -> influxDB.write(it));
getPartitionProcessor(partition, timestamp).forEach( it -> batchPoints.point(it)); //getPartitionMemory(partition, timestamp).forEach( it -> batchPoints.point(it));
getPartitionNetworkVirtual(partition, timestamp).forEach(it -> batchPoints.point(it)); getPartitionMemory(partition, timestamp).forEach( it -> influxDB.write(it));
getPartitionStorageVirtualGeneric(partition, timestamp).forEach(it -> batchPoints.point(it)); //getPartitionProcessor(partition, timestamp).forEach( it -> batchPoints.point(it));
getPartitionStorageVirtualFibreChannel(partition, timestamp).forEach(it -> batchPoints.point(it)); getPartitionProcessor(partition, timestamp).forEach( it -> influxDB.write(it));
//getPartitionNetworkVirtual(partition, timestamp).forEach(it -> batchPoints.point(it));
getPartitionNetworkVirtual(partition, timestamp).forEach(it -> influxDB.write(it));
//getPartitionStorageVirtualGeneric(partition, timestamp).forEach(it -> batchPoints.point(it));
getPartitionStorageVirtualGeneric(partition, timestamp).forEach(it -> influxDB.write(it));
//getPartitionStorageVirtualFibreChannel(partition, timestamp).forEach(it -> batchPoints.point(it));
getPartitionStorageVirtualFibreChannel(partition, timestamp).forEach(it -> influxDB.write(it));
} }
@ -326,8 +372,10 @@ class InfluxClient {
return; return;
} }
getSystemEnergyPower(systemEnergy, timestamp).forEach(it -> batchPoints.point(it) ); //getSystemEnergyPower(systemEnergy, timestamp).forEach(it -> batchPoints.point(it) );
getSystemEnergyTemperature(systemEnergy, timestamp).forEach(it -> batchPoints.point(it) ); getSystemEnergyPower(systemEnergy, timestamp).forEach(it -> influxDB.write(it) );
//getSystemEnergyTemperature(systemEnergy, timestamp).forEach(it -> batchPoints.point(it) );
getSystemEnergyTemperature(systemEnergy, timestamp).forEach(it -> influxDB.write(it) );
} }
private static List<Point> getSystemEnergyPower(SystemEnergy system, Instant timestamp) { private static List<Point> getSystemEnergyPower(SystemEnergy system, Instant timestamp) {