package org.gecko.adapter.eventadmin.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.gecko.core.tests.AbstractOSGiTest;
import org.gecko.core.tests.ServiceChecker;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.annotations.RequireEventAdminMessageAdapter;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceObjects;
import org.osgi.service.cm.Configuration;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.osgi.util.pushstream.PushStream;

/**
 * Integration tests that post events to the {@code test/topic} EventAdmin topic and
 * consume them through {@link PushStream} services that appear once an
 * {@code EventAdminTopicSubscription} configuration has been created.
 *
 * <p>Each test creates the configuration, obtains one or more push streams, posts a
 * fixed number of events from a background thread and then verifies message delivery
 * as well as the removal of the tracked {@link EventHandler} service.
 */
@RunWith(MockitoJUnitRunner.class)
@RequireEventAdminMessageAdapter
/* loaded from: input_file:org/gecko/adapter/eventadmin/tests/EventAdminPushStreamTest.class */
public class EventAdminPushStreamTest extends AbstractOSGiTest {

    /** EventAdmin topic the subscription listens on and the tests post to. */
    private static final String TOPIC = "test/topic";

    /** Identifier handed to {@code createConfigForCleanup}; presumably a factory PID - TODO confirm. */
    private static final String SUBSCRIPTION_PID = "EventAdminTopicSubscription";

    /** Event property that carries the {@link ByteBuffer} payload. */
    private static final String CONTENT_KEY = "content";

    /** Prefix of every posted payload; consumers assert on it. */
    private static final String PAYLOAD_PREFIX = "test";

    /** Maximum time, in seconds, to wait for asynchronous delivery. */
    private static final long DELIVERY_TIMEOUT_SECONDS = 10L;

    /** Binds the test to the bundle context of the bundle that contains this class. */
    public EventAdminPushStreamTest() {
        super(FrameworkUtil.getBundle(EventAdminPushStreamTest.class).getBundleContext());
    }

    /** Intentionally empty; carries no {@code @Test} annotation and is kept for compatibility. */
    public void test() {
    }

    /**
     * Posts 2000 events and consumes them through a single push stream that applies a
     * back-pressure adjustment and forks processing onto a thread pool.
     *
     * @throws Exception if configuration handling fails or the test thread is interrupted
     */
    @Test
    // ServiceChecker is kept raw because its declaration is not visible here; the tracked
    // service is only available untyped, hence the unchecked cast to PushStream<Message>.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void basicTest() throws Exception {
        int messageCount = 2000;
        ExecutorService forkPool = Executors.newCachedThreadPool();
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            ServiceChecker streamChecker = createTrackedChecker(PushStream.class);
            streamChecker.start();
            Configuration subscription = createTopicSubscription();
            PushStream<Message> pushStream =
                    (PushStream<Message>) streamChecker.assertCreations(1, true).getTrackedService();
            pushStream.onError(failure -> {
                System.err.println(String.format("[%s] ERROR BUFFER ", System.currentTimeMillis()));
                printError(failure);
            });

            CountDownLatch received = new CountDownLatch(messageCount);
            pushStream.adjustBackPressure((message, backPressure) -> {
                System.out.println("bp: " + backPressure);
                return backPressure.longValue();
            }).fork(5, 0, forkPool).forEach(message -> recordMessage(message, received));

            EventAdmin eventAdmin = (EventAdmin) getService(EventAdmin.class);
            AtomicReference<Throwable> postFailure = new AtomicReference<>();
            poster.execute(() -> {
                System.err.println(String.format("[%s] START POSTING ", System.currentTimeMillis()));
                if (postTestEvents(eventAdmin, messageCount, postFailure)) {
                    System.err.println(String.format("[%s] FINISHED POSTING ", System.currentTimeMillis()));
                }
            });
            assertAllProcessed(received, postFailure);

