package io.moquette.broker;

import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.SessionRegistry;
import io.moquette.broker.Utils;
import io.moquette.broker.scheduler.Expirable;
import io.moquette.broker.scheduler.ScheduledExpirationService;
import io.moquette.broker.subscriptions.ISubscriptionsDirectory;
import io.moquette.broker.subscriptions.ShareName;
import io.moquette.broker.subscriptions.Subscription;
import io.moquette.broker.subscriptions.SubscriptionIdentifier;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.interception.BrokerInterceptor;
import io.moquette.persistence.H2SessionsRepository;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/moquette/broker/PostOffice.class */
public class PostOffice {
    private static final String WILL_PUBLISHER = "will_publisher";
    private static final String INTERNAL_PUBLISHER = "internal_publisher";
    private static final Logger LOG = LoggerFactory.getLogger(PostOffice.class);
    private static final Set<String> NO_FILTER = new HashSet();
    private final Authorizator authorizator;
    private final ISubscriptionsDirectory subscriptions;
    private final IRetainedRepository retainedRepository;
    private final ISessionsRepository sessionRepository;
    private SessionRegistry sessionRegistry;
    private BrokerInterceptor interceptor;
    private final FailedPublishCollection failedPublishes;
    private final SessionEventLoopGroup sessionLoops;
    private final Clock clock;
    private final ScheduledExpirationService<ISessionsRepository.Will> willExpirationService;
    private final ScheduledExpirationService<ExpirableTopic> retainedMessagesExpirationService;
    private final MqttQoS maxServerGrantedQos;

    /* renamed from: io.moquette.broker.PostOffice$1, reason: invalid class name */
    /* loaded from: input_file:io/moquette/broker/PostOffice$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttSubscriptionOption$RetainedHandlingPolicy = new int[MqttSubscriptionOption.RetainedHandlingPolicy.values().length];

        static {
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttSubscriptionOption$RetainedHandlingPolicy[MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttSubscriptionOption$RetainedHandlingPolicy[MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$BatchingPublishesCollector.class */
    public class BatchingPublishesCollector {
        final List<Subscription>[] subscriptions;
        private final int eventLoops;
        private final SessionEventLoopGroup loopGroup;

        BatchingPublishesCollector(SessionEventLoopGroup sessionEventLoopGroup) {
            this.eventLoops = sessionEventLoopGroup.getEventLoopCount();
            this.loopGroup = sessionEventLoopGroup;
            this.subscriptions = new List[this.eventLoops];
        }

        public void add(Subscription subscription) {
            int subscriberEventLoop = subscriberEventLoop(subscription.getClientId());
            if (this.subscriptions[subscriberEventLoop] == null) {
                this.subscriptions[subscriberEventLoop] = new ArrayList();
            }
            this.subscriptions[subscriberEventLoop].add(subscription);
        }

        private int subscriberEventLoop(String str) {
            return this.loopGroup.targetQueueOrdinal(str);
        }

        List<RouteResult> routeBatchedPublishes(Consumer<List<Subscription>> consumer) {
            ArrayList arrayList = new ArrayList(this.eventLoops);
            for (List<Subscription> list : this.subscriptions) {
                if (list != null) {
                    String clientId = list.get(0).getClientId();
                    if (PostOffice.LOG.isTraceEnabled()) {
                        PostOffice.LOG.trace("Routing PUBLISH to eventLoop {}  for subscriptions [{}]", Integer.valueOf(subscriberEventLoop(clientId)), (String) list.stream().map((v0) -> {
                            return v0.toString();
                        }).collect(Collectors.joining(",\n")));
                    }
                    arrayList.add(PostOffice.this.routeCommand(clientId, "batched PUB", () -> {
                        consumer.accept(list);
                        return null;
                    }));
                }
            }
            return arrayList;
        }

        Collection<String> subscriberIdsByEventLoop(String str) {
            return (Collection) this.subscriptions[subscriberEventLoop(str)].stream().map((v0) -> {
                return v0.getClientId();
            }).collect(Collectors.toList());
        }

