/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.gateway.southbound.mqtt.test;

import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessage;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessageListener;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientConfiguration;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Exercises {@link MqttClientHandler} with the different possible orderings of
 * broker start-up, handler activation and listener registration.
 *
 * <p>Each test ends by publishing a message from a local Paho client and
 * asserting that it reaches a listener registered on the handler.
 */
public class MqttDelayedStartTest {
    /** Address on which the embedded broker listens and to which clients connect. */
    private static final String HOST = "127.0.0.1";

    /** Port shared by the embedded broker, the local client and the handler configuration. */
    private static final int PORT = 2183;

    /** Topic pattern given to the handler through its configuration. */
    private static final String SUBSCRIBED_TOPICS = "sensinact/mqtt/test1/+";

    /** Topic on which the test message is published. */
    private static final String TOPIC_FOO = "sensinact/mqtt/test1/foo";

    /** Second topic accepted by the listener filter; nothing is published on it. */
    private static final String TOPIC_BAR = "sensinact/mqtt/test1/bar";

    /** Listener registration property holding the topic filters. */
    private static final String FILTERS_PROPERTY = "sensinact.mqtt.topics.filters";

    /** Capacity of the queue collecting the messages seen by the listener. */
    private static final int QUEUE_CAPACITY = 32;

    /** Maximum number of publish attempts before the test gives up. */
    private static final int MAX_PUBLISH_ATTEMPTS = 5;

    private MqttClient client;
    private MqttClientHandler handler;
    private Server server;

    /** Creates a fresh, not yet activated handler for each test. */
    @BeforeEach
    void setupHandlers() throws Exception {
        this.handler = new MqttClientHandler();
    }

    /**
     * Builds a mocked handler configuration pointing at the embedded broker.
     *
     * @return a Mockito mock stubbed with the id, host, port, topics and
     *         reconnect delay used by these tests
     */
    private MqttClientConfiguration getConfig() {
        MqttClientConfiguration mock = Mockito.mock(MqttClientConfiguration.class);
        Mockito.when(mock.id()).thenReturn("id1");
        Mockito.when(mock.host()).thenReturn(HOST);
        Mockito.when(mock.port()).thenReturn(PORT);
        Mockito.when(mock.topics()).thenReturn(new String[] { SUBSCRIBED_TOPICS });
        // NOTE(review): presumably milliseconds; confirm against MqttClientConfiguration
        Mockito.when(mock.client_reconnect_delay()).thenReturn(1000);
        return mock;
    }

    /**
     * Starts the embedded Moquette broker, then connects the local Paho client
     * used to publish test messages.
     *
     * @throws Exception if the broker cannot start or the client cannot connect
     */
    void startServerAndLocalClient() throws Exception {
        this.server = new Server();
        MemoryConfig config = new MemoryConfig(new Properties());
        config.setProperty("host", HOST);
        config.setProperty("port", Integer.toString(PORT));
        this.server.startServer(config);

        this.client = new MqttClient("tcp://" + HOST + ":" + PORT, MqttClient.generateClientId());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        this.client.connect(options);
    }

    /**
     * Releases the local client, the handler and the broker, in that order.
     *
     * <p>Every step runs even if an earlier one fails, so that a failing
     * {@code disconnect()} cannot leave the handler active or the broker
     * running while later tests reuse the same port. The first failure is
     * rethrown once all steps have run; later failures are attached to it as
     * suppressed exceptions.
     *
     * @throws Exception the first failure raised by any clean-up step
     */
    @AfterEach
    void stop() throws Exception {
        // Detach the fields first so a failing teardown never leaves stale references
        MqttClient c = this.client;
        this.client = null;
        MqttClientHandler h = this.handler;
        this.handler = null;
        Server s = this.server;
        this.server = null;

        Exception failure = null;
        if (c != null) {
            try {
                c.disconnect();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
            try {
                c.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        if (h != null) {
            try {
                h.deactivate();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        if (s != null) {
            try {
                s.stopServer();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Combines teardown failures: keeps the first one as primary and records
     * any later one as suppressed.
     *
     * @param first the failure recorded so far, or {@code null} if none
     * @param next  the failure that just occurred
     * @return the exception to keep as the primary failure
     */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        first.addSuppressed(next);
        return first;
    }

    /**
     * Registers a listener on the handler, filtered on {@link #TOPIC_FOO} and
     * {@link #TOPIC_BAR}, which stores every message it is given in a queue.
     *
     * @return the queue filled by the registered listener
     */
    private BlockingQueue<IMqttMessage> registerQueueingListener() {
        BlockingQueue<IMqttMessage> received = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        IMqttMessageListener listener = (source, topic, message) -> received.add(message);
        this.handler.addListener(listener,
                Map.of(FILTERS_PROPERTY, new String[] { TOPIC_FOO, TOPIC_BAR }));
        return received;
    }

    /**
     * Publishes a message with the local client until the listener queue
     * yields one, then checks its topic and payload.
     *
     * <p>The publication is repeated (up to {@value #MAX_PUBLISH_ATTEMPTS}
     * times, waiting one second after each) because the message is sent with
     * {@code retained = false}: presumably an attempt made before the handler
     * is connected and subscribed is simply not delivered to it.
     *
     * @param receiveQueue queue filled by the listener registered on the handler
     * @throws MqttException        if the local client fails to publish
     * @throws InterruptedException if interrupted while waiting for a message
     */
    private void doTest(BlockingQueue<IMqttMessage> receiveQueue) throws MqttException, MqttPersistenceException, InterruptedException {
        final String content = "Hello";
        final byte[] payload = content.getBytes(StandardCharsets.UTF_8);

        IMqttMessage msg = null;
        int attempts = 0;
        while (msg == null && attempts < MAX_PUBLISH_ATTEMPTS) {
            this.client.publish(TOPIC_FOO, payload, 1, false);
            msg = receiveQueue.poll(1L, TimeUnit.SECONDS);
            attempts++;
        }

        Assertions.assertNotNull(msg, "No message received after " + attempts + " publish attempts");
        Assertions.assertEquals(TOPIC_FOO, msg.getTopic());
        Assertions.assertEquals(content, new String(msg.getPayload(), StandardCharsets.UTF_8));
    }

    /** Order: handler activated, listener registered, then broker started. */
    @Test
    void testMqttDelayedServerStart() throws Exception {
        this.handler.activate(this.getConfig());
        BlockingQueue<IMqttMessage> messages = this.registerQueueingListener();
        this.startServerAndLocalClient();
        this.doTest(messages);
    }

    /** Order: listener registered, handler activated, then broker started. */
    @Test
    void testMqttEarlyHandlerRegDelayedServerStart() throws Exception {
        BlockingQueue<IMqttMessage> messages = this.registerQueueingListener();
        this.handler.activate(this.getConfig());
        this.startServerAndLocalClient();
        this.doTest(messages);
    }

    /** Order: broker started, listener registered, then handler activated. */
    @Test
    void testMqttEarlyHandlerRegEarlyServerStart() throws Exception {
        this.startServerAndLocalClient();
        BlockingQueue<IMqttMessage> messages = this.registerQueueingListener();
        this.handler.activate(this.getConfig());
        this.doTest(messages);
    }

    /** Order: broker started, handler activated, then listener registered. */
    @Test
    void testMqttEarlyServerStart() throws Exception {
        this.startServerAndLocalClient();
        this.handler.activate(this.getConfig());
        BlockingQueue<IMqttMessage> messages = this.registerQueueingListener();
        this.doTest(messages);
    }
}

