package org.apache.aries.typedevent.bus.impl;

import java.time.Instant;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.osgi.annotation.bundle.Capability;
import org.osgi.service.typedevent.monitor.MonitorEvent;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
import org.osgi.util.pushstream.PushEvent;
import org.osgi.util.pushstream.PushEventSource;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamBuilder;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.PushbackPolicyOption;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.pushstream.SimplePushEventSource;

@Capability(namespace = "osgi.service", attribute = {"objectClass:List<String>=org.osgi.service.typedevent.monitor.TypedEventMonitor"}, uses = {TypedEventMonitor.class})
/* loaded from: input_file:jar/org.apache.aries.typedevent.bus-0.0.2-SNAPSHOT.jar:org/apache/aries/typedevent/bus/impl/TypedEventMonitorImpl.class */
public class TypedEventMonitorImpl implements TypedEventMonitor {
    private final LinkedList<MonitorEvent> historicEvents = new LinkedList<>();
    private final Object lock = new Object();
    private final int historySize = 1024;
    private final ExecutorService monitoringWorker = Executors.newCachedThreadPool();
    private final PushStreamProvider psp = new PushStreamProvider();
    private final SimplePushEventSource<MonitorEvent> source = (SimplePushEventSource) this.psp.buildSimpleEventSource(MonitorEvent.class).withExecutor(this.monitoringWorker).withQueuePolicy(QueuePolicyOption.BLOCK).build();

    public TypedEventMonitorImpl(Map<String, ?> map) {
    }

    public void destroy() {
        this.source.close();
        this.monitoringWorker.shutdown();
    }

    public void event(String str, Map<String, Object> map) {
        MonitorEvent monitorEvent = new MonitorEvent();
        monitorEvent.eventData = map;
        monitorEvent.topic = str;
        monitorEvent.publicationTime = Instant.now();
        synchronized (this.lock) {
            this.historicEvents.add(monitorEvent);
            for (int size = this.historicEvents.size() - 1024; size > 0; size--) {
                this.historicEvents.poll();
            }
            this.source.publish(monitorEvent);
        }
    }

    @Override // org.osgi.service.typedevent.monitor.TypedEventMonitor
    public PushStream<MonitorEvent> monitorEvents() {
        return monitorEvents(0);
    }

    @Override // org.osgi.service.typedevent.monitor.TypedEventMonitor
    public PushStream<MonitorEvent> monitorEvents(int i) {
        return (PushStream) this.psp.buildStream(eventSource(i)).withBuffer((PushStreamBuilder) new ArrayBlockingQueue(Math.max(1024, i))).withPushbackPolicy(PushbackPolicyOption.FIXED, 0L).withQueuePolicy(QueuePolicyOption.FAIL).withExecutor((Executor) this.monitoringWorker).build();
    }

    @Override // org.osgi.service.typedevent.monitor.TypedEventMonitor
    public PushStream<MonitorEvent> monitorEvents(Instant instant) {
        return (PushStream) this.psp.buildStream(eventSource(instant)).withBuffer((PushStreamBuilder) new ArrayBlockingQueue(1024)).withPushbackPolicy(PushbackPolicyOption.FIXED, 0L).withQueuePolicy(QueuePolicyOption.FAIL).withExecutor((Executor) this.monitoringWorker).build();
    }

    PushEventSource<MonitorEvent> eventSource(int i) {
        return pushEventConsumer -> {
            synchronized (this.lock) {
                int size = this.historicEvents.size();
                Iterator<MonitorEvent> it = this.historicEvents.subList(Math.max(0, size - i), size).iterator();
                while (it.hasNext()) {
                    try {
                        if (pushEventConsumer.accept(PushEvent.data(it.next())) < 0) {
                            return () -> {
                            };
                        }
                    } catch (Exception e) {
                        return () -> {
                        };
                    }
                }
                return this.source.open(pushEventConsumer);
            }
        };
    }

    PushEventSource<MonitorEvent> eventSource(Instant instant) {
        return pushEventConsumer -> {
            synchronized (this.lock) {
                ListIterator<MonitorEvent> listIterator = this.historicEvents.listIterator();
                while (true) {
                    if (!listIterator.hasNext()) {
                        break;
                    }
                    if (listIterator.next().publicationTime.isAfter(instant)) {
                        listIterator.previous();
                        break;
                    }
                }
                while (listIterator.hasNext()) {
                    try {
                        if (pushEventConsumer.accept(PushEvent.data(listIterator.next())) < 0) {
                            return () -> {
                            };
                        }
                    } catch (Exception e) {
                        return () -> {
                        };
                    }
                }
                return this.source.open(pushEventConsumer);
            }
        };
    }
}
