package org.eclipse.gyrex.eventbus.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.gyrex.cloud.services.events.EventMessage;
import org.eclipse.gyrex.cloud.services.events.IEventReceiver;
import org.eclipse.gyrex.cloud.services.events.IEventTransport;
import org.eclipse.gyrex.common.services.IServiceProxy;
import org.eclipse.gyrex.eventbus.IEventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/gyrex/eventbus/internal/EventService.class */
public class EventService implements IEventBus {
    private static final Logger LOG = LoggerFactory.getLogger(EventService.class);
    private final String nodeId;
    private final IServiceProxy<IEventTransport> transportServiceProxy;
    private volatile boolean disposed;
    private final ReflectionService reflectionService = new ReflectionService();
    private final AtomicLong eventCounter = new AtomicLong(0);
    private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: org.eclipse.gyrex.eventbus.internal.EventService.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "EventService-SendThread");
            thread.setDaemon(true);
            return thread;
        }
    });
    private final ConcurrentMap<Topic, TopicEventReceiver> activeTopics = new ConcurrentHashMap();

    /* loaded from: input_file:org/eclipse/gyrex/eventbus/internal/EventService$SendEvent.class */
    class SendEvent implements Runnable {
        private final String topciId;
        private final EventMessage message;

        SendEvent(String str, EventMessage eventMessage) {
            this.topciId = str;
            this.message = eventMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                EventService.LOG.trace("Sending event ({}, topic {}).", this.message.getId(), this.topciId);
                EventService.this.getTransport().sendEvent(this.topciId, this.message, Collections.emptyMap());
            } catch (AssertionError | LinkageError | RuntimeException e) {
                EventService.LOG.warn("Unable to send event ({}) for topic ({}). Event discarded. {}", new Object[]{this.message.getId(), this.topciId, e.getMessage(), e});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/eclipse/gyrex/eventbus/internal/EventService$TopicEventReceiver.class */
    public static class TopicEventReceiver implements IEventReceiver {
        private final Topic topic;

        TopicEventReceiver(Topic topic) {
            this.topic = topic;
        }

        public void receiveEvent(EventMessage eventMessage) {
            this.topic.dispatchEvent(eventMessage);
        }
    }

    public EventService(String str, IServiceProxy<IEventTransport> iServiceProxy) {
        this.nodeId = (String) Preconditions.checkNotNull(str);
        this.transportServiceProxy = (IServiceProxy) Preconditions.checkNotNull(iServiceProxy);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void activateTopic(Topic topic) {
        checkDisposed();
        TopicEventReceiver topicEventReceiver = new TopicEventReceiver(topic);
        if (this.activeTopics.putIfAbsent(topic, topicEventReceiver) == null) {
            LOG.debug("Subscribing topic ({}).", topic);
            getTransport().subscribeTopic(topic.getId(), topicEventReceiver, (Map) null);
        }
    }

    private void checkDisposed() {
        Preconditions.checkState(!this.disposed, "disposed");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deactivateTopic(Topic topic) {
        TopicEventReceiver remove = this.activeTopics.remove(topic);
        if (remove != null) {
            LOG.debug("Unsubscribing topic ({}).", topic);
            getTransport().unsubscribeTopic(topic.getId(), remove, (Map) null);
        }
    }

    public void dispose() {
        this.disposed = true;
        while (this.activeTopics.size() > 0) {
            deactivateTopic(this.activeTopics.keySet().iterator().next());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReflectionService getReflectionService() {
        return this.reflectionService;
    }

    @Override // org.eclipse.gyrex.eventbus.IEventBus
    public TopicBuilder getTopic(String str) throws IllegalArgumentException, IllegalStateException {
        checkDisposed();
        return new TopicBuilder(str, this).addSerializer(ByteArrayDeSerializer.sharedInstance).addSerializer(ByteBufferDeSerializer.sharedInstance).addSerializer(StringDeSerializer.sharedInstance).addDeserializer(ByteArrayDeSerializer.sharedInstance).addDeserializer(ByteBufferDeSerializer.sharedInstance).addDeserializer(StringDeSerializer.sharedInstance);
    }

    @VisibleForTesting
    IEventTransport getTransport() {
        return (IEventTransport) this.transportServiceProxy.getService();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String newEventId() {
        return String.valueOf(this.nodeId) + "-" + nextId();
    }

    long nextId() {
        long j;
        long j2;
        do {
            j = this.eventCounter.get();
            j2 = j == Long.MAX_VALUE ? 0L : j + 1;
        } while (!this.eventCounter.compareAndSet(j, j2));
        return j2;
    }

    public void queueEvent(String str, EventMessage eventMessage) {
        if (!this.transportServiceProxy.isAvailable()) {
            LOG.trace("Discarding event ({}, topic {}). No transport available.", eventMessage.getId(), str);
        } else {
            LOG.trace("Queuing event ({}, topic {}) for delivery.", eventMessage.getId(), str);
            this.sendExecutor.execute(new SendEvent(str, eventMessage));
        }
    }
}
