今天遇到一个人问我netty3与netty4有什么区别。因为我之前使用netty做过网络程序开发,心里还是有点谱的。很自然地就说到了一些主要区别

  • 一些术语的变化,如Upstream变为了Inbound,Downstream变为了Outbound
  • netty3对每个读或写的操作,还会额外创建一个新的ChannelBuffer对象,这带来了很大的GC压力,为了缓解频繁申请回收Buffer时的GC压力,引入了池化的ByteBufs,当然在使用完Buffer后要注意需使用BufUtil.release释放。

那人再问,还有其它区别吗?然后我就说不上来了,惭愧。

回家查阅资料,终于将这个问题又深入理解了一次,这里记录一下以备忘。

线程模型的变化

Netty 3.X 版本线程模型

Netty 3.X的I/O操作线程模型比较复杂,它的处理模型包括两部分:

Inbound:主要包括链路建立事件、链路激活事件、读事件、I/O异常事件、链路关闭事件等; Outbound:主要包括写事件、连接事件、监听绑定事件、刷新事件等。 我们首先分析下Inbound操作的线程模型:

netty3_inbound.png

从上图可以看出,Inbound操作的主要处理流程如下:

  • I/O线程(Work线程)将消息从TCP缓冲区读取到SocketChannel的接收缓冲区中;
  • 由I/O线程负责生成相应的事件,触发事件向上执行,调度到ChannelPipeline中;
  • I/O线程调度执行ChannelPipeline中Handler链的对应方法,直到业务实现的Last Handler;
  • Last Handler将消息封装成Runnable,放入到业务线程池中执行,I/O线程返回,继续读/写等I/O操作;
  • 业务线程池从任务队列中弹出消息,并发执行业务逻辑。

通过对Netty 3的Inbound操作进行分析我们可以看出,Inbound的Handler都是由Netty的I/O Work线程负责执行。

下面我们继续分析Outbound操作的线程模型:

netty3_outbound.png

从上图可以看出,Outbound操作的主要处理流程如下:

业务线程发起Channel Write操作,发送消息;

  • Netty将写操作封装成写事件,触发事件向下传播;
  • 写事件被调度到ChannelPipeline中,由业务线程按照Handler Chain串行调用支持Downstream事件的Channel Handler;
  • 执行到系统最后一个ChannelHandler,将编码后的消息Push到发送队列中,业务线程返回;
  • Netty的I/O线程从发送消息队列中取出消息,调用SocketChannel的write方法进行消息发送。

Netty 4.X 版本线程模型

相比于Netty 3.X系列版本,Netty 4.X的I/O操作线程模型比较简答,它的原理图如下所示:

netty4_inoutbound.png

从上图可以看出,Outbound操作的主要处理流程如下:

  • I/O线程NioEventLoop从SocketChannel中读取数据报,将ByteBuf投递到ChannelPipeline,触发ChannelRead事件;
  • I/O线程NioEventLoop调用ChannelHandler链,直到将消息投递到业务线程,然后I/O线程返回,继续后续的读写操作;
  • 业务线程调用ChannelHandlerContext.write(Object msg)方法进行消息发送;
  • 如果是由业务线程发起的写操作,ChannelHandlerInvoker将发送消息封装成Task,放入到I/O线程NioEventLoop的任务队列中,由NioEventLoop在循环中统一调度和执行。放入任务队列之后,业务线程返回;
  • I/O线程NioEventLoop调用ChannelHandler链,进行消息发送,处理Outbound事件,直到将消息放入发送队列,然后唤醒Selector,进而执行写操作。

通过流程分析,我们发现Netty 4修改了线程模型,无论是Inbound还是Outbound操作,统一由I/O线程NioEventLoop调度执行。

自己的进一步理解

上述这段对比分析摘自这里。说的已经比较清楚了,但我还是加上一些说明:

  • netty4里第2步,I/O线程NioEventLoop调用ChannelHandler链,直到将消息投递到业务线程,这里netty并不直接将消息投递到业务线程,主要依赖于程序自行投递,一般方法无非是自行构造Task,将Task投递给自己的业务线程池。当然也可以在添加业务ChannelHandler时指定业务Handler运行所在的业务线程池,如下面的代码。
private final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(4);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();

