package org.eclipse.sensinact.gateway.southbound.mqtt.impl;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.function.Predicate;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessageListener;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * OSGi component that owns one MQTT client connection and forwards every
 * received message to the registered {@link IMqttMessageListener} services
 * whose topic filters match the message topic.
 *
 * <p>
 * The component is driven by three kinds of threads: the Declarative Services
 * thread ({@link #activate}, {@link #deactivate}, listener binding), the MQTT
 * client callback thread ({@link #connectionLost}, {@link #messageArrived})
 * and the reconnection timer thread. State shared between them is either
 * {@code volatile} or guarded by {@code reconnectLock}.
 */
@Component(service = {}, configurationPid = {"sensinact.southbound.mqtt"}, configurationPolicy = ConfigurationPolicy.REQUIRE)
public class MqttClientHandler implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientHandler.class);

    /**
     * Delay used before a reconnection attempt when {@link #reconnectDelayMs}
     * is not a positive value. Prevents a zero-delay retry loop.
     */
    private static final long DEFAULT_RECONNECT_DELAY_MS = 500L;

    /** ID given to listeners to identify this handler; random when not configured. */
    private volatile String handlerId;

    /** Current MQTT client, {@code null} when the component is not active. */
    private volatile MqttClient client;

    /**
     * Configured delay before a reconnection attempt, in milliseconds.
     * NOTE(review): nothing in this class assigns this field, so the fallback
     * {@link #DEFAULT_RECONNECT_DELAY_MS} is what is effectively used -- confirm
     * whether it should be read from the configuration.
     */
    private int reconnectDelayMs;

    /** Timer holding the pending reconnection attempt, guarded by {@link #reconnectLock}. */
    private Timer reconnectTimer;

    /** Guards {@link #reconnectTimer} and the transitions of {@link #active}. */
    private final Object reconnectLock = new Object();

    /** {@code true} between a call to {@link #activate} and the matching {@link #deactivate}. */
    private volatile boolean active;

    /** Registered listeners (compared by identity) and their topic predicate. */
    private final Map<IMqttMessageListener, Predicate<String>> listeners = Collections
            .synchronizedMap(new IdentityHashMap<>());

    /**
     * Registers a message listener.
     *
     * @param iMqttMessageListener the listener service
     * @param map                  the service properties; the value stored under
     *                             {@link IMqttMessageListener#MQTT_TOPICS_FILTERS}
     *                             gives the topic filters of the listener
     */
    @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
    public void addListener(IMqttMessageListener iMqttMessageListener, Map<String, Object> map) {
        final String[] filters = getArrayProperty(map.get(IMqttMessageListener.MQTT_TOPICS_FILTERS));
        this.listeners.put(iMqttMessageListener, topic -> serviceMatchesTopic(topic, filters));
    }

    /**
     * Unregisters a message listener.
     *
     * @param iMqttMessageListener the listener service to forget
     */
    public void removeListener(IMqttMessageListener iMqttMessageListener) {
        this.listeners.remove(iMqttMessageListener);
    }

    /**
     * Validates the configuration, connects to the broker and subscribes to
     * the configured topics.
     *
     * @param mqttClientConfiguration the component configuration
     * @throws IllegalArgumentException if no host or no topic is configured
     * @throws Exception                if the connection or a subscription
     *                                  fails; the client is released before
     *                                  the exception is propagated
     */
    @Activate
    public void activate(MqttClientConfiguration mqttClientConfiguration) throws Exception {
        final String brokerUri = makeBrokerUri(mqttClientConfiguration);
        final String clientId = makeClientId(mqttClientConfiguration);
        final String[] topics = mqttClientConfiguration.topics();
        if (topics == null || topics.length == 0) {
            logger.error("No topic to subscribe to");
            throw new IllegalArgumentException("No MQTT topic given");
        }

        final String configuredId = mqttClientConfiguration.id();
        this.handlerId = (configuredId == null || configuredId.isBlank()) ? UUID.randomUUID().toString()
                : configuredId;

        final MqttConnectOptions options = setupOptions(mqttClientConfiguration);
        logger.debug("Connecting MQTT client with ID {}", clientId);

        final MqttClient newClient = new MqttClient(brokerUri, clientId);
        try {
            newClient.setCallback(this);
            newClient.setManualAcks(true);
            // The callbacks read this.client, so publish it before connecting
            this.client = newClient;
            this.active = true;
            newClient.connect(options);
            for (String topic : topics) {
                logger.debug("Subscribing MQTT client {} to topic: {}", clientId, topic);
                newClient.subscribe(topic);
            }
        } catch (Exception e) {
            // Do not leak the client (nor a pending reconnection) on failure
            synchronized (this.reconnectLock) {
                this.active = false;
                cancelReconnectTimer();
            }
            this.client = null;
            try {
                shutdownClient(newClient);
            } catch (MqttException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
        logger.info("MQTT client {} started", clientId);
    }

    /**
     * Cancels any pending reconnection, then disconnects and closes the client.
     *
     * @throws Exception if the client could not be disconnected or closed; the
     *                   component state is cleared in any case
     */
    @Deactivate
    public void deactivate() throws Exception {
        final MqttClient current;
        synchronized (this.reconnectLock) {
            // From now on, no new reconnection attempt can be scheduled
            this.active = false;
            cancelReconnectTimer();
            current = this.client;
        }

        try {
            if (current != null) {
                shutdownClient(current);
                logger.info("MQTT client {} stopped", current.getClientId());
            }
        } finally {
            this.client = null;
            this.handlerId = null;
        }
    }

    /**
     * Disconnects (if connected) then closes the given client. The close is
     * attempted even if the disconnection failed.
     *
     * @param mqttClient the client to release
     * @throws MqttException the first error met; a later one is attached to it
     *                       as a suppressed exception
     */
    private static void shutdownClient(MqttClient mqttClient) throws MqttException {
        MqttException failure = null;
        try {
            if (mqttClient.isConnected()) {
                mqttClient.disconnect();
            }
        } catch (MqttException e) {
            failure = e;
        }

        try {
            mqttClient.close();
        } catch (MqttException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Builds the MQTT connection options from the configuration.
     *
     * @param mqttClientConfiguration the component configuration
     * @return the connection options
     * @throws Exception if the SSL socket factory could not be set up
     */
    private MqttConnectOptions setupOptions(MqttClientConfiguration mqttClientConfiguration) throws Exception {
        final MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(mqttClientConfiguration.client_connection_timeout());

        final String user = mqttClientConfiguration.user();
        if (user != null && !user.isBlank()) {
            logger.debug("Connecting MQTT with authentication");
            options.setUserName(user);
        }

        final String password = mqttClientConfiguration._password();
        if (password != null) {
            options.setPassword(password.toCharArray());
        }

        if (mqttClientConfiguration.auth_keystore_path() != null
                || mqttClientConfiguration.auth_clientcert_path() != null) {
            options.setSocketFactory(SSLUtils.setupSSLSocketFactory(mqttClientConfiguration));
        }
        return options;
    }

    /**
     * Builds the broker URI ({@code protocol://host:port}).
     *
     * <p>
     * When a key store or a client certificate is configured, a missing or
     * {@code tcp} protocol is replaced by {@code ssl}; without them, a missing
     * protocol defaults to {@code tcp}.
     *
     * @param mqttClientConfiguration the component configuration
     * @return the broker URI string
     * @throws IllegalArgumentException if no host is configured
     * @throws URISyntaxException       if the parts do not form a valid URI
     */
    private String makeBrokerUri(MqttClientConfiguration mqttClientConfiguration) throws URISyntaxException {
        final String host = mqttClientConfiguration.host();
        if (host == null || host.isEmpty()) {
            logger.error("No MQTT host given");
            throw new IllegalArgumentException("No MQTT host given");
        }

        final boolean secured = mqttClientConfiguration.auth_keystore_path() != null
                || mqttClientConfiguration.auth_clientcert_path() != null;
        String protocol = mqttClientConfiguration.protocol();
        if (protocol == null || protocol.isBlank() || (secured && protocol.equalsIgnoreCase("tcp"))) {
            protocol = secured ? "ssl" : "tcp";
        }
        return new URI(protocol, null, host, mqttClientConfiguration.port(), null, null, null).toString();
    }

    /**
     * Returns the configured MQTT client ID, or a generated one if none is set.
     *
     * @param mqttClientConfiguration the component configuration
     * @return a non-empty client ID
     */
    private String makeClientId(MqttClientConfiguration mqttClientConfiguration) {
        final String configured = mqttClientConfiguration.client_id();
        if (configured == null || configured.isEmpty()) {
            return MqttClient.generateClientId();
        }
        return configured;
    }

    /**
     * Called when the connection to the broker is lost (and by the
     * reconnection task itself when an attempt fails): schedules a new
     * reconnection attempt.
     *
     * @param th the cause of the connection loss, may be {@code null}
     */
    @Override
    public void connectionLost(Throwable th) {
        logger.warn("Connection to MQTT broker lost: {}. Waiting before reconnecting.",
                th == null ? "unknown cause" : th.getMessage());
        scheduleReconnect();
    }

    /**
     * Replaces any pending reconnection attempt by a new one. Does nothing
     * once the component has been deactivated.
     */
    private void scheduleReconnect() {
        synchronized (this.reconnectLock) {
            cancelReconnectTimer();
            if (!this.active) {
                return;
            }

            final long delayMs = this.reconnectDelayMs > 0 ? this.reconnectDelayMs : DEFAULT_RECONNECT_DELAY_MS;
            // Daemon thread: a pending retry must not keep the JVM alive
            final Timer timer = new Timer("mqtt-reconnect", true);
            this.reconnectTimer = timer;
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    final MqttClient current = MqttClientHandler.this.client;
                    if (!MqttClientHandler.this.active || current == null) {
                        // Deactivated while the task was waiting
                        return;
                    }

                    try {
                        current.reconnect();
                    } catch (MqttException e) {
                        MqttClientHandler.logger.error("Error trying to reconnect to MQTT broker: {}",
                                e.getMessage(), e);
                        MqttClientHandler.this.connectionLost(e);
                    }
                }
            }, delayMs);
        }
    }

    /**
     * Cancels the pending reconnection attempt, if any. Must be called with
     * {@link #reconnectLock} held.
     */
    private void cancelReconnectTimer() {
        if (this.reconnectTimer != null) {
            this.reconnectTimer.cancel();
            this.reconnectTimer = null;
        }
    }

    /**
     * Acknowledges the received message then hands it to every listener whose
     * topic filters match. Messages flagged as duplicates are acknowledged and
     * dropped. An error thrown by a listener is logged and does not prevent
     * the notification of the other listeners.
     *
     * @param str         the topic the message was received on
     * @param mqttMessage the received message
     */
    @Override
    public void messageArrived(String str, MqttMessage mqttMessage) throws Exception {
        // Captured once: the field is cleared by deactivate()
        final MqttClient current = this.client;

        if (mqttMessage.isDuplicate()) {
            logger.warn("Ignoring duplicate MQTT message on topic=[{}]: {}", str, mqttMessage);
            if (current != null) {
                current.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
            }
            return;
        }

        final SensiNactMqttMessage message = new SensiNactMqttMessage(this.handlerId, str, mqttMessage);
        if (current != null) {
            current.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
        }

        // Work on a snapshot so that listeners are not called with the lock held
        final Map<IMqttMessageListener, Predicate<String>> snapshot;
        synchronized (this.listeners) {
            snapshot = new IdentityHashMap<>(this.listeners);
        }

        for (Map.Entry<IMqttMessageListener, Predicate<String>> entry : snapshot.entrySet()) {
            if (!entry.getValue().test(str)) {
                continue;
            }

            try {
                entry.getKey().onMqttMessage(this.handlerId, str, message);
            } catch (Throwable th) {
                logger.error("Error handling MQTT message. Client={}, topic={}, error={}", this.handlerId, str,
                        th.getMessage(), th);
            }
        }
    }

    /**
     * Checks if a topic matches at least one of the given topic filters.
     *
     * @param str    the topic of a received message
     * @param strArr the topic filters of a listener, may be {@code null}
     * @return {@code true} if a filter matches; {@code false} if none does or
     *         if there is no filter
     */
    private static boolean serviceMatchesTopic(String str, String[] strArr) {
        if (strArr == null) {
            return false;
        }

        for (String filter : strArr) {
            if (MqttTopic.isMatched(filter, str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Converts a service property value to an array of strings.
     *
     * @param obj the raw property value
     * @return the value itself if it is a string array, the parts of the
     *         value split on {@code ,} and {@code ;} if it is a string,
     *         {@code null} otherwise
     */
    private static String[] getArrayProperty(Object obj) {
        if (obj instanceof String[]) {
            return (String[]) obj;
        }
        if (obj instanceof String) {
            return ((String) obj).split("[,;]");
        }
        return null;
    }

    /**
     * Does nothing: this handler does not track message deliveries.
     *
     * @param iMqttDeliveryToken the token of the delivered message (ignored)
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }
}
