package org.gecko.adapter.amqp.tests;

import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.gecko.osgi.messaging.MessagingRPCPubOnSub;
import org.gecko.osgi.messaging.MessagingReplyToService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/gecko/adapter/amqp/tests/AMQPComponentReplyToExecutorTest.class */
public class AMQPComponentReplyToExecutorTest {
    private String amqpHost = System.getProperty("amqp.host", "localhost");
    private String brokerUrl = "amqp://demo:1234@" + this.amqpHost + ":5672/test";
    private Configuration clientConfig = null;
    private Configuration serverConfig = null;
    private final BundleContext context = FrameworkUtil.getBundle(AMQPComponentReplyToExecutorTest.class).getBundleContext();

    @Before
    public void setup() throws Exception {
    }

    @After
    public void teardown() throws Exception {
        if (this.serverConfig != null) {
            this.serverConfig.delete();
            this.serverConfig = null;
        }
        if (this.clientConfig != null) {
            this.clientConfig.delete();
            this.clientConfig = null;
        }
    }

    @Test
    public void testRPCPubOnSubMessage() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, "AMQPReplyToService", countDownLatch);
        this.serverConfig = getConfiguration(this.context, "AMQPReplyToExecutor", countDownLatch2);
        Assert.assertNull(this.clientConfig.getProperties());
        Assert.assertNull(this.serverConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        CountDownLatch countDownLatch3 = new CountDownLatch(5);
        LinkedList linkedList = new LinkedList();
        this.clientConfig.update(hashtable);
        countDownLatch.await(2L, TimeUnit.SECONDS);
        MessagingReplyToService messagingReplyToService = (MessagingReplyToService) getService(MessagingReplyToService.class, 3000L);
        Assert.assertNotNull(messagingReplyToService);
        ServiceRegistration registerService = this.context.registerService(Function.class, new POSManyFunction(), (Dictionary) null);
        Hashtable hashtable2 = new Hashtable();
        hashtable2.put("brokerUrl", this.brokerUrl);
        hashtable2.put("rpcQueue", "test_pubonsubmany");
        hashtable2.put("pushstream.parallelism", 5);
        this.serverConfig.update(hashtable2);
        countDownLatch2.await(2L, TimeUnit.SECONDS);
        Assert.assertNotNull((MessagingRPCPubOnSub) getService(MessagingRPCPubOnSub.class, 30000L));
        messagingReplyToService.publishMany("test_pubonsubmany", ByteBuffer.wrap("this is an AMQP test".getBytes())).forEach(message -> {
            linkedList.add(new String(message.payload().array()));
            countDownLatch3.countDown();
        }).onFailure(th -> {
            Assert.fail("Unexpected error in consuming push events");
        });
        countDownLatch3.await(15L, TimeUnit.SECONDS);
        Assert.assertEquals(5L, linkedList.size());
        registerService.unregister();
    }

    private Configuration getConfiguration(BundleContext bundleContext, String str, CountDownLatch countDownLatch) throws Exception {
        ServiceReference[] allServiceReferences = bundleContext.getAllServiceReferences(ConfigurationAdmin.class.getName(), (String) null);
        Assert.assertNotNull(allServiceReferences);
        Assert.assertEquals(1L, allServiceReferences.length);
        Object service = bundleContext.getService(allServiceReferences[0]);
        Assert.assertNotNull(service);
        Assert.assertTrue(service instanceof ConfigurationAdmin);
        Configuration configuration = ((ConfigurationAdmin) service).getConfiguration(str, "?");
        Assert.assertNotNull(configuration);
        return configuration;
    }

    <T> T getService(Class<T> cls, long j) throws InterruptedException {
        ServiceTracker serviceTracker = new ServiceTracker(this.context, cls, (ServiceTrackerCustomizer) null);
        serviceTracker.open();
        return (T) serviceTracker.waitForService(j);
    }

    <T> T getService(Filter filter, long j) throws InterruptedException {
        ServiceTracker serviceTracker = new ServiceTracker(this.context, filter, (ServiceTrackerCustomizer) null);
        serviceTracker.open();
        return (T) serviceTracker.waitForService(j);
    }
}
