/**
 * Copyright (c) 2012 - 2019 Data In Motion and others.
 * All rights reserved.
 *
 * This program and the accompanying materials are made available under the terms of the
 * Eclipse Public License v1.0 which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Data In Motion - initial API and implementation
 */
package org.gecko.influxdb.impl;

import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import org.eclipse.emf.ecore.EAttribute;
import org.eclipse.emf.ecore.EClass;
import org.eclipse.emf.ecore.EFactory;
import org.eclipse.emf.ecore.EObject;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBMapperException;
import org.influxdb.dto.QueryResult;
import org.influxdb.dto.QueryResult.Series;
import org.influxdb.impl.InfluxDBMapper;

/**
 * A mapper from the InfluxDB query result to an EObject.
 * <p>
 * Every row of every series whose name matches the requested measurement is turned into one
 * new instance of the given {@link EClass}. Column values and series tags are copied into the
 * attribute that carries the same name; columns and tags without a matching attribute are ignored.
 * Only attributes that are neither transient, derived nor multi-valued are considered.
 *
 * @author ilenia
 * @since May 27, 2019
 */
public class InfluxEObjectMapper extends InfluxDBMapper {

	/** Name of the timestamp column in an InfluxDB series. */
	private static final String INFLUX_TIME_COLUMN = "time";

	private static final Logger LOG = Logger.getLogger(InfluxEObjectMapper.class.getName());

	/**
	 * Creates a new instance.
	 * @param influxDB the InfluxDB connection handed over to {@link InfluxDBMapper}
	 */
	public InfluxEObjectMapper(InfluxDB influxDB) {
		super(influxDB);
	}

	/**
	 * Maps all rows of the given measurement to EObjects without any date filtering.
	 *
	 * @param queryResult the query result to map, must not be <code>null</code>
	 * @param eClass the type of the objects to create, must not be <code>null</code>
	 * @param measurementName the name of the series to map, must not be <code>null</code>
	 * @param timeAttribute the attribute that receives the value of the <code>time</code> column, may be <code>null</code>
	 * @return the list of created objects, never <code>null</code>
	 * @throws InfluxDBMapperException declared for API compatibility
	 */
	public List<EObject> toEObject(QueryResult queryResult, EClass eClass, String measurementName, EAttribute timeAttribute) throws InfluxDBMapperException {
		return toEObject(queryResult, eClass, measurementName, null, null, timeAttribute);
	}

	/**
	 * Maps the rows of the given measurement to EObjects, optionally keeping only those rows
	 * whose time lies strictly between <code>startDate</code> and <code>endDate</code>.
	 * The date filter is only evaluated on the column that is mapped to <code>timeAttribute</code>,
	 * so it has no effect when <code>timeAttribute</code> is <code>null</code>.
	 *
	 * @param queryResult the query result to map, must not be <code>null</code>
	 * @param eClass the type of the objects to create, must not be <code>null</code>
	 * @param measurementName the name of the series to map, must not be <code>null</code>
	 * @param startDate exclusive lower bound in epoch milliseconds, or <code>null</code> for no lower bound
	 * @param endDate exclusive upper bound in epoch milliseconds, or <code>null</code> for no upper bound
	 * @param timeAttribute the attribute that receives the value of the <code>time</code> column, may be <code>null</code>
	 * @return the list of created objects, never <code>null</code>; empty when nothing matched
	 * @throws InfluxDBMapperException declared for API compatibility
	 * @throws NullPointerException if <code>queryResult</code>, <code>eClass</code> or <code>measurementName</code> is <code>null</code>
	 */
	public List<EObject> toEObject(QueryResult queryResult, EClass eClass, String measurementName, Long startDate, Long endDate, EAttribute timeAttribute) throws InfluxDBMapperException {
		Objects.requireNonNull(measurementName, "measurementName");
		Objects.requireNonNull(queryResult, "queryResult");
		Objects.requireNonNull(eClass, "clazz");

		List<EObject> result = new LinkedList<>();
		List<QueryResult.Result> results = queryResult.getResults();
		if (results == null || results.isEmpty()) {
			LOG.severe("Query Results are null or empty");
			return result;
		}
		for (QueryResult.Result internalResult : results) {
			if (internalResult == null || internalResult.getSeries() == null) {
				continue;
			}
			for (Series series : internalResult.getSeries()) {
				// constant-first comparison: a series without a name must not cause a NullPointerException
				if (series != null && measurementName.equals(series.getName())) {
					parseSeriesAsEObject(series, eClass, result, startDate, endDate, timeAttribute);
				}
			}
		}
		return result;
	}

