package io.moquette.server.netty;

import io.moquette.BrokerConstants;
import io.moquette.parser.netty.MQTTDecoder;
import io.moquette.parser.netty.MQTTEncoder;
import io.moquette.server.ServerAcceptor;
import io.moquette.server.config.IConfig;
import io.moquette.server.netty.metrics.BytesMetrics;
import io.moquette.server.netty.metrics.BytesMetricsCollector;
import io.moquette.server.netty.metrics.BytesMetricsHandler;
import io.moquette.server.netty.metrics.MQTTMessageLogger;
import io.moquette.server.netty.metrics.MessageMetrics;
import io.moquette.server.netty.metrics.MessageMetricsCollector;
import io.moquette.server.netty.metrics.MessageMetricsHandler;
import io.moquette.spi.impl.ProtocolProcessor;
import io.moquette.spi.security.ISslContextCreator;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.util.List;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/server/netty/NettyAcceptor.class */
public class NettyAcceptor implements ServerAcceptor {
    private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
    private static final Logger LOG = LoggerFactory.getLogger(NettyAcceptor.class);
    EventLoopGroup m_bossGroup;
    EventLoopGroup m_workerGroup;
    BytesMetricsCollector m_bytesMetricsCollector = new BytesMetricsCollector();
    MessageMetricsCollector m_metricsCollector = new MessageMetricsCollector();

    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$ByteBufToWebSocketFrameEncoder.class */
    static class ByteBufToWebSocketFrameEncoder extends MessageToMessageEncoder<ByteBuf> {
        /* renamed from: encode, reason: avoid collision after fix types in other method */
        protected void encode2(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame();
            binaryWebSocketFrame.content().writeBytes(byteBuf);
            list.add(binaryWebSocketFrame);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.netty.handler.codec.MessageToMessageEncoder
        public /* bridge */ /* synthetic */ void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
            encode2(channelHandlerContext, byteBuf, (List<Object>) list);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$PipelineInitializer.class */
    public abstract class PipelineInitializer {
        PipelineInitializer() {
        }

        abstract void init(ChannelPipeline channelPipeline) throws Exception;
    }

    /* loaded from: input_file:io/moquette/server/netty/NettyAcceptor$WebSocketFrameToByteBufDecoder.class */
    static class WebSocketFrameToByteBufDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {
        /* renamed from: decode, reason: avoid collision after fix types in other method */
        protected void decode2(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame, List<Object> list) throws Exception {
            ByteBuf content = binaryWebSocketFrame.content();
            content.retain();
            list.add(content);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // io.netty.handler.codec.MessageToMessageDecoder
        public /* bridge */ /* synthetic */ void decode(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame, List list) throws Exception {
            decode2(channelHandlerContext, binaryWebSocketFrame, (List<Object>) list);
        }
    }

    @Override // io.moquette.server.ServerAcceptor
    public void initialize(ProtocolProcessor protocolProcessor, IConfig iConfig, ISslContextCreator iSslContextCreator) throws IOException {
        this.m_bossGroup = new NioEventLoopGroup();
        this.m_workerGroup = new NioEventLoopGroup();
        NettyMQTTHandler nettyMQTTHandler = new NettyMQTTHandler(protocolProcessor);
        initializePlainTCPTransport(nettyMQTTHandler, iConfig);
        initializeWebSocketTransport(nettyMQTTHandler, iConfig);
        String property = iConfig.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME);
        String property2 = iConfig.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME);
        if (property == null && property2 == null) {
            return;
        }
        SSLContext initSSLContext = iSslContextCreator.initSSLContext();
        if (initSSLContext == null) {
            LOG.error("Can't initialize SSLHandler layer! Exiting, check your configuration of jks");
        } else {
            initializeSSLTCPTransport(nettyMQTTHandler, iConfig, initSSLContext);
            initializeWSSTransport(nettyMQTTHandler, iConfig, initSSLContext);
        }
    }

