/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.northbound.websocket.integration;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.sensinact.core.push.DataUpdate;
import org.eclipse.sensinact.core.push.dto.BulkGenericDto;
import org.eclipse.sensinact.core.push.dto.GenericDto;
import org.eclipse.sensinact.northbound.query.api.AbstractQueryDTO;
import org.eclipse.sensinact.northbound.query.api.AbstractResultDTO;
import org.eclipse.sensinact.northbound.query.api.EResultType;
import org.eclipse.sensinact.northbound.query.dto.SensinactPath;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceDataNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResultResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryListDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QuerySubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryUnsubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultListServicesDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultSubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultUnsubscribeDTO;
import org.eclipse.sensinact.northbound.websocket.integration.WSHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.WithConfiguration;
import org.osgi.test.common.annotation.config.WithConfigurations;

@WithConfigurations(value={@WithConfiguration(pid="sensinact.northbound.websocket", properties={@Property(key="allow.anonymous", value={"true"})}), @WithConfiguration(pid="sensinact.session.manager", properties={@Property(key="auth.policy", value={"ALLOW_ALL"})})})
public class WebSocketTest {
    @InjectService
    DataUpdate push;
    final ObjectMapper mapper = new ObjectMapper();
    static URI wsUri;

    @BeforeAll
    static void setup() throws URISyntaxException {
        wsUri = new URI("ws://localhost:14001/ws/sensinact");
    }

    public GenericDto makeDto(String provider, String service, String resource, Object value, Class<?> type) {
        GenericDto dto = new GenericDto();
        dto.model = provider;
        dto.provider = provider;
        dto.service = service;
        dto.resource = resource;
        dto.value = value;
        dto.type = type;
        return dto;
    }

    void sendDTO(Session session, AbstractQueryDTO dto) {
        try {
            session.getRemote().sendString(this.mapper.writeValueAsString((Object)dto));
        }
        catch (Exception e) {
            Assertions.fail((String)"Error sending DTO", (Throwable)e);
        }
    }

    @Test
    void testQuery() throws Exception {
        GenericDto dto = this.makeDto("wsTestProvider", "svc", "data", 42, Integer.class);
        this.push.pushUpdate((Object)dto).getValue();
        WSHandler handler = new WSHandler();
        CountDownLatch barrier = new CountDownLatch(1);
        QueryListDTO query = new QueryListDTO();
        query.uri = new SensinactPath(dto.provider);
        query.requestId = String.valueOf(new Random().nextInt());
        handler.onConnect = s -> this.sendDTO((Session)s, (AbstractQueryDTO)query);
        AtomicReference error = new AtomicReference();
        handler.onError = (s, t) -> {
            error.set(t);
            barrier.countDown();
        };
        handler.onClose = (s, c) -> barrier.countDown();
        AtomicReference resultHolder = new AtomicReference();
        handler.onMessage = (s, m) -> {
            try {
                resultHolder.set((AbstractResultDTO)this.mapper.readValue(m, AbstractResultDTO.class));
                barrier.countDown();
            }
            catch (JsonProcessingException e) {
                Assertions.fail((String)"Error parsing WS response", (Throwable)e);
            }
        };
        try (WSClient client = new WSClient();){
            client.ws.connect((Object)handler, wsUri).get();
            barrier.await();
        }
        if (error.get() != null) {
            Assertions.fail((Throwable)((Throwable)error.get()));
        }
        AbstractResultDTO result = (AbstractResultDTO)resultHolder.get();
        Assertions.assertNotNull((Object)result, (String)"No WS query result");
        Assertions.assertEquals((int)200, (int)result.statusCode);
        Assertions.assertEquals((Object)query.requestId, (Object)result.requestId);
        Assertions.assertEquals((Object)EResultType.SERVICES_LIST, (Object)result.type);
        ResultListServicesDTO svcList = (ResultListServicesDTO)result;
        Assertions.assertTrue((boolean)svcList.services.contains("admin"), (String)"Admin service is missing");
        Assertions.assertTrue((boolean)svcList.services.contains(dto.service), (String)"Provider service is missing");
    }

