/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.gateway.southbound.mqtt.factory.integration;

import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.sensinact.core.command.AbstractSensinactCommand;
import org.eclipse.sensinact.core.command.GatewayThread;
import org.eclipse.sensinact.core.model.SensinactModelManager;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.core.twin.SensinactDigitalTwin;
import org.eclipse.sensinact.gateway.geojson.Point;
import org.eclipse.sensinact.northbound.session.ResourceDescription;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.session.SensiNactSessionManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.osgi.annotation.bundle.Requirement;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;
import org.osgi.test.common.annotation.config.WithConfigurations;
import org.osgi.test.common.annotation.config.WithFactoryConfiguration;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.promise.Promises;

@WithFactoryConfiguration(factoryPid="sensinact.southbound.mqtt", name="h1", location="?", properties={@Property(key="id", value={"handlerWS"}), @Property(key="protocol", value={"ws"}), @Property(key="host", value={"127.0.0.1"}), @Property(key="port", value={"2184"}), @Property(key="path", value={"/ws"}), @Property(key="topics", value={"sensinact/mqtt/testWS/+"})})
@WithConfigurations(value={@WithConfiguration(pid="sensinact.mqtt.device.factory", location="?", properties={@Property(key="mqtt.handler.id", value={"handlerWS"}), @Property(key="mapping", value={"{\n  \"parser\": \"csv\",\n  \"parser.options\": { \"header\": true },\n  \"mapping\": {\n    \"@provider\": \"Name\",\n    \"@latitude\": { \"path\": \"Latitude\", \"type\": \"float\" },\n    \"@longitude\": { \"path\": \"Longitude\", \"type\": \"float\" },\n    \"@date\": \"Date\", \"@time\": \"Time\",\n    \"data/testName\": { \"literal\": \"${context.topic-2}\" },\n    \"data/value\": { \"path\": \"Value\", \"type\": \"int\" }\n  },\n  \"mapping.options\": { \"format.date\": \"d.M.y\", \"format.time\": \"H:m\" }\n}"})}), @WithConfiguration(pid="sensinact.session.manager", properties={@Property(key="auth.policy", value={"ALLOW_ALL"})})})
@Requirement(namespace="osgi.service", filter="(objectClass=org.eclipse.sensinact.northbound.session.SensiNactSessionManager)")
public class MqttWSDeviceFactoryTest {
    private static Server server;
    private static String TOPIC;
    @InjectService
    GatewayThread thread;
    @InjectService
    SensiNactSessionManager sessionManager;
    SensiNactSession session;
    BlockingQueue<ResourceDataNotification> queue;
    private final String typedProvider1 = "typed-provider1";
    private final String typedProvider2 = "typed-provider2";
    private MqttClient client;

    @BeforeAll
    static void startBroker() throws IOException {
        server = new Server();
        MemoryConfig config = new MemoryConfig(new Properties());
        ((IConfig)config).setProperty("host", "127.0.0.1");
        ((IConfig)config).setProperty("port", "2183");
        ((IConfig)config).setProperty("websocket_port", "2184");
        ((IConfig)config).setProperty("websocket_path", "/ws");
        server.startServer(config);
    }

    @AfterAll
    static void stopBroker() throws IOException {
        server.stopServer();
    }

    @BeforeEach
    void start() throws Exception {
        this.session = this.sessionManager.getDefaultAnonymousSession();
        this.queue = new ArrayBlockingQueue<ResourceDataNotification>(32);
        this.client = new MqttClient("tcp://127.0.0.1:2183", MqttClient.generateClientId());
        this.client.connect();
    }

    @AfterEach
    void stop() throws Exception {
        this.session.activeListeners().keySet().forEach(arg_0 -> ((SensiNactSession)this.session).removeListener(arg_0));
        this.session = null;
        if (this.client.isConnected()) {
            this.client.publish(TOPIC, new byte[0], 1, true);
            this.client.disconnect();
        }
        this.client.close();
        this.thread.execute((AbstractSensinactCommand)new AbstractSensinactCommand<Void>(){

            protected Promise<Void> call(SensinactDigitalTwin twin, SensinactModelManager modelMgr, PromiseFactory promiseFactory) {
                for (String provider : Arrays.asList("typed-provider1", "typed-provider2")) {
                    Optional.ofNullable(twin.getProvider(provider)).ifPresent(p -> p.delete());
                }
                return Promises.resolved(null);
            }
        }).getValue();
    }

    byte[] readFile(String filename) throws IOException {
        try (InputStream inStream = this.getClass().getClassLoader().getResourceAsStream("/" + filename);){
            byte[] byArray = inStream.readAllBytes();
            return byArray;
        }
    }

    @Test
    void testWorking() throws Exception {
        this.session.addListener(List.of("typed-provider1/*"), (t, e) -> this.queue.offer(e), null, null, null);
        Assertions.assertNull((Object)this.session.describeProvider("typed-provider1"));
        Assertions.assertNull((Object)this.session.describeProvider("typed-provider2"));
        byte[] csvContent = this.readFile("csv-header-typed.csv");
        this.client.publish(TOPIC, csvContent, 1, true);
        Assertions.assertNotNull((Object)this.queue.poll(1L, TimeUnit.SECONDS));
        Assertions.assertEquals((int)42, (Integer)((Integer)this.session.getResourceValue("typed-provider1", "data", "value", Integer.class)));
        Assertions.assertEquals((int)84, (Integer)((Integer)this.session.getResourceValue("typed-provider2", "data", "value", Integer.class)));
        Instant timestamp1 = Instant.from(LocalDateTime.of(2021, 10, 20, 18, 14, 0).atOffset(ZoneOffset.UTC));
        Assertions.assertEquals((Object)timestamp1, (Object)this.session.describeResource((String)"typed-provider1", (String)"data", (String)"value").timestamp);
        Instant timestamp2 = Instant.from(LocalDateTime.of(2021, 10, 20, 18, 17, 0).atOffset(ZoneOffset.UTC));
        Assertions.assertEquals((Object)timestamp2, (Object)this.session.describeResource((String)"typed-provider2", (String)"data", (String)"value").timestamp);
        ResourceDescription location1 = this.session.describeResource("typed-provider1", "admin", "location");
        Assertions.assertEquals((Object)timestamp1, (Object)location1.timestamp);
        Assertions.assertNotNull((Object)location1.value);
        Point geoPoint = (Point)location1.value;
        Assertions.assertEquals((double)1.2, (double)geoPoint.coordinates.latitude, (double)0.001);
        Assertions.assertEquals((double)3.4, (double)geoPoint.coordinates.longitude, (double)0.001);
        Assertions.assertTrue((boolean)Double.isNaN(geoPoint.coordinates.elevation));
        ResourceDescription location2 = this.session.describeResource("typed-provider2", "admin", "location");
        Assertions.assertNotNull((Object)location2.value);
        Assertions.assertEquals((Object)timestamp2, (Object)location2.timestamp);
        geoPoint = (Point)location2.value;
        Assertions.assertEquals((double)5.6, (double)geoPoint.coordinates.latitude, (double)0.001);
        Assertions.assertEquals((double)7.8, (double)geoPoint.coordinates.longitude, (double)0.001);
        Assertions.assertTrue((boolean)Double.isNaN(geoPoint.coordinates.elevation));
        Assertions.assertEquals((Object)"testWS", (Object)this.session.getResourceValue("typed-provider1", "data", "testName", String.class));
        Assertions.assertEquals((Object)"testWS", (Object)this.session.getResourceValue("typed-provider2", "data", "testName", String.class));
    }

    static {
        TOPIC = "sensinact/mqtt/testWS/handler";
    }
}

