/*
 * Decompiled with CFR 0.152.
 */
package org.apache.aries.typedevent.bus.impl;

import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.osgi.annotation.bundle.Capability;
import org.osgi.service.typedevent.monitor.MonitorEvent;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushEventSource;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.PushbackPolicyOption;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.pushstream.SimplePushEventSource;

@Capability(namespace="osgi.service", attribute={"objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor"}, uses={TypedEventMonitor.class})
public class TypedEventMonitorImpl
implements TypedEventMonitor {
    private final LinkedList<MonitorEvent> historicEvents = new LinkedList();
    private final ExecutorService monitoringWorker;
    private final Object lock = new Object();
    private final PushStreamProvider psp;
    private final SimplePushEventSource<MonitorEvent> source;
    private final int historySize = 1024;

    public TypedEventMonitorImpl(Map<String, ?> props) {
        this.monitoringWorker = Executors.newCachedThreadPool();
        this.psp = new PushStreamProvider();
        this.source = (SimplePushEventSource)this.psp.buildSimpleEventSource(MonitorEvent.class).withExecutor((Executor)this.monitoringWorker).withQueuePolicy(QueuePolicyOption.BLOCK).build();
    }

    public void destroy() {
        this.source.close();
        this.monitoringWorker.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void event(String topic, Map<String, Object> eventData) {
        MonitorEvent me = new MonitorEvent();
        me.eventData = eventData;
        me.topic = topic;
        me.publicationTime = Instant.now();
        Object object = this.lock;
        synchronized (object) {
            this.historicEvents.add(me);
            for (int toRemove = this.historicEvents.size() - 1024; toRemove > 0; --toRemove) {
                this.historicEvents.poll();
            }
            this.source.publish((Object)me);
        }
    }

    public PushStream<MonitorEvent> monitorEvents() {
        return this.monitorEvents(0);
    }

    public PushStream<MonitorEvent> monitorEvents(int history) {
        return (PushStream)this.psp.buildStream(this.eventSource(history)).withBuffer(new ArrayBlockingQueue(Math.max(1024, history))).withPushbackPolicy(PushbackPolicyOption.FIXED, 0L).withQueuePolicy(QueuePolicyOption.FAIL).withExecutor((Executor)this.monitoringWorker).build();
    }

    public PushStream<MonitorEvent> monitorEvents(Instant history) {
        return (PushStream)this.psp.buildStream(this.eventSource(history)).withBuffer(new ArrayBlockingQueue(1024)).withPushbackPolicy(PushbackPolicyOption.FIXED, 0L).withQueuePolicy(QueuePolicyOption.FAIL).withExecutor((Executor)this.monitoringWorker).build();
    }

    PushEventSource<MonitorEvent> eventSource(int events) {
        return pec -> {
            Object object = this.lock;
            synchronized (object) {
                int size = this.historicEvents.size();
                int start = Math.max(0, size - events);
                List list = this.historicEvents.subList(start, size);
                for (MonitorEvent me : list) {
                    try {
                        if (pec.accept(PushEvent.data((Object)me)) >= 0L) continue;
                        return () -> {};
                    }
                    catch (Exception e) {
                        return () -> {};
                    }
                }
                return this.source.open(pec);
            }
        };
    }

    PushEventSource<MonitorEvent> eventSource(Instant since) {
        return pec -> {
            Object object = this.lock;
            synchronized (object) {
                ListIterator it = this.historicEvents.listIterator();
                while (it.hasNext()) {
                    MonitorEvent next = (MonitorEvent)it.next();
                    if (!next.publicationTime.isAfter(since)) continue;
                    it.previous();
                    break;
                }
                while (it.hasNext()) {
                    try {
                        if (pec.accept(PushEvent.data(it.next())) >= 0L) continue;
                        return () -> {};
                    }
                    catch (Exception e) {
                        return () -> {};
                    }
                }
                return this.source.open(pec);
            }
        };
    }
}

