package org.eclipse.sensinact.gateway.southbound.http.factory.integration;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.sensinact.core.notification.ClientActionListener;
import org.eclipse.sensinact.core.notification.ClientLifecycleListener;
import org.eclipse.sensinact.core.notification.ClientMetadataListener;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.northbound.security.api.UserInfo;
import org.eclipse.sensinact.northbound.session.ResourceDescription;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.session.SensiNactSessionManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;

@WithConfiguration(pid = "sensinact.session.manager", properties = {@Property(key = "auth.policy", value = {"ALLOW_ALL"})})
/* loaded from: input_file:org/eclipse/sensinact/gateway/southbound/http/factory/integration/HttpDeviceFactoryParallelQueries.class */
public class HttpDeviceFactoryParallelQueries {
    static final String TEMPLATE = "Name,Value\n%s,%s\n";
    static QueuedThreadPool threadPool;
    static Server server1;
    static Server server2;
    static int httpPort1;
    static int httpPort2;
    static RequestHandler handler1;
    static RequestHandler handler2;

    @InjectService
    SensiNactSessionManager sessionManager;
    SensiNactSession session;

    @InjectService
    ConfigurationAdmin configAdmin;
    BlockingQueue<ResourceDataNotification> queue1;
    BlockingQueue<ResourceDataNotification> queue2;

    @BeforeAll
    static void setup() throws Exception {
        threadPool = new QueuedThreadPool();
        threadPool.setName("test-server");
        server1 = new Server(threadPool);
        ServerConnector serverConnector = new ServerConnector(server1);
        serverConnector.setPort(0);
        server1.addConnector(serverConnector);
        handler1 = new RequestHandler();
        server1.setHandler(handler1);
        server1.start();
        httpPort1 = serverConnector.getLocalPort();
        server2 = new Server(threadPool);
        ServerConnector serverConnector2 = new ServerConnector(server2);
        serverConnector2.setPort(0);
        server2.addConnector(serverConnector2);
        handler2 = new RequestHandler();
        server2.setHandler(handler2);
        server2.start();
        httpPort2 = serverConnector2.getLocalPort();
    }

    @AfterAll
    static void teardown() throws Exception {
        server1.stop();
        server1 = null;
        server2.stop();
        server2 = null;
        threadPool.stop();
        threadPool = null;
    }

    @BeforeEach
    void start() throws InterruptedException {
        this.session = this.sessionManager.getDefaultSession(UserInfo.ANONYMOUS);
        this.queue1 = new ArrayBlockingQueue(32);
        this.queue2 = new ArrayBlockingQueue(32);
    }

    @AfterEach
    void stop() {
        Set keySet = this.session.activeListeners().keySet();
        SensiNactSession sensiNactSession = this.session;
        Objects.requireNonNull(sensiNactSession);
        keySet.forEach(sensiNactSession::removeListener);
        this.session = null;
        handler1.clear();
        handler2.clear();
    }

    byte[] readFile(String str) throws IOException {
        InputStream resourceAsStream = getClass().getClassLoader().getResourceAsStream("/" + str);
        try {
            byte[] readAllBytes = resourceAsStream.readAllBytes();
            if (resourceAsStream != null) {
                resourceAsStream.close();
            }
            return readAllBytes;
        } catch (Throwable th) {
            if (resourceAsStream != null) {
                try {
                    resourceAsStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testParallelQuery() throws Exception {
        handler1.setData("/data", String.format(TEMPLATE, "http-parallel-query1", 42));
        handler1.setPause("/data", 600);
        handler2.setData("/data", String.format(TEMPLATE, "http-parallel-query2", 21));
        handler2.setPause("/data", 800);
        this.session.addListener(List.of("http-parallel-query1/*"), (str, resourceDataNotification) -> {
            this.queue1.offer(resourceDataNotification);
        }, (ClientMetadataListener) null, (ClientLifecycleListener) null, (ClientActionListener) null);
        this.session.addListener(List.of("http-parallel-query2/*"), (str2, resourceDataNotification2) -> {
            this.queue2.offer(resourceDataNotification2);
        }, (ClientMetadataListener) null, (ClientLifecycleListener) null, (ClientActionListener) null);
        Assertions.assertNull(this.session.describeProvider("http-parallel-query1"));
        Assertions.assertNull(this.session.describeProvider("http-parallel-query2"));
        String str3 = new String(readFile("csv-station-dynamic-mapping.json"));
        Configuration createFactoryConfiguration = this.configAdmin.createFactoryConfiguration("sensinact.http.device.factory", "?");
        try {
            Instant now = Instant.now();
            createFactoryConfiguration.update(new Hashtable(Map.of("tasks.oneshot", "[{\"url\": \"http://localhost:" + httpPort1 + "/data\", \"mapping\": " + str3 + "},{\"url\": \"http://localhost:" + httpPort2 + "/data\", \"mapping\": " + str3 + "}]")));
            Assertions.assertNotNull(this.queue1.poll(2L, TimeUnit.SECONDS));
            Assertions.assertNotNull(this.queue2.poll(2L, TimeUnit.SECONDS));
            ResourceDescription describeResource = this.session.describeResource("http-parallel-query1", "data", "value");
            ResourceDescription describeResource2 = this.session.describeResource("http-parallel-query2", "data", "value");
            Assertions.assertTrue(describeResource.timestamp.isAfter(now));
            Assertions.assertTrue(describeResource.timestamp.isAfter(handler1.lastVisitTime("/data")));
            Assertions.assertTrue(describeResource2.timestamp.isAfter(describeResource.timestamp));
            Assertions.assertTrue(describeResource2.timestamp.isAfter(handler2.lastVisitTime("/data")));
            Assertions.assertTrue(Duration.between(handler1.lastVisitTime("/data"), handler2.lastVisitTime("/data")).abs().toMillis() < 400);
            Assertions.assertFalse(describeResource2.timestamp.isAfter(now.plus(1200L, (TemporalUnit) ChronoUnit.MILLIS)));
            Assertions.assertEquals(42, describeResource.value);
            Assertions.assertEquals(21, describeResource2.value);
            Assertions.assertEquals(1, handler1.nbVisits("/data"));
            Assertions.assertEquals(1, handler2.nbVisits("/data"));
            createFactoryConfiguration.delete();
        } catch (Throwable th) {
            createFactoryConfiguration.delete();
            throw th;
        }
    }
}
