package org.gecko.adapter.eventadmin.tests;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.gecko.osgi.messaging.Message;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.Extensions;
import org.mockito.junit.jupiter.MockitoExtension;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceObjects;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.service.cm.annotations.RequireConfigurationAdmin;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.osgi.service.event.annotations.RequireEventAdmin;
import org.osgi.test.common.annotation.InjectBundleContext;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.test.common.service.ServiceAware;
import org.osgi.test.junit5.cm.ConfigurationExtension;
import org.osgi.test.junit5.context.BundleContextExtension;
import org.osgi.test.junit5.service.ServiceExtension;
import org.osgi.util.pushstream.PushStream;

/**
 * Integration tests that post events through {@link EventAdmin} and consume them
 * from {@link PushStream} services.
 *
 * <p>Each test creates an {@code EventAdminTopicSubscription} factory configuration
 * for a topic, waits for a {@link PushStream} service to show up, posts a batch of
 * events whose {@code content} property is a {@link ByteBuffer} holding
 * {@code "test" + index}, and counts the {@link Message}s that arrive on the stream.
 *
 * <p>Executors and streams created by a test are released in {@code finally} blocks
 * so that threads and subscriptions do not leak into later tests.
 */
@RequireEventAdmin
@Extensions({@ExtendWith({MockitoExtension.class}), @ExtendWith({ServiceExtension.class}), @ExtendWith({ConfigurationExtension.class}), @ExtendWith({BundleContextExtension.class})})
@RequireConfigurationAdmin
public class EventAdminPushStreamTest {

    /** Prefix of every posted payload; consumers assert that received content starts with it. */
    private static final String PAYLOAD_PREFIX = "test";

    /** Event property under which the payload buffer is posted. */
    private static final String CONTENT_KEY = "content";

    /** Maximum time, in seconds, to wait for one latch of delivered messages. */
    private static final long DELIVERY_TIMEOUT_SECONDS = 10L;

    /** Failure message prefix used when a delivery latch times out (text kept as in the original test). */
    private static final String UNDELIVERED_MESSAGE = "Not all messages have been prcessed. Current count ";

    @InjectBundleContext
    BundleContext context;

    @InjectService
    EventAdmin ea;

    /** Injected but not referenced by any test in this class. */
    @InjectService
    ConfigurationAdmin ca;