        public int countBatches() {
            int i = 0;
            for (List<Subscription> list : this.subscriptions) {
                if (list != null) {
                    i++;
                }
            }
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$ExpirableTopic.class */
    public static class ExpirableTopic implements Expirable {
        private final Topic topic;
        private Instant expireAt;

        public ExpirableTopic(Topic topic, Instant instant) {
            this.topic = topic;
            this.expireAt = instant;
        }

        @Override // io.moquette.broker.scheduler.Expirable
        public Optional<Instant> expireAt() {
            return Optional.of(this.expireAt);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$FailedPublishCollection.class */
    public static class FailedPublishCollection {
        private final ConcurrentMap<PacketId, Set<String>> packetsMap;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/moquette/broker/PostOffice$FailedPublishCollection$PacketId.class */
        public static class PacketId {
            private final String clientId;
            private final int idPacket;

            PacketId(String str, int i) {
                this.clientId = str;
                this.idPacket = i;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                PacketId packetId = (PacketId) obj;
                return this.idPacket == packetId.idPacket && Objects.equals(this.clientId, packetId.clientId);
            }

            public int hashCode() {
                return Objects.hash(this.clientId, Integer.valueOf(this.idPacket));
            }

            public boolean belongToClient(String str) {
                return this.clientId.equals(str);
            }
        }

        private FailedPublishCollection() {
            this.packetsMap = new ConcurrentHashMap();
        }

        private void insert(String str, int i, String str2) {
            this.packetsMap.computeIfAbsent(new PacketId(str, i), packetId -> {
                return new HashSet();
            }).add(str2);
        }

        public void remove(String str, int i, String str2) {
            this.packetsMap.computeIfPresent(new PacketId(str, i), (packetId, set) -> {
                set.remove(str2);
                if (set.isEmpty()) {
                    return null;
                }
                return set;
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeAll(int i, String str, Collection<String> collection) {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                remove(str, i, it.next());
            }
        }

        void cleanupForClient(String str) {
            Stream<PacketId> filter = this.packetsMap.keySet().stream().filter(packetId -> {
                return packetId.belongToClient(str);
            });
            ConcurrentMap<PacketId, Set<String>> concurrentMap = this.packetsMap;
            Objects.requireNonNull(concurrentMap);
            filter.forEach((v1) -> {
                r1.remove(v1);
            });
        }

        void insertAll(int i, String str, Collection<String> collection) {
            Iterator<String> it = collection.iterator();
            while (it.hasNext()) {
                insert(str, i, it.next());
            }
        }

        Set<String> listFailed(String str, int i) {
            return this.packetsMap.getOrDefault(new PacketId(str, i), Collections.emptySet());
        }

        /* synthetic */ FailedPublishCollection(AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$RouteResult.class */
    public static class RouteResult {
        private final String clientId;
        private final Status status;
        private CompletableFuture queuedFuture;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/moquette/broker/PostOffice$RouteResult$Status.class */
        public enum Status {
            SUCCESS,
            FAIL
        }

        public static RouteResult success(String str, CompletableFuture completableFuture) {
            return new RouteResult(str, Status.SUCCESS, completableFuture);
        }

        public static RouteResult failed(String str) {
            return failed(str, null);
        }

        public static RouteResult failed(String str, String str2) {
            CompletableFuture completableFuture = new CompletableFuture();
            completableFuture.completeExceptionally(new Error(str2));
            return new RouteResult(str, Status.FAIL, completableFuture);
        }

        private RouteResult(String str, Status status, CompletableFuture completableFuture) {
            this.clientId = str;
            this.status = status;
            this.queuedFuture = completableFuture;
        }

        public CompletableFuture completableFuture() {
            if (this.status == Status.FAIL) {
                throw new IllegalArgumentException("Accessing completable future on a failed result");
            }
            return this.queuedFuture;
        }

        public boolean isSuccess() {
            return this.status == Status.SUCCESS;
        }

        public RouteResult ifFailed(Runnable runnable) {
            if (!isSuccess()) {
                runnable.run();
            }
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/moquette/broker/PostOffice$SharedSubscriptionData.class */
    public static final class SharedSubscriptionData {
        final ShareName name;
        final Topic topicFilter;
        final MqttSubscriptionOption option;

        private SharedSubscriptionData(ShareName shareName, Topic topic, MqttSubscriptionOption mqttSubscriptionOption) {
            Objects.requireNonNull(shareName);
            Objects.requireNonNull(topic);
            Objects.requireNonNull(mqttSubscriptionOption);
            this.name = shareName;
            this.topicFilter = topic;
            this.option = mqttSubscriptionOption;
        }

        static SharedSubscriptionData fromMqttSubscription(MqttTopicSubscription mqttTopicSubscription) {
            return new SharedSubscriptionData(new ShareName(SharedSubscriptionUtils.extractShareName(mqttTopicSubscription.topicName())), Topic.asTopic(SharedSubscriptionUtils.extractFilterFromShared(mqttTopicSubscription.topicName())), mqttTopicSubscription.option());
        }
    }

    PostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IRetainedRepository iRetainedRepository, SessionRegistry sessionRegistry, ISessionsRepository iSessionsRepository, BrokerInterceptor brokerInterceptor, Authorizator authorizator, SessionEventLoopGroup sessionEventLoopGroup) {
        this(iSubscriptionsDirectory, iRetainedRepository, sessionRegistry, iSessionsRepository, brokerInterceptor, authorizator, sessionEventLoopGroup, Clock.systemDefaultZone());
    }

    PostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IRetainedRepository iRetainedRepository, SessionRegistry sessionRegistry, ISessionsRepository iSessionsRepository, BrokerInterceptor brokerInterceptor, Authorizator authorizator, SessionEventLoopGroup sessionEventLoopGroup, Clock clock) {
        this(iSubscriptionsDirectory, iRetainedRepository, sessionRegistry, iSessionsRepository, brokerInterceptor, authorizator, sessionEventLoopGroup, clock, MqttQoS.EXACTLY_ONCE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PostOffice(ISubscriptionsDirectory iSubscriptionsDirectory, IRetainedRepository iRetainedRepository, SessionRegistry sessionRegistry, ISessionsRepository iSessionsRepository, BrokerInterceptor brokerInterceptor, Authorizator authorizator, SessionEventLoopGroup sessionEventLoopGroup, Clock clock, MqttQoS mqttQoS) {
        this.failedPublishes = new FailedPublishCollection(null);
        this.authorizator = authorizator;
        this.subscriptions = iSubscriptionsDirectory;
        this.retainedRepository = iRetainedRepository;
        this.sessionRepository = iSessionsRepository;
        this.sessionRegistry = sessionRegistry;
        this.interceptor = brokerInterceptor;
        this.sessionLoops = sessionEventLoopGroup;
        this.clock = clock;
        this.maxServerGrantedQos = mqttQoS;
        this.willExpirationService = new ScheduledExpirationService<>(clock, this::publishWill);
        recreateWillExpires(iSessionsRepository);
        this.retainedMessagesExpirationService = new ScheduledExpirationService<>(clock, this::cleanRetainedExpired);
        recreateRetainedExpires(iRetainedRepository);
    }

    private void cleanRetainedExpired(ExpirableTopic expirableTopic) {
        this.retainedRepository.cleanRetained(expirableTopic.topic);
    }

    private void recreateRetainedExpires(IRetainedRepository iRetainedRepository) {
        iRetainedRepository.listExpirable().forEach(this::trackRetainedForExpiry);
    }

    private void trackRetainedForExpiry(RetainedMessage retainedMessage) {
        this.retainedMessagesExpirationService.track(retainedMessage.getTopic().toString(), new ExpirableTopic(retainedMessage.getTopic(), retainedMessage.getExpiryTime()));
    }

    private void recreateWillExpires(ISessionsRepository iSessionsRepository) {
        ScheduledExpirationService<ISessionsRepository.Will> scheduledExpirationService = this.willExpirationService;
        Objects.requireNonNull(scheduledExpirationService);
        iSessionsRepository.listSessionsWill((v1, v2) -> {
            r1.track(v1, v2);
        });
    }

    public void fireWill(Session session) {
        ISessionsRepository.Will will = session.getWill();
        String clientID = session.getClientID();
        if (will.delayInterval == 0) {
            publishWill(will);
        } else {
            trackWillSpecificationForFutureFire(session, will, clientID, Math.min(session.getSessionData().expiryInterval(), will.delayInterval));
            LOG.debug("Scheduled will message for client {} on topic {}", clientID, will.topic);
        }
    }

    private void trackWillSpecificationForFutureFire(Session session, ISessionsRepository.Will will, String str, int i) {
        ISessionsRepository.Will withExpirationComputed = will.withExpirationComputed(i, this.clock);
        this.sessionRepository.saveWill(session.getClientID(), withExpirationComputed);
        this.willExpirationService.track(str, withExpirationComputed);
    }

    private void publishWill(ISessionsRepository.Will will) {
        publish2Subscribers(WILL_PUBLISHER, willMessageExpiry(will), MqttMessageBuilders.publish().topicName(will.topic).retained(will.retained).qos(will.qos).payload(Unpooled.copiedBuffer(will.payload)).build());
    }

    private static Instant willMessageExpiry(ISessionsRepository.Will will) {
        Optional<Duration> messageExpiry = will.properties.messageExpiry();
        return messageExpiry.isPresent() ? Instant.now().plus((TemporalAmount) messageExpiry.get()) : Instant.MAX;
    }

    public void wipeExistingScheduledWill(String str) {
        if (this.willExpirationService.untrack(str)) {
            LOG.debug("Wiped task to delayed publish for old client {}", str);
        }
        this.sessionRepository.deleteWill(str);
    }

    public void subscribeClientToTopics(MqttSubscribeMessage mqttSubscribeMessage, String str, String str2, MQTTConnection mQTTConnection) {
        List<SharedSubscriptionData> emptyList;
        Optional<SubscriptionIdentifier> empty;
        int messageId = Utils.messageId(mqttSubscribeMessage);
        Session retrieve = this.sessionRegistry.retrieve(str);
        if (mQTTConnection.isProtocolVersion5()) {
            emptyList = (List) mqttSubscribeMessage.payload().topicSubscriptions().stream().filter(mqttTopicSubscription -> {
                return SharedSubscriptionUtils.isSharedSubscription(mqttTopicSubscription.topicName());
            }).map(SharedSubscriptionData::fromMqttSubscription).collect(Collectors.toList());
            Optional findFirst = emptyList.stream().filter(sharedSubscriptionData -> {
                return !SharedSubscriptionUtils.validateShareName(sharedSubscriptionData.name.toString());
            }).findFirst();
            if (findFirst.isPresent()) {
                LOG.info("{} used an invalid shared subscription name {}, disconnecting", str, ((SharedSubscriptionData) findFirst.get()).name);
                retrieve.disconnectFromBroker();
                return;
            } else {
                try {
                    empty = verifyAndExtractMessageIdentifier(mqttSubscribeMessage);
                } catch (IllegalArgumentException e) {
                    retrieve.disconnectFromBroker();
                    return;
                }
            }
        } else {
            emptyList = Collections.emptyList();
            empty = Optional.empty();
        }
        List<MqttTopicSubscription> updateWithMaximumSupportedQoS = updateWithMaximumSupportedQoS(mQTTConnection.isProtocolVersion5() ? this.authorizator.verifyAlsoSharedTopicsReadAccess(str, str2, mqttSubscribeMessage) : this.authorizator.verifyTopicsReadAccess(str, str2, mqttSubscribeMessage));
        MqttSubAckMessage doAckMessageFromValidateFilters = doAckMessageFromValidateFilters(updateWithMaximumSupportedQoS, messageId);
        Optional<SubscriptionIdentifier> optional = empty;
        List<Subscription> list = (List) updateWithMaximumSupportedQoS.stream().filter(mqttTopicSubscription2 -> {
            return mqttTopicSubscription2.qualityOfService() != MqttQoS.FAILURE;
        }).filter(mqttTopicSubscription3 -> {
            return !SharedSubscriptionUtils.isSharedSubscription(mqttTopicSubscription3.topicName());
        }).map(mqttTopicSubscription4 -> {
            Topic topic = new Topic(mqttTopicSubscription4.topicName());
            MqttSubscriptionOption option = mqttTopicSubscription4.option();
            return optional.isPresent() ? new Subscription(str, topic, option, (SubscriptionIdentifier) optional.get()) : new Subscription(str, topic, option);
        }).collect(Collectors.toList());
        Set set = (Set) list.stream().map(this::addSubscriptionReportingNewStatus).filter(PostOffice::needToReceiveRetained).map(couple -> {
            return (Subscription) couple.v2;
        }).collect(Collectors.toSet());
        for (SharedSubscriptionData sharedSubscriptionData2 : emptyList) {
            if (empty.isPresent()) {
                this.subscriptions.addShared(str, sharedSubscriptionData2.name, sharedSubscriptionData2.topicFilter, sharedSubscriptionData2.option, empty.get());
            } else {
                this.subscriptions.addShared(str, sharedSubscriptionData2.name, sharedSubscriptionData2.topicFilter, sharedSubscriptionData2.option);
            }
        }
        retrieve.addSubscriptions(list);
        mQTTConnection.sendSubAckMessage(messageId, doAckMessageFromValidateFilters);
        publishRetainedMessagesForSubscriptions(str, set);
        Iterator<Subscription> it = list.iterator();
        while (it.hasNext()) {
            this.interceptor.notifyTopicSubscribed(it.next(), str2);
        }
    }

    private static boolean needToReceiveRetained(Utils.Couple<Boolean, Subscription> couple) {
        switch (AnonymousClass1.$SwitchMap$io$netty$handler$codec$mqtt$MqttSubscriptionOption$RetainedHandlingPolicy[couple.v2.option().retainHandling().ordinal()]) {
            case H2SessionsRepository.WILL_PRESENT /* 1 */:
                return true;
            case 2:
                return couple.v1.booleanValue();
            default:
                return false;
        }
    }

    private Utils.Couple<Boolean, Subscription> addSubscriptionReportingNewStatus(Subscription subscription) {
        boolean add;
        if (subscription.hasSubscriptionIdentifier()) {
            add = this.subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option(), subscription.getSubscriptionIdentifier());
        } else {
            add = this.subscriptions.add(subscription.getClientId(), subscription.getTopicFilter(), subscription.option());
        }
        return new Utils.Couple<>(Boolean.valueOf(add), subscription);
    }

    private List<MqttTopicSubscription> updateWithMaximumSupportedQoS(List<MqttTopicSubscription> list) {
        return (List) list.stream().map(this::updateWithMaximumSupportedQoS).collect(Collectors.toList());
    }

    private MqttTopicSubscription updateWithMaximumSupportedQoS(MqttTopicSubscription mqttTopicSubscription) {
        return new MqttTopicSubscription(mqttTopicSubscription.topicName(), optionWithQos(minQos(mqttTopicSubscription.qualityOfService(), this.maxServerGrantedQos), mqttTopicSubscription.option()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MqttSubscriptionOption optionWithQos(MqttQoS mqttQoS, MqttSubscriptionOption mqttSubscriptionOption) {
        return new MqttSubscriptionOption(mqttQoS, mqttSubscriptionOption.isNoLocal(), mqttSubscriptionOption.isRetainAsPublished(), mqttSubscriptionOption.retainHandling());
    }

    private static MqttQoS minQos(MqttQoS mqttQoS, MqttQoS mqttQoS2) {
        return (mqttQoS == MqttQoS.FAILURE || mqttQoS2 == MqttQoS.FAILURE) ? MqttQoS.FAILURE : mqttQoS.value() < mqttQoS2.value() ? mqttQoS : mqttQoS2;
    }

    private static Optional<SubscriptionIdentifier> verifyAndExtractMessageIdentifier(MqttSubscribeMessage mqttSubscribeMessage) {
        List properties = mqttSubscribeMessage.idAndPropertiesVariableHeader().properties().getProperties(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value());
        if (properties.size() > 1) {
            LOG.warn("Received a Subscribe with more than one subscription identifier property ({})", Integer.valueOf(properties.size()));
            throw new IllegalArgumentException("More than one subscription identifier properties");
        }
        if (properties.isEmpty()) {
            return Optional.empty();
        }
        Integer num = (Integer) ((MqttProperties.MqttProperty) properties.iterator().next()).value();
        try {
            return Optional.of(new SubscriptionIdentifier(num.intValue()));
        } catch (IllegalArgumentException e) {
            LOG.warn("Received a Subscribe with SubscriptionIdentifier value {} out of range 1..268435455", num);
            throw e;
        }
    }

    private void publishRetainedMessagesForSubscriptions(String str, Collection<Subscription> collection) {
        Session retrieve = this.sessionRegistry.retrieve(str);
        for (Subscription subscription : collection) {
            String topic = subscription.getTopicFilter().toString();
            Collection<RetainedMessage> retainedOnTopic = this.retainedRepository.retainedOnTopic(topic);
            if (retainedOnTopic.isEmpty()) {
                LOG.debug("No retained messages matching topic filter {}", topic);
            } else {
                for (RetainedMessage retainedMessage : retainedOnTopic) {
                    MqttProperties.MqttProperty[] prepareSubscriptionProperties = prepareSubscriptionProperties(subscription, Arrays.asList(retainedMessage.getMqttProperties()));
                    MqttQoS lowerQosToTheSubscriptionDesired = lowerQosToTheSubscriptionDesired(subscription, retainedMessage.qosLevel());
                    ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(retainedMessage.getPayload());
                    retrieve.sendRetainedPublishOnSessionAtQos(retainedMessage.getTopic(), lowerQosToTheSubscriptionDesired, wrappedBuffer, appendMessageExpiry(prepareSubscriptionProperties, retainedMessage));
                    wrappedBuffer.release();
                }
            }
        }
    }

    private MqttProperties.MqttProperty[] appendMessageExpiry(MqttProperties.MqttProperty[] mqttPropertyArr, RetainedMessage retainedMessage) {
        if (retainedMessage.getExpiryTime() == null) {
            return mqttPropertyArr;
        }
        MqttProperties.MqttProperty integerProperty = new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value(), Integer.valueOf(((int) Duration.between(retainedMessage.getExpiryTime(), Instant.now()).toMillis()) / 1000));
        MqttProperties.MqttProperty[] mqttPropertyArr2 = new MqttProperties.MqttProperty[mqttPropertyArr.length + 1];
        System.arraycopy(mqttPropertyArr, 0, mqttPropertyArr2, 0, mqttPropertyArr.length);
        mqttPropertyArr2[mqttPropertyArr.length] = integerProperty;
        return mqttPropertyArr2;
    }

    private MqttSubAckMessage doAckMessageFromValidateFilters(List<MqttTopicSubscription> list, int i) {
        ArrayList arrayList = new ArrayList();
        Iterator<MqttTopicSubscription> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(Integer.valueOf(it.next().qualityOfService().value()));
        }
        return new MqttSubAckMessage(new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i), new MqttSubAckPayload(arrayList));
    }

    public void unsubscribe(List<String> list, MQTTConnection mQTTConnection, int i) {
        String clientId = mQTTConnection.getClientId();
        Session retrieve = this.sessionRegistry.retrieve(clientId);
        if (retrieve == null) {
            LOG.warn("Session not found when unsubscribing {}", clientId);
            mQTTConnection.sendUnsubAckMessage(list, clientId, i);
            return;
        }
        for (String str : list) {
            Topic topic = new Topic(str);
            if (!topic.isValid()) {
                mQTTConnection.dropConnection();
                LOG.warn("Topic filter is not valid. topics: {}, offending topic filter: {}", list, topic);
                return;
            }
            LOG.trace("Removing subscription topic={}", topic);
            if (SharedSubscriptionUtils.isSharedSubscription(str)) {
                String extractFilterFromShared = SharedSubscriptionUtils.extractFilterFromShared(str);
                this.subscriptions.removeSharedSubscription(new ShareName(SharedSubscriptionUtils.extractShareName(str)), Topic.asTopic(extractFilterFromShared), clientId);
            } else {
                this.subscriptions.removeSubscription(topic, clientId);
            }
            retrieve.removeSubscription(topic);
            this.interceptor.notifyTopicUnsubscribed(topic.toString(), clientId, NettyUtils.userName(mQTTConnection.channel));
        }
        mQTTConnection.sendUnsubAckMessage(list, clientId, i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletableFuture<Void> receivedPublishQos0(MQTTConnection mQTTConnection, String str, String str2, MqttPublishMessage mqttPublishMessage, Instant instant) {
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        if (!this.authorizator.canWrite(topic, str, str2)) {
            LOG.error("client is not authorized to publish on topic: {}", topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return CompletableFuture.completedFuture(null);
        }
        if (!isPayloadFormatToValidate(mqttPublishMessage) || validatePayloadAsUTF8(mqttPublishMessage)) {
            RoutingResults publish2Subscribers = publish2Subscribers(str2, instant, mqttPublishMessage);
            if (!publish2Subscribers.isAllFailed()) {
                return publish2Subscribers.completableFuture().thenRun(() -> {
                    if (mqttPublishMessage.fixedHeader().isRetain()) {
                        this.retainedRepository.cleanRetained(topic);
                    }
                    this.interceptor.notifyTopicPublished(mqttPublishMessage, str2, str);
                    ReferenceCountUtil.release(mqttPublishMessage);
                });
            }
            LOG.info("No one publish was successfully enqueued to session loops");
            ReferenceCountUtil.release(mqttPublishMessage);
            return CompletableFuture.completedFuture(null);
        }
        LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS0)");
        ReferenceCountUtil.release(mqttPublishMessage);
        mQTTConnection.brokerDisconnect(MqttReasonCodes.Disconnect.PAYLOAD_FORMAT_INVALID);
        mQTTConnection.disconnectSession();
        mQTTConnection.dropConnection();
        return CompletableFuture.completedFuture(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RoutingResults receivedPublishQos1(MQTTConnection mQTTConnection, String str, int i, MqttPublishMessage mqttPublishMessage, Instant instant) {
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        topic.getTokens();
        if (!topic.isValid()) {
            LOG.warn("Invalid topic format, force close the connection");
            mQTTConnection.dropConnection();
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        String clientId = mQTTConnection.getClientId();
        if (!this.authorizator.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client: {} is not authorized to publish on topic: {}", clientId, topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        if (isPayloadFormatToValidate(mqttPublishMessage) && !validatePayloadAsUTF8(mqttPublishMessage)) {
            LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS1)");
            mQTTConnection.sendPubAck(i, MqttReasonCodes.PubAck.PAYLOAD_FORMAT_INVALID);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        if (isContentTypeToValidate(mqttPublishMessage) && !validateContentTypeAsUTF8(mqttPublishMessage)) {
            LOG.warn("Received not valid UTF-8 content type (QoS1)");
            ReferenceCountUtil.release(mqttPublishMessage);
            mQTTConnection.brokerDisconnect(MqttReasonCodes.Disconnect.PROTOCOL_ERROR);
            mQTTConnection.disconnectSession();
            mQTTConnection.dropConnection();
            return RoutingResults.preroutingError();
        }
        RoutingResults publish2Subscribers = mqttPublishMessage.fixedHeader().isDup() ? publish2Subscribers(clientId, this.failedPublishes.listFailed(clientId, i), instant, mqttPublishMessage) : publish2Subscribers(clientId, instant, mqttPublishMessage);
        if (LOG.isTraceEnabled()) {
            LOG.trace("subscriber routes: {}", publish2Subscribers);
        }
        if (publish2Subscribers.isAllSuccess()) {
            mQTTConnection.sendPubAck(i);
            manageRetain(topic, mqttPublishMessage);
            this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
        } else {
            this.failedPublishes.insertAll(i, clientId, publish2Subscribers.failedRoutings);
        }
        ReferenceCountUtil.release(mqttPublishMessage);
        this.failedPublishes.removeAll(i, clientId, publish2Subscribers.successedRoutings);
        return publish2Subscribers;
    }

    private static boolean validatePayloadAsUTF8(MqttPublishMessage mqttPublishMessage) {
        boolean z = true;
        try {
            StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(Utils.readBytesAndRewind(mqttPublishMessage.payload())));
        } catch (CharacterCodingException e) {
            z = false;
        }
        return z;
    }

    private void manageRetain(Topic topic, MqttPublishMessage mqttPublishMessage) {
        if (isRetained(mqttPublishMessage)) {
            if (!mqttPublishMessage.payload().isReadable()) {
                this.retainedRepository.cleanRetained(topic);
                this.retainedMessagesExpirationService.untrack(topic.toString());
                return;
            }
            if (!hasProperty(mqttPublishMessage.variableHeader().properties(), MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL)) {
                this.retainedRepository.retain(topic, mqttPublishMessage);
                return;
            }
            Instant plus = Instant.now().plus((TemporalAmount) Duration.ofSeconds(getIntProperty(r0, MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL)));
            this.retainedRepository.retain(topic, mqttPublishMessage, plus);
            this.retainedMessagesExpirationService.track(topic.toString(), new ExpirableTopic(topic, plus));
        }
    }

    private static boolean hasProperty(MqttProperties mqttProperties, MqttProperties.MqttPropertyType mqttPropertyType) {
        return mqttProperties.getProperty(mqttPropertyType.value()) != null;
    }

    private static int getIntProperty(MqttProperties mqttProperties, MqttProperties.MqttPropertyType mqttPropertyType) {
        return ((Integer) mqttProperties.getProperty(mqttPropertyType.value()).value()).intValue();
    }

    private RoutingResults publish2Subscribers(String str, Instant instant, MqttPublishMessage mqttPublishMessage) {
        return publish2Subscribers(str, NO_FILTER, instant, mqttPublishMessage);
    }

    private RoutingResults publish2Subscribers(String str, Set<String> set, Instant instant, MqttPublishMessage mqttPublishMessage) {
        boolean isRetain = mqttPublishMessage.fixedHeader().isRetain();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        List<Subscription> matchQosSharpening = this.subscriptions.matchQosSharpening(topic);
        if (matchQosSharpening.isEmpty()) {
            LOG.trace("No matching subscriptions for topic: {}", topic);
            return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
        }
        BatchingPublishesCollector batchingPublishesCollector = new BatchingPublishesCollector(this.sessionLoops);
        for (Subscription subscription : matchQosSharpening) {
            if (set == NO_FILTER || set.contains(subscription.getClientId())) {
                if (!subscription.option().isNoLocal()) {
                    batchingPublishesCollector.add(subscription);
                } else if (!str.equals(subscription.getClientId())) {
                    batchingPublishesCollector.add(subscription);
                }
            }
        }
        int countBatches = batchingPublishesCollector.countBatches();
        if (countBatches <= 0) {
            LOG.trace("No matching subscriptions for topic: {}", topic);
            return new RoutingResults(Collections.emptyList(), Collections.emptyList(), CompletableFuture.completedFuture(null));
        }
        mqttPublishMessage.retain(countBatches);
        List<RouteResult> routeBatchedPublishes = batchingPublishesCollector.routeBatchedPublishes(list -> {
            publishToSession(topic, list, qosLevel, isRetain, instant, mqttPublishMessage);
            mqttPublishMessage.release();
        });
        CompletableFuture<Void> allOf = CompletableFuture.allOf((CompletableFuture[]) routeBatchedPublishes.stream().filter((v0) -> {
            return v0.isSuccess();
        }).map((v0) -> {
            return v0.completableFuture();
        }).toArray(i -> {
            return new CompletableFuture[i];
        }));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (RouteResult routeResult : routeBatchedPublishes) {
            Collection<String> subscriberIdsByEventLoop = batchingPublishesCollector.subscriberIdsByEventLoop(routeResult.clientId);
            if (routeResult.status == RouteResult.Status.FAIL) {
                arrayList.addAll(subscriberIdsByEventLoop);
                mqttPublishMessage.release();
            } else {
                arrayList2.addAll(subscriberIdsByEventLoop);
            }
        }
        return new RoutingResults(arrayList2, arrayList, allOf);
    }

    private void publishToSession(Topic topic, Collection<Subscription> collection, MqttQoS mqttQoS, boolean z, Instant instant, MqttPublishMessage mqttPublishMessage) {
        ByteBuf duplicate = mqttPublishMessage.payload().duplicate();
        for (Subscription subscription : collection) {
            MqttQoS lowerQosToTheSubscriptionDesired = lowerQosToTheSubscriptionDesired(subscription, mqttQoS);
            boolean z2 = false;
            if (subscription.option().isRetainAsPublished()) {
                z2 = z;
            }
            publishToSession(duplicate, topic, subscription, lowerQosToTheSubscriptionDesired, z2, instant, mqttPublishMessage);
        }
    }

    private void publishToSession(ByteBuf byteBuf, Topic topic, Subscription subscription, MqttQoS mqttQoS, boolean z, Instant instant, MqttPublishMessage mqttPublishMessage) {
        Session retrieve = this.sessionRegistry.retrieve(subscription.getClientId());
        if (!(retrieve != null)) {
            LOG.debug("PUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}", new Object[]{subscription.getClientId(), subscription.getTopicFilter(), mqttQoS});
        } else {
            LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}", new Object[]{subscription.getClientId(), subscription.getTopicFilter(), mqttQoS});
            retrieve.sendPublishOnSessionAtQos(new SessionRegistry.PublishedMessage(topic, mqttQoS, byteBuf, z, instant, prepareSubscriptionProperties(subscription, mqttPublishMessage.variableHeader().properties().listAll())));
        }
    }

    private MqttProperties.MqttProperty[] prepareSubscriptionProperties(Subscription subscription, Collection<? extends MqttProperties.MqttProperty> collection) {
        ArrayList arrayList = new ArrayList(collection.size() + 1);
        for (MqttProperties.MqttProperty mqttProperty : collection) {
            if (mqttProperty.propertyId() != MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()) {
                arrayList.add(mqttProperty);
            }
        }
        if (subscription.hasSubscriptionIdentifier()) {
            arrayList.add(createSubscriptionIdProperty(subscription));
        }
        return (MqttProperties.MqttProperty[]) arrayList.toArray(new MqttProperties.MqttProperty[0]);
    }

    private MqttProperties.IntegerProperty createSubscriptionIdProperty(Subscription subscription) {
        return new MqttProperties.IntegerProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value(), Integer.valueOf(subscription.getSubscriptionIdentifier().value()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RoutingResults receivedPublishQos2(MQTTConnection mQTTConnection, MqttPublishMessage mqttPublishMessage, String str, Instant instant) {
        LOG.trace("Processing PUB QoS2 message on connection: {}", mQTTConnection);
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        String clientId = mQTTConnection.getClientId();
        if (!this.authorizator.canWrite(topic, str, clientId)) {
            LOG.error("MQTT client is not authorized to publish on topic: {}", topic);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        int packetId = mqttPublishMessage.variableHeader().packetId();
        if (isPayloadFormatToValidate(mqttPublishMessage) && !validatePayloadAsUTF8(mqttPublishMessage)) {
            LOG.warn("Received not valid UTF-8 payload when payload format indicator was enabled (QoS2)");
            mQTTConnection.sendPubRec(packetId, MqttReasonCodes.PubRec.PAYLOAD_FORMAT_INVALID);
            ReferenceCountUtil.release(mqttPublishMessage);
            return RoutingResults.preroutingError();
        }
        RoutingResults publish2Subscribers = mqttPublishMessage.fixedHeader().isDup() ? publish2Subscribers(clientId, this.failedPublishes.listFailed(clientId, packetId), instant, mqttPublishMessage) : publish2Subscribers(clientId, instant, mqttPublishMessage);
        if (publish2Subscribers.isAllSuccess()) {
            mQTTConnection.sendPubRec(packetId);
            manageRetain(topic, mqttPublishMessage);
            this.interceptor.notifyTopicPublished(mqttPublishMessage, clientId, str);
        } else {
            this.failedPublishes.insertAll(packetId, clientId, publish2Subscribers.failedRoutings);
        }
        ReferenceCountUtil.release(mqttPublishMessage);
        this.failedPublishes.removeAll(packetId, clientId, publish2Subscribers.successedRoutings);
        return publish2Subscribers;
    }

    static MqttQoS lowerQosToTheSubscriptionDesired(Subscription subscription, MqttQoS mqttQoS) {
        if (mqttQoS.value() > subscription.option().qos().value()) {
            mqttQoS = subscription.option().qos();
        }
        return mqttQoS;
    }

    public RoutingResults internalPublish(MqttPublishMessage mqttPublishMessage) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        Topic topic = new Topic(mqttPublishMessage.variableHeader().topicName());
        ByteBuf payload = mqttPublishMessage.payload();
        LOG.info("Sending internal PUBLISH message Topic={}, qos={}", topic, qosLevel);
        RoutingResults publish2Subscribers = publish2Subscribers(INTERNAL_PUBLISHER, Instant.MAX, mqttPublishMessage);
        LOG.trace("after routed publishes: {}", publish2Subscribers);
        if (!isRetained(mqttPublishMessage)) {
            return publish2Subscribers;
        }
        if (qosLevel == MqttQoS.AT_MOST_ONCE || payload.readableBytes() == 0) {
            this.retainedRepository.cleanRetained(topic);
            return publish2Subscribers;
        }
        this.retainedRepository.retain(topic, mqttPublishMessage);
        return publish2Subscribers;
    }

    private static boolean isRetained(MqttPublishMessage mqttPublishMessage) {
        return mqttPublishMessage.fixedHeader().isRetain();
    }

    private static boolean isPayloadFormatToValidate(MqttPublishMessage mqttPublishMessage) {
        MqttProperties.IntegerProperty property = mqttPublishMessage.variableHeader().properties().getProperty(MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR.value());
        return property != null && (property instanceof MqttProperties.IntegerProperty) && ((Integer) property.value()).intValue() == 1;
    }

    private static boolean isContentTypeToValidate(MqttPublishMessage mqttPublishMessage) {
        return mqttPublishMessage.variableHeader().properties().getProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value()) != null;
    }

    private static boolean validateContentTypeAsUTF8(MqttPublishMessage mqttPublishMessage) {
        boolean z = true;
        try {
            StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(((String) mqttPublishMessage.variableHeader().properties().getProperty(MqttProperties.MqttPropertyType.CONTENT_TYPE.value()).value()).getBytes()));
        } catch (CharacterCodingException e) {
            z = false;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispatchConnection(MqttConnectMessage mqttConnectMessage) {
        this.interceptor.notifyClientConnected(mqttConnectMessage);
    }

    void dispatchDisconnection(String str, String str2) {
        this.interceptor.notifyClientDisconnected(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispatchConnectionLost(String str, String str2) {
        this.interceptor.notifyClientConnectionLost(str, str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String sessionLoopThreadName(String str) {
        return this.sessionLoops.sessionLoopThreadName(str);
    }

    public RouteResult routeCommand(String str, String str2, Callable<Void> callable) {
        return this.sessionLoops.routeCommand(str, str2, callable);
    }

    public void terminate() {
        this.willExpirationService.shutdown();
        this.retainedMessagesExpirationService.shutdown();
        this.sessionLoops.terminate();
    }

    public void clientDisconnected(String str, String str2) {
        dispatchDisconnection(str, str2);
        this.failedPublishes.cleanupForClient(str);
    }
}
