package org.gecko.adapter.mqtt.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gecko.moquette.broker.MQTTBroker;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.annotations.RequireMQTTv3;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.service.cm.annotations.RequireConfigurationAdmin;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.test.common.annotation.config.WithFactoryConfigurations;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.cm.ConfigurationExtension;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;

/**
 * Integration test that stops and restarts an MQTT broker while a background
 * publisher keeps sending messages, and checks that the publisher's observed
 * state follows the broker: publishing, then failing, then publishing again.
 */
@RequireMQTTv3
@Extensions({
    @ExtendWith({MockitoExtension.class}),
    @ExtendWith({ServiceExtension.class}),
    @ExtendWith({ConfigurationExtension.class}),
    @ExtendWith({BundleContextExtension.class})
})
@RequireConfigurationAdmin
public class MqttComponentReconnectTest {
    private static final String TOPIC = "publish.junit." + UUID.randomUUID();
    private static final String BROKER_URL = "tcp://localhost:2183";
    private static final String PAYLOAD = "42";

    /**
     * Counted down by {@link #handle(Message)} for every received message.
     * Volatile because the test thread replaces the latch while the
     * subscription callback reads it.
     */
    private volatile CountDownLatch messageLatch;

    /**
     * Publishes {@link #PAYLOAD} to {@link #TOPIC} roughly every 10 ms and
     * records transitions between successful and failing publishes.
     */
    private static final class TestPublisher implements Runnable {
        private final MessagingService messagingService;
        /** Counted down on the first successful publish that follows a failure. */
        private final CountDownLatch successLatch = new CountDownLatch(1);
        /** Counted down on the first failing publish that follows a success. */
        private final CountDownLatch errorLatch = new CountDownLatch(1);
        /** Written by the publisher thread, read by the test thread. */
        private volatile boolean isPublishing;
        /** Cleared by {@link #stop()} to end the publish loop. */
        private volatile boolean running = true;

        private TestPublisher(MessagingService messagingService) {
            this.messagingService = messagingService;
        }

        /**
         * Publishes until {@link #stop()} is called or the thread is interrupted.
         * The state starts out as "publishing", so {@code successLatch} is only
         * released by a recovery after an error, never by the very first publish.
         */
        @Override
        public void run() {
            this.isPublishing = true;
            while (this.running && !Thread.currentThread().isInterrupted()) {
                try {
                    this.messagingService.publish(
                            MqttComponentReconnectTest.TOPIC,
                            ByteBuffer.wrap(PAYLOAD.getBytes(StandardCharsets.UTF_8)));
                    if (!this.isPublishing) {
                        this.isPublishing = true;
                        this.successLatch.countDown();
                    }
                } catch (Exception e) {
                    // Expected while the broker is down: record the transition
                    // once and keep retrying.
                    if (this.isPublishing) {
                        this.isPublishing = false;
                        this.errorLatch.countDown();
                    }
                }
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // shutdownNow() interrupts the worker: restore the flag and
                    // leave the loop so the thread can terminate.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        /** Asks the publish loop to finish after its current iteration. */
        private void stop() {
            this.running = false;
        }

        /**
         * Waits up to {@code i} seconds for a success-to-failure transition.
         *
         * @param i timeout in seconds
         * @throws InterruptedException if the waiting thread is interrupted
         */
        private void waitError(int i) throws InterruptedException {
            this.errorLatch.await(i, TimeUnit.SECONDS);
        }

        /**
         * Waits up to {@code i} seconds for a failure-to-success transition.
         *
         * @param i timeout in seconds
         * @throws InterruptedException if the waiting thread is interrupted
         */
        private void waitSuccess(int i) throws InterruptedException {
            this.successLatch.await(i, TimeUnit.SECONDS);
        }
    }

    /**
     * Starts a subscriber and a background publisher, stops the broker, checks
     * that publishing fails, restarts the broker and checks that publishing
     * resumes.
     *
     * @param serviceAware  tracker for the configured broker
     * @param serviceAware2 tracker used to obtain the subscribing messaging service
     * @param serviceAware3 tracker used to obtain the publishing messaging service
     * @throws Exception if a service lookup, the subscription or a wait fails
     */
    @WithFactoryConfigurations({
        @WithFactoryConfiguration(factoryPid = "MQTTBroker", location = "?", name = "broker", properties = {
            @Property(key = "HOST", value = {"localhost"}),
            @Property(key = "PORT", value = {"2183"})
        }),
        @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "read", properties = {
            @Property(key = "brokerUrl", value = {BROKER_URL})
        }),
        @WithFactoryConfiguration(factoryPid = "MQTTService", location = "?", name = "write", properties = {
            @Property(key = "brokerUrl", value = {BROKER_URL})
        })
    })
    @Test
    public void testReconnect(@InjectService(cardinality = 0) ServiceAware<MQTTBroker> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware2, @InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware3) throws Exception {
        this.messageLatch = new CountDownLatch(1);

        MQTTBroker broker = serviceAware.waitForService(1000L);
        Assertions.assertNotNull(broker);

        MessagingService subscriber = serviceAware2.waitForService(1000L);
        Assertions.assertNotNull(subscriber);
        subscriber.subscribe(TOPIC).forEach(this::handle);

        MessagingService publisherService = serviceAware3.waitForService(1000L);
        Assertions.assertNotNull(publisherService);

        TestPublisher publisher = new TestPublisher(publisherService);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(publisher);

            // Phase 1: broker is up.
            // NOTE(review): the results of the message-latch awaits are ignored,
            // so message delivery itself is never asserted - confirm whether
            // that is intentional.
            this.messageLatch.await(1L, TimeUnit.SECONDS);
            // No failure has happened yet, so the success latch cannot have been
            // released; unless a publish already failed and recovered, this
            // await just times out and gives the publisher time to run.
            publisher.waitSuccess(1);
            Assertions.assertTrue(publisher.isPublishing);

            // Phase 2: broker down, publishing must start failing.
            broker.stop();
            publisher.waitError(3);
            Assertions.assertFalse(publisher.isPublishing);

            // Phase 3: broker back up, publishing must recover.
            broker.start();
            publisher.waitSuccess(3);
            this.messageLatch = new CountDownLatch(1);
            this.messageLatch.await(3L, TimeUnit.SECONDS);
            Assertions.assertTrue(publisher.isPublishing);
        } finally {
            // Stop the loop explicitly as well as by interrupt, so the worker
            // thread ends even if the interrupt is consumed inside publish().
            publisher.stop();
            executor.shutdownNow();
        }
    }

    /**
     * Subscription callback: checks the payload and signals the current latch.
     * NOTE(review): presumably invoked on a messaging thread, in which case a
     * failed assertion here would not fail the test method itself - confirm.
     *
     * @param message the received message
     */
    private void handle(Message message) {
        Assertions.assertEquals(PAYLOAD, new String(message.payload().array(), StandardCharsets.UTF_8));
        this.messageLatch.countDown();
    }
}
