package org.eclipse.sensinact.gateway.southbound.mqtt.factory.integration;

import io.moquette.broker.Server;
import io.moquette.broker.config.MemoryConfig;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.sensinact.core.command.AbstractSensinactCommand;
import org.eclipse.sensinact.core.command.GatewayThread;
import org.eclipse.sensinact.core.model.SensinactModelManager;
import org.eclipse.sensinact.core.notification.ClientActionListener;
import org.eclipse.sensinact.core.notification.ClientLifecycleListener;
import org.eclipse.sensinact.core.notification.ClientMetadataListener;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.core.twin.SensinactDigitalTwin;
import org.eclipse.sensinact.gateway.geojson.Point;
import org.eclipse.sensinact.northbound.session.ResourceDescription;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.session.SensiNactSessionManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.osgi.annotation.bundle.Requirement;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;
import org.osgi.test.common.annotation.config.WithConfigurations;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.promise.Promises;

@WithConfigurations({@WithConfiguration(pid = "sensinact.mqtt.device.factory", location = "?", properties = {@Property(key = "mqtt.handler.id", value = {"handlerWS"}), @Property(key = "mapping", value = {"{\n  \"parser\": \"csv\",\n  \"parser.options\": { \"header\": true },\n  \"mapping\": {\n    \"@provider\": \"Name\",\n    \"@latitude\": { \"path\": \"Latitude\", \"type\": \"float\" },\n    \"@longitude\": { \"path\": \"Longitude\", \"type\": \"float\" },\n    \"@date\": \"Date\", \"@time\": \"Time\",\n    \"data/testName\": { \"literal\": \"${context.topic-2}\" },\n    \"data/value\": { \"path\": \"Value\", \"type\": \"int\" }\n  },\n  \"mapping.options\": { \"format.date\": \"d.M.y\", \"format.time\": \"H:m\" }\n}"})}), @WithConfiguration(pid = "sensinact.session.manager", properties = {@Property(key = "auth.policy", value = {"ALLOW_ALL"})})})
@WithFactoryConfiguration(factoryPid = "sensinact.southbound.mqtt", name = "h1", location = "?", properties = {@Property(key = "id", value = {"handlerWS"}), @Property(key = "protocol", value = {"ws"}), @Property(key = "host", value = {"127.0.0.1"}), @Property(key = "port", value = {"2184"}), @Property(key = "path", value = {"/ws"}), @Property(key = "topics", value = {"sensinact/mqtt/testWS/+"})})
@Requirement(namespace = "osgi.service", filter = "(objectClass=org.eclipse.sensinact.northbound.session.SensiNactSessionManager)")
/* loaded from: input_file:org/eclipse/sensinact/gateway/southbound/mqtt/factory/integration/MqttWSDeviceFactoryTest.class */
public class MqttWSDeviceFactoryTest {
    private static Server server;
    private static String TOPIC = "sensinact/mqtt/testWS/handler";

    @InjectService
    GatewayThread thread;

    @InjectService
    SensiNactSessionManager sessionManager;
    SensiNactSession session;
    BlockingQueue<ResourceDataNotification> queue;
    private final String typedProvider1 = "typed-provider1";
    private final String typedProvider2 = "typed-provider2";
    private MqttClient client;

    @BeforeAll
    static void startBroker() throws IOException {
        server = new Server();
        MemoryConfig memoryConfig = new MemoryConfig(new Properties());
        memoryConfig.setProperty("host", "127.0.0.1");
        memoryConfig.setProperty("port", "2183");
        memoryConfig.setProperty("websocket_port", "2184");
        memoryConfig.setProperty("websocket_path", "/ws");
        server.startServer(memoryConfig);
    }

    @AfterAll
    static void stopBroker() throws IOException {
        server.stopServer();
    }

    @BeforeEach
    void start() throws Exception {
        this.session = this.sessionManager.getDefaultAnonymousSession();
        this.queue = new ArrayBlockingQueue(32);
        this.client = new MqttClient("tcp://127.0.0.1:2183", MqttClient.generateClientId());
        this.client.connect();
    }