private void start() throws InterruptedException {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new CheckAcceptHandler())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                        throws Exception {
                    ch.pipeline().addLast("decoder", new LineBasedFrameDecoder(4096));
                    ch.pipeline().addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
                    ch.pipeline().addLast("lineEncoder", new LineEncoder(StandardCharsets.UTF_8));
                    //指定了业务handler运行所在的业务线程池
                    ch.pipeline().addLast(businessGroup, "logicHandler", new ServerLogicalHandler());
                }
            });
    final int listenPort = 8888;
    b.bind(listenPort).sync();
}
  • 如果原来的程序逻辑并没有使用单独的业务线程池的话,netty3与netty4在线程模型上就看不到变更了。当然非常不建议这么干,直接使用IO线程处理业务逻辑会极大地影响网络程序的处理性能。
  • netty3与netty4在线程模型上的变更,看着影响并不大,但其实会造成很多其它的问题,参见这里提到的4个问题,这些问题产生的根本原因均是由于线程模型发生变化造成的。

事件对象从ChannelHandler中消失了

在3.x时代,所有的I/O操作都会创建一个新的ChannelEvent对象,如下面的API

void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception;
void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception;

而netty4.x里,为了避免频繁创建与回收ChannelEvent对象所造成的GC压力,上述两个处理所有类型事件的接口被改成了多个接口。

void channelRegistered(ChannelHandlerContext ctx);
void channelUnregistered(ChannelHandlerContext ctx);
void channelActive(ChannelHandlerContext ctx);
void channelInactive(ChannelHandlerContext ctx);
void channelRead(ChannelHandlerContext ctx, Object message);

void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise);
void connect(
        ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise);
void close(ChannelHandlerContext ctx, ChannelPromise promise);
void deregister(ChannelHandlerContext ctx, ChannelPromise promise);
void write(ChannelHandlerContext ctx, Object message, ChannelPromise promise);
void flush(ChannelHandlerContext ctx);
void read(ChannelHandlerContext ctx);

ChannelHandlerContext类也被修改来反映上述提到的变化:

// Before:
ctx.sendUpstream(evt);

// After:
ctx.fireChannelRead(receivedMessage);

如果开发者有特殊需求,确实需要监听自己的事件类型,ChannelHandler也提供了一个处理器方法叫做userEventTriggered()。发送自定义事件类型可参考io.netty.handler.timeout.IdleStateHandler中发送IdleStateEvent的代码。处理自定义事件的代码如下

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
        throws Exception {
    if (evt instanceof IdleStateEvent) {
        IdleStateEvent event = (IdleStateEvent) evt;
        if (event.state().equals(IdleState.READER_IDLE)) {
            System.out.println("READER_IDLE");
            // 超时关闭channel
            ctx.close();
        } else if (event.state().equals(IdleState.WRITER_IDLE)) {
            System.out.println("WRITER_IDLE");
        } else if (event.state().equals(IdleState.ALL_IDLE)) {
            System.out.println("ALL_IDLE");
            // 发送心跳
            ctx.channel().write("ping\n");
        }
    }
    super.userEventTriggered(ctx, evt);
}

4.x的netty里Channel的write方法不再自动flush

3.x的netty里Channel的write方法会自动flush, 而netty4.x里不会了,这样程序员可以按照业务逻辑write响应,最后一次flush。但千万要记住最后必须flush了。当然也可以直接用writeAndFlush方法。

入站流量挂起

3.x有一个由Channel.setReadable(boolean)提供的不是很明显的入站流量挂起机制。它引入了在ChannelHandler之间的复杂交互操作,同时处理器由于不正确实现而很容易互相干扰。 4.x里,新的名为read()的出站操作增加了。如果你使用Channel.config().setAutoRead(false)来关闭默认的auto-read标志,Netty不会读入任何东西,直到你显式地调用read()操作。一旦你发布的read()操作完成,同时通道再次停止读,一个名为channelReadComplete()的入站事件会触发一遍你能够重新发布另外的read()操作。你同样也可以拦截read()操作来执行更多高级的流量控制。

如下面的代码

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //连接建立后,设置为不自动读入站流量
    ctx.channel().config().setAutoRead(false);
    super.channelActive(ctx);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //这里可根据条件,决定是否再读一次
    ctx.channel().read();
    super.channelReadComplete(ctx);
}

调度任意的任务到一个I/O线程里运行

当一个Channel被注册到EventLoopGroup时,Channel实际上是注册到由EventLoopGroup管理EventLoop中的一个。在4.x里,EventLoop实现了java.utilconcurrent.ScheduledExecutorService接口。这意味着用户可以在一个用户通道归属的I/O线程里执行或调度一个任意的Runnable或Callable。随着新的娘好定义的线程模型的到来(稍后会介绍),它变得极其容易地编写一个线程安全的处理器。