	/**
	 * Converts every row of the series into an EObject and appends it to <code>result</code>.
	 *
	 * @param series the series to convert
	 * @param eClass the type of the objects to create
	 * @param result the list the created objects are appended to
	 * @param startDate exclusive lower bound in epoch milliseconds, may be <code>null</code>
	 * @param endDate exclusive upper bound in epoch milliseconds, may be <code>null</code>
	 * @param timeAttribute the attribute mapped to the <code>time</code> column, may be <code>null</code>
	 * @return the given <code>result</code> list
	 */
	private List<EObject> parseSeriesAsEObject(Series series, EClass eClass, List<EObject> result, Long startDate, Long endDate, EAttribute timeAttribute) {
		List<String> columns = series.getColumns();
		if (columns == null) {
			LOG.severe("Series has no columns.");
			return result;
		}
		List<List<Object>> rows = series.getValues();
		if (rows == null) {
			return result;
		}
		EFactory factory = eClass.getEPackage().getEFactoryInstance();
		String timeAttributeName = timeAttribute == null ? null : timeAttribute.getName();
		// The attribute lookup is the same for every row, so it is built only once per series.
		Map<String, EAttribute> attMap = collectMappableAttributes(eClass);
		Map<String, String> tags = series.getTags();

		for (List<Object> row : rows) {
			if (row == null) {
				continue;
			}
			EObject object = factory.create(eClass);
			if (!applyFields(object, columns, row, attMap, timeAttributeName, startDate, endDate)) {
				// row is outside of the requested time range
				continue;
			}
			applyTags(object, tags, attMap);
			result.add(object);
		}
		return result;
	}

	/**
	 * Collects the attributes of the class that can hold a value of an InfluxDB point, keyed by name.
	 * Transient, derived and multi-valued features are left out, since it is not possible to store
	 * a list for an InfluxDB point.
	 */
	private Map<String, EAttribute> collectMappableAttributes(EClass eClass) {
		return eClass.getEAllAttributes().stream()
				.filter(a -> !a.isTransient() && !a.isDerived() && !a.isMany())
				// keep the first attribute should a name ever occur twice instead of failing
				.collect(Collectors.toMap(EAttribute::getName, a -> a, (first, second) -> first, HashMap::new));
	}

	/**
	 * Copies the column values of one row into the object.
	 *
	 * @return <code>false</code> if the row was rejected by the date filter, <code>true</code> otherwise
	 */
	private boolean applyFields(EObject object, List<String> columns, List<Object> row, Map<String, EAttribute> attMap,
			String timeAttributeName, Long startDate, Long endDate) {
		// guard against rows that carry fewer values than there are columns
		int size = Math.min(columns.size(), row.size());
		for (int i = 0; i < size; i++) {
			String name = columns.get(i);
			// time is a fixed name in influx, we substitute it to the corresponding EAttribute
			if (INFLUX_TIME_COLUMN.equals(name) && timeAttributeName != null) {
				name = timeAttributeName;
			}
			EAttribute attribute = attMap.get(name);
			if (attribute == null) {
				continue;
			}
			Object value = row.get(i);
			if (name.equals(timeAttributeName) && !isDateOK(startDate, endDate, value)) {
				return false;
			}
			setAttributeValue(object, attribute, value);
		}
		return true;
	}

	/**
	 * Copies the tag values of a series into the attributes with the same name.
	 */
	private void applyTags(EObject object, Map<String, String> tags, Map<String, EAttribute> attMap) {
		if (tags == null || tags.isEmpty()) {
			return;
		}
		for (Entry<String, String> entry : tags.entrySet()) {
			EAttribute attribute = attMap.get(entry.getKey());
			if (attribute != null) {
				setAttributeValue(object, attribute, entry.getValue());
			}
		}
	}

