package org.gecko.adapter.can.impl;

import com.github.kayak.core.Bus;
import com.github.kayak.core.BusURL;
import com.github.kayak.core.Frame;
import com.github.kayak.core.FrameListener;
import com.github.kayak.core.Subscription;
import com.github.kayak.core.TimeSource;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.gecko.core.pushstream.PushStreamHelper;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingContext;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.osgi.messaging.SimpleMessage;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.osgi.util.pushstream.PushStream;
import org.osgi.util.pushstream.PushStreamProvider;
import org.osgi.util.pushstream.SimplePushEventSource;

/**
 * {@link MessagingService} implementation that sends and receives CAN frames through a Kayak
 * {@link Bus}.
 *
 * <p>Topics are interpreted as numeric CAN object identifiers; a topic that is not a valid integer
 * falls back to the configured default object id. Incoming frames are delivered to this class via
 * {@link FrameListener#newFrame(Frame)} and forwarded to the push event source registered for the
 * frame's identifier.
 */
@Capability(namespace = "osgi.message.adapter", name = "can.adapter", version = "1.0.0", attribute = {"vendor=Gecko.io", "implementation=Process"})
@Component(service = {MessagingService.class}, name = "CanService", configurationPolicy = ConfigurationPolicy.REQUIRE, immediate = true)
public class CanService implements MessagingService, AutoCloseable, FrameListener {
    private static final Logger logger = Logger.getLogger(CanService.class.getName());
    public static final String DEFAULT_CAN_DEVICE = "can0";
    public static final int DEFAULT_CAND_PORT = 29536;
    public static final String DEFAULT_CAND_HOST = "localhost";

    /** Largest payload accepted by {@link #publish(String, ByteBuffer, MessagingContext)}. */
    private static final int MAX_PAYLOAD_BYTES = 8;

    private final PushStreamProvider provider = new PushStreamProvider();

    /** Event sources keyed by the CAN object identifier they were subscribed for. */
    private final Map<Integer, SimplePushEventSource<Message>> subscriptions = new ConcurrentHashMap<>();

    /** Replaced by a configured instance in {@link #activate(CanConfig, BundleContext)}. */
    private Bus bus = new Bus();
    private final TimeSource timeSource = new TimeSource();

    /** Fallback object id used when a topic cannot be parsed as an integer. */
    private int objectId;

    /** Component configuration: connection coordinates of the bus and the default object id. */
    @ObjectClassDefinition
    @interface CanConfig {
        String candHost() default DEFAULT_CAND_HOST;

        int candPort() default DEFAULT_CAND_PORT;

        String interfaceDevice() default DEFAULT_CAN_DEVICE;

        int objectId() default 1;
    }

    /**
     * Creates a fresh bus, points it at the configured host, port and interface device, and
     * attaches this service's time source to it.
     *
     * @param canConfig     the component configuration
     * @param bundleContext the bundle context (not used)
     * @throws Exception declared for the component runtime; see the Kayak calls for causes
     */
    @Activate
    void activate(CanConfig canConfig, BundleContext bundleContext) throws Exception {
        String interfaceDevice = canConfig.interfaceDevice();
        String candHost = canConfig.candHost();
        int candPort = canConfig.candPort();
        this.objectId = canConfig.objectId();
        BusURL busURL = new BusURL(candHost, candPort, interfaceDevice);
        this.bus = new Bus();
        this.bus.setConnection(busURL);
        this.bus.setTimeSource(this.timeSource);
    }

    /**
     * Releases all resources by delegating to {@link #close()}.
     *
     * @throws Exception if closing fails
     */
    @Deactivate
    void deactivate() throws Exception {
        close();
    }

    /**
     * Stops the time source, disconnects and destroys the bus, then closes and forgets every
     * registered push event source.
     *
     * @throws Exception if tearing down the bus fails; the event sources are closed regardless
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.timeSource.stop();
        try {
            Bus currentBus = this.bus;
            if (currentBus != null) {
                // The connection is only set in activate(), so it may be absent here.
                BusURL connection = currentBus.getConnection();
                String busName = connection != null ? connection.getBus() : "<not connected>";
                currentBus.disconnect();
                currentBus.destroy();
                // Quotes are doubled because the pattern is processed by MessageFormat, where a
                // single quote would turn {0} into literal text.
                logger.log(Level.INFO, "Stopped CAN bus ''{0}''", busName);
            }
        } finally {
            // Close the event sources even when the bus teardown above failed.
            this.subscriptions.values().forEach(SimplePushEventSource::close);
            this.subscriptions.clear();
        }
    }

    /**
     * Subscribes to a topic without a messaging context.
     *
     * @param str the topic, expected to be a numeric CAN object identifier
     * @return a push stream of the messages received for that identifier
     * @throws Exception if the subscription cannot be established
     */
    public PushStream<Message> subscribe(String str) throws Exception {
        return subscribe(str, null);
    }

