/** * Copyright (c) 2012 - 2019 Data In Motion and others. * All rights reserved. * * This program and the accompanying materials are made available under the terms of the * Eclipse Public License v1.0 which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Data In Motion - initial API and implementation */ package org.gecko.rsa.discovery; import java.io.Closeable; import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; import org.gecko.rsa.core.converter.EndpointDescriptionConverter; import org.gecko.rsa.model.rsa.Property; import org.osgi.framework.BundleContext; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; import org.osgi.service.remoteserviceadmin.RemoteConstants; /** * End-point handler for incoming service descriptions. * This handler is usually called from a protocol specific implementation. * * These are coming from events or turned into {@link EndpointEvent} * @author Mark Hoffmann * @since 06.07.2018 */ public class ImportingEndpointManager implements Closeable { private static final Logger logger = Logger.getLogger(ImportingEndpointManager.class.getName()); private final EndpointDescriptionConverter converter = new EndpointDescriptionConverter(); private final Map descriptionMap = new ConcurrentHashMap(); private final ImportingEndpointEventManager eventManager; private ImportingTopologyManagerTracker endpointListenerTracker; /** * Creates a new instance. */ public ImportingEndpointManager(String discoveryId) { eventManager = new ImportingEndpointEventManager(this, discoveryId); } /** * Initializes the manager and starts tracking for topology managers */ public void start(BundleContext ctx) { endpointListenerTracker = new ImportingTopologyManagerTracker(ctx, eventManager); endpointListenerTracker.open(); } /* * (non-Javadoc) * @see java.io.Closeable#close() */ @Override public void close() throws IOException { if (endpointListenerTracker != null) { endpointListenerTracker.close(); } if (eventManager != null) { eventManager.close(); } descriptionMap.clear(); } /** * Returns all end-point descriptions or an empty collection * @return all end-point descriptions */ public Collection getAllDescriptions() { return descriptionMap.values(); } /** * Returns the key, which is the end-point id * @param endpointDescription the end-point description * @return the end-point id property or null */ private String getKey(org.gecko.rsa.model.rsa.EndpointDescription endpointDescription) { Optional endpointId = endpointDescription.getProperty().stream().filter(p->p.getName().equals(RemoteConstants.ENDPOINT_ID)).findFirst(); return endpointId.map(p->p.getValue()).orElse(null); } /** * Received {@link EndpointDescription} to be imported * @param description the descriptions to be imported */ public void addImported(org.gecko.rsa.model.rsa.EndpointDescription description) { if (eventManager == null) { logger.warning("Cannot consume description because no listener is configured"); return; } String key = getKey(description); if (key == null) { throw new IllegalStateException("Detected an endpoint description with no 'service.id'"); } if (descriptionMap.containsKey(key)) { logger.info("Detected and new incoming endpoint, that already exists. Ignore it"); return; } try { EndpointDescription endpointDescription = converter.doSwitch(description); descriptionMap.put(key, description); EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpointDescription); eventManager.endpointChanged(event, null); logger.info("Detected and incoming endpoint and sent 'ADD' event"); } catch (Exception e) { if (e instanceof IllegalStateException) { throw e; } throw new IllegalStateException(e.getMessage()); } } /** * Received a modification for an imported description. * If there is no end-point description, it will be created and an add event will be sent. * Otherwise a modified event will be sent * @param description the descriptions to be imported */ public void modifyImported(org.gecko.rsa.model.rsa.EndpointDescription description) { if (eventManager == null) { logger.warning("Cannot consume modify end-point description because no listener is configured"); return; } String key = getKey(description); EndpointDescription endpointDescription = converter.doSwitch(description); org.gecko.rsa.model.rsa.EndpointDescription edModify = descriptionMap.put(key, description); int eventType = EndpointEvent.ADDED; if (edModify != null) { eventType = EndpointEvent.MODIFIED; } EndpointEvent event = new EndpointEvent(eventType, endpointDescription); eventManager.endpointChanged(event, null); logger.info(String.format("Detected and incoming endpoint sent '%s' event", (eventType == EndpointEvent.ADDED ? "ADD" : "MODIFY"))); } /** * Remove a imported description and notifies all listeners about this end-point change. * @param description the description to be imported */ public void removeImported(org.gecko.rsa.model.rsa.EndpointDescription description) { if (eventManager == null) { logger.warning("Cannot consume remove message with end-point description because no listener is configured"); return; } String key = getKey(description); org.gecko.rsa.model.rsa.EndpointDescription edRemove = descriptionMap.remove(key); if (edRemove != null) { EndpointDescription endpointDescription = converter.doSwitch(edRemove); EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpointDescription); eventManager.endpointChanged(event, null); logger.info("Detected and incoming endpoint sent REMOVE event"); } else { logger.info("No endpoint was found, that could be removed"); } } }