驽马十驾 驽马十驾

驽马十驾,功在不舍

目录
IM系列2:利用Netty启动 WebSocket 以及心跳机制的作用和实现
/  

IM系列2:利用Netty启动 WebSocket 以及心跳机制的作用和实现

开篇

最近为公司的某个业务实现了一个基于NettyIM聊天应用的服务端,这里将关键思路和重点技术总结下,希望能对大家有个帮助。

这是 IM系列第二篇。

核心技术

Netty是一门强大的NIO框架,如果涉及到网络IO,大多数情况都请应该选择他作为底层的IO框架。

接下来你会了解下如下几个知识点:

  • Netty 的基本启动类
  • Netty 如何将HTTP升级为 WebSocket

启动类

老规矩,直接上代码:

private fun startNetty() {
        try {
            val begin = System.currentTimeMillis()
            //初始化
            bossGroup = NioEventLoopGroup()
            workGroup = NioEventLoopGroup()

            serverBootstrap = ServerBootstrap().group(bossGroup, workGroup) //boss辅助客户端的tcp连接请求  worker负责与客户端之前的读写操作
                    .channel(NioServerSocketChannel::class.java) //配置客户端的channel类型
                    .option(ChannelOption.SO_BACKLOG, 1024) //配置TCP参数,握手字符串长度设置
                    .childOption(ChannelOption.SO_KEEPALIVE, true)//开启心跳包活机制,就是客户端、服务端建立连接处于ESTABLISHED状态,超过2小时没有交流,机制会被启动
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, FixedRecvByteBufAllocator(592048))//配置固定长度接收缓存区分配器
                    .childHandler(childChannelHandler) //绑定I/O事件的处理,类,WebSocketChildChannelHandler中定义
            val end = System.currentTimeMillis()

            channelFuture = serverBootstrap.bind(port).sync().addListener {
                if (it.isSuccess) {
                    log.warn("[im-启动]Netty Websocket服务器启动\"成功\",耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接")
                } else {
                    log.warn("[im-启动]Netty Websocket服务器启动\"失败\",耗时 " + (end - begin) + " ms,已绑定端口 " + port + " 阻塞式等候客户端连接")
                }
            }
        } catch (e: Exception) {
            log.error("[im-启动]启动netty时报错", e)
            bossGroup.shutdownGracefully()
            workGroup.shutdownGracefully()
        }
    }

上面的就是 Netty负责 IO连接的模板代码,用过 Netty的,都应该明白。

其中最重要的是childHandler(childChannelHandler) ,这个childChannelHandler是根据业务需要实现的IO处理器。

比如我为了解析WebSocket实现如下:

@Component
class IMChildHandlerInitializer : ChannelInitializer<SocketChannel>() {


    @Autowired
    private lateinit var webSocketServerHandler: IMWebsocketHandler

    override fun initChannel(ch: SocketChannel) {
        //处理基本的 HTTP请求
        ch.pipeline().addLast("http-codec", HttpServerCodec()) 
        ch.pipeline().addLast("aggregator", HttpObjectAggregator(64 * 1024)) 
        ch.pipeline().addLast("http-chunked", ChunkedWriteHandler())
      
        //http 的请求升级到 websocket 请求有 2 种方式,一种是自己写,一种是 交给netty内置的连接来处理
        ch.pipeline().addLast("http-up-wb", WebSocketServerProtocolHandler("/ws"))
        ch.pipeline().addLast("heart-notice",IdleStateHandler(60, 60, 0, TimeUnit.SECONDS))
        ch.pipeline().addLast("heart-handler", heartHandler)
        ch.pipeline().addLast("websocket-handler", webSocketServerHandler)
    }

}
  • 因为websockethttp协议升级来的,所以最前 3 行代码都是解析http
  • 接下来的 4 句代码,都挺关键的,我们分别来看:
    • addLast("http-up-wb", WebSocketServerProtocolHandler("/ue-ws"))使用内置的WebSocketServerProtocolHandlerhttp升级到websocket协议。其中/ws就是连接的后缀,你可以自己修改。
    • addLast("heart-notice",IdleStateHandler(60, 60, 0, TimeUnit.SECONDS))是一个内置的心跳处理器,会在没有读\写超过规定时间的时候,触发对应的接口。
    • addLast("heart-handler", heartHandler)通过心跳来判定用户是否下线
    • addLast("websocket-handler", webSocketServerHandler)这个就是核心的websocket业务处理类了。

