唐山企業(yè)網(wǎng)站建設(shè)濟(jì)南百度
文章目錄
- 前言
- 一、ChannelPipeline 接口
- 1.1 創(chuàng)建 ChannelPipeline
- 1.2 ChannelPipeline 事件傳輸機(jī)制
- 1.2.1 處理出站事件
- 1.2.2 處理入站事件
- 二、ChannelPipeline 中的 ChannelHandler
- 三、ChannelHandlerContext 接口
- 3.1 ChannelHandlerContext 與其他組件的關(guān)系
- 3.2 跳過(guò)某些 ChannelHandler
- 總結(jié)
前言
我們?cè)谇懊娴奈恼轮幸矊?duì)ChannelPipeline接口做了初步的介紹。
- Netty 概述(一)
- Netty 架構(gòu)設(shè)計(jì)(二)
- Netty Channel 概述(三)
- Netty ChannelHandler(四)
一、ChannelPipeline 接口
ChannelPipeline接口采用了責(zé)任鏈設(shè)計(jì)模式,底層采用雙向鏈表的數(shù)據(jù)結(jié)構(gòu),將鏈上的各個(gè)處理器串聯(lián)起來(lái)
??蛻?hù)端每一個(gè)請(qǐng)求的到來(lái),ChannelPipeline中所有的處理器都有機(jī)會(huì)處理它。
每一個(gè)新創(chuàng)建的Channel都將會(huì)被分配一個(gè)新的ChannelPipeline。這項(xiàng)關(guān)聯(lián)是永久性的;Channel既不能附加另一個(gè)ChannelPipeline,也不能分離其當(dāng)前的。
1.1 創(chuàng)建 ChannelPipeline
ChannelPipeline數(shù)據(jù)管道是與Channel管道綁定的,一個(gè)Channel通道對(duì)應(yīng)一個(gè)ChannelPipeline,ChannelPipeline是在Channel初始化時(shí)被創(chuàng)建。
觀察下面這個(gè)實(shí)例:
public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap(); // (2)b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3).childHandler(new ChannelInitializer<SocketChannel>() { // (4)@Overridepublic void initChannel(SocketChannel ch) throws Exception {// 添加ChannelHandler到ChannelPipelinech.pipeline().addLast(new DiscardServerHandler());}}).option(ChannelOption.SO_BACKLOG, 128) // (5).childOption(ChannelOption.SO_KEEPALIVE, true); // (6)// 綁定端口,開(kāi)始接收進(jìn)來(lái)的連接ChannelFuture f = b.bind(port).sync(); // (7)System.out.println("DiscardServer已啟動(dòng),端口:" + port);// 等待服務(wù)器 socket 關(guān)閉 。// 在這個(gè)例子中,這不會(huì)發(fā)生,但你可以?xún)?yōu)雅地關(guān)閉你的服務(wù)器。f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}
}
從上述代碼中可以看到,當(dāng)ServerBootstrap初始化后,直接就可以獲取到SocketChannel上的ChannelPipeline,而無(wú)需手動(dòng)實(shí)例化,因?yàn)?Netty 會(huì)為每個(gè)Channel連接創(chuàng)建一個(gè)ChannelPipeline。
Channel的大部分子類(lèi)都繼承了AbstractChannel,在創(chuàng)建實(shí)例時(shí)也會(huì)調(diào)用AbstractChannel構(gòu)造器。在AbstractChannel構(gòu)造器中會(huì)創(chuàng)建ChannelPipeline管道實(shí)例,核心代碼如下:
protected AbstractChannel(Channel parent) {this.parent = parent;this.id = this.newId();this.unsafe = this.newUnsafe();this.pipeline = this.newChannelPipeline();
}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);
}
從上述代碼中可以看出,在創(chuàng)建Channel時(shí),會(huì)由Channel創(chuàng)建DefaultChannelPipeline類(lèi)的實(shí)例。DefaultChannelPipeline是ChannelPipeline的默認(rèn)實(shí)現(xiàn)。
pipeline是AbstractChannel的屬性,內(nèi)部維護(hù)著一個(gè)以AbstractChannelHandlerContext為節(jié)點(diǎn)的雙向鏈表,創(chuàng)建的head和tail節(jié)點(diǎn)分別指向鏈表頭尾,源碼如下:
public class DefaultChannelPipeline implements ChannelPipeline { protected DefaultChannelPipeline(Channel channel) {this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");this.succeededFuture = new SucceededChannelFuture(channel, (EventExecutor)null);this.voidPromise = new VoidChannelPromise(channel, true);this.tail = new DefaultChannelPipeline.TailContext(this);this.head = new DefaultChannelPipeline.HeadContext(this);this.head.next = this.tail;this.tail.prev = this.head;}...final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {TailContext(DefaultChannelPipeline pipeline) {super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, DefaultChannelPipeline.TailContext.class);this.setAddComplete();}...}final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, DefaultChannelPipeline.HeadContext.class);this.unsafe = pipeline.channel().unsafe();this.setAddComplete();}...}...
}
從上述源碼可以看到,TailContext和HeadContext都繼承了AbstractChannelHandlerContext,并實(shí)現(xiàn)了ChannelHandler接口。AbstractChannelHandlerContext內(nèi)部維護(hù)著next、prev鏈表指針和入站、出站節(jié)點(diǎn)方向等。其中TailContext實(shí)現(xiàn)了ChannelInboundHandler,HeadContext實(shí)現(xiàn)了ChannelOutboundHandler和ChannelInboundHandler。
1.2 ChannelPipeline 事件傳輸機(jī)制
通過(guò)ChannelPipeline的addFirst()方法來(lái)添加ChannelHandler,并為這個(gè)ChannelHandler創(chuàng)建一個(gè)對(duì)應(yīng)的DefaultChannelHandlerContext實(shí)例。
public class DefaultChannelPipeline implements ChannelPipeline { //...public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);name = this.filterName(name, handler);newCtx = this.newContext(group, name, handler);this.addFirst0(newCtx);if (!this.registered) {newCtx.setAddPending();this.callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {this.callHandlerAddedInEventLoop(newCtx, executor);return this;}}this.callHandlerAdded0(newCtx);return this;}//...private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);}//...}
1.2.1 處理出站事件
當(dāng)處理出站事件時(shí),channelRead()方法的示例如下:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);// 寫(xiě)消息到管道ctx.write(msg);// 寫(xiě)消息}//...
}
上述代碼中的write()方法會(huì)觸發(fā)一個(gè)出站事件,該方法會(huì)調(diào)用DefaultChannelPipeline上的write()方法。
public final ChannelFuture write(Object msg) {return this.tail.write(msg);
}
從上述源碼可以看到,調(diào)用的是DefaultChannelPipeline上尾部節(jié)點(diǎn)(tail)的write方法。
上述方法最終會(huì)調(diào)用到DefaultChannelHandlerContext的write()方法。
private void write(Object msg, boolean flush, ChannelPromise promise) {ObjectUtil.checkNotNull(msg, "msg");try {if (this.isNotValidPromise(promise, true)) {ReferenceCountUtil.release(msg);return;}} catch (RuntimeException var8) {ReferenceCountUtil.release(msg);throw var8;}AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');Object m = this.pipeline.touch(msg, next);EventExecutor executor = next.executor();if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);if (!safeExecute(executor, task, promise, m, !flush)) {task.cancel();}}}
上述的write()方法會(huì)查找下一個(gè)出站的節(jié)點(diǎn),也就是當(dāng)前ChannelHandler后的一個(gè)出站類(lèi)型的ChannelHandler,并調(diào)用下一個(gè)節(jié)點(diǎn)的invokeWrite()方法。
void invokeWrite(Object msg, ChannelPromise promise) {if (this.invokeHandler()) {this.invokeWrite0(msg, promise);} else {this.write(msg, promise);}}
接著調(diào)用invokeWrite0()方法,該方法最終調(diào)用ChannelOutboundHandler的write方法。
private void invokeWrite0(Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler)this.handler()).write(this, msg, promise);} catch (Throwable var4) {notifyOutboundHandlerException(var4, promise);}}
至此,處理完成了第一個(gè)節(jié)點(diǎn)的處理,開(kāi)始執(zhí)行下一個(gè)節(jié)點(diǎn)并不斷循環(huán)。
所以,處理出站事件時(shí),數(shù)據(jù)傳輸?shù)姆较蚴菑奈膊抗?jié)點(diǎn)tail到頭部節(jié)點(diǎn)head。
1.2.2 處理入站事件
入站事件處理的起點(diǎn)是觸發(fā)ChannelPipeline fire方法,例如fireChannelActive()方法的示例如下:
public class DefaultChannelPipeline implements ChannelPipeline { //...public final ChannelPipeline fireChannelActive() {AbstractChannelHandlerContext.invokeChannelActive(this.head);return this;}//...
}
從上述源碼可以看到,處理的節(jié)點(diǎn)是頭部節(jié)點(diǎn)head。AbstractChannelHandlerContext.invokeChannelActive方法定義如下:
static void invokeChannelActive(final AbstractChannelHandlerContext next) {EventExecutor executor = next.executor();if (executor.inEventLoop()) {next.invokeChannelActive();} else {executor.execute(new Runnable() {public void run() {next.invokeChannelActive();}});}}
該方法最終調(diào)用ChannelInboundHandler的channelActive方法。
private void invokeChannelActive() {if (this.invokeHandler()) {try {((ChannelInboundHandler)this.handler()).channelActive(this);} catch (Throwable var2) {this.invokeExceptionCaught(var2);}} else {this.fireChannelActive();}}
至此完成了第一個(gè)節(jié)點(diǎn)的處理,開(kāi)始執(zhí)行下一個(gè)節(jié)點(diǎn)的不斷循環(huán)。
所以,處理入站事件時(shí),數(shù)據(jù)傳輸?shù)姆较蚴菑念^部節(jié)點(diǎn)head到尾部節(jié)點(diǎn)tail。
二、ChannelPipeline 中的 ChannelHandler
從上述的ChannelPipeline 接口源碼可以看出,ChannelPipeline 是通過(guò)addXxx或者removeXxx方法來(lái)將ChannelHandler動(dòng)態(tài)的添加到ChannelPipeline中,或者從ChannelPipeline移除ChannelHandler的。那么ChannelPipeline是如何保障并發(fā)訪(fǎng)問(wèn)時(shí)的安全呢?
以addLast方法為例,DefaultChannelPipeline的源碼如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;//synchronized 保障線(xiàn)程安全synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);this.addLast0(newCtx);if (!this.registered) {newCtx.setAddPending();this.callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {this.callHandlerAddedInEventLoop(newCtx, executor);return this;}}this.callHandlerAdded0(newCtx);return this;
}
從上述源碼可以看到,使用synchronized關(guān)鍵字保障了線(xiàn)程的安全訪(fǎng)問(wèn)。其他方法的實(shí)現(xiàn)方式也是類(lèi)似。
三、ChannelHandlerContext 接口
ChannelHandlerContext 接口是聯(lián)系ChannelHandler和ChannelPipeline 之間的紐帶。
每當(dāng)有ChannelHandler添加到ChannelPipeline 中時(shí),都會(huì)創(chuàng)建ChannelHandlerContext 。
ChannelHandlerContext 的主要功能是管理它所關(guān)聯(lián)的ChannelHandler和在同一個(gè)ChannelPipeline 中的其他ChannelHandler之間的交互。
例如,ChannelHandlerContext 可以通知ChannelPipeline 中的下一個(gè)ChannelHandler開(kāi)始執(zhí)行及動(dòng)態(tài)修改其所屬的ChannelPipeline 。
ChannelHandlerContext 中包含了許多方法,其中一些方法也出現(xiàn)在Channel和ChannelPipeline 中。如果通過(guò)Channel或ChannelPipeline 的實(shí)例來(lái)調(diào)用這些方法,它們就會(huì)在整個(gè)ChannelPipeline 中傳播。相比之下,一樣的方法在ChannelHandlerContext 的實(shí)例上調(diào)用,就只會(huì)從當(dāng)前ChannelHandler開(kāi)始并傳播到相關(guān)管道中的下一個(gè)有處理事件能力的ChannelHandler中。因此ChannelHandlerContext 所包含的事件流比其他類(lèi)中同樣的方法都要短,利用這一點(diǎn)可以盡可能提高性能。
3.1 ChannelHandlerContext 與其他組件的關(guān)系
下圖展示了ChannelPipeline 、Channel、ChannelHandler和ChannelHandlerContext 之間的關(guān)系做了如下說(shuō)明:
- Channel被綁定到ChannelPipeline 上。
- 和Channel綁定的ChannelPipeline 包含了所有的ChannelHandler。
- ChannelHandler。
- 當(dāng)添加ChannelHandler到ChannelPipeline 時(shí),ChannelHandlerContext 被創(chuàng)建。
3.2 跳過(guò)某些 ChannelHandler
下面的代碼,展示了從ChannelHandlerContext 獲取到Channel的引用,并通過(guò)調(diào)用Channel上的write()方法來(lái)觸發(fā)一個(gè)寫(xiě)事件到流中。
ChannelHandlerContext ctx = context;
Channel channel = ctx.channel(); //獲取ChannelHandlerContext上的Channel
channel.write(msg);
以下代碼展示了從ChannelHandlerContext 獲取到ChannelPipeline 。
ChannelHandlerContext ctx = context;
ChannelPipeline pipeline = ctx.pipeline(); //獲取ChannelHandlerContext上的ChannelPipeline
pipeline.write(msg);
上述的兩個(gè)示例,事件流是一樣的。雖然被調(diào)用的Channel和ChannelPipeline 上的write()方法將一直傳播事件通過(guò)整個(gè)ChannelPipeline ,但是在ChannelHandler的級(jí)別上,事件從一個(gè)ChannelHandler到下一個(gè)ChannelHandler的移動(dòng)是由ChannelHandlerContext 上的調(diào)用完成的。
下圖展示了Channel或者ChannelPipeline 進(jìn)行的事件傳播機(jī)制。
在上圖中可以看出:
- 事件傳遞給ChannelPipeline 的第一個(gè)ChannelHandler;
- ChannelHandler通過(guò)關(guān)聯(lián)的ChannelHandlerContext 傳遞事件給ChannelPipeline 中的下一個(gè)ChannelHandler。
- ChannelHandler通過(guò)關(guān)聯(lián)的ChannelHandlerContext 傳遞事件給ChannelPipeline 中的下一個(gè)ChannelHandler。
從上面的流程可以看出,如果通過(guò)Channel或ChannelPipeline 的實(shí)例來(lái)調(diào)用這些方法,它們肯定會(huì)在整個(gè)ChannelPipeline 中傳播。
那么是否可以跳過(guò)某些處理器呢?答案是肯定的。
通過(guò)減少ChannelHandler不感興趣的事件的傳遞減少開(kāi)銷(xiāo),并排除掉特定的對(duì)此事件感興趣的處理器的處理以提升性能。想要實(shí)現(xiàn)從一個(gè)特定的ChannelHandler開(kāi)始處理,必須引用與此ChannelHandler的前一個(gè)ChannelHandler關(guān)聯(lián)的ChannelHandlerContext 。這個(gè)ChannelHandlerContext 將會(huì)調(diào)用與自身關(guān)聯(lián)的ChannelHandler的下一個(gè)ChannelHandler,代碼如下:
ChannelHandlerContext ctx = context;
ctx.write(msg);
直接調(diào)用ChannelHandlerContext 的write()方法,將會(huì)把緩沖區(qū)發(fā)送到下一個(gè)ChannelHandler。
如下圖,消息會(huì)將從下一個(gè)ChannelHandler開(kāi)始流過(guò)ChannelPipeline ,繞過(guò)所有在它之前的ChannelHandler。
- 執(zhí)行ChannelHandlerContext 方法調(diào)用。
- 事件發(fā)送到了下一個(gè)ChannelHandler。
- 經(jīng)過(guò)最后一個(gè)ChannelHandler后,事件從ChannelPipeline 中移除。
當(dāng)調(diào)用某個(gè)特定的ChannelHandler操作時(shí),它尤為有用。
例如:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(ctx.channel().remoteAddress() + " -> Server :" + msg);// 寫(xiě)消息到管道ctx.write(msg);// 寫(xiě)消息ctx.flush(); // 沖刷消息// 上面兩個(gè)方法等同于 ctx.writeAndFlush(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// 當(dāng)出現(xiàn)異常就關(guān)閉連接cause.printStackTrace();ctx.close();}
}
總結(jié)
以上就是關(guān)于ChannelPipeline 的源碼分析,相信認(rèn)真看完了,你就明白ChannelPipeline 、Channel、ChannelHandler和ChannelHandlerContext 之間的關(guān)系。下節(jié)我們繼續(xù)來(lái)剖析 Netty 的源碼。