package io.moquette.spi.impl;

import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.PubCompMessage;
import io.moquette.parser.proto.messages.PubRecMessage;
import io.moquette.parser.proto.messages.PubRelMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.MessageGUID;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/spi/impl/Qos2PublishHandler.class */
/**
 * Broker-side handler for the QoS 2 (exactly once) publish handshake.
 *
 * <p>An incoming PUBLISH is authorized, stored through the messages store and (for
 * messages flagged as local) acknowledged with a PUBREC. The matching PUBREL then
 * triggers delivery of the stored message to the subscribers that match its topic,
 * followed by a PUBCOMP to the publisher.
 */
public class Qos2PublishHandler {
    private static final Logger LOG = LoggerFactory.getLogger(Qos2PublishHandler.class);
    private final IAuthorizator m_authorizator;
    private final SubscriptionsStore subscriptions;
    private final IMessagesStore m_messagesStore;
    private final BrokerInterceptor m_interceptor;
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;
    private final ISessionsStore m_sessionsStore;
    private final String brokerPort;
    private final MessagesPublisher publisher;

    /**
     * Creates the handler with its collaborators; the references are stored as given.
     *
     * @param iAuthorizator      consulted for write permission on a topic
     * @param subscriptionsStore used to find the subscriptions matching a topic
     * @param iMessagesStore     stores in-flight and retained messages
     * @param brokerInterceptor  notified after a PUBLISH has been accepted
     * @param concurrentMap      connection descriptors keyed by client id, used to reach
     *                           the channel on which acknowledgements are written
     * @param iSessionsStore     used to look up the session of the publishing client
     * @param str                broker port, only used in log output
     * @param messagesPublisher  delivers a stored message to a list of subscriptions
     */
    public Qos2PublishHandler(IAuthorizator iAuthorizator, SubscriptionsStore subscriptionsStore, IMessagesStore iMessagesStore, BrokerInterceptor brokerInterceptor, ConcurrentMap<String, ConnectionDescriptor> concurrentMap, ISessionsStore iSessionsStore, String str, MessagesPublisher messagesPublisher) {
        this.m_authorizator = iAuthorizator;
        this.subscriptions = subscriptionsStore;
        this.m_messagesStore = iMessagesStore;
        this.m_interceptor = brokerInterceptor;
        this.connectionDescriptors = concurrentMap;
        this.m_sessionsStore = iSessionsStore;
        this.brokerPort = str;
        this.publisher = messagesPublisher;
    }

    /**
     * Handles a PUBLISH received with QoS 2.
     *
     * <p>The message is silently dropped when the sender may not write on the topic.
     * Otherwise it is stored, a PUBREC is sent when {@code publishMessage.isLocal()} is
     * true, the retained state of the topic is updated if the retain flag is set, and
     * the interceptor is notified. Delivery to subscribers does not happen here; it is
     * done by {@link #processPubRel(Channel, PubRelMessage)}.
     *
     * @param channel        channel the PUBLISH arrived on; supplies client id and user name
     * @param publishMessage the received PUBLISH
     */
    public void receivedPublishQos2(Channel channel, PublishMessage publishMessage) {
        AbstractMessage.QOSType qOSType = AbstractMessage.QOSType.EXACTLY_ONCE;
        String topicName = publishMessage.getTopicName();
        if (checkWriteOnTopic(topicName, channel)) {
            return;
        }
        Integer messageID = publishMessage.getMessageID();
        IMessagesStore.StoredMessage asStoredMessage = ProtocolProcessor.asStoredMessage(publishMessage);
        String clientID = NettyUtils.clientID(channel);
        asStoredMessage.setClientID(clientID);
        LOG.info("PUBLISH on server {} from clientID <{}> on topic <{}> with QoS {}", this.brokerPort, clientID, topicName, qOSType);
        MessageGUID storePublishForFuture = this.m_messagesStore.storePublishForFuture(asStoredMessage);
        // NOTE(review): PUBREC is only sent for "local" messages; presumably non-local ones
        // are acknowledged elsewhere -- confirm against PublishMessage.isLocal().
        if (publishMessage.isLocal()) {
            sendPubRec(clientID, messageID.intValue());
        }
        if (publishMessage.isRetainFlag()) {
            updateRetained(topicName, publishMessage.getPayload().hasRemaining(), storePublishForFuture);
        }
        this.m_interceptor.notifyTopicPublished(publishMessage, clientID, NettyUtils.userName(channel));
    }

