返回

Netty——任务加入异步线程池

发布时间:2022-10-22 18:13:17 318
# bootstrap# java# 服务器# 数据# 服务器

前言

我们常常遇到这样的需求:在一个业务逻辑处理器中,需要写数据库、进行网络连接等耗时业务。Netty的原则是不阻塞I/O线程,所以需指定Handler执行的线程池。

 

如果MyBusinessLogicHandler是一个耗时的处理逻辑,应该制定group,避免I/O线程被阻塞,如果业务逻辑是异步处理或处理时间很快那么可以不用指定group。

 

在 Netty 中做耗时的,不可预料的操作,比如数据库,网络请求,会严重影响 Netty 对 Socket 的处理速度。 而解决方法就是将耗时任务添加到异步线程池中。但就添加线程池这步操作来讲,可以有2种方式,而且这2种方式实现的区别也蛮大的。

  • 1、处理耗时业务的第一种方式:handler 中加入线程池。
  • 2、处理耗时业务的第二种方式:Context 中添加线程池。

当我们使用addLast方法添加线程池后,handler将优先使用这个线程池,如果不添加,将使用IO线程。

handler 中加入线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //group充当业务线程池,可以将任务提交到该线程池
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        //如果这里有一个非常耗时长的业务 -> 提交到线程池异步执行
        group.submit(() -> {
            try {
                //模拟耗时长的业务
                Thread.sleep(10 * 1000);
                System.out.println("group.submit 异步执行的线程:"+Thread.currentThread().getName());
                ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

Context 中添加线程池

服务端的实现

public class NettyServer {

    public static void main(String[] args) {
        //创建bossGroup 和 workerGroup
        //创建两个线程组,bossGroup和workerGroup
        //bossGroup只是处理连接请求,真正的和客户端业务处理会交给workerGroup处理
        //两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用链式编程来进行设置
            bootstrap.group(bossGroup,workerGroup)//设置两个线程组
                    .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端的通道实现
                    .option(ChannelOption.SO_BACKLOG,128)//设置线程队列等待连接的个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                    //给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new NettyServerInitializer());
            System.out.println("......服务器 id ready...");
            //绑定一个端口并同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6668).sync();

            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

服务端Initializer

public class NettyServerInitializer extends ChannelInitializer {

    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        //在addLast添加handler,参数指定了EventExecutorGroup
        //那么该handler会优先加入到线程池中执行
        pipeline.addLast(group,"handler",new NettyServerHandler());
    }
}

服务端Handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(读取客户端发送的消息)
     * @param ctx   上下文对象,可以获取管道pipeline,通道channel,地址
     * @param msg   客户端发送的数据,默认Object
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("NettyServerHandler.channelRead 执行的线程:"+Thread.currentThread().getName());
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发送消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
        System.out.println("......go on ......");
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵1~", CharsetUtil.UTF_8));
    }

    /**
     * 发生异常,需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

两种方式的比较

  • 1、第一种方式在handler内部添加EventExecutorGroup,可能更加自由,比如如果需要访问数据库等耗时操作那就异步,如果不需要那就不异步,异步可能会拖长接口响应时间,因为需要将任务放进mpscTask中,如果IO时间很短,Task很多,可能一个循环下来,都没有时间执行整个task,导致接口响应时间不达标。
  • 2、第二种方式是Netty标准方式(即加入到队列),但是这样做会将整个handler都交给业务线程池,不论耗时不耗时,都加入到队列里不够灵活。
  • 3、两种方式各有优劣,第一种灵活性更加,怎么使用,视实际情况而定。
特别声明:以上内容(图片及文字)均为互联网收集或者用户上传发布,本站仅提供信息存储服务!如有侵权或有涉及法律问题请联系我们。
举报
评论区(0)
按点赞数排序
用户头像
精选文章
thumb 中国研究员首次曝光美国国安局顶级后门—“方程式组织”
thumb 俄乌线上战争,网络攻击弥漫着数字硝烟
thumb 从网络安全角度了解俄罗斯入侵乌克兰的相关事件时间线
下一篇
Netty——解决Selector 空轮询BUG 2022-10-22 17:49:44