package org.eclipse.ece.messaging;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.eclipse.ece.messaging.core.BasicExampleTest;
import org.eclipse.ece.messaging.core.ITestConstants;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.osgi.service.cm.Configuration;
import org.osgi.util.pushstream.PushStream;

@RunWith(MockitoJUnitRunner.class)
/* loaded from: input_file:org/eclipse/ece/messaging/Example04CoalesceTest.class */
public class Example04CoalesceTest extends BasicExampleTest {
    private final BundleContext context = FrameworkUtil.getBundle(Example04CoalesceTest.class).getBundleContext();
    private Configuration mqttConfig = null;
    private Configuration wsConfig = null;

    @Before
    public void before() throws MqttException {
        setup(this.context);
    }

    @After
    public void after() throws MqttException, IOException {
        if (this.wsConfig != null) {
            this.wsConfig.delete();
            this.wsConfig = null;
        }
        if (this.mqttConfig != null) {
            this.mqttConfig.delete();
            this.mqttConfig = null;
        }
    }

    @Test
    public void testPushStreamCoalescing() throws Exception {
        this.mqttConfig = getConfiguration("MQTTService");
        this.wsConfig = getConfiguration("WSService");
        Assert.assertNull(this.mqttConfig.getProperties());
        Hashtable hashtable = new Hashtable();
        hashtable.put("brokerUrl", ITestConstants.MQTT_BROKER_URL);
        hashtable.put("type", "mqtt");
        Assert.assertNull(this.mqttConfig.getProperties());
        Hashtable hashtable2 = new Hashtable();
        hashtable2.put("brokerUrl", ITestConstants.WS_BROKER_URL);
        hashtable2.put("type", "ws");
        this.wsConfig.update(hashtable2);
        this.mqttConfig.update(hashtable);
        MessagingService messagingService = (MessagingService) getService(FrameworkUtil.createFilter("(type=mqtt)"), 30000L);
        Assert.assertNotNull(messagingService);
        MessagingService messagingService2 = (MessagingService) getService(FrameworkUtil.createFilter("(type=ws)"), 30000L);
        Assert.assertNotNull(messagingService2);
        PushStream subscribe = messagingService.subscribe(ITestConstants.MQTT_TOPIC_TEMP);
        PushStream subscribe2 = messagingService2.subscribe(ITestConstants.WS_PATH);
        subscribe.merge(subscribe2).merge(messagingService.subscribe(ITestConstants.MQTT_TOPIC_WIND)).map(this::convertMessageToString).coalesce(5, collection -> {
            return (String) collection.stream().map(str -> {
                return str.replaceAll("locationId", "l").replaceAll("location", "l").replaceAll("client", "l").replaceAll("value", "v").replaceAll("unit:\"mps\", ", "").replaceAll("time", "t").replaceAll("tstamp", "t").replaceAll("timestamp", "t");
            }).collect(Collectors.joining(","));
        }).forEach(str -> {
            System.out.println("Merged push stream in 'batch-mode': " + str);
        });
        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, ByteBuffer.wrap("start c-mqtt01".getBytes()));
        new CountDownLatch(1).await(5L, TimeUnit.SECONDS);
        messagingService.publish(ITestConstants.MQTT_TOPIC_WIND, ByteBuffer.wrap("start c-mqtt03".getBytes()));
        new CountDownLatch(1).await(5L, TimeUnit.SECONDS);
        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, ByteBuffer.wrap("start c-mqtt02".getBytes()));
        new CountDownLatch(1).await(5L, TimeUnit.SECONDS);
        messagingService2.publish(ITestConstants.WS_PATH, ByteBuffer.wrap("start".getBytes()));
        new CountDownLatch(1).await(5L, TimeUnit.SECONDS);
        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, ByteBuffer.wrap("stop c-mqtt01".getBytes()));
        messagingService.publish(ITestConstants.MQTT_TOPIC_TEMP, ByteBuffer.wrap("stop c-mqtt02".getBytes()));
        messagingService.publish(ITestConstants.MQTT_TOPIC_WIND, ByteBuffer.wrap("stop c-mqtt03".getBytes()));
        messagingService2.publish(ITestConstants.WS_PATH, ByteBuffer.wrap("stop".getBytes()));
    }

    private String convertMessageToString(Message message) {
        return new String(message.payload().array());
    }
}
