0%

DotNetty完全教程(三)

Excerpt

组件介绍ChannelChannel是Socket的封装,提供绑定,读,写等操作,降低了直接使用Socket的复杂性。EventLoop我们之前就讲过EventLoop这里回顾一下:一个 EventLoopGroup 包含一个或者多个 EventLoop;一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;所有由 EventLoop 处理的 I/O 事件都将在它…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

组件介绍

Channel

Channel是Socket的封装,提供绑定,读,写等操作,降低了直接使用Socket的复杂性。

EventLoop

我们之前就讲过EventLoop这里回顾一下:

  1. 一个 EventLoopGroup 包含一个或者多个 EventLoop;
  2. 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
  3. 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
  4. 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
  5. 一个 EventLoop 可能会被分配给一个或多个 Channel。

ChannelFuture

本身是Channel中消息的回调,在DotNetty中被Task取代。

ChannelHandler

ChannelHandler是处理数据的逻辑容器

ChannelInboundHandler是接收并处理入站事件的逻辑容器,可以处理入站数据以及给客户端以回复。

ChannelPipeline

ChannelPipeline是将ChannelHandler穿成一串的的容器。

需要说明的是:

  1. ChannelInboundHandler只处理入站事件,ChannelOutboundHandler只处理出站事件
  2. ChannelInboundHandler和ChannelOutboundHandler可以注册在同一个ChannelPipeline中

(尝试一下)在 Netty 中,有两种发送消息的方式。你可以直接写到 Channel 中,也可以 写到和 ChannelHandler相关联的ChannelHandlerContext对象中。前一种方式将会导致消息从ChannelPipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动。

编码器和解码器

Netty中内置了一些编码器和解码器,用来进行处理字节流数据,编码器用来将消息编码为字节流,解码器用来将字节流解码为另一种格式(字符串或一个对象)。

需要注意的是,编码器和解码器都实现了ChannelInboundHandler和 ChannelOutboundHandler接口用于处理入站或出站数据。

Bootstrap引导类

  1. Bootstrap用于引导客户端,ServerBootstrap用于引导服务器
  2. 客户端引导类只需要一个EventLoopGroup服务器引导类需要两个EventLoopGroup。但是在简单使用中,也可以公用一个EventLoopGroup。为什么服务器需要两个EventLoopGroup呢?是因为服务器的第一个EventLoopGroup只有一个EventLoop,只含有一个SeverChannel用于监听本地端口,一旦连接建立,这个EventLoop就将Channel控制权移交给另一个EventLoopGroup,这个EventLoopGroup分配一个EventLoop给Channel用于管理这个Channel。

DotNetty完全教程(九)

Excerpt

引导Bootstrap引导一个应用程序是指对他进行配置并且使他运行的过程。体系结构注意,DotNetty没有实现Cloneable的接口,而是直接实现了一个Clone方法。Netty实现这个接口是为了创建两个有着相同配置的应用程序,可以把一个配置整体应用到另一个上面,需要注意的是EventLoopGroup是一个浅拷贝,这就导致了拷贝的Bootstrap都会使用同一个EventLoopGr…


引导Bootstrap

引导一个应用程序是指对他进行配置并且使他运行的过程。

体系结构


注意,DotNetty没有实现Cloneable的接口,而是直接实现了一个Clone方法。Netty实现这个接口是为了创建两个有着相同配置的应用程序,可以把一个配置整体应用到另一个上面,需要注意的是EventLoopGroup是一个浅拷贝,这就导致了拷贝的Bootstrap都会使用同一个EventLoopGroup,这在每个Channel生命周期很短的时候是没有太大影响的。

服务器引导和普通引导有什么区别呢?区别在于,服务器接收到客户端的连接请求,会用一个Channel接受连接,然后用另一个Channel与客户端进行交流,但是客户端只需要一个Channel就可以与服务器进行交互。

关于链式调用

我们发现Bootstrap类可以通过流式语法进行链式调用,这要归功于Bootstrap类的特殊定义。下面我们来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 定义
public abstract class AbstractBootstrap<TBootstrap, TChannel>
where TBootstrap : AbstractBootstrap<TBootstrap, TChannel>
where TChannel : IChannel
// 定义子类
public class Bootstrap : AbstractBootstrap<Bootstrap, IChannel>
// 方法实现
public virtual TBootstrap Group(IEventLoopGroup group)
{
Contract.Requires(group != null);

if (this.group != null)
{
throw new InvalidOperationException("group has already been set.");
}
this.group = group;
return (TBootstrap)this;
}
// 使用
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoClientHandler());
}));

API


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KF8h1wnu-1572421399290)(https://ws1.sinaimg.cn/large/007hF5Quly1g1o4mltliij30jr05mabe.jpg)]

客户端引导

1
2
3
4
5
6
7
8
9
10
11
12
13
var group = new MultithreadEventLoopGroup();
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoClientHandler());
}));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("10.10.10.158"), 3000));
Console.ReadLine();
await clientChannel.CloseAsync();

服务器引导

API:

