/*
 * Decompiled with CFR 0.152.
 */
package io.moquette.spi.impl;

import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.parser.proto.messages.AbstractMessage;
import io.moquette.parser.proto.messages.ConnAckMessage;
import io.moquette.parser.proto.messages.ConnectMessage;
import io.moquette.parser.proto.messages.PubAckMessage;
import io.moquette.parser.proto.messages.PubCompMessage;
import io.moquette.parser.proto.messages.PubRecMessage;
import io.moquette.parser.proto.messages.PubRelMessage;
import io.moquette.parser.proto.messages.PublishMessage;
import io.moquette.parser.proto.messages.SubAckMessage;
import io.moquette.parser.proto.messages.SubscribeMessage;
import io.moquette.parser.proto.messages.UnsubAckMessage;
import io.moquette.parser.proto.messages.UnsubscribeMessage;
import io.moquette.server.ConnectionDescriptor;
import io.moquette.server.netty.AutoFlushHandler;
import io.moquette.server.netty.NettyUtils;
import io.moquette.spi.ClientSession;
import io.moquette.spi.IMatchingCondition;
import io.moquette.spi.IMessagesStore;
import io.moquette.spi.ISessionsStore;
import io.moquette.spi.MessageGUID;
import io.moquette.spi.impl.BrokerInterceptor;
import io.moquette.spi.impl.InternalRepublisher;
import io.moquette.spi.impl.MessagesPublisher;
import io.moquette.spi.impl.PersistentQueueMessageSender;
import io.moquette.spi.impl.Qos0PublishHandler;
import io.moquette.spi.impl.Qos1PublishHandler;
import io.moquette.spi.impl.Qos2PublishHandler;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.moquette.spi.impl.subscriptions.SubscriptionsStore;
import io.moquette.spi.security.IAuthenticator;
import io.moquette.spi.security.IAuthorizator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateHandler;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtocolProcessor {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(ProtocolProcessor.class);
    // Connection descriptors keyed by client ID; created in init() and handed to the message sender and publish handlers built there.
    protected ConcurrentMap<String, ConnectionDescriptor> connectionDescriptors;
    // SUBSCRIBE requests currently being handled, keyed by (client ID, packet ID); maintained by processSubscribe().
    protected ConcurrentMap<RunningSubscription, SubscriptionState> subscriptionInCourse;
    // Subscription store used to match topics and to add/remove subscriptions.
    private SubscriptionsStore subscriptions;
    // When false, login() rejects CONNECTs that carry no username (and those carrying a username without a password).
    private boolean allowAnonymous;
    // When true, a clean-session CONNECT with an empty client ID is given a server-generated identifier.
    private boolean allowZeroByteClientId;
    // Consulted in doVerify() for read permission on topic filters; also passed to the publish handlers.
    private IAuthorizator m_authorizator;
    // Message store (retained messages, stored publishes).
    private IMessagesStore m_messagesStore;
    // Session store (client sessions, packet IDs, queues).
    private ISessionsStore m_sessionsStore;
    // Validates client ID / username / password triples during login().
    private IAuthenticator m_authenticator;
    // Receives connect/disconnect/subscribe/acknowledge notifications.
    private BrokerInterceptor m_interceptor;
    // Value supplied as serverPort to init(); null when the public init overloads are used. Only logged and forwarded here.
    private String m_server_port;
    // Per-QoS handlers that processPublish() dispatches to.
    private Qos0PublishHandler qos0PublishHandler;
    private Qos1PublishHandler qos1PublishHandler;
    private Qos2PublishHandler qos2PublishHandler;
    // Delivers a stored message to the matching subscriptions.
    private MessagesPublisher messagesPublisher;
    // Re-sends queued and retained messages to a session.
    private InternalRepublisher internalRepublisher;
    // Will messages keyed by client ID; filled on CONNECT, removed on DISCONNECT or connection loss.
    private ConcurrentMap<String, WillMessage> m_willStore = new ConcurrentHashMap<String, WillMessage>();

    /**
     * Creates an unconfigured processor. Apart from the will store, every collaborator field
     * stays unset until one of the {@code init(...)} overloads runs.
     */
    ProtocolProcessor() {
    }

    /**
     * Convenience overload: initializes the processor with zero-byte client IDs disallowed
     * and no server port.
     *
     * @param subscriptions  subscription store
     * @param storageService message store
     * @param sessionsStore  session store
     * @param authenticator  credential validator used on CONNECT
     * @param allowAnonymous whether clients may connect without credentials
     * @param authorizator   topic permission checker
     * @param interceptor    receiver of broker event notifications
     */
    public void init(SubscriptionsStore subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, IAuthorizator authorizator, BrokerInterceptor interceptor) {
        final boolean zeroByteClientIdAllowed = false;
        final String noServerPort = null;
        init(subscriptions, storageService, sessionsStore, authenticator, allowAnonymous, zeroByteClientIdAllowed, authorizator, interceptor, noServerPort);
    }

    /**
     * Convenience overload: initializes the processor without a server port.
     *
     * @param subscriptions         subscription store
     * @param storageService        message store
     * @param sessionsStore         session store
     * @param authenticator         credential validator used on CONNECT
     * @param allowAnonymous        whether clients may connect without credentials
     * @param allowZeroByteClientId whether an empty client ID may be replaced by a generated one
     * @param authorizator          topic permission checker
     * @param interceptor           receiver of broker event notifications
     */
    public void init(SubscriptionsStore subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, boolean allowZeroByteClientId, IAuthorizator authorizator, BrokerInterceptor interceptor) {
        final String noServerPort = null;
        init(subscriptions, storageService, sessionsStore, authenticator, allowAnonymous, allowZeroByteClientId, authorizator, interceptor, noServerPort);
    }

    /**
     * Wires the processor: stores the collaborators, creates fresh connection and
     * in-course-subscription maps, and builds the message sender, publisher, per-QoS publish
     * handlers and republisher on top of them.
     *
     * @param subscriptions         subscription store
     * @param storageService        message store
     * @param sessionsStore         session store
     * @param authenticator         credential validator used on CONNECT
     * @param allowAnonymous        whether clients may connect without credentials
     * @param allowZeroByteClientId whether an empty client ID may be replaced by a generated one
     * @param authorizator          topic permission checker
     * @param interceptor           receiver of broker event notifications
     * @param serverPort            value forwarded to the QoS 1/2 handlers and used in log output; may be null
     */
    void init(SubscriptionsStore subscriptions, IMessagesStore storageService, ISessionsStore sessionsStore, IAuthenticator authenticator, boolean allowAnonymous, boolean allowZeroByteClientId, IAuthorizator authorizator, BrokerInterceptor interceptor, String serverPort) {
        this.connectionDescriptors = new ConcurrentHashMap<String, ConnectionDescriptor>();
        this.subscriptionInCourse = new ConcurrentHashMap<RunningSubscription, SubscriptionState>();
        this.m_interceptor = interceptor;
        this.subscriptions = subscriptions;
        this.allowAnonymous = allowAnonymous;
        this.allowZeroByteClientId = allowZeroByteClientId;
        this.m_authorizator = authorizator;
        // Only build the tree dump when TRACE is enabled, so the work is not wasted on a
        // message that would be discarded anyway.
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscription tree on init {}", (Object)subscriptions.dumpTree());
        }
        this.m_authenticator = authenticator;
        this.m_messagesStore = storageService;
        this.m_sessionsStore = sessionsStore;
        this.m_server_port = serverPort;
        // The same sender instance backs both the live publisher and the republisher.
        PersistentQueueMessageSender messageSender = new PersistentQueueMessageSender(this.connectionDescriptors);
        this.messagesPublisher = new MessagesPublisher(this.connectionDescriptors, sessionsStore, this.m_messagesStore, messageSender);
        this.qos0PublishHandler = new Qos0PublishHandler(this.m_authorizator, subscriptions, this.m_messagesStore, this.m_interceptor, this.messagesPublisher);
        this.qos1PublishHandler = new Qos1PublishHandler(this.m_authorizator, subscriptions, this.m_messagesStore, this.m_interceptor, this.connectionDescriptors, this.m_server_port, this.messagesPublisher);
        this.qos2PublishHandler = new Qos2PublishHandler(this.m_authorizator, subscriptions, this.m_messagesStore, this.m_interceptor, this.connectionDescriptors, this.m_sessionsStore, this.m_server_port, this.messagesPublisher);
        this.internalRepublisher = new InternalRepublisher(messageSender);
    }

    /**
     * Handles a CONNECT message: validates protocol version and client ID, authenticates,
     * registers a {@link ConnectionDescriptor} for the client and then walks the descriptor
     * through the connection states (ack sent, session created, messages republished,
     * established). Any failed step closes the channel and stops processing.
     *
     * @param channel the channel the CONNECT arrived on
     * @param msg     the CONNECT message; its client ID is overwritten with a generated one
     *                when it is empty and zero-byte client IDs are allowed
     */
    public void processConnect(Channel channel, ConnectMessage msg) {
        ConnectionDescriptor descriptor;
        LOG.info("CONNECT for client <{}>", (Object)msg.getClientID());
        // Only protocol versions 3 and 4 are accepted; anything else is answered with return code 1.
        if (msg.getProtocolVersion() != 3 && msg.getProtocolVersion() != 4) {
            ConnAckMessage badProto = new ConnAckMessage();
            badProto.setReturnCode((byte)1);
            LOG.warn("CONNECT sent bad proto ConnAck");
            channel.writeAndFlush(badProto);
            channel.close();
            return;
        }
        if (msg.getClientID() == null || msg.getClientID().length() == 0) {
            // An empty client ID is tolerated only for clean sessions, and only when the broker allows it;
            // otherwise the identifier is rejected with return code 2.
            if (!msg.isCleanSession() || !this.allowZeroByteClientId) {
                ConnAckMessage okResp = new ConnAckMessage();
                okResp.setReturnCode((byte)2);
                channel.writeAndFlush(okResp);
                channel.close();
                LOG.warn("CONNECT sent rejected identifier ConnAck");
                return;
            }
            // Generate an identifier (UUID without dashes) and store it back into the message.
            String randomIdentifier = UUID.randomUUID().toString().replace("-", "");
            msg.setClientID(randomIdentifier);
            LOG.info("Client connected with server generated identifier: {}", (Object)randomIdentifier);
        }
        // login() has already written the failure CONNACK when it returns false.
        if (!this.login(channel, msg)) {
            channel.close();
            return;
        }
        String clientID = msg.getClientID();
        // Register the descriptor atomically; a non-null result means the client ID is already connected.
        ConnectionDescriptor existing = this.connectionDescriptors.putIfAbsent(clientID, descriptor = new ConnectionDescriptor(clientID, channel, msg.isCleanSession()));
        if (existing != null) {
            LOG.info("Found an existing connection with same client ID <{}>, forcing to close", (Object)msg.getClientID());
            existing.abort();
            // NOTE(review): the new channel gets no CONNACK and is not closed on this path; only the
            // previous connection is aborted. Confirm this is the intended behavior.
            return;
        }
        this.initializeKeepAliveTimeout(channel, msg);
        this.storeWillMessage(msg);
        // Each step below first moves the descriptor to its next state and reports failure if it cannot.
        if (!this.sendAck(descriptor, msg)) {
            channel.close();
            return;
        }
        this.m_interceptor.notifyClientConnected(msg);
        ClientSession clientSession = this.createOrLoadClientSession(descriptor, msg);
        if (clientSession == null) {
            channel.close();
            return;
        }
        if (!this.republish(descriptor, msg, clientSession)) {
            channel.close();
            return;
        }
        boolean success = descriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED, ConnectionDescriptor.ConnectionState.ESTABLISHED);
        if (!success) {
            channel.close();
        } else {
            LOG.info("Connection established");
        }
        // Logged whether or not the final state transition succeeded.
        LOG.info("CONNECT processed");
    }

    /**
     * Authenticates a CONNECT request.
     * <p>
     * Without a username the request passes only if anonymous access is allowed. With a
     * username, a missing password is tolerated only under anonymous access (the authenticator
     * then receives a {@code null} password); the credentials must validate, after which the
     * username is recorded on the channel.
     *
     * @param channel the connecting channel; receives the failure CONNACK when rejected
     * @param msg     the CONNECT message
     * @return {@code true} when the client may proceed, {@code false} after a failure CONNACK was sent
     */
    private boolean login(Channel channel, ConnectMessage msg) {
        if (!msg.isUserFlag()) {
            if (allowAnonymous) {
                return true;
            }
            failedCredentials(channel);
            return false;
        }

        final boolean passwordSupplied = msg.isPasswordFlag();
        if (!passwordSupplied && !allowAnonymous) {
            failedCredentials(channel);
            return false;
        }

        final byte[] secret = passwordSupplied ? msg.getPassword() : null;
        if (!m_authenticator.checkValid(msg.getClientID(), msg.getUsername(), secret)) {
            failedCredentials(channel);
            return false;
        }

        NettyUtils.userName(channel, msg.getUsername());
        return true;
    }

    /**
     * Moves the descriptor from DISCONNECTED to SENDACK and writes the accepting CONNACK
     * (return code 0).
     * <p>
     * If a session is already stored for the client, its clean-session flag is updated from the
     * message, and the CONNACK's session-present flag is set when the client did not ask for a
     * clean session.
     *
     * @param descriptor descriptor of the connecting client
     * @param msg        the CONNECT message
     * @return {@code false} when the state transition was refused, {@code true} otherwise
     */
    private boolean sendAck(ConnectionDescriptor descriptor, ConnectMessage msg) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.DISCONNECTED, ConnectionDescriptor.ConnectionState.SENDACK)) {
            return false;
        }

        final ConnAckMessage connAck = new ConnAckMessage();
        connAck.setReturnCode((byte)0);

        final ClientSession storedSession = m_sessionsStore.sessionForClient(msg.getClientID());
        if (storedSession != null) {
            if (!msg.isCleanSession()) {
                connAck.setSessionPresent(true);
            }
            storedSession.cleanSession(msg.isCleanSession());
        }

        descriptor.channel.writeAndFlush(connAck);
        return true;
    }

    /**
     * Records keep-alive, clean-session flag and client ID on the channel, then installs an
     * idle timeout of one and a half times the keep-alive interval.
     *
     * @param channel the connecting channel
     * @param msg     the CONNECT message supplying keep-alive, clean-session flag and client ID
     */
    private void initializeKeepAliveTimeout(Channel channel, ConnectMessage msg) {
        final int keepAliveSeconds = msg.getKeepAlive();
        LOG.debug("Connect with keepAlive {} s", keepAliveSeconds);

        NettyUtils.keepAlive(channel, keepAliveSeconds);
        NettyUtils.cleanSession(channel, msg.isCleanSession());
        NettyUtils.clientID(channel, msg.getClientID());
        LOG.debug("Connect create session <{}>", channel);

        final int idleTimeout = Math.round((float)keepAliveSeconds * 1.5f);
        setIdleTime(channel.pipeline(), idleTimeout);
    }

    /**
     * Stores the client's will message, keyed by client ID, when the CONNECT carries one.
     * The will payload is copied into a freshly allocated buffer.
     *
     * @param msg the CONNECT message
     */
    private void storeWillMessage(ConnectMessage msg) {
        if (!msg.isWillFlag()) {
            return;
        }

        final AbstractMessage.QOSType qosOfWill = AbstractMessage.QOSType.valueOf(msg.getWillQos());
        final byte[] rawPayload = msg.getWillMessage();

        // Copy into a private buffer and flip it so that it is ready for reading.
        final ByteBuffer payloadCopy = ByteBuffer.allocate(rawPayload.length);
        payloadCopy.put(rawPayload);
        payloadCopy.flip();

        m_willStore.put(msg.getClientID(), new WillMessage(msg.getWillTopic(), payloadCopy, msg.isWillRetain(), qosOfWill));
        LOG.info("Session for clientID <{}> with will to topic {}", msg.getClientID(), msg.getWillTopic());
    }

    /**
     * Moves the descriptor from SENDACK to SESSION_CREATED and returns the client's session,
     * creating it in the session store when none exists. For a clean-session CONNECT the
     * session is cleaned before being returned.
     *
     * @param descriptor descriptor of the connecting client
     * @param msg        the CONNECT message
     * @return the session, or {@code null} when the state transition was refused
     */
    private ClientSession createOrLoadClientSession(ConnectionDescriptor descriptor, ConnectMessage msg) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.SENDACK, ConnectionDescriptor.ConnectionState.SESSION_CREATED)) {
            return null;
        }

        ClientSession session = m_sessionsStore.sessionForClient(msg.getClientID());
        if (session == null) {
            LOG.debug("Create persistent session for clientID <{}>", msg.getClientID());
            session = m_sessionsStore.createNewSession(msg.getClientID(), msg.isCleanSession());
        }
        if (msg.isCleanSession()) {
            session.cleanSession();
        }

        LOG.debug("Created session for client ID <{}> with clean session {}", msg.getClientID(), msg.isCleanSession());
        return session;
    }

    /**
     * Moves the descriptor from SESSION_CREATED to MESSAGES_REPUBLISHED, republishes the
     * session's stored messages unless a clean session was requested, and installs the
     * auto-flush handler (500 ms interval) on the channel pipeline.
     *
     * @param descriptor    descriptor of the connecting client
     * @param msg           the CONNECT message
     * @param clientSession the client's session
     * @return {@code false} when the state transition was refused, {@code true} otherwise
     */
    private boolean republish(ConnectionDescriptor descriptor, ConnectMessage msg, ClientSession clientSession) {
        final boolean transitioned = descriptor.assignState(ConnectionDescriptor.ConnectionState.SESSION_CREATED, ConnectionDescriptor.ConnectionState.MESSAGES_REPUBLISHED);
        if (transitioned) {
            if (!msg.isCleanSession()) {
                republishStoredInSession(clientSession);
            }
            final int autoFlushMillis = 500;
            setupAutoFlusher(descriptor.channel.pipeline(), autoFlushMillis);
        }
        return transitioned;
    }

    /**
     * Answers a CONNECT with return code 4 (bad username or password) and logs the failure.
     * The channel is not closed here.
     *
     * @param session the channel of the client whose credentials were rejected
     */
    private void failedCredentials(Channel session) {
        final ConnAckMessage rejection = new ConnAckMessage();
        rejection.setReturnCode((byte)4);
        session.writeAndFlush(rejection);
        LOG.info("Client {} failed to connect with bad username or password.", session);
    }

    /**
     * Adds an {@code AutoFlushHandler} named "autoFlusher" to the pipeline, right after the
     * "idleEventHandler" entry when that exists and at the head of the pipeline otherwise.
     *
     * @param pipeline        the channel pipeline to extend
     * @param flushIntervalMs flush interval in milliseconds
     */
    private void setupAutoFlusher(ChannelPipeline pipeline, int flushIntervalMs) {
        try {
            final AutoFlushHandler flusher = new AutoFlushHandler(flushIntervalMs, TimeUnit.MILLISECONDS);
            pipeline.addAfter("idleEventHandler", "autoFlusher", flusher);
        }
        catch (NoSuchElementException idleHandlerMissing) {
            // No "idleEventHandler" in this pipeline: fall back to the head position.
            final ChannelHandler flusher = new AutoFlushHandler(flushIntervalMs, TimeUnit.MILLISECONDS);
            pipeline.addFirst("autoFlusher", flusher);
        }
    }

    /**
     * Replaces any handler named "idleStateHandler" with a new {@code IdleStateHandler} at the
     * head of the pipeline, constructed with {@code (0, 0, idleTime)}.
     *
     * @param pipeline the channel pipeline to modify
     * @param idleTime value passed as the third constructor argument of the handler
     */
    private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
        final String handlerName = "idleStateHandler";
        final boolean alreadyInstalled = pipeline.names().contains(handlerName);
        if (alreadyInstalled) {
            pipeline.remove(handlerName);
        }
        final ChannelHandler idleHandler = new IdleStateHandler(0, 0, idleTime);
        pipeline.addFirst(handlerName, idleHandler);
    }

    /**
     * Hands the messages queued in the session to the republisher; does nothing but log when
     * the queue is empty.
     *
     * @param clientSession the session whose queued messages are to be republished
     */
    private void republishStoredInSession(ClientSession clientSession) {
        LOG.trace("republishStoredInSession for client <{}>", clientSession);
        final BlockingQueue<IMessagesStore.StoredMessage> pending = clientSession.queue();
        if (!pending.isEmpty()) {
            LOG.info("republishing stored messages to client <{}>", clientSession.clientID);
            internalRepublisher.publishStored(clientSession, pending);
        } else {
            LOG.info("No stored messages for client <{}>", clientSession.clientID);
        }
    }

    /**
     * Handles a PUBACK: looks up the in-flight message for the acknowledged packet ID, marks
     * it acknowledged in the client's session and notifies the interceptor.
     *
     * @param channel the channel the PUBACK arrived on; supplies client ID and username
     * @param msg     the PUBACK message
     */
    public void processPubAck(Channel channel, PubAckMessage msg) {
        final String clientID = NettyUtils.clientID(channel);
        final int packetId = msg.getMessageID();
        final String username = NettyUtils.userName(channel);
        LOG.trace("retrieving inflight for messageID <{}>", packetId);

        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        // Fetch the message before acknowledging it; it is still needed for the notification.
        final IMessagesStore.StoredMessage acknowledged = session.getInflightMessage(packetId);
        session.inFlightAcknowledged(packetId);

        final InterceptAcknowledgedMessage event = new InterceptAcknowledgedMessage(acknowledged, acknowledged.getTopic(), username);
        m_interceptor.notifyMessageAcknowledged(event);
    }

    /**
     * Converts a PUBLISH message into a stored message, carrying over payload, QoS, topic,
     * retain flag and message ID.
     * <p>
     * The payload is taken with {@code ByteBuffer.array()}, i.e. the buffer's whole backing array.
     *
     * @param msg the PUBLISH message to convert
     * @return the equivalent stored message
     */
    public static IMessagesStore.StoredMessage asStoredMessage(PublishMessage msg) {
        final byte[] payloadBytes = msg.getPayload().array();
        final IMessagesStore.StoredMessage result = new IMessagesStore.StoredMessage(payloadBytes, msg.getQos(), msg.getTopicName());
        result.setRetained(msg.isRetainFlag());
        result.setMessageID(msg.getMessageID());
        return result;
    }

    /**
     * Converts a will message into a stored message, carrying over payload, QoS, topic and
     * retain flag. No message ID is assigned here.
     *
     * @param will the will message to convert
     * @return the equivalent stored message
     */
    private static IMessagesStore.StoredMessage asStoredMessage(WillMessage will) {
        final byte[] payloadBytes = will.getPayload().array();
        final IMessagesStore.StoredMessage result = new IMessagesStore.StoredMessage(payloadBytes, will.getQos(), will.getTopic());
        result.setRetained(will.isRetained());
        return result;
    }

    /**
     * Dispatches a PUBLISH message to the handler for its QoS level.
     *
     * @param channel the channel the PUBLISH arrived on
     * @param msg     the PUBLISH message
     */
    public void processPublish(Channel channel, PublishMessage msg) {
        LOG.info("PUB --PUBLISH--> SRV executePublish invoked with {}", msg);
        switch (msg.getQos()) {
            case MOST_ONE:
                qos0PublishHandler.receivedPublishQos0(channel, msg);
                break;
            case LEAST_ONE:
                qos1PublishHandler.receivedPublishQos1(channel, msg);
                break;
            case EXACTLY_ONCE:
                qos2PublishHandler.receivedPublishQos2(channel, msg);
                break;
            default:
                // Any other QoS value is ignored.
                break;
        }
    }

    /**
     * Publishes a message that did not arrive over a client channel.
     * <p>
     * The message is attributed to the message's client ID, or to "BROKER_SELF" when that is
     * null or empty, and is always given message ID 1. QoS 2 messages are stored before
     * delivery. After delivery to the matching subscribers the retain flag is honored: a QoS 0
     * or empty-payload message clears the retained entry for the topic, any other message
     * replaces it.
     *
     * @param msg the PUBLISH message to deliver
     */
    public void internalPublish(PublishMessage msg) {
        final AbstractMessage.QOSType qos = msg.getQos();
        final String topic = msg.getTopicName();
        LOG.info("embedded PUBLISH on topic <{}> with QoS {}", topic, qos);

        final IMessagesStore.StoredMessage outgoing = ProtocolProcessor.asStoredMessage(msg);
        final boolean anonymousPublisher = msg.getClientId() == null || msg.getClientId().isEmpty();
        if (anonymousPublisher) {
            outgoing.setClientID("BROKER_SELF");
        } else {
            outgoing.setClientID(msg.getClientId());
        }
        outgoing.setMessageID(1);

        MessageGUID storedGuid = null;
        if (qos == AbstractMessage.QOSType.EXACTLY_ONCE) {
            storedGuid = m_messagesStore.storePublishForFuture(outgoing);
        }

        final List<Subscription> matching = this.subscriptions.matches(topic);
        messagesPublisher.publish2Subscribers(outgoing, matching);

        if (msg.isRetainFlag()) {
            final boolean clearsRetained = qos == AbstractMessage.QOSType.MOST_ONE || !msg.getPayload().hasRemaining();
            if (clearsRetained) {
                m_messagesStore.cleanRetained(topic);
            } else {
                // Reuse the GUID obtained for QoS 2; store the message now for the other case.
                if (storedGuid == null) {
                    storedGuid = m_messagesStore.storePublishForFuture(outgoing);
                }
                m_messagesStore.storeRetained(topic, storedGuid);
            }
        }
    }

    /**
     * Publishes a client's will message to the subscriptions matching its topic. A packet ID is
     * requested from the session store only for wills above QoS 0; otherwise the message ID
     * stays {@code null}.
     *
     * @param will     the will message to publish
     * @param clientID the client the will belongs to
     */
    private void forwardPublishWill(WillMessage will, String clientID) {
        Integer packetId = null;
        final boolean needsPacketId = will.getQos() != AbstractMessage.QOSType.MOST_ONE;
        if (needsPacketId) {
            packetId = m_sessionsStore.nextPacketID(clientID);
        }

        final IMessagesStore.StoredMessage willAsStored = ProtocolProcessor.asStoredMessage(will);
        willAsStored.setClientID(clientID);
        willAsStored.setMessageID(packetId);

        final List<Subscription> matching = this.subscriptions.matches(willAsStored.getTopic());
        messagesPublisher.publish2Subscribers(willAsStored, matching);
    }

    /**
     * Caps a QoS at the level requested by a subscription.
     *
     * @param sub the subscription whose requested QoS is the upper bound
     * @param qos the QoS of the message
     * @return {@code qos}, or the subscription's requested QoS when {@code qos} is higher
     */
    static AbstractMessage.QOSType lowerQosToTheSubscriptionDesired(Subscription sub, AbstractMessage.QOSType qos) {
        final boolean exceedsRequested = qos.byteValue() > sub.getRequestedQos().byteValue();
        return exceedsRequested ? sub.getRequestedQos() : qos;
    }

    /**
     * Handles a PUBREL by delegating to the QoS 2 publish handler.
     *
     * @param channel the channel the PUBREL arrived on
     * @param msg     the PUBREL message
     */
    public void processPubRel(Channel channel, PubRelMessage msg) {
        qos2PublishHandler.processPubRel(channel, msg);
    }

    /**
     * Handles a PUBREC: moves the in-flight message to the second-phase acknowledgement wait
     * in the client's session and replies with a PUBREL for the same message ID.
     *
     * @param channel the channel the PUBREC arrived on
     * @param msg     the PUBREC message
     */
    public void processPubRec(Channel channel, PubRecMessage msg) {
        final String clientID = NettyUtils.clientID(channel);
        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        final int packetId = msg.getMessageID();
        session.moveInFlightToSecondPhaseAckWaiting(packetId);
        LOG.debug("\t\tSRV <--PUBREC-- SUB processPubRec invoked for clientID {} ad messageID {}", clientID, packetId);

        final PubRelMessage release = new PubRelMessage();
        release.setMessageID(packetId);
        release.setQos(AbstractMessage.QOSType.LEAST_ONE);
        channel.writeAndFlush(release);
    }

    /**
     * Handles a PUBCOMP: records the second-phase acknowledgement in the client's session and
     * notifies the interceptor about the acknowledged message.
     *
     * @param channel the channel the PUBCOMP arrived on; supplies client ID and username
     * @param msg     the PUBCOMP message
     */
    public void processPubComp(Channel channel, PubCompMessage msg) {
        final String clientID = NettyUtils.clientID(channel);
        final int packetId = msg.getMessageID();
        LOG.debug("\t\tSRV <--PUBCOMP-- SUB processPubComp invoked for clientID {} ad messageID {}", clientID, packetId);

        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        final IMessagesStore.StoredMessage completed = session.secondPhaseAcknowledged(packetId);
        final String username = NettyUtils.userName(channel);

        final InterceptAcknowledgedMessage event = new InterceptAcknowledgedMessage(completed, completed.getTopic(), username);
        m_interceptor.notifyMessageAcknowledged(event);
    }

    /**
     * Handles a DISCONNECT: flushes the channel, then walks the client's descriptor through the
     * disconnect states (subscriptions removed, messages dropped, interceptors notified,
     * disconnected) and finally unregisters it.
     * <p>
     * The channel is closed directly when no descriptor is registered for the client, when the
     * registered descriptor belongs to another channel, or when one of the first three steps
     * is refused.
     *
     * @param channel the channel the DISCONNECT arrived on
     * @throws InterruptedException declared for callers; nothing in this body visibly throws it
     */
    public void processDisconnect(Channel channel) throws InterruptedException {
        channel.flush();
        final String clientID = NettyUtils.clientID(channel);
        final ConnectionDescriptor registered = (ConnectionDescriptor)this.connectionDescriptors.get(clientID);

        if (registered == null || registered.channel != channel) {
            channel.close();
            return;
        }

        // Short-circuit evaluation keeps the steps in order and stops at the first refusal.
        final boolean sessionCleaned = removeSubscriptions(registered, clientID)
                && dropStoredMessages(registered, clientID)
                && cleanWillMessageAndNotifyInterceptor(registered, clientID);
        if (!sessionCleaned) {
            channel.close();
            return;
        }

        if (!closeChannel(registered)) {
            return;
        }

        if (this.connectionDescriptors.remove(clientID, registered)) {
            LOG.info("DISCONNECT client <{}> finished", clientID);
        }
    }

    /**
     * Moves the descriptor from ESTABLISHED to SUBSCRIPTIONS_REMOVED and, for clean-session
     * connections, wipes the client's subscriptions from the session store.
     *
     * @param descriptor descriptor of the disconnecting client
     * @param clientID   the client's identifier
     * @return {@code false} when the state transition was refused, {@code true} otherwise
     */
    private boolean removeSubscriptions(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.ESTABLISHED, ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED)) {
            return false;
        }
        if (descriptor.cleanSession) {
            LOG.info("cleaning old saved subscriptions for client <{}>", clientID);
            m_sessionsStore.wipeSubscriptions(clientID);
            LOG.debug("Wiped subscriptions for client <{}>", clientID);
        }
        return true;
    }

    /**
     * Moves the descriptor from SUBSCRIPTIONS_REMOVED to MESSAGES_DROPPED and, for
     * clean-session connections, drops the client's queue from the session store.
     *
     * @param descriptor descriptor of the disconnecting client
     * @param clientID   the client's identifier
     * @return {@code false} when the state transition was refused, {@code true} otherwise
     */
    private boolean dropStoredMessages(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.SUBSCRIPTIONS_REMOVED, ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED)) {
            return false;
        }
        if (descriptor.cleanSession) {
            LOG.debug("Removing messages in session's queue for client <{}>", clientID);
            m_sessionsStore.dropQueue(clientID);
            LOG.debug("Removed messages in session for client's queue <{}>", clientID);
        }
        return true;
    }

    /**
     * Moves the descriptor from MESSAGES_DROPPED to INTERCEPTORS_NOTIFIED, discards the
     * client's stored will message and tells the interceptor that the client disconnected.
     *
     * @param descriptor descriptor of the disconnecting client
     * @param clientID   the client's identifier
     * @return {@code false} when the state transition was refused, {@code true} otherwise
     */
    private boolean cleanWillMessageAndNotifyInterceptor(ConnectionDescriptor descriptor, String clientID) {
        if (!descriptor.assignState(ConnectionDescriptor.ConnectionState.MESSAGES_DROPPED, ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED)) {
            return false;
        }
        m_willStore.remove(clientID);
        m_interceptor.notifyClientDisconnected(clientID, NettyUtils.userName(descriptor.channel));
        return true;
    }

    /**
     * Moves the descriptor from INTERCEPTORS_NOTIFIED to DISCONNECTED and closes its channel.
     *
     * @param descriptor descriptor of the disconnecting client
     * @return {@code false} when the state transition was refused (channel left untouched),
     *         {@code true} otherwise
     */
    private boolean closeChannel(ConnectionDescriptor descriptor) {
        final boolean transitioned = descriptor.assignState(ConnectionDescriptor.ConnectionState.INTERCEPTORS_NOTIFIED, ConnectionDescriptor.ConnectionState.DISCONNECTED);
        if (transitioned) {
            descriptor.channel.close();
        }
        return transitioned;
    }

    /**
     * Handles the loss of a client connection: unregisters the matching descriptor, publishes
     * the client's will message if one is stored, and notifies the interceptor.
     * <p>
     * The will is claimed with a single atomic {@code remove}. The disconnect path removes
     * entries from the same map, so a separate containsKey/get/remove sequence could observe
     * the entry vanish in between (passing {@code null} on to the will publication) or let two
     * callers publish the same will.
     *
     * @param clientID the identifier of the client whose connection was lost
     * @param channel  the channel that was lost; supplies the username for the notification
     */
    public void processConnectionLost(String clientID, Channel channel) {
        // A probe descriptor is used for the conditional remove; which descriptor it matches
        // depends on ConnectionDescriptor's equals (not visible here).
        ConnectionDescriptor oldConnDescr = new ConnectionDescriptor(clientID, channel, true);
        this.connectionDescriptors.remove(clientID, oldConnDescr);
        // Atomically take ownership of the will: at most one caller gets a non-null value.
        WillMessage will = this.m_willStore.remove(clientID);
        if (will != null) {
            this.forwardPublishWill(will, clientID);
        }
        String username = NettyUtils.userName(channel);
        this.m_interceptor.notifyClientConnectionLost(clientID, username);
    }

    /**
     * Handles an UNSUBSCRIBE: removes each listed topic filter from the subscription store and
     * the client's session, notifies the interceptor per filter, and replies with an UNSUBACK.
     * <p>
     * Filters are processed in order. On the first invalid filter the channel is closed and
     * processing stops without an UNSUBACK; filters handled before it stay removed.
     *
     * @param channel the channel the UNSUBSCRIBE arrived on
     * @param msg     the UNSUBSCRIBE message
     */
    public void processUnsubscribe(Channel channel, UnsubscribeMessage msg) {
        final List<String> filters = msg.topicFilters();
        final String clientID = NettyUtils.clientID(channel);
        LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", filters, clientID);

        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        for (String filter : filters) {
            if (!SubscriptionsStore.validate(filter)) {
                channel.close();
                LOG.warn("UNSUBSCRIBE found an invalid topic filter <{}> for clientID <{}>", filter, clientID);
                return;
            }
            this.subscriptions.removeSubscription(filter, clientID);
            session.unsubscribeFrom(filter);
            m_interceptor.notifyTopicUnsubscribed(filter, clientID, NettyUtils.userName(channel));
        }

        final int packetId = msg.getMessageID();
        final UnsubAckMessage unsubAck = new UnsubAckMessage();
        unsubAck.setMessageID(packetId);
        LOG.info("replying with UnsubAck to MSG ID {}", packetId);
        channel.writeAndFlush(unsubAck);
    }

    /**
     * Handles a SUBSCRIBE: verifies every requested topic filter, stores the accepted
     * subscriptions, replies with a SUBACK and then publishes the retained messages matching
     * each new subscription.
     * <p>
     * Progress is tracked in {@code subscriptionInCourse} under the key (client ID, packet ID):
     * the entry is created as VERIFIED, moved to STORED once verification is done, and removed
     * at the end. If an entry already exists, or the VERIFIED to STORED move fails, the request
     * is dropped without a reply.
     *
     * @param channel the channel the SUBSCRIBE arrived on; supplies client ID and username
     * @param msg     the SUBSCRIBE message
     */
    public void processSubscribe(Channel channel, SubscribeMessage msg) {
        String clientID = NettyUtils.clientID(channel);
        LOG.info("SUBSCRIBE client <{}>", (Object)clientID);
        int messageID = msg.getMessageID();
        LOG.debug("SUBSCRIBE client <{}> on server {} packetID {}", new Object[]{clientID, this.m_server_port, messageID});
        RunningSubscription executionKey = new RunningSubscription(clientID, messageID);
        // putIfAbsent doubles as the duplicate check: non-null means the same request is already in progress.
        SubscriptionState currentStatus = this.subscriptionInCourse.putIfAbsent(executionKey, SubscriptionState.VERIFIED);
        if (currentStatus != null) {
            LOG.debug("The client <{}> sent another SUBSCRIBE while this one was processing", (Object)clientID);
            return;
        }
        String username = NettyUtils.userName(channel);
        List<SubscribeMessage.Couple> ackTopics = this.doVerify(clientID, username, msg);
        SubAckMessage ackMessage = this.doAckMessageFromValidateFilters(ackTopics);
        if (!this.subscriptionInCourse.replace(executionKey, SubscriptionState.VERIFIED, SubscriptionState.STORED)) {
            // The client ID is now passed so the "{}" placeholder is actually filled in.
            LOG.debug("The client {} sent another SUBSCRIBE while this one was verifing topicFilters", (Object)clientID);
            return;
        }
        ackMessage.setMessageID(messageID);
        List<Subscription> newSubscriptions = this.doStoreSubscription(ackTopics, clientID);
        LOG.debug("SUBACK for packetID {}", (Object)messageID);
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscription tree {}", (Object)this.subscriptions.dumpTree());
        }
        for (Subscription subscription : newSubscriptions) {
            LOG.debug("Persisting subscription {}", (Object)subscription);
            this.subscriptions.add(subscription.asClientTopicCouple());
        }
        // The SUBACK is written before any retained message is published to the subscriber.
        channel.writeAndFlush(ackMessage);
        for (Subscription subscription : newSubscriptions) {
            this.publishRetainedMessagesInSession(subscription, username);
        }
        boolean success = this.subscriptionInCourse.remove(executionKey, (Object)SubscriptionState.STORED);
        if (!success) {
            LOG.warn("Failed to remove the descriptor, something bad happened");
        }
    }

    /**
     * Registers every successfully verified topic filter as a subscription in the client's
     * session. Entries whose QoS equals the FAILURE marker are skipped.
     *
     * @param ackTopics verified (QoS, topic filter) couples
     * @param clientID  the subscribing client's identifier
     * @return the subscriptions that were added to the session, in request order
     */
    private List<Subscription> doStoreSubscription(List<SubscribeMessage.Couple> ackTopics, String clientID) {
        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        final List<Subscription> accepted = new ArrayList<Subscription>();
        for (SubscribeMessage.Couple verified : ackTopics) {
            if (verified.qos != AbstractMessage.QOSType.FAILURE.byteValue()) {
                final Subscription subscription = new Subscription(clientID, verified.topicFilter, AbstractMessage.QOSType.valueOf(verified.qos));
                session.subscribe(subscription);
                accepted.add(subscription);
            }
        }
        return accepted;
    }

    /**
     * Checks every requested topic filter of a SUBSCRIBE. A filter the client may not read, or
     * one that fails validation, is answered with the FAILURE QoS; otherwise the requested QoS
     * is kept.
     *
     * @param clientID the subscribing client's identifier
     * @param username the user name associated with the channel
     * @param msg      the SUBSCRIBE message
     * @return one (QoS, topic filter) couple per requested subscription, in request order
     */
    private List<SubscribeMessage.Couple> doVerify(String clientID, String username, SubscribeMessage msg) {
        final ClientSession session = m_sessionsStore.sessionForClient(clientID);
        final List<SubscribeMessage.Couple> verdicts = new ArrayList<SubscribeMessage.Couple>();
        for (SubscribeMessage.Couple requested : msg.subscriptions()) {
            final SubscribeMessage.Couple verdict;
            if (!m_authorizator.canRead(requested.topicFilter, username, session.clientID)) {
                LOG.debug("topic {} doesn't have read credentials", requested.topicFilter);
                verdict = new SubscribeMessage.Couple(AbstractMessage.QOSType.FAILURE.byteValue(), requested.topicFilter);
            } else {
                AbstractMessage.QOSType granted;
                if (SubscriptionsStore.validate(requested.topicFilter)) {
                    granted = AbstractMessage.QOSType.valueOf(requested.qos);
                } else {
                    granted = AbstractMessage.QOSType.FAILURE;
                }
                verdict = new SubscribeMessage.Couple(granted.byteValue(), requested.topicFilter);
            }
            verdicts.add(verdict);
        }
        return verdicts;
    }

    /**
     * Builds a SUBACK carrying one QoS entry per verified topic filter, in order. The message
     * ID is left for the caller to set.
     *
     * @param topicFilters verified (QoS, topic filter) couples
     * @return the populated SUBACK message
     */
    private SubAckMessage doAckMessageFromValidateFilters(List<SubscribeMessage.Couple> topicFilters) {
        final SubAckMessage subAck = new SubAckMessage();
        for (SubscribeMessage.Couple verified : topicFilters) {
            final AbstractMessage.QOSType granted = AbstractMessage.QOSType.valueOf(verified.qos);
            subAck.addType(granted);
        }
        return subAck;
    }

    /**
     * Sends the stored messages whose key matches the new subscription's topic filter to the
     * subscribing client's session, then notifies the interceptor of the subscription.
     *
     * @param newSubscription the subscription that was just added
     * @param username        the user name associated with the subscribing channel
     */
    private void publishRetainedMessagesInSession(final Subscription newSubscription, String username) {
        LOG.debug("Publish persisted messages in session {}", newSubscription);

        final IMatchingCondition matchesNewFilter = new IMatchingCondition(){

            @Override
            public boolean match(String key) {
                return SubscriptionsStore.matchTopics(key, newSubscription.getTopicFilter());
            }
        };
        final Collection<IMessagesStore.StoredMessage> retained = m_messagesStore.searchMatching(matchesNewFilter);
        LOG.debug("Found {} messages to republish", retained.size());

        final ClientSession subscriberSession = m_sessionsStore.sessionForClient(newSubscription.getClientId());
        internalRepublisher.publishRetained(subscriberSession, retained);
        m_interceptor.notifyTopicSubscribed(newSubscription, username);
    }

    /**
     * Drains the client's session queue onto the channel for as long as the channel reports
     * itself writable and the queue still yields messages, then flushes the channel once.
     * <p>
     * The retain flag of each outgoing PUBLISH is set when the message store returns a message
     * for the queued message's GUID.
     *
     * @param channel the channel that became writable
     */
    public void notifyChannelWritable(Channel channel) {
        final String clientID = NettyUtils.clientID(channel);
        final ClientSession session = m_sessionsStore.sessionForClient(clientID);

        boolean queueHasMore = true;
        while (channel.isWritable() && queueHasMore) {
            final IMessagesStore.StoredMessage queued = (IMessagesStore.StoredMessage)session.queue().poll();
            if (queued == null) {
                queueHasMore = false;
            } else {
                final boolean retained = m_messagesStore.getMessageByGuid(queued.getGuid()) != null;
                final PublishMessage outgoing = InternalRepublisher.createPublishForQos(queued.getTopic(), queued.getQos(), queued.getMessage(), retained);
                channel.write(outgoing);
            }
        }
        channel.flush();
    }

    /**
     * Registers an intercept handler with the broker interceptor.
     *
     * @param interceptHandler the handler to add
     * @return whatever the interceptor's {@code addInterceptHandler} reports
     */
    public boolean addInterceptHandler(InterceptHandler interceptHandler) {
        final boolean added = m_interceptor.addInterceptHandler(interceptHandler);
        return added;
    }

    /**
     * Unregisters an intercept handler from the broker interceptor.
     *
     * @param interceptHandler the handler to remove
     * @return whatever the interceptor's {@code removeInterceptHandler} reports
     */
    public boolean removeInterceptHandler(InterceptHandler interceptHandler) {
        final boolean removed = m_interceptor.removeInterceptHandler(interceptHandler);
        return removed;
    }

    private class RunningSubscription {
        final String clientID;
        final long packetId;

        RunningSubscription(String clientID, long packeId) {
            this.clientID = clientID;
            this.packetId = packeId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            RunningSubscription that = (RunningSubscription)o;
            if (this.packetId != that.packetId) {
                return false;
            }
            return this.clientID != null ? this.clientID.equals(that.clientID) : that.clientID == null;
        }

        public int hashCode() {
            int result = this.clientID != null ? this.clientID.hashCode() : 0;
            result = 31 * result + (int)(this.packetId ^ this.packetId >>> 32);
            return result;
        }
    }

    private static enum SubscriptionState {
        STORED,
        VERIFIED;

    }

    /**
     * Holder for a client's will: topic, payload, retain flag and QoS.
     * <p>
     * The fields are final, but the payload buffer is kept and returned by reference, not copied.
     */
    static final class WillMessage {
        private final String topic;
        private final ByteBuffer payload;
        private final boolean retained;
        private final AbstractMessage.QOSType qos;

        /**
         * @param topic    topic the will is published to
         * @param payload  will payload; stored as given
         * @param retained whether the will is published as retained
         * @param qos      QoS the will is published with
         */
        public WillMessage(String topic, ByteBuffer payload, boolean retained, AbstractMessage.QOSType qos) {
            this.topic = topic;
            this.payload = payload;
            this.retained = retained;
            this.qos = qos;
        }

        /** @return the will topic */
        public String getTopic() {
            return topic;
        }

        /** @return the payload buffer passed to the constructor (same instance) */
        public ByteBuffer getPayload() {
            return payload;
        }

        /** @return whether the will is published as retained */
        public boolean isRetained() {
            return retained;
        }

        /** @return the will QoS */
        public AbstractMessage.QOSType getQos() {
            return qos;
        }
    }
}

