/**
 * Copyright (c) 2012 - 2018 Data In Motion and others.
 * All rights reserved.
 *
 * This program and the accompanying materials are made available under the terms of the
 * Eclipse Public License v1.0 which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Data In Motion - initial API and implementation
 */
package org.gecko.influxdb.impl;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import org.eclipse.emf.ecore.EAttribute;
import org.eclipse.emf.ecore.EClass;
import org.eclipse.emf.ecore.EObject;
import org.gecko.core.api.PropertyHelper;
import org.gecko.influxdb.api.InfluxDBEntry;
import org.gecko.influxdb.api.InfluxDBService;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Result;
import org.influxdb.dto.QueryResult.Series;
import org.influxdb.impl.InfluxDBResultMapper;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;

/**
 * {@link InfluxDBService} implementation backed by a single, lazily created {@link InfluxDB} client.
 * <p>
 * The connection settings are taken from the component configuration (PID {@code InfluxDBClient}).
 * The client is created on first use and discarded whenever the configuration is modified or the
 * component is deactivated, so the next call reconnects with the current settings.
 * <p>
 * NOTE(review): every operation selects its database with {@code setDatabase} on the shared client
 * before using it, so concurrent calls that target different databases may interfere with each other.
 */
@Component(configurationPid = "InfluxDBClient", configurationPolicy = ConfigurationPolicy.REQUIRE)
public class InfluxDBServiceImpl implements InfluxDBService {

    private static final Logger logger = Logger.getLogger(InfluxDBServiceImpl.class.getName());

    /** Lazily created client; {@code null} while disconnected. Written only under the instance lock. */
    private volatile InfluxDB influxDB;

    private String protocol;
    private String hostname;
    private String port;
    private String url;
    private String username;
    private String password;

    /** Component configuration. {@code url}, when set, takes precedence over protocol/hostname/port. */
    private @interface InfluxConfig {
        String protocol() default "https";
        String hostname() default "localhost";
        String port() default "8086";
        String url();
        String username();
        String password();
    }

    /**
     * Reads the connection settings. The connection itself is opened lazily on first use.
     *
     * @param config the typed component configuration
     * @param properties the raw component properties, used as fallback source for url, username and password
     * @throws IllegalArgumentException if {@code config} is {@code null}
     */
    @Activate
    public void activate(InfluxConfig config, Map<String, Object> properties) {
        configureConnection(config, properties);
        logger.log(Level.FINE, "Activated InfluxDB Service");
    }

    /**
     * Drops the current client (if any) and re-reads the connection settings, so that the next
     * operation reconnects using the new configuration.
     *
     * @param config the typed component configuration
     * @param properties the raw component properties, used as fallback source for url, username and password
     * @throws IllegalArgumentException if {@code config} is {@code null}
     */
    @Modified
    public void modified(InfluxConfig config, Map<String, Object> properties) {
        // Both steps happen under one lock so that no caller can reconnect with the old settings in between.
        synchronized (this) {
            disconnect();
            configureConnection(config, properties);
        }
        logger.log(Level.FINE, "Modified configuration for InfluxDB Service");
    }

    /**
     * Closes and discards the current client, if any.
     */
    @Deactivate
    public void deactivate() {
        disconnect();
        logger.log(Level.FINE, "Deactivated InfluxDB Service");
    }

    /**
     * Resolves the connection settings. For url, username and password the value from the
     * {@link InfluxConfig} wins; if it is missing or empty, the value is looked up through
     * {@link PropertyHelper} in the given properties (judging by the log messages this is meant to
     * cover system/environment properties). If no url can be resolved at all, protocol, hostname and
     * port from the configuration are used to build the address.
     * <p>
     * All settings are reset first, so values of a previous configuration never leak into the new one.
     *
     * @param config the typed component configuration
     * @param properties the raw component properties
     * @throws IllegalArgumentException if {@code config} is {@code null}
     */
    private synchronized void configureConnection(InfluxConfig config, Map<String, Object> properties) {
        if (config == null) {
            throw new IllegalArgumentException("Cannot create a connection to InfluxDB without a configuration");
        }
        protocol = null;
        hostname = null;
        port = null;

        url = resolveSetting(config.url(), properties, "url");
        if (url == null) {
            if (hasText(config.protocol())) {
                protocol = config.protocol();
            }
            if (hasText(config.hostname())) {
                hostname = config.hostname();
            }
            if (hasText(config.port())) {
                port = config.port();
            }
        }
        username = resolveSetting(config.username(), properties, "username");
        password = resolveSetting(config.password(), properties, "password");
    }

