/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.northbound.ws.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.sensinact.core.notification.AbstractResourceNotification;
import org.eclipse.sensinact.core.notification.ClientDataListener;
import org.eclipse.sensinact.core.notification.ClientLifecycleListener;
import org.eclipse.sensinact.core.notification.LifecycleNotification;
import org.eclipse.sensinact.core.notification.ResourceDataNotification;
import org.eclipse.sensinact.core.snapshot.ICriterion;
import org.eclipse.sensinact.core.snapshot.ResourceValueFilter;
import org.eclipse.sensinact.gateway.geojson.GeoJsonObject;
import org.eclipse.sensinact.northbound.query.api.AbstractQueryDTO;
import org.eclipse.sensinact.northbound.query.api.AbstractResultDTO;
import org.eclipse.sensinact.northbound.query.api.IQueryHandler;
import org.eclipse.sensinact.northbound.query.api.StatusException;
import org.eclipse.sensinact.northbound.query.dto.SensinactPath;
import org.eclipse.sensinact.northbound.query.dto.notification.AbstractResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ErrorResultNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceDataNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResourceLifecycleNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.notification.ResultResourceNotificationDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QuerySubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.query.QueryUnsubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ErrorResultDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultSubscribeDTO;
import org.eclipse.sensinact.northbound.query.dto.result.ResultUnsubscribeDTO;
import org.eclipse.sensinact.northbound.session.SensiNactSession;
import org.eclipse.sensinact.northbound.ws.impl.NotificationSnapshot;
import org.eclipse.sensinact.northbound.ws.impl.WebSocketCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WebSocket(idleTimeout=0)
public class WebSocketEndpoint {
    private static final Logger logger = LoggerFactory.getLogger(WebSocketEndpoint.class);
    private final ObjectMapper mapper = ((JsonMapper.Builder)((JsonMapper.Builder)JsonMapper.builder().addModule((Module)new JavaTimeModule())).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)).build();
    private final AtomicReference<Session> wsSession = new AtomicReference();
    private final SensiNactSession userSession;
    private final WebSocketCreator pool;
    private final IQueryHandler queryHandler;
    private final Set<String> subscriptions = new HashSet<String>();

    public WebSocketEndpoint(WebSocketCreator pool, SensiNactSession sensiNactSession, IQueryHandler queryHandler) {
        this.pool = pool;
        this.userSession = sensiNactSession;
        this.queryHandler = queryHandler;
    }

    public synchronized void close() {
        Session ws = this.wsSession.getAndSet(null);
        if (ws == null) {
            return;
        }
        if (this.userSession != null) {
            for (String listenerId : this.subscriptions) {
                this.userSession.removeListener(listenerId);
            }
        }
        this.subscriptions.clear();
        if (ws.isOpen()) {
            try {
                ws.close();
            }
            catch (Throwable t) {
                logger.error("Error closing WebSocket: {}", (Object)t.getMessage(), (Object)t);
            }
        }
        this.userSession.expire();
    }

    @OnWebSocketConnect
    public void open(Session session) {
        logger.debug("WebSocket opening - {}", (Object)session);
        this.wsSession.set(session);
    }

    @OnWebSocketClose
    public void onClose(Session session, int statusCode, String reason) {
        logger.debug("WebSocket closing - {} ({}: {})", new Object[]{session, statusCode, reason});
        this.pool.deleteSocketEndpoint(this);
        this.userSession.expire();
    }

    @OnWebSocketError
    public void onError(Session session, Throwable t) {
        logger.warn("Error from WebSocket session {}: {}", (Object)session, (Object)t.getMessage());
    }

    @OnWebSocketMessage
    public void onMessage(Session wsSession, String strContent) {
        AbstractQueryDTO query;
        if (this.userSession.isExpired()) {
            wsSession.close(1006, "User session expired due to inactivity");
        } else {
            this.userSession.extend(Duration.of(5L, ChronoUnit.MINUTES));
        }
        try {
            query = (AbstractQueryDTO)this.mapper.readValue(strContent, AbstractQueryDTO.class);
        }
        catch (Throwable t) {
            logger.error("Error parsing WebSocket query: {}", (Object)t.getMessage(), (Object)t);
            this.sendError(wsSession, null, 400, "Error parsing query: " + t.getMessage());
            return;
        }
        try {
            AbstractResultDTO result;
            switch (query.operation) {
                case SUBSCRIBE: {
                    this.handleSubscribe(query.requestId, (QuerySubscribeDTO)query);
                    return;
                }
                case UNSUBSCRIBE: {
                    result = this.handleUnsubscribe((QueryUnsubscribeDTO)query);
                    break;
                }
                default: {
                    result = this.queryHandler.handleQuery(this.userSession, query);
                }
            }
            result.requestId = query.requestId;
            this.sendResult(wsSession, result);
        }
        catch (StatusException e) {
            logger.error("Error handling query {}: {}", new Object[]{query, e.getMessage(), e});
            this.sendError(wsSession, query.uri, e.statusCode, "Error running query: " + e.getMessage());
        }
        catch (Throwable t) {
            logger.error("Error handling query {}: {}", new Object[]{query, t.getMessage(), t});
            this.sendError(wsSession, query.uri, 500, "Error running query: " + t.getMessage());
        }
    }

    private NotificationSnapshot convertToSnapshot(AbstractResourceNotification notif) {
        if (notif.getClass() == LifecycleNotification.class) {
            return new NotificationSnapshot((LifecycleNotification)notif);
        }
        if (notif.getClass() == ResourceDataNotification.class) {
            return new NotificationSnapshot((ResourceDataNotification)notif);
        }
        return null;
    }

    private Predicate<AbstractResourceNotification> prepareFilter(QuerySubscribeDTO query) throws StatusException {
        Predicate locationFilter;
        ICriterion parsedFilter = this.queryHandler.parseFilter(query.filter, query.filterLanguage);
        if (parsedFilter == null) {
            return null;
        }
        Predicate<AbstractResourceNotification> predicate = null;
        Predicate providerFilter = parsedFilter.getProviderFilter();
        Predicate serviceFilter = parsedFilter.getServiceFilter();
        Predicate resourceFilter = parsedFilter.getResourceFilter();
        ResourceValueFilter resourceValueFilter = parsedFilter.getResourceValueFilter();
        if (providerFilter != null || serviceFilter != null || resourceFilter != null || resourceValueFilter != null) {
            predicate = notif -> {
                NotificationSnapshot snapshot = this.convertToSnapshot((AbstractResourceNotification)notif);
                if (snapshot == null) {
                    return false;
                }
                if (providerFilter != null && !providerFilter.test(snapshot.provider)) {
                    return false;
                }
                if (serviceFilter != null && !serviceFilter.test(snapshot.service)) {
                    return false;
                }
                if (resourceFilter != null && !resourceFilter.test(snapshot.resource)) {
                    return false;
                }
                return resourceValueFilter == null || resourceValueFilter.test(snapshot.provider, List.of(snapshot.resource));
            };
        }
        if ((locationFilter = parsedFilter.getLocationFilter()) != null) {
            Predicate<AbstractResourceNotification> locationPredicate = notif -> {
                if ("admin".equals(notif.service) && "location".equals(notif.resource)) {
                    if (notif.getClass() == LifecycleNotification.class) {
                        LifecycleNotification lifecycleNotif = (LifecycleNotification)notif;
                        return lifecycleNotif.initialValue != null && locationFilter.test((GeoJsonObject)lifecycleNotif.initialValue);
                    }
                    if (notif.getClass() == ResourceDataNotification.class) {
                        ResourceDataNotification dataNotif = (ResourceDataNotification)notif;
                        return dataNotif.newValue != null && locationFilter.test((GeoJsonObject)dataNotif.newValue);
                    }
                }
                return true;
            };
            predicate = predicate == null ? locationPredicate : predicate.and(locationPredicate);
        }
        return predicate;
    }

    private void handleSubscribe(String requestId, QuerySubscribeDTO query) throws Exception {
        List<String> topics;
        SensinactPath path = query.uri;
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> listenerId = new AtomicReference<String>();
        ResultSubscribeDTO result = new ResultSubscribeDTO();
        if (path.targetsSpecificResource()) {
            result.uri = path.toUri();
            topics = List.of(result.uri.substring(1));
        } else {
            result.uri = "/";
            topics = List.of("*");
        }
        Predicate<AbstractResourceNotification> p = query.filter != null && !query.filter.isBlank() ? this.prepareFilter(query) : null;
        ClientDataListener cld = (topic, evt) -> {
            try {
                Session ws = this.wsSession.get();
                if (ws == null || !ws.isOpen()) {
                    logger.warn("Detected closed WebSocket. Stop listening");
                    this.userSession.removeListener((String)listenerId.get());
                } else if ((p == null || p.test((AbstractResourceNotification)evt)) && this.checkLatch(latch)) {
                    this.sendNotification(ws, (String)listenerId.get(), (AbstractResourceNotificationDTO)new ResourceDataNotificationDTO(evt));
                }
            }
            catch (Throwable e) {
                logger.warn("Error notifying WebSocket of life cycle update", e);
            }
        };
        ClientLifecycleListener cll = (topic, evt) -> {
            try {
                Session ws = this.wsSession.get();
                if (ws == null || !ws.isOpen()) {
                    logger.warn("Detected closed WebSocket. Stop listening");
                    this.userSession.removeListener((String)listenerId.get());
                } else if ((p == null || p.test((AbstractResourceNotification)evt)) && this.checkLatch(latch)) {
                    this.sendNotification(ws, (String)listenerId.get(), (AbstractResourceNotificationDTO)new ResourceLifecycleNotificationDTO(evt));
                }
            }
            catch (Throwable e) {
                logger.error("Error notifying WebSocket of a data update", e);
            }
        };
        String id = this.userSession.addListener(topics, cld, null, cll, null);
        listenerId.set(id);
        this.subscriptions.add(id);
        result.statusCode = 200;
        result.subscriptionId = id;
        result.requestId = requestId;
        try {
            Session ws = this.wsSession.get();
            if (ws == null || !ws.isOpen()) {
                this.subscriptions.remove(id);
                this.userSession.removeListener(id);
            } else {
                this.sendResult(ws, (AbstractResultDTO)result);
            }
        }
        catch (Exception e) {
            this.subscriptions.remove(id);
            this.userSession.removeListener(id);
            throw e;
        }
        latch.countDown();
    }

    private boolean checkLatch(CountDownLatch latch) {
        try {
            return latch.await(500L, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            return false;
        }
    }

    private void sendNotification(Session ws, String listenerId, AbstractResourceNotificationDTO notification) {
        try {
            ResultResourceNotificationDTO result = new ResultResourceNotificationDTO();
            result.statusCode = 200;
            result.uri = new SensinactPath(notification.provider, notification.service, notification.resource).toUri();
            result.subscriptionId = listenerId;
            result.notification = notification;
            ws.getRemote().sendString(this.mapper.writeValueAsString((Object)result));
        }
        catch (IOException e) {
            logger.error("Error sending notification to client: {}", (Object)e.getMessage(), (Object)e);
            try {
                ErrorResultNotificationDTO errorNotification = new ErrorResultNotificationDTO(listenerId);
                ws.getRemote().sendString(this.mapper.writeValueAsString((Object)errorNotification));
            }
            catch (IOException e2) {
                logger.error("Error sending notification to client: {}. Closing WebSocket.", (Object)e2.getMessage(), (Object)e2);
                this.userSession.removeListener(listenerId);
                this.close();
            }
        }
    }

    private AbstractResultDTO handleUnsubscribe(QueryUnsubscribeDTO query) {
        if (this.subscriptions.remove(query.subscriptionId)) {
            this.userSession.removeListener(query.subscriptionId);
            ResultUnsubscribeDTO result = new ResultUnsubscribeDTO();
            result.statusCode = 200;
            result.subscriptionId = query.subscriptionId;
            return result;
        }
        return new ErrorResultDTO(404, "Unknown subscription");
    }

    private void sendResult(Session session, AbstractResultDTO dto) {
        try {
            session.getRemote().sendString(this.mapper.writeValueAsString((Object)dto));
        }
        catch (IOException e) {
            this.sendError(session, null, 500, "Error sending results: " + e.getMessage());
        }
    }

    private void sendError(Session session, SensinactPath target, int statusCode, String errorMessage) {
        String payload;
        ErrorResultDTO dto = new ErrorResultDTO();
        dto.uri = target != null ? target.toUri() : null;
        dto.statusCode = statusCode;
        dto.error = errorMessage;
        try {
            payload = this.mapper.writeValueAsString((Object)dto);
        }
        catch (JsonProcessingException e) {
            logger.error("Error preparing error message payload to client: {}", (Object)e.getMessage(), (Object)e);
            payload = "{\"uri\": null, \"statusCode\": 500, \"error\": \"Error sending error\", \"result\": null}";
        }
        try {
            session.getRemote().sendString(payload);
        }
        catch (IOException e) {
            logger.error("Error sending error to client: {}", (Object)e.getMessage(), (Object)e);
        }
    }
}

