package org.gecko.adapter.amqp;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.gecko.adapter.amqp.client.AMQPContextBuilder;
import org.gecko.adapter.amqp.consumer.AMQPHelper;
import org.gecko.adapter.amqp.consumer.AMQPRPCConsumer;
import org.gecko.adapter.amqp.consumer.AMQPReplyToConsumer;
import org.gecko.core.api.PropertyHelper;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.core.pushstream.SimplePushEventSourceContext;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingReplyToService;
import org.gecko.osgi.messaging.ReplyToPolicy;
import org.osgi.annotation.bundle.Capability;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.util.promise.Promise;
import org.osgi.util.pushstream.PushStream;

@Capability(namespace = "gecko.messaging", name = "replyToService", version = "1.0.0", attribute = {"vendor=Gecko.io", "implementation=AMQP"})
@Component(service = {MessagingReplyToService.class}, name = "AMQPReplyToService", configurationPolicy = ConfigurationPolicy.REQUIRE, immediate = true)
/* loaded from: input_file:org/gecko/adapter/amqp/AMQPReplyToService.class */
public class AMQPReplyToService implements MessagingReplyToService, AutoCloseable {
    private static final Logger logger = Logger.getLogger(AMQPReplyToService.class.getName());
    private AtomicReference<Connection> connectionRef = new AtomicReference<>();
    private SimplePushEventSourceContext<Message> esContext;
    private ConnectionFactory connectionFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Typed configuration of the AMQP reply-to service.
     *
     * <p>The connection factory is set up either from {@link #brokerUrl()} or, when that is not
     * usable, from the individual host/port/virtual-host/credential values.
     */
    @ObjectClassDefinition
    /* loaded from: input_file:org/gecko/adapter/amqp/AMQPReplyToService$AMQPConfig.class */
    public @interface AMQPConfig {
        /**
         * User name for the host/port based setup. When null or empty, the {@code username} entry of
         * the component property map is used instead, if present.
         */
        String username();

        /**
         * Password for the host/port based setup. When null or empty, the {@code password} entry of
         * the component property map is used instead, if present.
         */
        String password();

        /** Broker host for the host/port based setup; must not be empty to pass validation. */
        String host() default "localhost";

        /** Broker port for the host/port based setup; must be greater than zero to pass validation. */
        int port() default 5672;

        /**
         * Virtual host for the host/port based setup. Validation rejects an empty value, so the
         * default must be overridden unless a usable {@link #brokerUrl()} is configured.
         */
        String virtualHost() default "";

        /**
         * When {@code true}, automatic recovery is switched on at the connection factory. When
         * {@code false} the factory is left at whatever default the client library uses.
         */
        boolean autoRecovery() default false;

        /** Not read anywhere in this class. NOTE(review): confirm whether it is consumed elsewhere. */
        boolean jmx() default false;

        /**
         * Complete broker URI. Only used when it starts with {@code amqp://}; in that case it takes
         * precedence over the individual settings. If the URI is rejected by the connection factory,
         * the individual settings are used as fallback.
         */
        String brokerUrl();
    }

    /**
     * Activates the component: builds the connection factory from the configuration, opens the
     * broker connection and creates the push event source context that is later handed to the
     * reply-to consumers.
     *
     * @param aMQPConfig typed component configuration
     * @param map raw component properties (fallback for credentials, push stream settings)
     * @throws Exception if the configuration is invalid or the connection cannot be established;
     *     the failure is logged and rethrown so that the activation fails
     */
    @Activate
    void activate(AMQPConfig aMQPConfig, Map<String, Object> map) throws Exception {
        try {
            this.connectionFactory = configureConnectionFactory(aMQPConfig, map);
            connect();
            this.esContext = PushStreamHelper.getEventSourceContext(map);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error creating AMQP connection", e);
            // A component whose activation failed does not get its deactivate method called, so a
            // connection that was already opened has to be released here to avoid leaking it.
            try {
                close();
            } catch (Exception closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Deactivation hook of the component; releases the broker connection by delegating to
     * {@link #close()}.
     *
     * @throws Exception if {@link #close()} fails
     */
    @Deactivate
    void deactivate() throws Exception {
        this.close();
    }

    /**
     * Closes the broker connection held by this service, if there is one.
     *
     * <p>The connection reference is cleared atomically before closing, so calling this method
     * repeatedly (or from several threads) closes a given connection at most once. Errors raised
     * while closing are logged and not propagated.
     *
     * @throws Exception declared by {@link AutoCloseable}; close errors are logged, not rethrown
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        // getAndSet(null) hands the connection to exactly one caller; later calls see null.
        Connection connection = this.connectionRef.getAndSet(null);
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error closing connection ", e);
        }
    }

    /**
     * Publishes a request and expects a single reply, using a context created here: a context
     * built via {@code topic()} for the given name, with the RPC flag set.
     *
     * @param str name passed to the context builder and used as target of the request
     * @param byteBuffer request payload
     * @return promise returned by {@link #publishSingle(String, ByteBuffer, MessagingContext)}
     * @throws Exception if publishing fails
     */
    public Promise<Message> publishSingle(String str, ByteBuffer byteBuffer) throws Exception {
        AMQPContext rpcContext = new AMQPContextBuilder().topic().m8queue(str).build();
        rpcContext.setRpc(true);
        return publishSingle(str, byteBuffer, rpcContext);
    }

    /**
     * Publishes a request on a dedicated channel and returns a promise for the single reply.
     *
     * <p>The supplied context is modified: its queue name is set to {@code str}, and a reply
     * address (a freshly declared server-named queue) and a random correlation id are filled in
     * when missing. The request is published either to the context's exchange/routing key or, in
     * queue mode, to the default exchange with the queue name as routing key. A consumer is then
     * registered on the reply address.
     *
     * <p>The channel is released (consumer cancelled, channel closed) once the reply promise is
     * resolved, successfully or not, and also when any step of the setup fails. Cleanup errors
     * are logged and never replace the reply.
     *
     * @param str queue name to set on the context
     * @param byteBuffer request payload
     * @param messagingContext must be an {@link AMQPContext}
     * @return promise of the reply message as provided by the RPC consumer
     * @throws IllegalStateException if the context is not an {@link AMQPContext} or the channel
     *     is closed before publishing
     * @throws Exception if channel setup, publishing or consumer registration fails
     */
    public Promise<Message> publishSingle(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        // instanceof is false for null, so this also rejects a missing context.
        if (!(messagingContext instanceof AMQPContext)) {
            throw new IllegalStateException("Invalid context was provided. Please use an AMQPContextBuilder to create the right one");
        }
        AMQPContext amqpContext = (AMQPContext) messagingContext;
        amqpContext.setQueueName(str);
        Channel channel = amqpContext.isExchangeMode() ? connectExchange(amqpContext, false) : connectQueue(amqpContext);
        try {
            if (amqpContext.getReplyAddress() == null) {
                amqpContext.setReplyAddress(channel.queueDeclare().getQueue());
            }
            if (amqpContext.getCorrelationId() == null) {
                amqpContext.setCorrelationId(UUID.randomUUID().toString());
            }
            AMQP.BasicProperties properties = AMQPHelper.createMessageProperties(amqpContext);
            if (!channel.isOpen()) {
                throw new IllegalStateException("Channel is closed");
            }
            // NOTE(review): array() hands over the whole backing array regardless of the buffer's
            // position/limit and throws for direct or read-only buffers.
            byte[] payload = byteBuffer.array();
            AMQPRPCConsumer rpcConsumer = new AMQPRPCConsumer(channel, amqpContext.getCorrelationId());
            if (amqpContext.isExchangeMode()) {
                channel.basicPublish(amqpContext.getExchangeName(), amqpContext.getRoutingKey(), properties, payload);
            } else {
                channel.basicPublish("", amqpContext.getQueueName(), properties, payload);
            }
            String consumerTag = channel.basicConsume(amqpContext.getReplyAddress(), true, rpcConsumer);
            // onResolve runs for success and failure alike, so the channel is always released.
            return rpcConsumer.getMessage().onResolve(() -> releaseRpcChannel(channel, consumerTag));
        } catch (Exception e) {
            releaseRpcChannel(channel, null);
            throw e;
        }
    }

    /**
     * Cancels the given consumer (if any) and closes the channel; failures are only logged.
     *
     * @param channel channel to release, may already be closed
     * @param consumerTag tag of the reply consumer, or {@code null} if none was registered
     */
    private void releaseRpcChannel(Channel channel, String consumerTag) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        if (consumerTag != null) {
            try {
                channel.basicCancel(consumerTag);
            } catch (Exception e) {
                logger.log(Level.WARNING, "Error cancelling RPC reply consumer " + consumerTag, e);
            }
        }
        // A failed cancel may already have closed the channel.
        if (!channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (Exception e) {
            logger.log(Level.WARNING, "Error closing RPC channel", e);
        }
    }

    /**
     * Publishes a request and expects a stream of replies, using a context created here: a
     * context built via {@code topic()} for the given name, with the RPC flag set.
     *
     * @param str name passed to the context builder and used as target of the request
     * @param byteBuffer request payload
     * @return push stream returned by {@link #publishMany(String, ByteBuffer, MessagingContext)}
     * @throws Exception if publishing fails
     */
    public PushStream<Message> publishMany(String str, ByteBuffer byteBuffer) throws Exception {
        AMQPContext rpcContext = new AMQPContextBuilder().topic().m8queue(str).build();
        rpcContext.setRpc(true);
        return publishMany(str, byteBuffer, rpcContext);
    }

    /**
     * Publishes a request on a dedicated channel and returns a push stream of the replies.
     *
     * <p>The supplied context is modified: its queue name is set to {@code str}, a reply address
     * is created (server-named queue) when missing or declared with the context's
     * durable/exclusive/auto-delete flags when given, a random correlation id is filled in when
     * missing, and the {@code replyToPolicy} header is set to {@link ReplyToPolicy#MULTIPLE}. The
     * reply consumer is registered (manual acknowledge) before the request is published.
     *
     * <p>If any step fails, the channel opened for this call is closed before the exception is
     * propagated. On success the channel stays open and is handed to the reply consumer.
     * NOTE(review): the consumer presumably closes it when the stream ends; verify.
     *
     * @param str queue name to set on the context
     * @param byteBuffer request payload
     * @param messagingContext must be an {@link AMQPContext}
     * @return push stream created by the reply consumer
     * @throws IllegalStateException if the context is not an {@link AMQPContext} or the channel
     *     is closed before publishing
     * @throws Exception if channel setup, consumer registration or publishing fails
     */
    public PushStream<Message> publishMany(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        // instanceof is false for null, so this also rejects a missing context.
        if (!(messagingContext instanceof AMQPContext)) {
            throw new IllegalStateException("Invalid context was provided. Please use an AMQPContextBuilder to create the right one");
        }
        AMQPContext amqpContext = (AMQPContext) messagingContext;
        amqpContext.setQueueName(str);
        Channel channel = amqpContext.isExchangeMode() ? connectExchange(amqpContext, false) : connectQueue(amqpContext);
        try {
            if (amqpContext.getReplyAddress() == null) {
                amqpContext.setReplyAddress(channel.queueDeclare().getQueue());
            } else {
                channel.queueDeclare(amqpContext.getReplyAddress(), amqpContext.isDurable(), amqpContext.isExclusive(), amqpContext.isAutoDelete(), null);
            }
            if (amqpContext.getCorrelationId() == null) {
                amqpContext.setCorrelationId(UUID.randomUUID().toString());
            }
            amqpContext.getHeader().put("replyToPolicy", ReplyToPolicy.MULTIPLE.name());
            AMQP.BasicProperties properties = AMQPHelper.createMessageProperties(amqpContext);
            if (!channel.isOpen()) {
                // Do not call close() here: closing an already closed channel throws and would
                // hide this exception.
                throw new IllegalStateException("Channel is closed");
            }
            AMQPReplyToConsumer replyConsumer = new AMQPReplyToConsumer(channel, amqpContext.getReplyAddress(), amqpContext.getAcknowledgeFilter(), this.esContext, amqpContext.getCorrelationId());
            // NOTE(review): array() hands over the whole backing array regardless of the buffer's
            // position/limit and throws for direct or read-only buffers.
            byte[] payload = byteBuffer.array();
            channel.basicConsume(amqpContext.getReplyAddress(), false, replyConsumer);
            if (amqpContext.isExchangeMode()) {
                channel.basicPublish(amqpContext.getExchangeName(), amqpContext.getRoutingKey(), properties, payload);
            } else {
                channel.basicPublish("", amqpContext.getQueueName(), properties, payload);
            }
            return replyConsumer.createPushstream(messagingContext);
        } catch (Exception e) {
            closeChannelAfterFailure(channel);
            throw e;
        }
    }

    /**
     * Closes a channel whose setup failed; a close error is logged and otherwise ignored.
     *
     * @param channel channel to close, may already be closed
     */
    private void closeChannelAfterFailure(Channel channel) {
        if (channel == null || !channel.isOpen()) {
            return;
        }
        try {
            channel.close();
        } catch (Exception closeError) {
            logger.log(Level.WARNING, "Error closing channel after failed publish", closeError);
        }
    }

    /**
     * Creates the connection factory from the component configuration.
     *
     * <p>A broker URL starting with {@code amqp://} is applied first. If there is none, or the
     * factory rejects it, the host, port and virtual host are validated and applied, and the
     * credentials are taken from the configuration or, when empty there, from the
     * {@code username}/{@code password} entries of the property map.
     *
     * <p>The broker URL may carry credentials, so it is never logged verbatim: the user-info part
     * is masked and, for syntax errors, only the reason is logged, because the exception message
     * repeats the raw input.
     *
     * @param aMQPConfig typed component configuration, must not be {@code null}
     * @param map raw component properties used as credential fallback
     * @return the configured factory
     * @throws IllegalArgumentException if {@code aMQPConfig} is {@code null}
     * @throws ConfigurationException if no usable broker URL is given and the individual settings
     *     are incomplete
     */
    private ConnectionFactory configureConnectionFactory(AMQPConfig aMQPConfig, Map<String, Object> map) throws ConfigurationException {
        if (aMQPConfig == null) {
            throw new IllegalArgumentException("Cannot create a connection factory without a configuration");
        }
        ConnectionFactory factory = new ConnectionFactory();
        boolean uriApplied = false;
        String brokerUrl = aMQPConfig.brokerUrl();
        if (brokerUrl != null && brokerUrl.startsWith("amqp://")) {
            // Mask everything between "//" and the last '@' of the authority (user:password).
            String redactedUrl = brokerUrl.replaceFirst("//[^/]*@", "//***@");
            try {
                factory.setUri(brokerUrl);
                uriApplied = true;
            } catch (URISyntaxException e) {
                // The exception message embeds the unredacted URL, so only the reason is logged.
                logger.log(Level.SEVERE, "Error setting the URI to connection factroy " + redactedUrl + ": " + e.getReason());
            } catch (KeyManagementException | NoSuchAlgorithmException e) {
                logger.log(Level.SEVERE, "Error setting the URI to connection factroy " + redactedUrl, e);
            }
        }
        if (!uriApplied) {
            if (!validateConfiguration(aMQPConfig)) {
                throw new ConfigurationException("amqp.configuration", "Error validating AMQP configuration, there are missing mandatory values");
            }
            factory.setPort(aMQPConfig.port());
            factory.setHost(aMQPConfig.host());
            factory.setVirtualHost(aMQPConfig.virtualHost());
            String username = aMQPConfig.username();
            if (username == null || username.isEmpty()) {
                Object fallbackUser = PropertyHelper.createHelper().getValue(map, "username");
                if (fallbackUser != null) {
                    factory.setUsername(fallbackUser.toString());
                }
            } else {
                factory.setUsername(username);
            }
            String password = aMQPConfig.password();
            if (password == null || password.isEmpty()) {
                Object fallbackPassword = PropertyHelper.createHelper().getValue(map, "password");
                if (fallbackPassword != null) {
                    factory.setPassword(fallbackPassword.toString());
                }
            } else {
                factory.setPassword(password);
            }
        }
        // NOTE(review): a false value leaves the client library's own default in place; if that
        // default is "enabled", autoRecovery=false has no effect. Confirm the intended behavior.
        if (aMQPConfig.autoRecovery()) {
            factory.setAutomaticRecoveryEnabled(true);
        }
        return factory;
    }

    /**
     * Checks the settings needed for a host/port based connection.
     *
     * @param aMQPConfig configuration to check, may be {@code null}
     * @return {@code true} only if the configuration is present, the port is positive and both
     *     host and virtual host are non-empty
     */
    private boolean validateConfiguration(AMQPConfig aMQPConfig) {
        return aMQPConfig != null
                && aMQPConfig.port() > 0
                && aMQPConfig.host() != null
                && !aMQPConfig.host().isEmpty()
                && aMQPConfig.virtualHost() != null
                && !aMQPConfig.virtualHost().isEmpty();
    }

    /**
     * Makes sure an open connection is stored in {@code connectionRef}: an existing open
     * connection is kept, otherwise a new one is created from the connection factory.
     *
     * @throws IOException if the connection cannot be created
     * @throws TimeoutException if creating the connection times out
     */
    private void connect() throws IOException, TimeoutException {
        Connection current = this.connectionRef.get();
        if (current != null && current.isOpen()) {
            return;
        }
        this.connectionRef.set(this.connectionFactory.newConnection());
    }

    /**
     * Opens a channel and declares the context's exchange on it.
     *
     * <p>When {@code z} is {@code true}, a server-named queue is additionally declared, stored
     * as queue name in the context and bound to the exchange with the context's routing key.
     *
     * <p>If the connection cannot provide a channel, an {@link IOException} is raised instead of
     * returning {@code null}. If declaring or binding fails, the channel is closed before the
     * exception is propagated.
     *
     * @param aMQPContext context providing exchange name, routing key/type and flags
     * @param z whether to declare and bind a queue on the exchange (key is logged with "_sub")
     * @return the open channel
     * @throws IOException if no channel is available or a declare/bind operation fails
     * @throws TimeoutException if (re-)connecting times out
     */
    private Channel connectExchange(AMQPContext aMQPContext, boolean z) throws IOException, TimeoutException {
        String exchangeName = aMQPContext.getExchangeName();
        String routingKey = aMQPContext.getRoutingKey();
        String routingType = aMQPContext.getRoutingType();
        String key = AMQPHelper.getKey(aMQPContext);
        if (z) {
            key = key + "_sub";
        }
        connect();
        Channel channel = this.connectionRef.get().createChannel();
        if (channel == null) {
            // createChannel() signals "no channel available" with null rather than an exception.
            throw new IOException("No AMQP channel available for exchange " + exchangeName);
        }
        try {
            channel.exchangeDeclare(exchangeName, routingType, aMQPContext.isDurable(), aMQPContext.isAutoDelete(), null);
            if (z) {
                String queue = channel.queueDeclare().getQueue();
                aMQPContext.setQueueName(queue);
                channel.queueBind(queue, exchangeName, routingKey);
            }
        } catch (IOException | RuntimeException e) {
            // The channel is not handed out in this case, so nobody else could close it.
            if (channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception closeError) {
                    e.addSuppressed(closeError);
                }
            }
            throw e;
        }
        logger.log(Level.FINE, "[{0}] Created channel", key);
        return channel;
    }

    /**
     * Opens a plain channel on the (re-)established connection for queue based publishing.
     *
     * @param aMQPContext not used by this method
     * @return the open channel, never {@code null}
     * @throws IOException if the connection cannot be created or has no channel available
     * @throws TimeoutException if (re-)connecting times out
     */
    private Channel connectQueue(AMQPContext aMQPContext) throws IOException, TimeoutException {
        connect();
        Channel channel = this.connectionRef.get().createChannel();
        if (channel == null) {
            // createChannel() signals "no channel available" with null; fail with a clear error
            // instead of letting the caller run into a NullPointerException.
            throw new IOException("No AMQP channel available on the current connection");
        }
        return channel;
    }
}