    /**
     * Returns the configured value if it is non-empty, otherwise the value {@link PropertyHelper}
     * finds for {@code key} in the properties, or {@code null} if neither is available.
     */
    private String resolveSetting(String configured, Map<String, Object> properties, String key) {
        if (hasText(configured)) {
            return configured;
        }
        Object fallback = PropertyHelper.createHelper().getValue(properties, key);
        if (fallback == null) {
            return null;
        }
        logger.fine("Found " + key + " in system env properties");
        return fallback.toString();
    }

    /** Returns {@code true} if the value is neither {@code null} nor empty. */
    private static boolean hasText(String value) {
        return value != null && !value.isEmpty();
    }

    /**
     * Issues a {@code CREATE DATABASE} statement.
     *
     * @param dbName the name of the database to create
     * @return {@code true} if the query result reports no error
     * @see org.gecko.influxdb.api.InfluxDBService#createDB(java.lang.String)
     */
    @Override
    public boolean createDB(String dbName) {
        InfluxDB client = connect();
        QueryResult result = client.query(new Query("CREATE DATABASE " + dbName));
        if (result.hasError()) {
            logger.log(Level.SEVERE, "Error creating the db " + result.getError());
        }
        return !result.hasError();
    }

    /**
     * Writes one entry as a single point.
     *
     * @param dbName the target database
     * @param entry the entry to write
     * @return {@code true} if a point was built from the entry and handed to the client,
     *         {@code false} if the entry was rejected
     * @see org.gecko.influxdb.api.InfluxDBService#writeSinglePoint(java.lang.String, org.gecko.influxdb.api.InfluxDBEntry)
     */
    @Override
    public boolean writeSinglePoint(String dbName, InfluxDBEntry entry) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        Point point = createSinglePoint(entry);
        if (point == null) {
            logger.log(Level.SEVERE, "Error writing point to db " + dbName);
            return false;
        }
        client.write(point);
        return true;
    }

    /**
     * Writes all valid entries as one batch.
     *
     * @param dbName the target database
     * @param entries the entries to write
     * @return {@code true} if at least one point was built and the batch was handed to the client
     * @throws IllegalArgumentException if {@code entries} is {@code null}
     * @see org.gecko.influxdb.api.InfluxDBService#writeTimeSeries(java.lang.String, java.util.List)
     */
    @Override
    public boolean writeTimeSeries(String dbName, List<InfluxDBEntry> entries) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        BatchPoints batchPoints = createBatchPoints(entries);
        if (batchPoints == null) {
            return false;
        }
        client.write(batchPoints);
        return true;
    }

    /**
     * Writes all valid entries as one batch after re-basing their timestamps: for a positive
     * {@code timeShift} the earliest entry gets {@code timeShift} as timestamp and every other entry
     * keeps its distance to the earliest one. The timestamps of the passed entries are modified.
     *
     * @param dbName the target database
     * @param entries the entries to write
     * @param timeShift the new timestamp of the earliest entry; values {@code <= 0} disable shifting
     * @return {@code true} if at least one point was built and the batch was handed to the client
     * @throws IllegalArgumentException if {@code entries} is {@code null}
     * @see org.gecko.influxdb.api.InfluxDBService#writeWithTimeShift(java.lang.String, java.util.List, long)
     */
    @Override
    public boolean writeWithTimeShift(String dbName, List<InfluxDBEntry> entries, long timeShift) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        BatchPoints batchPoints = createBatchPoints(entries, timeShift);
        if (batchPoints == null) {
            return false;
        }
        client.write(batchPoints);
        return true;
    }

    /**
     * Runs {@code SELECT * from <measurement>} and maps the result to instances of {@code clazz}
     * using {@link InfluxDBResultMapper}.
     *
     * @param measurement the measurement to read
     * @param dbName the database to read from
     * @param clazz the POJO class to map to
     * @return the mapped objects, or an empty list if the query reported an error
     * @see org.gecko.influxdb.api.InfluxDBService#getQuery(java.lang.String, java.lang.String, java.lang.Class)
     */
    @Override
    public <T> List<T> getQuery(String measurement, String dbName, Class<T> clazz) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        QueryResult result = client.query(new Query("SELECT * from " + measurement));
        if (result.hasError()) {
            logger.log(Level.SEVERE, "Error querying measurement " + measurement + ": " + result.getError());
            return new ArrayList<>();
        }
        return new InfluxDBResultMapper().toPOJO(result, clazz);
    }

    /**
     * Reads a measurement, optionally restricted to one tag value, and maps the result to EObjects
     * via {@link InfluxEObjectMapper}. The restriction is only applied if both {@code idTag} and
     * {@code idTagValue} are given.
     *
     * @param measurement the measurement to read
     * @param dbName the database to read from
     * @param idTag the tag key to filter on, may be {@code null}
     * @param idTagValue the tag value to filter on, may be {@code null}
     * @param eclass passed through to the mapper
     * @param startDate passed through to the mapper
     * @param endDate passed through to the mapper
     * @param timeAttribute passed through to the mapper
     * @return the mapped objects, or an empty list if the query reported an error
     * @see org.gecko.influxdb.api.InfluxDBService#getEObjectQuery(java.lang.String, java.lang.String, java.lang.String, java.lang.String, org.eclipse.emf.ecore.EClass, java.lang.Long, java.lang.Long, org.eclipse.emf.ecore.EAttribute)
     */
    @Override
    public List<EObject> getEObjectQuery(String measurement, String dbName, String idTag, String idTagValue,
            EClass eclass, Long startDate, Long endDate, EAttribute timeAttribute) {
        InfluxDB client = connect();
        client.setDatabase(dbName);

        String command = "SELECT * from " + measurement;
        if (idTag != null && idTagValue != null) {
            // The value ends up inside a single-quoted literal, so quotes and backslashes must be escaped.
            command += " WHERE " + idTag + "='" + escapeStringLiteral(idTagValue) + "'";
        }
        QueryResult result = client.query(new Query(command));
        if (result.hasError()) {
            logger.log(Level.SEVERE, "Error querying measurement " + measurement + ": " + result.getError());
            return new ArrayList<>();
        }
        InfluxEObjectMapper influxEObjMapper = new InfluxEObjectMapper(client);
        @SuppressWarnings("unchecked")
        List<EObject> pointList = (List<EObject>) influxEObjMapper.toEObject(result, eclass, measurement, startDate,
                endDate, timeAttribute);
        return pointList;
    }

    /**
     * Escapes a value for use inside a single-quoted InfluxQL string literal.
     */
    private static String escapeStringLiteral(String value) {
        // Backslashes first, otherwise the backslash added for a quote would be escaped again.
        return value.replace("\\", "\\\\").replace("'", "\\'");
    }

    /**
     * Collects the values of tag {@code idTag} in the measurement and runs
     * {@link #getEObjectQuery(String, String, String, String, EClass, Long, Long, EAttribute)} once
     * per value.
     *
     * @param measurement the measurement to read
     * @param dbName the database to read from
     * @param idTag the tag key whose values partition the result
     * @param eclass passed through to the per-value query
     * @param startDate passed through to the per-value query
     * @param endDate passed through to the per-value query
     * @param timeAttribute passed through to the per-value query
     * @return a map from tag value to the objects read for it; never {@code null}, empty if the tag
     *         values could not be determined
     * @see org.gecko.influxdb.api.InfluxDBService#getSeriesMap(java.lang.String, java.lang.String, java.lang.String, org.eclipse.emf.ecore.EClass, java.lang.Long, java.lang.Long, org.eclipse.emf.ecore.EAttribute)
     */
    @Override
    public Map<String, List<EObject>> getSeriesMap(String measurement, String dbName, String idTag, EClass eclass,
            Long startDate, Long endDate, EAttribute timeAttribute) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        QueryResult query = client.query(
                new Query("SHOW TAG VALUES FROM " + measurement + " WITH KEY IN (" + idTag + ")"));

        Map<String, List<EObject>> resMap = new HashMap<>();
        if (query.hasError()) {
            logger.severe("Query Result error. " + query.getError());
            return resMap;
        }
        List<Result> results = query.getResults();
        if (results == null || results.isEmpty()) {
            logger.severe("Query result is null or empty");
            return resMap;
        }

        List<String> idTagValues = new ArrayList<>();
        for (Result r : results) {
            List<Series> series = r.getSeries();
            if (series == null) {
                logger.severe("Series is null, error: " + r.getError() + " in DB " + dbName);
                continue;
            }
            for (Series s : series) {
                if (s.getValues() == null || s.getValues().isEmpty()) {
                    logger.warning("Series has null or empty values");
                    continue;
                }
                for (List<Object> row : s.getValues()) {
                    if (row == null || row.isEmpty()) {
                        logger.severe("No Points with TAG " + idTag + " in DB " + dbName);
                        // Abort like the other failure paths do: with an empty map instead of null.
                        return resMap;
                    }
                    // Each row is expected to hold the key at index 0 and the tag value at index 1.
                    Object tagValue = row.size() > 1 ? row.get(1) : null;
                    if (tagValue != null) {
                        idTagValues.add(tagValue.toString());
                    } else {
                        logger.warning("No value associated with tag " + idTag);
                    }
                }
            }
        }
        for (String value : idTagValues) {
            resMap.put(value, getEObjectQuery(measurement, dbName, idTag, value, eclass, startDate, endDate,
                    timeAttribute));
        }
        return resMap;
    }

    /**
     * Issues a {@code DROP DATABASE} statement.
     *
     * @param dbName the name of the database to drop
     * @return {@code true} if the query result reports no error
     * @see org.gecko.influxdb.api.InfluxDBService#removeDB(java.lang.String)
     */
    @Override
    public boolean removeDB(String dbName) {
        InfluxDB client = connect();
        QueryResult result = client.query(new Query("DROP DATABASE " + dbName));
        if (result.hasError()) {
            logger.log(Level.SEVERE, "Error removing the db " + result.getError());
        }
        return !result.hasError();
    }

    /**
     * Reads the first value of tag {@code timeShiftTag} and parses it as a {@code long}.
     *
     * @param dbName the database to read from
     * @param timeShiftTag the tag key holding the time shift
     * @return the parsed tag value, or {@code 0} if the query failed, returned nothing usable or the
     *         value is not a valid number
     * @see org.gecko.influxdb.api.InfluxDBService#getTimeShift(java.lang.String, java.lang.String)
     */
    @Override
    public long getTimeShift(String dbName, String timeShiftTag) {
        InfluxDB client = connect();
        client.setDatabase(dbName);
        QueryResult result = client.query(new Query("SHOW TAG VALUES WITH KEY=" + timeShiftTag));
        if (result.hasError()) {
            logger.log(Level.SEVERE, "Error retrieving series from db " + result.getError());
            return 0;
        }
        if (result.getResults() == null || result.getResults().isEmpty()) {
            logger.warning("Results of query is empty or null");
            return 0;
        }
        List<Series> series = result.getResults().get(0).getSeries();
        if (series == null || series.isEmpty()) {
            logger.severe("Series are null or empty");
            return 0;
        }
        List<List<Object>> rows = series.get(0).getValues();
        if (rows == null || rows.isEmpty()) {
            logger.severe("Series values are null or empty");
            return 0;
        }
        List<Object> firstRow = rows.get(0);
        if (firstRow == null || firstRow.size() != 2 || !(firstRow.get(1) instanceof String)) {
            logger.severe("Tag values not found for key " + timeShiftTag);
            return 0;
        }
        String val = (String) firstRow.get(1);
        try {
            return Long.parseLong(val);
        } catch (NumberFormatException e) {
            // Report a malformed value the same way as every other failure here: log it and return 0.
            logger.log(Level.SEVERE, "Tag value '" + val + "' for key " + timeShiftTag + " is not a number", e);
            return 0;
        }
    }

    /**
     * Returns the client, creating it from the current settings if there is none yet. Callers work
     * on the returned reference, so a concurrent reconfiguration cannot null it under them.
     *
     * @return the connected client
     */
    private InfluxDB connect() {
        InfluxDB client = influxDB;
        if (client != null) {
            return client;
        }
        synchronized (this) {
            client = influxDB;
            if (client == null) {
                String address;
                if (url != null) {
                    logger.fine("Found url!");
                    address = url;
                } else {
                    address = protocol + "://" + hostname + ":" + port;
                }
                client = InfluxDBFactory.connect(address, username, password);
                influxDB = client;
            }
            return client;
        }
    }

    /**
     * Closes the current client, if any, and forgets it so that {@link #connect()} creates a new one.
     */
    private synchronized void disconnect() {
        InfluxDB client = influxDB;
        influxDB = null;
        if (client != null) {
            client.close();
        }
    }

    /**
     * Creates the Point to be written in the db from the corresponding entry. A timestamp of
     * {@code 0} is treated as "not set" and no explicit time is put on the point.
     *
     * @param entry the entry to convert
     * @return the Point to be written in the db, or {@code null} if the entry is not valid
     */
    private Point createSinglePoint(InfluxDBEntry entry) {
        if (!isEntrytOK(entry)) {
            logger.log(Level.SEVERE, "Entry is not OK " + entry);
            return null;
        }
        Point.Builder builder = Point.measurement((String) entry.getMeasurement());
        // An entry without tags is still valid, so only add them when present.
        if (entry.getTags() != null) {
            builder.tag(entry.getTags());
        }
        builder.fields(entry.getFields());
        long time = entry.getTimestamp();
        if (time != 0) {
            builder.time(time, TimeUnit.MILLISECONDS);
        }
        return builder.build();
    }

    /**
     * Converts the entries to points, skipping (and logging) every entry that is not valid.
     */
    private List<Point> createPoints(List<InfluxDBEntry> entries) {
        List<Point> points = new ArrayList<>(entries.size());
        for (InfluxDBEntry entry : entries) {
            Point point = createSinglePoint(entry);
            if (point != null) {
                points.add(point);
            } else {
                logger.log(Level.WARNING, "Point is null. Not adding it!");
            }
        }
        return points;
    }

    /**
     * Converts the entries to points. For a positive {@code timeShift} the entries are first sorted
     * by timestamp and re-based so that the earliest one is at {@code timeShift}; this modifies the
     * timestamps of the passed entries.
     *
     * @throws IllegalArgumentException if {@code entries} is {@code null}
     */
    private List<Point> createPoints(List<InfluxDBEntry> entries, long timeShift) {
        if (entries == null) {
            throw new IllegalArgumentException("List of InfluxDBEntry is null.");
        }
        if (timeShift <= 0) {
            return createPoints(entries);
        }
        // Null entries cannot be sorted or shifted; they would be skipped when building points anyway.
        List<InfluxDBEntry> sorted = entries.stream()
                .filter(entry -> entry != null)
                .sorted(Comparator.comparingLong(InfluxDBEntry::getTimestamp))
                .collect(Collectors.toList());
        if (sorted.size() < entries.size()) {
            logger.log(Level.WARNING, "Ignoring " + (entries.size() - sorted.size()) + " null entries");
        }
        if (!sorted.isEmpty()) {
            // Capture the reference time before the loop overwrites the first entry's timestamp.
            long earliest = sorted.get(0).getTimestamp();
            for (InfluxDBEntry entry : sorted) {
                entry.setTimestamp(timeShift + (entry.getTimestamp() - earliest));
            }
        }
        return createPoints(sorted);
    }

    /**
     * Builds a batch from the entries without shifting their timestamps.
     */
    private BatchPoints createBatchPoints(List<InfluxDBEntry> entries) {
        return createBatchPoints(entries, 0);
    }

    /**
     * Builds a batch from the entries, see {@link #createPoints(List, long)}.
     *
     * @return the batch, or {@code null} if no valid point could be built
     */
    private BatchPoints createBatchPoints(List<InfluxDBEntry> entries, long timeShift) {
        List<Point> points = createPoints(entries, timeShift);
        if (points.isEmpty()) {
            logger.log(Level.SEVERE, "Error creating BatchPoints");
            return null;
        }
        return BatchPoints.builder().points(points).build();
    }

    /**
     * Checks if the entry provided is OK.
     * With OK we mean:
     * - at least one field has to be provided;
     * - a measurement has to be set (current implementation supports only String).
     *
     * @param entry the entry to check, may be {@code null}
     * @return true if the entry is OK, false otherwise
     */
    private boolean isEntrytOK(InfluxDBEntry entry) {
        if (entry == null) {
            logger.severe("InfluxDBEntry is null");
            return false;
        }
        Map<?, ?> fields = entry.getFields();
        if (fields == null || fields.isEmpty()) {
            logger.log(Level.SEVERE, "The Influxdb Context should contain at least one field");
            return false;
        }
        if (entry.getMeasurement() == null) {
            logger.log(Level.SEVERE, "The Influxdb Context should contain at least one measurement");
            return false;
        }
        if (!(entry.getMeasurement() instanceof String)) {
            logger.log(Level.SEVERE, "The Influxdb Context Measurement should be a String");
            return false;
        }
        return true;
    }
}