    @AfterEach
    void stop() throws Exception {
        Set keySet = this.session.activeListeners().keySet();
        SensiNactSession sensiNactSession = this.session;
        Objects.requireNonNull(sensiNactSession);
        keySet.forEach(sensiNactSession::removeListener);
        this.session = null;
        if (this.client.isConnected()) {
            this.client.publish(TOPIC, new byte[0], 1, true);
            this.client.disconnect();
        }
        this.client.close();
        this.thread.execute(new AbstractSensinactCommand<Void>() { // from class: org.eclipse.sensinact.gateway.southbound.mqtt.factory.integration.MqttWSDeviceFactoryTest.1
            protected Promise<Void> call(SensinactDigitalTwin sensinactDigitalTwin, SensinactModelManager sensinactModelManager, PromiseFactory promiseFactory) {
                Iterator it = Arrays.asList("typed-provider1", "typed-provider2").iterator();
                while (it.hasNext()) {
                    Optional.ofNullable(sensinactDigitalTwin.getProvider((String) it.next())).ifPresent(sensinactProvider -> {
                        sensinactProvider.delete();
                    });
                }
                return Promises.resolved((Object) null);
            }
        }).getValue();
    }

    byte[] readFile(String str) throws IOException {
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream("/" + str);
        try {
            byte[] readAllBytes = resourceAsStream.readAllBytes();
            if (resourceAsStream != null) {
                resourceAsStream.close();
            }
            return readAllBytes;
        } catch (Throwable th) {
            if (resourceAsStream != null) {
                try {
                    resourceAsStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testWorking() throws Exception {
        this.session.addListener(List.of("typed-provider1/*"), (str, resourceDataNotification) -> {
            this.queue.offer(resourceDataNotification);
        }, (ClientMetadataListener) null, (ClientLifecycleListener) null, (ClientActionListener) null);
        Assertions.assertNull(this.session.describeProvider("typed-provider1"));
        Assertions.assertNull(this.session.describeProvider("typed-provider2"));
        this.client.publish(TOPIC, readFile("csv-header-typed.csv"), 1, true);
        Assertions.assertNotNull(this.queue.poll(1L, TimeUnit.SECONDS));
        Assertions.assertEquals(42, (Integer) this.session.getResourceValue("typed-provider1", "data", "value", Integer.class));
        Assertions.assertEquals(84, (Integer) this.session.getResourceValue("typed-provider2", "data", "value", Integer.class));
        Instant from = Instant.from(LocalDateTime.of(2021, 10, 20, 18, 14, 0).atOffset(ZoneOffset.UTC));
        Assertions.assertEquals(from, this.session.describeResource("typed-provider1", "data", "value").timestamp);
        Instant from2 = Instant.from(LocalDateTime.of(2021, 10, 20, 18, 17, 0).atOffset(ZoneOffset.UTC));
        Assertions.assertEquals(from2, this.session.describeResource("typed-provider2", "data", "value").timestamp);
        ResourceDescription describeResource = this.session.describeResource("typed-provider1", "admin", "location");
        Assertions.assertEquals(from, describeResource.timestamp);
        Assertions.assertNotNull(describeResource.value);
        Point point = (Point) describeResource.value;
        Assertions.assertEquals(1.2d, point.coordinates.latitude, 0.001d);
        Assertions.assertEquals(3.4d, point.coordinates.longitude, 0.001d);
        Assertions.assertTrue(Double.isNaN(point.coordinates.elevation));
        ResourceDescription describeResource2 = this.session.describeResource("typed-provider2", "admin", "location");
        Assertions.assertNotNull(describeResource2.value);
        Assertions.assertEquals(from2, describeResource2.timestamp);
        Point point2 = (Point) describeResource2.value;
        Assertions.assertEquals(5.6d, point2.coordinates.latitude, 0.001d);
        Assertions.assertEquals(7.8d, point2.coordinates.longitude, 0.001d);
        Assertions.assertTrue(Double.isNaN(point2.coordinates.elevation));
        Assertions.assertEquals("testWS", this.session.getResourceValue("typed-provider1", "data", "testName", String.class));
        Assertions.assertEquals("testWS", this.session.getResourceValue("typed-provider2", "data", "testName", String.class));
    }
}
