package org.gecko.adapter.eventadmin;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.logging.Logger;
import org.gecko.adapter.eventadmin.context.EventAdminMessagingContext;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessagingContextBuilder;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.util.pushstream.PushStream;

/**
 * {@link MessagingService} implementation that uses the OSGi {@link EventAdmin} as transport.
 *
 * <p>Publishing wraps the payload (and, optionally, selected {@link MessagingContext} values)
 * into an {@link Event} and posts it to the given topic. Subscribing hands out a
 * {@link PushStream} obtained from a per-topic {@code TopicEventHandler}; handlers are kept
 * in a concurrent map keyed by topic so that subscriptions to the same topic share one handler.
 */
@Capability(namespace = "osgi.message.adapter", name = "eventadmin.adapter", version = "1.0.0", attribute = {"vendor=Gecko.io", "implementation=Eventadmin"})
@Component
public class EventAdminMessageService implements MessagingService {
    /** Event property key under which the headers of an {@link EventAdminMessagingContext} are stored. */
    public static final String HEADERS_PREFIX = "headers";
    /** Event property key under which the message payload is stored. */
    public static final String CONTENT = "content";

    private static final Logger logger = Logger.getLogger(EventAdminMessageService.class.getName());

    @Reference
    private EventAdmin eventAdmin;

    /** Active handlers, keyed by the value returned from {@link #generateKey}. */
    private final Map<String, TopicEventHandler> subscribtions = new ConcurrentHashMap<>();
    private BundleContext bundleContext;

    /**
     * Stores the bundle context that is later passed to newly created topic handlers.
     *
     * @param componentContext the component context supplying the bundle context
     * @param map the component properties (currently unused)
     */
    @Activate
    public void activate(ComponentContext componentContext, Map<String, Object> map) {
        this.bundleContext = componentContext.getBundleContext();
    }

    /**
     * Deactivates every known topic handler and removes it from the subscription map.
     */
    @Deactivate
    public void deactivate() {
        Iterator<Map.Entry<String, TopicEventHandler>> entries = this.subscribtions.entrySet().iterator();
        while (entries.hasNext()) {
            TopicEventHandler handler = entries.next().getValue();
            handler.deactivate();
            entries.remove();
        }
    }

    /**
     * Subscribes to a topic using a default context built by {@link SimpleMessagingContextBuilder}.
     *
     * @param str the topic to subscribe to
     * @return a push stream of messages for the topic
     * @throws Exception if the subscription cannot be established
     */
    public PushStream<Message> subscribe(String str) throws Exception {
        MessagingContext defaultContext = new SimpleMessagingContextBuilder().build();
        return subscribe(str, defaultContext);
    }

    /**
     * Subscribes to a topic, reusing the handler already registered for that topic if there is one.
     *
     * @param str the topic to subscribe to
     * @param messagingContext the context handed to a newly created handler
     * @return a push stream of messages for the topic
     * @throws Exception if the subscription cannot be established
     */
    public PushStream<Message> subscribe(String str, MessagingContext messagingContext) throws Exception {
        final String key = generateKey(str, messagingContext);
        // The callback lets the handler take itself out of the map.
        TopicEventHandler candidate = new TopicEventHandler(str, this.bundleContext, () -> {
            this.subscribtions.remove(key);
        }, messagingContext);
        TopicEventHandler handler = this.subscribtions.putIfAbsent(key, candidate);
        if (handler == null) {
            // Nobody was registered for this key yet, so our candidate is now the active handler.
            logger.info("Adding subsciption to " + key);
            handler = candidate;
        }
        PushStream<Message> stream = handler.registerPushStream();
        if (stream != null) {
            return stream;
        }
        // NOTE(review): a null stream presumably means the handler was shut down concurrently
        // and has removed itself from the map - confirm against TopicEventHandler. Retry with
        // a fresh handler in that case.
        return subscribe(str, messagingContext);
    }

    /**
     * Computes the subscription map key; currently this is simply the topic itself.
     *
     * @param str the topic
     * @param messagingContext the subscription context (not used for the key at present)
     * @return the key identifying the subscription
     */
    private String generateKey(String str, MessagingContext messagingContext) {
        return str;
    }

    /**
     * Posts the payload to the given topic as the only event property ({@link #CONTENT}).
     *
     * @param str the topic to publish to
     * @param byteBuffer the message payload
     * @throws Exception if the event cannot be posted
     */
    public void publish(String str, ByteBuffer byteBuffer) throws Exception {
        Map<String, ByteBuffer> properties = Collections.singletonMap(CONTENT, byteBuffer);
        this.eventAdmin.postEvent(new Event(str, properties));
    }

    /**
     * Posts the payload to the given topic together with the non-null values of the context.
     *
     * @param str the topic to publish to
     * @param byteBuffer the message payload, stored under {@link #CONTENT}
     * @param messagingContext the context whose values become additional event properties;
     *        a {@code null} context results in a {@link NullPointerException}
     * @throws Exception if the event cannot be posted
     */
    public void publish(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        Map<String, Object> properties = new HashMap<>();
        translateContextIntoMap(messagingContext, properties);
        // Payload is written last, so it always ends up under CONTENT.
        properties.put(CONTENT, byteBuffer);
        this.eventAdmin.postEvent(new Event(str, properties));
    }

    /**
     * Copies the non-null values of the context into the map under fixed property keys.
     * For an {@link EventAdminMessagingContext} the headers are added under
     * {@link #HEADERS_PREFIX} as well (without a null check).
     *
     * @param messagingContext the context to read from
     * @param map the map receiving the event properties
     */
    private void translateContextIntoMap(MessagingContext messagingContext, Map<String, Object> map) {
        putIfPresent(messagingContext::getContentEncoding, "content.encoding", map);
        putIfPresent(messagingContext::getContentType, "content.type", map);
        putIfPresent(messagingContext::getCorrelationId, "correlation.id", map);
        putIfPresent(messagingContext::getQueueName, "queue.name", map);
        putIfPresent(messagingContext::getReplyAddress, "reply.address", map);
        putIfPresent(messagingContext::getRoutingKey, "routing.key", map);
        if (messagingContext instanceof EventAdminMessagingContext) {
            EventAdminMessagingContext eventAdminContext = (EventAdminMessagingContext) messagingContext;
            map.put(HEADERS_PREFIX, eventAdminContext.getHeaders());
        }
    }

    /**
     * Stores the supplied value under the given key, unless the value is {@code null}.
     *
     * @param supplier provides the value to store
     * @param str the key to store the value under
     * @param map the map receiving the entry
     */
    private void putIfPresent(Supplier<Object> supplier, String str, Map<String, Object> map) {
        Optional<Object> value = Optional.ofNullable(supplier.get());
        value.ifPresent(partial(map::put, str));
    }

    /**
     * Binds the first argument of a two-argument function, yielding a consumer of the second.
     * The function's return value is discarded.
     *
     * @param biFunction the function to partially apply
     * @param str the fixed first argument
     * @return a consumer that invokes {@code biFunction} with {@code str} and the consumed object
     */
    public static Consumer<Object> partial(BiFunction<String, Object, Object> biFunction, String str) {
        return value -> biFunction.apply(str, value);
    }
}
