package org.gecko.adapter.amqp.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.adapter.amqp.jmx.AMQPConsumerMetric;
import org.gecko.core.pushstream.PushStreamContext;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.core.pushstream.SimplePushEventSourceContext;
import org.gecko.core.pushstream.source.AcknowledgingEventSource;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.osgi.util.function.Predicate;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/adapter/amqp/consumer/AMQPAcknowledgingConsumer.class */
public class AMQPAcknowledgingConsumer extends DefaultConsumer {
    private static final Logger logger = Logger.getLogger(AMQPAcknowledgingConsumer.class.getName());
    protected final AcknowledgingEventSource<Message> eventSource;
    protected final String topic;
    protected AMQPConsumerMetric mbean;

    public AMQPAcknowledgingConsumer(Channel channel, String str, Predicate<Message> predicate, SimplePushEventSourceContext<Message> simplePushEventSourceContext) {
        super(channel);
        this.eventSource = PushStreamHelper.fromSimpleEventSource(PushStreamHelper.createSimpleEventSource(Message.class, simplePushEventSourceContext), (PushStreamContext) null).acknowledgeFilter(predicate).acknowledge(this::acknowledgeMessage).negativeAcknowledge(this::rejectMessage);
        this.topic = str;
    }

    public AMQPAcknowledgingConsumer(Channel channel, String str, SimplePushEventSource<Message> simplePushEventSource, Predicate<Message> predicate) {
        super(channel);
        this.eventSource = PushStreamHelper.fromSimpleEventSource(simplePushEventSource, (PushStreamContext) null).acknowledgeFilter(predicate).acknowledge(this::acknowledgeMessage).negativeAcknowledge(this::rejectMessage);
        this.topic = str;
    }

    public PushStream<Message> createPushstream(MessagingContext messagingContext) {
        return (PushStream) PushStreamHelper.configurePushStreamBuilder(this.eventSource, messagingContext).build();
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();
        long deliveryTag = envelope.getDeliveryTag();
        String contentType = basicProperties.getContentType();
        String correlationId = basicProperties.getCorrelationId();
        String replyTo = basicProperties.getReplyTo();
        logger.log(Level.FINE, "Received message: '" + new String(bArr) + "' with routingKey: " + routingKey + ", contentType: " + contentType + ", deliveryTag: " + deliveryTag);
        AMQPMessageImpl aMQPMessageImpl = new AMQPMessageImpl(this.topic, ByteBuffer.wrap(bArr));
        aMQPMessageImpl.setDeliveryTag(deliveryTag);
        aMQPMessageImpl.setExchange(exchange);
        aMQPMessageImpl.setRoutingKey(routingKey);
        aMQPMessageImpl.setContentType(contentType);
        aMQPMessageImpl.setReplyTo(replyTo);
        aMQPMessageImpl.setCorrelationId(correlationId);
        if (this.eventSource.isConnected()) {
            try {
                this.eventSource.publish(aMQPMessageImpl);
                if (this.mbean != null) {
                    this.mbean.setLastMessageTime(new Date());
                }
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Detected error on AMQP receive", (Throwable) e);
            }
        }
    }

    protected void acknowledgeMessage(Message message) {
        long deliveryTag = ((AMQPMessage) message).getDeliveryTag();
        logger.log(Level.FINEST, "Acknowledge message with deliveryTag: " + deliveryTag);
        try {
            getChannel().basicAck(deliveryTag, false);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Detected error on acknowledging the message ", (Throwable) e);
        }
    }

    protected void rejectMessage(Message message) {
        long deliveryTag = ((AMQPMessage) message).getDeliveryTag();
        logger.log(Level.FINEST, "Reject message with deliveryTag: " + deliveryTag);
        try {
            getChannel().basicReject(deliveryTag, false);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Detected error on acknowledging the message ", (Throwable) e);
        }
    }

    public void close() {
        this.eventSource.close();
    }

    public AMQPConsumerMetric getMBean() {
        return this.mbean;
    }

    public void setMBean(AMQPConsumerMetric aMQPConsumerMetric) {
        this.mbean = aMQPConsumerMetric;
    }
}