注意上面箭头指示的是与Bootstrap不一样的方法。
为什么会有子Channel的概念呢,我们看下面这个图:

因为服务器是一对多的,所以有子Channel的概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
IEventLoopGroup eventLoop;
eventLoop = new MultithreadEventLoopGroup();
try
{
// 服务器引导程序
var bootstrap = new ServerBootstrap();
bootstrap.Group(eventLoop);
bootstrap.Channel<TcpServerSocketChannel>();
bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoServerHandler());
}));
IChannel boundChannel = await bootstrap.BindAsync(3000);
Console.ReadLine();
await boundChannel.CloseAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
finally
{
await eventLoop.ShutdownGracefullyAsync();
}

从Channel中引导客户端

  • 场景

    如果我们的服务器需要去第三方获取数据,这时候服务器就需要充当客户端去第三方取数据,这时候就需要在Channel中再开一个客户端获取数据。

  • 方式

    我们最好是从Channel中获取当前EventLoop,这样新开的客户端就跟当前Channel在一个线程中,减少了线程切换带来的开销,尽可能的重用了EventLoop

  • 实现

    1
    2
    3
    // 从Context创建客户端引导
    var bootstrap = new Bootstrap();
    bootstrap.Group(ctx.Channel.EventLoop);

初始化Pipeline

如果要添加的Handler不止一个,我们就需要用到ChannelInitializer,在DotNetty中,我们有十分简单的方法可以初始化一个pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;

if (cert != null)
{
pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
}
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast("echo", new EchoClientHandler());
}));

ChannelOption

ChannelOption可以在引导的时候将设置批量的设置到所有Channel上,而不必要在每一个Channel建立的时候手动的去指定它的配置,应用场景是比如设置KeepAlive或者设置超时时间。

1
2
bootstrap.Option(ChannelOption.SoKeepalive, true)
.Option(ChannelOption.ConnectTimeout, new TimeSpan(5000));

面向无连接的用户数据报文

UDP的全称是“User Datagram Protocol”,在DotNetty中实现了SocketDatagramChannel来创建无连接的引导,需要注意的是无连接的引导不需要Connect只需要bind即可,代码如下:

1
2
3
4
5
6
7
8
9
10
11
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<SocketDatagramChannel>()
.Option(ChannelOption.SoBroadcast, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast("Quote", new QuoteOfTheMomentClientHandler());
}));

IChannel clientChannel = await bootstrap.BindAsync(IPEndPoint.MinPort);

关闭

Channel的关闭:

1
await clientChannel.CloseAsync();

EventLoopGroup的关闭:

1
await group.ShutdownGracefullyAsync();

DotNetty完全教程(二)

Excerpt

第一个DotNetty应用程序准备工作NuGet包介绍DotNetty由九个项目构成,在NuGet中都是单独的包,可以按需引用,其中比较重要的几个是以下几个:DotNetty.Common 是公共的类库项目,包装线程池,并行任务和常用帮助类的封装DotNetty.Transport 是DotNetty核心的实现DotNetty.Buffers 是对内存缓冲区管理的封装DotNett…


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

第一个DotNetty应用程序

准备工作

NuGet包介绍

DotNetty由九个项目构成,在NuGet中都是单独的包,可以按需引用,其中比较重要的几个是以下几个:

  • DotNetty.Common 是公共的类库项目,包装线程池,并行任务和常用帮助类的封装
  • DotNetty.Transport 是DotNetty核心的实现
  • DotNetty.Buffers 是对内存缓冲区管理的封装
  • DotNetty.Codes 是对编码器解码器的封装,包括一些基础基类的实现,我们在项目中自定义的协议,都要继承该项目的特定基类和实现
  • DotNetty.Handlers 封装了常用的管道处理器,比如Tls编解码,超时机制,心跳检查,日志等,如果项目中没有用到可以不引用,不过一般都会用到

开始一个项目

  1. 新建一个解决方案
  2. 新建一个项目
  3. 到NuGet中引用 DotNetty.Common DotNetty.Transport DotNetty.Buffers
  4. 开始编写实例代码

编写测试程序