    /**
     * Handles a PUBREL: delivers the previously stored message to the matching
     * subscribers, updates the retained state of the topic when the stored message is
     * retained, and answers with a PUBCOMP.
     *
     * <p>When the session holds no message for the given message id (for example a
     * PUBREL that was sent again after the exchange already completed) nothing is
     * republished, but the PUBCOMP is still sent so the publisher can finish its side
     * of the handshake.
     *
     * @param channel       channel the PUBREL arrived on; supplies the client id
     * @param pubRelMessage the received PUBREL
     */
    public void processPubRel(Channel channel, PubRelMessage pubRelMessage) {
        String clientID = NettyUtils.clientID(channel);
        int intValue = pubRelMessage.getMessageID().intValue();
        LOG.debug("PUB --PUBREL--> SRV processPubRel invoked for clientID {} ad messageID {}", clientID, intValue);
        IMessagesStore.StoredMessage storedMessage = this.m_sessionsStore.sessionForClient(clientID).storedMessage(intValue);
        if (storedMessage == null) {
            // Nothing to republish; acknowledge anyway instead of failing on a null dereference.
            LOG.warn("No stored message for clientID {} and messageID {}, replying with PUBCOMP only", clientID, intValue);
            sendPubComp(clientID, intValue);
            return;
        }
        String topic = storedMessage.getTopic();
        List<Subscription> matches = this.subscriptions.matches(topic);
        LOG.debug("publish2Subscribers republishing to existing subscribers that matches the topic {}", topic);
        if (LOG.isTraceEnabled()) {
            LOG.trace("content <{}>", DebugUtils.payload2Str(storedMessage.getMessage()));
            LOG.trace("subscription tree {}", this.subscriptions.dumpTree());
        }
        this.publisher.publish2Subscribers(storedMessage, matches);
        if (storedMessage.isRetained()) {
            updateRetained(topic, storedMessage.getMessage().hasRemaining(), storedMessage.getGuid());
        }
        sendPubComp(clientID, intValue);
    }

    /**
     * Stores the message as the retained one for the topic when it carries a payload,
     * otherwise clears whatever is retained for the topic.
     */
    private void updateRetained(String topic, boolean hasPayload, MessageGUID guid) {
        if (hasPayload) {
            this.m_messagesStore.storeRetained(topic, guid);
        } else {
            this.m_messagesStore.cleanRetained(topic);
        }
    }

    /**
     * Checks write permission for the client behind the channel.
     *
     * @return {@code true} when the write is NOT allowed (i.e. the publish must be
     *         rejected), {@code false} when the authorizator permits it
     */
    private boolean checkWriteOnTopic(String str, Channel channel) {
        String clientID = NettyUtils.clientID(channel);
        if (this.m_authorizator.canWrite(str, NettyUtils.userName(channel), clientID)) {
            return false;
        }
        LOG.debug("topic {} doesn't have write credentials", str);
        return true;
    }

    /** Sends a PUBREC carrying the given message id to the client. */
    private void sendPubRec(String str, int i) {
        LOG.trace("PUB <--PUBREC-- SRV sendPubRec invoked for clientID {} with messageID {}", str, i);
        PubRecMessage pubRecMessage = new PubRecMessage();
        pubRecMessage.setMessageID(i);
        writeToClient(str, pubRecMessage);
    }

    /** Sends a PUBCOMP carrying the given message id to the client. */
    private void sendPubComp(String str, int i) {
        LOG.debug("PUB <--PUBCOMP-- SRV sendPubComp invoked for clientID {} ad messageID {}", str, i);
        PubCompMessage pubCompMessage = new PubCompMessage();
        pubCompMessage.setMessageID(i);
        writeToClient(str, pubCompMessage);
    }

    /**
     * Writes and flushes a message on the channel registered for the client. When no
     * connection descriptor is registered for the client id the message is dropped
     * with a warning rather than failing on a null dereference.
     */
    private void writeToClient(String clientId, Object message) {
        ConnectionDescriptor descriptor = this.connectionDescriptors.get(clientId);
        if (descriptor == null || descriptor.channel == null) {
            LOG.warn("No active connection for clientID {}, dropping {}", clientId, message.getClass().getSimpleName());
            return;
        }
        descriptor.channel.writeAndFlush(message);
    }
}
