package org.eclipse.sensinact.northbound.ws.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.sensinact.core.notification.AbstractResourceNotification;
import org.eclipse.sensinact.core.notification.ClientActionListener;
import org.eclipse.sensinact.core.notification.ClientDataListener;
import org.eclipse.sensinact.core.notification.ClientMetadataListener;
import org.eclipse.sensinact.core.notification.LifecycleNotification;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.core.snapshot.ICriterion;
import org.eclipse.sensinact.core.snapshot.ResourceValueFilter;
import org.eclipse.sensinact.gateway.geojson.GeoJsonObject;
import org.eclipse.sensinact.northbound.query.api.AbstractQueryDTO;
import org.eclipse.sensinact.northbound.query.api.AbstractResultDTO;
import org.eclipse.sensinact.northbound.query.api.EQueryType;
import org.eclipse.sensinact.northbound.query.api.IQueryHandler;
import org.eclipse.sensinact.northbound.query.api.StatusException;
import org.eclipse.sensinact.northbound.query.dto.SensinactPath;
import org.eclipse.sensinact.northbound.query.dto.notification.AbstractResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ErrorResultNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceDataNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceLifecycleNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResultResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QuerySubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryUnsubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ErrorResultDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultSubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultUnsubscribeDTO;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Jetty WebSocket endpoint bound to a single {@link SensiNactSession}.
 *
 * <p>Incoming text messages are parsed as {@link AbstractQueryDTO}s. Subscribe and
 * unsubscribe queries are handled locally by registering/removing listeners on the
 * user session; every other query is delegated to the {@link IQueryHandler}.
 * Results, notifications and errors are serialized to JSON and written back on the
 * WebSocket.
 *
 * <p>NOTE(review): only {@link #close()} is {@code synchronized}; the
 * {@code subscriptions} set is a plain {@link HashSet} that is also touched from
 * the message handling path. Confirm the threading model of the callers before
 * relying on this class being thread-safe.
 */
@WebSocket(idleTimeout = 0)
public class WebSocketEndpoint {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);

    /** Extension applied to the user session each time a message is received. */
    private static final Duration SESSION_EXTENSION = Duration.of(5L, ChronoUnit.MINUTES);

    /** Maximum time a listener waits for the subscription result to be sent, in milliseconds. */
    private static final long RESULT_SENT_TIMEOUT_MS = 500L;

    /** User session on which listeners are registered and queries are executed. */
    private final SensiNactSession userSession;

    /** Owner of this endpoint; told when the WebSocket is closed. */
    private final WebSocketCreator pool;

    /** Handler for filter parsing and for every query that is not (un)subscribe. */
    private final IQueryHandler queryHandler;

    /** JSON mapper; dates are written in textual form rather than as timestamps. */
    private final ObjectMapper mapper = JsonMapper.builder()
            .addModule(new JavaTimeModule())
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
            .build();

    /** Current WebSocket session, or {@code null} when not connected / already closed. */
    private final AtomicReference<Session> wsSession = new AtomicReference<>();

    /** IDs of the listeners registered on the user session through this endpoint. */
    private final Set<String> subscriptions = new HashSet<>();

    /**
     * Creates an endpoint.
     *
     * @param webSocketCreator owner to call back when the socket closes
     * @param sensiNactSession user session backing this WebSocket
     * @param iQueryHandler    handler for filters and non-subscription queries
     */
    public WebSocketEndpoint(WebSocketCreator webSocketCreator, SensiNactSession sensiNactSession, IQueryHandler iQueryHandler) {
        this.pool = webSocketCreator;
        this.userSession = sensiNactSession;
        this.queryHandler = iQueryHandler;
    }

    /**
     * Releases everything held by this endpoint: removes all registered listeners,
     * closes the WebSocket if it is still open and expires the user session.
     *
     * <p>Calling this method again once the WebSocket reference has been cleared is
     * a no-op.
     */
    public synchronized void close() {
        final Session session = this.wsSession.getAndSet(null);
        if (session == null) {
            // Never opened, or already closed
            return;
        }

        if (this.userSession != null) {
            for (final String subscriptionId : this.subscriptions) {
                this.userSession.removeListener(subscriptionId);
            }
        }
        this.subscriptions.clear();

        if (session.isOpen()) {
            try {
                session.close();
            } catch (Throwable t) {
                logger.error("Error closing WebSocket: {}", t.getMessage(), t);
            }
        }

        // Same null guard as for the listener removal above
        if (this.userSession != null) {
            this.userSession.expire();
        }
    }

    /**
     * Stores the newly connected WebSocket session.
     *
     * @param session the session that has just been opened
     */
    @OnWebSocketConnect
    public void open(Session session) {
        logger.debug("WebSocket opening - {}", session);
        this.wsSession.set(session);
    }

    /**
     * Notifies the owning pool that the WebSocket was closed and expires the user
     * session.
     *
     * @param session the closed session
     * @param i       WebSocket close status code
     * @param str     close reason
     */
    @OnWebSocketClose
    public void onClose(Session session, int i, String str) {
        logger.debug("WebSocket closing - {} ({}: {})", session, i, str);
        this.pool.deleteSocketEndpoint(this);
        this.userSession.expire();
    }

    /**
     * Logs an error reported on the WebSocket.
     *
     * @param session the session on which the error occurred
     * @param th      the reported error
     */
    @OnWebSocketError
    public void onError(Session session, Throwable th) {
        logger.warn("Error from WebSocket session {}: {}", session, th.getMessage());
    }

    /**
     * Handles a text message: parses it as a query, runs it and sends back the
     * result or an error.
     *
     * <p>If the user session has expired, the WebSocket is closed and the message is
     * ignored; otherwise the user session is extended before the query is handled.
     *
     * @param session the session that received the message
     * @param str     the JSON-encoded query
     */
    @OnWebSocketMessage
    public void onMessage(Session session, String str) {
        if (this.userSession.isExpired()) {
            // NOTE(review): RFC 6455 reserves 1006 and forbids sending it in a Close
            // frame; confirm how the Jetty version in use treats it.
            session.close(1006, "User session expired due to inactivity");
            // Do not run queries on behalf of an expired session
            return;
        }
        this.userSession.extend(SESSION_EXTENSION);

        final AbstractQueryDTO query;
        try {
            query = this.mapper.readValue(str, AbstractQueryDTO.class);
        } catch (Throwable t) {
            logger.error("Error parsing WebSocket query: {}", t.getMessage(), t);
            sendError(session, null, 400, "Error parsing query: " + t.getMessage());
            return;
        }

        if (query == null) {
            // A JSON "null" payload is parsed successfully but yields no query
            logger.error("Error parsing WebSocket query: {}", "empty query");
            sendError(session, null, 400, "Error parsing query: empty query");
            return;
        }

        try {
            final AbstractResultDTO result;
            switch (query.operation) {
                case SUBSCRIBE:
                    // The subscription handler sends its own result
                    handleSubscribe(query.requestId, (QuerySubscribeDTO) query);
                    return;
                case UNSUBSCRIBE:
                    result = handleUnsubscribe((QueryUnsubscribeDTO) query);
                    break;
                default:
                    result = this.queryHandler.handleQuery(this.userSession, query);
                    break;
            }
            result.requestId = query.requestId;
            sendResult(session, result);
        } catch (StatusException e) {
            logger.error("Error handling query {}: {}", query, e.getMessage(), e);
            sendError(session, query.uri, e.statusCode, "Error running query: " + e.getMessage());
        } catch (Throwable t) {
            logger.error("Error handling query {}: {}", query, t.getMessage(), t);
            sendError(session, query.uri, 500, "Error running query: " + t.getMessage());
        }
    }

    /**
     * Wraps a notification into a {@link NotificationSnapshot}.
     *
     * @param notification the notification to wrap
     * @return the snapshot, or {@code null} if the notification is neither exactly a
     *         {@link LifecycleNotification} nor exactly a
     *         {@link ResourceDataNotification}
     */
    private NotificationSnapshot convertToSnapshot(AbstractResourceNotification notification) {
        final Class<?> kind = notification.getClass();
        if (kind == LifecycleNotification.class) {
            return new NotificationSnapshot((LifecycleNotification) notification);
        }
        if (kind == ResourceDataNotification.class) {
            return new NotificationSnapshot((ResourceDataNotification) notification);
        }
        return null;
    }

    /**
     * Builds the notification predicate matching the filter of a subscription query.
     *
     * <p>The provider, service, resource and resource value filters of the parsed
     * criterion are evaluated against a snapshot of the notification. The location
     * filter is only evaluated for notifications on the {@code admin/location}
     * resource; other notifications pass it unconditionally.
     *
     * @param querySubscribeDTO the subscription query holding the filter
     * @return the predicate, or {@code null} if the filter was parsed to nothing or
     *         defines no filter at all
     * @throws StatusException if the filter cannot be parsed
     */
    // The type arguments of the criterion's predicates are not visible from this
    // file, hence the raw Predicate usage.
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private Predicate<AbstractResourceNotification> prepareFilter(QuerySubscribeDTO querySubscribeDTO) throws StatusException {
        final ICriterion criterion = this.queryHandler.parseFilter(querySubscribeDTO.filter, querySubscribeDTO.filterLanguage);
        if (criterion == null) {
            return null;
        }

        final Predicate providerFilter = criterion.getProviderFilter();
        final Predicate serviceFilter = criterion.getServiceFilter();
        final Predicate resourceFilter = criterion.getResourceFilter();
        final ResourceValueFilter valueFilter = criterion.getResourceValueFilter();

        Predicate<AbstractResourceNotification> combined = null;
        if (providerFilter != null || serviceFilter != null || resourceFilter != null || valueFilter != null) {
            combined = notification -> {
                final NotificationSnapshot snapshot = convertToSnapshot(notification);
                if (snapshot == null) {
                    // Unsupported kind of notification
                    return false;
                }
                if (providerFilter != null && !providerFilter.test(snapshot.provider)) {
                    return false;
                }
                if (serviceFilter != null && !serviceFilter.test(snapshot.service)) {
                    return false;
                }
                if (resourceFilter != null && !resourceFilter.test(snapshot.resource)) {
                    return false;
                }
                return valueFilter == null || valueFilter.test(snapshot.provider, List.of(snapshot.resource));
            };
        }

        final Predicate locationFilter = criterion.getLocationFilter();
        if (locationFilter != null) {
            final Predicate<AbstractResourceNotification> locationCheck = notification -> {
                if (!"admin".equals(notification.service) || !"location".equals(notification.resource)) {
                    // Not a location update: nothing to check
                    return true;
                }

                final Object location;
                final Class<?> kind = notification.getClass();
                if (kind == LifecycleNotification.class) {
                    location = ((LifecycleNotification) notification).initialValue;
                } else if (kind == ResourceDataNotification.class) {
                    location = ((ResourceDataNotification) notification).newValue;
                } else {
                    return true;
                }
                return location != null && locationFilter.test((GeoJsonObject) location);
            };
            combined = combined == null ? locationCheck : combined.and(locationCheck);
        }
        return combined;
    }

    /**
     * Registers data and lifecycle listeners for a subscription query and sends the
     * subscription result to the client.
     *
     * <p>Listeners wait (up to {@value #RESULT_SENT_TIMEOUT_MS} ms) for the
     * subscription result to have been handled before forwarding a notification.
     * A listener that finds the WebSocket closed removes itself from the user
     * session.
     *
     * @param requestId request ID to echo in the result
     * @param query     the subscription query
     * @throws Exception if the filter cannot be prepared or the result cannot be
     *                   sent; in the latter case the listener is removed first
     */
    private void handleSubscribe(String requestId, QuerySubscribeDTO query) throws Exception {
        final SensinactPath path = query.uri;
        // Released once the subscription result has been handled
        final CountDownLatch resultHandled = new CountDownLatch(1);
        // Filled with the listener ID right after registration
        final AtomicReference<String> subscriptionId = new AtomicReference<>();

        final ResultSubscribeDTO result = new ResultSubscribeDTO();
        final List<String> topics;
        if (path.targetsSpecificResource()) {
            result.uri = path.toUri();
            // Topic is the URI without its leading slash
            topics = List.of(result.uri.substring(1));
        } else {
            result.uri = "/";
            topics = List.of("*");
        }

        final Predicate<AbstractResourceNotification> filter =
                (query.filter == null || query.filter.isBlank()) ? null : prepareFilter(query);

        final ClientDataListener dataListener = (topic, notification) -> {
            try {
                final Session session = this.wsSession.get();
                if (session == null || !session.isOpen()) {
                    logger.warn("Detected closed WebSocket. Stop listening");
                    this.userSession.removeListener(subscriptionId.get());
                } else if ((filter == null || filter.test(notification)) && checkLatch(resultHandled)) {
                    sendNotification(session, subscriptionId.get(), new ResourceDataNotificationDTO(notification));
                }
            } catch (Throwable t) {
                logger.warn("Error notifying WebSocket of a data update", t);
            }
        };

        final String listenerId = this.userSession.addListener(topics, dataListener, (ClientMetadataListener) null, (topic, notification) -> {
            try {
                final Session session = this.wsSession.get();
                if (session == null || !session.isOpen()) {
                    logger.warn("Detected closed WebSocket. Stop listening");
                    this.userSession.removeListener(subscriptionId.get());
                } else if ((filter == null || filter.test(notification)) && checkLatch(resultHandled)) {
                    sendNotification(session, subscriptionId.get(), new ResourceLifecycleNotificationDTO(notification));
                }
            } catch (Throwable t) {
                logger.error("Error notifying WebSocket of life cycle update", t);
            }
        }, (ClientActionListener) null);

        subscriptionId.set(listenerId);
        this.subscriptions.add(listenerId);

        result.statusCode = 200;
        result.subscriptionId = listenerId;
        result.requestId = requestId;

        try {
            final Session session = this.wsSession.get();
            if (session == null || !session.isOpen()) {
                // Nobody to notify anymore
                dropSubscription(listenerId);
            } else {
                sendResult(session, result);
            }
            resultHandled.countDown();
        } catch (Exception e) {
            dropSubscription(listenerId);
            throw e;
        }
    }

    /** Forgets a subscription and removes its listener from the user session. */
    private void dropSubscription(String listenerId) {
        this.subscriptions.remove(listenerId);
        this.userSession.removeListener(listenerId);
    }

    /**
     * Waits for the given latch to reach zero.
     *
     * @param latch the latch to wait on
     * @return {@code true} if the latch reached zero within
     *         {@value #RESULT_SENT_TIMEOUT_MS} ms, {@code false} on timeout or if the
     *         waiting thread was interrupted (its interrupt flag is restored)
     */
    private boolean checkLatch(CountDownLatch latch) {
        try {
            return latch.await(RESULT_SENT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's thread
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Sends a resource notification to the client.
     *
     * <p>If the notification cannot be serialized or sent, an error notification is
     * sent instead. If that fails too, the listener is removed and this endpoint is
     * closed.
     *
     * @param session        the WebSocket session to write to
     * @param subscriptionId ID of the subscription that produced the notification
     * @param notification   the notification payload
     */
    private void sendNotification(Session session, String subscriptionId, AbstractResourceNotificationDTO notification) {
        try {
            final ResultResourceNotificationDTO result = new ResultResourceNotificationDTO();
            result.statusCode = 200;
            result.uri = new SensinactPath(notification.provider, notification.service, notification.resource).toUri();
            result.subscriptionId = subscriptionId;
            result.notification = notification;
            session.getRemote().sendString(this.mapper.writeValueAsString(result));
        } catch (IOException e) {
            logger.error("Error sending notification to client: {}", e.getMessage(), e);
            try {
                final String payload = this.mapper.writeValueAsString(new ErrorResultNotificationDTO(subscriptionId));
                session.getRemote().sendString(payload);
            } catch (IOException e2) {
                logger.error("Error sending notification to client: {}. Closing WebSocket.", e2.getMessage(), e2);
                this.userSession.removeListener(subscriptionId);
                close();
            }
        }
    }

    /**
     * Removes a subscription created through this endpoint.
     *
     * @param query the unsubscription query
     * @return a 200 result echoing the subscription ID, or a 404 error result if the
     *         subscription is not known to this endpoint
     */
    private AbstractResultDTO handleUnsubscribe(QueryUnsubscribeDTO query) {
        final boolean known = this.subscriptions.remove(query.subscriptionId);
        if (!known) {
            return new ErrorResultDTO(404, "Unknown subscription");
        }

        this.userSession.removeListener(query.subscriptionId);
        final ResultUnsubscribeDTO result = new ResultUnsubscribeDTO();
        result.statusCode = 200;
        result.subscriptionId = query.subscriptionId;
        return result;
    }

    /**
     * Serializes a result to JSON and sends it to the client; a 500 error is sent
     * instead if serialization or sending fails.
     *
     * @param session the WebSocket session to write to
     * @param result  the result to send
     */
    private void sendResult(Session session, AbstractResultDTO result) {
        try {
            final String payload = this.mapper.writeValueAsString(result);
            session.getRemote().sendString(payload);
        } catch (IOException e) {
            sendError(session, null, 500, "Error sending results: " + e.getMessage());
        }
    }

    /**
     * Sends an error result to the client. Failures are logged, never propagated as
     * {@link IOException}.
     *
     * @param session    the WebSocket session to write to
     * @param path       path the error relates to, or {@code null}
     * @param statusCode status code to report
     * @param message    error message to report
     */
    private void sendError(Session session, SensinactPath path, int statusCode, String message) {
        final AbstractResultDTO errorResult = new ErrorResultDTO();
        errorResult.uri = path != null ? path.toUri() : null;
        errorResult.statusCode = statusCode;
        errorResult.error = message;

        String payload;
        try {
            payload = this.mapper.writeValueAsString(errorResult);
        } catch (JsonProcessingException e) {
            logger.error("Error preparing error message payload to client: {}", e.getMessage(), e);
            // Hand-written fallback so that the client still gets an answer
            payload = "{\"uri\": null, \"statusCode\": 500, \"error\": \"Error sending error\", \"result\": null}";
        }

        try {
            session.getRemote().sendString(payload);
        } catch (IOException e) {
            logger.error("Error sending error to client: {}", e.getMessage(), e);
        }
    }
}
