package org.gecko.adapter.mqtt.tests;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttPersistenceException;
import org.gecko.moquette.broker.MQTTBroker;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.annotations.RequireMQTTv3;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.cm.ConfigurationExtension;
import org.osgi.test.junit5.service.ServiceExtension;
import org.osgi.util.pushstream.PushStream;

@RequireMQTTv3
@Extensions({@ExtendWith({MockitoExtension.class}), @ExtendWith({ServiceExtension.class}), @ExtendWith({ConfigurationExtension.class})})
@WithFactoryConfiguration(factoryPid = "MQTTBroker", location = "?", name = "broker", properties = {@Property(key = "HOST", value = {"localhost"}), @Property(key = "PORT", value = {"2183"})})
/* loaded from: input_file:org/gecko/adapter/mqtt/tests/MqttComponentSubscribeTest.class */
public class MqttComponentSubscribeTest {
    private static final String BROKER_URL = "tcp://localhost:2183";
    private MqttClient checkClient;

    @AfterEach
    public void teardown() throws MqttException, IOException {
        if (this.checkClient != null) {
            if (this.checkClient.isConnected()) {
                this.checkClient.disconnect();
            }
            this.checkClient.close();
        }
    }

    @Test
    @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {@Property(key = "brokerUrl", value = {BROKER_URL})})
    public void testSubscribeMessage_NoMessage(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        Assertions.assertNotNull((MQTTBroker) serviceAware.waitForService(1000L));
        MessagingService messagingService = (MessagingService) serviceAware2.waitForService(1000L);
        Assertions.assertNotNull(messagingService);
        PushStream subscribe = messagingService.subscribe("test.SubscribeMessage_NoMessage");
        Assertions.assertNotNull(subscribe);
        subscribe.close();
    }

    @Test
    @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {@Property(key = "brokerUrl", value = {BROKER_URL})})
    public void testSubscribeOftenMessage_NoMessage(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        Assertions.assertNotNull((MQTTBroker) serviceAware.waitForService(1000L));
        MessagingService messagingService = (MessagingService) serviceAware2.waitForService(1000L);
        Assertions.assertNotNull(messagingService);
        PushStream subscribe = messagingService.subscribe("test.SubscribeOftenMessage_NoMessage");
        Assertions.assertNotNull(subscribe);
        subscribe.close();
        PushStream subscribe2 = messagingService.subscribe("test.SubscribeOftenMessage_NoMessage");
        Assertions.assertNotNull(subscribe2);
        subscribe2.close();
        Assertions.assertFalse(subscribe.equals(subscribe2));
    }

    @Test
    @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {@Property(key = "brokerUrl", value = {BROKER_URL})})
    public void testSubscribeMessage_Message(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        Assertions.assertNotNull((MQTTBroker) serviceAware.waitForService(1000L));
        MessagingService messagingService = (MessagingService) serviceAware2.waitForService(1000L);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test.SubscribeMessage_Message").forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch.countDown();
        });
        publish("test.SubscribeMessage_Message", "this is a test");
        countDownLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertEquals("this is a test", atomicReference.get());
    }

    @Test
    @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {@Property(key = "brokerUrl", value = {BROKER_URL})})
    public void testSubscribeMessage_Wildcard(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        Assertions.assertNotNull((MQTTBroker) serviceAware.waitForService(1000L));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        MessagingService messagingService = (MessagingService) serviceAware2.waitForService(1000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test.SubscribeMessage/#").forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch.countDown();
        });
        publish("test.SubscribeMessage/Wildcard", "this is a test");
        countDownLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertEquals("this is a test", atomicReference.get());
    }

    @Test
    @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {@Property(key = "brokerUrl", value = {BROKER_URL})})
    public void testSubscribeOftenMessage_Message(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2) throws Exception {
        Assertions.assertNotNull((MQTTBroker) serviceAware.waitForService(1000L));
        CountDownLatch countDownLatch = new CountDownLatch(2);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        MessagingService messagingService = (MessagingService) serviceAware2.waitForService(1000L);
        Assertions.assertNotNull(messagingService);
        PushStream subscribe = messagingService.subscribe("test.SubscribeOftenMessage_Message");
        PushStream subscribe2 = messagingService.subscribe("test.SubscribeOftenMessage_Message");
        subscribe.forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch.countDown();
        });
        subscribe2.forEach(message2 -> {
            atomicReference2.set(new String(message2.payload().array()));
            countDownLatch.countDown();
        });
        publish("test.SubscribeOftenMessage_Message", "this is a test");
        countDownLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertEquals("this is a test", atomicReference.get());
        Assertions.assertEquals("this is a test", atomicReference2.get());
        Assertions.assertEquals(atomicReference.get(), atomicReference2.get());
    }

    private void publish(String str, String str2) throws MqttPersistenceException, MqttException {
        Assertions.assertNotNull(str);
        Assertions.assertNotNull(str2);
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(str2.getBytes());
        this.checkClient = new MqttClient(BROKER_URL, "test");
        MqttConnectionOptionsBuilder mqttConnectionOptionsBuilder = new MqttConnectionOptionsBuilder();
        mqttConnectionOptionsBuilder.username("demo");
        mqttConnectionOptionsBuilder.password("1234".getBytes());
        this.checkClient.connect(mqttConnectionOptionsBuilder.build());
        this.checkClient.publish(str, mqttMessage);
    }
}
