package org.eclipse.sensinact.gateway.southbound.mqtt.test;

import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessage;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientConfiguration;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/eclipse/sensinact/gateway/southbound/mqtt/test/MqttDelayedStartTest.class */
public class MqttDelayedStartTest {
    private MqttClient client;
    private MqttClientHandler handler;
    private Server server;

    @BeforeEach
    void setupHandlers() throws Exception {
        this.handler = new MqttClientHandler();
    }

    private MqttClientConfiguration getConfig() {
        MqttClientConfiguration mqttClientConfiguration = (MqttClientConfiguration) Mockito.mock(MqttClientConfiguration.class);
        Mockito.when(mqttClientConfiguration.id()).thenReturn("id1");
        Mockito.when(mqttClientConfiguration.host()).thenReturn("127.0.0.1");
        Mockito.when(Integer.valueOf(mqttClientConfiguration.port())).thenReturn(2183);
        Mockito.when(mqttClientConfiguration.topics()).thenReturn(new String[]{"sensinact/mqtt/test1/+"});
        Mockito.when(Integer.valueOf(mqttClientConfiguration.client_reconnect_delay())).thenReturn(1000);
        return mqttClientConfiguration;
    }

    void startServerAndLocalClient() throws Exception {
        this.server = new Server();
        MemoryConfig memoryConfig = new MemoryConfig(new Properties());
        memoryConfig.setProperty("host", "127.0.0.1");
        memoryConfig.setProperty("port", "2183");
        this.server.startServer(memoryConfig);
        this.client = new MqttClient("tcp://127.0.0.1:2183", MqttClient.generateClientId());
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        this.client.connect(mqttConnectOptions);
    }

    @AfterEach
    void stop() throws Exception {
        try {
            MqttClient mqttClient = this.client;
            this.client = null;
            if (mqttClient != null) {
                mqttClient.disconnect();
                mqttClient.close();
            }
            MqttClientHandler mqttClientHandler = this.handler;
            this.handler = null;
            if (mqttClientHandler != null) {
                mqttClientHandler.deactivate();
            }
        } finally {
            Server server = this.server;
            this.server = null;
            if (server != null) {
                server.stopServer();
            }
        }
    }

    private void doTest(BlockingQueue<IMqttMessage> blockingQueue) throws MqttException, MqttPersistenceException, InterruptedException {
        IMqttMessage iMqttMessage = null;
        for (int i = 0; i < 5 && iMqttMessage == null; i++) {
            this.client.publish("sensinact/mqtt/test1/foo", "Hello".getBytes(StandardCharsets.UTF_8), 1, false);
            iMqttMessage = blockingQueue.poll(1L, TimeUnit.SECONDS);
        }
        Assertions.assertNotNull(iMqttMessage);
        Assertions.assertEquals("sensinact/mqtt/test1/foo", iMqttMessage.getTopic());
        Assertions.assertEquals("Hello", new String(iMqttMessage.getPayload(), StandardCharsets.UTF_8));
    }

    @Test
    void testMqttDelayedServerStart() throws Exception {
        this.handler.activate(getConfig());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(32);
        this.handler.addListener((str, str2, iMqttMessage) -> {
            arrayBlockingQueue.add(iMqttMessage);
        }, Map.of("sensinact.mqtt.topics.filters", new String[]{"sensinact/mqtt/test1/foo", "sensinact/mqtt/test1/bar"}));
        startServerAndLocalClient();
        doTest(arrayBlockingQueue);
    }

    @Test
    void testMqttEarlyHandlerRegDelayedServerStart() throws Exception {
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(32);
        this.handler.addListener((str, str2, iMqttMessage) -> {
            arrayBlockingQueue.add(iMqttMessage);
        }, Map.of("sensinact.mqtt.topics.filters", new String[]{"sensinact/mqtt/test1/foo", "sensinact/mqtt/test1/bar"}));
        this.handler.activate(getConfig());
        startServerAndLocalClient();
        doTest(arrayBlockingQueue);
    }

    @Test
    void testMqttEarlyHandlerRegEarlyServerStart() throws Exception {
        startServerAndLocalClient();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(32);
        this.handler.addListener((str, str2, iMqttMessage) -> {
            arrayBlockingQueue.add(iMqttMessage);
        }, Map.of("sensinact.mqtt.topics.filters", new String[]{"sensinact/mqtt/test1/foo", "sensinact/mqtt/test1/bar"}));
        this.handler.activate(getConfig());
        doTest(arrayBlockingQueue);
    }

    @Test
    void testMqttEarlyServerStart() throws Exception {
        startServerAndLocalClient();
        this.handler.activate(getConfig());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(32);
        this.handler.addListener((str, str2, iMqttMessage) -> {
            arrayBlockingQueue.add(iMqttMessage);
        }, Map.of("sensinact.mqtt.topics.filters", new String[]{"sensinact/mqtt/test1/foo", "sensinact/mqtt/test1/bar"}));
        doTest(arrayBlockingQueue);
    }
}
