package org.gecko.adapter.mqtt;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessage;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

@Capability(namespace = "osgi.message.adapter", name = "mqtt.adapter", version = "1.0.0", attribute = {"vendor=Gecko.io", "implementation=Paho"})
@Component(service = {MessagingService.class}, name = "MQTTService", configurationPolicy = ConfigurationPolicy.REQUIRE, immediate = true)
/* loaded from: input_file:org/gecko/adapter/mqtt/MQTTService.class */
public class MQTTService implements MessagingService, AutoCloseable, MqttCallback {
    private MqttClient mqtt;
    private PushStreamProvider provider = new PushStreamProvider();
    private Map<String, SimplePushEventSource<Message>> subscriptions = new ConcurrentHashMap();

    @ObjectClassDefinition
    /* loaded from: input_file:org/gecko/adapter/mqtt/MQTTService$MqttConfig.class */
    @interface MqttConfig {
        String brokerUrl();
    }

    public MQTTService() {
    }

    public MQTTService(MqttClient mqttClient) {
        this.mqtt = mqttClient;
        this.mqtt.setCallback(this);
    }

    @Activate
    void activate(MqttConfig mqttConfig, BundleContext bundleContext) throws Exception {
        try {
            this.mqtt = new MqttClient(mqttConfig.brokerUrl(), UUID.randomUUID().toString());
            this.mqtt.connect();
            this.mqtt.setCallback(this);
        } catch (Exception e) {
            System.err.println("Error connecting to MQTT broker " + mqttConfig.brokerUrl());
            throw e;
        }
    }

    @Deactivate
    void deactivate() throws Exception {
        close();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.mqtt != null) {
            this.mqtt.disconnect();
        }
        this.mqtt.close();
    }

    public void connectionLost(Throwable th) {
        th.printStackTrace();
    }

    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        Iterator<Map.Entry<String, SimplePushEventSource<Message>>> it = this.subscriptions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SimplePushEventSource<Message>> next = it.next();
            if (next.getKey().matches(str)) {
                SimplePushEventSource<Message> value = next.getValue();
                if (value.isConnected()) {
                    try {
                        value.publish(fromPahoMessage(mqttMessage, str));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    value.close();
                    it.remove();
                }
            }
        }
    }

    public PushStream<Message> subscribe(String str) throws Exception {
        return subscribe(str, new MQTTContextBuilder().withQoS(QoS.AT_LEAST_ONE).build());
    }

    public PushStream<Message> subscribe(String str, MessagingContext messagingContext) throws Exception {
        QoS qoS = QoS.AT_LEAST_ONE;
        if (messagingContext != null && (messagingContext instanceof MQTTContext)) {
            MQTTContext mQTTContext = (MQTTContext) messagingContext;
            if (mQTTContext.getQoS() != null) {
                qoS = mQTTContext.getQoS();
            }
        }
        try {
            this.mqtt.subscribe(str, qoS.ordinal());
            String replaceAll = str.replaceAll("\\*", "#");
            SimplePushEventSource<Message> simplePushEventSource = this.subscriptions.get(replaceAll);
            if (simplePushEventSource == null) {
                simplePushEventSource = (SimplePushEventSource) this.provider.buildSimpleEventSource(Message.class).build();
                this.subscriptions.put(replaceAll, simplePushEventSource);
            }
            return (PushStream) PushStreamHelper.configurePushStreamBuilder(simplePushEventSource, messagingContext).build();
        } catch (MqttException e) {
            throw new Exception(e.getMessage(), e);
        }
    }

    public void publish(String str, ByteBuffer byteBuffer) throws Exception {
        publish(str, byteBuffer, new MQTTContextBuilder().withQoS(QoS.AT_LEAST_ONE).build());
    }

    public void publish(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        QoS qoS = QoS.AT_LEAST_ONE;
        boolean z = false;
        if (messagingContext != null && (messagingContext instanceof MQTTContext)) {
            MQTTContext mQTTContext = (MQTTContext) messagingContext;
            if (mQTTContext.getQoS() != null) {
                qoS = mQTTContext.getQoS();
            }
            z = mQTTContext.isRetained();
        }
        this.mqtt.publish(str, byteBuffer.array(), qoS.ordinal(), z);
    }

    public static Message fromPahoMessage(MqttMessage mqttMessage, String str) {
        return new SimpleMessage(str, ByteBuffer.wrap(mqttMessage.getPayload()));
    }
}