心跳处理器

websocket是长连接,当客户端异常退出或者路由器断线等特殊情况的时候,连接可能并不会自动关闭。但是对于应用而言,每一个连接都是宝贵的,所以不能让冗余的连接存在来浪费资源。

为了处理这种实际上断开了连接,但是应用没有断开的情况,需要增加了心跳机制。

同时因为连接是客户端发起的,所以应该选择让客户端主动来发起心跳,服务端根据心跳来进行处理。

下面是服务端处理心跳的简化代码。

@Component
@ChannelHandler.Sharable
class HeartHandler : ChannelInboundHandlerAdapter() {

    companion object {
        /**
         * 用户的基本信息
         */
        private val LOSE_HEART_TIMES: AttributeKey<Int> = AttributeKey.valueOf("lose-heart-times")

        /**
         * 日志
         */
        private val log: Logger = LoggerFactory.getLogger(javaClass)

        /**
         * 默认测次数,触发该事件,就应该是一次了
         */
        private const val DEFAULT_TIMES: Int = 1

        /**
         * 最大次数
         */
        private const val LIMIT_LOSE_TIMES: Int = 2
    }

    @Autowired
    private lateinit var channelContextManager: ChannelContextManager

    /**
     * 只会触发一次的激活回调
     *
     * @param ctx ChannelHandlerContext
     */
    override fun channelActive(ctx: ChannelHandlerContext) {
        ctx.channel().attr(LOSE_HEART_TIMES).set(DEFAULT_TIMES)
        super.channelActive(ctx)
    }

    /**
     * 通用事件的处理,因为在此之前注册了心跳事件,所以这里触发的应该为心跳事件
     *
     * @param ctx ChannelHandlerContext
     * @param evt Any
     */
    override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
        if (evt !is IdleStateEvent)
            return

        when (evt.state()) {
            //某个 channel 读超时
            IdleState.READER_IDLE -> {
                if (loseHeartTimesOverLimit(ctx)) {
                    log.error("[im-心跳死亡]读超时,下线处理,用户下线:{}...")
                    ctx.close()
                } else {
                    log.error("[im-心跳丢失]读超时{}...", channelContextManager.mtAttr(ctx)?.userInfo?.realName)
                }
            }
        }
        ctx.writeAndFlush(TextWebSocketFrame("服务端心跳事件..."))
    }

    /**
     * 判定心跳的次数是否已经大于阈值
     *
     * @param ctx ChannelHandlerContext
     */
    private fun loseHeartTimesOverLimit(ctx: ChannelHandlerContext): Boolean {
        val times = ctx.channel().attr(LOSE_HEART_TIMES).get()!!
        return if (times >= LIMIT_LOSE_TIMES) {
            true
        } else {
            ctx.channel().attr(LOSE_HEART_TIMES).set(times + 1)
            false
        }
    }

}
  • HeartHandler是交给 Spring 进行管理的,所以会在多个连接中共享这个变量,按照Netty要求,我们需要采用@ChannelHandler.Sharable注解来注释该类,示意该变量是共享的。【备注1】

  • 复写了channelActive事件,作用是客户端第一次连接将心跳超时这个变量同这个Channel绑定

  • 复写了userEventTriggered并且处理了IdleState.READER_IDLE 这个服务端读的事件,心跳判定就在这个这个事件里面处理。

  • loseHeartTimesOverLimit利用了Channel绑定的参数LOSE_HEART_TIMES来判定当前是否已经超过了断线阈值。

结语

本篇文章主要谈到了 2 个知识点:

  • 利用Netty来启动Websocket的启动代码
  • 如何利用心跳来处理异常情况下,网络本已断开,但是应用的连接未断开连接的问题

下一篇我们来讲讲具体的通信业务编写。

【备注1】20200722 最近看到SOFARPC以及部分文章有提到,心跳的Handler不应该共享,我后续代码已经改为不共享了,虽然共享我也没有发现出什么问题...~~~~

骐骥一跃,不能十步。驽马十驾,功在不舍。