/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.MessageGUID;
import io.moquette.spi.impl.PersistentQueueMessageSender;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.subscriptions.Subscription;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fans a published message out to the subscriptions that matched its topic.
 *
 * <p>For each subscription the message is either sent straight away through the
 * {@link PersistentQueueMessageSender} (when the subscriber's client id is present in
 * the connection-descriptor map) or enqueued on the subscriber's session (when the
 * client is not connected and its session is not a clean session).
 */
class MessagesPublisher {
    private static final Logger LOG = LoggerFactory.getLogger(MessagesPublisher.class);
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;
    private final ISessionsStore m_sessionsStore;
    private final IMessagesStore m_messagesStore;
    private final PersistentQueueMessageSender messageSender;

    /**
     * @param connectionDescriptors map keyed by client id; a client is treated as active
     *                              when its id is a key of this map
     * @param sessionsStore         store used to look up the session of each subscriber
     * @param messagesStore         store used to persist publishes with QoS above MOST_ONE
     * @param messageSender         sender used to deliver publishes to active clients
     */
    public MessagesPublisher(ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors, ISessionsStore sessionsStore, IMessagesStore messagesStore, PersistentQueueMessageSender messageSender) {
        this.connectionDescriptors = connectionDescriptors;
        this.m_sessionsStore = sessionsStore;
        this.m_messagesStore = messagesStore;
        this.messageSender = messageSender;
    }

    /**
     * Builds a PUBLISH message with the retain flag cleared.
     *
     * @param topic   topic name to set on the message
     * @param qos     QoS to set on the message
     * @param message payload to set on the message (not copied)
     * @return the newly created publish message; no message id is assigned here
     */
    static PublishMessage notRetainedPublish(String topic, AbstractMessage.QOSType qos, ByteBuffer message) {
        PublishMessage pubMessage = new PublishMessage();
        pubMessage.setRetainFlag(false);
        pubMessage.setTopicName(topic);
        pubMessage.setQos(qos);
        pubMessage.setPayload(message);
        return pubMessage;
    }

    /**
     * Delivers {@code pubMsg} to every subscription in {@code topicMatchingSubscriptions}.
     *
     * <p>When the publish QoS is not MOST_ONE the message is first persisted through the
     * messages store and the returned GUID is used to track in-flight acknowledgements.
     * A subscriber whose session cannot be found is skipped (with a warning) so that the
     * remaining subscribers are still served.
     *
     * @param pubMsg                     the stored message being published
     * @param topicMatchingSubscriptions subscriptions whose filter matched the topic
     */
    void publish2Subscribers(IMessagesStore.StoredMessage pubMsg, List<Subscription> topicMatchingSubscriptions) {
        String topic = pubMsg.getTopic();
        AbstractMessage.QOSType publishingQos = pubMsg.getQos();
        ByteBuffer origMessage = pubMsg.getMessage();

        // Only publishes above QoS 0 are persisted; guid stays null otherwise.
        MessageGUID guid = null;
        if (publishingQos != AbstractMessage.QOSType.MOST_ONE) {
            guid = this.m_messagesStore.storePublishForFuture(pubMsg);
        }

        LOG.trace("Found {} matching subscriptions to <{}>", topicMatchingSubscriptions.size(), topic);
        for (Subscription sub : topicMatchingSubscriptions) {
            String clientId = sub.getClientId();
            AbstractMessage.QOSType qos = ProtocolProcessor.lowerQosToTheSubscriptionDesired(sub, publishingQos);
            ClientSession targetSession = this.m_sessionsStore.sessionForClient(clientId);
            boolean targetIsActive = this.connectionDescriptors.containsKey(clientId);
            LOG.debug("Broker republishing to client <{}> topicFilter <{}> qos <{}>, active {}", new Object[]{clientId, sub.getTopicFilter(), qos, targetIsActive});

            // NOTE(review): assumes sessionForClient may return null for an unknown client --
            // confirm against the ISessionsStore implementation. Without this guard a single
            // missing session would throw and abort delivery to all remaining subscribers.
            if (targetSession == null) {
                LOG.warn("No session found for client <{}>, skipping delivery on topic <{}>", clientId, topic);
                continue;
            }

            if (targetIsActive) {
                // Duplicate only when actually sending, so each outgoing message gets its own
                // position/limit over the shared payload bytes.
                sendToActiveSubscriber(targetSession, topic, qos, origMessage.duplicate(), guid);
            } else if (!targetSession.isCleanSession()) {
                // NOTE(review): the message is enqueued regardless of QoS, including QoS 0
                // publishes that were never stored above -- confirm enqueue tolerates that.
                targetSession.enqueue(pubMsg);
            }
        }
    }

    /**
     * Sends one publish to a connected subscriber, registering it as in-flight first when
     * the effective QoS requires an acknowledgement.
     */
    private void sendToActiveSubscriber(ClientSession targetSession, String topic, AbstractMessage.QOSType qos, ByteBuffer payload, MessageGUID guid) {
        PublishMessage publishMsg = MessagesPublisher.notRetainedPublish(topic, qos, payload);
        if (qos != AbstractMessage.QOSType.MOST_ONE) {
            // Presumably the effective QoS never exceeds the publish QoS, so guid is
            // non-null whenever this branch runs -- TODO confirm.
            int messageId = targetSession.nextPacketId();
            targetSession.inFlightAckWaiting(guid, messageId);
            publishMsg.setMessageID(messageId);
        }
        this.messageSender.sendPublish(targetSession, publishMsg);
    }
}

