4<  io/moquette/broker/PostOfficeLOGLorg/slf4j/Logger;   java/lang/Object()V5io/moquette/broker/PostOffice$FailedPublishCollection  $(Lio/moquette/broker/PostOffice$1;)V  failedPublishes7Lio/moquette/broker/PostOffice$FailedPublishCollection;   authorizator!Lio/moquette/broker/Authorizator;   subscriptions:Lio/moquette/broker/subscriptions/ISubscriptionsDirectory;  !retainedRepository(Lio/moquette/broker/IRetainedRepository; # $%sessionRegistry$Lio/moquette/broker/SessionRegistry; ' () interceptor,Lio/moquette/interception/BrokerInterceptor; + ,- sessionLoops*Lio/moquette/broker/SessionEventLoopGroup; /01 23io/moquette/broker/Session$WillpayloadLio/netty/buffer/ByteBuf;5&io/moquette/broker/subscriptions/Topic /7 89topicLjava/lang/String; 4; <(Ljava/lang/String;)V /> ?@qos%Lio/netty/handler/codec/mqtt/MqttQoS; B CDpublish2Subscribers(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;)Lio/moquette/broker/RoutingResults; FGH IJio/moquette/broker/Utils messageId,(Lio/netty/handler/codec/mqtt/MqttMessage;)I LMN OPio/moquette/broker/AuthorizatorverifyTopicsReadAccessh(Ljava/lang/String;Ljava/lang/String;Lio/netty/handler/codec/mqtt/MqttSubscribeMessage;)Ljava/util/List; R STdoAckMessageFromValidateFiltersB(Ljava/util/List;I)Lio/netty/handler/codec/mqtt/MqttSubAckMessage; VWX YZjava/util/Liststream()Ljava/util/stream/Stream;\ ]^test ()Ljava/util/function/Predicate; `ab cdjava/util/stream/Streamfilter9(Ljava/util/function/Predicate;)Ljava/util/stream/Stream;f ghapply1(Ljava/lang/String;)Ljava/util/function/Function; `j klmap8(Ljava/util/function/Function;)Ljava/util/stream/Stream; nop qrjava/util/stream/CollectorstoList()Ljava/util/stream/Collector; `t uvcollect0(Ljava/util/stream/Collector;)Ljava/lang/Object; Vx yziterator()Ljava/util/Iterator; |}~ java/util/IteratorhasNext()Z | next()Ljava/lang/Object;-io/moquette/broker/subscriptions/Subscription  8io/moquette/broker/subscriptions/ISubscriptionsDirectoryadd2(Lio/moquette/broker/subscriptions/Subscription;)V  "io/moquette/broker/SessionRegistryretrieve0(Ljava/lang/String;)Lio/moquette/broker/Session;  io/moquette/broker/SessionaddSubscriptions(Ljava/util/List;)V  !io/moquette/broker/MQTTConnectionsendSubAckMessage3(ILio/netty/handler/codec/mqtt/MqttSubAckMessage;)V  'publishRetainedMessagesForSubscriptions%(Ljava/lang/String;Ljava/util/List;)V  *io/moquette/interception/BrokerInterceptornotifyTopicSubscribedD(Lio/moquette/broker/subscriptions/Subscription;Ljava/lang/String;)V getTopicFilter*()Lio/moquette/broker/subscriptions/Topic; 4 toString()Ljava/lang/String;  &io/moquette/broker/IRetainedRepositoryretainedOnTopic$(Ljava/lang/String;)Ljava/util/List; V isEmpty"io/moquette/broker/RetainedMessage qosLevel'()Lio/netty/handler/codec/mqtt/MqttQoS;   lowerQosToTheSubscriptionDesired{(Lio/moquette/broker/subscriptions/Subscription;Lio/netty/handler/codec/mqtt/MqttQoS;)Lio/netty/handler/codec/mqtt/MqttQoS;  getPayload()[B  io/netty/buffer/Unpooled wrappedBuffer([B)Lio/netty/buffer/ByteBuf; getTopic !sendRetainedPublishOnSessionAtQosi(Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;Lio/netty/buffer/ByteBuf;)V  io/netty/buffer/ByteBufreleasejava/util/ArrayList 1io/netty/handler/codec/mqtt/MqttTopicSubscription qualityOfService  #io/netty/handler/codec/mqtt/MqttQoSvalue()I  java/lang/IntegervalueOf(I)Ljava/lang/Integer; V (Ljava/lang/Object;)Z+io/netty/handler/codec/mqtt/MqttFixedHeader  +io/netty/handler/codec/mqtt/MqttMessageTypeSUBACK-Lio/netty/handler/codec/mqtt/MqttMessageType; @ AT_MOST_ONCE X(Lio/netty/handler/codec/mqtt/MqttMessageType;ZLio/netty/handler/codec/mqtt/MqttQoS;ZI)V-io/netty/handler/codec/mqtt/MqttSubAckPayload  (Ljava/lang/Iterable;)V-io/netty/handler/codec/mqtt/MqttSubAckMessage    7io/netty/handler/codec/mqtt/MqttMessageIdVariableHeaderfrom<(I)Lio/netty/handler/codec/mqtt/MqttMessageIdVariableHeader;  (Lio/netty/handler/codec/mqtt/MqttFixedHeader;Lio/netty/handler/codec/mqtt/MqttMessageIdVariableHeader;Lio/netty/handler/codec/mqtt/MqttSubAckPayload;)V   getClientId'Session not found when unsubscribing {}  org/slf4j/Loggerwarn'(Ljava/lang/String;Ljava/lang/Object;)V  sendUnsubAckMessage&(Ljava/util/List;Ljava/lang/String;I)Vjava/lang/String 4! "isValid $ % dropConnection'ATopic filter is not valid. topics: {}, offending topic filter: {} ) *9(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V,Removing subscription topic={} . /trace 1 23removeSubscription=(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;)V 5 26+(Lio/moquette/broker/subscriptions/Topic;)V 8 9:channelLio/netty/channel/Channel; <=> ?@io/moquette/broker/NettyUtilsuserName.(Lio/netty/channel/Channel;)Ljava/lang/String; B CDnotifyTopicUnsubscribed9(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)V LF GHcanWriteO(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;)ZJ0client is not authorized to publish on topic: {} L Merror OPQ  io/netty/util/ReferenceCountUtil STU VW&java/util/concurrent/CompletableFuturecompletedFuture<(Ljava/lang/Object;)Ljava/util/concurrent/CompletableFuture; YZ[ 2\.io/netty/handler/codec/mqtt/MqttPublishMessage()Lio/netty/buffer/ByteBuf; ^_` a!io/moquette/broker/RoutingResults isAllFailedc9No one publish was successfully enqueued to session loops e f<info ^h ijcompletableFuture*()Ljava/util/concurrent/CompletableFuture;l mnrun(Lio/moquette/broker/PostOffice;Lio/netty/handler/codec/mqtt/MqttPublishMessage;Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;)Ljava/lang/Runnable; Sp qrthenRun>(Ljava/lang/Runnable;)Ljava/util/concurrent/CompletableFuture; 4t uv getTokens()Ljava/util/List;x0Invalid topic format, force close the connection z < ^| }~preroutingError%()Lio/moquette/broker/RoutingResults;9MQTT client: {} is not authorized to publish on topic: {}  M* Y  fixedHeader/()Lio/netty/handler/codec/mqtt/MqttFixedHeader;  isDup   listFailed$(Ljava/lang/String;I)Ljava/util/Set;  @ AT_LEAST_ONCE  C(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;Ljava/util/Set;)Lio/moquette/broker/RoutingResults;  isTraceEnabledsubscriber routes: {} ^  isAllSuccess   sendPubAck(I)V   manageRetain[(Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttPublishMessage;)V  notifyTopicPublishedW(Lio/netty/handler/codec/mqtt/MqttPublishMessage;Ljava/lang/String;Ljava/lang/String;)V ^ failedRoutingsLjava/util/List;   insertAll,(ILjava/lang/String;Ljava/util/Collection;)V ^ successedRoutings   access$100c(Lio/moquette/broker/PostOffice$FailedPublishCollection;ILjava/lang/String;Ljava/util/Collection;)V  isRetain   isReadable  6 cleanRetained  retain   NO_FILTERLjava/util/Set;  matchQosSharpening:(Lio/moquette/broker/subscriptions/Topic;)Ljava/util/List;'No matching subscriptions for topic: {}  vjava/util/Collections emptyList ^ K(Ljava/util/List;Ljava/util/List;Ljava/util/concurrent/CompletableFuture;)V8io/moquette/broker/PostOffice$BatchingPublishesCollector  L(Lio/moquette/broker/PostOffice;Lio/moquette/broker/SessionEventLoopGroup;)V    java/util/Setcontains    countBatches  (I)Lio/netty/buffer/ByteBuf; accept(Lio/moquette/broker/PostOffice;Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;)Ljava/util/function/Consumer;  routeBatchedPublishes/(Ljava/util/function/Consumer;)Ljava/util/List;\ g()Ljava/util/function/Function; g"()Ljava/util/function/IntFunction; ` toArray5(Ljava/util/function/IntFunction;)[Ljava/lang/Object;)[Ljava/util/concurrent/CompletableFuture; S allOfS([Ljava/util/concurrent/CompletableFuture;)Ljava/util/concurrent/CompletableFuture;)io/moquette/broker/PostOffice$RouteResult   access$300?(Lio/moquette/broker/PostOffice$RouteResult;)Ljava/lang/String;   subscriberIdsByEventLoop*(Ljava/lang/String;)Ljava/util/Collection;     access$400_(Lio/moquette/broker/PostOffice$RouteResult;)Lio/moquette/broker/PostOffice$RouteResult$Status;  0io/moquette/broker/PostOffice$RouteResult$StatusFAIL2Lio/moquette/broker/PostOffice$RouteResult$Status; V addAll(Ljava/util/Collection;)Z  \ duplicate xjava/util/Collection   !publishToSession(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/moquette/broker/subscriptions/Subscription;Lio/netty/handler/codec/mqtt/MqttQoS;)V#NSending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {} % &'debug((Ljava/lang/String;[Ljava/lang/Object;)V ) *$sendNotRetainedPublishOnSessionAtQos,EPUBLISH to not yet present session. CId: {}, topicFilter: {}, qos: {}.-Processing PUB QoS2 message on connection: {} Y0 12variableHeader9()Lio/netty/handler/codec/mqtt/MqttPublishVariableHeader; 456 75io/netty/handler/codec/mqtt/MqttPublishVariableHeader topicName95MQTT client is not authorized to publish on topic: {} 4; <packetId > ?@ EXACTLY_ONCE A B sendPubRec D EgetRequestedQos H1Sending internal PUBLISH message Topic={}, qos={} J f*Lafter routed publishes: {} N O readableBytes Q RSnotifyClientConnected3(Lio/netty/handler/codec/mqtt/MqttConnectMessage;)V U VWnotifyClientDisconnected'(Ljava/lang/String;Ljava/lang/String;)V Y ZWnotifyClientConnectionLost \]^ _`(io/moquette/broker/SessionEventLoopGroupsessionLoopThreadName&(Ljava/lang/String;)Ljava/lang/String; \b cd routeCommandp(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/Callable;)Lio/moquette/broker/PostOffice$RouteResult; \f g  terminate i jWdispatchDisconnection l m<cleanupForClient o  p(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Ljava/util/Collection;Lio/netty/handler/codec/mqtt/MqttQoS;)V 5 s tb(Ljava/lang/String;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;)V v w@FAILURE yz{ |}org/slf4j/LoggerFactory getLogger%(Ljava/lang/Class;)Lorg/slf4j/Logger;java/util/HashSet ~  Signature#Ljava/util/Set;(Lio/moquette/broker/subscriptions/ISubscriptionsDirectory;Lio/moquette/broker/IRetainedRepository;Lio/moquette/broker/SessionRegistry;Lio/moquette/interception/BrokerInterceptor;Lio/moquette/broker/Authorizator;Lio/moquette/broker/SessionEventLoopGroup;)VCodeLineNumberTableLocalVariableTablethisLio/moquette/broker/PostOffice;init'(Lio/moquette/broker/SessionRegistry;)VfireWill$(Lio/moquette/broker/Session$Will;)Vwill!Lio/moquette/broker/Session$Will;subscribeClientToTopics|(Lio/netty/handler/codec/mqtt/MqttSubscribeMessage;Ljava/lang/String;Ljava/lang/String;Lio/moquette/broker/MQTTConnection;)V subscription/Lio/moquette/broker/subscriptions/Subscription;msg2Lio/netty/handler/codec/mqtt/MqttSubscribeMessage;clientIDusernamemqttConnection#Lio/moquette/broker/MQTTConnection; messageIDI ackTopics ackMessage/Lio/netty/handler/codec/mqtt/MqttSubAckMessage;newSubscriptionssessionLio/moquette/broker/Session;LocalVariableTypeTableELjava/util/List;ALjava/util/List; StackMapTable0io/netty/handler/codec/mqtt/MqttSubscribeMessage retainedQos payloadBuf retainedMsg$Lio/moquette/broker/RetainedMessage; topicFilter retainedMsgs targetSession6Ljava/util/List;V(Ljava/lang/String;Ljava/util/List;)Vreq3Lio/netty/handler/codec/mqtt/MqttTopicSubscription; topicFiltersgrantedQoSLevels-Lio/netty/handler/codec/mqtt/MqttFixedHeader;/Lio/netty/handler/codec/mqtt/MqttSubAckPayload;%Ljava/util/List;w(Ljava/util/List;I)Lio/netty/handler/codec/mqtt/MqttSubAckMessage; unsubscribe7(Ljava/util/List;Lio/moquette/broker/MQTTConnection;I)V(Lio/moquette/broker/subscriptions/Topic; validTopicZttopics$Ljava/util/List;K(Ljava/util/List;Lio/moquette/broker/MQTTConnection;I)VreceivedPublishQos0(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;Lio/netty/handler/codec/mqtt/MqttPublishMessage;)Ljava/util/concurrent/CompletableFuture;0Lio/netty/handler/codec/mqtt/MqttPublishMessage; publishResult#Lio/moquette/broker/RoutingResults;(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;Lio/netty/handler/codec/mqtt/MqttPublishMessage;)Ljava/util/concurrent/CompletableFuture;receivedPublishQos1(Lio/moquette/broker/MQTTConnection;Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;ILio/netty/handler/codec/mqtt/MqttPublishMessage;)Lio/moquette/broker/RoutingResults; failedClientsroutes connectionclientId publishingQossub subscibersIdsLjava/util/Collection;rr+Lio/moquette/broker/PostOffice$RouteResult;filterTargetClientstopicMatchingSubscriptions collector:Lio/moquette/broker/PostOffice$BatchingPublishesCollector;publishResultspublishFutures publishes(Ljava/util/concurrent/CompletableFuture;*Ljava/util/Collection;=Ljava/util/List;:Ljava/util/concurrent/CompletableFuture;(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;Ljava/util/Set;)Lio/moquette/broker/RoutingResults;GLjava/util/Collection;(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Ljava/util/Collection;Lio/netty/handler/codec/mqtt/MqttQoS;)VisSessionPresentreceivedPublishQos2(Lio/moquette/broker/MQTTConnection;Lio/netty/handler/codec/mqtt/MqttPublishMessage;Ljava/lang/String;)Lio/moquette/broker/RoutingResults;publishRoutingsinternalPublishU(Lio/netty/handler/codec/mqtt/MqttPublishMessage;)Lio/moquette/broker/RoutingResults;dispatchConnection0Lio/netty/handler/codec/mqtt/MqttConnectMessage;dispatchConnectionLostactionDescriptionactionLjava/util/concurrent/Callable;3Ljava/util/concurrent/Callable;(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/Callable;)Lio/moquette/broker/PostOffice$RouteResult;clientDisconnectedlambda$publish2Subscribers$4,(I)[Ljava/util/concurrent/CompletableFuture;x$0lambda$publish2Subscribers$3y(Lio/netty/buffer/ByteBuf;Lio/moquette/broker/subscriptions/Topic;Lio/netty/handler/codec/mqtt/MqttQoS;Ljava/util/List;)Vbatchlambda$receivedPublishQos0$2(Lio/netty/handler/codec/mqtt/MqttPublishMessage;Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;)V lambda$subscribeClientToTopics$1v(Ljava/lang/String;Lio/netty/handler/codec/mqtt/MqttTopicSubscription;)Lio/moquette/broker/subscriptions/Subscription; lambda$subscribeClientToTopics$06(Lio/netty/handler/codec/mqtt/MqttTopicSubscription;)Z access$200()Lorg/slf4j/Logger; SourceFilePostOffice.javaBootstrapMethods  "java/lang/invoke/LambdaMetafactory metafactory(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodHandle;Ljava/lang/invoke/MethodType;)Ljava/lang/invoke/CallSite;  &(Ljava/lang/Object;)Ljava/lang/Object;  d(Lio/netty/handler/codec/mqtt/MqttTopicSubscription;)Lio/moquette/broker/subscriptions/Subscription;   (Ljava/lang/Object;)V   ! " isSuccess$.(Lio/moquette/broker/PostOffice$RouteResult;)Z& h(U(Lio/moquette/broker/PostOffice$RouteResult;)Ljava/util/concurrent/CompletableFuture;*(I)Ljava/lang/Object;, -  InnerClassesFailedPublishCollection2io/moquette/broker/PostOffice$1WillBatchingPublishesCollector RouteResultStatus8%java/lang/invoke/MethodHandles$Lookup:java/lang/invoke/MethodHandlesLookup   !$%(),- 2** Y**+*,*-"*&**&  %+1H222 !2$%2()22,->*+" $%Q*+.4Y+6:+=AW  +E6*,-+K:*Q:U[_,eimsV:w:  { : * *",:  *,w:  { : *& -J(38Edor|z d   99E{|D E{4N VV|#)|" *"+N,w:{::*:w:{?:  :  :  ĸ: -    Ww> '1>HKjqzp q&@ z?@ 3 j- 1i9>\'s9 >\6|9V|EV|STgYN+w:{$:-߶WػY:Y-:Y   & 8 ;MWH&gggI_MW2g_V|* ,:*":,++w:{v:4Y:: 6  ,#&+(+-*04,7;: *& A,+V#+,JU \!a#e$s%t()+-./23f UY8\R 9 Jd9I9 2,|?4<Ve*+,-EI+KNWR*X+A:]bdNWRg*+-,ko* 6 789$;3<;=F>L?QB>ee8e9e9e32 $,^ ,sW, wy+#NW{+:*,-E,NW{X:!*: *, :*,A:-+*,*&-*NW*nPQ RST!U%W+X9YGZM[Q^X`capb~cdfgiklmpruwp p ~89+9X23e p %+/ ^#-,%,X*+*+,{ |}!, --8-! CDS *+,-÷*  23 8 @CB*,:!,-^Y͸RҰY***:w:{0:åڹ +W*+,-:U_i:: Y: Y: w:  {E :  :   W+W  W^Y   Ұf "4Bax '14a '3 BB23B8B@B 6B|u l c H 'B 6u l c Z 4V|,e 4VVSVV|>  p <+:-:{&::*,ֱ$-8;R- ?@$<<23<8<<@63 <|, !o*"-ڶ:61"Y-SY-SYS$,+(%+Y-SY-SYS$.  +;@L`inHoo23o8oo?@ bV@4! ײ-+-4Y,/3::,X:+:*-E8K,NW{,/:6,"*: *= :*=A:+@*,*&,-*,NW*Z "(7DIMV`m |!"$&'(+-02p m |98"23(9VK m M41 ^"Z+*C*CL+679?@'x+FM4Y+/3:N+X:G-,I*-,A:K-+, M*-*-+6 IJKL*N4OAQKRNT]VgWjYuZ>xxp?@a8[234DNY4^ SA *&+P bc  jWL *&+,T f g   9 ?9WL *&+,X j k   9 ?9_`= **+[n  9cde **+,-au*  9 9   g 6**e yz WU*+,h*+k 9?9 /S i*+,-n+W 4238@%+ *,*&+-+NWC EHI$J4%%%8%9%9 \4Y+q:MY*,+߷r   9 8 E*߲u @/ /x~Yñ H    #%')+./: 0 1/3456@79;