package org.gecko.util.pushstream.distributed;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.gecko.util.pushstream.PushStreamContext;
import org.gecko.util.pushstream.PushStreamHelper;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.PushEventSource;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/util/pushstream/distributed/DistributedEventSource.class */
public class DistributedEventSource<T> implements PushEventSource<T> {
    private static final Logger logger = Logger.getLogger(DistributedEventSource.class.getName());
    private final PushStreamProvider psp = new PushStreamProvider();
    private final AtomicReference<Runnable> connectFunction = new AtomicReference<>();
    private final AtomicReference<Runnable> closeFunction = new AtomicReference<>();
    private final AtomicReference<Consumer<Throwable>> errorFunction = new AtomicReference<>();
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final AtomicBoolean close = new AtomicBoolean(false);
    protected final SimplePushEventSource<T> eventSource;

    public DistributedEventSource(SimplePushEventSource<T> simplePushEventSource) {
        this.eventSource = simplePushEventSource;
        this.eventSource.connectPromise().onResolve(this::doOnConnect);
    }

    public DistributedEventSource(Class<T> cls) {
        this.eventSource = this.psp.createSimpleEventSource(cls);
        this.eventSource.connectPromise().onResolve(this::doOnConnect);
    }

    public AutoCloseable open(PushEventConsumer<? super T> pushEventConsumer) throws Exception {
        return this.eventSource.open(pushEventConsumer);
    }

    public void doExternalError(Throwable th) {
        if (this.error.compareAndSet(null, th)) {
            this.close.set(true);
            this.eventSource.error(th);
        }
    }

    public void doExternalClose() {
        if (this.close.compareAndSet(false, true)) {
            this.eventSource.endOfStream();
        }
    }

    public void doExternalPublish(T t) {
        if (this.eventSource.isConnected()) {
            this.eventSource.publish(t);
        } else {
            logger.severe("The underlaying event source is not connected. This should not happen. Data gets lost!");
        }
    }

    public DistributedEventSource<T> onConnect(Runnable runnable) {
        if (this.connectFunction.compareAndSet(null, runnable)) {
            return this;
        }
        throw new IllegalStateException("A connect handler has already been defined for this source object");
    }

    public DistributedEventSource<T> onClose(Runnable runnable) {
        if (this.closeFunction.compareAndSet(null, runnable)) {
            return this;
        }
        throw new IllegalStateException("A close handler has already been set");
    }

    public DistributedEventSource<T> onError(Consumer<Throwable> consumer) {
        if (this.errorFunction.compareAndSet(null, consumer)) {
            return this;
        }
        throw new IllegalStateException("An error handler has already been set");
    }

    public DistributedPushStream<T> createPushStream(PushStreamContext<T> pushStreamContext) {
        DistributedPushStream<T> distributedPushStream = new DistributedPushStream<>(PushStreamHelper.createPushStream(this, pushStreamContext));
        distributedPushStream.distributeOnClose(this::doOnClose).distributeOnError(this::doOnError);
        return distributedPushStream;
    }

    private synchronized void doOnError(Throwable th) {
        Consumer<Throwable> consumer;
        if (this.error.compareAndSet(th, null) || (consumer = this.errorFunction.get()) == null) {
            return;
        }
        consumer.accept(th);
    }

    private void doOnClose() {
        Runnable runnable;
        if (this.close.compareAndSet(true, false) || (runnable = this.closeFunction.get()) == null) {
            return;
        }
        runnable.run();
    }

    private void doOnConnect() {
        Runnable andSet = this.connectFunction.getAndSet(null);
        if (andSet != null) {
            andSet.run();
        }
    }
}
