package io.moquette.spi.impl;

import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.spi.ClientSession;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/spi/impl/PersistentQueueMessageSender.class */
public class PersistentQueueMessageSender {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentQueueMessageSender.class);
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;

    public PersistentQueueMessageSender(ConcurrentMap<String, ConnectionDescriptor> concurrentMap) {
        this.connectionDescriptors = concurrentMap;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void sendPublish(ClientSession clientSession, PublishMessage publishMessage) {
        String str = clientSession.clientID;
        LOG.info("send publish message to <{}> on topic <{}>", str, publishMessage.getTopicName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("directSend invoked clientId <{}> on topic <{}> QoS {} retained {} messageID {}", new Object[]{str, publishMessage.getTopicName(), publishMessage.getQos(), false, publishMessage.getMessageID()});
            LOG.debug("content <{}>", DebugUtils.payload2Str(publishMessage.getPayload()));
        }
        if (this.connectionDescriptors == null) {
            throw new RuntimeException("Internal bad error, found connectionDescriptors to null while it should be initialized, somewhere it's overwritten!!");
        }
        if (this.connectionDescriptors.get(str) == null) {
            throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", str, this.connectionDescriptors));
        }
        Channel channel = this.connectionDescriptors.get(str).channel;
        LOG.trace("Session for clientId {}", str);
        if (channel.isWritable()) {
            LOG.debug("channel is writable");
            channel.writeAndFlush(publishMessage);
        } else if (publishMessage.getQos() != AbstractMessage.QOSType.MOST_ONE) {
            LOG.debug("enqueue to client session");
            clientSession.enqueue(ProtocolProcessor.asStoredMessage(publishMessage));
        }
    }
}
