/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.sensinact.core.command.impl;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.emf.ecore.EPackage;
import org.eclipse.emf.ecore.resource.ResourceSet;
import org.eclipse.sensinact.core.command.AbstractSensinactCommand;
import org.eclipse.sensinact.core.command.GatewayThread;
import org.eclipse.sensinact.core.metrics.IMetricTimer;
import org.eclipse.sensinact.core.metrics.IMetricsManager;
import org.eclipse.sensinact.core.model.SensinactModelManager;
import org.eclipse.sensinact.core.model.impl.SensinactModelManagerImpl;
import org.eclipse.sensinact.core.model.nexus.ModelNexus;
import org.eclipse.sensinact.core.notification.NotificationAccumulator;
import org.eclipse.sensinact.core.notification.impl.ImmediateNotificationAccumulator;
import org.eclipse.sensinact.core.notification.impl.NotificationAccumulatorImpl;
import org.eclipse.sensinact.core.twin.SensinactDigitalTwin;
import org.eclipse.sensinact.core.twin.impl.SensinactDigitalTwinImpl;
import org.eclipse.sensinact.core.whiteboard.WhiteboardHandler;
import org.eclipse.sensinact.core.whiteboard.impl.SensinactWhiteboard;
import org.eclipse.sensinact.model.core.provider.ProviderPackage;
import org.osgi.service.component.AnyService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.typedevent.TypedEventBus;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;

@Component(immediate=true)
public class GatewayThreadImpl
extends Thread
implements GatewayThread {
    private final TypedEventBus typedEventBus;
    private final SensinactWhiteboard whiteboard;
    private final ModelNexus nexusImpl;
    private final BlockingQueue<WorkItem<?>> work = new ArrayBlockingQueue(4096);
    private final AtomicBoolean run = new AtomicBoolean(true);
    private final PromiseFactory promiseFactory = new PromiseFactory((Executor)Executors.newSingleThreadExecutor(r -> new Thread(r, "Eclipse sensiNact Gateway Worker")), Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "Eclipse sensiNact Scheduler")));
    private final AtomicReference<WorkItem<?>> currentItem = new AtomicReference();
    private IMetricsManager metrics;

    @Activate
    public GatewayThreadImpl(@Reference IMetricsManager metrics, @Reference TypedEventBus typedEventBus, @Reference ResourceSet resourceSet, @Reference ProviderPackage providerPackage) {
        this.metrics = metrics;
        this.typedEventBus = typedEventBus;
        this.whiteboard = new SensinactWhiteboard(this, metrics);
        this.nexusImpl = new ModelNexus(resourceSet, providerPackage, this::getCurrentAccumulator, this.whiteboard);
        this.start();
    }

    @Deactivate
    void deactivate() {
        this.run.set(false);
        this.interrupt();
        try {
            this.join(500L);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.nexusImpl.shutDown();
        ExecutorService executor = (ExecutorService)this.promiseFactory.executor();
        ScheduledExecutorService scheduledExecutor = this.promiseFactory.scheduledExecutor();
        executor.shutdown();
        scheduledExecutor.shutdown();
        try {
            if (!executor.awaitTermination(2L, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
            if (!scheduledExecutor.awaitTermination(2L, TimeUnit.SECONDS)) {
                scheduledExecutor.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            scheduledExecutor.shutdown();
        }
    }

    @Reference(cardinality=ReferenceCardinality.MULTIPLE, policy=ReferencePolicy.DYNAMIC)
    void addEPackage(EPackage ePackage) {
        this.nexusImpl.addEPackage(ePackage);
    }

    void removeEPackage(EPackage ePackage) {
        this.nexusImpl.removeEPackage(ePackage);
    }

    @Reference(service=WhiteboardHandler.class, cardinality=ReferenceCardinality.MULTIPLE, policy=ReferencePolicy.DYNAMIC)
    void addWhiteboardResourceHandler(WhiteboardHandler wbHandler, Map<String, Object> props) {
        this.whiteboard.addWhiteboardHandler(wbHandler, props);
    }

    void updatedWhiteboardResourceHandler(WhiteboardHandler wbHandler, Map<String, Object> props) {
        this.whiteboard.updatedWhiteboardHandler(wbHandler, props);
    }

    void removeWhiteboardResourceHandler(WhiteboardHandler wbHandler, Map<String, Object> props) {
        this.whiteboard.removeWhiteboardHandler(wbHandler, props);
    }

    @Reference(service=AnyService.class, target="(sensiNact.whiteboard.resource=true)", cardinality=ReferenceCardinality.MULTIPLE, policy=ReferencePolicy.DYNAMIC)
    void addWhiteboardService(Object service, Map<String, Object> props) {
        this.whiteboard.addWhiteboardService(service, props);
    }

    void updatedWhiteboardService(Object service, Map<String, Object> props) {
        this.whiteboard.updatedWhiteboardService(service, props);
    }

    void removeWhiteboardService(Object service, Map<String, Object> props) {
        this.whiteboard.removeWhiteboardService(service, props);
    }

    private NotificationAccumulator getCurrentAccumulator() {
        WorkItem<?> workItem = this.currentItem.get();
        return workItem == null ? new ImmediateNotificationAccumulator(this.typedEventBus) : workItem.command.getAccumulator();
    }

    public PromiseFactory getPromiseFactory() {
        return this.promiseFactory;
    }

    public NotificationAccumulator createAccumulator() {
        return new NotificationAccumulatorImpl(this.typedEventBus);
    }

    public <T> Promise<T> execute(AbstractSensinactCommand<T> command) {
        Deferred d = this.getPromiseFactory().deferred();
        this.work.add(new WorkItem<T>(d, command, this.nexusImpl));
        this.metrics.getCounter("sensinact.tasks.pending").inc();
        this.metrics.getHistogram("sensinact.tasks.pending.hist").update((long)this.work.size());
        return d.getPromise();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        while (this.run.get()) {
            try {
                WorkItem<?> item = this.work.take();
                this.currentItem.set(item);
                this.metrics.getCounter("sensinact.tasks.pending").dec();
                this.metrics.getHistogram("sensinact.tasks.pending.hist").update((long)this.work.size());
                IMetricTimer timer = this.metrics.withTimer("sensinact.task.time");
                try {
                    item.doWork();
                }
                finally {
                    if (timer == null) continue;
                    timer.close();
                }
            }
            catch (InterruptedException e) {}
            continue;
            finally {
                this.currentItem.set(null);
            }
        }
    }

    private class WorkItem<T> {
        private final Deferred<T> d;
        private final AbstractSensinactCommand<T> command;
        private final ModelNexus nexusImpl;

        public WorkItem(Deferred<T> d, AbstractSensinactCommand<T> command, ModelNexus nexusImpl) {
            this.d = d;
            this.command = command;
            this.nexusImpl = nexusImpl;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void doWork() {
            try {
                Promise promise;
                SensinactDigitalTwinImpl twinImpl = new SensinactDigitalTwinImpl(this.nexusImpl, GatewayThread.getGatewayThread().getPromiseFactory());
                SensinactModelManagerImpl mgrImpl = new SensinactModelManagerImpl(this.nexusImpl);
                try {
                    promise = this.command.call((SensinactDigitalTwin)twinImpl, (SensinactModelManager)mgrImpl);
                }
                finally {
                    twinImpl.invalidate();
                    mgrImpl.invalidate();
                }
                this.d.resolveWith(promise);
            }
            catch (Exception e) {
                this.d.fail((Throwable)e);
            }
        }
    }
}

