package org.gecko.adapter.mqtt.tests;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.gecko.osgi.messaging.MessagingService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;

/**
 * Integration tests that publish through a {@link MessagingService} obtained from the OSGi
 * service registry and verify, with an independent MQTT client, that the payload reaches the
 * broker.
 *
 * <p>Each test creates the {@code MQTTService} configuration, subscribes a check client to the
 * test topic, publishes a message via the {@link MessagingService} and asserts that the check
 * client received exactly that payload. The tests talk to the external broker named by
 * {@link #BROKER_URL}, so they need network access.
 */
@RunWith(MockitoJUnitRunner.class)
public class MqttComponentPublishTest {
    /** Broker used both by the component under test and by the verifying client. */
    private static final String BROKER_URL = "tcp://iot.eclipse.org:1883";
    /** PID of the configuration that is created for (and deleted after) every test. */
    private static final String CONFIG_PID = "MQTTService";
    /** Topic the message is published to and the check client subscribes to. */
    private static final String TOPIC = "publish.junit";
    /** Payload that is published and expected back. */
    private static final String PAYLOAD = "this is a test";

    private MqttClient checkClient;
    private Configuration clientConfig = null;
    private final BundleContext context = FrameworkUtil.getBundle(MqttComponentPublishTest.class).getBundleContext();
    /** Trackers opened during a test; closed in {@link #teardown()} so they do not leak. */
    private final List<ServiceTracker<?, ?>> openTrackers = new ArrayList<>();

    /**
     * Connects the verifying MQTT client to the broker.
     *
     * @throws MqttException if the client cannot be created or connected
     */
    @Before
    public void setup() throws MqttException {
        this.checkClient = new MqttClient(BROKER_URL, "test");
        this.checkClient.connect();
    }

    /**
     * Releases everything a test acquired: the verifying client, all opened service trackers
     * and the configuration.
     *
     * @throws MqttException if disconnecting or closing the client fails
     * @throws IOException if the configuration cannot be deleted
     */
    @After
    public void teardown() throws MqttException, IOException {
        try {
            // setup() may have failed before the client was created or connected.
            if (this.checkClient != null) {
                try {
                    if (this.checkClient.isConnected()) {
                        this.checkClient.disconnect();
                    }
                } finally {
                    this.checkClient.close();
                }
            }
        } finally {
            // Always clean up OSGi state, even when the MQTT client cleanup failed.
            for (ServiceTracker<?, ?> tracker : this.openTrackers) {
                tracker.close();
            }
            this.openTrackers.clear();
            if (this.clientConfig != null) {
                this.clientConfig.delete();
                this.clientConfig = null;
            }
        }
    }

    /**
     * Publishes a message through the {@link MessagingService} and expects the check client to
     * receive it.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testPublishMessage() throws Exception {
        CountDownLatch createLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, CONFIG_PID, createLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("brokerUrl", BROKER_URL);
        CountDownLatch messageLatch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        connectClient(TOPIC, messageLatch, received);
        this.clientConfig.update(properties);
        // Nothing visible in this class counts this latch down (getConfiguration ignores it),
        // so this acts as a bounded delay and its result is deliberately not asserted.
        createLatch.await(10L, TimeUnit.SECONDS);
        MessagingService messagingService = getService(MessagingService.class, 30000L);
        Assert.assertNotNull(messagingService);
        messagingService.publish(TOPIC, ByteBuffer.wrap(PAYLOAD.getBytes(StandardCharsets.UTF_8)));
        Assert.assertTrue("no message received on " + TOPIC + " within 5 seconds",
                messageLatch.await(5L, TimeUnit.SECONDS));
        Assert.assertEquals(PAYLOAD, received.get());
    }

    /**
     * Same scenario as {@link #testPublishMessage()}, but the {@link MessagingService} is
     * obtained through a tracker with a {@code TestServiceCustomizer}, whose add count is
     * checked as well.
     *
     * @throws Exception on any unexpected failure
     */
    @Test
    public void testPublishMessage_WildCard() throws Exception {
        CountDownLatch createLatch = new CountDownLatch(1);
        this.clientConfig = getConfiguration(this.context, CONFIG_PID, createLatch);
        Assert.assertNull(this.clientConfig.getProperties());
        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("brokerUrl", BROKER_URL);
        CountDownLatch messageLatch = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();
        connectClient(TOPIC, messageLatch, received);
        // NOTE(review): the decompiled source referenced an undeclared "r0" for getAddCount();
        // it is taken to be the customizer handed to the tracker - confirm against the original.
        TestServiceCustomizer customizer = new TestServiceCustomizer(this.context, createLatch);
        // Raw on purpose: the type parameters of TestServiceCustomizer are not visible here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        ServiceTracker serviceTracker = new ServiceTracker(this.context, MessagingService.class, customizer);
        this.openTrackers.add(serviceTracker);
        serviceTracker.open(true);
        this.clientConfig.update(properties);
        // Presumably counted down by the customizer when the service is added; the add count
        // assertion below is what actually verifies it.
        createLatch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals(1L, customizer.getAddCount());
        MessagingService messagingService = (MessagingService) serviceTracker.waitForService(15000L);
        Assert.assertNotNull(messagingService);
        messagingService.publish(TOPIC, ByteBuffer.wrap(PAYLOAD.getBytes(StandardCharsets.UTF_8)));
        Assert.assertTrue("no message received on " + TOPIC + " within 5 seconds",
                messageLatch.await(5L, TimeUnit.SECONDS));
        Assert.assertEquals(PAYLOAD, received.get());
    }

