package org.gecko.adapter.mqtt.common;

import java.util.UUID;
import org.gecko.adapter.mqtt.MQTTContext;
import org.gecko.adapter.mqtt.MqttConfig;
import org.gecko.adapter.mqtt.QoS;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.util.pushstream.PushStreamHelper;
import org.osgi.util.promise.Promise;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/adapter/mqtt/common/MqttPushEventSource.class */
/**
 * A {@link SimplePushEventSource} for {@link Message}s that is bound to a single MQTT topic.
 *
 * <p>All push-source operations are delegated to an internal source obtained from
 * {@link PushStreamHelper#createSimpleEventSource}. Once the delegate's connect promise
 * resolves successfully, an MQTT client is created through the supplied factory and this
 * instance is subscribed to the topic as the target of the subscription.
 */
public class MqttPushEventSource implements SimplePushEventSource<Message> {
    private final String topic;
    private final SimplePushEventSource<Message> source;
    /** Assigned from the connect-promise callback; stays {@code null} until then. */
    private GeckoMqttClient mqttClient;
    private final int qos;
    private final String id;
    private final MqttConfig config;
    private final MqttClientFactory<GeckoMqttClient> clientFactory;

    /**
     * Creates the event source and schedules the MQTT subscription for the moment the
     * underlying source reports a successful connect.
     *
     * @param str the MQTT topic to subscribe to; also used as suffix of the generated client id
     * @param messagingContext context handed to {@link PushStreamHelper}; when it is an
     *        {@link MQTTContext} with a non-null QoS, that QoS replaces the default
     *        {@link QoS#AT_LEAST_ONE}
     * @param mqttConfig configuration passed to the client factory
     * @param mqttClientFactory factory used to create the MQTT client on connect
     */
    public MqttPushEventSource(String str, MessagingContext messagingContext, MqttConfig mqttConfig, MqttClientFactory<GeckoMqttClient> mqttClientFactory) {
        this.topic = str;
        this.config = mqttConfig;
        this.clientFactory = mqttClientFactory;
        // A random prefix keeps ids distinct when several sources share one topic.
        this.id = UUID.randomUUID() + "-" + str;
        QoS qoS = QoS.AT_LEAST_ONE;
        // instanceof is already false for null, so no separate null check is needed.
        if (messagingContext instanceof MQTTContext) {
            QoS contextQoS = ((MQTTContext) messagingContext).getQoS();
            if (contextQoS != null) {
                qoS = contextQoS;
            }
        }
        // NOTE(review): relies on the QoS enum's declaration order matching the numeric
        // MQTT QoS levels - confirm against the QoS enum.
        this.qos = qoS.ordinal();
        this.source = PushStreamHelper.createSimpleEventSource(Message.class, messagingContext);
        // Use onSuccess rather than onResolve: onResolve callbacks also run when the promise
        // fails (the connect promise fails if the source is closed before a consumer
        // connects), which would create a client and subscribe into a closed source.
        this.source.connectPromise().onSuccess(v -> initMQTTClient());
    }

    /**
     * Opens the underlying source for the given consumer.
     *
     * @param pushEventConsumer the consumer to connect
     * @return the handle returned by the underlying source
     * @throws Exception if the underlying source fails to open
     */
    public AutoCloseable open(PushEventConsumer<? super Message> pushEventConsumer) throws Exception {
        return this.source.open(pushEventConsumer);
    }

    /**
     * Closes the underlying source.
     *
     * <p>NOTE(review): the MQTT client created on connect is not released here - verify
     * whether the client factory or another component owns its lifecycle.
     */
    public void close() {
        this.source.close();
    }

    /**
     * Publishes a message to the underlying source.
     *
     * @param message the message to publish
     */
    public void publish(Message message) {
        this.source.publish(message);
    }

    /** Signals end of stream on the underlying source. */
    public void endOfStream() {
        this.source.endOfStream();
    }

    /**
     * Forwards an error to the underlying source.
     *
     * @param th the error to forward
     */
    public void error(Throwable th) {
        this.source.error(th);
    }

    /**
     * @return the connected state reported by the underlying source
     */
    public boolean isConnected() {
        return this.source.isConnected();
    }

    /**
     * @return the connect promise of the underlying source
     */
    public Promise<Void> connectPromise() {
        return this.source.connectPromise();
    }

    /** Creates the MQTT client and subscribes this source to the configured topic. */
    private void initMQTTClient() {
        this.mqttClient = this.clientFactory.createClient(this.config, this.id);
        this.mqttClient.subscribe(this.topic, this.qos, this);
    }
}
