package org.gecko.adapter.amqp.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.osgi.messaging.Message;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;

/* loaded from: input_file:org/gecko/adapter/amqp/consumer/AMQPRPCConsumer.class */
public class AMQPRPCConsumer extends DefaultConsumer {
    private static final Logger logger = Logger.getLogger(AMQPRPCConsumer.class.getName());
    private final String correlationId;
    private final Deferred<AMQPMessage> messageDeferred;
    private Promise<AMQPMessage> message;

    public AMQPRPCConsumer(Channel channel, String str) {
        super(channel);
        this.messageDeferred = new Deferred<>();
        this.message = this.messageDeferred.getPromise();
        this.correlationId = str;
    }

    public Promise<Message> getMessage() {
        return this.message.map(aMQPMessage -> {
            return aMQPMessage;
        });
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();
        long deliveryTag = envelope.getDeliveryTag();
        String contentType = basicProperties.getContentType();
        String messageId = basicProperties.getMessageId();
        String correlationId = basicProperties.getCorrelationId();
        if (correlationId == null || !correlationId.equals(this.correlationId)) {
            logger.log(Level.SEVERE, "This message does not fit to the correlation id, given by the request");
            return;
        }
        String replyTo = basicProperties.getReplyTo();
        logger.log(Level.FINE, "Received message: '" + new String(bArr) + "' with routingKey: " + routingKey + ", contentType: " + contentType + ", deliveryTag: " + deliveryTag);
        try {
            AMQPMessageImpl aMQPMessageImpl = new AMQPMessageImpl(exchange == null ? "" : exchange, ByteBuffer.wrap(bArr));
            aMQPMessageImpl.setDeliveryTag(deliveryTag);
            aMQPMessageImpl.setExchange(exchange);
            aMQPMessageImpl.setRoutingKey(routingKey);
            aMQPMessageImpl.setContentType(contentType);
            aMQPMessageImpl.setReplyTo(replyTo);
            aMQPMessageImpl.setCorrelationId(correlationId);
            aMQPMessageImpl.setMessageId(messageId);
            this.messageDeferred.resolve(aMQPMessageImpl);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Detected error on AMQP receive", (Throwable) e);
        }
    }
}
