0%

DotNetty系列三:编码解码器,IdleStateHandler心跳机制

DotNetty系列三:编码解码器,IdleStateHandler心跳机制

Excerpt

在上一节基础上,实现编码解码器。1.创建一个类库项目。用于实现编码解码器。编码器: public class CommonServerEncoder : MessageToByteEncoder<string> { protected override void Encode(IChannelHandlerContext context, s…


在上一节基础上,实现编码解码器。

1.创建一个类库项目。用于实现编码解码器。

编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CommonServerEncoder : MessageToByteEncoder<string>    
{
protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
{
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}

public class CommonClientEncoder : MessageToByteEncoder<string>
{
protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
{
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}

解码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CommonServerDecoder : ByteToMessageDecoder    
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
output.Add(array);
}
}

public class CommonClientDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
output.Add(array);
}
}

2.服务端里添加:

                        //配置编码解码器
                        pipeline.AddLast(new CommonServerEncoder());
                        pipeline.AddLast(new CommonServerDecoder());

客户端里添加:

                        //配置编码解码器
                        pipeline.AddLast(new CommonClientEncoder());
                        pipeline.AddLast(new CommonClientDecoder());

3.服务端接收和发送:

1
2
3
4
5
6
7
8
9
public override void ChannelRead(IChannelHandlerContext context, object message)        
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
}
string msg = "服务端从客户端接收到内容后返回,我是服务端";
context.WriteAsync(msg);
}

客户端接收和发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public override void ChannelActive(IChannelHandlerContext context)        
{
Console.WriteLine("我是客户端.");
Console.WriteLine($"连接至服务端{context}.");
string message = "客户端1";
context.WriteAndFlushAsync(message);
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从服务端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
}
}

实现了上一节一样的效果。

4.IdleStateHandler心跳机制:

4.1服务端添加IdleStateHandler心跳检测处理器,添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理.

IdleStateHandler心跳检测每十五秒进行一次读检测,如果十五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法.

                        // IdleStateHandler 心跳
                        //服务端为读IDLE
                        pipeline.AddLast(new IdleStateHandler(15, 0, 0));//第一个参数为读,第二个为写,第三个为读写全部

4.2服务端Handler重载UserEventTriggered:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int lossConnectCount = 0;
public override void UserEventTriggered(IChannelHandlerContext context, object evt) {
Console.WriteLine("已经15秒未收到客户端的消息了!");
if (evt is IdleStateEvent eventState)
{
if (eventState.State == IdleState.ReaderIdle)
{
lossConnectCount++;if (lossConnectCount > 2)
{
Console.WriteLine("关闭这个不活跃通道!");
context.CloseAsync();
} }
}
else
{
base.UserEventTriggered(context, evt);
}
}

接收部分改为判断心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public override void ChannelRead(IChannelHandlerContext context, object message)        
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
if (Encoding.UTF8.GetString(o).Contains("biubiu:"))
{
string temp = "服务端接收到心跳连接";
context.WriteAsync(temp);return;
}
}
string msg = "服务端从客户端接收到内容后返回,我是服务端";
context.WriteAsync(msg);
}

4.3客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

设定IdleStateHandler心跳检测每十秒进行一次写检测,如果十秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每十秒向服务端发送一次消息;

                        // IdleStateHandler 心跳
                        //客户端为写IDLE
                        pipeline.AddLast(new IdleStateHandler(0, 10, 0));//第一个参数为读,第二个为写,第三个为读写全部

4.4客户端Handler重载UserEventTriggered:

1
2
3
4
5
6
7
8
9
10
public override void UserEventTriggered(IChannelHandlerContext context, object evt)        {           
Console.WriteLine("客户端循环心跳监测发送: " + DateTime.Now);
if (evt is IdleStateEvent eventState)
{
if (eventState.State == IdleState.WriterIdle)
{
context.WriteAndFlushAsync($"biubiu:{DateTime.Now}");
}
}
}

4.5实现效果:

5.群发:将客户端上下线通知,群发至所有客户端。只在服务端修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static volatile IChannelGroup groups;
public override void HandlerAdded(IChannelHandlerContext context)
{
Console.WriteLine($"客户端{context}上线.");

base.HandlerAdded(context);
IChannelGroup g = groups;if (g == null)
{
lock (this)
{
if (groups == null)
{
g = groups = new DefaultChannelGroup(context.Executor);
}
}
}
g.Add(context.Channel);
groups.WriteAndFlushAsync($"欢迎{context.Channel.RemoteAddress}加入.");
}

public override void HandlerRemoved(IChannelHandlerContext context)
{
Console.WriteLine($"客户端{context}下线.");
base.HandlerRemoved(context);
groups.Remove(context.Channel);
groups.WriteAndFlushAsync($"恭送{context.Channel.RemoteAddress}离开.");
}

实现效果:

项目下载地址:项目下载