DotNetty系列三:编码解码器,IdleStateHandler心跳机制
Excerpt
在上一节基础上,实现编码解码器。1.创建一个类库项目。用于实现编码解码器。编码器:    public class CommonServerEncoder : MessageToByteEncoder<string>    {        protected override void Encode(IChannelHandlerContext context, s…
在上一节基础上,实现编码解码器。
1.创建一个类库项目。用于实现编码解码器。
编码器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | public class CommonServerEncoder : MessageToByteEncoder<string>     {     protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)             {         byte[] messageBytes = Encoding.UTF8.GetBytes(message);                     IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);                     initialMessage.WriteBytes(messageBytes);                     output.WriteBytes(initialMessage);             }     }
  public class CommonClientEncoder : MessageToByteEncoder<string>     {     protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)             {         byte[] messageBytes = Encoding.UTF8.GetBytes(message);                     IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);                     initialMessage.WriteBytes(messageBytes);                     output.WriteBytes(initialMessage);             }     }
   | 
 
解码器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | public class CommonServerDecoder : ByteToMessageDecoder     {     protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)             {         byte[] array = new byte[input.ReadableBytes];                     input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);                     input.Clear();                     output.Add(array);             }     }
  public class CommonClientDecoder : ByteToMessageDecoder     {     protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)             {         byte[] array = new byte[input.ReadableBytes];                     input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);                     input.Clear();                     output.Add(array);             }     }
   | 
 
2.服务端里添加:
                        //配置编码解码器
                        pipeline.AddLast(new CommonServerEncoder());
                        pipeline.AddLast(new CommonServerDecoder());
客户端里添加:
                        //配置编码解码器
                        pipeline.AddLast(new CommonClientEncoder());
                        pipeline.AddLast(new CommonClientDecoder());
3.服务端接收和发送:
1 2 3 4 5 6 7 8 9
   | public override void ChannelRead(IChannelHandlerContext context, object message)         {     if (message is byte[] o)                 {                         Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");                }     string msg = "服务端从客户端接收到内容后返回,我是服务端";                 context.WriteAsync(msg);         }
   | 
 
客户端接收和发送:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | public override void ChannelActive(IChannelHandlerContext context)         {                Console.WriteLine("我是客户端.");                Console.WriteLine($"连接至服务端{context}.");     string message = "客户端1";                 context.WriteAndFlushAsync(message);         }
  public override void ChannelRead(IChannelHandlerContext context, object message)         {     if (message is byte[] o)                 {                         Console.WriteLine($"解码器方式,从服务端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");                 }         }
   | 
 
实现了上一节一样的效果。
4.IdleStateHandler心跳机制:
4.1服务端添加IdleStateHandler心跳检测处理器,添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理.
IdleStateHandler心跳检测每十五秒进行一次读检测,如果十五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法.
                        // IdleStateHandler 心跳
                        //服务端为读IDLE
                        pipeline.AddLast(new IdleStateHandler(15, 0, 0));//第一个参数为读,第二个为写,第三个为读写全部
4.2服务端Handler重载UserEventTriggered:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
   | private int lossConnectCount = 0; public override void UserEventTriggered(IChannelHandlerContext context, object evt)        {                 Console.WriteLine("已经15秒未收到客户端的消息了!");     if (evt is IdleStateEvent eventState)                 {         if (eventState.State == IdleState.ReaderIdle)                        {                                lossConnectCount++;if (lossConnectCount > 2)                                 {                                         Console.WriteLine("关闭这个不活跃通道!");                                         context.CloseAsync();                                }                }                 }     else                 {         base.UserEventTriggered(context, evt);                 }         }
   | 
 
接收部分改为判断心跳:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
   | public override void ChannelRead(IChannelHandlerContext context, object message)         {     if (message is byte[] o)                 {                         Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");         if (Encoding.UTF8.GetString(o).Contains("biubiu:"))                         {             string temp = "服务端接收到心跳连接";                                 context.WriteAsync(temp);return;                         }                 }     string msg = "服务端从客户端接收到内容后返回,我是服务端";                 context.WriteAsync(msg);        }
   | 
 
4.3客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每十秒进行一次写检测,如果十秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每十秒向服务端发送一次消息;
                        // IdleStateHandler 心跳
                        //客户端为写IDLE
                        pipeline.AddLast(new IdleStateHandler(0, 10, 0));//第一个参数为读,第二个为写,第三个为读写全部
4.4客户端Handler重载UserEventTriggered:
1 2 3 4 5 6 7 8 9 10
   | public override void UserEventTriggered(IChannelHandlerContext context, object evt)        {                Console.WriteLine("客户端循环心跳监测发送: " + DateTime.Now);     if (evt is IdleStateEvent eventState)                 {         if (eventState.State == IdleState.WriterIdle)                         {                                 context.WriteAndFlushAsync($"biubiu:{DateTime.Now}");                  }                }        }
  | 
 
4.5实现效果:
5.群发:将客户端上下线通知,群发至所有客户端。只在服务端修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
   | static volatile IChannelGroup groups; public override void HandlerAdded(IChannelHandlerContext context)         {                 Console.WriteLine($"客户端{context}上线.");                                                                                base.HandlerAdded(context);                 IChannelGroup g = groups;if (g == null)                 {         lock (this)          {             if (groups == null)                                 {                                         g = groups = new DefaultChannelGroup(context.Executor);             }                        }                 }                g.Add(context.Channel);                 groups.WriteAndFlushAsync($"欢迎{context.Channel.RemoteAddress}加入.");         }
  public override void HandlerRemoved(IChannelHandlerContext context)         {                 Console.WriteLine($"客户端{context}下线.");     base.HandlerRemoved(context);                 groups.Remove(context.Channel);                 groups.WriteAndFlushAsync($"恭送{context.Channel.RemoteAddress}离开.");        }
   | 
 
实现效果:

项目下载地址:项目下载