
DotNetty Complete Tutorial (2)




Your First DotNetty Application

Preparation

The NuGet Packages

DotNetty is split into nine projects, each published as its own NuGet package, so you can reference only what you need. The most important ones are:

  • DotNetty.Common is the shared utility library: thread pools, parallel tasks, and common helper classes
  • DotNetty.Transport is the core DotNetty implementation
  • DotNetty.Buffers wraps memory buffer management
  • DotNetty.Codecs wraps encoders and decoders, including the base classes; any custom protocol you define in a project inherits from the specific base classes in this package
  • DotNetty.Handlers packages the commonly used pipeline handlers, such as TLS encoding/decoding, timeouts, heartbeat checks, and logging; you can skip it if your project doesn't need them, though most projects do

Starting a Project

  1. Create a solution
  2. Create a project
  3. Reference DotNetty.Common, DotNetty.Transport, and DotNetty.Buffers from NuGet
  4. Start writing the example code

Writing the Test Program

Writing the echo test application (source code download)

  1. Create a solution named NettyTest

  2. Create a project named EchoServer

  3. Reference DotNetty.Common, DotNetty.Transport, and DotNetty.Buffers from NuGet

  4. Create a class named EchoServerHandler

    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoServer
    {
        /// <summary>
        /// The server only needs to respond to incoming messages,
        /// so extending ChannelHandlerAdapter is enough.
        /// </summary>
        public class EchoServerHandler : ChannelHandlerAdapter
        {
            /// <summary>
            /// Called for every incoming message.
            /// Override this method to handle inbound data.
            /// </summary>
            public override void ChannelRead(IChannelHandlerContext ctx, object msg)
            {
                IByteBuffer message = msg as IByteBuffer;
                Console.WriteLine("Received: " + message.ToString(Encoding.UTF8));
                ctx.WriteAsync(message);
            }

            /// <summary>
            /// Called once the last message in a batch read has been read.
            /// </summary>
            public override void ChannelReadComplete(IChannelHandlerContext context)
            {
                context.Flush();
            }

            /// <summary>
            /// Called when an exception occurs.
            /// </summary>
            public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
            {
                Console.WriteLine(exception);
                context.CloseAsync();
            }
        }
    }

    The comments above are already quite detailed, and from them you can get a rough idea of what this class does. Still, a class appearing out of nowhere can be hard to digest, so in the spirit of being thorough I'll explain the points that tend to confuse people who have never used Netty:

    1. Q: What is EchoServerHandler for? A: Netty wraps the low-level communication for us, so we no longer have to worry about sockets and other networking plumbing and can focus on the business logic. What is the business logic? It's what we do once data arrives. A handler is the workshop where data gets processed. So what does the handler above do? A quick look shows that when a message arrives we print it to the console and then send it back.
    2. Q: What does WriteAsync do? And what does Flush do? A: Since this is an introduction I won't overload you; for now just know that written data is not sent immediately, it only goes out when Flush is called (see the sketch below).
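
    To make the write/flush split concrete, here is a minimal hedged sketch of my own (not code from the article), reusing the same IChannelHandlerContext as above:

    // WriteAsync only queues the data in the channel's outbound buffer.
    ctx.WriteAsync(message);        // buffered, nothing on the wire yet
    ctx.Flush();                    // now the buffered data is actually sent
    // DotNetty also offers the combined call:
    ctx.WriteAndFlushAsync(message);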
  5. In the generated Program.cs, write the server bootstrap code.

    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Threading.Tasks;

    namespace EchoServer
    {
        public class Program
        {
            static async Task RunServerAsync()
            {
                IEventLoopGroup eventLoop;
                eventLoop = new MultithreadEventLoopGroup();
                try
                {
                    // Server bootstrap
                    var bootstrap = new ServerBootstrap();
                    bootstrap.Group(eventLoop);
                    bootstrap.Channel<TcpServerSocketChannel>();
                    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                    {
                        IChannelPipeline pipeline = channel.Pipeline;
                        pipeline.AddLast(new EchoServerHandler());
                    }));
                    IChannel boundChannel = await bootstrap.BindAsync(3000);
                    Console.ReadLine();
                    await boundChannel.CloseAsync();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                finally
                {
                    await eventLoop.ShutdownGracefullyAsync();
                }
            }

            static void Main(string[] args) => RunServerAsync().Wait();
        }
    }

    There is a lot in this program that deserves explanation too, but as a beginner it is enough to understand these concepts first:

    1. Bootstrap means exactly that: in Netty, a bootstrap launches a network application. Before starting we obviously need to set many parameters, and the bootstrap collects them and guides the user through launching the Netty application.
    2. An EventLoopGroup is a collection of EventLoops.
    3. An EventLoop corresponds to a selector (see the diagram in the previous section).
    4. Every Channel must be bound to one selector (EventLoop).
    5. Each selector (EventLoop) is bound to one thread.
    6. Handlers can be chained together to process data; we'll cover that later. Here we attach the handler to the pipeline (see the sketch below).
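
    To make the pipeline idea concrete, here is a hedged sketch of my own (not from the article) that chains two handlers; LoggingHandler is DotNetty's built-in handler from DotNetty.Handlers.Logging:

    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
    {
        IChannelPipeline pipeline = channel.Pipeline;
        pipeline.AddLast(new LoggingHandler("SRV"));   // logs channel events as they pass through
        pipeline.AddLast(new EchoServerHandler());     // our business handler runs after it
    }));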
  6. Create another project named EchoClient

  7. Create a class named EchoClientHandler

    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoClient
    {
        public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer>
        {
            /// <summary>
            /// Read0 is DotNetty's wrapper around the Read method.
            /// The wrapper provides:
            /// 1. A strongly typed message via the generic parameter.
            /// 2. Discarding of messages that are not of that type.
            /// </summary>
            protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
            {
                if (msg != null)
                {
                    Console.WriteLine("Receive From Server:" + msg.ToString(Encoding.UTF8));
                }
                ctx.WriteAsync(Unpooled.CopiedBuffer(msg));
            }

            public override void ChannelReadComplete(IChannelHandlerContext context)
            {
                context.Flush();
            }

            public override void ChannelActive(IChannelHandlerContext context)
            {
                Console.WriteLine("Sending Hello World");
                context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("Hello World!")));
            }

            public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
            {
                Console.WriteLine(exception);
                context.CloseAsync();
            }
        }
    }

    Writing this handler is essentially the same as writing the server-side handler above, but a few points still need explaining:

    1. SimpleChannelInboundHandler inherits from ChannelHandlerAdapter; where it is more powerful is its automatic release of resources (keep this in mind for later).
    2. The Read0 method was explained in the code comments; if you're curious, read the source. I won't paste it here.
    3. ctx.WriteAsync(Unpooled.CopiedBuffer(msg)); — if you sent msg directly here, you would find the experiment fails. Why? In short, because of the reference-counting mechanism an IByteBuffer can only be used once: by the time Read0 hands us the message, its reference count is already on its way to zero, so using it again throws an exception. That is why we copy the message first. If you use the plain Read method instead, this problem does not occur. As a rule, never store a reference to a message for later use, because such references become invalid automatically (in other words: once a message has been received and processed, drop it; messages are not meant to be kept around). A hedged alternative is sketched below.
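
    As a hedged alternative of my own (not the author's code): instead of copying, you can bump the reference count with Retain before writing the buffer back out, so the automatic release performed by SimpleChannelInboundHandler just undoes the extra count instead of destroying the buffer:

    // Retain returns the buffer itself with its reference count incremented,
    // so the automatic Release after Read0 brings the count back down to 1 instead of 0.
    ctx.WriteAsync((IByteBuffer)msg.Retain());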
  8. Write the client bootstrap code

    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Net;
    using System.Threading.Tasks;

    namespace EchoClient
    {
        class Program
        {
            static async Task RunClientAsync()
            {
                var group = new MultithreadEventLoopGroup();
                try
                {
                    var bootstrap = new Bootstrap();
                    bootstrap
                        .Group(group)
                        .Channel<TcpSocketChannel>()
                        .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                        {
                            IChannelPipeline pipeline = channel.Pipeline;
                            pipeline.AddLast(new EchoClientHandler());
                        }));
                    IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("10.10.10.158"), 3000));
                    Console.ReadLine();
                    await clientChannel.CloseAsync();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
                finally
                {
                    await group.ShutdownGracefullyAsync();
                }
            }

            static void Main(string[] args) => RunClientAsync().Wait();
        }
    }

Closing Thoughts

The complete project code is on Gitee; you can download it via the link here. I'm sure many people who have never touched Netty are still confused after following along with this first project; despite all the explanations it may only half make sense, and that's normal. It's just like finishing HelloWorld and still wondering why static void Main(string[] args) has to be written that way. What I want to say is: if you stuck with it and finished your first application, well done. There is still a great deal more to cover about Netty, and I believe that once you've learned what comes next, you'll look back at this example and everything will click. If you read to the end, typed in the program, and got it running, congratulations: add a chicken drumstick to your dinner. We have much more to learn.

DotNetty Complete Tutorial (5)



ChannelHandler

This article focuses on ChannelHandler.

The Channel Lifecycle

Let's review: a Channel is an abstraction over a socket and can be registered to an EventLoop; an EventLoop is the equivalent of a Selector, and each EventLoop has its own processing thread. With that refreshed, we can see that a Channel's life has a few key moments (shown as a diagram in the original post).

The ChannelHandler Lifecycle

Let's review: a ChannelHandler defines how data is processed and is chained into a ChannelPipeline to handle inbound or outbound data. Since it processes the data in a Channel, it has to care about many moments in time: the Channel becoming active, data being read, and so on. So a ChannelHandler needs to know not only when data arrives but also what state the Channel is in. The ChannelHandler lifecycle is therefore as follows (shown as a diagram in the original post).

Creating Your Own Handler from the Adapter Classes

You can use the ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter classes as the starting point for your own ChannelHandler. Simply extend one of these adapter classes and override the methods you need.

Note that the adapter classes have an IsSharable property, which indicates whether the handler may be added to more than one pipeline (a hedged sketch follows).
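
As a hedged sketch of my own (not from the article): a stateless handler can declare itself sharable by overriding IsSharable, which allows a single instance to sit in several pipelines at once:

public class SharableLoggingHandler : ChannelHandlerAdapter
{
    // Stateless, so one instance may safely be added to multiple pipelines.
    public override bool IsSharable => true;

    public override void ChannelRead(IChannelHandlerContext ctx, object msg)
    {
        Console.WriteLine($"Channel {ctx.Channel.Id} read a message");
        ctx.FireChannelRead(msg); // hand the message on to the next handler
    }
}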

DotNetty Complete Tutorial (8)




EventLoop

Introduction

A quick recap: the EventLoop is the Selector from our very first diagram; each EventLoop is bound to one thread and handles multiple Channels.

Task Scheduling

  1. If you want to schedule a delayed task, say sending a packet 5 s after the connection is established, you can use the EventLoop's scheduled tasks:

    ctx.Channel.EventLoop.Schedule(() =>
    {
        Console.WriteLine("delay 1s");
    }, TimeSpan.FromSeconds(1)); // note: new TimeSpan(1000) would mean 1000 ticks, not 1 s
    // To cancel the task early, call its Cancel method.
    IScheduledTask task = ctx.Channel.EventLoop.Schedule(() =>
    {
        Console.WriteLine("delay 1s");
    }, TimeSpan.FromSeconds(1));
    task.Cancel();
  2. When a task is raised, DotNetty checks whether the current thread belongs to the EventLoop responsible for that task (the program knows which thread it is running on, and each thread maps to an EventLoop). If it does, the task runs immediately; if not, the task is queued to run later.

  3. Never put a long-running task on the EventLoop's execution queue; hand it to the EventExecutor facilities introduced earlier instead (a hedged sketch follows).
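
    A hedged sketch of that idea (my addition; DoExpensiveWork is a hypothetical placeholder): run the slow work on the thread pool, then hop back onto the channel's EventLoop before touching the channel:

    Task.Run(() =>
    {
        var result = DoExpensiveWork(); // hypothetical long-running call; never run this on the EventLoop
        // Re-enter the channel's EventLoop before interacting with the channel.
        ctx.Channel.EventLoop.Execute(() => ctx.WriteAndFlushAsync(result));
    });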

Group

Many Channels map to one EventLoop, but the number of Channels a single EventLoop can take on is limited. To handle an arbitrarily growing number of Channels you need an EventLoopGroup. Their structure is shown in the diagram below (omitted in the original post).

As mentioned before, Netty can build not only NIO systems; with a little configuration it can also become a blocking (OIO) system. With blocking IO, multiple Channels can no longer share one EventLoop: each Channel is assigned its own EventLoop. Either way, the relationship between an EventLoop and its thread never changes.

Note:

  1. It is the EventLoopGroup that assigns an EventLoop to each Channel, and it tries to distribute the Channels as evenly as possible.

DotNetty Complete Tutorial (6)



Resource Management

Purpose

While processing data we need to make sure nothing leaks, so resource management deserves close attention.

Handling Reference Counts

After you finish using a ByteBuf, adjust its reference count so the resource can be released (a hedged sketch follows).
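
A minimal hedged sketch (my addition, not from the article): allocate a buffer, use it, and release it in a finally block so the pool can reclaim it:

IByteBuffer buffer = ctx.Allocator.Buffer();
try
{
    buffer.WriteBytes(Encoding.UTF8.GetBytes("payload"));
    // ... use the buffer ...
}
finally
{
    buffer.Release(); // drops our reference; at count 0 the memory is reclaimed
}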

Memory Leak Detection

Netty provides ResourceLeakDetector to detect memory leaks; because it works by sampling, its overhead is small.

  1. Leak logging

  2. Detection levels

  3. Manually releasing a message

    ReferenceCountUtil.SafeRelease(this.Message);
  4. Analyzing SimpleChannelInboundHandler

    public override void ChannelRead(IChannelHandlerContext ctx, object msg)
    {
        bool release = true;
        try
        {
            if (this.AcceptInboundMessage(msg))
            {
                I imsg = (I)msg;
                this.ChannelRead0(ctx, imsg);
            }
            else
            {
                // Not our message type: pass it along and do not release it.
                release = false;
                ctx.FireChannelRead(msg);
            }
        }
        finally
        {
            if (autoRelease && release)
            {
                // The message is released automatically once ChannelRead0 returns.
                ReferenceCountUtil.Release(msg);
            }
        }
    }

As this source shows, Read0 is really a wrapper around Read; the difference is that when Read0 is called, the message is guaranteed to be released afterwards. This is exactly the manual-release pattern in action.

DotNetty Complete Tutorial (11)



Encoders and Decoders

Definition

An encoder turns a data structure the application understands into a transmittable stream of data; a decoder does the reverse. From the application's point of view, encoders operate on outbound data and decoders on inbound data.

Decoders and Handlers

Because a decoder processes inbound data, it inherits from ChannelInboundHandler. You can think of a decoder as simply a special kind of handler used to process messages.

Decoder Types

  • ByteToMessageDecoder
  • ReplayingDecoder
  • MessageToMessageDecoder

Decoder Examples

ByteToMessageDecoder

// ByteToMessageDecoder
public class ToIntDecoder : ByteToMessageDecoder
{
    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        if (input.ReadableBytes >= 4) output.Add(input.ReadInt());
    }
}

// Test code
[Fact]
public void TestIntDecoder()
{
    IByteBuffer buf = Unpooled.Buffer();
    for (int i = 0; i < 8; i++)
    {
        buf.WriteByte(i);
    }
    // Build the channel
    EmbeddedChannel channel = new EmbeddedChannel(new ToIntDecoder());
    // Run the test
    Assert.True(channel.WriteInbound(buf));
    Assert.True(channel.Finish());

    // For example, the bytes 0 1 2 3 decode (big-endian) to
    // 3*2^0 + 2*2^8 + 1*2^16 + 0*2^24 = 66051
    Assert.Equal(66051, channel.ReadInbound<int>());
}

ReplayingDecoder

// A ReplayingDecoder does not need to check ReadableBytes itself.
public class ToIntDecoder2 : ReplayingDecoder<int>
{
    public ToIntDecoder2(int initialState) : base(initialState)
    {
    }

    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        output.Add(input.ReadInt());
    }
}

MessageToMessageDecoder

public class IntToStringDecoder : MessageToMessageDecoder<int>
{
    protected override void Decode(IChannelHandlerContext context, int message, List<object> output)
    {
        output.Add(message.ToString());
    }
}

More Decoders

  • LineBasedFrameDecoder splits the data on end-of-line delimiters, parsing it out line by line
  • HttpObjectDecoder is an HTTP decoder

Encoders

From what we have learned so far it's easy to deduce that encoders inherit from ChannelOutboundHandler.

  • MessageToByteEncoder
  • MessageToMessageEncoder

Encoder Examples

MessageToByteEncoder

public class ShortToByteEncoder : MessageToByteEncoder<short>
{
    protected override void Encode(IChannelHandlerContext context, short message, IByteBuffer output)
    {
        output.WriteShort(message);
    }
}

MessageToMessageEncoder

public class IntToStringEncoder : MessageToMessageEncoder<int>
{
    protected override void Encode(IChannelHandlerContext context, int message, List<object> output)
    {
        output.Add(message.ToString());
    }
}

Codecs

MessageToMessageCodec has both an encode and a decode method, converting data in both directions; we'll look at a concrete one in a later example.
Such a codec encapsulates both the conversion and its inverse, but its drawback is that it is less reusable than a separate encoder and decoder. So why not take an existing encoder and an existing decoder and combine them into a codec? That way the encoder and decoder remain individually reusable, and the combined codec is still convenient to use. CombinedChannelDuplexHandler does exactly that.

// A codec combined from an existing decoder and encoder.
public class CombinedCodec : CombinedChannelDuplexHandler<ToIntDecoder, ShortToByteEncoder>
{
    // Pass the two handler instances to the base class.
    public CombinedCodec() : base(new ToIntDecoder(), new ShortToByteEncoder())
    {
    }
}

DotNetty Complete Tutorial (10)



Unit Testing

Introducing EmbeddedChannel

EmbeddedChannel is a transport built specifically for testing ChannelHandlers. Let's first look at its API (shown as a table image in the original post).

A diagram describes the simulated process (omitted in the original post).

Writing a Unit Test with xUnit

  1. Create an xUnit project named UnitTest
  2. Create a project for testing EmbeddedChannel, named EmbededChannelTest
  3. The EmbededChannelTest project needs the DotNetty libraries; since we're testing a decoder here, we need Codecs in addition to the earlier Buffers, Common, and Transport
  4. The xUnit project references the EmbededChannelTest project
  5. In the EmbededChannelTest project, create the class under test, FixedLengthFrameDecoder
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.Text;

namespace EmbededChannelTest
{
    public class FixedLengthFrameDecoder : ByteToMessageDecoder
    {
        private int _frameLength;

        public FixedLengthFrameDecoder(int frameLength)
        {
            if (frameLength <= 0)
            {
                throw new Exception("Invalid argument.");
            }
            _frameLength = frameLength;
        }

        protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
        {
            // The decoder produces frames of a fixed length.
            while (input.ReadableBytes >= _frameLength)
            {
                IByteBuffer buf = input.ReadBytes(_frameLength);
                output.Add(buf);
            }
        }
    }
}

As you can see, this decoder slices the byte stream in the buffer into frames of three bytes each. Next we write the test class: in the UnitTest project create a class named UnitTester with the following code:

using DotNetty.Buffers;
using DotNetty.Transport.Channels.Embedded;
using EmbededChannelTest;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;

namespace UnitTest
{
    public class UnitTester
    {
        [Fact]
        public void testFrameDecoder()
        {
            IByteBuffer buf = Unpooled.Buffer();
            for (int i = 0; i < 9; i++)
            {
                buf.WriteByte(i);
            }
            IByteBuffer input = buf.Duplicate();
            EmbeddedChannel channel = new EmbeddedChannel(new FixedLengthFrameDecoder(3));

            // Write data.
            // Retain increments the buffer's reference count and returns the buffer itself.
            Assert.True(channel.WriteInbound(input.Retain()));
            Assert.True(channel.Finish());

            // Read data.
            IByteBuffer read = channel.ReadInbound<IByteBuffer>();
            Assert.Equal(buf.ReadSlice(3), read);
            read.Release();

            read = channel.ReadInbound<IByteBuffer>();
            Assert.Equal(buf.ReadSlice(3), read);
            read.Release();

            read = channel.ReadInbound<IByteBuffer>();
            Assert.Equal(buf.ReadSlice(3), read);
            read.Release();

            Assert.Null(channel.ReadInbound<object>());
            buf.Release();
        }
    }
}

Once that's written, right-click and run the test. We can likewise test an Encoder for outbound data; the original doesn't paste that code and points to the project source instead, but a hedged sketch follows below.
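
Since the article omits the encoder test, here is a hedged sketch of what one might look like (my addition, reusing the ShortToByteEncoder from the codec chapter; WriteOutbound and ReadOutbound are EmbeddedChannel's outbound counterparts):

[Fact]
public void TestShortToByteEncoder()
{
    EmbeddedChannel channel = new EmbeddedChannel(new ShortToByteEncoder());
    // Writing outbound pushes the message through the encoder.
    Assert.True(channel.WriteOutbound((short)1));
    Assert.True(channel.Finish());
    // The encoder emitted a buffer holding the two-byte short.
    IByteBuffer encoded = channel.ReadOutbound<IByteBuffer>();
    Assert.Equal(1, encoded.ReadShort());
    encoded.Release();
}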

DotNetty Complete Tutorial (4)



ByteBuffer

ByteBuffer in Netty

Netty exposes its data-handling API through two components: the abstract class ByteBuf and the interface ByteBufHolder.

DotNetty's counterparts are AbstractByteBuffer, IByteBuffer, and IByteBufferHolder.

Advantages:

  • It can be extended with user-defined buffer types;
  • Transparent zero-copy is achieved through the built-in composite buffer type;
  • Capacity grows on demand (like the JDK's StringBuilder);
  • Switching between read and write modes requires no flip() call, unlike the JDK's ByteBuffer;
  • Reading and writing use separate indexes;
  • Method chaining is supported;
  • Reference counting is supported;
  • Pooling is supported

How it works:

Every ByteBuf has two indexes, a reader index and a writer index. read and write operations move the indexes; set and get operations do not (see the sketch below).
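
A small hedged sketch of the two indexes (my addition, not from the article):

IByteBuffer buf = Unpooled.Buffer();
buf.WriteInt(42);            // write: moves the writer index forward by 4
int peeked = buf.GetInt(0);  // get: absolute access, neither index moves
int read = buf.ReadInt();    // read: consumes 4 bytes, moves the reader index
Console.WriteLine($"{peeked} {read} readable={buf.ReadableBytes}"); // 42 42 readable=0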

Using ByteBuf

  1. Heap buffers (present and operate on the data as an array)

A backing array gives the ByteBuf fast allocation and deallocation. It suits cases where legacy code needs to work with the raw data.

public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
    IByteBuffer message = msg as IByteBuffer;
    // Check whether there is a backing array
    if (message.HasArray)
    {
        // Get the array
        byte[] array = message.Array;
        // Compute the offset of the first byte
        int offset = message.ArrayOffset + message.ReaderIndex;
        // Get the number of readable bytes
        int length = message.ReadableBytes;
        // Call a method to process the data
        HandleArray(array, offset, length);
    }
    Console.WriteLine("Received: " + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
}
  2. Direct buffers
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
    IByteBuffer message = msg as IByteBuffer;
    // A direct buffer has no backing array, so copy the bytes out first.
    if (!message.HasArray)
    {
        int length = message.ReadableBytes;
        byte[] array = new byte[length];
        message.GetBytes(message.ReaderIndex, array);
        HandleArray(array, 0, length);
    }
    Console.WriteLine("Received: " + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
}
  3. CompositeByteBuffer (composite buffers)

If the payload to send is assembled from two ByteBufs, you need a composite buffer; think of how an HTTP stream is logically made up of a header plus a body.

public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
    IByteBuffer message = msg as IByteBuffer;
    // Create a composite buffer
    CompositeByteBuffer messageBuf = Unpooled.CompositeBuffer();
    // Create two ByteBuffers
    IByteBuffer headBuf = Unpooled.CopiedBuffer(message);
    IByteBuffer bodyBuf = Unpooled.CopiedBuffer(message);
    // Add them to the composite buffer
    messageBuf.AddComponents(headBuf, bodyBuf);
    // Remove a component
    messageBuf.RemoveComponent(0);

    Console.WriteLine("Received: " + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
}

Byte-Level Operations

  1. Random-access reads (the indexes do not move)
public override void ChannelRead(IChannelHandlerContext ctx, object msg)
{
    IByteBuffer message = msg as IByteBuffer;
    for (int i = 0; i < message.Capacity; i++)
    {
        // Index-based access like this moves neither the reader index nor the writer index
        byte b = message.GetByte(i);
        Console.WriteLine(b);
    }

    Console.WriteLine("Received: " + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
}
  2. Discarding the discardable bytes
    The discardable bytes are the region the readerIndex has already moved past after read calls; the bytes in that region are called discardable.
message.DiscardReadBytes();

Only call this when memory is truly at a premium and needs reclaiming; calling it casually can cause memory copying and hurt performance.
3. Reading all readable bytes (moves the reader index)

while (message.IsReadable())
{
    Console.WriteLine(message.ReadByte());
}
  4. Writing data
// Fill the writable region with random numbers
var random = new Random();
while (message.WritableBytes >= 4)
{
    message.WriteInt(random.Next(0, 100));
}
  5. Managing the indexes
  • MarkReaderIndex / ResetReaderIndex mark and restore the reader index
  • MarkWriterIndex / ResetWriterIndex mark and restore the writer index
  • SetReaderIndex(int) / SetWriterIndex(int) move an index directly
  • clear() resets both indexes to 0 but does not erase the content
  6. Searching
  • IndexOf()
  • Using a Processor
// Find \r
message.ForEachByte(ByteProcessor.FindCR);
  7. Derived buffers

Deriving means creating a new ByteBuf from an existing one: the derived child has its own reader and writer indexes but points to the same underlying data, so changing one changes the other.

  • duplicate();
  • slice();
  • slice(int, int);
  • Unpooled.unmodifiableBuffer(…);
  • order(ByteOrder);
  • readSlice(int)。
  8. Copying
    Copying, unlike deriving, produces a fully independent ByteBuf; modifying one does not affect the other.
  • copy
  9. Releasing
// Explicitly discard the message
ReferenceCountUtil.Release(msg);
  10. Increasing the reference count to prevent release
ReferenceCountUtil.Retain(message)
  11. Other APIs (shown as a table image in the original post)

ByteBufHolder

  1. Purpose
    While processing data there is more than the byte content itself; there is also metadata, such as an HTTP response's status code or cookies. Attaching such information to a ByteBuf is what ByteBufHolder is for.
  2. API (shown as an image in the original post; a hedged sketch follows)
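
    The API table did not survive extraction, so as a hedged substitute here is my rough summary of DotNetty's IByteBufferHolder (abridged; verify against the source):

    // Rough shape of DotNetty's IByteBufferHolder (my abridged sketch, not the verbatim declaration).
    public interface IByteBufferHolder : IReferenceCounted
    {
        IByteBuffer Content { get; }                     // the wrapped payload
        IByteBufferHolder Copy();                        // deep copy of holder and content
        IByteBufferHolder Duplicate();                   // shares content, independent indexes
        IByteBufferHolder Replace(IByteBuffer content);  // same metadata, new content
    }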

Managing ByteBuffers

  1. On-demand allocation: ByteBufAllocator

    Note that allocation is pooled, which minimizes the cost of allocating and freeing memory.

    // Obtaining an allocator
    // 1. From a handler context
    IChannelHandlerContext ctx = null;
    IByteBufferAllocator allocator = ctx.Allocator;
    // 2. From a channel
    IChannel channel = null;
    allocator = channel.Allocator;

There are two ByteBufAllocator implementations: PooledByteBufAllocator and UnpooledByteBufAllocator. The former pools ByteBuf instances, greatly improving performance and reducing memory fragmentation.
2. Unpooled buffers
When you cannot get hold of a ByteBufAllocator, you can use the Unpooled utility class to work with ByteBufs (its API is shown as a table image in the original post).

  3. ByteBufUtil
    This class offers general-purpose static helper methods, such as hexdump, which prints a ByteBuf's content in hexadecimal, and an equals method that compares two ByteBufs (a hedged sketch follows).
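
    A hedged one-liner sketch (my addition; in DotNetty the class is spelled ByteBufferUtil and lives in DotNetty.Buffers):

    IByteBuffer buf = Unpooled.WrappedBuffer(new byte[] { 0x0A, 0x0B, 0x0C });
    Console.WriteLine(ByteBufferUtil.HexDump(buf)); // prints "0a0b0c"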

Reference Counting

  1. Purpose

    Both ByteBuf and ByteBufHolder are reference counted. Counts start at 1; an object with a count above 0 is not released, and once the count reaches 0 it is released. The mechanism exists to support pooled implementations and lowers the cost of memory allocation.

  2. Exceptions

    Accessing an object whose count is 0 raises IllegalReferenceCountException.

DotNetty Series (3): Codecs and the IdleStateHandler Heartbeat



Building on the previous section, we implement the encoder and decoder.

1. Create a class library project to hold the codecs.

Encoders:

public class CommonServerEncoder : MessageToByteEncoder<string>
{
    protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
    {
        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
        IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
        initialMessage.WriteBytes(messageBytes);
        output.WriteBytes(initialMessage);
    }
}

public class CommonClientEncoder : MessageToByteEncoder<string>
{
    protected override void Encode(IChannelHandlerContext context, string message, IByteBuffer output)
    {
        byte[] messageBytes = Encoding.UTF8.GetBytes(message);
        IByteBuffer initialMessage = Unpooled.Buffer(messageBytes.Length);
        initialMessage.WriteBytes(messageBytes);
        output.WriteBytes(initialMessage);
    }
}

Decoders:

public class CommonServerDecoder : ByteToMessageDecoder
{
    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        byte[] array = new byte[input.ReadableBytes];
        input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
        input.Clear();
        output.Add(array);
    }
}

public class CommonClientDecoder : ByteToMessageDecoder
{
    protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
    {
        byte[] array = new byte[input.ReadableBytes];
        input.GetBytes(input.ReaderIndex, array, 0, input.ReadableBytes);
        input.Clear();
        output.Add(array);
    }
}

2. On the server side, add:

                        // Register the codecs
                        pipeline.AddLast(new CommonServerEncoder());
                        pipeline.AddLast(new CommonServerDecoder());

On the client side, add:

                        // Register the codecs
                        pipeline.AddLast(new CommonClientEncoder());
                        pipeline.AddLast(new CommonClientDecoder());

3. Server receive and send:

public override void ChannelRead(IChannelHandlerContext context, object message)
{
    if (message is byte[] o)
    {
        Console.WriteLine($"Via decoder, received from client: {Encoding.UTF8.GetString(o)}:{DateTime.Now}");
    }
    string msg = "Reply from the server after receiving the client's message";
    context.WriteAsync(msg);
}

Client receive and send:

public override void ChannelActive(IChannelHandlerContext context)
{
    Console.WriteLine("I am the client.");
    Console.WriteLine($"Connected to server {context}.");
    string message = "Client 1";
    context.WriteAndFlushAsync(message);
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
    if (message is byte[] o)
    {
        Console.WriteLine($"Via decoder, received from server: {Encoding.UTF8.GetString(o)}:{DateTime.Now}");
    }
}

This achieves the same effect as the previous section.

4. The IdleStateHandler heartbeat:

4.1 On the server, add the IdleStateHandler heartbeat processor, plus a custom handler that overrides UserEventTriggered() to hold the timeout-handling logic.

The IdleStateHandler runs a read check every fifteen seconds: if ChannelRead() has not been called within fifteen seconds, UserEventTriggered() fires once.

                        // IdleStateHandler heartbeat
                        // The server watches for read idleness
                        pipeline.AddLast(new IdleStateHandler(15, 0, 0)); // parameters: read timeout, write timeout, all-idle timeout

4.2 Override UserEventTriggered in the server handler:

private int lossConnectCount = 0;

public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
    if (evt is IdleStateEvent eventState)
    {
        if (eventState.State == IdleState.ReaderIdle)
        {
            Console.WriteLine("No message from the client for 15 seconds!");
            lossConnectCount++;
            if (lossConnectCount > 2)
            {
                Console.WriteLine("Closing this inactive channel!");
                context.CloseAsync();
            }
        }
    }
    else
    {
        base.UserEventTriggered(context, evt);
    }
}

Change the receive logic to recognize heartbeats:

public override void ChannelRead(IChannelHandlerContext context, object message)
{
    if (message is byte[] o)
    {
        Console.WriteLine($"Via decoder, received from client: {Encoding.UTF8.GetString(o)}:{DateTime.Now}");
        if (Encoding.UTF8.GetString(o).Contains("biubiu:"))
        {
            string temp = "Server received heartbeat";
            context.WriteAsync(temp);
            return;
        }
    }
    string msg = "Reply from the server after receiving the client's message";
    context.WriteAsync(msg);
}

4.3 On the client, add the IdleStateHandler and a custom handler that overrides UserEventTriggered() to hold the timeout logic;

Configure the IdleStateHandler to run a write check every ten seconds: if no write happens within ten seconds, UserEventTriggered() fires once, making the client send a message to the server every ten seconds;

                        // IdleStateHandler heartbeat
                        // The client watches for write idleness
                        pipeline.AddLast(new IdleStateHandler(0, 10, 0)); // parameters: read timeout, write timeout, all-idle timeout

4.4 Override UserEventTriggered in the client handler:

public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
    Console.WriteLine("Client heartbeat check fired: " + DateTime.Now);
    if (evt is IdleStateEvent eventState)
    {
        if (eventState.State == IdleState.WriterIdle)
        {
            context.WriteAndFlushAsync($"biubiu:{DateTime.Now}");
        }
    }
}

4.5 Result: (screenshots omitted in the original post)

5. Broadcasting: send client online/offline notices to all clients. Changes are made only on the server:

static volatile IChannelGroup groups;

public override void HandlerAdded(IChannelHandlerContext context)
{
    Console.WriteLine($"Client {context} is online.");

    base.HandlerAdded(context);
    IChannelGroup g = groups;
    if (g == null)
    {
        lock (this)
        {
            if (groups == null)
            {
                groups = new DefaultChannelGroup(context.Executor);
            }
            g = groups; // ensure g is set even if another thread created the group first
        }
    }
    g.Add(context.Channel);
    groups.WriteAndFlushAsync($"Welcome, {context.Channel.RemoteAddress} has joined.");
}

public override void HandlerRemoved(IChannelHandlerContext context)
{
    Console.WriteLine($"Client {context} is offline.");
    base.HandlerRemoved(context);
    groups.Remove(context.Channel);
    groups.WriteAndFlushAsync($"Farewell, {context.Channel.RemoteAddress} has left.");
}

Result: (screenshot omitted in the original post)

Project download: project download

Installing InfluxDB on Windows

1. Download from https://portal.influxdata.com/downloads and pick the Windows build.

2. Unzip it to your installation drive; the directory looks as follows (screenshot omitted).

3. Edit the conf file; the full contents follow (version 1.4.2), ready to copy and paste. Be sure to adjust the paths: everywhere you see the D drive, change it to your installation path (there are three such places). Note that many online guides configure an admin section for web management, but newer versions no longer have it in the config file because the official team removed it; you need to download Chronograf instead, covered below.

### Welcome to the InfluxDB configuration file.

# The values in this file override the default values used by the system if
# a config option is not specified. The commented out lines are the configuration
# field and the default value used. Uncommenting a line and changing the value
# will change the value used at runtime when the process is restarted.

# Once every 24 hours InfluxDB will report usage data to usage.influxdata.com
# The data includes a random ID, os, arch, version, the number of series and other
# usage data. No data from user databases is ever transmitted.
# Change this option to true to disable reporting.
# reporting-disabled = false

# Bind address to use for the RPC service for backup and restore.
# bind-address = "127.0.0.1:8088"

###
### [meta]
###
### Controls the parameters for the Raft consensus group that stores metadata
### about the InfluxDB cluster.
###

[meta]
# Where the metadata/raft database is stored
dir = "D:/influxdb-1.4.2-1/meta"

# Automatically create a default retention policy when creating a database.
retention-autocreate = true

# If log messages are printed for the meta service
logging-enabled = true

###
### [data]
###
### Controls where the actual shard data for InfluxDB lives and how it is
### flushed from the WAL. "dir" may need to be changed to a suitable place
### for your system, but the WAL settings are an advanced configuration. The
### defaults should work for most systems.
###

[data]
# The directory where the TSM storage engine stores TSM files.
dir = "D:/influxdb-1.4.2-1/data"

# The directory where the TSM storage engine stores WAL files.
wal-dir = "D:/influxdb-1.4.2-1/wal"

# The amount of time that a write will wait before fsyncing. A duration
# greater than 0 can be used to batch up multiple fsync calls. This is useful for slower
# disks or when WAL write contention is seen. A value of 0s fsyncs every write to the WAL.
# Values in the range of 0-100ms are recommended for non-SSD disks.
# wal-fsync-delay = "0s"


# The type of shard index to use for new shards. The default is an in-memory index that is
# recreated at startup. A value of "tsi1" will use a disk based index that supports higher
# cardinality datasets.
# index-version = "inmem"

# Trace logging provides more verbose output around the tsm engine. Turning
# this on can provide more useful output for debugging tsm engine issues.
# trace-logging-enabled = false

# Whether queries should be logged before execution. Very useful for troubleshooting, but will
# log any sensitive data contained within a query.
query-log-enabled = true

# Settings for the TSM engine

# CacheMaxMemorySize is the maximum size a shard's cache can
# reach before it starts rejecting writes.
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
# Vaues without a size suffix are in bytes.
# cache-max-memory-size = "1g"

# CacheSnapshotMemorySize is the size at which the engine will
# snapshot the cache and write it to a TSM file, freeing up memory
# Valid size suffixes are k, m, or g (case insensitive, 1024 = 1k).
# Values without a size suffix are in bytes.
# cache-snapshot-memory-size = "25m"

# CacheSnapshotWriteColdDuration is the length of time at
# which the engine will snapshot the cache and write it to
# a new TSM file if the shard hasn't received writes or deletes
# cache-snapshot-write-cold-duration = "10m"

# CompactFullWriteColdDuration is the duration at which the engine
# will compact all TSM files in a shard if it hasn't received a
# write or delete
# compact-full-write-cold-duration = "4h"

# The maximum number of concurrent full and level compactions that can run at one time. A
# value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater
# than 0 limits compactions to that value. This setting does not apply
# to cache snapshotting.
# max-concurrent-compactions = 0

# The maximum series allowed per database before writes are dropped. This limit can prevent
# high cardinality issues at the database level. This limit can be disabled by setting it to
# 0.
# max-series-per-database = 1000000

# The maximum number of tag values per tag that are allowed before writes are dropped. This limit
# can prevent high cardinality tag values from being written to a measurement. This limit can be
# disabled by setting it to 0.
# max-values-per-tag = 100000

###
### [coordinator]
###
### Controls the clustering service configuration.
###

[coordinator]
# The default time a write request will wait until a "timeout" error is returned to the caller.
# write-timeout = "10s"

# The maximum number of concurrent queries allowed to be executing at one time. If a query is
# executed and exceeds this limit, an error is returned to the caller. This limit can be disabled
# by setting it to 0.
# max-concurrent-queries = 0

# The maximum time a query will is allowed to execute before being killed by the system. This limit
# can help prevent run away queries. Setting the value to 0 disables the limit.
# query-timeout = "0s"

# The time threshold when a query will be logged as a slow query. This limit can be set to help
# discover slow or resource intensive queries. Setting the value to 0 disables the slow query logging.
# log-queries-after = "0s"

# The maximum number of points a SELECT can process. A value of 0 will make
# the maximum point count unlimited. This will only be checked every second so queries will not
# be aborted immediately when hitting the limit.
# max-select-point = 0

# The maximum number of series a SELECT can run. A value of 0 will make the maximum series
# count unlimited.
# max-select-series = 0

# The maxium number of group by time bucket a SELECT can create. A value of zero will max the maximum
# number of buckets unlimited.
# max-select-buckets = 0

###
### [retention]
###
### Controls the enforcement of retention policies for evicting old data.
###

[retention]
# Determines whether retention policy enforcement enabled.
enabled = true

# The interval of time when retention policy enforcement checks run.
check-interval = "30m"

###
### [shard-precreation]
###
### Controls the precreation of shards, so they are available before data arrives.
### Only shards that, after creation, will have both a start- and end-time in the
### future, will ever be created. Shards are never precreated that would be wholly
### or partially in the past.

[shard-precreation]
# Determines whether shard pre-creation service is enabled.
enabled = true

# The interval of time when the check to pre-create new shards runs.
check-interval = "10m"

# The default period ahead of the endtime of a shard group that its successor
# group is created.
advance-period = "30m"

###
### Controls the system self-monitoring, statistics and diagnostics.
###
### The internal database for monitoring data is created automatically if
### if it does not already exist. The target retention within this database
### is called 'monitor' and is also created with a retention period of 7 days
### and a replication factor of 1, if it does not exist. In all cases the
### this retention policy is configured as the default for the database.

[monitor]
# Whether to record statistics internally.
store-enabled = true

# The destination database for recorded statistics
store-database = "_internal"

# The interval at which to record statistics
store-interval = "10s"

###
### [http]
###
### Controls how the HTTP endpoints are configured. These are the primary
### mechanism for getting data into and out of InfluxDB.
###

[http]
# Determines whether HTTP endpoint is enabled.
enabled = true

# The bind address used by the HTTP service.
bind-address = ":8086"

# Determines whether user authentication is enabled over HTTP/HTTPS.
# auth-enabled = false

# The default realm sent back when issuing a basic auth challenge.
# realm = "InfluxDB"

# Determines whether HTTP request logging is enabled.
# log-enabled = true

# Determines whether detailed write logging is enabled.
# write-tracing = false

# Determines whether the pprof endpoint is enabled. This endpoint is used for
# troubleshooting and monitoring.
# pprof-enabled = true

# Determines whether HTTPS is enabled.
# https-enabled = false

# The SSL certificate to use when HTTPS is enabled.
# https-certificate = "/etc/ssl/influxdb.pem"

# Use a separate private key location.
# https-private-key = ""

# The JWT auth shared secret to validate requests using JSON web tokens.
# shared-secret = ""

# The default chunk size for result sets that should be chunked.
# max-row-limit = 0

# The maximum number of HTTP connections that may be open at once. New connections that
# would exceed this limit are dropped. Setting this value to 0 disables the limit.
# max-connection-limit = 0

# Enable http service over unix domain socket
# unix-socket-enabled = false

# The path of the unix domain socket.
# bind-socket = "/var/run/influxdb.sock"

# The maximum size of a client request body, in bytes. Setting this value to 0 disables the limit.
# max-body-size = 25000000


###
### [ifql]
###
### Configures the ifql RPC API.
###

[ifql]
# Determines whether the RPC service is enabled.
# enabled = true

# Determines whether additional logging is enabled.
# log-enabled = true

# The bind address used by the ifql RPC service.
# bind-address = ":8082"


###
### [subscriber]
###
### Controls the subscriptions, which can be used to fork a copy of all data
### received by the InfluxDB host.
###

[subscriber]
# Determines whether the subscriber service is enabled.
# enabled = true

# The default timeout for HTTP writes to subscribers.
# http-timeout = "30s"

# Allows insecure HTTPS connections to subscribers. This is useful when testing with self-
# signed certificates.
# insecure-skip-verify = false

# The path to the PEM encoded CA certs file. If the empty string, the default system certs will be used
# ca-certs = ""

# The number of writer goroutines processing the write channel.
# write-concurrency = 40

# The number of in-flight writes buffered in the write channel.
# write-buffer-size = 1000


###
### [[graphite]]
###
### Controls one or many listeners for Graphite data.
###

[[graphite]]
# Determines whether the graphite endpoint is enabled.
# enabled = false
# database = "graphite"
# retention-policy = ""
# bind-address = ":2003"
# protocol = "tcp"
# consistency-level = "one"

# These next lines control how batching works. You should have this enabled
# otherwise you could get dropped metrics or poor performance. Batching
# will buffer points in memory if you have many coming in.

# Flush if this many points get buffered
# batch-size = 5000

# number of batches that may be pending in memory
# batch-pending = 10

# Flush at least this often even if we haven't hit buffer limit
# batch-timeout = "1s"

# UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
# udp-read-buffer = 0

### This string joins multiple matching 'measurement' values providing more control over the final measurement name.
# separator = "."

### Default tags that will be added to all metrics. These can be overridden at the template level
### or by tags extracted from metric
# tags = ["region=us-east", "zone=1c"]

### Each template line requires a template pattern. It can have an optional
### filter before the template and separated by spaces. It can also have optional extra
### tags following the template. Multiple tags should be separated by commas and no spaces
### similar to the line protocol format. There can be only one default template.
# templates = [
# "*.app env.service.resource.measurement",
# # Default template
# "server.*",
# ]

###
### [collectd]
###
### Controls one or many listeners for collectd data.
###

[[collectd]]
# enabled = false
# bind-address = ":25826"
# database = "collectd"
# retention-policy = ""
#
# The collectd service supports either scanning a directory for multiple types
# db files, or specifying a single db file.
# typesdb = "/usr/local/share/collectd"
#
# security-level = "none"
# auth-file = "/etc/collectd/auth_file"

# These next lines control how batching works. You should have this enabled
# otherwise you could get dropped metrics or poor performance. Batching
# will buffer points in memory if you have many coming in.

# Flush if this many points get buffered
# batch-size = 5000

# Number of batches that may be pending in memory
# batch-pending = 10

# Flush at least this often even if we haven't hit buffer limit
# batch-timeout = "10s"

# UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
# read-buffer = 0

# Multi-value plugins can be handled two ways.
# "split" will parse and store the multi-value plugin data into separate measurements
# "join" will parse and store the multi-value plugin as a single multi-value measurement.
# "split" is the default behavior for backward compatability with previous versions of influxdb.
# parse-multivalue-plugin = "split"
###
### [opentsdb]
###
### Controls one or many listeners for OpenTSDB data.
###

[[opentsdb]]
# enabled = false
# bind-address = ":4242"
# database = "opentsdb"
# retention-policy = ""
# consistency-level = "one"
# tls-enabled = false
# certificate= "/etc/ssl/influxdb.pem"

# Log an error for every malformed point.
# log-point-errors = true

# These next lines control how batching works. You should have this enabled
# otherwise you could get dropped metrics or poor performance. Only points
# metrics received over the telnet protocol undergo batching.

# Flush if this many points get buffered
# batch-size = 1000

# Number of batches that may be pending in memory
# batch-pending = 5

# Flush at least this often even if we haven't hit buffer limit
# batch-timeout = "1s"

###
### [[udp]]
###
### Controls the listeners for InfluxDB line protocol data via UDP.
###

[[udp]]
# enabled = false
# bind-address = ":8089"
# database = "udp"
# retention-policy = ""

# These next lines control how batching works. You should have this enabled
# otherwise you could get dropped metrics or poor performance. Batching
# will buffer points in memory if you have many coming in.

# Flush if this many points get buffered
# batch-size = 5000

# Number of batches that may be pending in memory
# batch-pending = 10

# Will flush at least this often even if we haven't hit buffer limit
# batch-timeout = "1s"

# UDP Read buffer size, 0 means OS default. UDP listener will fail if set above OS max.
# read-buffer = 0

###
### [continuous_queries]
###
### Controls how continuous queries are run within InfluxDB.
###

[continuous_queries]
# Determines whether the continuous query service is enabled.
# enabled = true

# Controls whether queries are logged when executed by the CQ service.
# log-enabled = true

# Controls whether queries are logged to the self-monitoring data store.
# query-stats-enabled = false

# interval for how often continuous queries will be checked if they need to run
# run-interval = "1s"

4. To apply the config and start the database, just double-click influxd.exe, then double-click influx.exe to work with it (there are tutorials online). Note that influxd.exe must stay running while you use the database. I don't know why I always get this message: "There was an error writing history file: open : The system cannot find the file specified." It doesn't seem to have any effect, though.

5. For web management you need Chronograf; it's the third item at https://portal.influxdata.com/downloads. Unzip it, double-click the exe, and open http://localhost:8888/ in the browser. It asks for a username and password on first login; admin/admin worked for me.

This view lists the databases that have been created (screenshot omitted).

This view shows the data inside a database (screenshot omitted).

That's all.

Using git subtree

For subrepositories, or repository sharing, Git's officially recommended tool is git subtree. I've used git subtree for a while myself; it feels nicer than git submodule, though it does have some drawbacks, all within acceptable limits.
So when choosing between git subtree and git submodule for sharing a repository, I recommend git subtree.

git subtree lets one repository live inside another as a subrepository.

Reasons to use git subtree:

  • Older versions of git support it too (as far back as v1.5.2).
  • Unlike git submodule, git subtree adds no new metadata files such as .gitmodule.
  • git subtree is transparent to other project members, meaning they don't even need to know it's there.

Of course, git subtree has its drawbacks too, but they stay within acceptable limits:

  • You must learn new commands (e.g. git subtree).
  • Updating and pushing the subrepository is comparatively involved.

The main git subtree commands are:

git subtree add --prefix=<prefix> <commit>
git subtree add --prefix=<prefix> <repository> <ref>
git subtree pull --prefix=<prefix> <repository> <ref>
git subtree push --prefix=<prefix> <repository> <ref>
git subtree merge --prefix=<prefix> <commit>
git subtree split --prefix=<prefix> [OPTIONS] [<commit>]

Setup

We'll prepare a repository called photoshop and one called libpng, and we want to make libpng a subrepository of photoshop.
photoshop lives at https://github.com/test/photoshop.git and contains these files:

photoshop
|
|-- photoshop.c
|-- photoshop.h
|-- main.c
\-- README.md

libpng lives at https://github.com/test/libpng.git and contains these files:

libpng
|
|-- libpng.c
|-- libpng.h
\-- README.md

All the following commands are run from the parent repository's root directory.

Adding a Subrepository to the Parent

Run the following command to add libpng to photoshop:

git subtree add --prefix=sub/libpng https://github.com/test/libpng.git master --squash

(The --squash flag skips pulling the full history and generates a single commit instead.)

Running git status shows two new commits (screenshot omitted):


git log shows the details (screenshot omitted):


Run git push to push the changes to the remote photoshop repository. The local and remote repositories now have this layout:

photoshop
|
|-- sub/
| |
| \--libpng/
| |
| |-- libpng.c
| |-- libpng.h
| \-- README.md
|
|-- photoshop.c
|-- photoshop.h
|-- main.c
\-- README.md

Note that other people working on photoshop no longer need to know that libpng is a subrepository. What does that mean?
When you git clone or git pull, you fetch the whole of photoshop (libpng included; libpng is just an ordinary directory inside photoshop). When you change something under libpng and git push, the change is pushed to photoshop.
In other words, inside the photoshop repository, libpng behaves like any other files.

Pulling Updates from the Source Repository

If the upstream libpng repository is updated, how does the libpng inside photoshop pull the update? Use git subtree pull, for example:

git subtree pull --prefix=sub/libpng https://github.com/test/libpng.git master --squash

Pushing Changes Back to the Source Repository

And if you changed libpng inside the photoshop repository and want to push that change back to the upstream libpng repository? Use git subtree push, for example:

git subtree push --prefix=sub/libpng https://github.com/test/libpng.git master

Simplifying the git subtree Commands

We now know the basic usage of the git subtree commands, but they are still a bit unwieldy, especially the subrepository's upstream URL, which is hard to remember.
Let's register the subrepository's URL as a remote to make it easier:

git remote add -f libpng https://github.com/test/libpng.git

Then the git subtree commands can be used like this:

git subtree add --prefix=sub/libpng libpng master --squash
git subtree pull --prefix=sub/libpng libpng master --squash
git subtree push --prefix=sub/libpng libpng master