/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.gateway.southbound.mqtt.impl;

import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.function.Predicate;
import javax.net.SocketFactory;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessageListener;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientConfiguration;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.SSLUtils;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.SensiNactMqttMessage;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service={}, configurationPid={"sensinact.southbound.mqtt"}, configurationPolicy=ConfigurationPolicy.REQUIRE)
public class MqttClientHandler
implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttClientHandler.class);
    private String handlerId;
    private MqttClient client;
    private int reconnectDelayMs;
    private Timer reconnectTimer;
    private Map<IMqttMessageListener, Predicate<String>> listeners = Collections.synchronizedMap(new IdentityHashMap());
    private String[] topics;
    private String clientId;
    private MqttConnectOptions connectOptions;

    @Reference(cardinality=ReferenceCardinality.MULTIPLE, policy=ReferencePolicy.DYNAMIC)
    public void addListener(IMqttMessageListener listener, Map<String, Object> svcProps) {
        String[] filters = this.getArrayProperty(svcProps.get("sensinact.mqtt.topics.filters"));
        this.listeners.put(listener, str -> this.serviceMatchesTopic((String)str, filters));
    }

    public void removeListener(IMqttMessageListener listener) {
        this.listeners.remove(listener);
    }

    @Activate
    public void activate(MqttClientConfiguration config) throws Exception {
        this.reconnectDelayMs = config.client_reconnect_delay();
        if (this.reconnectDelayMs < 100) {
            this.reconnectDelayMs = 100;
        } else if ((long)this.reconnectDelayMs > Duration.ofHours(1L).toMillis()) {
            this.reconnectDelayMs = (int)Duration.ofHours(1L).toMillis();
        }
        String broker = this.makeBrokerUri(config);
        this.clientId = this.makeClientId(config);
        this.topics = config.topics();
        if (this.topics == null || this.topics.length == 0) {
            logger.error("No topic to subscribe to");
            throw new IllegalArgumentException("No MQTT topic given");
        }
        String configId = config.id();
        this.handlerId = configId == null || configId.isBlank() ? UUID.randomUUID().toString() : configId;
        this.connectOptions = this.setupOptions(config);
        logger.debug("Connecting MQTT client with ID {}", (Object)this.clientId);
        this.client = new MqttClient(broker, this.clientId);
        this.client.setCallback((MqttCallback)this);
        this.client.setManualAcks(true);
        try {
            this.client.connect(this.connectOptions);
        }
        catch (MqttException e) {
            if (e.getCause() instanceof ConnectException) {
                this.connectionLost(e);
                logger.warn("MQTT client {} started, but currently unconnected", (Object)this.clientId);
                return;
            }
            throw e;
        }
        this.subscribe();
        logger.info("MQTT client {} started", (Object)this.clientId);
    }

    private void subscribe() {
        for (String topic : this.topics) {
            logger.debug("Subscribing MQTT client {} to topic: {}", (Object)this.clientId, (Object)topic);
            try {
                this.client.subscribe(topic);
            }
            catch (MqttException e) {
                logger.error("MQTT Client {} is unable to subscribe to topic {}", (Object)this.clientId, (Object)topic);
            }
        }
    }

    @Deactivate
    public void deactivate() throws Exception {
        if (this.client != null) {
            if (this.client.isConnected()) {
                this.client.disconnect();
            }
            this.client.close();
            logger.info("MQTT client {} stopped", (Object)this.client.getClientId());
            this.client = null;
        }
        this.handlerId = null;
    }

    private MqttConnectOptions setupOptions(MqttClientConfiguration config) throws Exception {
        String userPass;
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setConnectionTimeout(config.client_connection_timeout());
        String userName = config.user();
        if (userName != null && !userName.isBlank()) {
            logger.debug("Connecting MQTT with authentication");
            options.setUserName(userName);
        }
        if ((userPass = config._password()) != null) {
            options.setPassword(userPass.toCharArray());
        }
        if (config.auth_keystore_path() != null || config.auth_clientcert_path() != null) {
            options.setSocketFactory((SocketFactory)SSLUtils.setupSSLSocketFactory(config));
        }
        return options;
    }

    private String makeBrokerUri(MqttClientConfiguration config) throws URISyntaxException {
        String mqttHost = config.host();
        if (mqttHost == null || mqttHost.isEmpty()) {
            logger.error("No MQTT host given");
            throw new IllegalArgumentException("No MQTT host given");
        }
        boolean clientAuth = config.auth_keystore_path() != null || config.auth_clientcert_path() != null;
        String protocol = config.protocol();
        if (protocol == null || protocol.isBlank() || clientAuth && protocol.equalsIgnoreCase("tcp")) {
            protocol = clientAuth ? "ssl" : "tcp";
        }
        return new URI(protocol, null, mqttHost, config.port(), null, null, null).toString();
    }

    private String makeClientId(MqttClientConfiguration config) {
        String givenId = config.client_id();
        if (givenId == null || givenId.isEmpty()) {
            return MqttClient.generateClientId();
        }
        return givenId;
    }

    public void connectionLost(Throwable cause) {
        logger.warn("Connection to MQTT broker lost: {}. Waiting before reconnecting.", (Object)cause.getMessage());
        if (this.reconnectTimer != null) {
            this.reconnectTimer.cancel();
            this.reconnectTimer = null;
        }
        this.reconnectTimer = new Timer();
        this.reconnectTimer.schedule(new TimerTask(){

            @Override
            public void run() {
                try {
                    MqttClientHandler.this.client.connect(MqttClientHandler.this.connectOptions);
                }
                catch (MqttException e) {
                    if (e.getCause() instanceof ConnectException) {
                        logger.error("Error trying to reconnect to MQTT broker: {}", (Object)e.getMessage(), (Object)e);
                        MqttClientHandler.this.connectionLost(e);
                    } else {
                        logger.error("Fatal error trying to reconnect to MQTT broker: {}. No further reconnection will be attempted", (Object)e.getMessage(), (Object)e);
                    }
                    return;
                }
                MqttClientHandler.this.subscribe();
            }
        }, this.reconnectDelayMs);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        IdentityHashMap<IMqttMessageListener, Predicate<String>> workListeners;
        if (message.isDuplicate()) {
            logger.warn("Ignoring duplicate MQTT message on topic=[{}]: {}", (Object)topic, (Object)message);
            this.client.messageArrivedComplete(message.getId(), message.getQos());
            return;
        }
        SensiNactMqttMessage snMessage = new SensiNactMqttMessage(this.handlerId, topic, message);
        this.client.messageArrivedComplete(message.getId(), message.getQos());
        Map<IMqttMessageListener, Predicate<String>> map = this.listeners;
        synchronized (map) {
            workListeners = new IdentityHashMap<IMqttMessageListener, Predicate<String>>(this.listeners);
        }
        for (Map.Entry entry : workListeners.entrySet()) {
            if (!((Predicate)entry.getValue()).test(topic)) continue;
            try {
                ((IMqttMessageListener)entry.getKey()).onMqttMessage(this.handlerId, topic, snMessage);
            }
            catch (Throwable t) {
                logger.error("Error handling MQTT message. Client={}, topic={}, error={}", new Object[]{this.handlerId, topic, t.getMessage(), t});
            }
        }
    }

    private boolean serviceMatchesTopic(String topic, String[] filters) {
        if (filters == null || filters.length == 0) {
            return false;
        }
        for (String filter : filters) {
            if (!MqttTopic.isMatched((String)filter, (String)topic)) continue;
            return true;
        }
        return false;
    }

    private String[] getArrayProperty(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String[]) {
            return (String[])value;
        }
        if (value instanceof String) {
            return ((String)value).split("[,;]");
        }
        return null;
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
    }
}

