/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.gateway.southbound.http.factory.integration;

import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.CallSite;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.gateway.southbound.http.factory.integration.RequestHandler;
import org.eclipse.sensinact.northbound.security.api.UserInfo;
import org.eclipse.sensinact.northbound.session.ResourceDescription;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.session.SensiNactSessionManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;

/**
 * Integration test checking that two one-shot HTTP tasks declared in a single
 * {@code sensinact.http.device.factory} configuration are queried in parallel
 * rather than one after the other.
 *
 * <p>Two local Jetty servers sharing one thread pool are started once for the
 * whole class; each is backed by its own {@link RequestHandler} that serves the
 * payload configured by the test.
 */
@WithConfiguration(pid="sensinact.session.manager", properties={@Property(key="auth.policy", value={"ALLOW_ALL"})})
public class HttpDeviceFactoryParallelQueries {
    /** CSV payload template: a header line followed by one {@code name,value} record. */
    static final String TEMPLATE = "Name,Value\n%s,%s\n";

    /** Thread pool shared by both test servers. */
    static QueuedThreadPool threadPool;
    static Server server1;
    static Server server2;
    /** Ephemeral ports actually bound by the servers (resolved after start). */
    static int httpPort1;
    static int httpPort2;
    static RequestHandler handler1;
    static RequestHandler handler2;

    @InjectService
    SensiNactSessionManager sessionManager;
    SensiNactSession session;
    @InjectService
    ConfigurationAdmin configAdmin;

    /** Notifications received for the first and second provider respectively. */
    BlockingQueue<ResourceDataNotification> queue1;
    BlockingQueue<ResourceDataNotification> queue2;

    /**
     * Starts both HTTP servers on ephemeral ports and records the ports they
     * were bound to.
     *
     * @throws Exception if a server fails to start
     */
    @BeforeAll
    static void setup() throws Exception {
        threadPool = new QueuedThreadPool();
        threadPool.setName("test-server");

        server1 = new Server(threadPool);
        ServerConnector conn1 = new ServerConnector(server1);
        // Port 0: the effective port is read back via getLocalPort() once started
        conn1.setPort(0);
        server1.addConnector(conn1);
        handler1 = new RequestHandler();
        server1.setHandler(handler1);
        server1.start();
        httpPort1 = conn1.getLocalPort();

        server2 = new Server(threadPool);
        ServerConnector conn2 = new ServerConnector(server2);
        conn2.setPort(0);
        server2.addConnector(conn2);
        handler2 = new RequestHandler();
        server2.setHandler(handler2);
        server2.start();
        httpPort2 = conn2.getLocalPort();
    }

    /**
     * Stops both servers and the shared thread pool.
     *
     * <p>Each component is stopped even if stopping a previous one throws, and
     * components that were never created (e.g. {@link #setup()} failed half-way)
     * are skipped instead of raising a {@link NullPointerException}.
     *
     * @throws Exception if stopping a component fails
     */
    @AfterAll
    static void teardown() throws Exception {
        try {
            if (server1 != null) {
                server1.stop();
            }
        } finally {
            server1 = null;
            try {
                if (server2 != null) {
                    server2.stop();
                }
            } finally {
                server2 = null;
                try {
                    if (threadPool != null) {
                        threadPool.stop();
                    }
                } finally {
                    threadPool = null;
                }
            }
        }
    }

    /**
     * Opens the default anonymous session and creates fresh notification queues.
     */
    @BeforeEach
    void start() throws InterruptedException {
        this.session = this.sessionManager.getDefaultSession(UserInfo.ANONYMOUS);
        this.queue1 = new ArrayBlockingQueue<>(32);
        this.queue2 = new ArrayBlockingQueue<>(32);
    }

    /**
     * Removes every listener registered on the session and resets both handlers.
     *
     * <p>The handlers are always cleared, including when no session could be
     * obtained in {@link #start()} or when removing a listener fails.
     */
    @AfterEach
    void stop() {
        try {
            if (this.session != null) {
                this.session.activeListeners().keySet()
                        .forEach(listenerId -> this.session.removeListener(listenerId));
            }
        } finally {
            this.session = null;
            handler1.clear();
            handler2.clear();
        }
    }

