/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.gateway.northbount.sensorthings.mqtt;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptSubscribeMessage;
import io.moquette.interception.messages.InterceptUnsubscribeMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.eclipse.sensinact.core.command.GatewayThread;
import org.eclipse.sensinact.core.notification.AbstractResourceNotification;
import org.eclipse.sensinact.gateway.northbount.sensorthings.mqtt.SensorthingsMapper;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.propertytypes.EventTopics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service={TypedEventHandler.class}, configurationPid={"sensiNact.northbound.sensorthings.mqtt"}, configurationPolicy=ConfigurationPolicy.REQUIRE)
@EventTopics(value={"DATA/*", "LIFECYCLE/*", "METADATA/*"})
public class SensorthingsMqttNorthbound
extends AbstractInterceptHandler
implements TypedEventHandler<AbstractResourceNotification> {
    private static final Logger LOG = LoggerFactory.getLogger(SensorthingsMqttNorthbound.class);
    @Reference
    private GatewayThread gatewayThread;
    private final ObjectMapper mapper = ((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().addModule((Module)new JavaTimeModule())).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)).build();
    private final Object lock = new Object();
    private final Map<String, Integer> subscriptionCounts = new HashMap<String, Integer>();
    private final Map<String, SensorthingsMapper<?>> subscriptions = new HashMap();
    private Server mqttBroker;

    @Activate
    void start(Config config) throws IOException {
        this.mqttBroker = new Server();
        Properties props = new Properties();
        props.setProperty("host", config.host());
        if (config.port() >= 0) {
            props.setProperty("port", String.valueOf(config.port()));
        }
        if (!config.keystore_file().isBlank()) {
            props.setProperty("ssl_port", String.valueOf(config.secure_port()));
            if (config.websocket_enable()) {
                props.setProperty("secure_websocket_port", String.valueOf(config.websocket_secure_port()));
            }
            props.setProperty("jks_path", config.keystore_file());
            props.setProperty("key_store_type", config.keystore_type());
            props.setProperty("key_store_password", config._keystore_password());
            props.setProperty("key_manager_password", config._keymanager_password());
        }
        if (config.websocket_enable() && config.websocket_port() >= 0) {
            props.setProperty("websocket_port", String.valueOf(config.websocket_port()));
        }
        MemoryConfig serverConfig = new MemoryConfig(props);
        this.mqttBroker.startServer(serverConfig, List.of(this));
    }

    @Deactivate
    void stop(Config config) {
        this.mqttBroker.stopServer();
        this.mqttBroker = null;
    }

    @Override
    public String getID() {
        return "Eclipse sensiNact Sensorthings subscription listener";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onSubscribe(InterceptSubscribeMessage msg) {
        String topicFilter = msg.getTopicFilter();
        if (topicFilter.indexOf(43) != -1 || topicFilter.indexOf(35) != -1) {
            LOG.warn("The topic filter {} contains wildcards which is not supported. It will be ignored");
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.subscriptionCounts.merge(topicFilter, 1, (a, b) -> a + b) == 1) {
                this.subscriptions.put(topicFilter, SensorthingsMapper.create(topicFilter, this.mapper, this.gatewayThread));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
        String topicFilter = msg.getTopicFilter();
        Object object = this.lock;
        synchronized (object) {
            if (this.subscriptionCounts.computeIfPresent(topicFilter, (k, v) -> v == 1 ? null : Integer.valueOf(v - 1)) == null) {
                this.subscriptions.remove(topicFilter);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notify(String topic, AbstractResourceNotification event) {
        List<SensorthingsMapper<?>> list;
        Object object = this.lock;
        synchronized (object) {
            list = List.copyOf(this.subscriptions.values());
        }
        list.forEach(s -> s.toPayload(event).onSuccess(l -> l.forEach(o -> this.notifyListeners(s.getTopicFilter(), o))));
    }

    private void notifyListeners(String topic, Object data) {
        try {
            ByteBuf payload = Unpooled.wrappedBuffer((byte[])this.mapper.writeValueAsBytes(data));
            MqttPublishMessage message = MqttMessageBuilders.publish().topicName(topic).qos(MqttQoS.AT_MOST_ONCE).retained(false).payload(payload).build();
            this.mqttBroker.internalPublish(message, "sensinact.sensorthings");
        }
        catch (JsonProcessingException e) {
            LOG.warn("An error occurred creating a notification for topic {}", (Object)topic, (Object)e);
        }
    }

    public static @interface Config {
        public String host() default "0.0.0.0";

        public int port() default 1883;

        public int secure_port() default 8883;

        public boolean websocket_enable() default true;

        public int websocket_port() default 8884;

        public int websocket_secure_port() default 8885;

        public String keystore_file() default "";

        public String keystore_type() default "jks";

        public String _keystore_password() default "";

        public String _keymanager_password() default "";
    }
}