    private void initFactory(String str, int i, final PipelineInitializer pipelineInitializer) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(this.m_bossGroup, this.m_workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { // from class: io.moquette.server.netty.NettyAcceptor.1
            @Override // io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                try {
                    pipelineInitializer.init(socketChannel.pipeline());
                } catch (Throwable th) {
                    NettyAcceptor.LOG.error("Severe error during pipeline creation", th);
                    throw th;
                }
            }
        }).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_REUSEADDR, true).option(ChannelOption.TCP_NODELAY, true).childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture bind = serverBootstrap.bind(str, i);
            LOG.info("Server binded host: {}, port: {}", str, Integer.valueOf(i));
            bind.sync2();
        } catch (InterruptedException e) {
            LOG.error((String) null, e);
        }
    }

    private void initializePlainTCPTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig) throws IOException {
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        String property2 = iConfig.getProperty("port", BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property2)) {
            LOG.info("tcp MQTT is disabled because the value for the property with key {}", "port");
        } else {
            initFactory(property, Integer.parseInt(property2), new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.2
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super();
                }

                @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
                void init(ChannelPipeline channelPipeline) {
                    channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, 10));
                    channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                    channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                    channelPipeline.addLast("decoder", new MQTTDecoder());
                    channelPipeline.addLast("encoder", new MQTTEncoder());
                    channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                    channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                    channelPipeline.addLast("handler", nettyMQTTHandler);
                }
            });
        }
    }

    private void initializeWebSocketTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig) throws IOException {
        String property = iConfig.getProperty(BrokerConstants.WEB_SOCKET_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("WebSocket is disabled");
            return;
        }
        int parseInt = Integer.parseInt(property);
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        initFactory(iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME), parseInt, new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.3
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) {
                channelPipeline.addLast(new HttpServerCodec());
                channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                channelPipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", NettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST));
                channelPipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
                channelPipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, 10));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MQTTDecoder());
                channelPipeline.addLast("encoder", new MQTTEncoder());
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    private void initializeSSLTCPTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig, final SSLContext sSLContext) throws IOException {
        String property = iConfig.getProperty(BrokerConstants.SSL_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("SSL MQTT is disabled because there is no value in properties for key {}", BrokerConstants.SSL_PORT_PROPERTY_NAME);
            return;
        }
        int parseInt = Integer.parseInt(property);
        LOG.info("Starting SSL on port {}", Integer.valueOf(parseInt));
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property2 = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        final boolean booleanValue = Boolean.valueOf(iConfig.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false")).booleanValue();
        initFactory(property2, parseInt, new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.4
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) throws Exception {
                channelPipeline.addLast("ssl", NettyAcceptor.this.createSslHandler(sSLContext, booleanValue));
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, 10));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MQTTDecoder());
                channelPipeline.addLast("encoder", new MQTTEncoder());
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    private void initializeWSSTransport(final NettyMQTTHandler nettyMQTTHandler, IConfig iConfig, final SSLContext sSLContext) throws IOException {
        String property = iConfig.getProperty(BrokerConstants.WSS_PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);
        if (BrokerConstants.DISABLED_PORT_BIND.equals(property)) {
            LOG.info("SSL websocket is disabled because there is no value in properties for key {}", BrokerConstants.WSS_PORT_PROPERTY_NAME);
            return;
        }
        int parseInt = Integer.parseInt(property);
        final MoquetteIdleTimeoutHandler moquetteIdleTimeoutHandler = new MoquetteIdleTimeoutHandler();
        String property2 = iConfig.getProperty(BrokerConstants.HOST_PROPERTY_NAME);
        final boolean booleanValue = Boolean.valueOf(iConfig.getProperty(BrokerConstants.NEED_CLIENT_AUTH, "false")).booleanValue();
        initFactory(property2, parseInt, new PipelineInitializer() { // from class: io.moquette.server.netty.NettyAcceptor.5
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // io.moquette.server.netty.NettyAcceptor.PipelineInitializer
            void init(ChannelPipeline channelPipeline) throws Exception {
                channelPipeline.addLast("ssl", NettyAcceptor.this.createSslHandler(sSLContext, booleanValue));
                channelPipeline.addLast("httpEncoder", new HttpResponseEncoder());
                channelPipeline.addLast("httpDecoder", new HttpRequestDecoder());
                channelPipeline.addLast("aggregator", new HttpObjectAggregator(65536));
                channelPipeline.addLast("webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", NettyAcceptor.MQTT_SUBPROTOCOL_CSV_LIST));
                channelPipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
                channelPipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
                channelPipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, 10));
                channelPipeline.addAfter("idleStateHandler", "idleEventHandler", moquetteIdleTimeoutHandler);
                channelPipeline.addFirst("bytemetrics", new BytesMetricsHandler(NettyAcceptor.this.m_bytesMetricsCollector));
                channelPipeline.addLast("decoder", new MQTTDecoder());
                channelPipeline.addLast("encoder", new MQTTEncoder());
                channelPipeline.addLast("metrics", new MessageMetricsHandler(NettyAcceptor.this.m_metricsCollector));
                channelPipeline.addLast("messageLogger", new MQTTMessageLogger());
                channelPipeline.addLast("handler", nettyMQTTHandler);
            }
        });
    }

    @Override // io.moquette.server.ServerAcceptor
    public void close() {
        if (this.m_workerGroup == null) {
            throw new IllegalStateException("Invoked close on an Acceptor that wasn't initialized");
        }
        if (this.m_bossGroup == null) {
            throw new IllegalStateException("Invoked close on an Acceptor that wasn't initialized");
        }
        Future<?> shutdownGracefully = this.m_workerGroup.shutdownGracefully();
        Future<?> shutdownGracefully2 = this.m_bossGroup.shutdownGracefully();
        try {
            shutdownGracefully.await(100L);
            try {
                shutdownGracefully2.await(100L);
                MessageMetrics computeMetrics = this.m_metricsCollector.computeMetrics();
                LOG.info("Msg read: {}, msg wrote: {}", Long.valueOf(computeMetrics.messagesRead()), Long.valueOf(computeMetrics.messagesWrote()));
                BytesMetrics computeMetrics2 = this.m_bytesMetricsCollector.computeMetrics();
                LOG.info(String.format("Bytes read: %d, bytes wrote: %d", Long.valueOf(computeMetrics2.readBytes()), Long.valueOf(computeMetrics2.wroteBytes())));
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        } catch (InterruptedException e2) {
            throw new IllegalStateException(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ChannelHandler createSslHandler(SSLContext sSLContext, boolean z) {
        SSLEngine createSSLEngine = sSLContext.createSSLEngine();
        createSSLEngine.setUseClientMode(false);
        if (z) {
            createSSLEngine.setNeedClientAuth(true);
        }
        return new SslHandler(createSSLEngine);
    }
}