            ServiceChecker handlerChecker = createTrackedChecker(EventHandler.class);
            handlerChecker.start();
            // Closing the stream alone must not remove the event handler ...
            pushStream.close();
            handlerChecker.assertRemovals(0, true);
            // ... only deleting the configuration does.
            deleteConfigurationAndRemoveFromCleanup(subscription);
            handlerChecker.assertRemovals(1, true);
        } finally {
            // Executors use non-daemon threads; release them even when an assertion fails.
            poster.shutdownNow();
            forkPool.shutdown();
        }
    }

    /**
     * Obtains six independent push streams from the same service, posts 100 events and
     * expects every stream to receive all of them. Two streams are then closed explicitly;
     * the remaining four are expected to close once the configuration is deleted.
     *
     * @throws Exception if configuration handling fails or the test thread is interrupted
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void basicTestMultipleStreams() throws Exception {
        int streamCount = 6;
        int messageCount = 100;
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            ServiceChecker streamChecker = createTrackedChecker(PushStream.class);
            streamChecker.start();
            Configuration subscription = createTopicSubscription();
            streamChecker.assertCreations(1, true);

            ServiceObjects<?> serviceObjects = getServiceObjects(PushStream.class);
            List<PushStream<Message>> streams = new ArrayList<>(streamCount);
            for (int index = 0; index < streamCount; index++) {
                streams.add((PushStream<Message>) serviceObjects.getService());
            }
            Assert.assertNotEquals(streams.get(0), streams.get(1));

            List<CountDownLatch> receivedPerStream = new ArrayList<>(streamCount);
            CountDownLatch closed = new CountDownLatch(streamCount);
            for (PushStream<Message> stream : streams) {
                CountDownLatch received = new CountDownLatch(messageCount);
                receivedPerStream.add(received);
                stream.onError(EventAdminPushStreamTest::printError);
                stream.onClose(closed::countDown);
                stream.forEach(message -> recordMessage(message, received));
            }

            EventAdmin eventAdmin = (EventAdmin) getService(EventAdmin.class);
            AtomicReference<Throwable> postFailure = new AtomicReference<>();
            poster.execute(() -> postTestEvents(eventAdmin, messageCount, postFailure));
            for (CountDownLatch received : receivedPerStream) {
                assertAllProcessed(received, postFailure);
            }

            ServiceChecker handlerChecker = createTrackedChecker(EventHandler.class);
            handlerChecker.start();
            streams.get(0).close();
            handlerChecker.assertRemovals(0, true);
            Assert.assertEquals(streamCount - 1L, closed.getCount());
            streams.get(1).close();
            handlerChecker.assertRemovals(0, true);
            Assert.assertEquals(streamCount - 2L, closed.getCount());
            deleteConfigurationAndRemoveFromCleanup(subscription);
            handlerChecker.assertRemovals(1, true);
            // The four streams that were never closed explicitly must close as well.
            Assert.assertTrue(closed.await(1L, TimeUnit.SECONDS));
        } finally {
            poster.shutdownNow();
        }
    }

    /**
     * Uses two push streams of which the second throws from its event consumer after five
     * messages. The first stream must still receive all 100 messages, while the second one
     * must report the error and be closed.
     *
     * @throws Exception if configuration handling fails or the test thread is interrupted
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void basicTestMultipleStreamsWithError() throws Exception {
        int messageCount = 100;
        long countWhenFailing = 95L;
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            ServiceChecker streamChecker = createTrackedChecker(PushStream.class);
            streamChecker.start();
            Configuration subscription = createTopicSubscription();
            streamChecker.assertCreations(1, true);

            ServiceObjects<?> serviceObjects = getServiceObjects(PushStream.class);
            PushStream<Message> healthyStream = (PushStream<Message>) serviceObjects.getService();
            PushStream<Message> failingStream = (PushStream<Message>) serviceObjects.getService();
            Assert.assertNotEquals(healthyStream, failingStream);

            CountDownLatch healthyReceived = new CountDownLatch(messageCount);
            CountDownLatch failingReceived = new CountDownLatch(messageCount);
            CountDownLatch closed = new CountDownLatch(2);
            CountDownLatch errorSeen = new CountDownLatch(1);
            healthyStream.onClose(closed::countDown);
            failingStream.onClose(closed::countDown);
            failingStream.onError(failure -> errorSeen.countDown());

            healthyStream.forEach(message -> recordMessage(message, healthyReceived));
            failingStream.forEachEvent(pushEvent -> {
                recordMessage(pushEvent.getData(), failingReceived);
                if (failingReceived.getCount() == countWhenFailing) {
                    throw new RuntimeException("ERROR");
                }
                return 0L;
            });

            EventAdmin eventAdmin = (EventAdmin) getService(EventAdmin.class);
            AtomicReference<Throwable> postFailure = new AtomicReference<>();
            poster.execute(() -> postTestEvents(eventAdmin, messageCount, postFailure));
            assertAllProcessed(healthyReceived, postFailure);

            // The error callback runs asynchronously; wait for it instead of sampling the latch once.
            Assert.assertTrue("The failing stream never reported its error",
                    errorSeen.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            Assert.assertEquals(countWhenFailing, failingReceived.getCount());
            // Only the failing stream is expected to be closed at this point.
            Assert.assertEquals(1L, closed.getCount());

            ServiceChecker handlerChecker = createTrackedChecker(EventHandler.class);
            handlerChecker.start();
            healthyStream.close();
            handlerChecker.assertRemovals(0, true);
            Assert.assertTrue(closed.await(1L, TimeUnit.SECONDS));
            deleteConfigurationAndRemoveFromCleanup(subscription);
            handlerChecker.assertRemovals(1, true);
        } finally {
            poster.shutdownNow();
        }
    }

    /** Intentionally empty; presumably a setup hook of {@link AbstractOSGiTest} - TODO confirm. */
    public void doBefore() {
    }

    /** Intentionally empty; presumably a teardown hook of {@link AbstractOSGiTest} - TODO confirm. */
    public void doAfter() {
    }

    /**
     * Creates the topic subscription configuration for {@link #TOPIC} and registers it for cleanup.
     *
     * @return the configuration returned by {@code createConfigForCleanup}
     * @throws Exception if creating the configuration fails
     */
    private Configuration createTopicSubscription() throws Exception {
        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("topic", TOPIC);
        return createConfigForCleanup(SUBSCRIPTION_PID, "?", properties);
    }

    /**
     * Posts {@code count} events with payloads {@code test0 .. test(count-1)} to {@link #TOPIC}.
     * Meant to run on a background thread: a failure is stored in {@code failure} rather than
     * asserted, because an assertion error on that thread would not fail the test.
     *
     * @param eventAdmin the service used to post the events
     * @param count number of events to post
     * @param failure receives the first exception raised while posting
     * @return {@code true} if all events were posted, {@code false} if posting was aborted
     */
    private static boolean postTestEvents(EventAdmin eventAdmin, int count, AtomicReference<Throwable> failure) {
        for (int index = 0; index < count; index++) {
            try {
                ByteBuffer content = ByteBuffer.wrap((PAYLOAD_PREFIX + index).getBytes(StandardCharsets.UTF_8));
                eventAdmin.postEvent(new Event(TOPIC, Collections.singletonMap(CONTENT_KEY, content)));
            } catch (RuntimeException e) {
                failure.compareAndSet(null, e);
                return false;
            }
        }
        return true;
    }

    /**
     * Verifies the payload prefix of a received message, counts the latch down and logs the content.
     *
     * @param message the received message
     * @param received latch tracking the number of messages still expected
     */
    private static void recordMessage(Message message, CountDownLatch received) {
        String content = new String(message.payload().array(), StandardCharsets.UTF_8);
        Assert.assertTrue(content.startsWith(PAYLOAD_PREFIX));
        received.countDown();
        System.out.println("sub content: " + content + " ts: " + System.currentTimeMillis());
    }

    /**
     * Waits for the latch to reach zero and fails with the posting error, if there was one,
     * or otherwise with the number of messages still outstanding after the wait.
     *
     * @param received latch tracking the number of messages still expected
     * @param postFailure holder of an exception raised by the posting thread, if any
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static void assertAllProcessed(CountDownLatch received, AtomicReference<Throwable> postFailure)
            throws InterruptedException {
        boolean completed = received.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        Throwable failure = postFailure.get();
        if (failure != null) {
            throw new AssertionError("Posting events failed", failure);
        }
        // Read the count after waiting so the message reports what is really outstanding.
        Assert.assertTrue("Not all messages have been prcessed. Current count " + received.getCount(), completed);
    }

    /**
     * Writes the message and stack trace of a stream failure to standard error.
     *
     * @param failure the error reported by a push stream
     */
    private static void printError(Throwable failure) {
        System.err.println(failure.getMessage());
        failure.printStackTrace();
    }
}