回声测试应用程序编写 源码下载

  1. 新建一个解决方案 名字叫NettyTest

  2. 新建一个项目 名字叫EchoServer

  3. 到NuGet中引用 DotNetty.Common DotNetty.Transport DotNetty.Buffers

  4. 新建一个类 EchoServerHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoServer
    {
    /// <summary>
    /// 因为服务器只需要响应传入的消息,所以只需要实现ChannelHandlerAdapter就可以了
    /// </summary>
    public class EchoServerHandler : ChannelHandlerAdapter
    {
    /// <summary>
    /// 每个传入消息都会调用
    /// 处理传入的消息需要复写这个方法
    /// </summary>
    /// <param name="ctx"></param>
    /// <param name="msg"></param>
    public override void ChannelRead(IChannelHandlerContext ctx, object msg)
    {
    IByteBuffer message = msg as IByteBuffer;
    Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
    }
    /// <summary>
    /// 批量读取中的最后一条消息已经读取完成
    /// </summary>
    /// <param name="context"></param>
    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
    context.Flush();
    }
    /// <summary>
    /// 发生异常
    /// </summary>
    /// <param name="context"></param>
    /// <param name="exception"></param>
    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
    Console.WriteLine(exception);
    context.CloseAsync();
    }
    }
    }

    上面的代码注释已经非常详细了,相信看注释你就能明白这个类大致干了些什么,但是突如其来的一个类还是有点难以理解,那么本着认真负责的精神我会再详细解释一下没有学过Netty的同学难以理解的点:

    1. 问:EchoServerHandler 是干什么用的?回答:Netty帮我们封装了底层的通信过程让我们不需要再关心套接字等网络底层的问题,更加专注于处理业务,何为业务?就是数据来了之后我要怎么办,Handler就是一个处理数据的工厂,那么上面的Handler中我们做了什么事情呢?稍加分析就能发现,我们在接到消息之后打印在了控制台上,之后将消息再发送回去。
    2. 问:WriteAsync 是在干什么?Flush 又是在干什么?答:由于是初学,不灌输太多,大家现在只需要知道数据写入之后并不会直接发出去,Flush的时候才会发出去。
  5. 在自动生成的Program.cs中写入服务器引导程序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Threading.Tasks;

    namespace EchoServer
    {
    public class Program
    {
    static async Task RunServerAsync()
    {
    IEventLoopGroup eventLoop;
    eventLoop = new MultithreadEventLoopGroup();
    try
    {
    // 服务器引导程序
    var bootstrap = new ServerBootstrap();
    bootstrap.Group(eventLoop);
    bootstrap.Channel<TcpServerSocketChannel>();
    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
    {
    IChannelPipeline pipeline = channel.Pipeline;
    pipeline.AddLast(new EchoServerHandler());
    }));
    IChannel boundChannel = await bootstrap.BindAsync(3000);
    Console.ReadLine();
    await boundChannel.CloseAsync();
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    }
    finally
    {
    await eventLoop.ShutdownGracefullyAsync();
    }
    }
    static void Main(string[] args) => RunServerAsync().Wait();
    }
    }

    这个程序中同样有很多需要解释的,但是对于初学者来说,先明白这些概念就好了:

    1. bootstrap是启动引导的意思,Netty中的bootstrap的意思就是启动一个网络应用程序,那在启动之前我们肯定需要设置很多参数,bootstrap可以接收参数,引导用户启动Netty应用。
    2. EventLoopGroup 是一系列EventLoop的集合
    3. EventLoop 就对应了一个选择器(选择器看上一节的图)
    4. 一个Channel都需要绑定到一个选择器(EventLoop)上
    5. 每一个选择器(EventLoop)和一个线程绑定
    6. 我们可以把Handler串起来处理数据,这个我们后面再讲,这里的做法是把Handler串到pipeline上。
  6. 再新建一个项目取名叫EchoClient

  7. 新建一个类 EchoClientHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoClient
    {
    public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer>
    {
    /// <summary>
    /// Read0是DotNetty特有的对于Read方法的封装
    /// 封装实现了:
    /// 1. 返回的message的泛型实现
    /// 2. 丢弃非该指定泛型的信息
    /// </summary>
    /// <param name="ctx"></param>
    /// <param name="msg"></param>
    protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
    {
    if (msg != null)
    {
    Console.WriteLine("Receive From Server:" + msg.ToString(Encoding.UTF8));
    }
    ctx.WriteAsync(Unpooled.CopiedBuffer(msg));
    }
    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
    context.Flush();
    }
    public override void ChannelActive(IChannelHandlerContext context)
    {
    Console.WriteLine("发送Hello World");
    context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("Hello World!")));
    }

    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
    Console.WriteLine(exception);
    context.CloseAsync();
    }
    }
    }

    Handler的编写方法于上面服务器的Handler基本一致,这里我们还是需要解释一些问题:

    1. SimpleChannelInboundHandler 继承自 ChannelHandlerAdapter,前者更强大的地方是对于资源的自动释放(这是一个伏笔)
    2. Read0方法在代码的注释中已经解释过了,有兴趣的同学可以看一下源码。这里我就不贴出来了
    3. ctx.WriteAsync(Unpooled.CopiedBuffer(msg));如果这里直接将msg发送出去,大家就会发现,实验失败了,这是为什么呢?简单解释就是因为引用计数器机制,IByteBuffer只能使用一次,而在我们使用Read0方法接收这个消息的时候,这个消息的引用计数就被归零了,这时候我们再次使用就会报出异常,所以这里需要将源消息再复制一份。当然,如果你使用的Read方法则不会有这样的问题。原则上来说,我们不应该存储指向任何消息的引用供未来使用,因为这些引用都会自动失效(意思就是消息收到了处理完就丢掉,消息不应该被长久保存)。
  8. 编写客户端引导程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Net;
    using System.Threading.Tasks;

    namespace EchoClient
    {
    class Program
    {
    static async Task RunClientAsync()
    {
    var group = new MultithreadEventLoopGroup();
    try
    {
    var bootstrap = new Bootstrap();
    bootstrap
    .Group(group)
    .Channel<TcpSocketChannel>()
    .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
    {
    IChannelPipeline pipeline = channel.Pipeline;
    pipeline.AddLast(new EchoClientHandler());
    }));
    IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("10.10.10.158"), 3000));
    Console.ReadLine();
    await clientChannel.CloseAsync();
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    }
    finally
    {
    await group.ShutdownGracefullyAsync();
    }
    }
    static void Main(string[] args) => RunClientAsync().Wait();
    }
    }

