package org.gecko.rsa.discovery.ma.handler;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.rsa.api.ExportEndpointHandler;
import org.gecko.rsa.core.EndPointDeSerializer;
import org.gecko.rsa.core.converter.EndpointDescriptionConverter;
import org.gecko.rsa.discovery.EndpointDiscovery;
import org.gecko.rsa.discovery.ImportingEndpointManager;
import org.gecko.rsa.model.rsa.EndpointDescription;
import org.gecko.rsa.model.rsa.EndpointDescriptions;
import org.gecko.rsa.model.rsa.RSAFactory;
import org.osgi.framework.BundleContext;
import org.osgi.util.pushstream.PushStream;

/* loaded from: input_file:org/gecko/rsa/discovery/ma/handler/MessageAdapterDiscoverHandler.class */
public class MessageAdapterDiscoverHandler implements ExportEndpointHandler {
    private static final Logger logger = Logger.getLogger(MessageAdapterDiscoverHandler.class.getName());
    private final Map<String, EndpointDescription> importingDescriptions = new ConcurrentHashMap();
    private final Map<String, EndpointDescription> exportingDescriptions = new ConcurrentHashMap();
    private final EndpointDescriptionConverter converter = new EndpointDescriptionConverter();
    private final EndPointDeSerializer deSerializer = new EndPointDeSerializer();
    private final EndpointDiscovery discovery;
    private final MessagingService messaging;
    private final String frameworkUUID;
    private PushStream<Message> importSubscribe;
    private PushStream<Message> exportSubscribe;
    private PushStream<Message> removeSubscribe;
    private PushStream<Message> modifySubscribe;
    private PushStream<Message> announceSubscribe;

    public MessageAdapterDiscoverHandler(MessagingService messagingService, BundleContext bundleContext, MADiscoveryConfig mADiscoveryConfig) {
        this.messaging = messagingService;
        this.discovery = new EndpointDiscovery(bundleContext, mADiscoveryConfig.name(), this);
        this.frameworkUUID = bundleContext.getProperty("org.osgi.framework.uuid");
    }

    public void start() {
        if (this.discovery == null || this.messaging == null) {
            throw new IllegalStateException("There is a missing discovery or messaging to start this handler");
        }
        this.discovery.start();
        initializeSubscriptions();
        callForDescription();
    }

    public void stop() {
        if (this.discovery != null && this.discovery.isRunning()) {
            this.discovery.stop();
        }
        if (this.importSubscribe != null) {
            this.importSubscribe.close();
        }
        if (this.modifySubscribe != null) {
            this.modifySubscribe.close();
        }
        if (this.removeSubscribe != null) {
            this.removeSubscribe.close();
        }
    }

