package org.gecko.adapter.amqp.pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.adapter.amqp.pubsub.consumer.AMQPPubOnSubManyConsumer;
import org.gecko.osgi.messaging.MessagingRPCPubOnSub;
import org.gecko.util.common.PropertyHelper;
import org.gecko.util.common.concurrent.NamedThreadFactory;
import org.gecko.util.pushstream.PushStreamHelper;
import org.osgi.annotation.bundle.Capability;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStream;

/**
 * Declarative Services component, registered as {@link MessagingRPCPubOnSub}, that
 * attaches an {@link AMQPPubOnSubManyConsumer} to an AMQP channel.
 *
 * <p>On activation the configuration is validated and a dedicated single-thread
 * executor asynchronously builds the {@link ConnectionFactory}, opens a connection and
 * a channel, declares the queue (or exchange plus bound queue) and starts consuming.
 * Each delivery is handed to the consumer together with the injected
 * {@code callbackFunction}. On deactivation the consumer, channel, connection and the
 * executor are released.
 */
@Capability(namespace = "gecko.messaging", name = "replyToExecutor", version = "1.0.0", attribute = {"vendor=Gecko.io", "implementation=AMQP"})
@Component(service = {MessagingRPCPubOnSub.class}, name = "AMQPReplyToExecutor", configurationPolicy = ConfigurationPolicy.REQUIRE, immediate = true)
public class AMQPReplyToExecutor implements MessagingRPCPubOnSub {
    private static final Logger logger = Logger.getLogger(AMQPReplyToExecutor.class.getName());

    /** Function passed to the consumer; injected through the {@code ma.posFunction} reference. */
    @Reference(name = "ma.posFunction")
    private Function<AMQPMessage, PushStream<ByteBuffer>> callbackFunction;
    /** Resolves once the channel is configured; fails if any connection step failed. */
    private Promise<Channel> connectionPromise;
    /** Raw component properties; used for the credential fallback and the push stream context. */
    private Map<String, Object> properties;
    private AMQPPubOnSubConfig configuration = null;
    /** Written on the connection executor thread and read in {@link #deactivate()}, hence volatile. */
    private volatile AMQPPubOnSubManyConsumer consumer = null;
    /** Executor backing the promise chain; kept so it can be shut down on deactivation. */
    private ExecutorService connectionExecutor;

    /**
     * Typed view on the component configuration.
     */
    @ObjectClassDefinition
    public @interface AMQPPubOnSubConfig {
        /** Name of this configuration; used as suffix of the connection thread name. */
        String name();

        /** User name; when null or empty the {@code username} component property is used instead. */
        String username();

        /** Password; when null or empty the {@code password} component property is used instead. */
        String password();

        String host() default "localhost";

        int port() default 5672;

        String virtualHost() default "";

        boolean autoRecovery() default false;

        boolean jmx() default false;

        /** Broker URI; only applied when it starts with {@code amqp://}, otherwise host and port are used. */
        String brokerUrl();

        /**
         * Queue name when no routing key is configured; otherwise the name of the
         * direct exchange the consumer queue is bound to.
         */
        String rpcQueue();

        /** Routing key used for the binding; {@code null} selects plain queue mode. */
        String rpcRoutingKey();
    }

    /**
     * Validates the configuration and starts the asynchronous connection setup.
     *
     * @param aMQPPubOnSubConfig the typed component configuration
     * @param map the raw component properties
     * @throws ConfigurationException if the configuration fails {@link #validateConfiguration}
     */
    @Activate
    void activate(AMQPPubOnSubConfig aMQPPubOnSubConfig, Map<String, Object> map) throws Exception {
        this.properties = map;
        if (!validateConfiguration(aMQPPubOnSubConfig)) {
            throw new ConfigurationException("PubOnSub-Config", "The Publish-on-subscibe configuration is not valid");
        }
        this.configuration = aMQPPubOnSubConfig;
        this.connectionExecutor = Executors.newSingleThreadExecutor(
                NamedThreadFactory.newNamedFactory("PubOnSub-" + aMQPPubOnSubConfig.name()));
        this.connectionPromise = new PromiseFactory(this.connectionExecutor)
                .submit(this::configureConnectionFactory)
                .map(this::configureConnection)
                .thenAccept(this::configureChannel)
                .onFailure(th -> logger.log(Level.SEVERE, "Error creating AMQP publish on subscribe connection", th));
    }

    /**
     * Closes the consumer and releases channel, connection and executor.
     *
     * @throws Exception if closing the consumer fails; the connection is released regardless
     */
    @Deactivate
    void deactivate() throws Exception {
        try {
            AMQPPubOnSubManyConsumer activeConsumer = this.consumer;
            if (activeConsumer != null) {
                activeConsumer.close();
            }
        } finally {
            // Must run even when closing the consumer throws, otherwise the connection leaks.
            releaseConnection();
        }
    }

    /** Chains channel/connection close onto the connection promise and then stops the executor. */
    private void releaseConnection() {
        Promise<Channel> pending = this.connectionPromise;
        if (pending == null) {
            return;
        }
        Promise<Channel> closing = pending
                .thenAccept(this::closeChannel)
                .onFailure(th -> logger.log(Level.SEVERE, "Error closing connection ", th));
        ExecutorService workers = this.connectionExecutor;
        if (workers != null) {
            // shutdown() lets already queued callbacks finish and then ends the worker thread.
            closing.onResolve(workers::shutdown);
        }
    }

