package org.gecko.adapter.amqp.api;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.gecko.adapter.amqp.client.AMQPContextBuilder;
import org.gecko.util.common.concurrent.NamedThreadFactory;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;

/* loaded from: input_file:org/gecko/adapter/amqp/api/BasicReSubscriber.class */
public abstract class BasicReSubscriber<T> extends BasicAMQPService {
    private static final Logger logger = Logger.getLogger(BasicReSubscriber.class.getName());
    protected PromiseFactory pf = new PromiseFactory(Executors.newCachedThreadPool(NamedThreadFactory.newNamedFactory("AMQPRe-Subscriber")));
    private BasicReSubscribeConsumerFactory<T> consumerFactory;

    public Promise<T> subscribePromise(String str) {
        AMQPConfiguration configuration = getConfiguration();
        AMQPProperties amqpProperties = getAmqpProperties();
        Objects.requireNonNull(amqpProperties);
        AMQPContext m11build = AMQPContextBuilder.createBuilder(configuration).properties(amqpProperties).durable().m10queue(str).m11build();
        Deferred<T> deferred = this.pf.deferred();
        try {
            Channel connectExchange = connectExchange(m11build, true);
            if (amqpProperties.basicQos() != 0 && amqpProperties.basicQos() > 0) {
                connectExchange.basicQos(amqpProperties.basicQos());
            }
            String uuid = UUID.randomUUID().toString();
            BasicReSubscribeConsumer<T> createReSubscribeConsumer = createReSubscribeConsumer(connectExchange, uuid, m11build);
            Objects.requireNonNull(createReSubscribeConsumer);
            createReSubscribeConsumer.setDeferred(deferred);
            createReSubscribeConsumer.setDisconnectCallback(this::doDisconnectChannel);
            if (connectExchange.isOpen()) {
                connectExchange.basicConsume(str, amqpProperties.autoAcknowledge(), uuid, createReSubscribeConsumer);
            } else {
                deferred.fail(new IllegalStateException("The channel is not open"));
            }
        } catch (IOException | TimeoutException e) {
            doDisconnectChannel(m11build);
            deferred.fail(e);
        }
        return deferred.getPromise();
    }

    protected BasicReSubscribeConsumer<T> createReSubscribeConsumer(Channel channel, String str, AMQPContext aMQPContext) {
        Objects.requireNonNull(getConsumerFactory());
        return getConsumerFactory().createConsumer(BasicReSubscribeConsumerFactory.createContext(channel, str, aMQPContext, null, this.pf, null));
    }

    protected void doDisconnectChannel(AMQPContext aMQPContext) {
        try {
            disconnectChannel(aMQPContext, true);
        } catch (IOException | TimeoutException e) {
            logger.log(Level.SEVERE, e, () -> {
                return "Error disconnecting channel";
            });
        }
    }

    public BasicReSubscribeConsumerFactory<T> getConsumerFactory() {
        return this.consumerFactory;
    }

    public void setConsumerFactory(BasicReSubscribeConsumerFactory<T> basicReSubscribeConsumerFactory) {
        Objects.requireNonNull(basicReSubscribeConsumerFactory);
        this.consumerFactory = basicReSubscribeConsumerFactory;
    }
}
