package org.gecko.adapter.amqp.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:org/gecko/adapter/amqp/client/AMQPClient.class */
public class AMQPClient {
    private static final Logger logger = Logger.getLogger("o.g.ampqSend");
    private ConnectionFactory factory;
    private AtomicReference<Connection> connectionRef;
    private Map<String, Channel> channelMap;

    public static void main(String[] strArr) {
        AMQPClient aMQPClient = new AMQPClient();
        Consumer<byte[]> consumer = new Consumer<byte[]>() { // from class: org.gecko.adapter.amqp.client.AMQPClient.1
            @Override // java.util.function.Consumer
            public void accept(byte[] bArr) {
                AMQPClient.logger.info("Consumer received a message: '" + new String(bArr) + "'");
            }
        };
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            aMQPClient.registerConsumerQueue("test_queue", false, "myTag", consumer);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed registering consumer for queue: " + "test_queue", (Throwable) e);
        }
        try {
            aMQPClient.sendSingleWithQueue("test_queue", "Hello with Queue: " + "test_queue");
        } catch (Exception e2) {
            logger.log(Level.SEVERE, "Sending failed for queue: " + "test_queue", (Throwable) e2);
        }
        try {
            aMQPClient.sendSingleWithExchange("test.queue", "myTest", "Hello with Exchange: " + "test.queue" + " and routing key: " + "myTest");
        } catch (Exception e3) {
            logger.log(Level.SEVERE, "Sending failed for exchange: " + "test.queue" + " and routing key: " + "myTest", (Throwable) e3);
        }
        try {
            try {
                countDownLatch.await(20L, TimeUnit.SECONDS);
                aMQPClient.disconnect();
                logger.info("Disconnected client");
            } catch (Exception e4) {
                logger.info("Waited 20 seconds, closing client now");
                aMQPClient.disconnect();
                logger.info("Disconnected client");
            }
        } catch (Throwable th) {
            aMQPClient.disconnect();
            logger.info("Disconnected client");
            throw th;
        }
    }

    public AMQPClient() {
        this(null);
    }

    public AMQPClient(String str) {
        this.connectionRef = new AtomicReference<>();
        this.channelMap = new ConcurrentHashMap();
        this.factory = new ConnectionFactory();
        if (str == null) {
            this.factory.setHost("devel.data-in-motion.biz");
        } else {
            this.factory.setHost(str);
        }
        this.factory.setPort(5672);
        this.factory.setUsername("demo");
        this.factory.setPassword("1234");
        this.factory.setVirtualHost("test");
        this.factory.setAutomaticRecoveryEnabled(false);
        this.factory.setTopologyRecoveryEnabled(false);
    }

    private Channel connectQueue(String str) throws IOException, TimeoutException {
        if (this.channelMap.containsKey(str)) {
            logger.info("Client - Channel already created for queue: " + str);
            return this.channelMap.get(str);
        }
        connect();
        Channel createChannel = this.connectionRef.get().createChannel();
        createChannel.queueDeclare(str, true, false, false, (Map) null);
        this.channelMap.put(str, createChannel);
        logger.info("Client - Created channel for queue: " + str);
        return createChannel;
    }

    private Channel connectExchangeFanout(String str) throws IOException, TimeoutException {
        return connectExchange(str, "", "fanout", false);
    }

    private Channel connectExchangeTopic(String str, String str2) throws IOException, TimeoutException {
        return connectExchange(str, str2, "topic", true);
    }

    private Channel connectExchangeDirect(String str, String str2) throws IOException, TimeoutException {
        return connectExchange(str, str2, "direct", false);
    }

    public void disconnect() {
        this.channelMap.keySet().forEach(str -> {
            try {
                disconnectChannel(str);
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Error closing channel: " + str, (Throwable) e);
            } catch (AlreadyClosedException e2) {
            }
        });
        this.channelMap.clear();
        Connection connection = this.connectionRef.get();
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Error closing connection ", (Throwable) e);
            }
        }
    }

    public void purgeChannel(String str) throws IOException, TimeoutException {
        Channel connectQueue = connectQueue(str);
        if (connectQueue.isOpen()) {
            connectQueue.queuePurge(str);
        }
    }

    public void sendSingleWithQueue(String str, String str2) throws IOException, TimeoutException {
        Channel connectQueue = connectQueue(str);
        if (connectQueue.isOpen()) {
            connectQueue.basicPublish("", str, (AMQP.BasicProperties) null, str2.getBytes());
        }
    }

    public void sendSingleWithExchange(String str, String str2, String str3) throws IOException, TimeoutException {
        Channel connectExchangeTopic = connectExchangeTopic(str, str2);
        if (connectExchangeTopic.isOpen()) {
            connectExchangeTopic.basicPublish(str, str2, (AMQP.BasicProperties) null, str3.getBytes());
        }
    }

    public void sendSingleWithFanout(String str, String str2) throws IOException, TimeoutException {
        Channel connectExchangeFanout = connectExchangeFanout(str);
        if (connectExchangeFanout.isOpen()) {
            connectExchangeFanout.basicPublish(str, "", (AMQP.BasicProperties) null, str2.getBytes());
        }
    }

    public void sendSingleWithExchangeDirect(String str, String str2, String str3) throws IOException, TimeoutException {
        Channel connectExchangeDirect = connectExchangeDirect(str, str2);
        if (connectExchangeDirect.isOpen()) {
            connectExchangeDirect.basicPublish(str, str2, (AMQP.BasicProperties) null, str3.getBytes());
        }
    }

    public void registerConsumerFanout(String str, final Consumer<byte[]> consumer) throws IOException, TimeoutException {
        AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(atomicReference);
        Channel connectExchange = connectExchange((v1) -> {
            r1.set(v1);
        }, str, "", "fanout", true);
        connectExchange.basicConsume((String) atomicReference.get(), true, new DefaultConsumer(connectExchange) { // from class: org.gecko.adapter.amqp.client.AMQPClient.2
            public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                AMQPClient.logger.log(Level.INFO, "Received fanout message: '" + new String(bArr));
                consumer.accept(bArr);
            }
        });
    }

    public void registerConsumerDirect(String str, String str2, final Consumer<byte[]> consumer) throws IOException, TimeoutException {
        AtomicReference atomicReference = new AtomicReference();
        Objects.requireNonNull(atomicReference);
        Channel connectExchange = connectExchange((v1) -> {
            r1.set(v1);
        }, str, str2, "direct", true);
        connectExchange.basicConsume((String) atomicReference.get(), true, new DefaultConsumer(connectExchange) { // from class: org.gecko.adapter.amqp.client.AMQPClient.3
            public void handleDelivery(String str3, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                AMQPClient.logger.log(Level.INFO, "Received direct message: '" + new String(bArr));
                consumer.accept(bArr);
            }
        });
    }

    public void registerConsumerQueue(String str, boolean z, String str2, final Consumer<byte[]> consumer) throws IOException, TimeoutException {
        final Channel connectQueue = connectQueue(str);
        if (connectQueue.isOpen()) {
            connectQueue.basicConsume(str, z, str2, new DefaultConsumer(connectQueue) { // from class: org.gecko.adapter.amqp.client.AMQPClient.4
                public void handleDelivery(String str3, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = basicProperties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    AMQPClient.logger.log(Level.INFO, "Received message: '" + new String(bArr) + "' with routingKey: " + routingKey + ", contentType: " + contentType + ", deliveryTag: " + deliveryTag);
                    consumer.accept(bArr);
                    connectQueue.basicAck(deliveryTag, false);
                }
            });
        }
    }

    public void registerRPCEcho(String str) throws IOException, TimeoutException {
        final Channel connectQueue = connectQueue(str);
        if (connectQueue.isOpen()) {
            connectQueue.basicConsume(str, true, new DefaultConsumer(connectQueue) { // from class: org.gecko.adapter.amqp.client.AMQPClient.5
                public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    String correlationId = basicProperties.getCorrelationId();
                    String replyTo = basicProperties.getReplyTo();
                    AMQPClient.logger.log(Level.INFO, "Received RPC request: '" + new String(bArr) + ", deliveryTag: " + deliveryTag);
                    connectQueue.basicPublish("", replyTo, new AMQP.BasicProperties.Builder().correlationId(correlationId).build(), bArr);
                    connectQueue.basicAck(deliveryTag, false);
                }
            });
        }
    }

    private Channel connectExchange(String str, String str2, String str3, boolean z) throws IOException, TimeoutException {
        return connectExchange(null, str, str2, str3, z);
    }

    private Channel connectExchange(Consumer<String> consumer, String str, String str2, String str3, boolean z) throws IOException, TimeoutException {
        String str4 = str + "_" + str2 + "_" + str3;
        if (this.channelMap.containsKey(str4)) {
            logger.fine("Channel already exists for exchange: " + str + " and routing key: " + str2 + " and type: " + str3);
            return this.channelMap.get(str4);
        }
        connect();
        Channel createChannel = this.connectionRef.get().createChannel();
        if (str3.equals("fanout")) {
            createChannel.exchangeDeclare(str, str3);
        } else {
            createChannel.exchangeDeclare(str, str3, true);
        }
        if (z) {
            String queue = createChannel.queueDeclare().getQueue();
            createChannel.queueBind(queue, str, str2);
            if (consumer != null) {
                consumer.accept(queue);
            }
        }
        this.channelMap.put(str4, createChannel);
        logger.fine("Created channel for type '" + str3 + "', exchange: " + str + " and routing key: " + str2);
        return createChannel;
    }

    private void connect() throws IOException, TimeoutException {
        Connection connection = this.connectionRef.get();
        if (connection == null || !connection.isOpen()) {
            this.connectionRef.set(this.factory.newConnection());
        }
    }

    private void disconnectChannel(String str) throws IOException, TimeoutException {
        Channel channel = this.channelMap.get(str);
        if (channel == null) {
            logger.warning("No channel exists for key: " + str + " - Nothing to disconnect");
        } else {
            channel.close();
        }
    }
}
