package org.eclipse.sensinact.northbound.websocket.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.sensinact.core.push.DataUpdate;
import org.eclipse.sensinact.core.push.dto.BulkGenericDto;
import org.eclipse.sensinact.core.push.dto.GenericDto;
import org.eclipse.sensinact.northbound.query.api.AbstractQueryDTO;
import org.eclipse.sensinact.northbound.query.api.AbstractResultDTO;
import org.eclipse.sensinact.northbound.query.api.EResultType;
import org.eclipse.sensinact.northbound.query.dto.SensinactPath;
import org.eclipse.sensinact.northbound.query.dto.notification.ResultResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryListDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QuerySubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryUnsubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultListServicesDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultSubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultUnsubscribeDTO;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;
import org.osgi.test.common.annotation.config.WithConfigurations;

@WithConfigurations({@WithConfiguration(pid = "sensinact.northbound.websocket", properties = {@Property(key = "allow.anonymous", value = {"true"})}), @WithConfiguration(pid = "sensinact.session.manager", properties = {@Property(key = "auth.policy", value = {"ALLOW_ALL"})})})
/* loaded from: input_file:org/eclipse/sensinact/northbound/websocket/integration/WebSocketTest.class */
public class WebSocketTest {

    @InjectService
    DataUpdate push;
    final ObjectMapper mapper = new ObjectMapper();
    static URI wsUri;

    /* loaded from: input_file:org/eclipse/sensinact/northbound/websocket/integration/WebSocketTest$WSClient.class */
    class WSClient implements AutoCloseable {
        WebSocketClient ws = new WebSocketClient();

        public WSClient() throws Exception {
            this.ws.start();
        }

        @Override // java.lang.AutoCloseable
        public void close() throws Exception {
            this.ws.stop();
            this.ws.destroy();
        }
    }

    @BeforeAll
    static void setup() throws URISyntaxException {
        wsUri = new URI("ws://localhost:14001/ws/sensinact");
    }

    public GenericDto makeDto(String str, String str2, String str3, Object obj, Class<?> cls) {
        GenericDto genericDto = new GenericDto();
        genericDto.model = str;
        genericDto.provider = str;
        genericDto.service = str2;
        genericDto.resource = str3;
        genericDto.value = obj;
        genericDto.type = cls;
        return genericDto;
    }

    void sendDTO(Session session, AbstractQueryDTO abstractQueryDTO) {
        try {
            session.getRemote().sendString(this.mapper.writeValueAsString(abstractQueryDTO));
        } catch (Exception e) {
            Assertions.fail("Error sending DTO", e);
        }
    }

