package org.gecko.rsa.discovery.ma.repository;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.emf.ecore.EObject;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.rsa.discovery.ma.converter.EndPointDeSerializer;
import org.gecko.rsa.discovery.ma.converter.EndpointDescriptionConverter;
import org.gecko.rsa.discovery.ma.util.MADiscoveryConstants;
import org.gecko.rsa.model.rsa.EndpointDescription;
import org.gecko.rsa.model.rsa.EndpointDescriptions;
import org.gecko.rsa.model.rsa.RSAFactory;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.util.pushstream.PushStream;

/* loaded from: input_file:org/gecko/rsa/discovery/ma/repository/MessageAdapterEndpointRepository.class */
/**
 * Keeps endpoint descriptions in an in-memory map and exchanges them with
 * other frameworks through a {@link MessagingService}.
 *
 * <p>Endpoints passed to {@code add}/{@code modify}/{@code remove} are
 * published on the discovery topics; messages received on those topics are
 * turned into {@link EndpointEvent}s for the configured
 * {@link EndpointEventListener}.
 */
public class MessageAdapterEndpointRepository implements Closeable {
    private static final Logger logger = Logger.getLogger(MessageAdapterEndpointRepository.class.getName());
    // Messaging service used for every subscribe and publish call of this class.
    private final MessagingService messaging;
    // Identifier of this framework: it is sent as the payload of "get" requests,
    // used to build the announce topic, and compared against incoming requests.
    private final String frameworkUUID;
    // Receiver of endpoint events; may be null, in which case incoming messages are dropped.
    private EndpointEventListener listener;
    // Subscription streams, one per discovery topic; they are assigned in initialize().
    private PushStream<Message> getSubscribe;
    private PushStream<Message> addSubscribe;
    private PushStream<Message> removeSubscribe;
    private PushStream<Message> modifySubscribe;
    private PushStream<Message> announceSubscribe;
    // Converts between OSGi endpoint descriptions and the model representation.
    private final EndpointDescriptionConverter converter = new EndpointDescriptionConverter();
    // Serializes and deserializes model objects for message payloads.
    private final EndPointDeSerializer deSerializer = new EndPointDeSerializer();
    // Stored endpoint descriptions, keyed by the value of their "endpoint.id" property.
    private final Map<String, EndpointDescription> nodes = new ConcurrentHashMap();

    /**
     * Creates a repository without an endpoint listener; one can be supplied
     * later through {@link #setListener(EndpointEventListener)}.
     *
     * @param messagingService messaging service used to publish and subscribe
     * @param str              identifier (UUID) of this framework
     */
    public MessageAdapterEndpointRepository(MessagingService messagingService, String str) {
        this(messagingService, null, str);
    }

    /**
     * Creates a repository that forwards incoming endpoint changes to the
     * given listener.
     *
     * @param messagingService      messaging service used to publish and subscribe
     * @param endpointEventListener listener notified about incoming endpoints; may be {@code null}
     * @param str                   identifier (UUID) of this framework
     */
    public MessageAdapterEndpointRepository(MessagingService messagingService, EndpointEventListener endpointEventListener, String str) {
        this.listener = endpointEventListener;
        this.frameworkUUID = str;
        this.messaging = messagingService;
    }

    /**
     * Replaces the listener that receives events for incoming endpoint messages.
     *
     * @param endpointEventListener the new listener; while it is {@code null},
     *                              incoming add/modify/remove messages are only logged
     */
    public void setListener(EndpointEventListener endpointEventListener) {
        this.listener = endpointEventListener;
    }

