package org.eclipse.ece.messaging;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.ece.messaging.core.BasicExampleTest;
import org.eclipse.ece.messaging.core.ITestConstants;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.service.cm.Configuration;

/**
 * Example 05: two subscriptions on the same MQTT topic, one of which has its
 * back-pressure adjusted while the other is left untouched.
 *
 * <p>The test creates the MQTT service configuration, looks up the resulting
 * {@link MessagingService}, publishes a "start" message, prints every message
 * each subscription receives together with the time since that subscription's
 * previous message, and finally publishes a "stop" message. It makes no
 * assertions about the observed timing; the output is meant to be read.
 */
@RunWith(MockitoJUnitRunner.class)
public class Example05BackPressureTest extends BasicExampleTest {

    /** Name handed to the base-class {@code getConfiguration} helper. */
    private static final String MQTT_CONFIG_NAME = "MQTTService";

    /** Second argument of the base-class {@code getService} helper (presumably a timeout in ms; confirm against BasicExampleTest). */
    private static final long SERVICE_LOOKUP_TIMEOUT = 30000L;

    /** Value the first subscription returns from its back-pressure adjustment (presumably milliseconds; confirm against the push stream API). */
    private static final long STREAM_ONE_BACK_PRESSURE = 1500L;

    /** How long the test thread waits after the "start" and after the "stop" message. */
    private static final long OBSERVATION_SECONDS = 10L;

    private final BundleContext context = FrameworkUtil.getBundle(Example05BackPressureTest.class).getBundleContext();

    /** Configuration created by the test; deleted again in {@link #after()}. */
    private Configuration mqttConfig = null;

    /**
     * Runs the base-class setup with this bundle's context.
     *
     * @throws MqttException propagated from the base-class setup
     */
    @Before
    public void before() throws MqttException {
        setup(this.context);
    }

    /**
     * Deletes the MQTT configuration if the test created one.
     *
     * @throws MqttException declared for compatibility with the existing signature
     * @throws IOException if the configuration cannot be deleted
     */
    @After
    public void after() throws MqttException, IOException {
        if (this.mqttConfig != null) {
            this.mqttConfig.delete();
            this.mqttConfig = null;
        }
    }

    /**
     * Subscribes twice to {@code ITestConstants.MQTT_TOPIC_TEMP}, once with adjusted
     * back-pressure and once without, and prints the inter-arrival time of the
     * messages on each subscription.
     *
     * @throws Exception on any configuration, lookup, messaging or interruption failure
     */
    @Test
    public void testPushstreamBackpressure() throws Exception {
        this.mqttConfig = getConfiguration(MQTT_CONFIG_NAME);
        // The configuration must be fresh, i.e. not yet carry any properties.
        Assert.assertNull(this.mqttConfig.getProperties());

        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("brokerUrl", ITestConstants.MQTT_BROKER_URL);
        properties.put("type", "mqtt");
        this.mqttConfig.update(properties);

        MessagingService messagingService = (MessagingService) getService(
                FrameworkUtil.createFilter("(type=mqtt)"), SERVICE_LOOKUP_TIMEOUT);
        Assert.assertNotNull(messagingService);

        // One "last message seen" timestamp per subscription; atomic because the
        // callbacks may run on a different thread than the test.
        AtomicLong lastSeenWithBackPressure = new AtomicLong(System.currentTimeMillis());
        AtomicLong lastSeenWithoutBackPressure = new AtomicLong(System.currentTimeMillis());

        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, utf8Payload("start c-mqtt01"));

        messagingService.subscribe(ITestConstants.MQTT_TOPIC_TEMP)
                .adjustBackPressure(reported -> STREAM_ONE_BACK_PRESSURE)
                .map(this::convertMessageToString)
                .forEach(text -> printElapsed("Push stream 1 back-pressure", text, lastSeenWithBackPressure));

        messagingService.subscribe(ITestConstants.MQTT_TOPIC_TEMP)
                .map(this::convertMessageToString)
                .forEach(text -> printElapsed("Push stream 2 no back-pressure", text, lastSeenWithoutBackPressure));

        // NOTE(review): neither subscription is closed explicitly here; presumably
        // they end when the configuration is deleted in after() - confirm.
        TimeUnit.SECONDS.sleep(OBSERVATION_SECONDS);
        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, utf8Payload("stop c-mqtt01"));
        System.out.println("Stop producing data for both push streams");
        TimeUnit.SECONDS.sleep(OBSERVATION_SECONDS);
    }

    /**
     * Prints a received payload together with the time elapsed since the previous
     * message on the same subscription, and records "now" as the new reference.
     *
     * @param label prefix identifying the subscription
     * @param payload decoded message text
     * @param lastSeen timestamp (ms) of the previous message on this subscription
     */
    private static void printElapsed(String label, String payload, AtomicLong lastSeen) {
        long now = System.currentTimeMillis();
        long elapsed = now - lastSeen.getAndSet(now);
        System.out.println(label + ": '" + payload + "' took " + elapsed + "ms");
    }

    /**
     * Encodes a text as a UTF-8 payload buffer.
     *
     * @param text text to send
     * @return buffer wrapping the UTF-8 bytes of {@code text}
     */
    private static ByteBuffer utf8Payload(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Decodes the payload of a message as UTF-8 text.
     *
     * <p>Only the bytes between the buffer's position and limit are read, through a
     * duplicate, so the message's own buffer state is left untouched and buffers
     * without an accessible backing array are handled as well.
     *
     * @param message message whose payload is decoded
     * @return the payload as a string
     */
    private String convertMessageToString(Message message) {
        ByteBuffer view = message.payload().duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