public class MyHandler extends ChannelOutboundHandlerAdapter {
    ...
    public void flush(ChannelHandlerContext ctx, ChannelFuture f) {
        ...
        ctx.flush(f);

        // Schedule a write timeout.
        ctx.executor().schedule(new MyWriteTimeoutTask(), 30, TimeUnit.SECONDS);
        ...
    }
}

AttributeMap

在4.x里,一个名为AttributeMap的新接口被加入了,它被Channel和ChannelHandlerContext继承。作为替代,ChannelLocal和Channel.attachment被移除。这些属性会在他们关联的Channel被垃圾回收的同时回收。这个确实比原来方便不少。

public class MyHandler extends ChannelInboundHandlerAdapter<MyMessage> {

    private static final AttributeKey<MyState> STATE =
            new AttributeKey<MyState>("MyHandler.state");

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        ctx.attr(STATE).set(new MyState());
        ctx.fireChannelRegistered();
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MyMessage msg) {
        MyState state = ctx.attr(STATE).get();
    }
    ...
}

ChannelFuture拆分为ChannelFuture和ChannelPromise

在4.x里,ChannelFuture已经被拆分为ChannelFuture和ChannelPromise了。这不仅仅是让异步操作里的生产者和消费者间的约定更明显,同样也是得在使用从链中返回的ChannelFuture更加安全,因为ChannelFuture的状态是不能改变的。 由于这个编号,一些方法现在都采用ChannelPromise而不是ChannelFuture来改变它的状态。两者的核心区别是ChannelFuture的状态是不可改变的,而ChannelPromise可以。

如下面的代码

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (timeoutNanos > 0) {
        promise = promise.unvoid();
        scheduleTimeout(ctx, promise);
    }
    //下面的方法会改变promise的状态
    ctx.write(msg, promise);
}

不再有ExecutionHandler

在4.x里,不再有ExecutionHandler,而是提供DefaultEventExecutorGroup,可以在添加业务ChannelHandler时指定业务Handler运行所在的业务线程池,如下面的代码。

private final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(4);
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();

private void start() throws InterruptedException {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .handler(new CheckAcceptHandler())
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                        throws Exception {
                    ch.pipeline().addLast("decoder", new LineBasedFrameDecoder(4096));
                    ch.pipeline().addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
                    ch.pipeline().addLast("lineEncoder", new LineEncoder(StandardCharsets.UTF_8));
                    //指定了业务handler运行所在的业务线程池
                    ch.pipeline().addLast(businessGroup, "logicHandler", new ServerLogicalHandler());
                }
            });
    final int listenPort = 8888;
    b.bind(listenPort).sync();
}

灵活的I/O线程分配

  • 在4.x里,能够从一个已存在的jdk套接字上创建一个Channel
java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open();

// Perform some blocking operation here.
...

// Netty takes over.
SocketChannel ch = new NioSocketChannel(mySocket);
EventLoopGroup group = ...;
group.register(ch);
  • 在4.x里,能够取消注册和重新注册一个Channel从/到一个I/O线程。例如,你能够利用Netty提供的高层次无阻塞I/O的优势来解决复杂的协议,然后取消注册Channel并且切换到阻塞模式来在可能的最大吞吐量下传输一个文件。当然,它能够再次注册已经取消了注册的Channel。
java.nio.channels.FileChannel myFile = ...;
java.nio.channels.SocketChannel mySocket = java.nio.channels.SocketChannel.open();

// Perform some blocking operation here.
...

// Netty takes over.
SocketChannel ch = new NioSocketChannel(mySocket);
EventLoopGroup group = ...;
group.register(ch);
...

// Deregister from Netty.
ch.deregister().sync();

// Perform some blocking operation here.
mySocket.configureBlocking(true);
myFile.transferFrom(mySocket, ...);

// Register back again to another event loop group.
EventLoopGroup anotherGroup = ...;
anotherGroup.register(ch);

简化的关闭

在4.x里,releaseExternalResources()不必再用了。你可以通过调用EventLoopGroup.shutdown()直接地关闭所有打开的连接同时使所有I/O线程停止,就像你使用java.util.concurrent.ExecutorService.shutdown()关闭你的线程池一样。

参考

http://www.infoq.com/cn/articles/netty-version-upgrade-history-thread-part https://www.oschina.net/translate/netty-4-0-new-and-noteworthy https://www.oschina.net/question/139577_146101 http://netty.io/wiki/new-and-noteworthy-in-4.0.html