package org.gecko.util.pushstream.source;

import java.util.function.BiConsumer;
import org.gecko.util.pushstream.PushStreamContext;
import org.osgi.util.function.Consumer;
import org.osgi.util.function.Predicate;
import org.osgi.util.promise.Promise;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:jar/org.gecko.util.pushstream-1.2.0.202401231920.jar:org/gecko/util/pushstream/source/AcknowledgingEventSource.class */
/**
 * A {@link SimplePushEventSource} decorator that runs acknowledge callbacks around
 * {@link #publish(Object)}.
 *
 * <p>Every published item is first checked against an optional acknowledge filter:
 * <ul>
 *   <li>no filter, or the filter accepts the item: the item is published to the wrapped source
 *       and the acknowledge callback (if any) is invoked;</li>
 *   <li>the filter rejects the item: the item is <em>not</em> published and the negative
 *       acknowledge callback (if any) is invoked.</li>
 * </ul>
 * Exceptions raised while publishing or while running an acknowledge callback are handed to the
 * acknowledge error callback when one is configured; otherwise they are dropped.
 *
 * <p>All other {@link SimplePushEventSource} operations are delegated unchanged to the wrapped
 * source.
 *
 * @param <T> the type of the published items
 */
public class AcknowledgingEventSource<T> implements SimplePushEventSource<T> {
    /** Message used when a delegating call (other than publish) finds no wrapped source. */
    private static final String NO_SOURCE_MESSAGE = "No push event source available to delegate to";

    private final PushStreamProvider psp = new PushStreamProvider();
    private final SimplePushEventSource<T> eventSource;
    private final PushStreamContext<T> context;
    private Predicate<T> ackFilter;
    private Consumer<T> ackFunction;
    private Consumer<T> nackFunction;
    private BiConsumer<Throwable, T> ackErrorFunction;

    /**
     * Wraps an existing event source.
     *
     * <p>A {@code null} source is accepted here; it is rejected with an
     * {@link IllegalStateException} on first use.
     *
     * @param simplePushEventSource the source to delegate to
     * @param pushStreamContext optional context supplying the initial acknowledge filter and
     *     acknowledge callback; may be {@code null}
     */
    public AcknowledgingEventSource(SimplePushEventSource<T> simplePushEventSource, PushStreamContext<T> pushStreamContext) {
        this.eventSource = simplePushEventSource;
        this.context = pushStreamContext;
        initializeContext();
    }

    /**
     * Creates a new event source for the given item type via
     * {@link PushStreamProvider#buildSimpleEventSource(Class)} and wraps it.
     *
     * @param cls the item type of the source to build
     * @param pushStreamContext optional context supplying the initial acknowledge filter and
     *     acknowledge callback; may be {@code null}
     */
    public AcknowledgingEventSource(Class<T> cls, PushStreamContext<T> pushStreamContext) {
        // build() already yields SimplePushEventSource<T>; no (raw) cast is required.
        this.eventSource = this.psp.buildSimpleEventSource(cls).build();
        this.context = pushStreamContext;
        initializeContext();
    }

    /**
     * Sets the filter that decides whether an item is published and acknowledged
     * ({@code true}) or negatively acknowledged ({@code false}).
     *
     * @param predicate the filter, or {@code null} to publish and acknowledge every item
     * @return this instance, for chaining
     */
    public AcknowledgingEventSource<T> acknowledgeFilter(Predicate<T> predicate) {
        this.ackFilter = predicate;
        return this;
    }

    /**
     * Sets the callback invoked after an item has been published to the wrapped source.
     *
     * @param consumer the acknowledge callback, or {@code null} for none
     * @return this instance, for chaining
     */
    public AcknowledgingEventSource<T> acknowledge(Consumer<T> consumer) {
        this.ackFunction = consumer;
        return this;
    }

    /**
     * Sets the callback invoked for items rejected by the acknowledge filter.
     *
     * @param consumer the negative acknowledge callback, or {@code null} for none
     * @return this instance, for chaining
     */
    public AcknowledgingEventSource<T> negativeAcknowledge(Consumer<T> consumer) {
        this.nackFunction = consumer;
        return this;
    }

