package org.gecko.adapter.amqp.api;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.gecko.adapter.amqp.consumer.AMQPHelper;
import org.gecko.util.common.PropertyHelper;
import org.gecko.util.common.concurrent.NamedThreadFactory;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.util.converter.Converter;
import org.osgi.util.converter.Converters;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.promise.Promises;

/* loaded from: input_file:org/gecko/adapter/amqp/api/BasicAMQPService.class */
public abstract class BasicAMQPService {
    private static final Logger logger = Logger.getLogger(BasicAMQPService.class.getName());
    private Map<String, Object> properties;
    private Promise<Connection> connectionPromise;
    private AMQPConfiguration configuration = null;
    private AMQPProperties amqpProps = null;
    private AtomicReference<Connection> connectionRef = new AtomicReference<>();
    private Map<String, Channel> channelMap = new ConcurrentHashMap();
    private Promise<Channel> channelPromise = null;
    protected final Converter converter = Converters.standardConverter();

    /* JADX INFO: Access modifiers changed from: protected */
    public void activate(AMQPConfiguration aMQPConfiguration, Map<String, Object> map) throws Exception {
        if (!validateConfiguration(aMQPConfiguration)) {
            throw new ConfigurationException("AMQP-Configuration", "The AMQP configuration is not valid");
        }
        this.amqpProps = (AMQPProperties) this.converter.convert(map).to(AMQPProperties.class);
        this.properties = map;
        this.configuration = aMQPConfiguration;
        this.connectionPromise = new PromiseFactory(Executors.newSingleThreadExecutor(NamedThreadFactory.newNamedFactory("AMQPService-" + aMQPConfiguration.name()))).submit(this::configureConnectionFactory).map(this::doConnect).thenAccept(this::postConnect).onFailure(th -> {
            logger.log(Level.SEVERE, "Error creating AMQP connection", th);
        });
        if (getConfiguration().immediateChannel()) {
            this.channelPromise = this.connectionPromise.map(this::configureImmediateChannel);
        }
    }

