凈水器公司網(wǎng)站源碼小江seo
前言:
???????? ?公司的加密機(jī)調(diào)度系統(tǒng)一直使用的是http請(qǐng)求調(diào)度的方式去調(diào)度,但是會(huì)出現(xiàn)網(wǎng)絡(luò)故障導(dǎo)致某個(gè)客戶端或者服務(wù)端斷線的情況,導(dǎo)致很多請(qǐng)求信息以及回執(zhí)信息丟失的情況,接著我們拋棄了http的方式,改為Tcp的方式去建立客戶端和服務(wù)器之間的連接,并且要去實(shí)現(xiàn)斷線重連的功能,經(jīng)過(guò)討論后決定使用java中成熟的nio框架 – netty去解決這一系列的問(wèn)題。
1.?????? netty簡(jiǎn)單介紹:
在百度中對(duì)netty的解釋是:
Netty是由JBOSS提供的一個(gè)java開源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
Netty框架并不只是封裝了多路復(fù)用的IO模型,也包括提供了傳統(tǒng)的阻塞式/非阻塞式 同步IO的模型封,Netty 是一個(gè)利用 Java 的高級(jí)網(wǎng)絡(luò)的能力,隱藏其背后的復(fù)雜性而提供一個(gè)易于使用的 API 的客戶端/服務(wù)器框架。其并發(fā)高、傳輸快、封裝好的特性受到了許多大公司的青睞,在這里我們就不過(guò)多的分析netty的原理和特性了,之后我會(huì)寫一篇文章專門寫一下從io到nio,再到netty的整個(gè)過(guò)程。重點(diǎn)講一下netty的魅力所在,今天我們已代碼實(shí)現(xiàn)為主,講解一下在springboot架構(gòu)中,用netty實(shí)現(xiàn)服務(wù)端和客戶端之間的通信以及斷線重連等機(jī)制。
?
2.?????? 服務(wù)端代碼:
首先,引入netty的pom依賴
?
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version>
</dependency>
?
然后我們?cè)谂渲梦募袑懭敕?wù)端的ip和端口號(hào),用于連接
在springboot的application啟動(dòng)類中寫入服務(wù)端的啟動(dòng)start方法,用于在啟動(dòng)項(xiàng)目時(shí)自動(dòng)啟動(dòng)服務(wù)端
1 @SpringBootApplication2 public class Application implements CommandLineRunner {3 4 @Value("${netty.server.port}")5 private int port;6 7 @Value("${netty.server.host}")8 private String host;9
10 @Autowired
11 NettyServer server;
12
13 public static void main(String[] args) {
14 SpringApplication.run(Application.class, args);
15 }
16
17
18 @Override
19 public void run(String... strings) throws Exception {
20 this.startServer();
21
22 }
23
24 //啟動(dòng)service
25 public void startServer(){//這個(gè)類實(shí)現(xiàn)一個(gè)IP套接字地址(IP地址+端口號(hào))
26 InetSocketAddress address = new InetSocketAddress(host, port);
27 ChannelFuture future = server.start(address);
28
29 Runtime.getRuntime().addShutdownHook(new Thread(){
30 @Override
31 public void run() {
32 server.destroy();
33 }
34 });
35
36 future.channel().closeFuture().syncUninterruptibly();
37 }
38 }
39
40
41 }
ChannelFuture:
Future最早出現(xiàn)于JDK的java.util.concurrent.Future,它用于表示異步操作的結(jié)果.由于Netty的Future都是與異步I/O操作相關(guān)的,因此命名為ChannelFuture,代表它與Channel操作相關(guān).由于Netty中的所有I / O操作都是異步的,因此Netty為了解決調(diào)用者如何獲取異步操作結(jié)果的問(wèn)題而專門設(shè)計(jì)了ChannelFuture接口.?
因此,Channel與ChannelFuture可以說(shuō)形影不離的.
然后我們要去重點(diǎn)看server.start()
public class NettyServer {private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);private final EventLoopGroup bossGroup = new NioEventLoopGroup();private final EventLoopGroup workGroup = new NioEventLoopGroup();private Channel channel;/*** 開啟及服務(wù)線程*/public ChannelFuture start(InetSocketAddress address) {//服務(wù)端引導(dǎo)類ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workGroup)//通過(guò)ServerBootstrap的group方法,設(shè)置(1)中初始化的主從"線程池".channel(NioServerSocketChannel.class)//指定通道channel的類型,由于是服務(wù)端,故而是NioServerSocketChannel.childHandler(new NettyServerInitializer())//設(shè)置ServerSocketChannel的處理器.option(ChannelOption.SO_BACKLOG, 100)// 設(shè)置tcp協(xié)議的請(qǐng)求等待隊(duì)列.childOption(ChannelOption.SO_KEEPALIVE, true);//配置子通道也就是SocketChannel的選項(xiàng)ChannelFuture future = bootstrap.bind(address).syncUninterruptibly();logger.info("準(zhǔn)備接收——————");channel = future.channel();return future;}public void destroy() {if(channel != null) {channel.close();}channelGroup.close();workGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}
?在這里的設(shè)置中,.childHandler(new NettyServerInitializer()) 用于設(shè)置了服務(wù)器管道 NioServerSocketChannel 的處理器handler,
這個(gè)handler是我們自定義封裝的一些對(duì)channel的public class NettyServerInitializer extends ChannelInitializer<Channel>{
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {
@Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//處理日志//pipeline.addLast(new LoggingHandler(LogLevel.INFO));//處理心跳pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));//消息編碼pipeline.addLast(new MessageEncoder());//粘包長(zhǎng)度控制pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));//消息解碼pipeline.addLast(new MessageDecoder());//自定義handerpipeline.addLast(new TcpMsgHandler());}
}
IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));//消息編碼pipeline.addLast(new MessageEncoder());//粘包長(zhǎng)度控制pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));//消息解碼pipeline.addLast(new MessageDecoder());//自定義handerpipeline.addLast(new TcpMsgHandler());}
}
?
ChannelPipeline :
Netty 的 Channel 過(guò)濾器實(shí)現(xiàn)原理與 Servlet Filter 機(jī)制一致,它將 Channel 的數(shù)據(jù)管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動(dòng)和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來(lái)對(duì) I/O 事件進(jìn)行具體的攔截和處理,可以方便地通過(guò)新增和刪除 ChannelHandler 來(lái)實(shí)現(xiàn)不同業(yè)務(wù)邏輯的定制,能夠?qū)崿F(xiàn)對(duì)修改封閉和對(duì)擴(kuò)展到支持。我們看到我們添加了idleStateHandler用來(lái)處理心跳,那么心跳究竟是什么呢,我們先來(lái)介紹一下心跳
心跳機(jī)制
- 心跳是在TCP長(zhǎng)連接中,客戶端和服務(wù)端定時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)包通知對(duì)方自己還在線,保證連接的有效性的一種機(jī)制
- 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒(méi)有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性
在我們的服務(wù)端中,不會(huì)主動(dòng)發(fā)心跳給客戶端,只會(huì)對(duì)對(duì)應(yīng)的心跳消息,進(jìn)行回應(yīng),告訴那些給我發(fā)心跳的客戶端說(shuō):我還活著!
-
服務(wù)端添加IdleStateHandler心跳檢測(cè)處理器,并添加自定義處理Handler類實(shí)現(xiàn)userEventTriggered()方法作為超時(shí)事件的邏輯處理;
-
設(shè)定IdleStateHandler心跳檢測(cè)每五秒進(jìn)行一次讀檢測(cè),如果五秒內(nèi)ChannelRead()方法未被調(diào)用則觸發(fā)一次userEventTrigger()方法
?
?
<strong>TcpMsgHandler.java</strong>
?
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {private final static Logger logger = LoggerFactory.getLogger("");@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) { }IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.READER_IDLE) {ctx.close();}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {TcpMsg msg = (TcpMsg) obj;try {//處理心跳...ctx.writeAndFlush(msg);}}catch(Exception ex){logger.info(ex.getMessage());}}
}
在這里,我們的channelRead比較簡(jiǎn)單,只是將客戶端發(fā)來(lái)的心跳直接發(fā)回去了,實(shí)現(xiàn)了響應(yīng)客戶端心跳請(qǐng)求的目的,除了心跳,我們還可以去定義不同的消息類別,比如說(shuō)是加密請(qǐng)求,還是處理數(shù)據(jù)的請(qǐng)求,入庫(kù)的請(qǐng)求等等,
我們可以自己從channel中獲取到客戶端發(fā)過(guò)來(lái)的信息做處理,記得要即使響應(yīng),比如,心跳中,我們將msg又返回給了channel:
ctx.writeAndFlush(msg);
在handler中,decoder用于解碼的作用,將客戶端發(fā)來(lái)的ByteBuf流的形式,轉(zhuǎn)為我們需要的格式,可以轉(zhuǎn)為我們要的對(duì)象,或者是一個(gè)string字符串
<strong>MessageDecoder.java</strong>
public class MessageDecoder extends ByteToMessageDecoder {private Logger logger = LoggerFactory.getLogger("");@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int len = in.readableBytes();byte[] bytes = new byte[len];//將ByteBuf轉(zhuǎn)為byte數(shù)組in.readBytes(bytes);try {TcpMsg msg = TcpMsg.ByteToObj(bytes);out.add(msg);} catch (Exception ex) {logger.error("MessageDecoder",ex);}}}
//將ByteBuf轉(zhuǎn)為byte數(shù)組in.readBytes(bytes);try {TcpMsg msg = TcpMsg.ByteToObj(bytes);out.add(msg);} catch (Exception ex) {logger.error("MessageDecoder",ex);}}}
encoder負(fù)責(zé)在我們發(fā)送數(shù)據(jù)的時(shí)候,將我們的對(duì)象、或者是字符串轉(zhuǎn)為byte數(shù)組,然后輸出
public class MessageEncoder extends MessageToByteEncoder<TcpMsg>{private Logger logger = LoggerFactory.getLogger("");@Overrideprotected void encode(ChannelHandlerContext ctx, TcpMsg msg, ByteBuf out) throws Exception {try{if (msg.getType() != 0){//logger.info("send: " + msg.getType() + ":" + msg.getGuid() + ":" + msg.getBody());}byte[] src = msg.ToBytes();out.writeBytes(src);}catch (Exception e){logger.error("MessageEncoder",e);}}
}
?
3.?????? 客戶端代碼:
在application配置文件中加入服務(wù)端的主機(jī)名和端口號(hào)
?
netty.server.host = 127.0.0.1
netty.server.port = 9090
啟動(dòng)類Application:
@SpringBootApplication
public class Application{@Autowiredprivate NettyClient client;@Value("${netty.server.port}")private int port;@Value("${netty.server.host}")private String host;public static void main(String[] args) throws Exception {SpringApplication.run(NettyClientApplication.class, args);}@Beanpublic NettyClient nettyClient() {return new NettyClient();}@Overridepublic void run(String... arg0) throws Exception {client.start(host, port);}}
NettyClient.java: 客戶端啟動(dòng)類
?
@Component
public class NettyClient {//日志輸出private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);//主要連接地址private static String nettyHost = "";//備用連接地址private static String nettyHostRe = "";private static Integer nettyPort = 0;public boolean start(String host1,String host2,int port){//主要連接地址nettyHost = host1;//備用連接地址nettyHostRe = host2;nettyPort = port;//EventLoopGroup可以理解為是一個(gè)線程池,這個(gè)線程池用來(lái)處理連接、接受數(shù)據(jù)、發(fā)送數(shù)據(jù)EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();//NioEventLoop//客戶端引導(dǎo)類Bootstrap bootstrap = new Bootstrap();//多線程處理bootstrap.group(nioEventLoopGroup);//指定通道類型為NioServerSocketChannel,一種異步模式bootstrap.channel(NioSocketChannel.class);//指定請(qǐng)求地址bootstrap.remoteAddress(new InetSocketAddress(nettyHost,port));bootstrap.option(ChannelOption.TCP_NODELAY,true);final ConnectionWatchdog watchDog = new ConnectionWatchdog(bootstrap, new HashedWheelTimer(), nettyHost,nettyHostRe, port) {@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new MessageEncoder(),new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4),new MessageDecoder(),this,// 每隔5s的時(shí)間觸發(fā)一次userEventTriggered的方法,并且指定IdleState的狀態(tài)位是WRITER_IDLEnew IdleStateHandler(0, 1, 0, TimeUnit.SECONDS),// 實(shí)現(xiàn)userEventTriggered方法,并在state是WRITER_IDLE的時(shí)候發(fā)送一個(gè)心跳包到sever端,告訴server端我還活著new ClientHeartBeatHandler(),};}};final ChannelFuture future;try {synchronized (bootstrap) {bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(watchDog.handlers());}});future = bootstrap.connect().sync();// 鏈接服務(wù)器.調(diào)用sync()方法會(huì)同步阻塞//服務(wù)端連接ip:logger.info("目前服務(wù)端連接ip為" + nettyHost);}if (!future.isSuccess()) {logger.info("---- 連接服務(wù)器失敗,2秒后重試 ---------port=" + port);future.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {start(nettyHost,nettyHostRe,nettyPort);}}, 2L, TimeUnit.SECONDS);}} catch (Exception e) {logger.info("exception happends e {}", e);return false;}return true;}}
?
?
?
ConnectionWatchdog.java ?:重連檢測(cè)狗,當(dāng)發(fā)現(xiàn)當(dāng)前的鏈路不穩(wěn)定關(guān)閉之后,進(jìn)行重連
?
@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask,ChannelHandlerHolder{//日志輸出private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);//客戶端引導(dǎo)類private Bootstrap bootstrap;private Timer timer;private final String host;//備用服務(wù)端ipprivate final String host2;//使用ipprivate String useHost;private final int port;private volatile boolean reconnect = true;private int attempts;//刷新時(shí)間private volatile long refreshTime = 0L;//心跳連接標(biāo)識(shí)private volatile boolean heartBeatCheck = false;//通道private volatile Channel channel;//失敗次數(shù)private static int failCount;public ConnectionWatchdog(Bootstrap boot, Timer timer, String host,String host2, int port) {this.bootstrap = boot;this.timer = timer;this.host = host;this.host2 = host2;this.port = port;}public boolean isReconnect() {return reconnect;}public void setReconnect(boolean reconnect) {this.reconnect = reconnect;}//連接成功時(shí)調(diào)用的方法@Overridepublic void channelActive(final ChannelHandlerContext ctx) throws Exception {channel = ctx.channel();attempts = 0;reconnect =false;refreshTime = new Date().getTime();if (!heartBeatCheck) {heartBeatCheck = true;channel.eventLoop().scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {long time = new Date().getTime() - refreshTime;logger.info(String.valueOf(time));if (time > 5 * 1000L) {channel.close();logger.info("心跳檢查失敗");} else {logger.info("心跳檢查Successs");}}}, 5L, 5L, TimeUnit.SECONDS);}logger.info("Connects with {}.", channel);ctx.fireChannelActive();}/*** 因?yàn)殒溌窋嗟糁?#xff0c;會(huì)觸發(fā)channelInActive方法,進(jìn)行重連 2秒重連一次*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {reconnect = true;logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts);if (reconnect) {/*if (attempts < 12) {attempts++;} else {reconnect = false;}*/long timeout = 2;logger.info("再過(guò) {} 秒客戶端將進(jìn)行重連",timeout);timer.newTimeout(this, timeout, TimeUnit.SECONDS);}}/** run啟動(dòng)方法* */public void run(Timeout timeout) throws Exception {//Future表示異步操作的結(jié)果final ChannelFuture future;if(failCount > 2){//使用備用ipif(host.equals(useHost)){useHost = host2;}else{useHost = host;}}else {if(StrUtil.IsNullOrEmpty(useHost)) {//首次重連useHost = host;}}synchronized (bootstrap) {future = bootstrap.connect(useHost, port);}//使用future監(jiān)聽結(jié)果,執(zhí)行異步操作結(jié)束后的回調(diào).future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(final ChannelFuture f) throws Exception {boolean succeed = f.isSuccess();logger.warn("連接通過(guò) {}, {}.", useHost + ":" + port, succeed ? "成功" : "失敗");if (!succeed) {logger.info("重連失敗");failCount ++;f.channel().pipeline().fireChannelInactive();}else{failCount = 0;logger.info("重連成功");}}});}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof TcpMsg) {TcpMsg heartMsg = (TcpMsg) msg;if (heartMsg.getType()>=0) {refreshTime = new Date().getTime();}logger.warn("得到服務(wù)器響應(yīng),響應(yīng)內(nèi)容為"+ ((TcpMsg) msg).getBody());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);Channel channel = ctx.channel();logger.info("客戶端:"+channel.remoteAddress()+"網(wǎng)絡(luò)異常");cause.printStackTrace();if(channel.isActive())ctx.close();}}
//連接成功時(shí)調(diào)用的方法@Overridepublic void channelActive(final ChannelHandlerContext ctx) throws Exception {channel = ctx.channel();attempts = 0;reconnect =false;refreshTime = new Date().getTime();if (!heartBeatCheck) {heartBeatCheck = true;channel.eventLoop().scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {long time = new Date().getTime() - refreshTime;logger.info(String.valueOf(time));if (time > 5 * 1000L) {channel.close();logger.info("心跳檢查失敗");} else {logger.info("心跳檢查Successs");}}}, 5L, 5L, TimeUnit.SECONDS);}logger.info("Connects with {}.", channel);ctx.fireChannelActive();}/*** 因?yàn)殒溌窋嗟糁?#xff0c;會(huì)觸發(fā)channelInActive方法,進(jìn)行重連 2秒重連一次*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {reconnect = true;logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts);if (reconnect) {/*if (attempts < 12) {attempts++;} else {reconnect = false;}*/long timeout = 2;logger.info("再過(guò) {} 秒客戶端將進(jìn)行重連",timeout);timer.newTimeout(this, timeout, TimeUnit.SECONDS);}}/** run啟動(dòng)方法* */public void run(Timeout timeout) throws Exception {//Future表示異步操作的結(jié)果final ChannelFuture future;if(failCount > 2){//使用備用ipif(host.equals(useHost)){useHost = host2;}else{useHost = host;}}else {if(StrUtil.IsNullOrEmpty(useHost)) {//首次重連useHost = host;}}synchronized (bootstrap) {future = bootstrap.connect(useHost, port);}//使用future監(jiān)聽結(jié)果,執(zhí)行異步操作結(jié)束后的回調(diào).future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(final ChannelFuture f) throws Exception {boolean succeed = f.isSuccess();logger.warn("連接通過(guò) {}, {}.", useHost + ":" + port, succeed ? "成功" : "失敗");if (!succeed) {logger.info("重連失敗");failCount ++;f.channel().pipeline().fireChannelInactive();}else{failCount = 0;logger.info("重連成功");}}});}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof TcpMsg) {TcpMsg heartMsg = (TcpMsg) msg;if (heartMsg.getType()>=0) {refreshTime = new Date().getTime();}logger.warn("得到服務(wù)器響應(yīng),響應(yīng)內(nèi)容為"+ ((TcpMsg) msg).getBody());}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);Channel channel = ctx.channel();logger.info("客戶端:"+channel.remoteAddress()+"網(wǎng)絡(luò)異常");cause.printStackTrace();if(channel.isActive())ctx.close();}}
這里我們定義了一個(gè)變量:?refreshTime,當(dāng)我們從channel中read到了服務(wù)端發(fā)來(lái)的心跳響應(yīng)消息的話,就刷新refreshTime為當(dāng)前時(shí)間
當(dāng)連接成功時(shí),會(huì)觸發(fā)channelActive 方法,在這里我們開啟了一個(gè)定時(shí)任務(wù)去判斷refreshTime和當(dāng)前時(shí)間的時(shí)間差,超過(guò)5秒說(shuō)明斷線了,要進(jìn)行重連,我這里由于配置了兩個(gè)服務(wù)器,所有在我的邏輯中,嘗試連接2次以上連不上就去連另一個(gè)服務(wù)器去了
?
下面的handler用于發(fā)送心跳消息,實(shí)現(xiàn)userEventTriggered方法,并在state是WRITER_IDLE的時(shí)候發(fā)送一個(gè)心跳包到sever端,告訴server端我還活著
?
@Component
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatHandler.class);@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {clientname = ReadFileUtil.readFile("C:/CrawlProgram/wrapper_nettyClient/name.txt");if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();if (state == IdleState.WRITER_IDLE) {//用于心跳的客戶端類型為0int type = 0;//客戶端機(jī)器名String body = clientname;TcpMsg msg = new TcpMsg(type,body);try {ctx.writeAndFlush(msg).sync();logger.info("發(fā)送消息成功,消息類型為:"+type+",請(qǐng)求id為" + msg.getGuid() + ",客戶端機(jī)器號(hào)為:" + msg.getBody());} catch (Exception ex) {ex.printStackTrace();logger.info("發(fā)送失敗");}}} else {super.userEventTriggered(ctx, evt);}}}
?
然后就是和服務(wù)端一樣的decoder、encoder過(guò)程,不同的是,我們?cè)赿ecoder的時(shí)候使用了線程池去將任務(wù)放入隊(duì)列中去,防止請(qǐng)求慢的時(shí)候丟失任務(wù)請(qǐng)求
MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder {private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);@Autowiredprivate VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;//線程池常量public static VisiableThreadPoolTaskExecutor executor;private TcpMsg tcpMsg;List<Object> out;// 用@PostConstruct方法引導(dǎo)綁定@PostConstructpublic void init() {executor = visiableThreadPoolTaskExecutor;encryptService = encrypt;orderService = order;}@Overridepublic void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {this.context = ctx;this.out = out;int len = in.readableBytes();if (len > 0) {logger.info("得到返回?cái)?shù)據(jù),長(zhǎng)度為" + len);byte[] bytes = new byte[len];in.readBytes(bytes);TcpMsg msg = TcpMsg.ByteToObj(bytes);this.tcpMsg = msg;logger.info("start asyncServiceExecutor");executor.execute(new Runnable() {@Overridepublic void run() {executeTask();}});logger.info("end executeAsync");}}}
這里,我們使用了netty來(lái)實(shí)現(xiàn)了服務(wù)端、客戶端通信、心跳檢測(cè)的功能。體會(huì)到了netty的傳輸效率高、封裝好的特性,用起來(lái)簡(jiǎn)單、實(shí)用。我們不僅可以做斷線重連、還可以做很多業(yè)務(wù)請(qǐng)求,可以配置多臺(tái)客戶端去做不同的事情,來(lái)達(dá)到服務(wù)器調(diào)度的目的。
? 歸根結(jié)底,netty還是一個(gè)框架的東西,我們還是沒(méi)有過(guò)多的去看透nio的本質(zhì)、我們要做的不僅僅是會(huì)用netty,而且還要了解nio、了解netty的實(shí)現(xiàn)原理,它的底層是如何封裝的,希望大家多去研究,我們一起去搞懂它
?
Netty 的 Channel 過(guò)濾器實(shí)現(xiàn)原理與 Servlet Filter 機(jī)制一致,它將 Channel 的數(shù)據(jù)管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動(dòng)和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來(lái)對(duì) I/O 事件進(jìn)行具體的攔截和處理,可以方便地通過(guò)新增和刪除 ChannelHandler 來(lái)實(shí)現(xiàn)不同業(yè)務(wù)邏輯的定制,能夠?qū)崿F(xiàn)對(duì)修改封閉和對(duì)擴(kuò)展到支持。
posted @ 2018-08-15 17:55 袋?飼養(yǎng)員 閱讀(...) 評(píng)論(...) 編輯 收藏