写在最后

项目的完整代码我放在了码云上,你可以点击这里可以下载。我相信很多完全没有接触过Netty的同学在跟着写完了第一个项目之后还是很懵,虽然解释了很多,但是还是感觉似懂非懂,这很正常。就如同我们写完HelloWorld之后,仍然会纠结一下static void Main(string[] args)为什么要这么写。我要说的是,只要坚持写完了第一个应用程序,你就是好样的,关于Netty我们还有很多很多要讲,相信你学了之后的知识以后,回过头来再看这个实例,会有恍然大悟的感觉。如果你坚持看完了文章并且敲了程序并且试验成功了,恭喜你,晚饭加个鸡腿,我们还有很多东西要学。

DotNetty完全教程(五)

Excerpt

ChannelHandler本篇文章着重介绍ChannelHandlerChannel的生命周期我们复习一下,Channel是Socket的抽象,可以被注册到一个EventLoop上,EventLoop相当于Selector,每一个EventLoop又有自己的处理线程。复习了这部分的知识,我们就知道在Channel的生命中,有以下这么几个关键的时间节点。ChannelHandler的生…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

ChannelHandler

本篇文章着重介绍ChannelHandler

Channel的生命周期

我们复习一下,Channel是Socket的抽象,可以被注册到一个EventLoop上,EventLoop相当于Selector,每一个EventLoop又有自己的处理线程。复习了这部分的知识,我们就知道在Channel的生命中,有以下这么几个关键的时间节点。

ChannelHandler的生命周期

我们复习一下,ChannelHandler是定义了如何处理数据的处理器,被串在ChannelPipeline中用于入站或者出站数据的处理。既然是处理Channel中的数据,就需要关注很多的时间节点,比如Channel被激活,比如,读取到了数据,所以,ChannelHandler不仅需要关心数据何时来,还需要关注Channel处于一个什么样的状态,所以ChannelHandler的生命周期如下:
在这里插入图片描述

使用适配器类创建自己的Handler

你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter
类作为自己的 ChannelHandler 的起始点。使用的时候我们只需要扩展使用这些适配器类,然后重新我们需要的方法即可。

注意适配器类都有IsSharable属性,标识这个Hanlder能不能被添加到多个Pipeline中。

DotNetty完全教程(六)

Excerpt

资源管理目的在处理数据的过程中,我们需要确保没有任何的资源泄漏。这时候我们就得很关心资源管理。引用计数的处理使用完ByteBuf之后,需要调整其引用计数以确保资源的释放内存内漏探测Netty提供了ResourceLeakDetector来检测内存泄漏,因为其是采样检测的,所以相关开销并不大。泄露日志检测级别手动释放消息ReferenceCountUtil.SafeRelea…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

资源管理

目的

在处理数据的过程中,我们需要确保没有任何的资源泄漏。这时候我们就得很关心资源管理。

引用计数的处理

使用完ByteBuf之后,需要调整其引用计数以确保资源的释放

内存内漏探测

Netty提供了ResourceLeakDetector来检测内存泄漏,因为其是采样检测的,所以相关开销并不大。

  1. 泄露日志

  2. 检测级别

  3. 手动释放消息

    1
    ReferenceCountUtil.SafeRelease(this.Message);
  4. 分析SimpleChannelInboundHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public override void ChannelRead(IChannelHandlerContext ctx, object msg)
    {
    bool release = true;
    try
    {
    if (this.AcceptInboundMessage(msg))
    {
    I imsg = (I)msg;
    this.ChannelRead0(ctx, imsg);
    }
    else
    {
    release = false;
    ctx.FireChannelRead(msg);
    }
    }
    finally
    {
    if (autoRelease && release)
    {
    ReferenceCountUtil.Release(msg);
    }
    }
    }

由上面的源码可以看出,Read0事实上是Read的封装,区别就是Read0方法在调用的时候,消息一定是被释放了,这就是手动释放的例子。

DotNetty完全教程(八)

Excerpt

