WriteAndFlush
WriteAndFlush这里分为两个步骤,Write,Flush
Write
回忆上文追到的unsafe的write方法,这个方法就是本文的入口。
1 | public final void write(Object msg, ChannelPromise promise) { |
一共分为四个步骤
(1)确认当前是reactor线程
(2)过滤msg
1 | protected final Object filterOutboundMessage(Object msg) { |
如果msg既不是ByteBuf类型也不是FileRegion类型的,那么直接抛出异常。这里还有一个值得注意的,方法会将所有非直接内存转换成直接内存DirectBuffer
(3)估算msg的size
(4)调用outboundBuffer.addMessage(msg, size, promise)
1 | //ChannelOutboundBuffer |
这里涉及几个指针,tailEntry,flushedEntry,unFlushedEntry,在执行N次上述方法后,指针之间如下图所示
Flush
在pipeline上调用的flush最终都会落到head节点上
1 | //HeadContext |
1 | //AbstractUnsafe |
这里主要看addFlush和flush0方法
1 | public void addFlush() { |
结合上面那个图,我们知道该方法执行完毕后,unFlushedEntry和flushedEntry位置对调了。接着去看flush0
1 | protected void flush0() { |
1 | protected void doWrite(ChannelOutboundBuffer in) throws Exception { |
(1)通过current方法拿到第一个需要flush的节点
1 | public Object current() { |
(2)获取自旋的次数,后文会提到为啥要自旋
(3)调用doWriteBytes(buf)
1 | protected int doWriteBytes(ByteBuf buf) throws Exception { |
这里解释下为啥要自旋,因为doWriteBytes并不保证一次会将entry的数据读取完毕,所以需要不断循环直到!buf.isReadable()
。
我们看到这里调用了ByteBuf的readBytes方法将数据写到对应的channel中,官方文档如是说,Transfers this buffer’s data to the specified stream starting at the current。
(4)删除节点,就是普通的链表删除头节点的套路
1 | private void removeEntry(Entry e) { |
几个值得注意的点:
1.netty中的缓冲区中的byteBuf是DirectByteBuf
2.调用write方法实际是把数据写到了单向链表中
3.调用flush才是真正的把数据写到socket缓冲区