package org.gecko.adapter.eventadmin;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.logging.Logger;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContext;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContextBuilder;
import org.gecko.core.pushstream.PushStreamContext;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.core.pushstream.source.CallBackEventSource;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.SimpleMessage;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.PrototypeServiceFactory;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.util.converter.Converters;
import org.osgi.util.converter.TypeReference;
import org.osgi.util.pushstream.PushEventConsumer;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamBuilder;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.PushbackPolicyOption;
import org.osgi.util.pushstream.QueuePolicyOption;
import org.osgi.util.pushstream.SimplePushEventSource;

@Component(name = "EventAdminTopicSubscription", service = {}, configurationPolicy = ConfigurationPolicy.REQUIRE)
/* loaded from: input_file:org/gecko/adapter/eventadmin/TopicEventHandler.class */
public class TopicEventHandler implements PrototypeServiceFactory<PushStream<Message>> {
    private static final Logger logger;
    private State state;
    private SimplePushEventSource<Message> eventSource;
    private BundleContext ctx;
    private ServiceRegistration<EventHandler> eventHandlerRegistration;
    private Runnable doAfterClose;
    private ReentrantLock lock;
    private String topic;
    private PushStreamProvider provider;
    private PushStreamContext<Message> messagingContext;
    private ServiceRegistration<?> pushStreamFactoryServiceRegistration;
    static final /* synthetic */ boolean $assertionsDisabled;

    @ObjectClassDefinition
    /* loaded from: input_file:org/gecko/adapter/eventadmin/TopicEventHandler$Config.class */
    @interface Config {
        String topic();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/gecko/adapter/eventadmin/TopicEventHandler$State.class */
    public enum State {
        NEW,
        CONNECTED,
        CLOSED,
        DISPOSED
    }

    public TopicEventHandler() {
        this.state = State.NEW;
        this.lock = new ReentrantLock();
        this.provider = new PushStreamProvider();
    }

    public TopicEventHandler(String str, BundleContext bundleContext, Runnable runnable, MessagingContext messagingContext) {
        this();
        this.topic = str;
        this.ctx = bundleContext;
        this.doAfterClose = runnable;
        this.messagingContext = messagingContext;
        this.eventSource = new CallBackEventSource((SimplePushEventSource) this.provider.buildSimpleEventSource(Message.class).withQueuePolicy(QueuePolicyOption.BLOCK).build(), this::openAndConnectIfNecessary, (Consumer) null, this::consumerClosed);
    }

    @Activate
    public void activate(Config config, Map<String, Object> map, ComponentContext componentContext) {
        this.topic = config.topic();
        this.ctx = componentContext.getBundleContext();
        this.messagingContext = PushStreamHelper.getPushStreamContext(map);
        this.eventSource = new CallBackEventSource((SimplePushEventSource) this.provider.buildSimpleEventSource(Message.class).withQueuePolicy(QueuePolicyOption.BLOCK).build(), this::openAndConnectIfNecessary, (Consumer) null, (BiConsumer) null);
        this.pushStreamFactoryServiceRegistration = this.ctx.registerService(PushStream.class.getName(), this, (Dictionary) Converters.standardConverter().convert(map).to(new TypeReference<Dictionary<String, Object>>() { // from class: org.gecko.adapter.eventadmin.TopicEventHandler.1
        }));
    }

    @Deactivate
    public void deactivate() {
        this.lock.lock();
        try {
            this.state = State.CLOSED;
            if (this.pushStreamFactoryServiceRegistration != null) {
                this.pushStreamFactoryServiceRegistration.unregister();
            }
            if (this.eventHandlerRegistration != null) {
                this.eventHandlerRegistration.unregister();
            }
            this.eventSource.close();
            this.state = State.DISPOSED;
        } finally {
            this.lock.unlock();
        }
    }

    public void handle(Event event) {
        logger.fine("Handle incomming event " + event.toString());
        String topic = event.getTopic();
        ByteBuffer byteBuffer = (ByteBuffer) event.getProperty(EventAdminMessageService.CONTENT);
        if (!$assertionsDisabled && this.eventSource == null) {
            throw new AssertionError();
        }
        HashMap hashMap = new HashMap();
        Arrays.asList(event.getPropertyNames()).stream().filter(str -> {
            return !EventAdminMessageService.CONTENT.equals(str);
        }).forEach(str2 -> {
            hashMap.put(str2, event.getProperty(str2));
        });
        this.eventSource.publish(new SimpleMessage(topic, byteBuffer, translateMapIntoContext(event)));
        logger.fine("finished Handing of incomming event " + event.toString());
    }