EventLoop介绍我们先回顾一下,EventLoop就是我们在最开始的示意图中的Selector,每个EventLoop和一个线程绑定,用于处理多个Channel。任务调度如果我们想实现延时任务的调度,比如连接成功5s之后发送一包数据,就可以用到EventLoop的计划任务ctx.Channel.EventLoop.Schedule(() =>{ Console.Wr…


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

EventLoop

介绍

我们先回顾一下,EventLoop就是我们在最开始的示意图中的Selector,每个EventLoop和一个线程绑定,用于处理多个Channel。

任务调度

  1. 如果我们想实现延时任务的调度,比如连接成功5s之后发送一包数据,就可以用到EventLoop的计划任务

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ctx.Channel.EventLoop.Schedule(() =>
    {
    Console.WriteLine("delay 1s");
    }, new TimeSpan(1000));
    // 如果需要提前取消,可以调用Cancel方法
    IScheduledTask task = ctx.Channel.EventLoop.Schedule(() =>
    {
    Console.WriteLine("delay 1s");
    }, new TimeSpan(1000));
    tsak.Cancel();
  2. 一个任务引发后,会判断当前是否在需要处理这个任务的EventLoop中(程序知道自己目前在执行哪个线程,线程又跟EventLoop对应),如果在就直接执行该任务,如果不在该任务中,则任务入队稍后处理

  3. 永远不要把一个需要耗费长时间的任务放到EventLoop执行队列来执行,需要使用我们前面介绍的EventExecutor的方法。

Group

许多Channel对应一个EventLoop,但是EventLoop能分配给她的Channel个数是有限的,要处理可以扩展的无数个Channel就需要EventLoopGroup。他们的结构关系如下图:

我们之前讲过,Netty不仅能够完成NIO系统的搭建,也能通过一些简单的配置,变成OIO阻塞IO系统,阻塞IO的话,就不能多个Channel共享一个EventLoop了,就需要一个Channel分配一个EventLoop。总的来说,EventLoop跟线程的关系是不会改变的。

需要注意的是:

  1. 给Channel分配EventLoop的是EventLoopGroup。而他将尽量均衡的将Channel进行分配。

DotNetty完全教程(十)

Excerpt

单元测试EmbeddedChannel介绍EmbeddedChannel是专门为了测试ChannelHandler的传输。我们先看一下他的API用一张图来描述这样的一个模拟过程编写基于xUnit的单元测试新建一个xUnit工程 UnitTest新建一个用于测试EmbededChannel的工程 EmbededChannelTestEmbededChannelTest工程需要引用…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

单元测试

EmbeddedChannel介绍

EmbeddedChannel是专门为了测试ChannelHandler的传输。我们先看一下他的API

用一张图来描述这样的一个模拟过程

编写基于xUnit的单元测试

  1. 新建一个xUnit工程 UnitTest
  2. 新建一个用于测试EmbededChannel的工程 EmbededChannelTest
  3. EmbededChannelTest工程需要引用DotNetty的类库,这里因为我们需要测试一个解码器,所以除了原先的Buffer Common Transport之外我们还需要引用Codecs
  4. xUnit工程需要引用EmbededChannelTest工程
  5. 在EmbededChannelTest工程之下新建FixedLengthFrameDecoder待测试类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.Text;

namespace EmbededChannelTest
{
public class FixedLengthFrameDecoder : ByteToMessageDecoder
{
private int _frameLength;
public FixedLengthFrameDecoder(int frameLength)
{
if (frameLength <= 0)
{
throw new Exception("不合法的参数。");
}
_frameLength = frameLength;
}
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
// 解码器实现固定的帧长度
while (input.ReadableBytes >= _frameLength)
{
IByteBuffer buf = input.ReadBytes(_frameLength);
output.Add(buf);
}
}
}
}

我们可以看到这个解码器将buffer中的字节流转化为每3个一帧。接下来我们需要编写测试类,我们在UnitTest工程下新建一个类,名字叫做UnitTester,编写如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using DotNetty.Buffers;
using DotNetty.Transport.Channels.Embedded;
using EmbededChannelTest;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;

namespace UnitTest
{
public class UnitTester
{
[Fact]
public void testFrameDecoder()
{
IByteBuffer buf = Unpooled.Buffer();
for (int i = 0; i < 9; i++)
{
buf.WriteByte(i);
}
IByteBuffer input = buf.Duplicate();
EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));
// 写数据
// retain能够将buffer的引用计数加1,并且返回这个buffer本身
Assert.True(channel.WriteInbound(input.Retain()));
Assert.True(channel.Finish());
// 读数据
IByteBuffer read = channel.ReadInbound<IByteBuffer>();
Assert.Equal(buf.ReadSlice(3), read);
read.Release();

read = channel.ReadInbound<IByteBuffer>();
Assert.Equal(buf.ReadSlice(3), read);
read.Release();

read = channel.ReadInbound<IByteBuffer>();
Assert.Equal(buf.ReadSlice(3), read);
read.Release();

Assert.Null(channel.ReadInbound<object>());
buf.Release();
}
}
}

编写完成之后直接右键点击运行测试即可。同理我们可以测试用于出站数据的Encoder,这里不贴代码了,感兴趣的可以去工程中自己看源码进行学习。

DotNetty完全教程(十一)

Excerpt

编码器和解码器定义编码器负责将应用程序可以识别的数据结构转化为可传输的数据流,解码器反之。对于应用程序来说,编码器操作出站数据,解码器操作入站数据。解码器和Handler解码器因为是处理入站数据的,所以继承了ChannelInBoundHandler.我们理解的时候可以认为解码器就是一种特殊的Handler,用于处理信息。解码器的类型ByteToMessageDecoderRepl…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

编码器和解码器