    @Test
    void testSubscription() throws Exception {
        GenericDto dto = this.makeDto("wsTestProviderSub", "svc", "data", 42, Integer.class);
        this.push.pushUpdate((Object)dto).getValue();
        WSHandler handler = new WSHandler();
        CountDownLatch barrier = new CountDownLatch(1);
        AtomicReference sessionRef = new AtomicReference();
        handler.onConnect = s -> {
            sessionRef.set(s);
            barrier.countDown();
        };
        AtomicReference error = new AtomicReference();
        handler.onError = (s, t) -> {
            error.set(t);
            barrier.countDown();
        };
        handler.onClose = (s, c) -> barrier.countDown();
        BlockingArrayQueue resultsHolder = new BlockingArrayQueue();
        handler.onMessage = (arg_0, arg_1) -> this.lambda$testSubscription$7((BlockingQueue)resultsHolder, error, arg_0, arg_1);
        try (WSClient client = new WSClient();){
            client.ws.connect((Object)handler, wsUri).get();
            barrier.await(2L, TimeUnit.SECONDS);
            Session session = (Session)sessionRef.get();
            QuerySubscribeDTO querySub = new QuerySubscribeDTO();
            querySub.uri = new SensinactPath(dto.provider, dto.service, dto.resource);
            querySub.requestId = String.valueOf(new Random().nextInt());
            this.sendDTO(session, (AbstractQueryDTO)querySub);
            AbstractResultDTO rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull((Object)rawResult, (String)"No result to subscribe");
            ResultSubscribeDTO subscribeResult = (ResultSubscribeDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)rawResult.requestId);
            Assertions.assertNotNull((Object)subscribeResult.subscriptionId, (String)"No subscription ID");
            Instant updateTime = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object oldValue = dto.value;
            dto.value = -12;
            this.push.pushUpdate((Object)dto).getValue();
            rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            if (rawResult == null && error.get() != null) {
                Assertions.fail((Throwable)((Throwable)error.get()));
            }
            Assertions.assertNotNull((Object)rawResult, (String)"No notification");
            ResultResourceNotificationDTO notif = (ResultResourceNotificationDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)subscribeResult.requestId);
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)notif.subscriptionId);
            Assertions.assertNotNull((Object)notif.notification);
            Assertions.assertEquals((Object)oldValue, (Object)((ResourceDataNotificationDTO)notif.notification).oldValue);
            Assertions.assertEquals((Object)dto.value, (Object)((ResourceDataNotificationDTO)notif.notification).newValue);
            Assertions.assertEquals((Object)dto.provider, (Object)notif.notification.provider);
            Assertions.assertEquals((Object)dto.service, (Object)notif.notification.service);
            Assertions.assertEquals((Object)dto.resource, (Object)notif.notification.resource);
            Instant firstNotifTime = Instant.ofEpochMilli(notif.notification.timestamp);
            Assertions.assertFalse((boolean)updateTime.isAfter(firstNotifTime));
            Instant updateTime2 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            oldValue = dto.value;
            dto.value = 128;
            this.push.pushUpdate((Object)dto).getValue();
            rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull((Object)rawResult, (String)"No notification");
            notif = (ResultResourceNotificationDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)subscribeResult.requestId);
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)notif.subscriptionId);
            Assertions.assertNotNull((Object)notif.notification);
            Assertions.assertEquals((Object)oldValue, (Object)((ResourceDataNotificationDTO)notif.notification).oldValue);
            Assertions.assertEquals((Object)dto.value, (Object)((ResourceDataNotificationDTO)notif.notification).newValue);
            Instant secondNotifTime = Instant.ofEpochMilli(notif.notification.timestamp);
            Assertions.assertFalse((boolean)firstNotifTime.isAfter(secondNotifTime));
            Assertions.assertFalse((boolean)updateTime2.isAfter(secondNotifTime));
            QueryUnsubscribeDTO unsubQuery = new QueryUnsubscribeDTO();
            unsubQuery.subscriptionId = subscribeResult.subscriptionId;
            this.sendDTO(session, (AbstractQueryDTO)unsubQuery);
            Instant timeout = Instant.now().plus(5L, ChronoUnit.SECONDS);
            boolean found = false;
            while (Instant.now().isBefore(timeout)) {
                rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
                if (rawResult == null || rawResult.type != EResultType.UNSUBSCRIPTION_RESPONSE) continue;
                found = true;
                break;
            }
            Assertions.assertTrue((boolean)found, (String)"Didn't get the unsubscription response");
            ResultUnsubscribeDTO unsubResult = (ResultUnsubscribeDTO)rawResult;
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)unsubResult.subscriptionId);
            dto.value = 512;
            this.push.pushUpdate((Object)dto).getValue();
            Assertions.assertNull(resultsHolder.poll(1L, TimeUnit.SECONDS), (String)"Got a notification");
        }
    }

    @Test
    void testSubscriptionWithFilter() throws Exception {
        GenericDto dto1 = this.makeDto("wsTestProviderSubFilter1", "svc", "data", 42, Integer.class);
        GenericDto dto2 = this.makeDto("wsTestProviderSubFilter2", "svc", "data", 21, Integer.class);
        BulkGenericDto bulk = new BulkGenericDto();
        bulk.dtos = List.of(dto1, dto2);
        this.push.pushUpdate((Object)bulk).getValue();
        WSHandler handler = new WSHandler();
        CountDownLatch barrier = new CountDownLatch(1);
        AtomicReference sessionRef = new AtomicReference();
        handler.onConnect = s -> {
            sessionRef.set(s);
            barrier.countDown();
        };
        AtomicReference error = new AtomicReference();
        handler.onError = (s, t) -> {
            error.set(t);
            barrier.countDown();
        };
        handler.onClose = (s, c) -> barrier.countDown();
        BlockingArrayQueue resultsHolder = new BlockingArrayQueue();
        handler.onMessage = (arg_0, arg_1) -> this.lambda$testSubscriptionWithFilter$11((BlockingQueue)resultsHolder, error, arg_0, arg_1);
        try (WSClient client = new WSClient();){
            client.ws.connect((Object)handler, wsUri).get();
            barrier.await(2L, TimeUnit.SECONDS);
            Session session = (Session)sessionRef.get();
            QuerySubscribeDTO querySub = new QuerySubscribeDTO();
            querySub.uri = new SensinactPath();
            querySub.requestId = String.valueOf(new Random().nextInt());
            querySub.filter = "(PROVIDER=" + dto1.provider + ")";
            querySub.filterLanguage = "ldap";
            this.sendDTO(session, (AbstractQueryDTO)querySub);
            AbstractResultDTO rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull((Object)rawResult, (String)"No result to subscribe");
            ResultSubscribeDTO subscribeResult = (ResultSubscribeDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)rawResult.requestId);
            Assertions.assertNotNull((Object)subscribeResult.subscriptionId, (String)"No subscription ID");
            Instant updateTime = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            Object oldValue = dto1.value;
            dto1.value = -12;
            this.push.pushUpdate((Object)dto1).getValue();
            rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            if (rawResult == null && error.get() != null) {
                Assertions.fail((Throwable)((Throwable)error.get()));
            }
            Assertions.assertNotNull((Object)rawResult, (String)"No notification");
            ResultResourceNotificationDTO notif = (ResultResourceNotificationDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)subscribeResult.requestId);
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)notif.subscriptionId);
            Assertions.assertNotNull((Object)notif.notification);
            Assertions.assertEquals((Object)dto1.provider, (Object)notif.notification.provider);
            Assertions.assertEquals((Object)oldValue, (Object)((ResourceDataNotificationDTO)notif.notification).oldValue);
            Assertions.assertEquals((Object)dto1.value, (Object)((ResourceDataNotificationDTO)notif.notification).newValue);
            Assertions.assertEquals((Object)dto1.provider, (Object)notif.notification.provider);
            Assertions.assertEquals((Object)dto1.service, (Object)notif.notification.service);
            Assertions.assertEquals((Object)dto1.resource, (Object)notif.notification.resource);
            Instant firstNotifTime = Instant.ofEpochMilli(notif.notification.timestamp);
            Assertions.assertFalse((boolean)updateTime.isAfter(firstNotifTime));
            dto2.value = 128;
            this.push.pushUpdate((Object)dto2).getValue();
            rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNull((Object)rawResult, (String)"Got notified on filtered out provider");
            Instant updateTime2 = Instant.now().truncatedTo(ChronoUnit.MILLIS);
            oldValue = dto1.value;
            dto1.value = -128;
            this.push.pushUpdate((Object)dto1).getValue();
            rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull((Object)rawResult, (String)"No notification");
            notif = (ResultResourceNotificationDTO)rawResult;
            Assertions.assertEquals((Object)querySub.requestId, (Object)subscribeResult.requestId);
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)notif.subscriptionId);
            Assertions.assertNotNull((Object)notif.notification);
            Assertions.assertEquals((Object)dto1.provider, (Object)notif.notification.provider);
            Assertions.assertEquals((Object)oldValue, (Object)((ResourceDataNotificationDTO)notif.notification).oldValue);
            Assertions.assertEquals((Object)dto1.value, (Object)((ResourceDataNotificationDTO)notif.notification).newValue);
            Instant secondNotifTime = Instant.ofEpochMilli(notif.notification.timestamp);
            Assertions.assertFalse((boolean)firstNotifTime.isAfter(secondNotifTime));
            Assertions.assertFalse((boolean)updateTime2.isAfter(secondNotifTime));
            QueryUnsubscribeDTO unsubQuery = new QueryUnsubscribeDTO();
            unsubQuery.subscriptionId = subscribeResult.subscriptionId;
            this.sendDTO(session, (AbstractQueryDTO)unsubQuery);
            Instant timeout = Instant.now().plus(5L, ChronoUnit.SECONDS);
            boolean found = false;
            while (Instant.now().isBefore(timeout)) {
                rawResult = (AbstractResultDTO)resultsHolder.poll(1L, TimeUnit.SECONDS);
                if (rawResult == null || rawResult.type != EResultType.UNSUBSCRIPTION_RESPONSE) continue;
                found = true;
                break;
            }
            Assertions.assertTrue((boolean)found, (String)"Didn't get the unsubscription response");
            ResultUnsubscribeDTO unsubResult = (ResultUnsubscribeDTO)rawResult;
            Assertions.assertEquals((Object)subscribeResult.subscriptionId, (Object)unsubResult.subscriptionId);
            dto1.value = 512;
            dto2.value = 256;
            this.push.pushUpdate((Object)bulk).getValue();
            Assertions.assertNull(resultsHolder.poll(1L, TimeUnit.SECONDS), (String)"Got a notification");
        }
    }

    private /* synthetic */ void lambda$testSubscriptionWithFilter$11(BlockingQueue resultsHolder, AtomicReference error, Session s, String m) {
        try {
            resultsHolder.add((AbstractResultDTO)this.mapper.readValue(m, AbstractResultDTO.class));
        }
        catch (JsonProcessingException e) {
            error.set(new Exception("Error parsing WS response", e));
        }
    }

    private /* synthetic */ void lambda$testSubscription$7(BlockingQueue resultsHolder, AtomicReference error, Session s, String m) {
        try {
            resultsHolder.add((AbstractResultDTO)this.mapper.readValue(m, AbstractResultDTO.class));
        }
        catch (JsonProcessingException e) {
            error.set(new Exception("Error parsing WS response", e));
        }
    }

    class WSClient
    implements AutoCloseable {
        WebSocketClient ws = new WebSocketClient();

        public WSClient() throws Exception {
            this.ws.start();
        }

        @Override
        public void close() throws Exception {
            this.ws.stop();
            this.ws.destroy();
        }
    }
}

