package org.gecko.adapter.amqp.api.rpc;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.adapter.amqp.api.BasicReSubscribeConsumer;
import org.gecko.adapter.amqp.api.WorkerFunction;
import org.gecko.adapter.amqp.client.AMQPContext;
import org.gecko.adapter.amqp.client.AMQPMessage;
import org.gecko.adapter.amqp.consumer.AMQPHelper;
import org.gecko.util.common.concurrent.NamedThreadFactory;
import org.osgi.service.component.ComponentServiceObjects;
import org.osgi.util.promise.PromiseFactory;

/* loaded from: input_file:org/gecko/adapter/amqp/api/rpc/BasicRPCWorkerConsumer.class */
/**
 * Re-subscribing consumer that answers RPC style requests.
 *
 * <p>Every delivery is processed asynchronously on the configured {@link PromiseFactory}: the delivery is
 * converted into an {@link AMQPMessage}, handed to a {@link WorkerFunction} obtained from the
 * {@link ComponentServiceObjects}, and the resulting message is published to the request's
 * {@code replyTo} routing key on the default exchange ({@code ""}).
 *
 * <p>The number of deliveries currently being processed is tracked in {@code currentWorker} and compared
 * against {@code maxWorkerCount} (default {@value #DEFAULT_MAX_WORKER_COUNT}) by
 * {@link #shouldCloseConsumer()} and {@link #shouldResubscribe()}.
 */
public class BasicRPCWorkerConsumer extends BasicReSubscribeConsumer<Void> {
    private static final Logger logger = Logger.getLogger(BasicRPCWorkerConsumer.class.getName());
    /** Worker limit used until {@link #setMaxWorkerCount(int)} is called. */
    private static final int DEFAULT_MAX_WORKER_COUNT = 3;
    private final ComponentServiceObjects<WorkerFunction> functionCSO;
    private final PromiseFactory pf;
    /** Number of deliveries that are currently being processed. */
    private final AtomicInteger currentWorker = new AtomicInteger();
    /** Written by {@link #setMaxWorkerCount(int)} and read from promise callbacks, hence volatile. */
    private volatile int maxWorkerCount = DEFAULT_MAX_WORKER_COUNT;

    /**
     * Creates a new worker consumer.
     *
     * @param channel the channel, passed to the super class
     * @param str passed to the super class and used as suffix of the worker thread names
     * @param aMQPContext the context, passed to the super class
     * @param componentServiceObjects source of the {@link WorkerFunction} instances, must not be {@code null}
     * @param promiseFactory the factory used to run the workers; when {@code null} a factory backed by a
     *        cached thread pool is created (NOTE(review): nothing in this class shuts that pool down)
     * @throws NullPointerException if {@code componentServiceObjects} is {@code null}
     */
    public BasicRPCWorkerConsumer(Channel channel, String str, AMQPContext aMQPContext, ComponentServiceObjects<WorkerFunction> componentServiceObjects, PromiseFactory promiseFactory) {
        super(channel, str, aMQPContext);
        // Validate before allocating the fallback executor so a bad argument does not leave a pool behind.
        this.functionCSO = Objects.requireNonNull(componentServiceObjects, "componentServiceObjects must not be null");
        this.pf = promiseFactory != null
                ? promiseFactory
                : new PromiseFactory(Executors.newCachedThreadPool(NamedThreadFactory.newNamedFactory(String.format("RPSWorker-%s", str))));
    }

    /**
     * Creates a new worker consumer that runs its workers on an internally created cached thread pool.
     *
     * @param channel the channel, passed to the super class
     * @param str passed to the super class and used as suffix of the worker thread names
     * @param aMQPContext the context, passed to the super class
     * @param componentServiceObjects source of the {@link WorkerFunction} instances, must not be {@code null}
     */
    public BasicRPCWorkerConsumer(Channel channel, String str, AMQPContext aMQPContext, ComponentServiceObjects<WorkerFunction> componentServiceObjects) {
        this(channel, str, aMQPContext, componentServiceObjects, null);
    }

    /**
     * Sets the number of concurrently processed deliveries at which this consumer reports itself as saturated.
     *
     * @param i the new worker limit
     */
    public void setMaxWorkerCount(int i) {
        this.maxWorkerCount = i;
    }

    /**
     * @return {@code true} once the number of running workers has reached (or exceeded) the worker limit
     */
    @Override // org.gecko.adapter.amqp.api.BasicReSubscribeConsumer, org.gecko.adapter.amqp.api.ReSubscribeCallback
    public boolean shouldCloseConsumer() throws IOException {
        // '>=' instead of '==': if the limit is lowered while workers are running the count can exceed it,
        // and this method must stay the exact complement of shouldResubscribe().
        return this.currentWorker.get() >= this.maxWorkerCount;
    }