定义

编码器负责将应用程序可以识别的数据结构转化为可传输的数据流,解码器反之。对于应用程序来说,编码器操作出站数据,解码器操作入站数据。

解码器和Handler

解码器因为是处理入站数据的,所以继承了ChannelInBoundHandler.我们理解的时候可以认为解码器就是一种特殊的Handler,用于处理信息。

解码器的类型

  • ByteToMessageDecoder
  • ReplayingDecoder
  • MessageToMessageDecoder

解码器实例

ByteToMessageDecoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// ByteToMessageDecoder
public class ToIntDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
if (input.ReadableBytes >= 4) output.Add(input.ReadInt());
}
}
// 测试代码
[Fact]
public void TestIntDecoder()
{
IByteBuffer buf = Unpooled.Buffer();
for (int i = 0; i < 8; i++)
{
buf.WriteByte(i);
}
// 构建Channel
EmbeddedChannel channel = new EmbeddedChannel(new ToIntDecoder());
// 测试
Assert.True(channel.WriteInbound(buf));
Assert.True(channel.Finish());

// 比如 0 1 2 3
// 3*2^0+2*2^8+1*2^16+0*2^24
Assert.Equal(66051, channel.ReadInbound<int>());

}

ReplayingDecoder

1
2
3
4
5
6
7
8
9
10
11
12
// 不需要判断ReadableBytes的ReplayingDecoder
public class ToIntDecoder2 : ReplayingDecoder<int>
{
public ToIntDecoder2(int initialState) : base(initialState)
{
}

protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
output.Add(input.ReadInt());
}
}

MessageToMessageDecoder

1
2
3
4
5
6
7
public class IntToStringDecoder : MessageToMessageDecoder<int>
{
protected override void Decode(IChannelHandlerContext context, int message, List<object> output)
{
output.Add(message.ToString());
}
}

更多解码器

  • LineBaseFrameDecoder 使用行尾控制符解析数据,可以把数据一行一行解析出来
  • HttpObjectDecoder HTTP解码器

编码器

根据我们之前的知识可以轻易的推导出,Encoder继承了ChannelOutBoundHandler

  • MessageToByteEncoder
  • MessageToMessageEncoder

编码器实例

MessageToByteEncoder

1
2
3
4
5
6
7
public class ShortToByteEncoder : MessageToByteEncoder<short>
{
protected override void Encode(IChannelHandlerContext context, short message, IByteBuffer output)
{
output.WriteShort(message);
}
}

MessageToMessageEncoder

1
2
3
4
5
6
7
public class IntToStringEncoder : MessageToMessageEncoder<int>
{
protected override void Encode(IChannelHandlerContext context, int message, List<object> output)
{
output.Add(message.ToString());
}
}

编解码器

MessageToMessageCodec,它拥有encode和decode两个方法,用于实现来回的转换数据,这种编解码器我们在后面实例的时候再举例说明。
这种编解码器可以把数据的转换,逆转换过程封装,但是同时他的缺点是,不如分开写重用方便。那我们就会想了,既然如此的话,为什么我们不能把一个编码器,一个解码器结合起来,作为一个编解码器呢?这样的话,编码器解码器分别可以重用,结合出来的编解码器也可以方便的使用 CombinedChannelDuplexHandler 就可以实现这样的作用。

1
2
3
4
5
// 提供结合的编解码器
public class CombinedCodec : CombinedChannelDuplexHandler<ToIntDecoder, ShortToByteEncoder>
{

}

DotNetty完全教程(四)

Excerpt

ByteBufferNetty中ByteBuffer的介绍Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interfaceByteBufHolderDotNetty中有AbstractByteBuffer IByteBuffer IByteBufferHolder优点:它可以被用户自定义的缓冲区类型扩展;通过内置的复合缓冲区…


ByteBuffer

Netty中ByteBuffer的介绍

Netty 的数据处理 API 通过两个组件暴露——abstract class ByteBuf 和 interface
ByteBufHolder

DotNetty中有AbstractByteBuffer IByteBuffer IByteBufferHolder

优点:

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于 JDK 的 StringBuilder);
  • 在读和写这两种模式之间切换不需要调用 ByteBuffer 的 flip()方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化

原理:

每一个ByteBuf都有两个索引,读索引和写索引,read和write会移动索引,set和get不会引动索引。

使用ByteBuf

  1. 堆缓冲区(使用数组的方式展示和操作数据)

使用支撑数组给ByteBuf提供快速的分配和释放的能力。适用于有遗留数据需要处理的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
IByteBuffer message = msg as IByteBuffer;
// 检查是否有支撑数组
if (message.HasArray)
{
// 获取数组
byte[] array = message.Array;
// 计算第一个字节的偏移
int offset = message.ArrayOffset + message.ReaderIndex;
// 获取可读字节数
int length = message.ReadableBytes;
// 调用方法,处理数据
HandleArray(array, offset, length);
}
Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
ctx.WriteAsync(message);
}
  1. 直接缓冲区
