package org.gecko.adapter.amqp.api;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.osgi.util.promise.Deferred;

/* loaded from: input_file:org/gecko/adapter/amqp/api/BasicReSubscribeConsumer.class */
/**
 * Base RabbitMQ consumer that records the most recent delivery, hands it to the
 * subclass, optionally acknowledges it, and can cancel its own subscription.
 *
 * <p>Once the broker confirms the cancellation ({@link #handleCancelOk(String)}),
 * {@link #checkResubscribe()} decides whether to run the disconnect sequence
 * ({@link #preDisconnect()}, the optional disconnect callback, {@link #postDisconnect()})
 * and then completes the {@link Deferred} set via {@link #setDeferred(Deferred)}:
 * it is resolved with {@link #getCloseValue()} on success, or failed with the
 * exception raised during disconnect or cancel.
 *
 * <p>The default {@link ReSubscribeCallback} implementations in this class all
 * return "do nothing" values ({@code false} / {@code null}); subclasses override
 * them to enable closing and resubscribing.
 *
 * @param <T> type of the value the deferred is resolved with
 */
public abstract class BasicReSubscribeConsumer<T> extends DefaultConsumer implements ReSubscribeCallback<T> {
    private static final Logger logger = Logger.getLogger(BasicReSubscribeConsumer.class.getName());

    /** Set by subclasses via {@link #setProgress(boolean)}; while {@code true}, {@link #checkResubscribe()} does nothing. */
    private final AtomicBoolean progress = new AtomicBoolean(false);
    /** Becomes {@code true} once {@link #handleCancelOk(String)} has been invoked. */
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    /** Optional hook invoked with {@link #context} during the disconnect sequence; may be {@code null}. */
    private Consumer<AMQPContext> disconnectCallback;
    /** Completion handle; must be set through {@link #setDeferred(Deferred)} before a cancel or disconnect happens. */
    private Deferred<T> deferred;
    protected final AMQPContext context;
    protected final String consumerTag;
    /** Envelope of the most recent delivery, {@code null} until the first delivery. */
    protected Envelope envelope;
    /** Properties of the most recent delivery, {@code null} until the first delivery. */
    protected BasicProperties properties;
    /** Body of the most recent delivery, {@code null} until the first delivery. */
    protected byte[] body;

    /**
     * Creates a consumer bound to the given channel.
     *
     * @param channel the channel passed on to {@link DefaultConsumer}
     * @param str the consumer tag used when cancelling the subscription; must not be {@code null}
     * @param aMQPContext the context handed to the disconnect callback; must not be {@code null}
     * @param consumer optional disconnect callback, may be {@code null}
     * @throws NullPointerException if {@code str} or {@code aMQPContext} is {@code null}
     */
    public BasicReSubscribeConsumer(Channel channel, String str, AMQPContext aMQPContext, Consumer<AMQPContext> consumer) {
        super(channel);
        // Validate in the same order as before: consumer tag first, then context.
        this.consumerTag = Objects.requireNonNull(str, "consumerTag must not be null");
        this.context = Objects.requireNonNull(aMQPContext, "context must not be null");
        this.disconnectCallback = consumer;
    }

    /**
     * Creates a consumer without a disconnect callback.
     *
     * @param channel the channel passed on to {@link DefaultConsumer}
     * @param str the consumer tag used when cancelling the subscription; must not be {@code null}
     * @param aMQPContext the context handed to the disconnect callback; must not be {@code null}
     * @throws NullPointerException if {@code str} or {@code aMQPContext} is {@code null}
     */
    public BasicReSubscribeConsumer(Channel channel, String str, AMQPContext aMQPContext) {
        this(channel, str, aMQPContext, null);
    }

    /**
     * Sets the deferred that is completed when the consumer is disconnected or a cancel fails.
     *
     * @param deferred the deferred to complete; must not be {@code null}
     * @throws NullPointerException if {@code deferred} is {@code null}
     */
    public void setDeferred(Deferred<T> deferred) {
        this.deferred = Objects.requireNonNull(deferred, "deferred must not be null");
    }

    /**
     * Replaces the disconnect callback.
     *
     * @param consumer the new callback, or {@code null} to run the disconnect sequence without one
     */
    public void setDisconnectCallback(Consumer<AMQPContext> consumer) {
        this.disconnectCallback = consumer;
    }

    /**
     * Default implementation: never asks for the consumer to be closed.
     *
     * @return always {@code false}
     */
    @Override // org.gecko.adapter.amqp.api.ReSubscribeCallback
    public boolean shouldCloseConsumer() throws IOException {
        return false;
    }

    /**
     * Default implementation: never asks for a resubscribe, so {@link #checkResubscribe()}
     * is a no-op unless a subclass overrides this method.
     *
     * @return always {@code false}
     */
    @Override // org.gecko.adapter.amqp.api.ReSubscribeCallback
    public boolean shouldResubscribe() {
        return false;
    }

    /**
     * Default implementation: the deferred is resolved with {@code null}.
     *
     * @return always {@code null}
     */
    @Override // org.gecko.adapter.amqp.api.ReSubscribeCallback
    public T getCloseValue() {
        return null;
    }

    /**
     * Marks this consumer as cancelled and then evaluates {@link #checkResubscribe()}.
     *
     * @param str the consumer tag reported with the cancel confirmation (unused here)
     */
    public void handleCancelOk(String str) {
        this.cancelled.set(true);
        checkResubscribe();
    }

