4"io/moquette/broker/Session  java/time/ClocksystemDefaultZone()Ljava/time/Clock;  "io/moquette/broker/SessionRegistry(Lio/moquette/broker/subscriptions/ISubscriptionsDirectory;Lio/moquette/broker/ISessionsRepository;Lio/moquette/broker/IQueueRepository;Lio/moquette/broker/Authorizator;Ljava/util/concurrent/ScheduledExecutorService;Ljava/time/Clock;ILio/moquette/broker/SessionEventLoopGroup;)V  java/lang/Object()V&java/util/concurrent/ConcurrentHashMap   pool$Ljava/util/concurrent/ConcurrentMap;java/util/concurrent/DelayQueue  !"removableSessions!Ljava/util/concurrent/DelayQueue; $ %&subscriptionsDirectory:Lio/moquette/broker/subscriptions/ISubscriptionsDirectory; ( )*sessionsRepository(Lio/moquette/broker/ISessionsRepository; , -.queueRepository%Lio/moquette/broker/IQueueRepository; 0 12 authorizator!Lio/moquette/broker/Authorizator;4 56run:(Lio/moquette/broker/SessionRegistry;)Ljava/lang/Runnable; 8 9:%EXPIRED_SESSION_CLEANER_TASK_INTERVALLjava/time/Duration; <=> ?@java/time/Duration getSeconds()J BCD EFjava/util/concurrent/TimeUnitSECONDSLjava/util/concurrent/TimeUnit; HIJ KL-java/util/concurrent/ScheduledExecutorServicescheduleWithFixedDelay](Ljava/lang/Runnable;JJLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; N OPscheduledExpiredSessions&Ljava/util/concurrent/ScheduledFuture; R STclockLjava/time/Clock; V WXglobalExpirySecondsI Z [\ loopsGroup*Lio/moquette/broker/SessionEventLoopGroup; ^ _recreateSessionPoolajava/util/ArrayList ` d efdrainTo(Ljava/util/Collection;)I h ijLOGLorg/slf4j/Logger;l#Retrieved {} expired sessions or {} nop qrjava/lang/IntegervalueOf(I)Ljava/lang/Integer; t uvsize()I xyz {|org/slf4j/Loggerdebug9(Ljava/lang/String;Ljava/lang/Object;Ljava/lang/Object;)V ~ java/util/Listiterator()Ljava/util/Iterator;  java/util/IteratorhasNext()Z next()Ljava/lang/Object;2io/moquette/broker/ISessionsRepository$SessionData expireAt()Ljava/util/Optional; apply()Ljava/util/function/Function;  java/util/Optionalmap3(Ljava/util/function/Function;)Ljava/util/Optional; UNDEFINED orElse&(Ljava/lang/Object;)Ljava/lang/Object;java/lang/String"Removing session {}, expired on {} clientId()Ljava/lang/String; remove(Ljava/lang/String;)V  &io/moquette/broker/ISessionsRepositorydelete7(Lio/moquette/broker/ISessionsRepository$SessionData;)V  isPresentjava/lang/RuntimeExceptionjava/lang/StringBuilder HCan't track for expiration a session without expiry instant, client_id: append-(Ljava/lang/String;)Ljava/lang/StringBuilder; toString )start tracking the session {} for removal x {'(Ljava/lang/String;Ljava/lang/Object;)V  add!(Ljava/util/concurrent/Delayed;)Z  (Ljava/lang/Object;)Z  #io/moquette/broker/IQueueRepositorylistQueueNames()Ljava/util/Set; list()Ljava/util/Collection; java/util/Collection  containsQueue(Ljava/lang/String;)Z getOrCreateQueue<(Ljava/lang/String;)Lio/moquette/broker/SessionMessageQueue;  java/util/Set  `(Lio/moquette/broker/ISessionsRepository$SessionData;ZLio/moquette/broker/SessionMessageQueue;)V  "java/util/concurrent/ConcurrentMapput8(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; trackForRemovalOnExpiration isEmptyRRecreating sessions left {} unused queues. This is probably a bug. Session IDs: {} t toArray()[Ljava/lang/Object;  java/util/Arrays'([Ljava/lang/Object;)Ljava/lang/String; x  |error    retrieve0(Ljava/lang/String;)Lio/moquette/broker/Session;  createNewSession`(Lio/netty/handler/codec/mqtt/MqttConnectMessage;Ljava/lang/String;)Lio/moquette/broker/Session;8io/moquette/broker/SessionRegistry$SessionCreationResult  3io/moquette/broker/SessionRegistry$CreationModeEnumCREATED_CLEAN_NEW5Lio/moquette/broker/SessionRegistry$CreationModeEnum;  U(Lio/moquette/broker/Session;Lio/moquette/broker/SessionRegistry$CreationModeEnum;Z)VBAnother thread added a Session for our clientId {}, this is a bug! x  "(case 1, not existing session with CId {} x$ %trace ' ()reopenExistingSession(Lio/netty/handler/codec/mqtt/MqttConnectMessage;Ljava/lang/String;Lio/moquette/broker/Session;Ljava/lang/String;)Lio/moquette/broker/SessionRegistry$SessionCreationResult; +,- ./.io/netty/handler/codec/mqtt/MqttConnectMessagevariableHeader9()Lio/netty/handler/codec/mqtt/MqttConnectVariableHeader; 123 45io/netty/handler/codec/mqtt/MqttConnectVariableHeaderisCleanSession 6 7 disconnected 9 :closeImmediately < =>purgeSessionState(Lio/moquette/broker/Session;)V@0case 2, oldSession with same CId {} disconnected BCD EF(io/moquette/broker/Session$SessionStatus DISCONNECTED*Lio/moquette/broker/Session$SessionStatus; BH IF CONNECTING K LM assignStateW(Lio/moquette/broker/Session$SessionStatus;Lio/moquette/broker/Session$SessionStatus;)ZO,io/moquette/broker/SessionCorruptedExceptionQ4old session moved in connected state by other thread N T UVcopySessionConfigO(Lio/netty/handler/codec/mqtt/MqttConnectMessage;Lio/moquette/broker/Session;)V X YZreactivateSubscriptions1(Lio/moquette/broker/Session;Ljava/lang/String;)V\0case 3, oldSession with same CId {} disconnected ^ _REOPEN_EXISTING a bcsessionLio/moquette/broker/Session; e fggetSessionData6()Lio/moquette/broker/ISessionsRepository$SessionData; i juntrackFromRemovalOnExpiration l mngetSubscriptions()Ljava/util/List;p-io/moquette/broker/subscriptions/Subscription or stgetTopicFilter*()Lio/moquette/broker/subscriptions/Topic; v w getClientID yz{ |}io/moquette/broker/AuthorizatorcanReadO(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;Ljava/lang/String;)Z  8io/moquette/broker/subscriptions/ISubscriptionsDirectoryremoveSubscription=(Lio/moquette/broker/subscriptions/Topic;Ljava/lang/String;)V io/moquette/broker/InMemoryQueue   'io/netty/handler/codec/mqtt/MqttVersion MQTT_3_1_1)Lio/netty/handler/codec/mqtt/MqttVersion;  P(Ljava/lang/String;Lio/netty/handler/codec/mqtt/MqttVersion;ILjava/time/Clock;)V 1  isWillFlag   createWillS(Lio/netty/handler/codec/mqtt/MqttConnectMessage;)Lio/moquette/broker/Session$Will;  (Lio/moquette/broker/ISessionsRepository$SessionData;ZLio/moquette/broker/Session$Will;Lio/moquette/broker/SessionMessageQueue;)V  markConnecting   saveSession  update%(ZLio/moquette/broker/Session$Will;)V + payload2()Lio/netty/handler/codec/mqtt/MqttConnectPayload;  .io/netty/handler/codec/mqtt/MqttConnectPayloadwillMessageInBytes()[B  io/netty/buffer/Unpooled copiedBuffer([B)Lio/netty/buffer/ByteBuf;   willTopic 1  isWillRetain 1 vwillQos  q#io/netty/handler/codec/mqtt/MqttQoS((I)Lio/netty/handler/codec/mqtt/MqttQoS;io/moquette/broker/Session$Will  T(Ljava/lang/String;Lio/netty/buffer/ByteBuf;Lio/netty/handler/codec/mqtt/MqttQoS;Z)V  get   disconnect  expireImmediately  gwithExpirationComputed"Remove session state for client {} B F DESTROYED#Session has already changed state:  -(Ljava/lang/Object;)Ljava/lang/StringBuilder;  > unsubscribe  Clean up removed session call=(Lio/moquette/broker/Session;)Ljava/util/concurrent/Callable;  (io/moquette/broker/SessionEventLoopGroup routeCommandp(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/Callable;)Lio/moquette/broker/PostOffice$RouteResult;  values  stream()Ljava/util/stream/Stream; test ()Ljava/util/function/Predicate;  java/util/stream/Streamfilter9(Ljava/util/function/Predicate;)Ljava/util/stream/Stream; C(Lio/moquette/broker/SessionRegistry;)Ljava/util/function/Function;  8(Ljava/util/function/Function;)Ljava/util/stream/Stream;     java/util/stream/CollectorstoList()Ljava/util/stream/Collector;  collect0(Ljava/util/stream/Collector;)Ljava/lang/Object;Disconnecting client: {})Client {} not found, nothing disconnected/Client {} successfully disconnected from broker   remoteAddress 1(Ljava/lang/String;)Ljava/util/function/Function;  !"$java/util/concurrent/ScheduledFuturecancel(Z)Z$,Successfully cancelled expired sessions task x& 'info)\Can't cancel the execution of expired sessions task, was already cancelled? {}, was done? {} + , isCancelled ./0 q1java/lang/Boolean(Z)Ljava/lang/Boolean; 3 4isDone x6 7|warn 9 :&updateNotCleanSessionsWithProperExpire < =close   CDE Fjava/util/ObjectsrequireNonNull H IJacceptG(Lio/moquette/broker/ISessionsRepository;)Ljava/util/function/Consumer; L MNforEach (Ljava/util/function/Consumer;)V P QisCleanS#io/moquette/broker/ClientDescriptor UVW Xjava/net/InetSocketAddress getHostString UZ [vgetPort R] ^((Ljava/lang/String;Ljava/lang/String;I)V ` acleanUp <c de ofSeconds(J)Ljava/time/Duration; ghi jkorg/slf4j/LoggerFactory getLogger%(Ljava/lang/Class;)Lorg/slf4j/Logger; Signature)Ljava/util/concurrent/ScheduledFuture<*>;TLjava/util/concurrent/ConcurrentMap;WLjava/util/concurrent/DelayQueue;(Lio/moquette/broker/subscriptions/ISubscriptionsDirectory;Lio/moquette/broker/ISessionsRepository;Lio/moquette/broker/IQueueRepository;Lio/moquette/broker/Authorizator;Ljava/util/concurrent/ScheduledExecutorService;Lio/moquette/broker/SessionEventLoopGroup;)VCodeLineNumberTableLocalVariableTablethis$Lio/moquette/broker/SessionRegistry; scheduler/Ljava/util/concurrent/ScheduledExecutorService;checkExpiredSessions expiredAtLjava/lang/String;expiredSession4Lio/moquette/broker/ISessionsRepository$SessionData;expiredSessionsLjava/util/List;drainedSessionsLocalVariableTypeTableFLjava/util/List; StackMapTablepersistentQueue(Lio/moquette/broker/SessionMessageQueue; rehydratedqueuesLjava/util/Set;^Lio/moquette/broker/SessionMessageQueue;#Ljava/util/Set;createOrReopenSession(Lio/netty/handler/codec/mqtt/MqttConnectMessage;Ljava/lang/String;Ljava/lang/String;)Lio/moquette/broker/SessionRegistry$SessionCreationResult; newSessionpreviouspostConnectAction:Lio/moquette/broker/SessionRegistry$SessionCreationResult;msg0Lio/netty/handler/codec/mqtt/MqttConnectMessage;username oldSessioncreationResult connectingZ newIsClean topicReadable existingSub/Lio/moquette/broker/subscriptions/Subscription;queuewill!Lio/moquette/broker/Session$Will;cleanexpiryInterval sessionData&io/moquette/broker/SessionMessageQueue willPayloadLio/netty/buffer/ByteBuf;retainedqos%Lio/netty/handler/codec/mqtt/MqttQoS;clientIDconnectionClosedresultoldlistConnectedClients?()Ljava/util/Collection; dropSession(Ljava/lang/String;Z)ZremoveSessionStateclientcreateClientDescriptor2(Lio/moquette/broker/Session;)Ljava/util/Optional;sremoteAddressOptLjava/util/Optional;2Ljava/util/Optional;Y(Lio/moquette/broker/Session;)Ljava/util/Optional;/lambda$updateNotCleanSessionsWithProperExpire$37(Lio/moquette/broker/ISessionsRepository$SessionData;)Z/lambda$updateNotCleanSessionsWithProperExpire$2(Lio/moquette/broker/Session;)Zlambda$createClientDescriptor$1U(Ljava/lang/String;Ljava/net/InetSocketAddress;)Lio/moquette/broker/ClientDescriptor;rLjava/net/InetSocketAddress;lambda$remove$00(Lio/moquette/broker/Session;)Ljava/lang/String; Exceptionsjava/lang/Exception SourceFileSessionRegistry.javaBootstrapMethods  "java/lang/invoke/LambdaMetafactory metafactory(Ljava/lang/invoke/MethodHandles$Lookup;Ljava/lang/String;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodType;Ljava/lang/invoke/MethodHandle;Ljava/lang/invoke/MethodType;)Ljava/lang/invoke/CallSite;  x java/time/Instant'(Ljava/time/Instant;)Ljava/lang/String;     connected  (Ljava/util/Optional;)Z  ;(Ljava/util/Optional;)Lio/moquette/broker/ClientDescriptor;  C(Ljava/net/InetSocketAddress;)Lio/moquette/broker/ClientDescriptor;  dR(Lio/moquette/broker/Session;)Lio/moquette/broker/ISessionsRepository$SessionData;  j(Lio/moquette/broker/ISessionsRepository$SessionData;)Lio/moquette/broker/ISessionsRepository$SessionData;(Ljava/lang/Object;)V  InnerClasses SessionDataSessionCreationResultCreationModeEnum SessionStatusWill)io/moquette/broker/PostOffice$RouteResultio/moquette/broker/PostOffice RouteResult/io/moquette/broker/SessionRegistry$PubRelMarker PubRelMarker3io/moquette/broker/SessionRegistry$PublishedMessagePublishedMessage2io/moquette/broker/SessionRegistry$EnqueuedMessageEnqueuedMessage%java/lang/invoke/MethodHandles$Lookup java/lang/invoke/MethodHandlesLookup!  OPlmWX[\9:ijln%&)*-.12!"loSTpq} *+,-  r sHtu%&)*-.12vw[\q f**Y*Y*+#*,'*-+*/**37;7;AGM*Q*U*Y*]r>$)/;GOU[aes\ ftuf%&f)*f-.f12fvwfSTfWXf[\xq!`YbL*+c=gkm*smw+}N-M-::gw**'r* )D[lus4[%yzD<{|tu|}~sX |}0~Rq@+!YY+÷ƿg+*+Wr (6?s@tu@b|(jqB *+Wr  s tu b|_qS*+L*'M,\,N*+-?*+-:+-WY-:*-W*-+g+m+r2 ,<KVcsx{s4K-cc,Lb|tu K- ^$q[b*, :K*+,:Y:*,:g,g!,#*+,-&:r.  #49EQT_s\ =c4c#1btubbzbz_[cQE + +  +()q+*06-5-8;*-;*+,:*,Wg?,#Y:F-AGJ6NYPR*+-S*-Wg[,#Y-]:*`dhrN &3?NQ]bmsz sf &(cN]7tuzcz &<& +YZqL+k}N-;-o:*/q,+ux6*#q+u~±r"),16HKs41*LtuLbcLz  =>q2+k}M,!,oN*#-q+u~ܱr .!1"s 2tu2bc  &q +*0>*+,: Y:*U6Y,*Q:+**+:Y:Y:*'r:%( )+$.20E2O3V4f5i6w9|:;sp Vfctuzwc$f2XXEE|$f2C8  +UVq'+*0>+* *+::,r?ABDF&Gs>'tu''bc q6+M+N+*6+*:Y-,İrJ KLM(Ns>6tu6 +#z(  qB*+rRstuz>qm++͙ *+;*+dзrVW X[]stubc =>qHg+u+AնJ=NYYض+ڶ÷R*+*+ur`abc:f?gGhs HtuHbc.:q1*+M,!*,dW*Y+,Wrklno0ts 1tu1z#c0qC** ߰rwxy(z2{7|Bws CtulqMg++*+N-g+-8*-;g+r2   $026:?Ks*MtuMzM -c  qw+uM+N-,r s*tucz   l=qL*Mg#%&g(*M*-*M2-5*8*+;r" %9>BKs Ltu":qJ*>?@A*'YBWGKr'1;Is Jtu qE*rs |@ qB *Ors  c @ qERY*+T+Y\rsz q4*_r pqs cq, b7 fgr 7  R      @B @     !