package org.gecko.adapter.amqp.tests;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.gecko.adapter.amqp.client.AMQPClient;
import org.gecko.adapter.amqp.client.AMQPContextBuilder;
import org.gecko.osgi.messaging.MessagingService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.service.cm.annotations.RequireConfigurationAdmin;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.cm.ConfigurationExtension;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStream;

@Extensions({@ExtendWith({MockitoExtension.class}), @ExtendWith({BundleContextExtension.class}), @ExtendWith({ServiceExtension.class}), @ExtendWith({ConfigurationExtension.class})})
@RequireConfigurationAdmin
/* loaded from: input_file:org/gecko/adapter/amqp/tests/AMQPComponentSubscribeTest.class */
public class AMQPComponentSubscribeTest {
    private String amqpHost = System.getProperty("amqp.host", "devel.data-in-motion.biz");
    private AMQPClient checkClient;

    /**
     * Subscription test in which the factory configuration references the credentials by
     * name ({@code username.env} / {@code password.env}) instead of embedding them in a
     * broker URL. The referenced names are published as system properties around each test.
     */
    @Nested
    /* loaded from: input_file:org/gecko/adapter/amqp/tests/AMQPComponentSubscribeTest$Env.class */
    class Env {

        /**
         * Sets the {@code AMQP_USER} and {@code AMQP_PWD} system properties that the
         * configuration of the test refers to.
         */
        @BeforeEach
        public void setup() throws Exception {
            System.setProperty("AMQP_USER", "demo");
            System.setProperty("AMQP_PWD", "1234");
        }

        /**
         * Removes the credential system properties again so they do not leak into other tests.
         */
        @AfterEach
        public void teardown() throws Exception {
            System.clearProperty("AMQP_USER");
            System.clearProperty("AMQP_PWD");
        }

