/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.PubCompMessage;
import io.moquette.parser.proto.messages.PubRecMessage;
import io.moquette.parser.proto.messages.PubRelMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.MessageGUID;
import io.moquette.spi.impl.BrokerInterceptor;
import io.moquette.spi.impl.DebugUtils;
import io.moquette.spi.impl.MessagesPublisher;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.impl.Qos1PublishHandler;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles the broker side of the QoS 2 (exactly once) publish handshake.
 *
 * <p>{@link #receivedPublishQos2} stores an incoming PUBLISH and answers with
 * PUBREC; {@link #processPubRel} forwards the previously stored message to the
 * matching subscriptions and answers with PUBCOMP.
 */
class Qos2PublishHandler {
    // Log under this class's own category (was mistakenly Qos1PublishHandler).
    private static final Logger LOG = LoggerFactory.getLogger(Qos2PublishHandler.class);
    private final IAuthorizator m_authorizator;
    private final SubscriptionsStore subscriptions;
    private final IMessagesStore m_messagesStore;
    private final BrokerInterceptor m_interceptor;
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;
    private final ISessionsStore m_sessionsStore;
    private final String brokerPort;
    private final MessagesPublisher publisher;

    /**
     * Creates the handler; all collaborators are stored as-is.
     *
     * @param authorizator          consulted for write permission on a topic
     * @param subscriptions         used to find the subscriptions matching a topic
     * @param messagesStore         stores in-flight and retained messages
     * @param interceptor           notified after a PUBLISH has been accepted
     * @param connectionDescriptors client ID to connection lookup used to send acknowledgements
     * @param sessionsStore         used to look up the publishing client's session
     * @param brokerPort            only used in log output
     * @param messagesPublisher     delivers the message to the matching subscriptions
     */
    public Qos2PublishHandler(IAuthorizator authorizator, SubscriptionsStore subscriptions, IMessagesStore messagesStore, BrokerInterceptor interceptor, ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors, ISessionsStore sessionsStore, String brokerPort, MessagesPublisher messagesPublisher) {
        this.m_authorizator = authorizator;
        this.subscriptions = subscriptions;
        this.m_messagesStore = messagesStore;
        this.m_interceptor = interceptor;
        this.connectionDescriptors = connectionDescriptors;
        this.m_sessionsStore = sessionsStore;
        this.brokerPort = brokerPort;
        this.publisher = messagesPublisher;
    }

    /**
     * First step of the QoS 2 flow: stores the PUBLISH and, for local messages,
     * acknowledges it with PUBREC. Subscribers are not served here; that happens
     * in {@link #processPubRel}.
     *
     * <p>If the client is not authorized to write on the topic the message is
     * dropped without any acknowledgement.
     *
     * @param channel the channel the PUBLISH arrived on
     * @param msg     the received PUBLISH message
     */
    void receivedPublishQos2(Channel channel, PublishMessage msg) {
        AbstractMessage.QOSType qos = AbstractMessage.QOSType.EXACTLY_ONCE;
        String topic = msg.getTopicName();
        // checkWriteOnTopic returns true when the write is NOT allowed.
        if (this.checkWriteOnTopic(topic, channel)) {
            return;
        }
        Integer messageID = msg.getMessageID();
        IMessagesStore.StoredMessage toStoreMsg = ProtocolProcessor.asStoredMessage(msg);
        String clientID = NettyUtils.clientID(channel);
        toStoreMsg.setClientID(clientID);
        LOG.info("PUBLISH on server {} from clientID <{}> on topic <{}> with QoS {}", this.brokerPort, clientID, topic, qos);
        MessageGUID guid = this.m_messagesStore.storePublishForFuture(toStoreMsg);
        if (msg.isLocal()) {
            this.sendPubRec(clientID, messageID);
        }
        if (msg.isRetainFlag()) {
            // A retained PUBLISH with an empty payload clears the retained message of the topic.
            if (!msg.getPayload().hasRemaining()) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, guid);
            }
        }
        String username = NettyUtils.userName(channel);
        this.m_interceptor.notifyTopicPublished(msg, clientID, username);
    }

    /**
     * Second step of the QoS 2 flow: on PUBREL, publishes the stored message to
     * the subscriptions matching its topic, updates the retained store if the
     * message is retained, and replies with PUBCOMP.
     *
     * <p>When no session or stored message can be found for the packet, nothing
     * is published but PUBCOMP is still sent, instead of failing with a
     * {@link NullPointerException}.
     *
     * @param channel the channel the PUBREL arrived on
     * @param msg     the received PUBREL message
     */
    void processPubRel(Channel channel, PubRelMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        int messageID = msg.getMessageID();
        LOG.debug("PUB --PUBREL--> SRV processPubRel invoked for clientID {} ad messageID {}", clientID, messageID);
        ClientSession targetSession = this.m_sessionsStore.sessionForClient(clientID);
        // NOTE(review): assumes the stores signal "not found" with null - confirm against the implementations.
        IMessagesStore.StoredMessage evt = targetSession == null ? null : targetSession.storedMessage(messageID);
        if (evt == null) {
            LOG.warn("No stored message found for clientID {} and messageID {}, replying with PUBCOMP only", clientID, messageID);
            // MQTT requires every PUBREL to be answered with a PUBCOMP carrying the same packet id.
            this.sendPubComp(clientID, messageID);
            return;
        }
        String topic = evt.getTopic();
        List<Subscription> topicMatchingSubscriptions = this.subscriptions.matches(topic);
        LOG.debug("publish2Subscribers republishing to existing subscribers that matches the topic {}", topic);
        if (LOG.isTraceEnabled()) {
            LOG.trace("content <{}>", DebugUtils.payload2Str(evt.getMessage()));
            LOG.trace("subscription tree {}", this.subscriptions.dumpTree());
        }
        this.publisher.publish2Subscribers(evt, topicMatchingSubscriptions);
        if (evt.isRetained()) {
            // An empty payload clears the retained message of the topic.
            if (!evt.getMessage().hasRemaining()) {
                this.m_messagesStore.cleanRetained(topic);
            } else {
                this.m_messagesStore.storeRetained(topic, evt.getGuid());
            }
        }
        this.sendPubComp(clientID, messageID);
    }

    /**
     * Checks the write permission of the channel's client on the topic.
     *
     * @return {@code true} when the write is <em>denied</em>, {@code false} when it is allowed
     */
    private boolean checkWriteOnTopic(String topic, Channel channel) {
        String clientID = NettyUtils.clientID(channel);
        String username = NettyUtils.userName(channel);
        if (!this.m_authorizator.canWrite(topic, username, clientID)) {
            LOG.debug("topic {} doesn't have write credentials", topic);
            return true;
        }
        return false;
    }

    /** Sends a PUBREC carrying {@code messageID} to the given client. */
    private void sendPubRec(String clientID, int messageID) {
        LOG.trace("PUB <--PUBREC-- SRV sendPubRec invoked for clientID {} with messageID {}", clientID, messageID);
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(messageID);
        this.writeToClient(clientID, pubRecMessage);
    }

    /** Sends a PUBCOMP carrying {@code messageID} to the given client. */
    private void sendPubComp(String clientID, int messageID) {
        LOG.debug("PUB <--PUBCOMP-- SRV sendPubComp invoked for clientID {} ad messageID {}", clientID, messageID);
        PubCompMessage pubCompMessage = new PubCompMessage();
        pubCompMessage.setMessageID(messageID);
        this.writeToClient(clientID, pubCompMessage);
    }

    /**
     * Writes and flushes {@code message} on the client's channel. If no
     * connection descriptor is registered for the client, the message is
     * dropped with a warning rather than throwing.
     */
    private void writeToClient(String clientID, Object message) {
        ConnectionDescriptor descriptor = this.connectionDescriptors.get(clientID);
        if (descriptor == null) {
            LOG.warn("No connection descriptor for clientID {}, can't send {}", clientID, message);
            return;
        }
        descriptor.channel.writeAndFlush(message);
    }
}