1
2
3
4
5
6
7
8
9
10
11
12
13
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
IByteBuffer message = msg as IByteBuffer;
if (message.HasArray)
{
int length = message.ReadableBytes;
byte[] array = new byte[length];
message.GetBytes(message.ReaderIndex, array);
HandleArray(array, 0, length);
}
Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
ctx.WriteAsync(message);
}
  1. CompositeByteBuffer 复合缓冲区

如果要发送的命令是由两个ByteBuf拼接构成的,那么就需要复合缓冲区,比如Http协议中一个数据流由头跟内容构成这样的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
IByteBuffer message = msg as IByteBuffer;
// 创建一个复合缓冲区
CompositeByteBuffer messageBuf = Unpooled.CompositeBuffer();
// 创建两个ByteBuffer
IByteBuffer headBuf = Unpooled.CopiedBuffer(message);
IByteBuffer bodyBuf = Unpooled.CopiedBuffer(message);
// 添加到符合缓冲区中
messageBuf.AddComponents(headBuf, bodyBuf);
// 删除
messageBuf.RemoveComponent(0);

Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
ctx.WriteAsync(message);
}

字节级操作

  1. 读取(不移动索引)
1
2
3
4
5
6
7
8
9
10
11
12
13
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
IByteBuffer message = msg as IByteBuffer;
for (int i = 0; i < message.Capacity; i++)
{
// 如此使用索引访问不会改变读索引也不会改变写索引
byte b = message.GetByte(i);
Console.WriteLine(b);
}

Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
ctx.WriteAsync(message);
}
  1. 丢弃可丢弃字节
    所谓可丢弃字节就是调用read方法之后,readindex已经移动过了的区域,这段区域的字节称为可丢弃字节。
1
message.DiscardReadBytes();

只有在内存十分宝贵需要清理的时候再调用这个方法,随便调用有可能会造成内存的复制,降低效率。
3. 读取所有可读字节(移动读索引)

1
2
3
4
while (message.IsReadable())
{
Console.WriteLine(message.ReadByte());
}
  1. 写入数据
1
2
3
4
5
// 使用随机数填充可写区域
while (message.WritableBytes > 4)
{
message.WriteInt(new Random().Next(0, 100));
}
  1. 管理索引
  • MarkReaderIndex ResetReaderIndex 标记和恢复读索引
  • MarkWriterIndex ResetWriterIndex 标记和恢复写索引
  • SetReaderIndex(int) SetWriterIndex(int) 直接移动索引
  • clear() 重置两个索引都为0,但是不会清除内容
  1. 查找
  • IndexOf()
  • 使用Processor
1
2
// 查找\r
message.ForEachByte(ByteProcessor.FindCR);
  1. 派生

派生的意思是创建一个新的ByteBuffer,这个ByteBuf派生于其他的ByteBuf,派生出来的子ByteBuf具有自己的读写索引,但是本质上指向同一个对象,这样就导致了改变一个,另一个也会改变。

  • duplicate();
  • slice();
  • slice(int, int);
  • Unpooled.unmodifiableBuffer(…);
  • order(ByteOrder);
  • readSlice(int)。
  1. 复制
    复制不同于派生,会复制出一个独立的ByteBuf,修改其中一个不会改变另一个。
  • copy
  1. 释放
1
2
// 显式丢弃消息
ReferenceCountUtil.release(msg);
  1. 增加引用计数防止释放
1
ReferenceCountUtil.retain(message)
  1. 其他api
    在这里插入图片描述

ByteBufHolder

  1. 目的
    再数据处理的过程中不仅仅有字节数据内容本身,还会有一些附加信息,比如HTTP响应的状态码,Cookie等。给ByteBuf附加信息就要用到ByteBufHolder.
  2. API

管理ByteBuffer

  1. 按需分配 ByteBufAllocator

    注意分配是池化的,最大程度上降低分配和释放内存的开销。

    1
    2
    3
    4
    5
    6
    7
    // 获取Allocator
    // 1
    IChannelHandlerContext ctx = null;
    IByteBufferAllocator allocator = ctx.Allocator;
    // 2
    IChannel channel = null;
    allocator = channel.Allocator;

有两种ByteBufAllocator的实现:PooledByteBufAllocator和UnpooledByteBufAllocator,前者池化了ByteBuf的实例,极大限度的提升了性能减少了内存碎片。
2. Unpooled缓冲区
获取不到 ByteBufAllocator的引用的时候我们可以使用Unpooled工具类来操作ByteBuf。
在这里插入图片描述

  1. ByteBufUtil
    这个类提供了一些通用的API,都是静态的辅助方法,例如hexdump方法可以以十六进制的方式打印ByteBuf的内容。还有equal方法判断bytebuf是否相等。

引用计数

  1. 目的

    ByteBuf和ByteBufHolder都有计数的机制。引用计数都从1开始,如果计数大于0则不被释放,如果等于0就会被释放。它的目的是为了支持池化的实现,降低了内存分配的开销。

  2. 异常

    如果访问一个计数为0的对象就会引发IllegalReferenceCountException。

DotNetty系列三:编码解码器,IdleStateHandler心跳机制

Excerpt