    /**
     * Runs the disconnect sequence: {@link #preDisconnect()}, the optional disconnect
     * callback, then {@link #postDisconnect()}. Any exception fails the deferred.
     *
     * @return {@code true} if the whole sequence completed, {@code false} if it threw
     *         and the deferred has been failed
     * @throws NullPointerException if no deferred has been set
     */
    private boolean doDisconnect() {
        // Fetch first so a missing deferred is reported before any hook runs.
        Deferred<T> pending = getDeferred();
        try {
            preDisconnect();
            // Read the field once so a concurrent setDisconnectCallback(null) cannot
            // slip in between the null check and the call.
            Consumer<AMQPContext> callback = this.disconnectCallback;
            if (callback != null) {
                callback.accept(this.context);
            }
            postDisconnect();
            return true;
        } catch (Exception e) {
            pending.fail(e);
            return false;
        }
    }

    /**
     * Handles one delivery: stores it in the protected fields, logs it at FINE level,
     * passes it to {@link #doHandleDelivery}, acknowledges it when the context's
     * properties are present and not configured for auto-acknowledge, and finally
     * cancels the consumer if {@link #shouldCloseConsumer()} says so.
     *
     * @param str the consumer tag of the delivery
     * @param envelope the delivery envelope
     * @param basicProperties the message properties
     * @param bArr the message body
     * @throws IOException if the subclass handler, the acknowledge, or
     *         {@link #shouldCloseConsumer()} fails
     */
    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        this.envelope = envelope;
        this.properties = basicProperties;
        this.body = bArr;

        long deliveryTag = envelope.getDeliveryTag();
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();
        String contentType = basicProperties.getContentType();
        logger.fine(() -> String.format("Tag: '%s'; exchange: '%s'; key: '%s'; contentType: '%s'; deliveryTag: '%s'", str, exchange, routingKey, contentType, Long.valueOf(deliveryTag)));

        doHandleDelivery(str, envelope, basicProperties, bArr);

        // Named differently from the 'properties' field to avoid shadowing it.
        AMQPProperties amqpProperties = this.context.getProperties();
        if (amqpProperties != null && !amqpProperties.autoAcknowledge()) {
            // 'false' = acknowledge only this delivery tag, not all earlier ones.
            getChannel().basicAck(deliveryTag, false);
        }
        doShouldCloseConsumer();
    }

    /**
     * @return {@code true} once {@link #handleCancelOk(String)} has been invoked
     */
    protected boolean isCancelled() {
        return this.cancelled.get();
    }

    /**
     * Disconnects and completes the deferred when {@link #shouldResubscribe()} is
     * {@code true}, the consumer has been cancelled, and no work is in progress.
     *
     * <p>The deferred is resolved with {@link #getCloseValue()} only if the disconnect
     * sequence succeeded. If it failed, the deferred has already been failed with the
     * causing exception and must not be resolved again: resolving an already completed
     * deferred throws {@link IllegalStateException}.
     *
     * @throws NullPointerException if the conditions hold but no deferred has been set
     */
    public final void checkResubscribe() {
        if (!shouldResubscribe() || !isCancelled() || isProgress()) {
            return;
        }
        if (doDisconnect()) {
            getDeferred().resolve(getCloseValue());
        }
    }

    /**
     * Subclass hook that processes a delivery. Called by {@link #handleDelivery} after
     * the delivery has been stored in the protected fields and before it is acknowledged.
     *
     * @param str the consumer tag of the delivery
     * @param envelope the delivery envelope
     * @param basicProperties the message properties
     * @param bArr the message body
     * @throws IOException if processing fails; the delivery is then not acknowledged by this class
     */
    protected abstract void doHandleDelivery(String str, Envelope envelope, BasicProperties basicProperties, byte[] bArr) throws IOException;

    /**
     * Hook invoked before the disconnect callback. The default implementation does nothing.
     *
     * @throws IOException if the hook fails; the deferred is then failed with this exception
     */
    protected void preDisconnect() throws IOException {
    }

    /**
     * Hook invoked after the disconnect callback. The default implementation does nothing.
     *
     * @throws IOException if the hook fails; the deferred is then failed with this exception
     */
    protected void postDisconnect() throws IOException {
    }

    /**
     * @return the deferred set via {@link #setDeferred(Deferred)}
     * @throws NullPointerException if no deferred has been set yet
     */
    protected Deferred<T> getDeferred() {
        return Objects.requireNonNull(this.deferred, "deferred has not been set");
    }

    /**
     * Sets the in-progress flag. While it is {@code true}, {@link #checkResubscribe()}
     * neither disconnects nor completes the deferred.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was
     * changed from {@code protected}; it is kept {@code public} so existing callers
     * continue to compile.
     *
     * @param z the new value of the in-progress flag
     */
    public void setProgress(boolean z) {
        this.progress.set(z);
    }

    /**
     * @return the current value of the in-progress flag
     */
    private boolean isProgress() {
        return this.progress.get();
    }

    /**
     * Cancels the consumer when {@link #shouldCloseConsumer()} returns {@code true}.
     *
     * @return {@code true} if a cancel was requested and issued successfully
     * @throws IOException if {@link #shouldCloseConsumer()} fails
     */
    private boolean doShouldCloseConsumer() throws IOException {
        return shouldCloseConsumer() && doCancelConsumer();
    }

    /**
     * Issues {@code basicCancel} for {@link #consumerTag}; a failure fails the deferred
     * instead of propagating.
     *
     * @return {@code true} if the cancel call returned normally, {@code false} if it threw
     * @throws NullPointerException if no deferred has been set
     */
    private boolean doCancelConsumer() throws IOException {
        // Fetch first so a missing deferred is reported before the channel is touched.
        Deferred<T> pending = getDeferred();
        try {
            getChannel().basicCancel(this.consumerTag);
            return true;
        } catch (Exception e) {
            pending.fail(e);
            return false;
        }
    }
}