    private void initializeSubscriptions() {
        try {
            this.exportSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_ADD_SERVICE);
            this.exportSubscribe.forEach(this::notifyAdd);
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("Error subscribing for topic ", MADiscoveryConstants.TOPIC_ADD_SERVICE), (Throwable) e);
        }
        try {
            this.removeSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_REMOVE_SERVICE);
            this.removeSubscribe.forEach(this::notifyRemove);
        } catch (Exception e2) {
            logger.log(Level.SEVERE, String.format("Error subscribing for topic ", MADiscoveryConstants.TOPIC_REMOVE_SERVICE), (Throwable) e2);
        }
        try {
            this.modifySubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_MODIFY_SERVICE);
            this.modifySubscribe.forEach(this::notifyModify);
        } catch (Exception e3) {
            logger.log(Level.SEVERE, String.format("Error subscribing for topic ", MADiscoveryConstants.TOPIC_MODIFY_SERVICE), (Throwable) e3);
        }
        try {
            this.announceSubscribe = this.messaging.subscribe(String.format(MADiscoveryConstants.TOPIC_ANNOUNCED_SERVICE, this.frameworkUUID));
            this.announceSubscribe.forEach(this::notifyModify);
        } catch (Exception e4) {
            logger.log(Level.SEVERE, String.format("Error subscribing for topic ", MADiscoveryConstants.TOPIC_MODIFY_SERVICE), (Throwable) e4);
        }
        try {
            this.importSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_GET_DESCRIPTION);
            this.importSubscribe.forEach(this::publishAllDescriptions);
        } catch (Exception e5) {
            logger.log(Level.SEVERE, String.format("Error subscribing for topic ", MADiscoveryConstants.TOPIC_GET_DESCRIPTION), (Throwable) e5);
        }
    }

    public void exportServiceDescription(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws Exception {
        EndpointDescription caseOSGiEndpointDescription = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(caseOSGiEndpointDescription);
        if (key == null) {
            logger.severe(String.format("Exporting new endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s" + caseOSGiEndpointDescription, endpointDescription, key));
            return;
        }
        this.exportingDescriptions.put(key, caseOSGiEndpointDescription);
        publishEndpointDescription(caseOSGiEndpointDescription, MADiscoveryConstants.TOPIC_ADD_SERVICE);
        logger.log(Level.INFO, String.format("Exporting new endpoint via message adapter. Endpoint: %s, Key: %s", endpointDescription, key));
    }

    public void modifyServiceDescription(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws Exception {
        EndpointDescription caseOSGiEndpointDescription = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(caseOSGiEndpointDescription);
        if (key == null) {
            logger.severe(String.format("Exporting changed endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s" + caseOSGiEndpointDescription, endpointDescription, key));
        } else if (this.exportingDescriptions.put(key, caseOSGiEndpointDescription) == null) {
            exportServiceDescription(endpointDescription);
        } else {
            publishEndpointDescription(caseOSGiEndpointDescription, MADiscoveryConstants.TOPIC_MODIFY_SERVICE);
            logger.log(Level.INFO, String.format("Exporting changed endpoint via message adapter. Endpoint: %s, Key: %s", endpointDescription, key));
        }
    }

    public void removeServiceDescription(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws Exception {
        EndpointDescription caseOSGiEndpointDescription = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(caseOSGiEndpointDescription);
        if (key == null) {
            logger.severe(String.format("Exporting removed endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s" + caseOSGiEndpointDescription, endpointDescription, key));
        } else if (this.exportingDescriptions.remove(key) != null) {
            publishEndpointDescription(caseOSGiEndpointDescription, MADiscoveryConstants.TOPIC_REMOVE_SERVICE);
            logger.log(Level.INFO, String.format("Exporting removed endpoint via message adapter. Endpoint: %s, Key: %s", endpointDescription, key));
        }
    }

    private String getKey(EndpointDescription endpointDescription) {
        return (String) endpointDescription.getProperty().stream().filter(property -> {
            return property.getName().equals("endpoint.id");
        }).findFirst().map(property2 -> {
            return property2.getValue();
        }).orElse(null);
    }

    private void notifyAdd(Message message) {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(message.payload().array());
        ImportingEndpointManager importingEndpointManager = this.discovery.getImportingEndpointManager();
        this.deSerializer.deserialize(byteArrayInputStream).thenAccept(endpointDescriptions -> {
            synchronized (endpointDescriptions) {
                for (EndpointDescription endpointDescription : endpointDescriptions.getEndpointDescription()) {
                    String key = getKey(endpointDescription);
                    if (this.importingDescriptions.containsKey(key) || this.exportingDescriptions.containsKey(key)) {
                        logger.info("Detected and new incoming endpoint, that already exists. Ignore it");
                    } else {
                        this.importingDescriptions.put(key, endpointDescription);
                        importingEndpointManager.addImported(endpointDescription);
                        logger.fine("Detected and incoming endpoint andsent 'ADD' event");
                    }
                }
            }
        }).onFailure(th -> {
            logger.log(Level.SEVERE, "Detected an error for an incoming endpoint. Cannot send ADD event", th);
        });
    }

    private void notifyModify(Message message) {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(message.payload().array());
        ImportingEndpointManager importingEndpointManager = this.discovery.getImportingEndpointManager();
        this.deSerializer.deserialize(byteArrayInputStream).thenAccept(endpointDescriptions -> {
            synchronized (endpointDescriptions) {
                for (EndpointDescription endpointDescription : endpointDescriptions.getEndpointDescription()) {
                    String key = getKey(endpointDescription);
                    if (this.exportingDescriptions.containsKey(key)) {
                        logger.info("Detected an incoming modified endpoint, that already exists. Ignore it");
                    } else {
                        EndpointDescription put = this.importingDescriptions.put(key, endpointDescription);
                        if (put != null) {
                            importingEndpointManager.modifyImported(put);
                            logger.info(String.format("Detected and incoming endpoint sent '%s' event", "MODIFY"));
                        } else {
                            importingEndpointManager.addImported(endpointDescription);
                        }
                    }
                }
            }
        }).onFailure(th -> {
            logger.log(Level.SEVERE, "Detected an error for a modified endpoint. Cannot send MODIFY or ADD event", th);
        });
    }

    private void notifyRemove(Message message) {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(message.payload().array());
        ImportingEndpointManager importingEndpointManager = this.discovery.getImportingEndpointManager();
        this.deSerializer.deserialize(byteArrayInputStream).thenAccept(endpointDescriptions -> {
            synchronized (endpointDescriptions) {
                Iterator it = endpointDescriptions.getEndpointDescription().iterator();
                while (it.hasNext()) {
                    EndpointDescription remove = this.importingDescriptions.remove(getKey((EndpointDescription) it.next()));
                    if (remove != null) {
                        importingEndpointManager.removeImported(remove);
                        logger.info("Detected and incoming endpoint sent REMOVE event");
                    } else {
                        logger.info("No endpoint was found, that could be removed");
                    }
                }
            }
        }).onFailure(th -> {
            logger.log(Level.SEVERE, "Detected an error for a remove endpoint. Cannot send REOMVE event", th);
        });
    }

    private void publishEndpointDescription(EndpointDescriptions endpointDescriptions, String str) {
        try {
            this.messaging.publish(str, ByteBuffer.wrap(((ByteArrayOutputStream) this.deSerializer.serialize(endpointDescriptions).getValue()).toByteArray()));
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("Error publishing payload for endpoint to topic %s", str), (Throwable) e);
        }
    }

    private void publishEndpointDescription(EndpointDescription endpointDescription, String str) {
        EndpointDescriptions createEndpointDescriptions = RSAFactory.eINSTANCE.createEndpointDescriptions();
        createEndpointDescriptions.getEndpointDescription().add(endpointDescription);
        publishEndpointDescription(createEndpointDescriptions, str);
    }

    private void publishAllDescriptions(Message message) {
        String str = new String(message.payload().array());
        if (this.frameworkUUID == null || this.frameworkUUID.equalsIgnoreCase(str)) {
            logger.fine("Received a get all request for this OSGi framework (Requesting ourself). It can be ignored.");
            return;
        }
        String format = String.format(MADiscoveryConstants.TOPIC_ANNOUNCED_SERVICE, str);
        Collection<EndpointDescription> values = this.exportingDescriptions.values();
        EndpointDescriptions createEndpointDescriptions = RSAFactory.eINSTANCE.createEndpointDescriptions();
        if (!values.isEmpty()) {
            createEndpointDescriptions.getEndpointDescription().addAll(values);
        }
        publishEndpointDescription(createEndpointDescriptions, format);
    }

    private void callForDescription() {
        try {
            this.messaging.publish(MADiscoveryConstants.TOPIC_GET_DESCRIPTION, ByteBuffer.wrap(this.frameworkUUID.getBytes()));
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error requesting for service descriptions.", (Throwable) e);
        }
    }
}