    /**
     * @return {@code true} while fewer workers are running than the worker limit allows
     */
    @Override // org.gecko.adapter.amqp.api.BasicReSubscribeConsumer, org.gecko.adapter.amqp.api.ReSubscribeCallback
    public boolean shouldResubscribe() {
        return this.currentWorker.get() < this.maxWorkerCount;
    }

    /**
     * Processes one delivery asynchronously: create the message, run the worker, send the response.
     *
     * <p>On success the worker slot is released and {@code checkResubscribe()} is called. On failure the
     * error is logged, the delivery is rejected without requeue and the worker slot is released as well,
     * so that failed deliveries do not permanently reduce the available worker capacity.
     *
     * @param str consumer identifier of the delivery, only used for logging here
     * @param envelope the delivery envelope; supplies the delivery tag used for rejection
     * @param basicProperties the delivery properties
     * @param bArr the delivery body
     */
    @Override // org.gecko.adapter.amqp.api.BasicReSubscribeConsumer
    protected void doHandleDelivery(String str, Envelope envelope, BasicProperties basicProperties, byte[] bArr) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        String messageId = basicProperties.getMessageId();
        // True while this delivery occupies a worker slot; guarantees the slot is released exactly once,
        // no matter whether the success or the failure callback (or both) run.
        AtomicBoolean slotHeld = new AtomicBoolean();
        this.pf.submit(() -> {
            this.currentWorker.incrementAndGet();
            slotHeld.set(true);
            setProgress(true);
            return AMQPHelper.createMessage(envelope, basicProperties, ByteBuffer.wrap(bArr));
        }).map(this::handleWorker).thenAccept(this::sendResponse).thenAccept(aMQPMessage -> {
            slotHeld.set(false);
            setProgress(this.currentWorker.decrementAndGet() > 0);
            checkResubscribe();
        }).onFailure(th -> {
            // Log first: the reject below may fail itself (e.g. closed channel) and must not hide the cause.
            logger.log(Level.SEVERE, th, () -> {
                return String.format("Cannot send response for message '%s' on consumer '%s'", messageId, str);
            });
            try {
                getChannel().basicReject(deliveryTag, false);
            } catch (IOException | RuntimeException e) {
                logger.log(Level.SEVERE, e, () -> {
                    return String.format("Cannot reject message '%s' on consumer '%s'", messageId, str);
                });
            }
            // Release the slot only if it was taken and not already released by the success callback.
            if (slotHeld.compareAndSet(true, false)) {
                setProgress(this.currentWorker.decrementAndGet() > 0);
                checkResubscribe();
            }
        });
    }

    /**
     * Applies a {@link WorkerFunction} to the request and wraps the result into a response message.
     * The function instance is obtained for this call only and always handed back afterwards.
     *
     * @param aMQPMessage the request message
     * @return the message created by {@code AMQPHelper.createFromMessage} from the request and the worker result
     * @throws IllegalStateException if no {@link WorkerFunction} service object could be obtained
     */
    protected AMQPMessage handleWorker(AMQPMessage aMQPMessage) {
        WorkerFunction workerFunction = this.functionCSO.getService();
        if (workerFunction == null) {
            // Without this check the NullPointerException below would be masked by ungetService(null) failing.
            throw new IllegalStateException("No WorkerFunction service available to handle the request");
        }
        try {
            return AMQPHelper.createFromMessage(aMQPMessage, workerFunction.apply(aMQPMessage));
        } finally {
            this.functionCSO.ungetService(workerFunction);
        }
    }

    /**
     * Publishes the response to the message's {@code replyTo} routing key on the default exchange,
     * carrying over correlation id and message id and stamping it with the current time.
     *
     * @param aMQPMessage the response message
     * @throws IOException if publishing fails
     * @throws IllegalStateException if the channel is not open anymore
     */
    protected void sendResponse(AMQPMessage aMQPMessage) throws IOException {
        String replyTo = aMQPMessage.getReplyTo();
        AMQP.BasicProperties responseProperties = new AMQP.BasicProperties.Builder()
                .correlationId(aMQPMessage.getCorrelationId())
                .messageId(aMQPMessage.getMessageId())
                .timestamp(new Date())
                .build();
        // NOTE(review): array() hands over the payload's backing array as is - presumably the payload
        // is always a fully used, array backed buffer; verify against the message producers.
        byte[] body = aMQPMessage.payload().array();
        Channel channel = getChannel();
        if (!channel.isOpen()) {
            throw new IllegalStateException("Channel is not open anymore");
        }
        channel.basicPublish("", replyTo, responseProperties, body);
    }
}