    @Test
    void testQuery() throws Exception {
        GenericDto makeDto = makeDto("wsTestProvider", "svc", "data", 42, Integer.class);
        this.push.pushUpdate(makeDto).getValue();
        WSHandler wSHandler = new WSHandler();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        QueryListDTO queryListDTO = new QueryListDTO();
        queryListDTO.uri = new SensinactPath(makeDto.provider);
        queryListDTO.requestId = String.valueOf(new Random().nextInt());
        wSHandler.onConnect = session -> {
            sendDTO(session, queryListDTO);
        };
        AtomicReference atomicReference = new AtomicReference();
        wSHandler.onError = (session2, th) -> {
            atomicReference.set(th);
            countDownLatch.countDown();
        };
        wSHandler.onClose = (session3, num) -> {
            countDownLatch.countDown();
        };
        AtomicReference atomicReference2 = new AtomicReference();
        wSHandler.onMessage = (session4, str) -> {
            try {
                atomicReference2.set((AbstractResultDTO) this.mapper.readValue(str, AbstractResultDTO.class));
                countDownLatch.countDown();
            } catch (JsonProcessingException e) {
                Assertions.fail("Error parsing WS response", e);
            }
        };
        WSClient wSClient = new WSClient();
        try {
            wSClient.ws.connect(wSHandler, wsUri).get();
            countDownLatch.await();
            wSClient.close();
            if (atomicReference.get() != null) {
                Assertions.fail((Throwable) atomicReference.get());
            }
            ResultListServicesDTO resultListServicesDTO = (AbstractResultDTO) atomicReference2.get();
            Assertions.assertNotNull(resultListServicesDTO, "No WS query result");
            Assertions.assertEquals(200, ((AbstractResultDTO) resultListServicesDTO).statusCode);
            Assertions.assertEquals(queryListDTO.requestId, ((AbstractResultDTO) resultListServicesDTO).requestId);
            Assertions.assertEquals(EResultType.SERVICES_LIST, ((AbstractResultDTO) resultListServicesDTO).type);
            ResultListServicesDTO resultListServicesDTO2 = resultListServicesDTO;
            Assertions.assertTrue(resultListServicesDTO2.services.contains("admin"), "Admin service is missing");
            Assertions.assertTrue(resultListServicesDTO2.services.contains(makeDto.service), "Provider service is missing");
        } catch (Throwable th2) {
            try {
                wSClient.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    @Test
    void testSubscription() throws Exception {
        GenericDto makeDto = makeDto("wsTestProviderSub", "svc", "data", 42, Integer.class);
        this.push.pushUpdate(makeDto).getValue();
        WSHandler wSHandler = new WSHandler();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        wSHandler.onConnect = session -> {
            atomicReference.set(session);
            countDownLatch.countDown();
        };
        AtomicReference atomicReference2 = new AtomicReference();
        wSHandler.onError = (session2, th) -> {
            atomicReference2.set(th);
            countDownLatch.countDown();
        };
        wSHandler.onClose = (session3, num) -> {
            countDownLatch.countDown();
        };
        BlockingArrayQueue blockingArrayQueue = new BlockingArrayQueue();
        wSHandler.onMessage = (session4, str) -> {
            try {
                blockingArrayQueue.add((AbstractResultDTO) this.mapper.readValue(str, AbstractResultDTO.class));
            } catch (JsonProcessingException e) {
                atomicReference2.set(new Exception("Error parsing WS response", e));
            }
        };
        WSClient wSClient = new WSClient();
        try {
            wSClient.ws.connect(wSHandler, wsUri).get();
            countDownLatch.await(2L, TimeUnit.SECONDS);
            Session session5 = (Session) atomicReference.get();
            QuerySubscribeDTO querySubscribeDTO = new QuerySubscribeDTO();
            querySubscribeDTO.uri = new SensinactPath(makeDto.provider, makeDto.service, makeDto.resource);
            querySubscribeDTO.requestId = String.valueOf(new Random().nextInt());
            sendDTO(session5, querySubscribeDTO);
            ResultSubscribeDTO resultSubscribeDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(resultSubscribeDTO, "No result to subscribe");
            ResultSubscribeDTO resultSubscribeDTO2 = resultSubscribeDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, ((AbstractResultDTO) resultSubscribeDTO).requestId);
            Assertions.assertNotNull(resultSubscribeDTO2.subscriptionId, "No subscription ID");
            Instant truncatedTo = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object obj = makeDto.value;
            makeDto.value = -12;
            this.push.pushUpdate(makeDto).getValue();
            ResultResourceNotificationDTO resultResourceNotificationDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            if (resultResourceNotificationDTO == null && atomicReference2.get() != null) {
                Assertions.fail((Throwable) atomicReference2.get());
            }
            Assertions.assertNotNull(resultResourceNotificationDTO, "No notification");
            ResultResourceNotificationDTO resultResourceNotificationDTO2 = resultResourceNotificationDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, resultSubscribeDTO2.requestId);
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, resultResourceNotificationDTO2.subscriptionId);
            Assertions.assertNotNull(resultResourceNotificationDTO2.notification);
            Assertions.assertEquals(obj, resultResourceNotificationDTO2.notification.oldValue);
            Assertions.assertEquals(makeDto.value, resultResourceNotificationDTO2.notification.newValue);
            Assertions.assertEquals(makeDto.provider, resultResourceNotificationDTO2.notification.provider);
            Assertions.assertEquals(makeDto.service, resultResourceNotificationDTO2.notification.service);
            Assertions.assertEquals(makeDto.resource, resultResourceNotificationDTO2.notification.resource);
            Instant ofEpochMilli = Instant.ofEpochMilli(resultResourceNotificationDTO2.notification.timestamp);
            Assertions.assertFalse(truncatedTo.isAfter(ofEpochMilli));
            Instant truncatedTo2 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object obj2 = makeDto.value;
            makeDto.value = 128;
            this.push.pushUpdate(makeDto).getValue();
            AbstractResultDTO abstractResultDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(abstractResultDTO, "No notification");
            ResultResourceNotificationDTO resultResourceNotificationDTO3 = (ResultResourceNotificationDTO) abstractResultDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, resultSubscribeDTO2.requestId);
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, resultResourceNotificationDTO3.subscriptionId);
            Assertions.assertNotNull(resultResourceNotificationDTO3.notification);
            Assertions.assertEquals(obj2, resultResourceNotificationDTO3.notification.oldValue);
            Assertions.assertEquals(makeDto.value, resultResourceNotificationDTO3.notification.newValue);
            Instant ofEpochMilli2 = Instant.ofEpochMilli(resultResourceNotificationDTO3.notification.timestamp);
            Assertions.assertFalse(ofEpochMilli.isAfter(ofEpochMilli2));
            Assertions.assertFalse(truncatedTo2.isAfter(ofEpochMilli2));
            QueryUnsubscribeDTO queryUnsubscribeDTO = new QueryUnsubscribeDTO();
            queryUnsubscribeDTO.subscriptionId = resultSubscribeDTO2.subscriptionId;
            sendDTO(session5, queryUnsubscribeDTO);
            Instant plus = Instant.now().plus(5L, (TemporalUnit) ChronoUnit.SECONDS);
            boolean z = false;
            while (true) {
                if (!Instant.now().isBefore(plus)) {
                    break;
                }
                abstractResultDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
                if (abstractResultDTO != null && abstractResultDTO.type == EResultType.UNSUBSCRIPTION_RESPONSE) {
                    z = true;
                    break;
                }
            }
            Assertions.assertTrue(z, "Didn't get the unsubscription response");
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, ((ResultUnsubscribeDTO) abstractResultDTO).subscriptionId);
            makeDto.value = 512;
            this.push.pushUpdate(makeDto).getValue();
            Assertions.assertNull(blockingArrayQueue.poll(1L, TimeUnit.SECONDS), "Got a notification");
            wSClient.close();
        } catch (Throwable th2) {
            try {
                wSClient.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }

    @Test
    void testSubscriptionWithFilter() throws Exception {
        GenericDto makeDto = makeDto("wsTestProviderSubFilter1", "svc", "data", 42, Integer.class);
        GenericDto makeDto2 = makeDto("wsTestProviderSubFilter2", "svc", "data", 21, Integer.class);
        BulkGenericDto bulkGenericDto = new BulkGenericDto();
        bulkGenericDto.dtos = List.of(makeDto, makeDto2);
        this.push.pushUpdate(bulkGenericDto).getValue();
        WSHandler wSHandler = new WSHandler();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        wSHandler.onConnect = session -> {
            atomicReference.set(session);
            countDownLatch.countDown();
        };
        AtomicReference atomicReference2 = new AtomicReference();
        wSHandler.onError = (session2, th) -> {
            atomicReference2.set(th);
            countDownLatch.countDown();
        };
        wSHandler.onClose = (session3, num) -> {
            countDownLatch.countDown();
        };
        BlockingArrayQueue blockingArrayQueue = new BlockingArrayQueue();
        wSHandler.onMessage = (session4, str) -> {
            try {
                blockingArrayQueue.add((AbstractResultDTO) this.mapper.readValue(str, AbstractResultDTO.class));
            } catch (JsonProcessingException e) {
                atomicReference2.set(new Exception("Error parsing WS response", e));
            }
        };
        WSClient wSClient = new WSClient();
        try {
            wSClient.ws.connect(wSHandler, wsUri).get();
            countDownLatch.await(2L, TimeUnit.SECONDS);
            Session session5 = (Session) atomicReference.get();
            QuerySubscribeDTO querySubscribeDTO = new QuerySubscribeDTO();
            querySubscribeDTO.uri = new SensinactPath();
            querySubscribeDTO.requestId = String.valueOf(new Random().nextInt());
            querySubscribeDTO.filter = "(PROVIDER=" + makeDto.provider + ")";
            querySubscribeDTO.filterLanguage = "ldap";
            sendDTO(session5, querySubscribeDTO);
            ResultSubscribeDTO resultSubscribeDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(resultSubscribeDTO, "No result to subscribe");
            ResultSubscribeDTO resultSubscribeDTO2 = resultSubscribeDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, ((AbstractResultDTO) resultSubscribeDTO).requestId);
            Assertions.assertNotNull(resultSubscribeDTO2.subscriptionId, "No subscription ID");
            Instant truncatedTo = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object obj = makeDto.value;
            makeDto.value = -12;
            this.push.pushUpdate(makeDto).getValue();
            ResultResourceNotificationDTO resultResourceNotificationDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            if (resultResourceNotificationDTO == null && atomicReference2.get() != null) {
                Assertions.fail((Throwable) atomicReference2.get());
            }
            Assertions.assertNotNull(resultResourceNotificationDTO, "No notification");
            ResultResourceNotificationDTO resultResourceNotificationDTO2 = resultResourceNotificationDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, resultSubscribeDTO2.requestId);
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, resultResourceNotificationDTO2.subscriptionId);
            Assertions.assertNotNull(resultResourceNotificationDTO2.notification);
            Assertions.assertEquals(makeDto.provider, resultResourceNotificationDTO2.notification.provider);
            Assertions.assertEquals(obj, resultResourceNotificationDTO2.notification.oldValue);
            Assertions.assertEquals(makeDto.value, resultResourceNotificationDTO2.notification.newValue);
            Assertions.assertEquals(makeDto.provider, resultResourceNotificationDTO2.notification.provider);
            Assertions.assertEquals(makeDto.service, resultResourceNotificationDTO2.notification.service);
            Assertions.assertEquals(makeDto.resource, resultResourceNotificationDTO2.notification.resource);
            Instant ofEpochMilli = Instant.ofEpochMilli(resultResourceNotificationDTO2.notification.timestamp);
            Assertions.assertFalse(truncatedTo.isAfter(ofEpochMilli));
            makeDto2.value = 128;
            this.push.pushUpdate(makeDto2).getValue();
            Assertions.assertNull((AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS), "Got notified on filtered out provider");
            Instant truncatedTo2 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object obj2 = makeDto.value;
            makeDto.value = -128;
            this.push.pushUpdate(makeDto).getValue();
            AbstractResultDTO abstractResultDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(abstractResultDTO, "No notification");
            ResultResourceNotificationDTO resultResourceNotificationDTO3 = (ResultResourceNotificationDTO) abstractResultDTO;
            Assertions.assertEquals(querySubscribeDTO.requestId, resultSubscribeDTO2.requestId);
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, resultResourceNotificationDTO3.subscriptionId);
            Assertions.assertNotNull(resultResourceNotificationDTO3.notification);
            Assertions.assertEquals(makeDto.provider, resultResourceNotificationDTO3.notification.provider);
            Assertions.assertEquals(obj2, resultResourceNotificationDTO3.notification.oldValue);
            Assertions.assertEquals(makeDto.value, resultResourceNotificationDTO3.notification.newValue);
            Instant ofEpochMilli2 = Instant.ofEpochMilli(resultResourceNotificationDTO3.notification.timestamp);
            Assertions.assertFalse(ofEpochMilli.isAfter(ofEpochMilli2));
            Assertions.assertFalse(truncatedTo2.isAfter(ofEpochMilli2));
            QueryUnsubscribeDTO queryUnsubscribeDTO = new QueryUnsubscribeDTO();
            queryUnsubscribeDTO.subscriptionId = resultSubscribeDTO2.subscriptionId;
            sendDTO(session5, queryUnsubscribeDTO);
            Instant plus = Instant.now().plus(5L, (TemporalUnit) ChronoUnit.SECONDS);
            boolean z = false;
            while (true) {
                if (!Instant.now().isBefore(plus)) {
                    break;
                }
                abstractResultDTO = (AbstractResultDTO) blockingArrayQueue.poll(1L, TimeUnit.SECONDS);
                if (abstractResultDTO != null && abstractResultDTO.type == EResultType.UNSUBSCRIPTION_RESPONSE) {
                    z = true;
                    break;
                }
            }
            Assertions.assertTrue(z, "Didn't get the unsubscription response");
            Assertions.assertEquals(resultSubscribeDTO2.subscriptionId, ((ResultUnsubscribeDTO) abstractResultDTO).subscriptionId);
            makeDto.value = 512;
            makeDto2.value = 256;
            this.push.pushUpdate(bulkGenericDto).getValue();
            Assertions.assertNull(blockingArrayQueue.poll(1L, TimeUnit.SECONDS), "Got a notification");
            wSClient.close();
        } catch (Throwable th2) {
            try {
                wSClient.close();
            } catch (Throwable th3) {
                th2.addSuppressed(th3);
            }
            throw th2;
        }
    }
}
