package org.gecko.adapter.amqp.pubsub.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.adapter.amqp.consumer.AMQPMessageImpl;
import org.gecko.core.pushstream.PushStreamContext;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.osgi.messaging.ReplyToPolicy;
import org.gecko.osgi.messaging.SimpleMessagingContext;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

/* loaded from: input_file:org/gecko/adapter/amqp/pubsub/consumer/AMQPPubOnSubManyConsumer.class */
public class AMQPPubOnSubManyConsumer extends DefaultConsumer {
    private static final Logger logger = Logger.getLogger(AMQPPubOnSubManyConsumer.class.getName());
    private final Function<AMQPMessage, PushStream<ByteBuffer>> function;
    private final PushStreamProvider psp;
    private final SimplePushEventSource<AMQPMessage> eventSource;
    private PushStream<AMQPMessage> responseStream;

    /* renamed from: org.gecko.adapter.amqp.pubsub.consumer.AMQPPubOnSubManyConsumer$1, reason: invalid class name */
    /* loaded from: input_file:org/gecko/adapter/amqp/pubsub/consumer/AMQPPubOnSubManyConsumer$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$gecko$osgi$messaging$ReplyToPolicy = new int[ReplyToPolicy.values().length];

        static {
            try {
                $SwitchMap$org$gecko$osgi$messaging$ReplyToPolicy[ReplyToPolicy.SINGLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$gecko$osgi$messaging$ReplyToPolicy[ReplyToPolicy.MULTIPLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/gecko/adapter/amqp/pubsub/consumer/AMQPPubOnSubManyConsumer$POSContext.class */
    public static class POSContext {
        private AMQPMessage message;
        private ByteBuffer buffer;

        public POSContext(AMQPMessage aMQPMessage, ByteBuffer byteBuffer) {
            this.message = aMQPMessage;
            this.buffer = byteBuffer;
        }

        public byte[] getContent() {
            return this.buffer == null ? new byte[0] : this.buffer.array();
        }

        public String getReplyTo() {
            return this.message.getReplyTo();
        }

        public String getCorrelationId() {
            return this.message.getCorrelationId();
        }

        public long getDeliveryTag() {
            return this.message.getDeliveryTag();
        }

        public AMQP.BasicProperties getBasicProperties() {
            return new AMQP.BasicProperties.Builder().correlationId(getCorrelationId()).build();
        }
    }

    public AMQPPubOnSubManyConsumer(Channel channel, Function<AMQPMessage, PushStream<ByteBuffer>> function, PushStreamContext<AMQPMessage> pushStreamContext) {
        super(channel);
        this.psp = new PushStreamProvider();
        this.eventSource = (SimplePushEventSource) this.psp.buildSimpleEventSource(AMQPMessage.class).build();
        this.function = function;
        this.responseStream = PushStreamHelper.createPushStream(this.eventSource, pushStreamContext);
        this.responseStream.forEach(this::sendResponse);
    }

    public SimplePushEventSource<AMQPMessage> getEventSource() {
        return this.eventSource;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String exchange = envelope.getExchange();
        long deliveryTag = envelope.getDeliveryTag();
        String contentType = basicProperties.getContentType();
        ReplyToPolicy policy = ReplyToPolicy.getPolicy(basicProperties.getHeaders());
        String correlationId = basicProperties.getCorrelationId();
        String replyTo = basicProperties.getReplyTo();
        SimpleMessagingContext simpleMessagingContext = new SimpleMessagingContext();
        simpleMessagingContext.setReplyToPolicy(policy);
        AMQPMessageImpl aMQPMessageImpl = new AMQPMessageImpl(exchange == null ? "" : exchange, ByteBuffer.wrap(bArr), simpleMessagingContext);
        aMQPMessageImpl.setDeliveryTag(deliveryTag);
        aMQPMessageImpl.setExchange(exchange);
        aMQPMessageImpl.setRoutingKey(routingKey);
        aMQPMessageImpl.setContentType(contentType);
        aMQPMessageImpl.setReplyTo(replyTo);
        aMQPMessageImpl.setCorrelationId(correlationId);
        try {
            if (this.eventSource.isConnected()) {
                this.eventSource.publish(aMQPMessageImpl);
                getChannel().basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Detected error on AMQP receive", (Throwable) e);
        }
    }

    public void sendResponse(AMQPMessage aMQPMessage) {
        try {
            PushStream map = this.function.apply(aMQPMessage).map(byteBuffer -> {
                return new POSContext(aMQPMessage, byteBuffer);
            });
            switch (AnonymousClass1.$SwitchMap$org$gecko$osgi$messaging$ReplyToPolicy[aMQPMessage.getContext().getReplyPolicy().ordinal()]) {
                case 1:
                    map.findFirst().thenAccept(optional -> {
                        optional.ifPresent(this::publishResponse);
                    });
                    map.close();
                    break;
                case 2:
                    map.forEach(this::publishResponse);
                    break;
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error executing function for correlation id " + aMQPMessage.getCorrelationId(), (Throwable) e);
        }
    }

    private void publishResponse(POSContext pOSContext) {
        try {
            getChannel().basicPublish("", pOSContext.getReplyTo(), pOSContext.getBasicProperties(), pOSContext.getContent());
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Error sending answer for correlation id " + pOSContext.getCorrelationId(), (Throwable) e);
        }
    }

    public void close() throws InterruptedException {
        this.eventSource.close();
    }
}