        /**
         * Subscribes to {@code test_SubscribeMessageEnv}, sends a single message through the
         * enclosing test's check client and expects it to arrive within 15 seconds.
         *
         * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
         * @throws Exception if the service lookup, the subscription or the send fails
         */
        @Test
        @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "username.env", value = {"AMQP_USER"}), @Property(key = "password.env", value = {"AMQP_PWD"}), @Property(key = "host", value = {"devel.data-in-motion.biz"}), @Property(key = "port", value = {"5672"}), @Property(key = "virtualHost", value = {"test"})})
        public void testSubscribeMessageEnv(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
            CountDownLatch received = new CountDownLatch(1);
            AtomicReference<String> payload = new AtomicReference<>();
            MessagingService messagingService = serviceAware.waitForService(2000L);
            Assertions.assertNotNull(messagingService);
            messagingService.subscribe("test_SubscribeMessageEnv").forEach(message -> {
                payload.set(new String(message.payload().array()));
                received.countDown();
            });
            AMQPComponentSubscribeTest.this.checkClient.sendSingleWithQueue("test_SubscribeMessageEnv", "this is an AMQP test");
            // Fail with an explicit timeout message instead of a confusing null-vs-text mismatch.
            Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS), "no message received within 15 seconds");
            Assertions.assertEquals("this is an AMQP test", payload.get());
        }
    }

    /**
     * Creates a fresh {@link AMQPClient} before every test; the tests use it to send
     * messages to the broker and {@code teardown()} disconnects it again.
     */
    @BeforeEach
    public void setup() throws Exception {
        checkClient = new AMQPClient();
    }

    /**
     * Purges the test channels and disconnects the check client after every test.
     *
     * <p>The disconnect runs in a {@code finally} block so the connection is released even
     * when purging fails. Nothing is done when {@code setup()} did not get as far as
     * creating the client.
     *
     * @throws Exception if purging or disconnecting fails
     */
    @AfterEach
    public void teardown() throws Exception {
        if (this.checkClient == null) {
            // setup() failed before the client was created; there is nothing to clean up.
            return;
        }
        try {
            this.checkClient.purgeChannel("test_#");
        } finally {
            this.checkClient.disconnect();
        }
    }

    /**
     * Subscribes to {@code test_SubscribeMessage}, sends a single message to that queue via
     * the check client and expects the subscription to deliver it within 15 seconds.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, the subscription or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeMessage(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> payload = new AtomicReference<>();
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeMessage").forEach(message -> {
            payload.set(new String(message.payload().array()));
            received.countDown();
        });
        this.checkClient.sendSingleWithQueue("test_SubscribeMessage", "this is an AMQP test");
        // Assert the latch like the fanout/direct tests do, so a timeout is reported as such.
        Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS), "no message received within 15 seconds");
        Assertions.assertEquals("this is an AMQP test", payload.get());
    }

    /**
     * Verifies fanout delivery: a service subscription and an independent {@link AMQPClient}
     * consumer are both registered on the {@code test_SubscribeFanout} exchange, one message
     * is sent, and both are expected to receive it within 15 seconds.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, a registration or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeFanout(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        CountDownLatch received = new CountDownLatch(2);
        AtomicReference<String> servicePayload = new AtomicReference<>();
        AtomicReference<String> clientPayload = new AtomicReference<>();
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeFanout", new AMQPContextBuilder().fanout().exchange("test_SubscribeFanout", "").build()).forEach(message -> {
            servicePayload.set(new String(message.payload().array()));
            received.countDown();
        });
        AMQPClient fanoutConsumer = new AMQPClient(this.amqpHost);
        try {
            fanoutConsumer.registerConsumerFanout("test_SubscribeFanout", bytes -> {
                clientPayload.set(new String(bytes));
                received.countDown();
            });
            this.checkClient.sendSingleWithFanout("test_SubscribeFanout", "this is an AMQP test");
            Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS));
            Assertions.assertEquals("this is an AMQP test", servicePayload.get());
            Assertions.assertEquals("this is an AMQP test", clientPayload.get());
        } finally {
            // The extra consumer is not covered by teardown(), so release it here.
            // NOTE(review): assumes disconnect() is safe after a failed registration - confirm.
            fanoutConsumer.disconnect();
        }
    }

    /**
     * Verifies that two consumers registered with the same key ({@code blub}) on the direct
     * exchange {@code test_SubscribeDirectMulticast} both receive a message sent with that
     * key: one via the service subscription, one via an independent {@link AMQPClient}.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, a registration or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeDirectMulticast(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        CountDownLatch received = new CountDownLatch(2);
        AtomicReference<String> servicePayload = new AtomicReference<>();
        AtomicReference<String> clientPayload = new AtomicReference<>();
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeDirectMulticast", new AMQPContextBuilder().durable().direct().exchange("test_SubscribeDirectMulticast", "blub").build()).forEach(message -> {
            servicePayload.set(new String(message.payload().array()));
            received.countDown();
        });
        AMQPClient directConsumer = new AMQPClient(this.amqpHost);
        try {
            directConsumer.registerConsumerDirect("test_SubscribeDirectMulticast", "blub", bytes -> {
                clientPayload.set(new String(bytes));
                received.countDown();
            });
            this.checkClient.sendSingleWithExchangeDirect("test_SubscribeDirectMulticast", "blub", "this is an AMQP test");
            Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS));
            Assertions.assertEquals("this is an AMQP test", servicePayload.get());
            Assertions.assertEquals("this is an AMQP test", clientPayload.get());
        } finally {
            // The extra consumer is not covered by teardown(), so release it here.
            // NOTE(review): assumes disconnect() is safe after a failed registration - confirm.
            directConsumer.disconnect();
        }
    }

    /**
     * Verifies selective delivery on the direct exchange {@code test_SubscribeDirectSelective}:
     * the service subscription is built with key {@code bla}, the independent
     * {@link AMQPClient} consumer with key {@code blub}. A message sent with {@code blub}
     * must reach the client consumer only.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, a registration or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeDirectSelective(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        // Only one delivery is expected, so the latch counts a single event.
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> servicePayload = new AtomicReference<>();
        AtomicReference<String> clientPayload = new AtomicReference<>();
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeDirectSelective", new AMQPContextBuilder().durable().direct().exchange("test_SubscribeDirectSelective", "bla").build()).forEach(message -> {
            servicePayload.set(new String(message.payload().array()));
            received.countDown();
        });
        AMQPClient directConsumer = new AMQPClient(this.amqpHost);
        try {
            directConsumer.registerConsumerDirect("test_SubscribeDirectSelective", "blub", bytes -> {
                clientPayload.set(new String(bytes));
                received.countDown();
            });
            this.checkClient.sendSingleWithExchangeDirect("test_SubscribeDirectSelective", "blub", "this is an AMQP test");
            Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS));
            // The subscription with the non-matching key must not have seen the message.
            Assertions.assertNull(servicePayload.get());
            Assertions.assertEquals("this is an AMQP test", clientPayload.get());
        } finally {
            // The extra consumer is not covered by teardown(), so release it here.
            // NOTE(review): assumes disconnect() is safe after a failed registration - confirm.
            directConsumer.disconnect();
        }
    }

    /**
     * Same scenario as a plain single-message subscription, but the factory configuration
     * additionally sets {@code pushstream.ses.bufferSize} and
     * {@code pushstream.ses.queue.policy.name}. Subscribes to
     * {@code test_SubscribeMessageConfigureEventSource}, sends one message and expects it
     * within 15 seconds.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, the subscription or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"}), @Property(key = "pushstream.ses.bufferSize", value = {"100"}, scalar = Property.Scalar.Integer), @Property(key = "pushstream.ses.queue.policy.name", value = {"BLOCK"})})
    public void testSubscribeMessageConfigureEventSource(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> payload = new AtomicReference<>();
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeMessageConfigureEventSource").forEach(message -> {
            payload.set(new String(message.payload().array()));
            received.countDown();
        });
        this.checkClient.sendSingleWithQueue("test_SubscribeMessageConfigureEventSource", "this is an AMQP test");
        // Assert the latch like the fanout/direct tests do, so a timeout is reported as such.
        Assertions.assertTrue(received.await(15L, TimeUnit.SECONDS), "no message received within 15 seconds");
        Assertions.assertEquals("this is an AMQP test", payload.get());
    }

    /**
     * Subscribes to {@code test_SubscribeMessageMany}, sends ten numbered messages to that
     * queue and expects all ten within 25 seconds, with the last sent message stored at
     * index 9.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, the subscription or a send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"}), @Property(key = "jmx", value = {"true"})})
    public void testSubscribeMessageMany(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        CountDownLatch received = new CountDownLatch(10);
        List<String> payloads = new ArrayList<>(10);
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        messagingService.subscribe("test_SubscribeMessageMany").forEach(message -> {
            payloads.add(new String(message.payload().array()));
            received.countDown();
        });
        for (int i = 0; i < 10; i++) {
            this.checkClient.sendSingleWithQueue("test_SubscribeMessageMany", "this is an AMQP test" + i);
        }
        // Only inspect the list once all ten deliveries are confirmed by the latch.
        Assertions.assertTrue(received.await(25L, TimeUnit.SECONDS), "not all 10 messages received within 25 seconds");
        Assertions.assertEquals(10, payloads.size());
        Assertions.assertEquals("this is an AMQP test9", payloads.get(9));
    }

    /**
     * Creates two subscriptions on the same direct exchange, each built with parallelism 2
     * and a shared executor, sends ten messages, and expects every subscription to receive
     * all ten (twenty deliveries in total) within 10 seconds.
     *
     * <p>The executor created for the test is shut down when the test ends, whether it
     * passes or fails.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, a subscription or the send task fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeManyMessageMany(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        String topic = "test_SubscribeManyMessageManyr";
        String content = "this is an AMQP test";
        CountDownLatch received = new CountDownLatch(20);
        List<String> payloads = new ArrayList<>(20);
        MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);
        AMQPContextBuilder contextBuilder = new AMQPContextBuilder();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Each subscription tags its payloads ("1" / "2") and appends under the list's
            // monitor, because callbacks may run concurrently on the executor threads.
            messagingService.subscribe(topic, contextBuilder.direct().exchange(topic, "test").durable().withParallelism(2).withExecutor(executor).build()).forEach(message -> {
                String text = new String(message.payload().array());
                synchronized (payloads) {
                    payloads.add(text + "1");
                }
                received.countDown();
            });
            messagingService.subscribe(topic, contextBuilder.direct().exchange(topic, "test").durable().withParallelism(2).withExecutor(executor).build()).forEach(message -> {
                String text = new String(message.payload().array());
                synchronized (payloads) {
                    payloads.add(text + "2");
                }
                received.countDown();
            });
            // Send on an executor thread and block until the task reports all ten sends.
            Assertions.assertEquals(10, ((Integer) new PromiseFactory(executor).submit(() -> {
                for (int i = 0; i < 10; i++) {
                    this.checkClient.sendSingleWithExchangeDirect(topic, "test", content + i);
                }
                return 10;
            }).getValue()).intValue());
            Assertions.assertTrue(received.await(10L, TimeUnit.SECONDS), "not all 20 deliveries arrived within 10 seconds");
            synchronized (payloads) {
                Assertions.assertEquals(20, payloads.size());
            }
        } finally {
            // Release the pool threads; the original never shut the executor down.
            // NOTE(review): the subscriptions still reference this executor - confirm that
            // rejecting late deliveries after the test has finished is acceptable.
            executor.shutdown();
        }
    }

    /**
     * Negative test: the subscription listens on {@code test_SubscribeMessageWrongQueue}
     * while the message is sent to a different queue (the same name with {@code test}
     * appended). No delivery may occur within the 5 second wait.
     *
     * @param serviceAware accessor for the {@link MessagingService} created by the factory configuration
     * @throws Exception if the service lookup, the subscription or the send fails
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "AMQPService", location = "?", name = "ps", properties = {@Property(key = "brokerUrl", value = {"amqp://demo:1234@devel.data-in-motion.biz:5672/test"})})
    public void testSubscribeMessageWrongQueue(@InjectService(cardinality = 0) ServiceAware<MessagingService> serviceAware) throws Exception {
        final String subscribedQueue = "test_SubscribeMessageWrongQueue";
        final CountDownLatch delivered = new CountDownLatch(1);
        final AtomicReference<String> payload = new AtomicReference<>();

        final MessagingService messagingService = serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingService);

        messagingService.subscribe(subscribedQueue).forEach(message -> {
            payload.set(new String(message.payload().array()));
            delivered.countDown();
        });

        // Deliberately address a queue nobody is subscribed to.
        checkClient.sendSingleWithQueue(subscribedQueue + "test", "this is an AMQP test");

        final boolean deliveredInTime = delivered.await(5L, TimeUnit.SECONDS);
        Assertions.assertFalse(deliveredInTime);
        Assertions.assertNull(payload.get());
    }
}
