package org.gecko.rsa.provider;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.emf.ecore.EObject;
import org.gecko.emf.osgi.ResourceSetFactory;
import org.gecko.osgi.messaging.Message;
import org.gecko.osgi.messaging.MessagingService;
import org.gecko.rsa.api.DeSerializationContext;
import org.gecko.rsa.api.DeSerializer;
import org.gecko.rsa.api.SerializationContext;
import org.gecko.rsa.api.Serializer;
import org.gecko.rsa.provider.marker.PushStreamMarker;
import org.gecko.rsa.provider.marker.VersionMarker;
import org.gecko.rsa.provider.ser.EObjectDeSerializer;
import org.gecko.rsa.provider.stream.EObjectInputStream;
import org.gecko.rsa.provider.stream.EObjectOutputStream;
import org.gecko.util.common.concurrent.NamedThreadFactory;
import org.gecko.util.pushstream.PushStreamContext;
import org.gecko.util.pushstream.distributed.DistributedEventSource;
import org.osgi.framework.ServiceException;
import org.osgi.framework.Version;
import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.PushStream;

/* loaded from: input_file:org/gecko/rsa/provider/MessagingClientProxyHandler.class */
public class MessagingClientProxyHandler implements InvocationHandler {
    private static final Logger logger = Logger.getLogger(MessagingClientProxyHandler.class.getName());
    private final MessagingService messaging;
    private final String topicAddress;
    private final ClassLoader cl;
    private PushStream<Message> receiveData;
    private final Serializer<EObject, SerializationContext> serializer;
    private final DeSerializer<EObject, DeSerializationContext> deserializer;
    private final Class<?>[] interfaces;
    private Map<String, PushStream<Object>> receiveDataPS = new ConcurrentHashMap();
    private final Map<String, Deferred<ObjectInputStream>> deferredMap = new ConcurrentHashMap();
    private final ExecutorService psHandler = Executors.newFixedThreadPool(5, NamedThreadFactory.newNamedFactory("Remote-PS-Handler"));
    private final PromiseFactory pf = new PromiseFactory(Executors.newFixedThreadPool(2, NamedThreadFactory.newNamedFactory("messaging-invocation")));