    /**
     * Subscribes to frames with the object identifier denoted by the topic. The first subscription
     * for an identifier registers a Kayak {@link Subscription} on the bus and creates the event
     * source; later subscriptions reuse that event source.
     *
     * @param str              the topic, expected to be a numeric CAN object identifier
     * @param messagingContext optional context used to configure the returned push stream
     * @return a push stream of the messages received for that identifier
     * @throws Exception wrapping any failure raised while subscribing
     */
    public PushStream<Message> subscribe(String str, MessagingContext messagingContext) throws Exception {
        startTimeSource();
        try {
            Integer id = Integer.valueOf(parseObjectId(str, this.objectId));
            SimplePushEventSource<Message> source = this.subscriptions.get(id);
            if (source == null) {
                // NOTE(review): this lookup-then-register sequence is not atomic; two concurrent
                // first subscriptions for the same id would both register on the bus.
                Subscription subscription = new Subscription(this, this.bus);
                subscription.subscribe(id.intValue(), false);
                this.bus.addSubscription(subscription);
                source = this.provider.buildSimpleEventSource(Message.class).build();
                this.subscriptions.put(id, source);
            }
            return (PushStream) PushStreamHelper.configurePushStreamBuilder(source, messagingContext).build();
        } catch (Exception e) {
            throw new Exception(e.getMessage(), e);
        }
    }

    /**
     * Publishes a payload without a messaging context.
     *
     * @param str        the topic, expected to be a numeric CAN object identifier
     * @param byteBuffer the payload
     * @throws Exception if sending fails
     */
    public void publish(String str, ByteBuffer byteBuffer) throws Exception {
        publish(str, byteBuffer, null);
    }

    /**
     * Sends the payload as a single frame with the object identifier denoted by the topic, stamped
     * with the current time of this service's time source.
     *
     * @param str              the topic, expected to be a numeric CAN object identifier
     * @param byteBuffer       the payload, see {@link #extractPayload(ByteBuffer)} for which bytes
     *                         are sent
     * @param messagingContext ignored by this implementation
     * @throws IllegalArgumentException if the payload is longer than 8 bytes
     * @throws Exception                declared by the service contract
     */
    public void publish(String str, ByteBuffer byteBuffer, MessagingContext messagingContext) throws Exception {
        startTimeSource();
        byte[] payload = extractPayload(byteBuffer);
        Frame frame = new Frame(parseObjectId(str, this.objectId), false, payload);
        frame.setTimestamp(this.timeSource.getTime());
        this.bus.sendFrame(frame);
    }

    /**
     * Forwards a received frame to the event source registered for its identifier. An event source
     * that no longer has a connected consumer is closed and dropped instead.
     *
     * @param frame the frame received from the bus
     */
    @Override // com.github.kayak.core.FrameListener
    public void newFrame(Frame frame) {
        Integer identifier = Integer.valueOf(frame.getIdentifier());
        SimplePushEventSource<Message> source = this.subscriptions.get(identifier);
        if (source == null) {
            return;
        }
        if (!source.isConnected()) {
            source.close();
            // Conditional remove: only drop the mapping if it still points at the closed source.
            // NOTE(review): the matching bus-side Subscription stays registered - confirm whether
            // it should be removed as well.
            this.subscriptions.remove(identifier, source);
            return;
        }
        try {
            source.publish(fromCanMessage(frame));
        } catch (Exception e) {
            logger.log(Level.WARNING, "Failed to publish CAN frame with identifier " + identifier, e);
        }
    }

    /** Switches the time source to play mode unless it is already playing. */
    private void startTimeSource() {
        if (!TimeSource.Mode.PLAY.equals(this.timeSource.getMode())) {
            this.timeSource.play();
        }
    }

    /**
     * Returns the bytes to put into a frame after checking the 8 byte limit.
     *
     * <p>Array-backed buffers contribute their complete backing array, which is the behaviour
     * callers have relied on so far. Buffers without an accessible array (direct or read-only)
     * contribute a copy of their remaining bytes; the buffer's position is left untouched.
     *
     * @param byteBuffer the payload buffer
     * @return the payload bytes
     * @throws IllegalArgumentException if the payload is longer than 8 bytes
     */
    private static byte[] extractPayload(ByteBuffer byteBuffer) {
        boolean arrayBacked = byteBuffer.hasArray();
        int length = arrayBacked ? byteBuffer.array().length : byteBuffer.remaining();
        if (length > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException("Content for the can bus must be at most 8 byte long");
        }
        if (arrayBacked) {
            return byteBuffer.array();
        }
        byte[] copy = new byte[length];
        // Read through a duplicate so the caller's position and limit are not modified.
        byteBuffer.duplicate().get(copy);
        return copy;
    }

    /**
     * Parses a topic into a CAN object identifier.
     *
     * @param str the topic to parse
     * @param i   the identifier to use when the topic is not a valid integer
     * @return the parsed identifier, or {@code i} as fallback
     */
    private static int parseObjectId(String str, int i) {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            logger.warning("Cannot parse the topic '" + str + "' to an integer, ignoring it and taking default object id " + i);
            return i;
        }
    }

    /**
     * Converts a frame into a message whose topic is the decimal frame identifier and whose
     * content wraps the frame data.
     *
     * @param frame the frame to convert
     * @return the corresponding message
     */
    private static Message fromCanMessage(Frame frame) {
        String topic = Integer.toString(frame.getIdentifier());
        return new SimpleMessage(topic, ByteBuffer.wrap(frame.getData()));
    }
}