    /**
     * Builds the connection factory from the broker URL or, failing that, from
     * host, port, virtual host and credentials.
     *
     * @return the configured factory
     * @throws ConfigurationException if no usable URL was applied and the host based
     *         configuration is invalid
     * @throws IllegalArgumentException if no configuration is present
     */
    private ConnectionFactory configureConnectionFactory() throws ConfigurationException {
        AMQPPubOnSubConfig config = this.configuration;
        if (config == null) {
            throw new IllegalArgumentException("Cannot create a connection factory without a configuration");
        }
        ConnectionFactory connectionFactory = new ConnectionFactory();
        boolean uriApplied = false;
        String brokerUrl = config.brokerUrl();
        if (brokerUrl != null && brokerUrl.startsWith("amqp://")) {
            try {
                connectionFactory.setUri(brokerUrl);
                uriApplied = true;
            } catch (URISyntaxException | KeyManagementException | NoSuchAlgorithmException e) {
                // A broken URL is not fatal: fall through to the host/port based setup.
                // NOTE(review): the URL may embed credentials and is logged verbatim - confirm this is acceptable.
                logger.log(Level.SEVERE, "Error setting the URI to connection factroy " + brokerUrl, e);
            }
        }
        if (!uriApplied) {
            if (!validateConfiguration(config)) {
                throw new ConfigurationException("amqp.configuration", "Error validating AMQP configuration, there are missing mandatory values");
            }
            connectionFactory.setPort(config.port());
            connectionFactory.setHost(config.host());
            connectionFactory.setVirtualHost(config.virtualHost());
            String username = resolveCredential(config.username(), "username");
            if (username != null) {
                connectionFactory.setUsername(username);
            }
            String password = resolveCredential(config.password(), "password");
            if (password != null) {
                connectionFactory.setPassword(password);
            }
        }
        if (config.autoRecovery()) {
            connectionFactory.setAutomaticRecoveryEnabled(true);
        }
        return connectionFactory;
    }

    /**
     * Returns the configured credential, or the component property of the given key
     * when the configured value is null or empty.
     *
     * @param configured value from the typed configuration
     * @param propertyKey key looked up in the component properties as fallback
     * @return the credential, or {@code null} if neither source provides one
     */
    private String resolveCredential(String configured, String propertyKey) {
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        Object fallback = PropertyHelper.createHelper().getValue(this.properties, propertyKey);
        return fallback == null ? null : fallback.toString();
    }

    /**
     * Opens a new connection and creates a channel on it.
     *
     * @param connectionFactory the configured factory
     * @return the new channel; its connection is reachable through {@code getConnection()}
     */
    private Channel configureConnection(ConnectionFactory connectionFactory) throws IOException, TimeoutException {
        return connectionFactory.newConnection().createChannel();
    }

    /**
     * Declares the AMQP topology and starts the consumer.
     *
     * <p>Without a routing key the queue {@code rpcQueue} is declared, purged and
     * consumed. With a routing key {@code rpcQueue} names a direct exchange; a
     * server-named queue is bound to it and consumed.
     *
     * @param channel the channel to configure
     * @return the same channel
     */
    private Channel configureChannel(Channel channel) throws IOException, TimeoutException {
        String rpcQueue = this.configuration.rpcQueue();
        String rpcRoutingKey = this.configuration.rpcRoutingKey();
        String consumeQueue;
        if (rpcRoutingKey == null || rpcQueue == null) {
            channel.queueDeclare(rpcQueue, false, false, false, (Map<String, Object>) null);
            channel.queuePurge(rpcQueue);
            consumeQueue = rpcQueue;
        } else {
            // Locale.ROOT keeps the exchange type stable regardless of the default locale.
            String exchangeType = AMQPContext.RoutingType.DIRECT.toString().toLowerCase(Locale.ROOT);
            channel.exchangeDeclare(rpcQueue, exchangeType, false, false, (Map<String, Object>) null);
            // Messages arrive in the bound server-named queue, so that is what must be consumed.
            consumeQueue = channel.queueDeclare().getQueue();
            channel.queueBind(consumeQueue, rpcQueue, rpcRoutingKey);
        }
        channel.basicQos(1);
        this.consumer = new AMQPPubOnSubManyConsumer(channel, this.callbackFunction, PushStreamHelper.getPushStreamContext(this.properties));
        channel.basicConsume(consumeQueue, false, this.consumer);
        return channel;
    }

    /**
     * Closes the channel and the connection it was created on.
     *
     * @param channel the channel to close; {@code null} is ignored
     */
    private void closeChannel(Channel channel) throws IOException, TimeoutException {
        if (channel == null) {
            return;
        }
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        } finally {
            // The connection is created solely for this channel and has no other owner.
            if (channel.getConnection() != null && channel.getConnection().isOpen()) {
                channel.getConnection().close();
            }
        }
    }

    /**
     * Checks the mandatory configuration values.
     *
     * @param aMQPPubOnSubConfig the configuration to check, may be {@code null}
     * @return {@code true} if port is positive, host and rpcQueue are non-empty and
     *         virtualHost is not {@code null}
     */
    private boolean validateConfiguration(AMQPPubOnSubConfig aMQPPubOnSubConfig) {
        if (aMQPPubOnSubConfig == null || aMQPPubOnSubConfig.port() <= 0) {
            return false;
        }
        String host = aMQPPubOnSubConfig.host();
        if (host == null || host.isEmpty()) {
            return false;
        }
        if (aMQPPubOnSubConfig.virtualHost() == null) {
            return false;
        }
        String rpcQueue = aMQPPubOnSubConfig.rpcQueue();
        return rpcQueue != null && !rpcQueue.isEmpty();
    }
}