    public MessagingClientProxyHandler(MessagingService messagingService, ResourceSetFactory resourceSetFactory, String str, ClassLoader classLoader, Class<?>[] clsArr) {
        this.receiveData = null;
        this.messaging = messagingService;
        this.interfaces = clsArr;
        this.serializer = new EObjectDeSerializer(resourceSetFactory);
        this.deserializer = new EObjectDeSerializer(resourceSetFactory);
        this.topicAddress = String.format(MessagingRSAEndpoint.MA_DATA_TOPIC, str);
        String format = String.format(MessagingRSAEndpoint.MA_DATA_RESPONSE_TOPIC, str);
        this.cl = classLoader;
        try {
            this.receiveData = messagingService.subscribe(format);
            this.receiveData.forEach(this::handleSingleDataResponse);
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("Error subscribing to receiver topic '%s'", str));
        }
    }

    @Override // java.lang.reflect.InvocationHandler
    public Object invoke(Object obj, Method method, Object[] objArr) throws Throwable {
        if (isMethodCallFromInterface(method)) {
            return (Future.class.isAssignableFrom(method.getReturnType()) || CompletionStage.class.isAssignableFrom(method.getReturnType())) ? createFutureResult(method, objArr) : Promise.class.isAssignableFrom(method.getReturnType()) ? createPromiseResult(method, objArr) : handleSyncCall(method, objArr);
        }
        if (method.equals(Object.class.getMethod("equals", Object.class))) {
            return Boolean.valueOf(obj == objArr[0]);
        }
        return method.invoke(new Object(), objArr);
    }

    private boolean isMethodCallFromInterface(Method method) {
        Class<?> declaringClass = method.getDeclaringClass();
        for (Class<?> cls : this.interfaces) {
            if (cls.equals(declaringClass)) {
                return true;
            }
        }
        return false;
    }

    private void handleSingleDataResponse(Message message) {
        Deferred<ObjectInputStream> remove;
        try {
            EObjectInputStream eObjectInputStream = new EObjectInputStream(new ByteArrayInputStream(message.payload().array()), this.cl, this.deserializer);
            String str = (String) eObjectInputStream.readObject();
            synchronized (this.deferredMap) {
                remove = this.deferredMap.remove(str);
            }
            if (remove != null) {
                remove.resolve(eObjectInputStream);
            } else {
                logger.severe(String.format("Did not found a count down latch for id '%s'", str));
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Cannot create BasicInputStream from byte array", (Throwable) e);
        } catch (ClassNotFoundException e2) {
            logger.log(Level.SEVERE, "Cannot find class to read UUID", (Throwable) e2);
        }
    }

    private PushStream<?> handlePushstreamMarker(PushStreamMarker pushStreamMarker) throws Exception {
        String correlation = pushStreamMarker.getCorrelation();
        logger.log(Level.FINE, String.format("[%s] Received PushStreamMarker", correlation));
        String returnChannel = pushStreamMarker.getReturnChannel();
        String controlChannel = pushStreamMarker.getControlChannel();
        DistributedEventSource distributedEventSource = new DistributedEventSource(Object.class);
        Executors.newSingleThreadExecutor(NamedThreadFactory.newNamedFactory("Remote-PES-" + correlation)).submit(() -> {
            distributedEventSource.onClose(() -> {
                sendPushstreamControlMessage(null, PushStreamMarker.PSDataType.CLOSE, correlation, controlChannel);
                this.receiveDataPS.remove(pushStreamMarker.getCorrelation());
            });
            distributedEventSource.onConnect(() -> {
                sendPushstreamControlMessage(null, PushStreamMarker.PSDataType.OPEN, correlation, controlChannel);
            });
            distributedEventSource.onError(th -> {
                sendPushstreamControlMessage(th, PushStreamMarker.PSDataType.ERROR, correlation, controlChannel);
                this.receiveDataPS.remove(pushStreamMarker.getCorrelation());
            });
        });
        logger.log(Level.FINE, String.format("[%s] Subscribe distributed event source to '%s'", correlation, returnChannel));
        this.messaging.subscribe(returnChannel).forEach(message -> {
            try {
                handlePushstreamDataResponse(message, distributedEventSource);
            } catch (Exception e) {
                logger.log(Level.SEVERE, "Error handling message for remote pushstream", (Throwable) e);
            }
        });
        PushStream<Object> createPushStream = distributedEventSource.createPushStream((PushStreamContext) null);
        this.receiveDataPS.put(correlation, createPushStream);
        return createPushStream;
    }

    private void handlePushstreamDataResponse(Message message, DistributedEventSource<Object> distributedEventSource) {
        try {
            EObjectInputStream eObjectInputStream = new EObjectInputStream(new ByteArrayInputStream(message.payload().array()), this.cl, this.deserializer);
            try {
                String str = (String) eObjectInputStream.readObject();
                PushStreamMarker.PSDataType pSDataType = PushStreamMarker.PSDataType.values()[eObjectInputStream.readInt()];
                logger.log(Level.FINE, String.format("[%s] Received remote message of type %s", str, pSDataType));
                switch (pSDataType) {
                    case DATA:
                        Object readObject = eObjectInputStream.readObject();
                        this.psHandler.execute(() -> {
                            distributedEventSource.doExternalPublish(readObject);
                        });
                        break;
                    case ERROR:
                        Throwable th = (Throwable) eObjectInputStream.readObject();
                        this.psHandler.execute(() -> {
                            distributedEventSource.doExternalError(th);
                        });
                        break;
                    case CLOSE:
                        this.psHandler.execute(() -> {
                            distributedEventSource.doExternalClose();
                        });
                        break;
                }
                eObjectInputStream.close();
            } catch (Throwable th2) {
                try {
                    eObjectInputStream.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
                throw th2;
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "Cannot create BasicInputStream from byte array", (Throwable) e);
        } catch (ClassNotFoundException e2) {
            logger.log(Level.SEVERE, "Cannot find class to read UUID", (Throwable) e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Object handleSyncCall(Method method, Object[] objArr) throws Throwable {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            EObjectOutputStream eObjectOutputStream = new EObjectOutputStream(byteArrayOutputStream, this.serializer);
            try {
                String uuid = UUID.randomUUID().toString();
                eObjectOutputStream.writeObject(uuid);
                eObjectOutputStream.writeObject(method.getName());
                eObjectOutputStream.writeObject(objArr);
                eObjectOutputStream.flush();
                ByteBuffer wrap = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
                Promise<ObjectInputStream> resolveResult = resolveResult(uuid);
                this.messaging.publish(this.topicAddress, wrap);
                Object value = resolveResult.map(this::readResult).getValue();
                eObjectOutputStream.close();
                if (value instanceof Throwable) {
                    throw ((Throwable) value);
                }
                return value;
            } finally {
            }
        } catch (Throwable th) {
            if (th instanceof ServiceException) {
                throw th;
            }
            throw new ServiceException("Error calling '" + this.topicAddress + "' method: " + method.getName(), 5, th);
        }
    }

    private Promise<ObjectInputStream> resolveResult(String str) {
        Deferred<ObjectInputStream> deferred = this.pf.deferred();
        synchronized (this.deferredMap) {
            this.deferredMap.put(str, deferred);
        }
        return deferred.getPromise().timeout(30000L);
    }

    private Object readResult(ObjectInputStream objectInputStream) {
        try {
            return readReplaceMarker(objectInputStream.readObject());
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (ClassNotFoundException e2) {
            throw new IllegalStateException(e2);
        }
    }

    private Object readReplaceMarker(Object obj) {
        if (obj instanceof VersionMarker) {
            return new Version(((VersionMarker) obj).getVersion());
        }
        if (!(obj instanceof PushStreamMarker)) {
            return obj;
        }
        PushStreamMarker pushStreamMarker = (PushStreamMarker) obj;
        try {
            return handlePushstreamMarker(pushStreamMarker);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error returning pushstream", (Throwable) e);
            return pushStreamMarker;
        }
    }

    private void sendPushstreamControlMessage(Object obj, PushStreamMarker.PSDataType pSDataType, String str, String str2) {
        logger.log(Level.FINE, String.format("[%s] Sending message of type %s to channel '%s'", str, pSDataType, str2));
        try {
            this.messaging.publish(str2, ByteBuffer.wrap(writeDataMessage(obj, pSDataType, str).toByteArray()));
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("[%s] Error sending control message to upstream", str), (Throwable) e);
        }
    }

    private ByteArrayOutputStream writeDataMessage(Object obj, PushStreamMarker.PSDataType pSDataType, String str) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        try {
            EObjectOutputStream eObjectOutputStream = new EObjectOutputStream(byteArrayOutputStream, this.serializer);
            try {
                eObjectOutputStream.writeObject(str);
                eObjectOutputStream.writeInt(pSDataType.ordinal());
                if (obj != null) {
                    eObjectOutputStream.writeObject(obj);
                }
                eObjectOutputStream.flush();
                eObjectOutputStream.close();
                return byteArrayOutputStream;
            } finally {
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, String.format("[%s] Error creating message", str));
            throw e;
        }
    }

    private Object createFutureResult(final Method method, final Object[] objArr) {
        return CompletableFuture.supplyAsync(new Supplier<Object>() { // from class: org.gecko.rsa.provider.MessagingClientProxyHandler.1
            @Override // java.util.function.Supplier
            public Object get() {
                try {
                    return MessagingClientProxyHandler.this.handleSyncCall(method, objArr);
                } catch (RuntimeException e) {
                    throw e;
                } catch (Throwable th) {
                    throw new RuntimeException(th);
                }
            }
        });
    }

    private Object createPromiseResult(Method method, Object[] objArr) {
        Deferred deferred = this.pf.deferred();
        ((ExecutorService) this.pf.executor()).submit(() -> {
            try {
                deferred.resolve(handleSyncCall(method, objArr));
            } catch (Throwable th) {
                deferred.fail(th);
            }
        });
        return deferred.getPromise();
    }
}
