package org.gecko.adapter.amqp.pubsub.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.adapter.amqp.consumer.AMQPMessageImpl;
import org.gecko.core.pushstream.PushStreamContext;
import org.gecko.core.pushstream.PushStreamHelper;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/adapter/amqp/pubsub/consumer/AMQPPubOnSubConsumer.class */
public class AMQPPubOnSubConsumer extends DefaultConsumer {
    private static final Logger logger = Logger.getLogger(AMQPPubOnSubConsumer.class.getName());
    private final Function<AMQPMessage, ByteBuffer> function;
    private final PushStreamProvider psp;
    private final SimplePushEventSource<AMQPMessage> eventSource;
    private PushStream<AMQPMessage> responseStream;

    public AMQPPubOnSubConsumer(Channel channel, Function<AMQPMessage, ByteBuffer> function, PushStreamContext<AMQPMessage> pushStreamContext) {
        super(channel);
        this.psp = new PushStreamProvider();
        this.eventSource = (SimplePushEventSource) this.psp.buildSimpleEventSource(AMQPMessage.class).build();
        this.function = function;
        this.responseStream = PushStreamHelper.createPushStream(this.eventSource, pushStreamContext);
        this.responseStream.forEach(this::sendResponse);
    }

    public SimplePushEventSource<AMQPMessage> getEventSource() {
        return this.eventSource;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();
        long deliveryTag = envelope.getDeliveryTag();
        String contentType = basicProperties.getContentType();
        String correlationId = basicProperties.getCorrelationId();
        String replyTo = basicProperties.getReplyTo();
        AMQPMessageImpl aMQPMessageImpl = new AMQPMessageImpl(exchange == null ? "" : exchange, ByteBuffer.wrap(bArr));
        aMQPMessageImpl.setDeliveryTag(deliveryTag);
        aMQPMessageImpl.setExchange(exchange);
        aMQPMessageImpl.setRoutingKey(routingKey);
        aMQPMessageImpl.setContentType(contentType);
        aMQPMessageImpl.setReplyTo(replyTo);
        aMQPMessageImpl.setCorrelationId(correlationId);
        try {
            if (this.eventSource.isConnected()) {
                this.eventSource.publish(aMQPMessageImpl);
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Detected error on AMQP receive", (Throwable) e);
        }
    }

    public void sendResponse(AMQPMessage aMQPMessage) {
        String replyTo = aMQPMessage.getReplyTo();
        String correlationId = aMQPMessage.getCorrelationId();
        long deliveryTag = aMQPMessage.getDeliveryTag();
        AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
        byte[] bArr = new byte[0];
        try {
            try {
                bArr = this.function.apply(aMQPMessage).array();
                Channel channel = getChannel();
                try {
                    channel.basicPublish("", replyTo, build, bArr);
                    channel.basicAck(deliveryTag, false);
                } catch (IOException e) {
                    logger.log(Level.SEVERE, "Error sending answer for correlation id " + correlationId, (Throwable) e);
                }
            } catch (Exception e2) {
                logger.log(Level.SEVERE, "Error executing function for correlation id " + correlationId, (Throwable) e2);
                Channel channel2 = getChannel();
                try {
                    channel2.basicPublish("", replyTo, build, bArr);
                    channel2.basicAck(deliveryTag, false);
                } catch (IOException e3) {
                    logger.log(Level.SEVERE, "Error sending answer for correlation id " + correlationId, (Throwable) e3);
                }
            }
        } catch (Throwable th) {
            Channel channel3 = getChannel();
            try {
                channel3.basicPublish("", replyTo, build, bArr);
                channel3.basicAck(deliveryTag, false);
            } catch (IOException e4) {
                logger.log(Level.SEVERE, "Error sending answer for correlation id " + correlationId, (Throwable) e4);
            }
            throw th;
        }
    }

    public void close() throws InterruptedException {
        this.eventSource.close();
    }
}
