package org.gecko.adapter.amqp.tests;

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.gecko.adapter.amqp.client.AMQPClient;
import org.gecko.adapter.amqp.client.AMQPContextBuilder;
import org.gecko.osgi.messaging.MessagingService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/gecko/adapter/amqp/tests/AMQPComponentSubscribeTest.class */
public class AMQPComponentSubscribeTest {
    private AMQPClient checkClient;
    private String amqpHost = System.getProperty("amqp.host", "localhost");
    private String brokerUrl = "amqp://demo:1234@" + this.amqpHost + ":5672/test";
    private Configuration clientConfig = null;
    private final BundleContext context = FrameworkUtil.getBundle(AMQPComponentSubscribeTest.class).getBundleContext();

    @Before
    public void setup() throws Exception {
        this.checkClient = new AMQPClient();
    }

    @After
    public void teardown() throws Exception {
        this.checkClient.disconnect();
        if (this.clientConfig != null) {
            this.clientConfig.delete();
            this.clientConfig = null;
        }
    }

    @Test
    public void testSubscribeMessage() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.subscribe(AMQPComponentPublishTest.PUBLISH_TOPIC).forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch2.countDown();
        });
        this.checkClient.sendSingleWithQueue(AMQPComponentPublishTest.PUBLISH_TOPIC, "this is an AMQP test");
        countDownLatch2.await(15L, TimeUnit.SECONDS);
        Assert.assertEquals("this is an AMQP test", atomicReference.get());
    }

    @Test
    public void testSubscribeMessageEnv() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        System.setProperty("AMQP_USER", "demo");
        System.setProperty("AMQP_PWD", "1234");
        hashtable.put("username.env", "AMQP_USER");
        hashtable.put("password.env", "AMQP_PWD");
        hashtable.put("host", this.amqpHost);
        hashtable.put("port", 5672);
        hashtable.put("virtualHost", "test");
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.subscribe(AMQPComponentPublishTest.PUBLISH_TOPIC).forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch2.countDown();
        });
        this.checkClient.sendSingleWithQueue(AMQPComponentPublishTest.PUBLISH_TOPIC, "this is an AMQP test");
        countDownLatch2.await(15L, TimeUnit.SECONDS);
        Assert.assertEquals("this is an AMQP test", atomicReference.get());
    }

    @Test
    public void testSubscribeMessageConfigureEventSource() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        hashtable.put("pushstream.ses.bufferSize", 100);
        hashtable.put("pushstream.ses.queue.policy.name", QueuePolicyOption.BLOCK.name());
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.subscribe(AMQPComponentPublishTest.PUBLISH_TOPIC).forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch2.countDown();
        });
        this.checkClient.sendSingleWithQueue(AMQPComponentPublishTest.PUBLISH_TOPIC, "this is an AMQP test");
        countDownLatch2.await(15L, TimeUnit.SECONDS);
        Assert.assertEquals("this is an AMQP test", atomicReference.get());
    }

    @Test
    public void testSubscribeMessageMany() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        hashtable.put("jmx", true);
        CountDownLatch countDownLatch2 = new CountDownLatch(10);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new ArrayList(10));
        this.clientConfig.update(hashtable);
        countDownLatch.await(10L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.subscribe(AMQPComponentPublishTest.PUBLISH_TOPIC).forEach(message -> {
            ((List) atomicReference.get()).add(new String(message.payload().array()));
            countDownLatch2.countDown();
        });
        for (int i = 0; i < 10; i++) {
            this.checkClient.sendSingleWithQueue(AMQPComponentPublishTest.PUBLISH_TOPIC, "this is an AMQP test" + i);
        }
        countDownLatch2.await(25L, TimeUnit.SECONDS);
        Assert.assertEquals(10L, ((List) atomicReference.get()).size());
        Assert.assertEquals("this is an AMQP test9", ((List) atomicReference.get()).get(9));
    }

    @Test
    public void testSubscribeManyMessageMany() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        String str = "test_dir";
        String str2 = "this is an AMQP test";
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        CountDownLatch countDownLatch2 = new CountDownLatch(20);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new ArrayList(20));
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        AMQPContextBuilder aMQPContextBuilder = new AMQPContextBuilder();
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        PushStream subscribe = messagingService.subscribe("test_dir", aMQPContextBuilder.direct().exchange("test_dir", "test").durable().withParallelism(2).withExecutor(newCachedThreadPool).build());
        PushStream subscribe2 = messagingService.subscribe("test_dir", aMQPContextBuilder.direct().exchange("test_dir", "test").durable().withParallelism(2).withExecutor(newCachedThreadPool).build());
        subscribe.forEach(message -> {
            String str3 = new String(message.payload().array());
            List list = (List) atomicReference.get();
            synchronized (list) {
                list.add(str3 + "1");
            }
            countDownLatch2.countDown();
        });
        subscribe2.forEach(message2 -> {
            String str3 = new String(message2.payload().array());
            List list = (List) atomicReference.get();
            synchronized (list) {
                list.add(str3 + "2");
            }
            countDownLatch2.countDown();
        });
        Assert.assertEquals(10L, ((Integer) new PromiseFactory(newCachedThreadPool).submit(() -> {
            for (int i = 0; i < 10; i++) {
                this.checkClient.sendSingleWithExchangeDirect(str, "test", str2 + i);
            }
            return 10;
        }).getValue()).intValue());
        countDownLatch2.await(10L, TimeUnit.SECONDS);
        Assert.assertEquals(20L, ((List) atomicReference.get()).size());
    }

    @Test
    public void testSubscribeMessageWrongQueue() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPService", countDownLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingService messagingService = (MessagingService) getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.subscribe(AMQPComponentPublishTest.PUBLISH_TOPIC).forEach(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch2.countDown();
        });
        this.checkClient.sendSingleWithQueue(AMQPComponentPublishTest.PUBLISH_TOPIC + "test", "this is an AMQP test");
        Assert.assertFalse(countDownLatch2.await(5L, TimeUnit.SECONDS));
        Assert.assertNull(atomicReference.get());
    }

    private Configuration getConfiguration(BundleContext bundleContext, String str, CountDownLatch countDownLatch) throws Exception {
        ServiceReference[] allServiceReferences = bundleContext.getAllServiceReferences(ConfigurationAdmin.class.getName(), (String) null);
        Assert.assertNotNull(allServiceReferences);
        Assert.assertEquals(1L, allServiceReferences.length);
        Object service = bundleContext.getService(allServiceReferences[0]);
        Assert.assertNotNull(service);
        Assert.assertTrue(service instanceof ConfigurationAdmin);
        Configuration configuration = ((ConfigurationAdmin) service).getConfiguration(str, "?");
        Assert.assertNotNull(configuration);
        return configuration;
    }

    <T> T getService(Class<T> cls, long j) throws InterruptedException {
        ServiceTracker serviceTracker = new ServiceTracker(this.context, cls, (ServiceTrackerCustomizer) null);
        serviceTracker.open();
        return (T) serviceTracker.waitForService(j);
    }

    <T> T getService(Filter filter, long j) throws InterruptedException {
        ServiceTracker serviceTracker = new ServiceTracker(this.context, filter, (ServiceTrackerCustomizer) null);
        serviceTracker.open();
        return (T) serviceTracker.waitForService(j);
    }
}
