DotNetty系列三:编码解码器,IdleStateHandler心跳机制
Excerpt 在上一节基础上,实现编码解码器。1.创建一个类库项目。用于实现编码解码器。编码器: public class CommonServerEncoder : MessageToByteEncoder<string> { protected override void Encode(IChannelHandlerContext context, s…
在上一节基础上,实现编码解码 器。
1.创建一个类库项目。用于实现编码解码器。
编码器 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CommonServerEncoder : MessageToByteEncoder <string > { protected override void Encode (IChannelHandlerContext context, string message, IByteBuffer output ) { byte [] messageBytes = Encoding.UTF8.GetBytes(message); IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length); initialMessage.WriteBytes(messageBytes); output.WriteBytes(initialMessage); } } public class CommonClientEncoder : MessageToByteEncoder <string > { protected override void Encode (IChannelHandlerContext context, string message, IByteBuffer output ) { byte [] messageBytes = Encoding.UTF8.GetBytes(message); IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length); initialMessage.WriteBytes(messageBytes); output.WriteBytes(initialMessage); } }
解码器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class CommonServerDecoder : ByteToMessageDecoder { protected override void Decode (IChannelHandlerContext context, IByteBuffer input, List<object > output ) { byte [] array = new byte [input.ReadableBytes]; input.GetBytes(input.ReaderIndex, array, 0 , input.ReadableBytes); input.Clear(); output.Add(array); } } public class CommonClientDecoder : ByteToMessageDecoder { protected override void Decode (IChannelHandlerContext context, IByteBuffer input, List<object > output ) { byte [] array = new byte [input.ReadableBytes]; input.GetBytes(input.ReaderIndex, array, 0 , input.ReadableBytes); input.Clear(); output.Add(array); } }
2.服务端里添加:
//配置编码解码器 pipeline.AddLast(new CommonServerEncoder()); pipeline.AddLast(new CommonServerDecoder());
客户端里添加:
//配置编码解码器 pipeline.AddLast(new CommonClientEncoder()); pipeline.AddLast(new CommonClientDecoder());
3.服务端接收和发送:
1 2 3 4 5 6 7 8 9 public override void ChannelRead (IChannelHandlerContext context, object message ) { if (message is byte [] o) { Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)} :{DateTime.Now} " ); } string msg = "服务端从客户端接收到内容后返回,我是服务端" ; context.WriteAsync(msg); }
客户端接收和发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public override void ChannelActive (IChannelHandlerContext context ) { Console.WriteLine("我是客户端." ); Console.WriteLine($"连接至服务端{context} ." ); string message = "客户端1" ; context.WriteAndFlushAsync(message); } public override void ChannelRead (IChannelHandlerContext context, object message ) { if (message is byte [] o) { Console.WriteLine($"解码器方式,从服务端接收:{Encoding.UTF8.GetString(o)} :{DateTime.Now} " ); } }
实现了上一节一样的效果。
4.IdleStateHandler心跳机制:
4.1服务端添加IdleStateHandler心跳检测处理器,添加自定义处理Handler 类实现userEventTriggered()方法作为超时事件的逻辑处理.
IdleStateHandler心跳检测每十五秒进行一次读检测,如果十五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法.
// IdleStateHandler 心跳 //服务端为读IDLE pipeline.AddLast(new IdleStateHandler(15, 0, 0));//第一个参数为读,第二个为写,第三个为读写全部
4.2服务端Handler重载UserEventTriggered:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private int lossConnectCount = 0 ;public override void UserEventTriggered (IChannelHandlerContext context, object evt ) { Console.WriteLine("已经15秒未收到客户端的消息了!" ); if (evt is IdleStateEvent eventState) { if (eventState.State == IdleState.ReaderIdle) { lossConnectCount++;if (lossConnectCount > 2 ) { Console.WriteLine("关闭这个不活跃通道!" ); context.CloseAsync(); } } } else { base .UserEventTriggered(context, evt); } }
接收部分改为判断心跳:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public override void ChannelRead (IChannelHandlerContext context, object message ) { if (message is byte [] o) { Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)} :{DateTime.Now} " ); if (Encoding.UTF8.GetString(o).Contains("biubiu:" )) { string temp = "服务端接收到心跳连接" ; context.WriteAsync(temp);return ; } } string msg = "服务端从客户端接收到内容后返回,我是服务端" ; context.WriteAsync(msg); }
4.3客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每十秒进行一次写检测,如果十秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每十秒向服务端发送一次消息;
// IdleStateHandler 心跳 //客户端为写IDLE pipeline.AddLast(new IdleStateHandler(0, 10, 0));//第一个参数为读,第二个为写,第三个为读写全部
4.4客户端Handler重载UserEventTriggered:
1 2 3 4 5 6 7 8 9 10 public override void UserEventTriggered (IChannelHandlerContext context, object evt ) { Console.WriteLine("客户端循环心跳监测发送: " + DateTime.Now); if (evt is IdleStateEvent eventState) { if (eventState.State == IdleState.WriterIdle) { context.WriteAndFlushAsync($"biubiu:{DateTime.Now} " ); } } }
4.5实现效果:
5.群发:将客户端上下线通知,群发至所有客户端。只在服务端修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 static volatile IChannelGroup groups;public override void HandlerAdded (IChannelHandlerContext context ) { Console.WriteLine($"客户端{context} 上线." ); base .HandlerAdded(context); IChannelGroup g = groups;if (g == null ) { lock (this ) { if (groups == null ) { g = groups = new DefaultChannelGroup(context.Executor); } } } g.Add(context.Channel); groups.WriteAndFlushAsync($"欢迎{context.Channel.RemoteAddress} 加入." ); } public override void HandlerRemoved (IChannelHandlerContext context ) { Console.WriteLine($"客户端{context} 下线." ); base .HandlerRemoved(context); groups.Remove(context.Channel); groups.WriteAndFlushAsync($"恭送{context.Channel.RemoteAddress} 离开." ); }
实现效果:
项目下载地址:项目下载