package org.gecko.adapter.eventadmin.tests;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContext;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContextBuilder;
import org.gecko.core.tests.AbstractOSGiTest;
import org.gecko.core.tests.ServiceChecker;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessagingContextBuilder;
import org.gecko.osgi.messaging.annotations.RequireEventAdminMessageAdapter;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.FrameworkUtil;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.osgi.util.pushstream.PushStream;

@RunWith(MockitoJUnitRunner.class)
@RequireEventAdminMessageAdapter
/* loaded from: input_file:org/gecko/adapter/eventadmin/tests/EventAdminMessagingAdapterTest.class */
public class EventAdminMessagingAdapterTest extends AbstractOSGiTest {
    public EventAdminMessagingAdapterTest() {
        super(FrameworkUtil.getBundle(EventAdminMessagingAdapterTest.class).getBundleContext());
    }

    @Test
    public void test() {
    }

    @Test
    public void basicTest() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        PushStream subscribe = ((MessagingService) getService(MessagingService.class)).subscribe("test/topic");
        int i = 100;
        CountDownLatch countDownLatch = new CountDownLatch(100);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.adjustBackPressure((message, l) -> {
            System.out.println("bp: " + l);
            return l.longValue();
        }).fork(5, 0, newCachedThreadPool).forEach(message2 -> {
            String str = new String(message2.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        EventAdmin eventAdmin = (EventAdmin) getService(EventAdmin.class);
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    eventAdmin.postEvent(new Event("test/topic", Collections.singletonMap("content", ByteBuffer.wrap(("test" + i2).getBytes()))));
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been prcessed. Current count " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void basicTestSendViaMessageAdapter() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        MessagingService messagingService = (MessagingService) getService(MessagingService.class);
        PushStream subscribe = messagingService.subscribe("test/topic");
        int i = 100;
        CountDownLatch countDownLatch = new CountDownLatch(100);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.adjustBackPressure((message, l) -> {
            System.out.println("bp: " + l);
            return l.longValue();
        }).fork(5, 0, newCachedThreadPool).forEach(message2 -> {
            String str = new String(message2.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    messagingService.publish("test/topic", ByteBuffer.wrap(("test" + i2).getBytes()));
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been prcessed. Current count " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void basicTestSendViaMessageAdapterWithContext() throws Exception {
        Executors.newCachedThreadPool();
        MessagingService messagingService = (MessagingService) getService(MessagingService.class);
        PushStream subscribe = messagingService.subscribe("test/topic");
        int i = 100;
        CountDownLatch countDownLatch = new CountDownLatch(100);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.forEach(message -> {
            String str = new String(message.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            Assert.assertEquals("test", message.getContext().getContentType());
            Assert.assertTrue(message.getContext() instanceof EventAdminMessagingContext);
            Assert.assertEquals("testheader", message.getContext().getHeaders().get("testheader"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        MessagingContext build = EventAdminMessagingContextBuilder.builder().header("testheader", "testheader").contentType("test").build();
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    messagingService.publish("test/topic", ByteBuffer.wrap(("test" + i2).getBytes()), build);
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been prcessed. Current count " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
    }

    @Test
    public void basicBackpressure() throws Exception {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        MessagingService messagingService = (MessagingService) getService(MessagingService.class);
        PushStream subscribe = messagingService.subscribe("test/topic");
        int i = 2;
        CountDownLatch countDownLatch = new CountDownLatch(2);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.adjustBackPressure((message, l) -> {
            System.out.println("bp: " + l);
            return 5000L;
        }).fork(5, 0, newCachedThreadPool).forEach(message2 -> {
            String str = new String(message2.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        MessagingContext build = SimpleMessagingContextBuilder.builder().contentType("test").build();
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    messagingService.publish("test/topic", ByteBuffer.wrap(("test" + i2).getBytes()), build);
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been prcessed. Current count " + countDownLatch.getCount(), countDownLatch.await(15L, TimeUnit.SECONDS));
    }

    @Test
    public void testCleanup() throws Exception {
        Executors.newCachedThreadPool();
        MessagingService messagingService = (MessagingService) getService(MessagingService.class);
        PushStream subscribe = messagingService.subscribe("test/topic");
        int i = 100;
        CountDownLatch countDownLatch = new CountDownLatch(100);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.forEach(message -> {
            String str = new String(message.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    messagingService.publish("test/topic", ByteBuffer.wrap(("test" + i2).getBytes()));
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been prcessed. Current count " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
        ServiceChecker createTrackedChecker = createTrackedChecker(EventHandler.class);
        subscribe.close();
        createTrackedChecker.assertRemovals(0, true);
    }

    @Test
    public void testCleanupMultiple() throws Exception {
        Executors.newCachedThreadPool();
        MessagingService messagingService = (MessagingService) getService(MessagingService.class);
        int i = 100;
        PushStream subscribe = messagingService.subscribe("test/topic");
        CountDownLatch countDownLatch = new CountDownLatch(100);
        subscribe.onError(th -> {
            System.err.println(th.getMessage());
            th.printStackTrace();
        });
        subscribe.forEach(message -> {
            String str = new String(message.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        PushStream subscribe2 = messagingService.subscribe("test/topic");
        CountDownLatch countDownLatch2 = new CountDownLatch(100);
        subscribe2.onError(th2 -> {
            System.err.println(th2.getMessage());
            th2.printStackTrace();
        });
        subscribe2.forEach(message2 -> {
            String str = new String(message2.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch2.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        PushStream subscribe3 = messagingService.subscribe("test/topic");
        CountDownLatch countDownLatch3 = new CountDownLatch(100);
        subscribe3.onError(th3 -> {
            System.err.println(th3.getMessage());
            th3.printStackTrace();
        });
        subscribe3.forEach(message3 -> {
            String str = new String(message3.payload().array());
            Assert.assertTrue(str.startsWith("test"));
            countDownLatch3.countDown();
            System.out.println("sub content: " + str + " ts: " + System.currentTimeMillis());
        });
        Executors.newSingleThreadExecutor().execute(() -> {
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    messagingService.publish("test/topic", ByteBuffer.wrap(("test" + i2).getBytes()));
                } catch (Exception e) {
                    Assert.assertNull(e);
                }
            }
        });
        Assert.assertTrue("Not all messages have been processed. Current count " + countDownLatch.getCount(), countDownLatch.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue("Not all messages have been processed. Current count " + countDownLatch2.getCount(), countDownLatch2.await(10L, TimeUnit.SECONDS));
        Assert.assertTrue("Not all messages have been processed. Current count " + countDownLatch3.getCount(), countDownLatch3.await(10L, TimeUnit.SECONDS));
        ServiceChecker createTrackedChecker = createTrackedChecker(EventHandler.class);
        subscribe.close();
        createTrackedChecker.assertRemovals(0, true);
        subscribe2.close();
        createTrackedChecker.assertRemovals(0, true);
        subscribe3.close();
        createTrackedChecker.assertRemovals(1, true);
    }

    public void doBefore() {
    }

    public void doAfter() {
    }
}
