package org.eclipse.sensinact.northbound.rest.integration.notification;

import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.core.Application;
import jakarta.ws.rs.sse.SseEventSource;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.sensinact.core.notification.ClientActionListener;
import org.eclipse.sensinact.core.notification.ClientLifecycleListener;
import org.eclipse.sensinact.core.notification.ClientMetadataListener;
import org.eclipse.sensinact.core.notification.LifecycleNotification;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.core.push.DataUpdate;
import org.eclipse.sensinact.core.push.dto.GenericDto;
import org.eclipse.sensinact.model.core.provider.ProviderPackage;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceDataNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceLifecycleNotificationDTO;
import org.eclipse.sensinact.northbound.rest.integration.TestUtils;
import org.eclipse.sensinact.northbound.security.api.UserInfo;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.session.SensiNactSessionManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import org.osgi.service.cm.Configuration;
import org.osgi.service.jakartars.client.SseEventSourceFactory;
import org.osgi.test.common.annotation.InjectService;
import org.osgi.test.common.annotation.Property;
import org.osgi.test.common.annotation.config.InjectConfiguration;
import org.osgi.test.common.annotation.config.WithConfiguration;
import org.osgi.test.common.service.ServiceAware;

/**
 * Integration test for resource notifications exposed by the REST northbound
 * over Server-Sent Events.
 *
 * <p>The test subscribes to a resource that does not exist yet, pushes updates
 * for it through {@link DataUpdate}, and compares the events received on the
 * SSE stream with the notifications delivered to a listener registered on the
 * default session.
 */
@WithConfiguration(pid = "sensinact.session.manager", properties = {@Property(key = "auth.policy", value = {"ALLOW_ALL"})})
public class ResourceNotificationsTest {
    private static final UserInfo USER = UserInfo.ANONYMOUS;
    private static final String PROVIDER = "RestNotificationProvider";
    private static final String PROVIDER_TOPIC = "RestNotificationProvider/RestNotificationProvider/*";

    @InjectService
    protected SseEventSourceFactory sseClient;

    @InjectService
    SensiNactSessionManager sessionManager;

    @InjectService
    DataUpdate push;

    @InjectService
    ClientBuilder clientBuilder;

    /** Data notifications received by the session listener registered in {@link #start()}. */
    BlockingQueue<ResourceDataNotification> queue;

    final TestUtils utils = new TestUtils();

    /**
     * Waits for the REST application to be registered and for its root path to
     * answer with HTTP 200 before a test runs.
     *
     * @param configuration the injected REST northbound configuration (anonymous access allowed)
     * @param serviceAware  tracker for the {@link Application} service carrying the marker property
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws AssertionFailedError if the endpoint does not answer 200 after 10 attempts
     */
    @BeforeEach
    public void await(
            @InjectConfiguration(withConfig = @WithConfiguration(pid = "sensinact.northbound.rest", location = "?", properties = {
                    @Property(key = "allow.anonymous", value = {"true"}),
                    @Property(key = "buzz", value = {"fizzbuzz"})})) Configuration configuration,
            @InjectService(filter = "(buzz=fizzbuzz)", cardinality = 0) ServiceAware<Application> serviceAware)
            throws InterruptedException {
        serviceAware.waitForService(5000L);
        for (int attempt = 0; attempt < 10; attempt++) {
            try {
                // The probe must live inside the try block: a failure while the
                // endpoint is still starting is expected and must lead to a retry.
                if (this.utils.queryStatus("/").statusCode() == 200) {
                    return;
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Never swallow an interruption; let the test framework see it.
                    throw (InterruptedException) e;
                }
                e.printStackTrace();
            }
            Thread.sleep(200L);
        }
        throw new AssertionFailedError("REST API did not appear");
    }

    /**
     * Registers a data listener on the default session for the test provider
     * topic and checks that no notification arrives within 500 ms.
     *
     * @throws InterruptedException if interrupted while polling the queue
     */
    @BeforeEach
    void start() throws InterruptedException {
        this.queue = new ArrayBlockingQueue<>(32);
        final SensiNactSession session = this.sessionManager.getDefaultSession(USER);
        session.addListener(List.of(PROVIDER_TOPIC), (topic, notification) -> {
            this.queue.offer(notification);
        }, (ClientMetadataListener) null, (ClientLifecycleListener) null, (ClientActionListener) null);
        Assertions.assertNull(this.queue.poll(500L, TimeUnit.MILLISECONDS));
    }

    /**
     * Removes every listener that is still active on the default session.
     */
    @AfterEach
    void stop() {
        final SensiNactSession session = this.sessionManager.getDefaultSession(USER);
        // Iterate over a snapshot so that removing listeners cannot disturb the
        // iteration should the key set be a live view of the session's state.
        List.copyOf(session.activeListeners().keySet()).forEach(session::removeListener);
    }