在上一节基础上,实现编码解码器。1.创建一个类库项目。用于实现编码解码器。编码器: public class CommonServerEncoder : MessageToByteEncoder<string> { protected override void Encode(IChannelHandlerContext context, s…


在上一节基础上,实现编码解码器。

1.创建一个类库项目。用于实现编码解码器。

编码器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CommonServerEncoder : MessageToByteEncoder<string>    
{
protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
{
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}

public class CommonClientEncoder : MessageToByteEncoder<string>
{
protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
{
byte[] messageBytes = Encoding.UTF8.GetBytes(message);
IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
initialMessage.WriteBytes(messageBytes);
output.WriteBytes(initialMessage);
}
}

解码器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class CommonServerDecoder : ByteToMessageDecoder    
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
output.Add(array);
}
}

public class CommonClientDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] array = new byte[input.ReadableBytes];
input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
input.Clear();
output.Add(array);
}
}

2.服务端里添加:

                        //配置编码解码器
                        pipeline.AddLast(new CommonServerEncoder());
                        pipeline.AddLast(new CommonServerDecoder());

客户端里添加:

                        //配置编码解码器
                        pipeline.AddLast(new CommonClientEncoder());
                        pipeline.AddLast(new CommonClientDecoder());

3.服务端接收和发送:

1
2
3
4
5
6
7
8
9
public override void ChannelRead(IChannelHandlerContext context, object message)        
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
}
string msg = "服务端从客户端接收到内容后返回,我是服务端";
context.WriteAsync(msg);
}

客户端接收和发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public override void ChannelActive(IChannelHandlerContext context)        
{
Console.WriteLine("我是客户端.");
Console.WriteLine($"连接至服务端{context}.");
string message = "客户端1";
context.WriteAndFlushAsync(message);
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从服务端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
}
}

实现了上一节一样的效果。

4.IdleStateHandler心跳机制:

4.1服务端添加IdleStateHandler心跳检测处理器,添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理.

IdleStateHandler心跳检测每十五秒进行一次读检测,如果十五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法.

                        // IdleStateHandler 心跳
                        //服务端为读IDLE
                        pipeline.AddLast(new IdleStateHandler(15, 0, 0));//第一个参数为读,第二个为写,第三个为读写全部

4.2服务端Handler重载UserEventTriggered:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private int lossConnectCount = 0;
public override void UserEventTriggered(IChannelHandlerContext context, object evt) {
Console.WriteLine("已经15秒未收到客户端的消息了!");
if (evt is IdleStateEvent eventState)
{
if (eventState.State == IdleState.ReaderIdle)
{
lossConnectCount++;if (lossConnectCount > 2)
{
Console.WriteLine("关闭这个不活跃通道!");
context.CloseAsync();
} }
}
else
{
base.UserEventTriggered(context, evt);
}
}

接收部分改为判断心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public override void ChannelRead(IChannelHandlerContext context, object message)        
{
if (message is byte[] o)
{
Console.WriteLine($"解码器方式,从客户端接收:{Encoding.UTF8.GetString(o)}:{DateTime.Now}");
if (Encoding.UTF8.GetString(o).Contains("biubiu:"))
{
string temp = "服务端接收到心跳连接";
context.WriteAsync(temp);return;
}
}
string msg = "服务端从客户端接收到内容后返回,我是服务端";
context.WriteAsync(msg);
}

4.3客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

设定IdleStateHandler心跳检测每十秒进行一次写检测,如果十秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每十秒向服务端发送一次消息;

                        // IdleStateHandler 心跳
                        //客户端为写IDLE
                        pipeline.AddLast(new IdleStateHandler(0, 10, 0));//第一个参数为读,第二个为写,第三个为读写全部

4.4客户端Handler重载UserEventTriggered:

1
2
3
4
5
6
7
8
9
10
public override void UserEventTriggered(IChannelHandlerContext context, object evt)        {           
Console.WriteLine("客户端循环心跳监测发送: " + DateTime.Now);
if (evt is IdleStateEvent eventState)
{
if (eventState.State == IdleState.WriterIdle)
{
context.WriteAndFlushAsync($"biubiu:{DateTime.Now}");
}
}
}

4.5实现效果:

5.群发:将客户端上下线通知,群发至所有客户端。只在服务端修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static volatile IChannelGroup groups;
public override void HandlerAdded(IChannelHandlerContext context)
{
Console.WriteLine($"客户端{context}上线.");

base.HandlerAdded(context);
IChannelGroup g = groups;if (g == null)
{
lock (this)
{
if (groups == null)
{
g = groups = new DefaultChannelGroup(context.Executor);
}
}
}
g.Add(context.Channel);
groups.WriteAndFlushAsync($"欢迎{context.Channel.RemoteAddress}加入.");
}

public override void HandlerRemoved(IChannelHandlerContext context)
{
Console.WriteLine($"客户端{context}下线.");
base.HandlerRemoved(context);
groups.Remove(context.Channel);
groups.WriteAndFlushAsync($"恭送{context.Channel.RemoteAddress}离开.");
}

实现效果:

项目下载地址:项目下载