package org.gecko.adapter.amqp.tests;

import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.gecko.osgi.messaging.MessagingRPCPubOnSub;
import org.gecko.osgi.messaging.MessagingRPCService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.test.common.annotation.InjectBundleContext;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;

@Extensions({@ExtendWith({MockitoExtension.class}), @ExtendWith({BundleContextExtension.class}), @ExtendWith({ServiceExtension.class})})
/* loaded from: input_file:org/gecko/adapter/amqp/tests/AMQPComponentRPCPubOnSubTest.class */
public class AMQPComponentRPCPubOnSubTest {
    private String amqpHost = System.getProperty("amqp.host", "devel.data-in-motion.biz");
    private String brokerUrl = "amqp://demo:1234@" + this.amqpHost + ":5672/test";
    private Configuration clientConfig = null;
    private Configuration serverConfig = null;

    @InjectBundleContext
    BundleContext context;

    @InjectService
    ConfigurationAdmin configAdmin;

    @BeforeEach
    public void setup() throws Exception {
    }

    @AfterEach
    public void teardown() throws Exception {
    }

    public void testRPCPubOnSubMessage(@InjectService(cardinality = 0) ServiceAware<MessagingRPCService> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingRPCPubOnSub> serviceAware2, @InjectService(cardinality = 0) ServiceAware<Function> serviceAware3) throws Exception {
        Assertions.assertTrue(serviceAware2.isEmpty());
        Assertions.assertTrue(serviceAware.isEmpty());
        Assertions.assertTrue(serviceAware3.isEmpty());
        this.clientConfig = getConfiguration("AMQPRPCService");
        this.serverConfig = getConfiguration("AMQPubOnSubService");
        Assertions.assertNull(this.clientConfig.getProperties());
        Assertions.assertNull(this.serverConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", this.brokerUrl);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        MessagingRPCService messagingRPCService = (MessagingRPCService) serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingRPCService);
        ServiceRegistration registerService = this.context.registerService(Function.class, new POSFunction(), (Dictionary) null);
        Assertions.assertNotNull(serviceAware3.waitForService(2000L));
        Hashtable hashtable2 = new Hashtable();
        hashtable2.put("brokerUrl", this.brokerUrl);
        hashtable2.put("rpcQueue", "test_pubonsub");
        hashtable2.put("pushstream.parallelism", 5);
        this.serverConfig.update(hashtable2);
        Assertions.assertNotNull((MessagingRPCPubOnSub) serviceAware2.waitForService(2000L));
        messagingRPCService.publishRPC("test_pubonsub", ByteBuffer.wrap("this is an AMQP test".getBytes())).thenAccept(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch.countDown();
        });
        countDownLatch.await(15L, TimeUnit.SECONDS);
        Assertions.assertEquals("Response: " + "this is an AMQP test", atomicReference.get());
        registerService.unregister();
        this.serverConfig.delete();
        this.clientConfig.delete();
        Thread.sleep(500L);
    }

    public void testRPCPubOnSubMessageEnv(@InjectService(cardinality = 0) ServiceAware<MessagingRPCService> serviceAware, @InjectService(cardinality = 0) ServiceAware<MessagingRPCPubOnSub> serviceAware2, @InjectService(cardinality = 0) ServiceAware<Function> serviceAware3) throws Exception {
        Assertions.assertTrue(serviceAware2.isEmpty());
        Assertions.assertTrue(serviceAware.isEmpty());
        Assertions.assertTrue(serviceAware3.isEmpty());
        this.clientConfig = getConfiguration("AMQPRPCService");
        this.serverConfig = getConfiguration("AMQPubOnSubService");
        Assertions.assertNull(this.clientConfig.getProperties());
        Assertions.assertNull(this.serverConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        System.setProperty("AMQP_USER", "demo");
        System.setProperty("AMQP_PWD", "1234");
        hashtable.put("username.env", "AMQP_USER");
        hashtable.put("password.env", "AMQP_PWD");
        hashtable.put("host", this.amqpHost);
        hashtable.put("port", 5672);
        hashtable.put("virtualHost", "test");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        this.clientConfig.update(hashtable);
        MessagingRPCService messagingRPCService = (MessagingRPCService) serviceAware.waitForService(2000L);
        Assertions.assertNotNull(messagingRPCService);
        ServiceRegistration registerService = this.context.registerService(Function.class, new POSFunction(), (Dictionary) null);
        Assertions.assertNotNull(serviceAware3.waitForService(2000L));
        Hashtable hashtable2 = new Hashtable();
        hashtable2.put("username.env", "AMQP_USER");
        hashtable2.put("password.env", "AMQP_PWD");
        hashtable2.put("host", this.amqpHost);
        hashtable2.put("port", 5672);
        hashtable2.put("virtualHost", "test");
        hashtable2.put("rpcQueue", "test_pubonsub");
        hashtable2.put("pushstream.parallelism", 5);
        this.serverConfig.update(hashtable2);
        Assertions.assertNotNull((MessagingRPCPubOnSub) serviceAware2.waitForService(2000L));
        messagingRPCService.publishRPC("test_pubonsub", ByteBuffer.wrap("this is an AMQP test".getBytes())).thenAccept(message -> {
            atomicReference.set(new String(message.payload().array()));
            countDownLatch.countDown();
        });
        countDownLatch.await(15L, TimeUnit.SECONDS);
        Assertions.assertEquals("Response: " + "this is an AMQP test", atomicReference.get());
        registerService.unregister();
        this.serverConfig.delete();
        this.clientConfig.delete();
        Thread.sleep(500L);
    }

    private Configuration getConfiguration(String str) throws Exception {
        Configuration configuration = this.configAdmin.getConfiguration(str, "?");
        Assertions.assertNotNull(configuration);
        return configuration;
    }
}