    /**
     * Reads a test resource through this class's class loader.
     *
     * @param filename name of the resource; it is looked up as {@code "/" + filename}
     * @return the full content of the resource
     * @throws IOException if the resource cannot be found or read
     */
    byte[] readFile(String filename) throws IOException {
        // NOTE(review): the leading "/" is kept from the original lookup; it is
        // presumably what the OSGi bundle class loader expects - confirm before changing.
        try (InputStream inStream = this.getClass().getClassLoader().getResourceAsStream("/" + filename)) {
            if (inStream == null) {
                // Fail with an explicit message rather than a bare NullPointerException
                throw new IOException("Test resource not found: " + filename);
            }
            return inStream.readAllBytes();
        }
    }

    /**
     * Configures two one-shot tasks, each targeting a deliberately slow endpoint,
     * and checks through timestamps and visit times that both endpoints were
     * queried concurrently and exactly once.
     *
     * @throws Exception on configuration, I/O or interruption failure
     */
    @Test
    void testParallelQuery() throws Exception {
        final String provider1 = "http-parallel-query1";
        final String provider2 = "http-parallel-query2";
        final int value1 = 42;
        final int value2 = 21;

        // Each endpoint answers after a pause (presumably in milliseconds - see RequestHandler)
        handler1.setData("/data", String.format(TEMPLATE, provider1, value1));
        handler1.setPause("/data", 600);
        handler2.setData("/data", String.format(TEMPLATE, provider2, value2));
        handler2.setPause("/data", 800);

        this.session.addListener(List.of(provider1 + "/*"), (t, e) -> this.queue1.offer(e), null, null, null);
        this.session.addListener(List.of(provider2 + "/*"), (t, e) -> this.queue2.offer(e), null, null, null);

        // Neither provider may exist before the factory is configured
        Assertions.assertNull(this.session.describeProvider(provider1));
        Assertions.assertNull(this.session.describeProvider(provider2));

        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String mappingConfig = new String(this.readFile("csv-station-dynamic-mapping.json"), StandardCharsets.UTF_8);
        String tasks = "[{\"url\": \"http://localhost:" + httpPort1 + "/data\", \"mapping\": " + mappingConfig
                + "},{\"url\": \"http://localhost:" + httpPort2 + "/data\", \"mapping\": " + mappingConfig + "}]";

        Configuration config = this.configAdmin.createFactoryConfiguration("sensinact.http.device.factory", "?");
        try {
            Instant start = Instant.now();
            config.update(new Hashtable<String, Object>(Map.of("tasks.oneshot", tasks)));

            // Wait for one notification per provider
            Assertions.assertNotNull(this.queue1.poll(2L, TimeUnit.SECONDS));
            Assertions.assertNotNull(this.queue2.poll(2L, TimeUnit.SECONDS));

            ResourceDescription rc1 = this.session.describeResource(provider1, "data", "value");
            ResourceDescription rc2 = this.session.describeResource(provider2, "data", "value");

            // Values were set after the test started and after each endpoint was visited
            Assertions.assertTrue(rc1.timestamp.isAfter(start));
            Assertions.assertTrue(rc1.timestamp.isAfter(handler1.lastVisitTime("/data")));
            // The second endpoint pauses longer, so its value must arrive last
            Assertions.assertTrue(rc2.timestamp.isAfter(rc1.timestamp));
            Assertions.assertTrue(rc2.timestamp.isAfter(handler2.lastVisitTime("/data")));

            // Both endpoints were hit at nearly the same time...
            Duration visitGap = Duration.between(handler1.lastVisitTime("/data"), handler2.lastVisitTime("/data")).abs();
            Assertions.assertTrue(visitGap.toMillis() < 400L);
            // ...and the slowest result arrived sooner than the two pauses added together would allow
            Assertions.assertFalse(rc2.timestamp.isAfter(start.plus(1200L, ChronoUnit.MILLIS)));

            Assertions.assertEquals(value1, rc1.value);
            Assertions.assertEquals(value2, rc2.value);

            // One-shot tasks: each endpoint is queried exactly once
            Assertions.assertEquals(1, handler1.nbVisits("/data"));
            Assertions.assertEquals(1, handler2.nbVisits("/data"));
        }
        finally {
            config.delete();
        }
    }
}

