package org.eclipse.sensinact.gateway.southbound.mqtt.test;

import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessage;
import org.eclipse.sensinact.gateway.southbound.mqtt.api.IMqttMessageListener;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientConfiguration;
import org.eclipse.sensinact.gateway.southbound.mqtt.impl.MqttClientHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/eclipse/sensinact/gateway/southbound/mqtt/test/MqttTest.class */
/**
 * Integration tests for {@link MqttClientHandler}.
 *
 * <p>Each test starts an in-process broker ({@link Server}) on
 * {@value #HOST}:{@value #PORT}, connects a Paho publisher client to it and
 * activates two handlers: one configured with the explicit ID {@code "id1"}
 * subscribed to {@code sensinact/mqtt/test1/+}, and one configured with a
 * {@code null} ID subscribed to {@code sensinact/mqtt/test2/+}.
 */
public class MqttTest {
    /** Address the embedded broker binds to and the clients connect to. */
    private static final String HOST = "127.0.0.1";

    /** TCP port of the embedded broker. */
    private static final int PORT = 2183;

    /** URI used by the publishing Paho client. */
    private static final String BROKER_URI = "tcp://" + HOST + ":" + PORT;

    /** Listener registration property carrying the topic filters. */
    private static final String TOPICS_FILTERS = "sensinact.mqtt.topics.filters";

    /** Maximum time, in seconds, to wait for a message to reach a listener. */
    private static final long POLL_TIMEOUT_SECONDS = 1L;

    /** Capacity of the queues collecting the messages seen by a listener. */
    private static final int QUEUE_CAPACITY = 32;

    /** QoS level used for every published message. */
    private static final int QOS = 1;

    private MqttClient client;
    private final List<MqttClientHandler> handlers = new ArrayList<>();
    private Server server;

    /**
     * Starts the broker, connects the publisher and activates both handlers.
     *
     * @throws Exception if the broker, the client or a handler fails to start
     */
    @BeforeEach
    void start() throws Exception {
        this.server = new Server();
        MemoryConfig brokerConfig = new MemoryConfig(new Properties());
        brokerConfig.setProperty("host", HOST);
        brokerConfig.setProperty("port", Integer.toString(PORT));
        this.server.startServer(brokerConfig);

        this.client = new MqttClient(BROKER_URI, MqttClient.generateClientId());
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession(true);
        this.client.connect(connectOptions);

        this.handlers.add(setupHandler("id1", "sensinact/mqtt/test1/+"));
        this.handlers.add(setupHandler(null, "sensinact/mqtt/test2/+"));
    }

    /**
     * Releases the publisher, the handlers and the broker, in that order.
     *
     * <p>Every step runs even if a previous one throws, so that a failing
     * disconnect cannot leave handlers active or the broker bound to the port
     * for the next test.
     *
     * @throws Exception the failure raised by a clean-up step, if any
     */
    @AfterEach
    void stop() throws Exception {
        try {
            // Guard against a start() that failed before the client was created
            if (this.client != null) {
                this.client.disconnect();
                this.client.close();
            }
        } finally {
            try {
                deactivateHandlers();
            } finally {
                if (this.server != null) {
                    this.server.stopServer();
                }
            }
        }
    }