	/**
	 * Checks whether the given time value lies strictly between the two bounds.
	 *
	 * @param startDate exclusive lower bound in epoch milliseconds, <code>null</code> for no lower bound
	 * @param endDate exclusive upper bound in epoch milliseconds, <code>null</code> for no upper bound
	 * @param object the time value of the row; a text value is parsed with {@link Instant#parse(CharSequence)}
	 * @return <code>true</code> if no bound is given or the time lies inside the bounds
	 * @throws java.time.format.DateTimeParseException if a text value is not a valid ISO-8601 instant
	 */
	private boolean isDateOK(Long startDate, Long endDate, Object object) {
		if (startDate == null && endDate == null) {
			return true;
		}
		if (object == null) {
			// without a time stamp the row cannot be shown to lie inside the range
			return false;
		}
		long time;
		if (object instanceof Number) {
			// NOTE(review): numeric time values are assumed to be epoch milliseconds - confirm against the query precision
			time = ((Number) object).longValue();
		} else {
			time = Instant.parse(String.valueOf(object)).toEpochMilli();
		}
		boolean afterStart = startDate == null || time > startDate.longValue();
		boolean beforeEnd = endDate == null || time < endDate.longValue();
		return afterStart && beforeEnd;
	}

	/**
	 * Converts the value to the instance class of the attribute and sets it on the object.
	 * Supported instance classes are String, double, long, int, boolean and their wrapper types;
	 * attributes of any other type are left untouched. A <code>null</code> value leaves the
	 * attribute at its current (default) value.
	 *
	 * @param object the object to modify
	 * @param attribute the attribute to set
	 * @param value the raw value taken from the series, may be <code>null</code>
	 * @throws NumberFormatException if a text value cannot be parsed into the numeric attribute type
	 * @throws java.time.format.DateTimeParseException if a text value for a long attribute is neither
	 *         an integer literal nor a valid ISO-8601 instant
	 */
	private void setAttributeValue(EObject object, EAttribute attribute, Object value) {
		if (value == null || attribute.getEType() == null) {
			return;
		}
		Class<?> type = attribute.getEType().getInstanceClass();
		if (type == null) {
			// data types without an instance class cannot be converted here
			return;
		}
		if (type == String.class) {
			object.eSet(attribute, String.valueOf(value));
		} else if (type == double.class || type == Double.class) {
			object.eSet(attribute, Double.valueOf(toDouble(value)));
		} else if (type == long.class || type == Long.class) {
			object.eSet(attribute, Long.valueOf(toLong(value)));
		} else if (type == int.class || type == Integer.class) {
			object.eSet(attribute, Integer.valueOf(toInt(value)));
		} else if (type == boolean.class || type == Boolean.class) {
			object.eSet(attribute, Boolean.valueOf(String.valueOf(value)));
		}
	}

	/**
	 * Returns the value as double; numbers are narrowed or widened, anything else is parsed from its text form.
	 */
	private static double toDouble(Object value) {
		if (value instanceof Number) {
			return ((Number) value).doubleValue();
		}
		return Double.parseDouble(String.valueOf(value));
	}

	/**
	 * Returns the value as long; numbers are converted directly, integer literals are parsed and
	 * any other text is interpreted as ISO-8601 instant and converted to epoch milliseconds.
	 */
	private static long toLong(Object value) {
		if (value instanceof Number) {
			return ((Number) value).longValue();
		}
		String text = String.valueOf(value);
		if (isIntegerLiteral(text)) {
			return Long.parseLong(text);
		}
		return Instant.parse(text).toEpochMilli();
	}

	/**
	 * Returns the value as int; numbers are converted directly, anything else is parsed from its text form.
	 */
	private static int toInt(Object value) {
		if (value instanceof Number) {
			return ((Number) value).intValue();
		}
		return Integer.parseInt(String.valueOf(value));
	}

	/**
	 * Tells whether the text consists of an optional sign followed by at least one digit.
	 */
	private static boolean isIntegerLiteral(String text) {
		int start = (text.startsWith("-") || text.startsWith("+")) ? 1 : 0;
		if (text.length() == start) {
			return false;
		}
		for (int i = start; i < text.length(); i++) {
			if (!Character.isDigit(text.charAt(i))) {
				return false;
			}
		}
		return true;
	}
}