WriteAndFlush
WriteAndFlush这里分为两个步骤,Write,Flush
Write
回忆上文追到的unsafe的write方法,这个方法就是本文的入口。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
一共分为四个步骤
(1)确认当前是reactor线程
(2)过滤msg1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
如果msg既不是ByteBuf类型也不是FileRegion类型的,那么直接抛出异常。这里还有一个值得注意的,方法会将所有非直接内存转换成直接内存DirectBuffer
(3)估算msg的size
(4)调用outboundBuffer.addMessage(msg, size, promise)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16//ChannelOutboundBuffer
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
incrementPendingOutboundBytes(size, false);
}
这里涉及几个指针,tailEntry,flushedEntry,unFlushedEntry,在执行N次上述方法后,指针之间如下图所示
Flush
在pipeline上调用的flush最终都会落到head节点上1
2
3
4//HeadContext
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
1 | //AbstractUnsafe |
这里主要看addFlush和flush0方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public void addFlush() {
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
unflushedEntry = null;
}
}
结合上面那个图,我们知道该方法执行完毕后,unFlushedEntry和flushedEntry位置对调了。接着去看flush01
2
3protected void flush0() {
doWrite(outboundBuffer);
}
1 | protected void doWrite(ChannelOutboundBuffer in) throws Exception { |
(1)通过current方法拿到第一个需要flush的节点1
2
3
4
5
6
7public Object current() {
Entry entry = flushedEntry;
if (entry == null) {
return null;
}
return entry.msg;
}
(2)获取自旋的次数,后文会提到为啥要自旋
(3)调用doWriteBytes(buf)
1
2
3
4protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
这里解释下为啥要自旋,因为doWriteBytes并不保证一次会将entry的数据读取完毕,所以需要不断循环直到!buf.isReadable()
。
我们看到这里调用了ByteBuf的readBytes方法将数据写到对应的channel中,官方文档如是说,Transfers this buffer’s data to the specified stream starting at the current。
(4)删除节点,就是普通的链表删除头节点的套路1
2
3
4
5
6
7
8
9
10
11private void removeEntry(Entry e) {
if (-- flushed == 0) {
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
flushedEntry = e.next;
}
}
几个值得注意的点:
1.netty中的缓冲区中的byteBuf是DirectByteBuf
2.调用write方法实际是把数据写到了单向链表中
3.调用flush才是真正的把数据写到socket缓冲区