/** * Copyright (c) 2012 - 2019 Data In Motion and others. * All rights reserved. * * This program and the accompanying materials are made available under the terms of the * Eclipse Public License v1.0 which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Data In Motion - initial API and implementation */ package org.gecko.influxdb.impl; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.logging.Level; import java.util.logging.Logger; import org.gecko.influxdb.api.InfluxDBEntry; import org.gecko.influxdb.api.InfluxDBEntryHelper; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.ServiceScope; /** * * @author ilenia * @since May 14, 2019 */ @Component(scope=ServiceScope.PROTOTYPE) public class InfluxDBEntryHelperImpl implements InfluxDBEntryHelper { private Logger logger = Logger.getLogger(InfluxDBEntryHelperImpl.class.getName()); private String measurement; private String timeKey; private Set fieldKeys; private Set tagsKeys; private boolean isSizeOK; /* * (non-Javadoc) * @see de.dim.queueing.influxdb.api.InfluxdbEntryHelper#initialize(java.lang.String, java.lang.String[], java.lang.String[], java.lang.String) */ @Override public void initialize(String measurement, Set fieldKeys, Set tagsKeys, String timeKey) { this.measurement = measurement; this.timeKey = timeKey; this.fieldKeys = fieldKeys; this.tagsKeys = tagsKeys; } /* * (non-Javadoc) * @see de.dim.queueing.influxdb.api.InfluxdbEntryHelper#getInfluxdbEntries(java.util.Map) */ @Override public Set getInfluxdbEntries(Map> map) { int size = map.values().stream().findFirst().orElse(Collections.emptyList()).size(); Set entries = new TreeSet((o1, o2)->Long.valueOf(o1.getTimestamp()).compareTo(Long.valueOf(o2.getTimestamp()))); if(isMapOK(map, size)) { for(int i = 0; i < size; i++) { InfluxDBEntryImpl entry = new InfluxDBEntryImpl(); entry.setMeasurement(measurement); if(timeKey != null) { if(map.get(timeKey).get(i) instanceof String) { String time = (String) map.get(timeKey).get(i); entry.setTimestamp(Long.parseLong(time)); } else { entry.setTimestamp((long) map.get(timeKey).get(i)); } } if(tagsKeys != null && tagsKeys.size() > 0) { for(String tag : tagsKeys) { String value = (String) map.get(tag).get(i); if (value != null) { entry.getTags().put(tag, value); } } } for(String field : fieldKeys) { Object value = map.get(field).get(i); if (value != null) { entry.getFields().put(field, value); } } entries.add(entry); } } else { logger.log(Level.SEVERE, "The passed map is not set properly. No InfluxdbEntry will be set."); } return entries; } private boolean isMapOK(Map> map, int valueSize) { if(fieldKeys == null) { logger.log(Level.SEVERE, "At least one field key has to be set"); return false; } for(String field : fieldKeys) { if(!map.containsKey(field)) { logger.log(Level.SEVERE, "Map does not contain field " + field + ". Cannot continue."); return false; } } if(measurement == null) { logger.log(Level.SEVERE, "The measurement is null. Cannot continue."); return false; } isSizeOK = true; map.values().forEach(v->{ if(v.size() != valueSize) { logger.log(Level.SEVERE, "The lists of values should have the same size. Cannot continue."); isSizeOK = false; } }); if(!isSizeOK) { return false; } Set tagsToBeRemoved = new HashSet(); if(tagsKeys != null) { for(String tag : tagsKeys) { if(!map.containsKey(tag)) { logger.log(Level.WARNING, "Map does not contain tag " + tag + ". It will not be set!"); tagsToBeRemoved.add(tag); } } } tagsToBeRemoved.forEach(t->{ tagsKeys.remove(t); }); if(timeKey != null && !map.containsKey(timeKey)) { logger.log(Level.WARNING, "Map does not contain time key " + timeKey); logger.log(Level.WARNING, "The time will be set following the point insertion order!"); timeKey = null; } return true; } }