    /**
     * Subscribes over SSE to a resource that does not exist yet, then pushes two
     * values for it and checks that:
     * <ul>
     * <li>a single {@code RESOURCE_CREATED} lifecycle event is received,</li>
     * <li>a data event with a {@code null} old value follows the first push,</li>
     * <li>a data event carrying the previous value follows the second push.</li>
     * </ul>
     *
     * @throws Exception if a push, a poll or an assertion helper fails
     */
    @Test
    void resourceNotificationNonExistent() throws Exception {
        final String service = "new";
        final String resource = "new-resource";
        final int initialValue = 42;

        final SseEventSource source = this.sseClient.newSource(
                this.clientBuilder.connectTimeout(3L, TimeUnit.SECONDS)
                        .register(JacksonJsonProvider.class)
                        .build()
                        .target("http://localhost:8185/sensinact/")
                        .path("providers").path(PROVIDER)
                        .path("services").path(service)
                        .path("resources").path(resource)
                        .path("SUBSCRIBE"));

        final BlockingQueue<ResourceLifecycleNotificationDTO> lifecycleEvents = new BlockingArrayQueue<>();
        final BlockingQueue<ResourceDataNotificationDTO> dataEvents = new BlockingArrayQueue<>();

        // Dispatch each SSE event to its queue according to the event name;
        // events with any other name are ignored.
        source.register(event -> {
            switch (event.getName()) {
                case "lifecycle":
                    lifecycleEvents.add(event.readData(ResourceLifecycleNotificationDTO.class));
                    break;
                case "data":
                    dataEvents.add(event.readData(ResourceDataNotificationDTO.class));
                    break;
                default:
                    break;
            }
        });
        source.open();

        try {
            final GenericDto dto = this.utils.makeDto(PROVIDER, service, resource, initialValue, Integer.class);
            this.push.pushUpdate(dto);

            // The first session notification must be the friendly name, which
            // shows that the provider was created by this push.
            final ResourceDataNotification first = this.queue.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(first);
            Assertions.assertEquals(ProviderPackage.Literals.ADMIN__FRIENDLY_NAME.getName(), first.resource, "First event was not FriendlyName, so the Provider already exists. It is likely that the data folder wasn't cleared and this is a remnant of a previous testrun.");

            // Two more notifications are consumed without inspection before
            // the one that carries the pushed value.
            Assertions.assertNotNull(this.queue.poll(1L, TimeUnit.SECONDS));
            Assertions.assertNotNull(this.queue.poll(1L, TimeUnit.SECONDS));
            final ResourceDataNotification created = this.queue.poll(2L, TimeUnit.SECONDS);
            this.utils.assertNotification(dto, created);

            // SSE side: creation of the resource.
            final ResourceLifecycleNotificationDTO lifecycle = lifecycleEvents.poll(1L, TimeUnit.SECONDS);
            Assertions.assertNotNull(lifecycle);
            Assertions.assertEquals(PROVIDER, lifecycle.provider);
            Assertions.assertEquals(dto.service, lifecycle.service);
            Assertions.assertEquals(dto.resource, lifecycle.resource);
            Assertions.assertEquals(LifecycleNotification.Status.RESOURCE_CREATED, lifecycle.status);
            Assertions.assertFalse(created.timestamp.isAfter(Instant.ofEpochMilli(lifecycle.timestamp)), "Lifecycle timestamp is too early");

            // SSE side: first value, hence no old value.
            final ResourceDataNotificationDTO firstData = dataEvents.poll(500L, TimeUnit.MILLISECONDS);
            Assertions.assertNotNull(firstData);
            Assertions.assertEquals(PROVIDER, firstData.provider);
            Assertions.assertEquals(dto.service, firstData.service);
            Assertions.assertEquals(dto.resource, firstData.resource);
            Assertions.assertNull(firstData.oldValue, "Old value is not null");
            Assertions.assertEquals(dto.value, firstData.newValue);
            Assertions.assertEquals(created.timestamp.toEpochMilli(), firstData.timestamp);

            // Second push: the previous value must be reported as the old value.
            dto.value = Integer.valueOf(initialValue + 10);
            this.push.pushUpdate(dto);
            final ResourceDataNotification updated = this.queue.poll(1L, TimeUnit.SECONDS);
            this.utils.assertNotification(dto, updated);

            final ResourceDataNotificationDTO secondData = dataEvents.poll(500L, TimeUnit.MILLISECONDS);
            Assertions.assertNotNull(secondData);
            Assertions.assertEquals(PROVIDER, secondData.provider);
            Assertions.assertEquals(dto.service, secondData.service);
            Assertions.assertEquals(dto.resource, secondData.resource);
            Assertions.assertEquals(initialValue, secondData.oldValue);
            Assertions.assertEquals(dto.value, secondData.newValue);
            Assertions.assertEquals(updated.timestamp.toEpochMilli(), secondData.timestamp);

            // Updating an existing resource must not emit another lifecycle event.
            Assertions.assertNull(lifecycleEvents.poll());
        } finally {
            source.close();
        }
    }
}