    /**
     * Sets the callback that receives exceptions thrown while publishing or while running the
     * acknowledge / negative acknowledge callbacks.
     *
     * @param biConsumer the error callback, or {@code null} to drop such exceptions
     * @return this instance, for chaining
     */
    public AcknowledgingEventSource<T> acknowledgeError(BiConsumer<Throwable, T> biConsumer) {
        this.ackErrorFunction = biConsumer;
        return this;
    }

    /**
     * Publishes the item if the acknowledge filter accepts it (or no filter is set) and
     * acknowledges it; otherwise negatively acknowledges it without publishing.
     *
     * @param t the item to publish
     * @throws IllegalStateException if there is no wrapped source, or if the filter (or the
     *     error callback) throws; the original exception is attached as the cause
     */
    @Override
    public void publish(T t) {
        requireSource("Cannot publish message for a null push event source");
        try {
            Predicate<T> filter = this.ackFilter;
            boolean accepted = filter == null || filter.test(t);
            if (accepted) {
                doPublishAndAck(t);
            } else {
                doNegativeAcknowledge(t);
            }
        } catch (Exception e) {
            throw new IllegalStateException("Error testing data for acknowledge", e);
        }
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public AutoCloseable open(PushEventConsumer<? super T> pushEventConsumer) throws Exception {
        return requireSource(NO_SOURCE_MESSAGE).open(pushEventConsumer);
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public void close() {
        requireSource(NO_SOURCE_MESSAGE).close();
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public void endOfStream() {
        requireSource(NO_SOURCE_MESSAGE).endOfStream();
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public void error(Throwable th) {
        requireSource(NO_SOURCE_MESSAGE).error(th);
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public boolean isConnected() {
        return requireSource(NO_SOURCE_MESSAGE).isConnected();
    }

    /**
     * Delegates to the wrapped source.
     *
     * @throws IllegalStateException if there is no wrapped source
     */
    @Override
    public Promise<Void> connectPromise() {
        return requireSource(NO_SOURCE_MESSAGE).connectPromise();
    }

    /**
     * Returns the wrapped source, failing uniformly when it is missing so that every delegating
     * method reports the same kind of error instead of a bare NullPointerException.
     */
    private SimplePushEventSource<T> requireSource(String message) {
        if (this.eventSource == null) {
            throw new IllegalStateException(message);
        }
        return this.eventSource;
    }

    /**
     * Seeds the acknowledge filter and acknowledge callback from the context, if one was given.
     * The negative acknowledge and error callbacks are not taken from the context here; they
     * must be set through the fluent setters.
     */
    private void initializeContext() {
        if (this.context == null) {
            return;
        }
        this.ackFilter = this.context.getAcknowledgeFilter();
        this.ackFunction = this.context.getAcknowledgeFunction();
    }

    /** Publishes the item, then acknowledges it; any failure goes to the error callback. */
    private void doPublishAndAck(T t) {
        try {
            this.eventSource.publish(t);
            Consumer<T> ack = this.ackFunction;
            if (ack != null) {
                ack.accept(t);
            }
        } catch (Exception e) {
            reportAckError(e, t);
        }
    }

    /** Runs the negative acknowledge callback, if set; any failure goes to the error callback. */
    private void doNegativeAcknowledge(T t) {
        Consumer<T> nack = this.nackFunction;
        if (nack == null) {
            return;
        }
        try {
            nack.accept(t);
        } catch (Exception e) {
            reportAckError(e, t);
        }
    }

    /**
     * Hands a publish/acknowledge failure to the error callback. Without a callback the failure
     * is intentionally dropped, matching the best-effort behaviour of this class.
     */
    private void reportAckError(Exception failure, T t) {
        BiConsumer<Throwable, T> handler = this.ackErrorFunction;
        if (handler != null) {
            handler.accept(failure, t);
        }
    }
}