    /**
     * Subscribes to the add, remove, modify, announce and get-description
     * topics and wires each subscription to its handler.
     *
     * <p>Each subscription is set up independently: a failure for one topic is
     * logged (including the topic name) and does not prevent the remaining
     * subscriptions from being created.
     */
    public void initialize() {
        try {
            this.addSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_ADD_SERVICE);
            this.addSubscribe.forEach(this::notifyAdd);
        } catch (Exception e) {
            logSubscribeError(MADiscoveryConstants.TOPIC_ADD_SERVICE, e);
        }
        try {
            this.removeSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_REMOVE_SERVICE);
            this.removeSubscribe.forEach(this::notifyRemove);
        } catch (Exception e) {
            logSubscribeError(MADiscoveryConstants.TOPIC_REMOVE_SERVICE, e);
        }
        try {
            this.modifySubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_MODIFY_SERVICE);
            this.modifySubscribe.forEach(this::notifyModify);
        } catch (Exception e) {
            logSubscribeError(MADiscoveryConstants.TOPIC_MODIFY_SERVICE, e);
        }
        // Start with the unformatted pattern so the error log still names the topic
        // if building the framework-specific topic itself fails.
        String announceTopic = MADiscoveryConstants.TOPIC_ANNOUNCED_SERVICE;
        try {
            announceTopic = String.format(MADiscoveryConstants.TOPIC_ANNOUNCED_SERVICE, this.frameworkUUID);
            this.announceSubscribe = this.messaging.subscribe(announceTopic);
            // Announced descriptions are handled like modifications (add-or-update).
            this.announceSubscribe.forEach(this::notifyModify);
        } catch (Exception e) {
            logSubscribeError(announceTopic, e);
        }
        try {
            this.getSubscribe = this.messaging.subscribe(MADiscoveryConstants.TOPIC_GET_DESCRIPTION);
            this.getSubscribe.forEach(this::publishAllDescriptions);
        } catch (Exception e) {
            logSubscribeError(MADiscoveryConstants.TOPIC_GET_DESCRIPTION, e);
        }
    }

    /** Logs a failed subscription together with the affected topic. */
    private static void logSubscribeError(String topic, Exception cause) {
        logger.log(Level.SEVERE, String.format("Error subscribing for topic %s", topic), cause);
    }

    /**
     * Stores the given endpoint and publishes it on the add topic.
     *
     * <p>If the converted description carries no {@code endpoint.id} property
     * the endpoint is neither stored nor published and an error is logged.
     *
     * @param endpointDescription the OSGi endpoint description to export
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    public void add(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws InterruptedException {
        EndpointDescription converted = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(converted);
        if (key == null) {
            // Values are passed as format arguments only; concatenating them into the
            // pattern would make String.format fail on any '%' in their text.
            logger.severe(String.format("Exporting endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s", endpointDescription, key));
            return;
        }
        this.nodes.put(key, converted);
        publishEndpointDescription(converted, MADiscoveryConstants.TOPIC_ADD_SERVICE);
        logger.fine(String.format("Exporting endpoint via message adapter. Endpoint: %s, Key: %s", endpointDescription, key));
    }

    /**
     * Stores the changed endpoint (replacing any previous entry with the same
     * key) and publishes it on the modify topic.
     *
     * <p>If the converted description carries no {@code endpoint.id} property
     * nothing is stored or published and an error is logged.
     *
     * @param endpointDescription the changed OSGi endpoint description
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    public void modify(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws InterruptedException {
        EndpointDescription converted = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(converted);
        if (key == null) {
            // Values are passed as format arguments only; concatenating them into the
            // pattern would make String.format fail on any '%' in their text.
            logger.severe(String.format("Exporting changed endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s", endpointDescription, key));
            return;
        }
        this.nodes.put(key, converted);
        publishEndpointDescription(converted, MADiscoveryConstants.TOPIC_MODIFY_SERVICE);
        logger.log(Level.INFO, String.format("Exporting changed endpoint via message adapter. Endpoint: %s, Key: %s", endpointDescription, key));
    }

    /**
     * Removes the given endpoint from the repository and publishes the removal
     * on the remove topic.
     *
     * <p>If the converted description carries no {@code endpoint.id} property
     * nothing is removed or published and an error is logged. If no entry is
     * stored under the key, the freshly converted description is published
     * instead so that a {@code null} element is never handed to the publisher.
     *
     * @param endpointDescription the OSGi endpoint description to withdraw
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    public void remove(org.osgi.service.remoteserviceadmin.EndpointDescription endpointDescription) throws InterruptedException {
        EndpointDescription converted = this.converter.caseOSGiEndpointDescription(endpointDescription);
        String key = getKey(converted);
        if (key == null) {
            // Values are passed as format arguments only; concatenating them into the
            // pattern would make String.format fail on any '%' in their text.
            logger.severe(String.format("Export remove endpoint failed because of missing endpoint.id. Endpoint: %s, Key: %s", endpointDescription, key));
            return;
        }
        EndpointDescription removed = this.nodes.remove(key);
        // nodes.remove returns null for an unknown key; fall back to the converted
        // description so peers are still told about the removal.
        EndpointDescription toPublish = removed != null ? removed : converted;
        publishEndpointDescription(toPublish, MADiscoveryConstants.TOPIC_REMOVE_SERVICE);
        logger.info(String.format("Export remove endpoint via message adapter. Endpoint: %s, Path: %s removed: %s", endpointDescription, "", removed));
    }

    /**
     * Returns the endpoint descriptions currently held by this repository.
     *
     * @return the value view of the internal map (not a copy), so later
     *         changes to the repository are visible through it
     */
    public Collection<EndpointDescription> getAll() {
        final Collection<EndpointDescription> storedDescriptions = this.nodes.values();
        return storedDescriptions;
    }

    /**
     * Closes all topic subscriptions and clears the stored endpoint descriptions.
     *
     * <p>Subscriptions that were never created (for example because
     * {@link #initialize()} was not called or failed for a topic) are skipped,
     * and a failure while closing one subscription does not prevent the others
     * from being closed.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by the visible code
     */
    @Override
    public void close() throws IOException {
        closeSubscription(this.addSubscribe);
        closeSubscription(this.removeSubscribe);
        closeSubscription(this.modifySubscribe);
        closeSubscription(this.announceSubscribe);
        closeSubscription(this.getSubscribe);
        this.nodes.clear();
    }

    /** Closes a single subscription stream, tolerating {@code null} and close failures. */
    private static void closeSubscription(PushStream<Message> subscription) {
        if (subscription == null) {
            return;
        }
        try {
            subscription.close();
        } catch (RuntimeException e) {
            logger.log(Level.WARNING, "Error closing a topic subscription", e);
        }
    }

    /**
     * Returns a new list containing the non-null, non-empty strings of the
     * given list in their original order.
     *
     * @param list the source list; may be {@code null}
     * @return a new mutable list; empty when {@code list} is {@code null}
     */
    public static List<String> removeEmpty(List<String> list) {
        List<String> result = new ArrayList<>();
        if (list == null) {
            return result;
        }
        for (String entry : list) {
            if (entry != null && !entry.isEmpty()) {
                result.add(entry);
            }
        }
        return result;
    }

    /**
     * Looks up the value of the first property named {@code endpoint.id}.
     *
     * @param endpointDescription the description whose properties are searched
     * @return the property value, or {@code null} when no such property exists
     *         or its value is {@code null}
     */
    private String getKey(EndpointDescription endpointDescription) {
        return (String) endpointDescription.getProperty().stream()
                // Constant-first comparison so a property without a name is skipped instead of throwing.
                .filter(property -> "endpoint.id".equals(property.getName()))
                .findFirst()
                .map(property -> property.getValue())
                .orElse(null);
    }

    /**
     * Serializes the given container and publishes the bytes on a topic.
     * Any failure is logged and not propagated.
     *
     * @param endpointDescriptions the container to serialize
     * @param str                  the topic to publish to
     */
    private void publishEndpointDescription(EndpointDescriptions endpointDescriptions, String str) {
        try {
            ByteArrayOutputStream serialized = (ByteArrayOutputStream) this.deSerializer.serialize((EObject) endpointDescriptions).getValue();
            ByteBuffer payload = ByteBuffer.wrap(serialized.toByteArray());
            this.messaging.publish(str, payload);
        } catch (Exception publishFailure) {
            logger.log(Level.SEVERE, String.format("Error publishing payload for endpoint to topic %s", str), publishFailure);
        }
    }

    /**
     * Wraps a single description in a new container and publishes it on a topic.
     *
     * @param endpointDescription the description to publish
     * @param str                 the topic to publish to
     */
    private void publishEndpointDescription(EndpointDescription endpointDescription, String str) {
        EndpointDescriptions singleEntryContainer = RSAFactory.eINSTANCE.createEndpointDescriptions();
        singleEntryContainer.getEndpointDescription().add(endpointDescription);
        publishEndpointDescription(singleEntryContainer, str);
    }

    /**
     * Answers a "get all" request by publishing every stored description on
     * the announce topic of the requesting framework.
     *
     * <p>The message payload is read as the requester's framework identifier.
     * Requests carrying this framework's own identifier are ignored, as are
     * all requests when no local identifier is configured. Nothing is published
     * when the repository is empty.
     *
     * @param message the request whose payload holds the requester's identifier
     */
    private void publishAllDescriptions(Message message) {
        // Decode with an explicit charset rather than the platform default, so the
        // result does not depend on the JVM's locale settings.
        String requesterUUID = new String(message.payload().array(), StandardCharsets.UTF_8);
        if (this.frameworkUUID == null || this.frameworkUUID.equalsIgnoreCase(requesterUUID)) {
            logger.fine("Received a get all request for this OSGi framework (Requesting ourself). It can be ignored.");
            return;
        }
        // Work on a snapshot so the emptiness check and the publish see the same entries.
        List<EndpointDescription> snapshot = new ArrayList<>(getAll());
        if (snapshot.isEmpty()) {
            return;
        }
        EndpointDescriptions allDescriptions = RSAFactory.eINSTANCE.createEndpointDescriptions();
        allDescriptions.getEndpointDescription().addAll(snapshot);
        publishEndpointDescription(allDescriptions, String.format(MADiscoveryConstants.TOPIC_ANNOUNCED_SERVICE, requesterUUID));
    }

    /**
     * Handles a message from the add topic: every contained description that is
     * not yet stored is stored and reported to the listener as an ADDED event.
     *
     * <p>Descriptions without an {@code endpoint.id} are skipped, because the
     * backing {@link ConcurrentHashMap} rejects {@code null} keys and the
     * resulting exception would escape from the stream callback.
     *
     * @param message the message whose payload holds serialized endpoint descriptions
     */
    private void notifyAdd(Message message) {
        // Read the field once so the null check and the later call use the same reference.
        EndpointEventListener currentListener = this.listener;
        if (currentListener == null) {
            logger.warning("Cannot consume add message with end-point description because no listener is configured");
            return;
        }
        try {
            EndpointDescriptions received = (EndpointDescriptions) this.deSerializer.deserialize(new ByteArrayInputStream(message.payload().array())).getValue();
            for (EndpointDescription endpointDescription : received.getEndpointDescription()) {
                String key = getKey(endpointDescription);
                if (key == null) {
                    logger.warning("Ignoring incoming endpoint without endpoint.id");
                    continue;
                }
                if (this.nodes.containsKey(key)) {
                    logger.info("Detected and new incoming endpoint, that already exists. Ignore it");
                    continue;
                }
                org.osgi.service.remoteserviceadmin.EndpointDescription osgiDescription = (org.osgi.service.remoteserviceadmin.EndpointDescription) this.converter.doSwitch(endpointDescription);
                // putIfAbsent closes the gap between the containsKey check and the insert:
                // if another handler stored the same key meanwhile, no second ADDED event is sent.
                if (this.nodes.putIfAbsent(key, endpointDescription) != null) {
                    logger.info("Detected and new incoming endpoint, that already exists. Ignore it");
                    continue;
                }
                currentListener.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, osgiDescription), null);
                logger.info("Detected and incoming endpoint andsent 'ADD' event");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Detected an error for an incoming endpoint. Cannot send ADD event", e);
        } catch (InvocationTargetException e) {
            logger.log(Level.SEVERE, "Detected an error for an incoming endpoint. Cannot send ADD event", e);
        }
    }

    /**
     * Handles a message carrying changed (or announced) descriptions: each one
     * is stored, and the listener receives a MODIFIED event when an entry with
     * the same key was replaced, or an ADDED event when the key was new.
     *
     * <p>Descriptions without an {@code endpoint.id} are skipped, because the
     * backing {@link ConcurrentHashMap} rejects {@code null} keys and the
     * resulting exception would escape from the stream callback.
     *
     * @param message the message whose payload holds serialized endpoint descriptions
     */
    private void notifyModify(Message message) {
        // Read the field once so the null check and the later call use the same reference.
        EndpointEventListener currentListener = this.listener;
        if (currentListener == null) {
            logger.warning("Cannot consume modify message with end-point description because no listener is configured");
            return;
        }
        try {
            EndpointDescriptions received = (EndpointDescriptions) this.deSerializer.deserialize(new ByteArrayInputStream(message.payload().array())).getValue();
            for (EndpointDescription endpointDescription : received.getEndpointDescription()) {
                String key = getKey(endpointDescription);
                if (key == null) {
                    logger.warning("Ignoring modified endpoint without endpoint.id");
                    continue;
                }
                org.osgi.service.remoteserviceadmin.EndpointDescription osgiDescription = (org.osgi.service.remoteserviceadmin.EndpointDescription) this.converter.doSwitch(endpointDescription);
                // put returns the previous value, which tells a replacement from a first insert.
                boolean replaced = this.nodes.put(key, endpointDescription) != null;
                int eventType = replaced ? EndpointEvent.MODIFIED : EndpointEvent.ADDED;
                currentListener.endpointChanged(new EndpointEvent(eventType, osgiDescription), null);
                logger.info(String.format("Detected and incoming endpoint sent '%s' event", replaced ? "MODIFY" : "ADD"));
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Detected an error for a modified endpoint. Cannot send MODIFY or ADD event", e);
        } catch (InvocationTargetException e) {
            logger.log(Level.SEVERE, "Detected an error for a modified endpoint. Cannot send MODIFY or ADD event", e);
        }
    }

    /**
     * Handles a message from the remove topic: every contained description that
     * is stored under its key is removed and reported to the listener as a
     * REMOVED event, using the stored description for the event.
     *
     * <p>Descriptions without an {@code endpoint.id} are skipped, because the
     * backing {@link ConcurrentHashMap} rejects {@code null} keys and the
     * resulting exception would escape from the stream callback.
     *
     * @param message the message whose payload holds serialized endpoint descriptions
     */
    private void notifyRemove(Message message) {
        // Read the field once so the null check and the later call use the same reference.
        EndpointEventListener currentListener = this.listener;
        if (currentListener == null) {
            logger.warning("Cannot consume remove message with end-point description because no listener is configured");
            return;
        }
        try {
            EndpointDescriptions received = (EndpointDescriptions) this.deSerializer.deserialize(new ByteArrayInputStream(message.payload().array())).getValue();
            for (EndpointDescription incoming : received.getEndpointDescription()) {
                String key = getKey(incoming);
                if (key == null) {
                    logger.warning("Ignoring endpoint removal without endpoint.id");
                    continue;
                }
                EndpointDescription removed = this.nodes.remove(key);
                if (removed == null) {
                    logger.info("No endpoint was found, that could be removed");
                    continue;
                }
                org.osgi.service.remoteserviceadmin.EndpointDescription osgiDescription = (org.osgi.service.remoteserviceadmin.EndpointDescription) this.converter.doSwitch(removed);
                currentListener.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, osgiDescription), null);
                logger.info("Detected and incoming endpoint sent REMOVE event");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe it.
            Thread.currentThread().interrupt();
            logger.log(Level.SEVERE, "Detected an error for a remove endpoint. Cannot send REOMVE event", e);
        } catch (InvocationTargetException e) {
            logger.log(Level.SEVERE, "Detected an error for a remove endpoint. Cannot send REOMVE event", e);
        }
    }

    /**
     * Publishes a "get all" request carrying this framework's identifier on the
     * get-description topic.
     *
     * <p>Nothing is sent when no framework identifier is configured. Publish
     * failures are logged (including the topic name) and not propagated.
     */
    public void initializeTopologyManager() {
        if (this.frameworkUUID == null) {
            logger.warning("Cannot request remote endpoint descriptions because no framework UUID is configured");
            return;
        }
        try {
            // Encode with an explicit charset rather than the platform default.
            byte[] uuidBytes = this.frameworkUUID.getBytes(StandardCharsets.UTF_8);
            this.messaging.publish(MADiscoveryConstants.TOPIC_GET_DESCRIPTION, ByteBuffer.wrap(uuidBytes));
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("Error sending intialize get all request for topic %s", MADiscoveryConstants.TOPIC_GET_DESCRIPTION), e);
        }
    }
}
