/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.spi.ClientSession;
import io.moquette.spi.impl.DebugUtils;
import io.moquette.spi.impl.ProtocolProcessor;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Delivers PUBLISH messages to a connected client, falling back to the
 * client's session queue when the client's channel cannot currently accept
 * writes.
 */
class PersistentQueueMessageSender {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentQueueMessageSender.class);

    /** Client ID to connection descriptor lookup; the map instance is supplied by the caller. */
    private final ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;

    /**
     * @param connectionDescriptors map from client ID to that client's connection descriptor;
     *                              it is stored by reference, not copied
     */
    public PersistentQueueMessageSender(ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors) {
        this.connectionDescriptors = connectionDescriptors;
    }

    /**
     * Sends {@code pubMessage} to the client that owns {@code clientsession}.
     *
     * <p>If the client's channel is writable the message is written and flushed
     * immediately. Otherwise, messages whose QoS is not {@code MOST_ONE} are
     * enqueued on the client session; {@code MOST_ONE} messages are not
     * enqueued and nothing further is done with them here.
     *
     * @param clientsession session of the target client; its {@code clientID} selects the connection
     * @param pubMessage    the publish message to deliver
     * @throws RuntimeException if the descriptor map is {@code null} or contains no
     *                          descriptor for the session's client ID
     */
    void sendPublish(ClientSession clientsession, PublishMessage pubMessage) {
        String clientId = clientsession.clientID;
        LOG.info("send publish message to <{}> on topic <{}>", clientId, pubMessage.getTopicName());
        if (LOG.isDebugEnabled()) {
            LOG.debug("directSend invoked clientId <{}> on topic <{}> QoS {} retained {} messageID {}",
                    clientId, pubMessage.getTopicName(), pubMessage.getQos(), false, pubMessage.getMessageID());
            LOG.debug("content <{}>", DebugUtils.payload2Str(pubMessage.getPayload()));
        }
        if (this.connectionDescriptors == null) {
            throw new RuntimeException("Internal bad error, found connectionDescriptors to null while it should be initialized, somewhere it's overwritten!!");
        }

        // Look the descriptor up exactly once: the map is concurrent, so a second
        // get() after the null check could return null if the entry is removed by
        // another thread in between, turning the diagnostic below into a bare NPE.
        ConnectionDescriptor descriptor = this.connectionDescriptors.get(clientId);
        if (descriptor == null) {
            throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", clientId, this.connectionDescriptors));
        }
        Channel channel = descriptor.channel;
        LOG.trace("Session for clientId {}", clientId);

        if (channel.isWritable()) {
            LOG.debug("channel is writable");
            channel.writeAndFlush(pubMessage);
        } else if (pubMessage.getQos() != AbstractMessage.QOSType.MOST_ONE) {
            // Channel cannot take the write right now: keep the message in the session queue.
            LOG.debug("enqueue to client session");
            clientsession.enqueue(ProtocolProcessor.asStoredMessage(pubMessage));
        }
    }
}