    /**
     * Deactivates every registered handler, even if some of them fail, then
     * rethrows the first failure with the later ones attached as suppressed.
     */
    private void deactivateHandlers() throws Exception {
        Exception firstFailure = null;
        for (MqttClientHandler handler : this.handlers) {
            try {
                handler.deactivate();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.handlers.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Creates and activates a handler connected to the embedded broker.
     *
     * @param handlerId value returned by the mocked configuration's {@code id()}, may be {@code null}
     * @param topics    value returned by the mocked configuration's {@code topics()}
     * @return the activated handler
     * @throws Exception if the activation fails
     */
    MqttClientHandler setupHandler(String handlerId, String... topics) throws Exception {
        MqttClientConfiguration configuration = Mockito.mock(MqttClientConfiguration.class);
        Mockito.when(configuration.id()).thenReturn(handlerId);
        Mockito.when(configuration.host()).thenReturn(HOST);
        Mockito.when(configuration.port()).thenReturn(PORT);
        Mockito.when(configuration.topics()).thenReturn(topics);

        MqttClientHandler handler = new MqttClientHandler();
        handler.activate(configuration);
        return handler;
    }

    /** Publishes a non-retained UTF-8 text payload on the given topic. */
    private void publish(String topic, String payload) throws Exception {
        this.client.publish(topic, payload.getBytes(StandardCharsets.UTF_8), QOS, false);
    }

    /** Waits for the next queued message; returns {@code null} on timeout. */
    private static IMqttMessage awaitMessage(ArrayBlockingQueue<IMqttMessage> queue) throws InterruptedException {
        return queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Builds the listener registration properties for the given topic filters. */
    private static Map<String, Object> filters(String... topicFilters) {
        return Map.of(TOPICS_FILTERS, topicFilters);
    }

    /** Checks that a message was received with the expected topic and text payload. */
    private static void assertMessage(String expectedTopic, String expectedPayload, IMqttMessage message) {
        Assertions.assertNotNull(message);
        Assertions.assertEquals(expectedTopic, message.getTopic());
        Assertions.assertEquals(expectedPayload, new String(message.getPayload(), StandardCharsets.UTF_8));
    }

    /**
     * Two listeners with different topic filters: each must only receive the
     * messages matching its own filters, with its own copy of the payload.
     */
    @Test
    void testMqttMultipleListeners() throws Exception {
        ArrayBlockingQueue<IMqttMessage> fooAndBarMessages = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        IMqttMessageListener fooAndBarListener = (handlerId, topic, message) -> fooAndBarMessages.add(message);

        ArrayBlockingQueue<IMqttMessage> fooOnlyMessages = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        IMqttMessageListener fooOnlyListener = (handlerId, topic, message) -> fooOnlyMessages.add(message);

        for (MqttClientHandler handler : this.handlers) {
            handler.addListener(fooAndBarListener, filters("sensinact/mqtt/test1/foo", "sensinact/mqtt/test1/bar"));
            handler.addListener(fooOnlyListener, filters("sensinact/mqtt/test1/foo"));
        }

        // "foo" matches both listeners
        publish("sensinact/mqtt/test1/foo", "Hello");
        IMqttMessage firstFoo = awaitMessage(fooAndBarMessages);
        assertMessage("sensinact/mqtt/test1/foo", "Hello", firstFoo);
        IMqttMessage secondFoo = awaitMessage(fooOnlyMessages);
        assertMessage("sensinact/mqtt/test1/foo", "Hello", secondFoo);
        Assertions.assertNotSame(firstFoo.getPayload(), secondFoo.getPayload(), "Payload is not a new one each time");

        // "bar" only matches the first listener
        publish("sensinact/mqtt/test1/bar", "Hello");
        assertMessage("sensinact/mqtt/test1/bar", "Hello", awaitMessage(fooAndBarMessages));
        Assertions.assertEquals(0, fooOnlyMessages.size(), "Listener got an unexpected message");
    }

    /**
     * One listener registered on both handlers: only the topics present in its
     * filters must be delivered, whichever handler is subscribed to them.
     */
    @Test
    void testMqttMultipleConfigurations() throws Exception {
        ArrayBlockingQueue<IMqttMessage> received = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        IMqttMessageListener listener = (handlerId, topic, message) -> received.add(message);

        for (MqttClientHandler handler : this.handlers) {
            handler.addListener(listener, filters("sensinact/mqtt/test1/foo", "sensinact/mqtt/test2/bar"));
        }

        publish("sensinact/mqtt/test1/foo", "Hello");
        assertMessage("sensinact/mqtt/test1/foo", "Hello", awaitMessage(received));

        // Topics the handlers subscribe to, but the listener does not filter on
        publish("sensinact/mqtt/test1/bar", "Hello");
        Assertions.assertNull(awaitMessage(received));
        publish("sensinact/mqtt/test2/foo", "Hello");
        Assertions.assertNull(awaitMessage(received));

        publish("sensinact/mqtt/test2/bar", "Hello");
        assertMessage("sensinact/mqtt/test2/bar", "Hello", awaitMessage(received));
    }

    /**
     * The handler ID given to the listener must match the one carried by the
     * message: the configured ID for the first handler, and a value parseable
     * as a UUID for the handler configured with a {@code null} ID.
     */
    @Test
    void testMqttHandlerId() throws Exception {
        ArrayBlockingQueue<IMqttMessage> received = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        IMqttMessageListener listener = (handlerId, topic, message) -> {
            // A failure here prevents the message from being queued, which
            // makes the assertions on the polled message fail in turn
            Assertions.assertEquals(handlerId, message.getHandlerId());
            received.add(message);
        };

        for (MqttClientHandler handler : this.handlers) {
            handler.addListener(listener, filters("sensinact/mqtt/test1/foo", "sensinact/mqtt/test2/bar"));
        }

        publish("sensinact/mqtt/test1/foo", "HandlerID");
        IMqttMessage fromNamedHandler = awaitMessage(received);
        Assertions.assertNotNull(fromNamedHandler);
        Assertions.assertEquals("id1", fromNamedHandler.getHandlerId());
        Assertions.assertEquals("sensinact/mqtt/test1/foo", fromNamedHandler.getTopic());
        Assertions.assertEquals("HandlerID", new String(fromNamedHandler.getPayload(), StandardCharsets.UTF_8));

        publish("sensinact/mqtt/test2/bar", "HandlerID");
        IMqttMessage fromUnnamedHandler = awaitMessage(received);
        Assertions.assertNotNull(fromUnnamedHandler);
        // UUID.fromString() throws if the generated ID is not a valid UUID
        Assertions.assertNotEquals(fromNamedHandler.getHandlerId(),
                UUID.fromString(fromUnnamedHandler.getHandlerId()).toString());
        Assertions.assertEquals("sensinact/mqtt/test2/bar", fromUnnamedHandler.getTopic());
        Assertions.assertEquals("HandlerID", new String(fromUnnamedHandler.getPayload(), StandardCharsets.UTF_8));
    }
}