    public void openAndConnectIfNecessary(CallBackEventSource<Message> callBackEventSource, PushEventConsumer<? super Message> pushEventConsumer) {
        logger.fine("Opening eventSource");
        if (this.state == State.NEW) {
            this.lock.lock();
            try {
                if (this.state == State.NEW) {
                    logger.fine("registering Eventhandler for topic " + this.topic);
                    Hashtable hashtable = new Hashtable();
                    hashtable.put("event.topics", new String[]{this.topic});
                    this.eventHandlerRegistration = this.ctx.registerService(EventHandler.class, new EventHandler() { // from class: org.gecko.adapter.eventadmin.TopicEventHandler.2
                        public void handleEvent(Event event) {
                            TopicEventHandler.this.handle(event);
                        }
                    }, hashtable);
                    this.state = State.CONNECTED;
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    public PushStream<Message> registerPushStream() {
        this.lock.lock();
        try {
            if (this.state == State.CLOSED || this.state == State.DISPOSED) {
                return null;
            }
            PushStreamBuilder withPushbackPolicy = PushStreamHelper.configurePushStreamBuilder(this.eventSource, this.messagingContext).withPushbackPolicy(PushbackPolicyOption.ON_FULL_FIXED, 10L);
            if (this.messagingContext.getBufferQueue() == null) {
                withPushbackPolicy.withBuffer(new ArrayBlockingQueue(this.messagingContext.getBufferSize() > 0 ? this.messagingContext.getBufferSize() : 1000));
            }
            return (PushStream) withPushbackPolicy.build();
        } finally {
            this.lock.unlock();
        }
    }

    private void consumerClosed(CallBackEventSource<Message> callBackEventSource, PushEventConsumer<? super Message> pushEventConsumer) {
        logger.fine("closing event Source");
        this.lock.lock();
        try {
            if (this.state == State.CONNECTED && !callBackEventSource.isConnected()) {
                this.state = State.CLOSED;
                this.eventHandlerRegistration.unregister();
                this.eventHandlerRegistration = null;
                if (this.doAfterClose != null) {
                    this.doAfterClose.run();
                }
                this.state = State.DISPOSED;
            }
        } finally {
            this.lock.unlock();
        }
    }

    public PushStream<Message> getService(Bundle bundle, ServiceRegistration<PushStream<Message>> serviceRegistration) {
        return registerPushStream();
    }

    public void ungetService(Bundle bundle, ServiceRegistration<PushStream<Message>> serviceRegistration, PushStream<Message> pushStream) {
        pushStream.close();
    }

    private EventAdminMessagingContext translateMapIntoContext(Event event) {
        EventAdminMessagingContextBuilder builder = EventAdminMessagingContextBuilder.builder();
        if (event.containsProperty(EventAdminMessageService.HEADERS_PREFIX)) {
            builder.headers((Map) event.getProperty(EventAdminMessageService.HEADERS_PREFIX));
        }
        builder.contentEncoding((String) event.getProperty("content.encoding"));
        builder.contentType((String) event.getProperty("content.type"));
        builder.correlationId((String) event.getProperty("correlation.id"));
        builder.queue((String) event.getProperty("queue.name"));
        builder.replyTo((String) event.getProperty("reply.address"));
        return (EventAdminMessagingContext) builder.build();
    }

    public /* bridge */ /* synthetic */ void ungetService(Bundle bundle, ServiceRegistration serviceRegistration, Object obj) {
        ungetService(bundle, (ServiceRegistration<PushStream<Message>>) serviceRegistration, (PushStream<Message>) obj);
    }

    /* renamed from: getService, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ Object m2getService(Bundle bundle, ServiceRegistration serviceRegistration) {
        return getService(bundle, (ServiceRegistration<PushStream<Message>>) serviceRegistration);
    }

    static {
        $assertionsDisabled = !TopicEventHandler.class.desiredAssertionStatus();
        logger = Logger.getLogger(TopicEventHandler.class.getName());
    }
}