    protected Connection postConnect(Connection connection) {
        return connection;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void close() throws Exception {
        if (!getConfiguration().immediateChannel()) {
            this.channelMap.keySet().forEach(str -> {
                try {
                    disconnectChannel(str);
                } catch (Exception e) {
                    logger.log(Level.SEVERE, String.format("[%s] Error closing channel", str), (Throwable) e);
                }
            });
            this.channelMap.clear();
        } else if (Objects.nonNull(this.channelPromise)) {
            this.channelPromise.filter((v0) -> {
                return v0.isOpen();
            }).thenAccept((v0) -> {
                v0.close();
            });
        }
        this.connectionPromise.onResolve(() -> {
            Connection connection = this.connectionRef.get();
            if (Objects.nonNull(connection) && connection.isOpen()) {
                connection.abort();
                this.connectionRef.compareAndSet(connection, null);
            }
        }).onFailure(th -> {
            logger.log(Level.SEVERE, "Error closing AMQP connection ", th);
        });
    }

    protected ConnectionFactory configureConnectionFactory() throws ConfigurationException {
        boolean z = false;
        ConnectionFactory connectionFactory = new ConnectionFactory();
        AMQPConfiguration configuration = getConfiguration();
        if (configuration.brokerUrl() != null && configuration.brokerUrl().startsWith("amqp://")) {
            try {
                connectionFactory.setUri(configuration.brokerUrl());
                z = true;
            } catch (URISyntaxException | KeyManagementException | NoSuchAlgorithmException e) {
                logger.log(Level.SEVERE, "Error setting the URI to connection factroy " + configuration.brokerUrl(), e);
            }
        }
        if (!z && validateConfiguration(configuration)) {
            if (configuration.tls()) {
                configureTLSConnection(connectionFactory, configuration);
            }
            connectionFactory.setPort(configuration.port());
            connectionFactory.setHost(configuration.host());
            connectionFactory.setVirtualHost(configuration.virtualHost());
            if (configuration.username() == null || configuration.username().isEmpty()) {
                Object value = PropertyHelper.createHelper().getValue(this.properties, "username");
                if (value != null) {
                    connectionFactory.setUsername(value.toString());
                }
            } else {
                connectionFactory.setUsername(configuration.username());
            }
            if (configuration.password() == null || configuration.password().isEmpty()) {
                Object value2 = PropertyHelper.createHelper().getValue(this.properties, "password");
                if (value2 != null) {
                    connectionFactory.setPassword(value2.toString());
                }
            } else {
                connectionFactory.setPassword(configuration.password());
            }
        } else if (!z) {
            throw new ConfigurationException("amqp.configuration", "Error validating AMQP configuration, there are missing mandatory values");
        }
        return connectionFactory;
    }

    protected void configureTLSConnection(ConnectionFactory connectionFactory, AMQPConfiguration aMQPConfiguration) throws ConfigurationException {
        Objects.requireNonNull(connectionFactory);
        Objects.requireNonNull(aMQPConfiguration);
        String keyStorePassword = aMQPConfiguration.keyStorePassword();
        String trustStorePassword = aMQPConfiguration.trustStorePassword();
        char[] charArray = Objects.nonNull(keyStorePassword) ? keyStorePassword.toCharArray() : "".toCharArray();
        try {
            KeyStore keyStore = KeyStore.getInstance("PKCS12");
            keyStore.load(new FileInputStream(aMQPConfiguration.keyStore()), charArray);
            KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance("SunX509");
            keyManagerFactory.init(keyStore, charArray);
            char[] charArray2 = Objects.nonNull(trustStorePassword) ? trustStorePassword.toCharArray() : "".toCharArray();
            KeyStore keyStore2 = KeyStore.getInstance("JKS");
            keyStore2.load(new FileInputStream(aMQPConfiguration.trustStore()), charArray2);
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("SunX509");
            trustManagerFactory.init(keyStore2);
            SSLContext sSLContext = SSLContext.getInstance("TLSv1.2");
            sSLContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
            connectionFactory.useSslProtocol(sSLContext);
        } catch (Exception e) {
            throw new ConfigurationException("tls", "TLS configuration is invalid", e);
        }
    }

    protected Connection doConnect(ConnectionFactory connectionFactory) throws IOException, TimeoutException {
        Connection connection = this.connectionRef.get();
        if (Objects.isNull(connection) || !connection.isOpen()) {
            this.connectionRef.set(connectionFactory.newConnection());
        }
        return connection;
    }

    protected Connection ensureOpenConnection() {
        try {
            this.connectionPromise.getValue();
        } catch (InterruptedException e) {
            logger.log(Level.SEVERE, "Getting a connection was interrupted.", (Throwable) e);
            Thread.currentThread().interrupt();
        } catch (InvocationTargetException e2) {
            throw new IllegalStateException("Getting a connection failed.", e2.getCause());
        }
        Connection connection = this.connectionRef.get();
        if (Objects.nonNull(connection) && connection.isOpen()) {
            return connection;
        }
        throw new IllegalStateException(String.format("Got a non-open connection '%s'", connection));
    }

    protected boolean validateConfiguration(AMQPConfiguration aMQPConfiguration) {
        boolean z = Objects.nonNull(aMQPConfiguration) && Objects.nonNull(aMQPConfiguration.virtualHost()) && aMQPConfiguration.port() > 0 && (AMQPHelper.validateExchangeConfiguration(aMQPConfiguration) || AMQPHelper.validateQueueConfiguration(aMQPConfiguration)) && Objects.nonNull(aMQPConfiguration.host()) && !aMQPConfiguration.host().isEmpty();
        return (Objects.nonNull(aMQPConfiguration) && aMQPConfiguration.tls()) ? z && Objects.nonNull(aMQPConfiguration.keyStore()) && !aMQPConfiguration.keyStore().isBlank() && Objects.nonNull(aMQPConfiguration.trustStore()) && !aMQPConfiguration.trustStore().isBlank() : z;
    }

    protected Channel configureImmediateChannel(Connection connection) throws IOException, TimeoutException, ConfigurationException {
        Objects.requireNonNull(connection);
        Channel createChannel = connection.createChannel();
        String exchange = getConfiguration().exchange();
        String str = getConfiguration().topic();
        String routingKey = getConfiguration().routingKey();
        if (AMQPHelper.validateExchangeConfiguration(getConfiguration())) {
            createChannel.exchangeDeclare(exchange, AMQPContext.RoutingType.DIRECT.toString().toLowerCase(), false, false, (Map) null);
            if (AMQPHelper.validateQueueConfiguration(getConfiguration())) {
                createChannel.queueDeclare(str, false, false, false, (Map) null);
            } else {
                str = createChannel.queueDeclare().getQueue();
            }
            createChannel.queueBind(str, exchange, routingKey);
        } else {
            if (!AMQPHelper.validateQueueConfiguration(getConfiguration())) {
                throw new ConfigurationException("topic", "No topic name was provided or no correct exchange setup was provided");
            }
            createChannel.queueDeclare(str, false, false, false, (Map) null);
            createChannel.queuePurge(str);
        }
        return createChannel;
    }

    private void disconnectChannel(String str) throws IOException, TimeoutException {
        Channel remove = this.channelMap.remove(str);
        if (remove == null) {
            logger.log(Level.INFO, "[{0}] No channel exists. Nothing to disconnect", str);
        } else if (remove.isOpen()) {
            remove.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void disconnectChannel(AMQPContext aMQPContext, boolean z) throws IOException, TimeoutException {
        String key = AMQPHelper.getKey(aMQPContext);
        if (z) {
            key = key + "_sub";
        }
        disconnectChannel(key);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Channel connectExchange(AMQPContext aMQPContext, boolean z) throws IOException, TimeoutException {
        if (!AMQPHelper.validateExchangeContext(aMQPContext)) {
            throw new IllegalStateException("Error connecting to exchange with invalid context configuration");
        }
        AMQPProperties properties = aMQPContext.getProperties();
        Channel createChannel = ensureOpenConnection().createChannel();
        String exchangeName = aMQPContext.getExchangeName();
        String queueName = aMQPContext.getQueueName();
        String routingKey = aMQPContext.getRoutingKey();
        String routingType = aMQPContext.getRoutingType();
        Optional<Channel> channel = getChannel(aMQPContext, z);
        if (channel.isPresent()) {
            return channel.get();
        }
        HashMap hashMap = new HashMap();
        if (Objects.nonNull(properties) && properties.singleActiveConsumer()) {
            hashMap.put("x-single-active-consumer", true);
        }
        createChannel.exchangeDeclare(exchangeName, routingType, aMQPContext.isDurable(), aMQPContext.isAutoDelete(), (Map) null);
        if (z) {
            if (AMQPHelper.validateQueueContext(aMQPContext)) {
                createChannel.queueDeclare(queueName, false, aMQPContext.isExclusive(), false, hashMap);
            } else {
                queueName = createChannel.queueDeclare().getQueue();
                aMQPContext.setQueueName(queueName);
            }
            createChannel.queueBind(queueName, exchangeName, routingKey);
        }
        registerChannel(createChannel, aMQPContext, z);
        return createChannel;
    }

    protected Optional<Channel> getChannel(AMQPContext aMQPContext, boolean z) {
        String key = AMQPHelper.getKey(aMQPContext, z);
        if (!this.channelMap.containsKey(key)) {
            return Optional.empty();
        }
        logger.log(Level.FINE, "[{0}] Channel already exists", key);
        return Optional.of(this.channelMap.get(key));
    }

    protected void registerChannel(Channel channel, AMQPContext aMQPContext, boolean z) {
        String key = AMQPHelper.getKey(aMQPContext, z);
        if (this.channelMap.containsKey(key)) {
            return;
        }
        this.channelMap.put(key, channel);
    }

    protected Channel connectQueue(AMQPContext aMQPContext) throws IOException, TimeoutException {
        if (AMQPHelper.validateQueueContext(aMQPContext)) {
            throw new IllegalStateException("Error connecting to queue with invalid context configuration");
        }
        Channel createChannel = ensureOpenConnection().createChannel();
        String queueName = aMQPContext.getQueueName();
        if (this.channelMap.containsKey(queueName)) {
            logger.fine("Channel already created for queue: " + queueName);
            return this.channelMap.get(queueName);
        }
        createChannel.queueDeclare(queueName, aMQPContext.isDurable(), aMQPContext.isExclusive(), aMQPContext.isAutoDelete(), (Map) null);
        this.channelMap.put(queueName, createChannel);
        logger.fine("Created channel for queue: " + queueName);
        return createChannel;
    }

    public AMQPProperties getAmqpProperties() {
        return this.amqpProps;
    }

    public AMQPConfiguration getConfiguration() {
        return this.configuration;
    }

    public Promise<Channel> getImmediateChannel() {
        return Objects.nonNull(this.channelPromise) ? this.channelPromise : Promises.failed(new IllegalStateException("Channel not available because no immediate channel was configured!"));
    }

    public Promise<Connection> getConnection() {
        return this.connectionPromise;
    }
}