    /**
     * Looks up the single {@link ConfigurationAdmin} service and returns the configuration for
     * the given PID, bound to location {@code "?"}.
     *
     * @param bundleContext context used for the service lookup
     * @param str PID of the configuration
     * @param countDownLatch currently unused; kept so existing call sites stay unchanged
     * @return the configuration, never {@code null}
     * @throws Exception if the service lookup or the configuration access fails
     */
    private Configuration getConfiguration(BundleContext bundleContext, String str, CountDownLatch countDownLatch) throws Exception {
        ServiceReference<?>[] references = bundleContext.getAllServiceReferences(ConfigurationAdmin.class.getName(), null);
        Assert.assertNotNull(references);
        Assert.assertEquals(1L, references.length);
        Object service = bundleContext.getService(references[0]);
        Assert.assertNotNull(service);
        Assert.assertTrue(service instanceof ConfigurationAdmin);
        Configuration configuration = ((ConfigurationAdmin) service).getConfiguration(str, "?");
        Assert.assertNotNull(configuration);
        return configuration;
    }

    /**
     * Subscribes the check client to a topic and records the first payload that arrives.
     *
     * @param str topic to subscribe to
     * @param countDownLatch counted down once a message has arrived
     * @param atomicReference receives the payload of the arrived message, decoded as UTF-8
     * @throws MqttException if the subscription fails
     */
    private void connectClient(String str, final CountDownLatch countDownLatch, final AtomicReference<String> atomicReference) throws MqttException {
        // Install the callback before subscribing so a message that arrives right after the
        // subscription is not missed.
        this.checkClient.setCallback(new MqttCallback() {
            @Override
            public void messageArrived(String str2, MqttMessage mqttMessage) throws Exception {
                atomicReference.set(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
                countDownLatch.countDown();
            }

            // NOTE(review): the two callbacks below presumably run on an MQTT client thread,
            // so these failures may not be reported to JUnit - confirm.
            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                Assert.fail("delivery complete was not expected");
            }

            @Override
            public void connectionLost(Throwable th) {
                Assert.fail("fail was not expected");
            }
        });
        this.checkClient.subscribe(str);
    }

    /**
     * Waits for a service of the given type.
     *
     * <p>The tracker stays open until {@link #teardown()} so the returned service remains
     * tracked while the test uses it.
     *
     * @param cls service interface to wait for
     * @param j maximum time to wait, in milliseconds
     * @return the service, or {@code null} if none appeared in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    <T> T getService(Class<T> cls, long j) throws InterruptedException {
        ServiceTracker<T, T> tracker = new ServiceTracker<>(this.context, cls, null);
        this.openTrackers.add(tracker);
        tracker.open();
        return tracker.waitForService(j);
    }

    /**
     * Waits for a service matching the given filter.
     *
     * <p>The tracker stays open until {@link #teardown()} so the returned service remains
     * tracked while the test uses it. The caller chooses {@code T}; a mismatch with the actual
     * service type surfaces as a {@link ClassCastException} at the call site.
     *
     * @param filter filter the service must match
     * @param j maximum time to wait, in milliseconds
     * @return the service, or {@code null} if none appeared in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    <T> T getService(Filter filter, long j) throws InterruptedException {
        ServiceTracker<Object, T> tracker = new ServiceTracker<>(this.context, filter, null);
        this.openTrackers.add(tracker);
        tracker.open();
        return tracker.waitForService(j);
    }
}