    /**
     * Posts 2000 events and verifies that all of them arrive on a single stream that is
     * forked onto a worker pool, and that closing the stream leaves the tracking count of
     * the {@link EventHandler} tracker at 1.
     *
     * @param serviceAware tracker for the {@link PushStream} service under test
     * @param serviceAware2 tracker for {@link EventHandler} services
     * @throws Exception if waiting is interrupted or the posting task failed
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "EventAdminTopicSubscription", location = "?", name = "ps", properties = {@Property(key = "topic", value = {"test/topic"})})
    public void basicTest(@InjectService(cardinality = 0) ServiceAware<PushStream> serviceAware, @InjectService(cardinality = 0) ServiceAware<EventHandler> serviceAware2) throws Exception {
        Assertions.assertNotNull(serviceAware.waitForService(2000L));
        Assertions.assertFalse(serviceAware.isEmpty());
        PushStream<Message> stream = typed(serviceAware.getService());
        ExecutorService workers = Executors.newCachedThreadPool();
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            stream.onError(error -> {
                System.err.println(String.format("[%s] ERROR BUFFER ", System.currentTimeMillis()));
                reportError(error);
            });
            int messageCount = 2000;
            CountDownLatch delivered = new CountDownLatch(messageCount);
            stream.adjustBackPressure((message, backPressure) -> {
                System.out.println("bp: " + backPressure);
                return backPressure.longValue();
            }).fork(5, 0, workers).forEach(message -> consume(message, delivered));
            Future<?> posting = poster.submit(() -> {
                System.err.println(String.format("[%s] START POSTING ", System.currentTimeMillis()));
                postEvents("test/topic", messageCount);
                System.err.println(String.format("[%s] FINISHED POSTING ", System.currentTimeMillis()));
            });
            awaitDelivery(delivered, posting);
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
            stream.close();
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
        } finally {
            // Close the stream before stopping the pool it forks onto.
            stream.close();
            poster.shutdownNow();
            workers.shutdown();
        }
    }

    /**
     * Obtains six stream instances from the same service reference, posts 100 events and
     * verifies that every stream receives all of them. Afterwards three streams are closed
     * and exactly three close callbacks are expected to have run.
     *
     * @param serviceAware tracker for the {@link PushStream} service under test
     * @param serviceAware2 tracker for {@link EventHandler} services
     * @throws Exception if waiting is interrupted or the posting task failed
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "EventAdminTopicSubscription", location = "?", name = "?", properties = {@Property(key = "topic", value = {"test/topic"})})
    public void basicTestMultipleStreams(@InjectService(cardinality = 0) ServiceAware<PushStream> serviceAware, @InjectService(cardinality = 0) ServiceAware<EventHandler> serviceAware2) throws Exception {
        Assertions.assertNotNull(serviceAware.waitForService(2000L));
        Assertions.assertFalse(serviceAware.isEmpty());
        ServiceObjects<PushStream> serviceObjects = this.context.getServiceObjects(serviceAware.getServiceReference());
        int streamCount = 6;
        List<PushStream<Message>> streams = new ArrayList<>(streamCount);
        for (int n = 0; n < streamCount; n++) {
            streams.add(typed(serviceObjects.getService()));
        }
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            for (PushStream<Message> stream : streams) {
                stream.onError(EventAdminPushStreamTest::reportError);
            }
            // Each getService() call is expected to hand out its own stream instance.
            Assertions.assertNotEquals(streams.get(0), streams.get(1));
            int messageCount = 100;
            CountDownLatch closed = new CountDownLatch(streamCount);
            for (PushStream<Message> stream : streams) {
                stream.onClose(closed::countDown);
            }
            List<CountDownLatch> deliveries = new ArrayList<>(streamCount);
            for (PushStream<Message> stream : streams) {
                CountDownLatch delivered = new CountDownLatch(messageCount);
                deliveries.add(delivered);
                stream.forEach(message -> consume(message, delivered));
            }
            Future<?> posting = poster.submit(() -> postEvents("test/topic", messageCount));
            for (CountDownLatch delivered : deliveries) {
                awaitDelivery(delivered, posting);
            }
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
            int closedStreams = 3;
            for (PushStream<Message> stream : streams.subList(0, closedStreams)) {
                stream.close();
            }
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
            Assertions.assertEquals((long) (streamCount - closedStreams), closed.getCount());
        } finally {
            // Also releases the streams the test body intentionally left open.
            for (PushStream<Message> stream : streams) {
                stream.close();
            }
            poster.shutdownNow();
        }
    }

    /**
     * Runs two streams side by side where the second one throws from its event handler
     * after five messages. Verifies that the failing stream reports the error and is the
     * only one closed by it, while the first stream still receives all 100 messages.
     *
     * @param serviceAware tracker for the {@link PushStream} service under test
     * @param serviceAware2 tracker for {@link EventHandler} services
     * @throws Exception if waiting is interrupted or the posting task failed
     */
    @Test
    @WithFactoryConfiguration(factoryPid = "EventAdminTopicSubscription", location = "?", name = "ps", properties = {@Property(key = "topic", value = {"test/MultipleStreamsWithError"})})
    public void basicTestMultipleStreamsWithError(@InjectService(cardinality = 0) ServiceAware<PushStream> serviceAware, @InjectService(cardinality = 0) ServiceAware<EventHandler> serviceAware2) throws Exception {
        // Fail with a clear assertion instead of a NullPointerException when the service never appears.
        Assertions.assertNotNull(serviceAware.waitForService(2000L));
        ServiceObjects<PushStream> serviceObjects = this.context.getServiceObjects(serviceAware.getServiceReference());
        PushStream<Message> healthy = typed(serviceObjects.getService());
        PushStream<Message> failing = typed(serviceObjects.getService());
        ExecutorService poster = Executors.newSingleThreadExecutor();
        try {
            Assertions.assertNotEquals(healthy, failing);
            int messageCount = 100;
            CountDownLatch delivered = new CountDownLatch(messageCount);
            CountDownLatch beforeFailure = new CountDownLatch(5);
            CountDownLatch closed = new CountDownLatch(2);
            CountDownLatch errored = new CountDownLatch(1);
            healthy.onClose(closed::countDown);
            failing.onClose(closed::countDown);
            failing.onError(error -> errored.countDown());
            healthy.forEach(message -> consume(message, delivered));
            failing.forEachEvent(event -> {
                consume(event.getData(), beforeFailure);
                if (beforeFailure.getCount() == 0) {
                    // Deliberate failure once the fifth message has been counted.
                    throw new RuntimeException("ERROR");
                }
                return 0L;
            });
            Future<?> posting = poster.submit(() -> postEvents("test/MultipleStreamsWithError", messageCount));
            awaitDelivery(delivered, posting);
            Assertions.assertTrue(beforeFailure.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS));
            Assertions.assertTrue(errored.await(5L, TimeUnit.SECONDS));
            // Only the failing stream may have been closed at this point.
            Assertions.assertEquals(1L, closed.getCount());
            Assertions.assertEquals(1, serviceAware2.getTrackingCount());
            healthy.close();
            Assertions.assertTrue(closed.await(1L, TimeUnit.SECONDS));
        } finally {
            healthy.close();
            failing.close();
            poster.shutdownNow();
        }
    }

    /**
     * Views a raw stream obtained from the service registry as a stream of {@link Message}s.
     * The element type cannot be checked at runtime; the tests rely on the service
     * delivering {@link Message} instances.
     *
     * @param stream the raw stream service object
     * @return the same instance, typed for {@link Message} elements
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static PushStream<Message> typed(PushStream stream) {
        return stream;
    }

    /**
     * Prints the message and stack trace of a stream error to {@code System.err}.
     *
     * @param error the failure reported by the stream
     */
    private static void reportError(Throwable error) {
        System.err.println(error.getMessage());
        error.printStackTrace();
    }

    /**
     * Decodes a received message, asserts that it carries the expected prefix, counts it
     * and logs it. Runs on whichever thread the stream delivers the message on.
     *
     * @param message the received message
     * @param delivered latch counted down once per accepted message
     */
    private static void consume(Message message, CountDownLatch delivered) {
        String content = new String(message.payload().array(), StandardCharsets.UTF_8);
        Assertions.assertTrue(content.startsWith(PAYLOAD_PREFIX));
        delivered.countDown();
        System.out.println("sub content: " + content + " ts: " + System.currentTimeMillis());
    }

    /**
     * Posts {@code count} events to {@code topic}; the payload of event {@code index} is the
     * UTF-8 encoding of {@code "test" + index}. Any exception aborts the loop and propagates
     * to the caller.
     *
     * @param topic the EventAdmin topic to post to
     * @param count the number of events to post
     */
    private void postEvents(String topic, int count) {
        for (int index = 0; index < count; index++) {
            ByteBuffer payload = ByteBuffer.wrap((PAYLOAD_PREFIX + index).getBytes(StandardCharsets.UTF_8));
            this.ea.postEvent(new Event(topic, Collections.singletonMap(CONTENT_KEY, payload)));
        }
    }

    /**
     * Waits for a delivery latch to reach zero and fails the test if it does not. If the
     * posting task has already finished with an exception, that exception is rethrown first
     * so the root cause is reported rather than a plain timeout.
     *
     * @param delivered latch counted down by the consumer
     * @param posting handle of the background posting task
     * @throws Exception if the wait is interrupted or the posting task failed
     */
    private static void awaitDelivery(CountDownLatch delivered, Future<?> posting) throws Exception {
        boolean complete = delivered.await(DELIVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        if (posting.isDone()) {
            // Throws ExecutionException when the posting task terminated abnormally.
            posting.get();
        }
        Assertions.assertTrue(complete, UNDELIVERED_MESSAGE + delivered.getCount());
    }
}
