0%

目前直播已成为一个相当热门的服务,自己对视音频的采集、传输、播放等等比较感兴趣,因此想记录下实现一个直播平台的过程,不仅是对已用过的知识进行记录,以防后期使用,也可以为其他的初学者提供一个借鉴的历程,接下来,根据自己的理解分析一下一个直播平台的基本结构。

目前实现一个直播平台从推流到拉流主要由5部分构成:

1、视音频的采集:通过调用摄像头、麦克风直接采集视音频数据,一般视频采集的数据用RGB或者YUV格式、音频采集数据采用PCM格式,由于采集的原始数据体积非常大,需要经过压缩技术处理来提高传输效率。并且对于采集的视频可以做一些前处理,比如美颜、水印等等。

2、数据编码:为了便于手机视频的推流、拉流以及存储,通常采用视音频编码压缩技术来减少体积,编码方式:CBR、VBR,

视频-编码格式:H.265、H.264、MPEG-4等,封装容器有TS、MKV、AVI、MP4等。现在比较常用的视频编码是H.264。

音频-编码格式:G.711μ、AAC、Opus等,封装有MP3、OGG、AAC等,比较常用的是AAC编码格式。

视频经过编码压缩大大提高了视频的存储和传输效率,当然,经过压缩后的视频在播放时必须进行解码。

3、数据传输:将编码完成后的音视频数据进行传输,早期的音视频通过同轴电缆之类的线缆进行传输,IP网络发展后,使用IP网络优传输

涉及技术或协议:

传输协议:RTP与RTCP、RTSP、RTMP、HTTP、HLS(HTTP Live

Streaming)等

控制信令:SIP和SDP、SNMP等

4、解码数据

5、播放显示

三、常见的视频直播相关协议:

1、RTMP(Real Time MessagingProtocol,实时消息传送协议)

RTMP是Adobe Systems公司为Flash播放器和服务器之间音频、视频和数据传输开发的开放协议。它有三种变种:

1)、工作在TCP之上的明文协议,使用端口1935;

2)、RTMPT封装在HTTP请求之中,可穿越防火墙;

3)、RTMPS类似RTMPT,但使用的是HTTPS连接;

RTMP协议是被Flash用于对象、视频、音频的传输。这个协议建立在TCP协议或者轮询HTTP协议之上。RTMP协议就像一个用来装数据包的容器,这些数据既可以是AMF格式的数据,也可以是FLV中的视音频数据。一个单一的连接可以通过不同的通道传输多路网络流,这些通道中的包都是按照固定大小的包传输的。

2、RTSP(Real Time StreamingProtocol,实时流传输协议)

RTSP定义了一对多应用程序如何有效地通过IP网络传送多媒体数据。RTSP提供了一个可扩展框架,数据源可以包括实时数据与已有的存储的数据。该协议目的在于控制多个数据发送连接,为选择发送通道如UDP、组播UDP与TCP提供途径,并为选择基于RTP上发送机制提供方法。

RTSP语法和运作跟HTTP/1.1类似,但并不特别强调时间同步,所以比较能容忍网络延迟。代理服务器的缓存功能也同样适用于RTSP,并且因为RTSP具有重新导向功能,可根据实际负载情况来切换提供服务的服务器,以避免过大的负载集中于同一服务器而造成延迟。

3、RTP(Real-time TransportProtocol,实时传输协议)

RTP是针对多媒体数据流的一种传输层协议,详细说明了在互联网上传递音频和视频的标准数据包格式。RTP协议常用于流媒体系统(配合RTCP协议),视频会议和一键通系统(配合H.323或SIP),使它成为IP电话产业的技术基础。

RTP是建立在UDP协议上的,常与RTCP一起使用,其本身并没有提供按时发送机制或其它服务质量(QoS)保证,它依赖于低层服务去实现这一过程。

RTP并不保证传送或防止无序传送,也不确定底层网络的可靠性,只管发送,不管传输是否丢包,也不管接收方是否有收到包。RTP 实行有序传送,RTP中的序列号允许接收方重组发送方的包序列,同时序列号也能用于决定适当的包位置,如在视频解码中,就不需要顺序解码。

4、RTCP(Real-time TransportControl Protocol,实时传输控制协议)

RTCP是RTP的配套协议,为RTP媒体流提供信道外的控制。RTCP和RTP一起协作将多媒体数据打包和发送,定期在多媒体流会话参与者之间传输控制数据。

RTCP的主要功能是为RTP所提供的服务质量(QoS)提供反馈,收集相关媒体连接的统计信息,例如传输字节数,传输分组数,丢失分组数,单向和双向网络延迟等等。网络应用程序可以利用RTCP所提供的信息来提高服务质量,比如限制流量或改用压缩比小的编解码器。

为什么做日志系统

首先,什么是日志? 日志就是程序产生的,遵循一定格式(通常包含时间戳)的文本数据。

通常日志由服务器生成,输出到不同的文件中,一般会有系统日志、 应用日志、安全日志。这些日志分散地存储在不同的机器上。

通常当系统发生故障时,工程师需要登录到各个服务器上,使用 grep / sed / awk 等 Linux 脚本工具去日志里查找故障原因。在没有日志系统的情况下,首先需要定位处理请求的服务器,如果这台服务器部署了多个实例,则需要去每个应用实例的日志目录下去找日志文件。每个应用实例还会设置日志滚动策略(如:每天生成一个文件),还有日志压缩归档策略等。

这样一系列流程下来,对于我们排查故障以及及时找到故障原因,造成了比较大的麻烦。因此,如果我们能把这些日志集中管理,并提供集中检索功能,不仅可以提高诊断的效率,同时对系统情况有个全面的理解,避免事后救火的被动。

我认为,日志数据在以下几方面具有非常重要的作用:

  • 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案
  • 服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态
  • 数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程。

针对这些问题,为了提供分布式的实时日志搜集和分析的监控系统,我们采用了业界通用的日志数据管理解决方案 - 它主要包括 Elasticsearch 、 Logstash 和 Kibana 三个系统。通常,业界把这套方案简称为ELK,取三个系统的首字母,但是我们实践之后将其进一步优化为EFK,F代表Filebeat,用以解决Logstash导致的问题。下面,我们展开详细介绍。

文中涉及的 ELK stack 版本是:

Elasticsearch 5.2.2 
Logstash 5.2.2 
Kibana 5.2.2 
Filebeat 5.2.2 
Kafka 2.10

这里写图片描述

Logstash :数据收集处理引擎。支持动态的从各种数据源搜集数据,并对数据进行过滤、分析、丰富、统一格式等操作,然后存储以供后续使用。

Kibana :可视化化平台。它能够搜索、展示存储在 Elasticsearch 中索引数据。使用它可以很方便的用图表、表格、地图展示和分析数据。

Elasticsearch :分布式搜索引擎。具有高可伸缩、高可靠、易管理等特点。可以用于全文检索、结构化检索和分析,并能将这三者结合起来。Elasticsearch 基于 Lucene 开发,现在使用最广的开源搜索引擎之一,Wikipedia 、StackOverflow、Github 等都基于它来构建自己的搜索引擎。

Filebeat :轻量级数据收集引擎。基于原先 Logstash-fowarder 的源码改造出来。换句话说:Filebeat就是新版的 Logstash-fowarder,也会是 ELK Stack 在 shipper 端的第一选择。

既然要谈 ELK 在沪江系统中的应用,那么 ELK 架构就不得不谈。本次分享主要列举我们曾经用过的 ELK 架构,并讨论各种架构所适合的场景和优劣供大家参考

简单版架构

这里写图片描述

这种架构下我们把 Logstash 实例与 Elasticsearch 实例直接相连。Logstash 实例直接通过 Input 插件读取数据源数据(比如 Java 日志, Nginx 日志等),经过 Filter 插件进行过滤日志,最后通过 Output 插件将数据写入到 ElasticSearch 实例中。

这个阶段,日志的收集、过滤、输出等功能,主要由这三个核心组件组成 Input 、Filter、Output。

Input:输入,输入数据可以是 File 、 Stdin(直接从控制台输入) 、TCP、Syslog 、Redis 、Collectd 等。

Filter:过滤,将日志输出成我们想要的格式。Logstash 存在丰富的过滤插件:Grok 正则捕获、时间处理、JSON 编解码、数据修改 Mutate 。Grok 是 Logstash 中最重要的插件,强烈建议每个人都要使用 Grok Debugger 来调试自己的 Grok 表达式。

1
grok {match => ["message", "(?m)\[%{LOGLEVEL:level}\] \[%{TIMESTAMP_ISO8601:timestamp}\] \[%{DATA:logger}\] \[%{DATA:threadId}\] \[%{DATA:requestId}\] %{GREEDYDATA:msgRawData}"]    }

Output:输出,输出目标可以是 Stdout (直接从控制台输出)、Elasticsearch 、Redis 、TCP 、File 等

这是最简单的一种ELK架构方式,Logstash 实例直接与 Elasticsearch 实例连接。优点是搭建简单,易于上手。建议供初学者学习与参考,不能用于线上的环境。

集群版架构

这里写图片描述

这种架构下我们采用多个 Elasticsearch 节点组成 Elasticsearch 集群,由于 Logstash 与 Elasticsearch 采用集群模式运行,集群模式可以避免单实例压力过重的问题,同时在线上各个服务器上部署 Logstash Agent,来满足数据量不大且可靠性不强的场景。

数据收集端:每台服务器上面部署 Logstash Shipper Agent 来收集当前服务器上日志,日志经过 Logstash Shipper 中 Input插件、Filter插件、Output 插件传输到 Elasticsearch 集群。

数据存储与搜索:Elasticsearch 配置默认即可满足,同时我们看数据重要性来决定是否添加副本,如果需要的话,最多一个副本即可。

数据展示:Kibana 可以根据 Elasticsearch 的数据来做各种各样的图表来直观的展示业务实时状况。

这种架构使用场景非常有限,主要存在以下两个问题:

  • 消耗服务器资源:Logstash 的收集、过滤都在服务器上完成,这就造成服务器上占用系统资源较高、性能方面不是很好,调试、跟踪困难,异常处理困难;
  • 数据丢失:大并发情况下,由于日志传输峰值比较大,没有消息队列来做缓冲,就会导致 Elasticsearch 集群丢失数据。

这个架构相对上个版本略微复杂,不过维护起来同样比较方便,同时可以满足数据量不大且可靠性不强的业务使用。

引入消息队列

这里写图片描述

该场景下面,多个数据首先通过 Lostash Shipper Agent 来收集数据,然后经过 Output 插件将数据投递到 Kafka 集群中,这样当遇到 Logstash 接收数据的能力超过 Elasticsearch 集群处理能力的时候,就可以通过队列就能起到削峰填谷的作用, Elasticsearch 集群就不存在丢失数据的问题。

目前业界在日志服务场景中,使用比较多的两种消息队列为 :Kafka VS Redis。尽管 ELK Stack 官网建议使用 Redis 来做消息队列,但是我们建议采用 Kafka 。主要从下面两个方面考虑:

  • 数据丢失:Redis 队列多用于实时性较高的消息推送,并不保证可靠。Kafka保证可靠但有点延时;
  • 数据堆积:Redis 队列容量取决于机器内存大小,如果超过设置的Max memory,数据就会抛弃。Kafka 的堆积能力取决于机器硬盘大小。

综合上述的理由,我们决定采用 Kafka 来缓冲队列。不过在这种架构下仍然存在一系列问题:

  • Logstash shipper 收集数据同样会消耗 CPU 和内存资源;
  • 不支持多机房部署。

这种架构适合较大集群的应用部署,通过消息队列解决了消息丢失、网络堵塞的问题。

多机房部署

这里写图片描述

随着沪江业务的飞速增长,单机房的架构已经不能满足需求。不可避免的,沪江的业务需要分布到不同机房中,对于日志服务来说也是不小的挑战。当然业界也有不少成熟的方法,比如阿里的单元化、腾讯的 SET 方案等等。

最终我们决定采用单元化部署的方式来解决 ELK 多机房中遇到的问题(延时、专线流量过大等),从日志的产生、收集、传输、存储、展示都是在同机房里面闭环消化,不存在跨机房传输与调用的问题。因为交互紧密的应用尽量部署在同机房,所以这种方案并不会给业务查询造成困扰。

Logstash、Elasticsearch、Kafka、Kibana 四个集群都部署到同一机房中,每个机房都要每个机房自己的日志服务集群,比如A机房业务的日志只能传输给本机房 Kafka ,而A机房 Indexer 集群消费并写入到A机房 Elasticsearch 集群中,并由A机房 Kibana 集群展示,中间任何一个步骤不依赖B机房任何服务。

引入Filebeat

这里写图片描述

Filebeat 是基于原先 logstash-forwarder 的源码改造出来的,无需依赖 Java 环境就能运行,安装包10M不到。

如果日志的量很大,Logstash 会遇到资源占用高的问题,为解决这个问题,我们引入了Filebeat。Filebeat 是基于 logstash-forwarder 的源码改造而成,用 Golang 编写,无需依赖 Java 环境,效率高,占用内存和 CPU 比较少,非常适合作为 Agent 跑在服务器上。

下面看看Filebeat的基本用法。编写配置文件,从 Nginx access.log 中解析日志数据:

1
filebeat.prospectors:- input_type: log  paths: /var/log/nginx/access.log  json.message_key:output.elasticsearch:  hosts: ["localhost"]index: "filebeat-nginx-%{+yyyy.MM.dd}"

我们来看看压测数据:

压测环境

  • 虚拟机 8 cores 64G内存 540G SATA盘
  • Logstash 版本 2.3.1
  • Filebeat 版本 5.5.0

压测方案

Logstash / Filebeat 读取 350W 条日志 到 console,单行数据 580B,8个进程写入采集文件。

压测结果

项目 workers cpu usr 总共耗时 收集速度
Logstash 8 53.7% 210s 1.6w line/s
Filebeat 8 38.0% 30s 11w line/s

Filebeat 所消耗的CPU只有 Logstash 的70%,但收集速度为 Logstash 的7倍。从我们的应用实践来看,Filebeat 确实用较低的成本和稳定的服务质量,解决了 Logstash 的资源消耗问题。

最后,分享给大家一些血泪教训,希望大家以我为鉴。

Indexer 运行一段时间后自动挂掉

突然有一天监控发现日志不消费了,排查下来发现消费 Kafka 数据的 indexer 挂掉了。所以,Indexer 进程也是需要用 supervisor 来监控的,保证它时刻都在运行。

Java异常日志输出

开始我们在通过 grok 切割日志的时候,发现 Java 的 Exception 日志输出之后,会出现换行的问题。后来使用 Logstash codec/multiline 插件来解决。

1
input {    stdin {        codec => multiline {            pattern => "^\["            negate => true            what => "previous"        }    }}

由于时区导致日志8小时时差

Logstash 2.3版本 date插件配置如下,查看解析结果发现@timestamp比中国时间早了8小时。

解决方案 Kibana 读取浏览器的当前时区,然后在页面上转换时间内容的显示。

1
date {match => [ "log_timestamp", "YYYY-MM-dd HH:mm:ss.SSS" ]    target => "@timestamp"  }

Grok parse failure

我们遇到线上 node 日志突然有几天日志查看不出来。后来拉出原始日志对比才发现生成出来的日志格式不正确,同时包含 JSON 格式和非 JSON 格式的日志。但是我们用grok解析的时候采用是 json 格式。建议大家输出日志保证格式一致同时不要出现空格等异常字符,可以使用在线 grok debug来调试正则。

总结

基于 ELK stack 的日志解决方案的优势主要体现于:

  • 可扩展性:采用高可扩展性的分布式系统架构设计,可以支持每日 TB 级别的新增数据。
  • 使用简单:通过用户图形界面实现各种统计分析功能,简单易用,上手快。
  • 快速响应:从日志产生到查询可见,能达到秒级完成数据的采集、处理和搜索统计。
  • 界面炫丽:Kibana 界面上,只需要点击鼠标,就可以完成搜索、聚合功能,生成炫丽的仪表板。

这里写图片描述

参考资料

作者曹林华,原文载于微信公众号「沪江技术学院」。

时间序列数据库简称时序数据库(Time Series Database),用于处理带时间标签(按照时间的顺序变化,即时间序列化)的数据,带时间标签的数据也称为时间序列数据。

时序数据的几个特点

1. 基本上都是插入,没有更新的需求。

2. 数据基本上都有时间属性,随着时间的推移不断产生新的数据。

3. 数据量大,每秒钟需要写入千万、上亿条数据

业务方常见需求

1. 获取最新状态,查询最近的数据(例如传感器最新的状态)

2. 展示区间统计,指定时间范围,查询统计信息,例如平均值,最大值,最小值,计数等。。。

3. 获取异常数据,根据指定条件,筛选异常数据

常见业务场景

监控软件系统: 虚拟机、容器、服务、应用

监控物理系统: 水文监控、制造业工厂中的设备监控、国家安全相关的数据监控、通讯监控、传感器数据、血糖仪、血压变化、心率等

资产跟踪应用: 汽车、卡车、物理容器、运货托盘

金融交易系统: 传统证券、新兴的加密数字货币

事件应用程序: 跟踪用户、客户的交互数据

商业智能工具: 跟踪关键指标和业务的总体健康情况

在互联网行业中,也有着非常多的时序数据,例如用户访问网站的行为轨迹,应用程序产生的日志数据等等。

一些基本概念(不同的时序数据库称呼略有不同)

Metric:  度量,相当于关系型数据库中的 table。

Data point:  数据点,相当于关系型数据库中的 row。

Timestamp:时间戳,代表数据点产生的时间。

Field:  度量下的不同字段。比如位置这个度量具有经度和纬度两个 field。一般情况下存放的是随时间戳而变化的数据。

Tag:  标签。一般存放的是不随时间戳变化的信息。timestamp 加上所有的 tags 可以视为 table 的 primary key。

例如采集有关风的数据,度量为 Wind,每条数据都有时间戳timestamp,两个字段 field:direction(风向)、speed(风速),两个tag:sensor(传感器编号)、city(城市)。第一行和第三行,存放的都是 sensor 编号为86F-2RT8的设备,城市是深圳。随着时间的变化,风向和风速发生了改变,风向从56.4变为45.6,风速从2.9变为3.6。

需要解决的几个问题

时序数据的写入:如何支持每秒钟成千上亿条数据的写入。

时序数据的读取:如何支持在秒级对上亿条数据的分组聚合运算。

成本敏感:海量数据存储带来的成本问题。如何以更低成本存储数据,将成为时序数据库需要解决的重中之重。

常见时序数据库

时序数据库出现的时间较晚,目前较成熟的时序数据库都仅有2、3年的历史。

InfluxDB(单机版免费,集群版收费)最成熟,Kairosdb(底层使用Cassandra),OpenTsdb(底层使用HBase),beringei(Facebook开源),TimeScaleDB(底层基于PostgreSQL),TSDB(百度开源),HiTSDB(阿里开源,底层是PostgreSQL)。

这两年互联网行业掀着一股新风,总是听着各种高大上的新名词。大数据、人工智能、物联网、机器学习、商业智能、智能预警啊等等。

以前的系统,做数据可视化,信息管理,流程控制。现在业务已经不仅仅满足于这种简单的管理和控制了。数据可视化分析,大数据信息挖掘,统计预测,建模仿真,智能控制成了各种业务的追求。

_“所有一切如泪水般消失在时间之中,时间正在死去“_,以前我们利用互联网解决现实的问题。现在我们已经不满足于现实,数据将连接成时间序列,可以往前可以观其历史,揭示其规律性,往后可以把握其趋势性,预测其走势。

于是,我们开始存储大量时间相关的数据(如日志,用户行为等),并总结出这些数据的结构特点和常见使用场景,不断改进和优化,创造了一种新型的数据库分类——时间序列数据库(Time Series Database).

时间序列模型

时间序列数据库主要用于指处理带时间标签(按照时间的顺序变化,即时间序列化)的数据,带时间标签的数据也称为时间序列数据。

每个时序点结构如下:

  • timestamp: 数据点的时间,表示数据发生的时间。

  • metric: 指标名,当前数据的标识,有些系统中也称为name。

  • value: 值,数据的数值,一般为double类型,如cpu使用率,访问量等数值,有些系统一个数据点只能有一个value,多个value就是多条时间序列。有些系统可以有多个value值,用不同的key表示

  • tag: 附属属性。 tsdb

实现

比如我想记录一系列传感器的时间序列数据。数据结构如下:

* 标识符:device_id,时间戳
* 元数据:location_id,dev_type,firmware_version,customer_id
* 设备指标:cpu_1m_avg,free_mem,used_mem,net_rssi,net_loss,电池
* 传感器指标:温度,湿度,压力,CO,NO2,PM10

如果使用传统RDBMS存储,建一张如下结构的表即可:

table

如此便是一个最简单的时间序列库了。但这只是满足了数据模型的需要。我们还需要在性能,高效存储,高可用,分布式和易用性上做更多的事情。

大家可以思考思考,如果让你自己来实现一个时间序列数据库,你会怎么设计,你会考虑哪些性能上的优化,又如何做到高可用,怎样做到简单易用。

Timescale

这个数据库其实就是一个基于传统关系型数据库postgresql改造的时间序列数据库。了解postgresql的同学都知道,postgresql是一个强大的,开源的,可扩展性特别强的一个数据库系统。

于是timescale.inc开发了Timescale,一款兼容sql的时序数据库, 底层存储架构在postgresql上。 作为一个postgresql的扩展提供服务。其特点如下:

基础:

  • PostgreSQL原生支持的所有SQL,包含完整SQL接口(包括辅助索引,非时间聚合,子查询,JOIN,窗口函数)

  • 用PostgreSQL的客户端或工具,可以直接应用到该数据库,不需要更改。

  • 时间为导向的特性,API功能和相应的优化。

  • 可靠的数据存储。

扩展:

  • 透明时间/空间分区,用于放大(单个节点)和扩展

  • 高数据写入速率(包括批量提交,内存中索引,事务支持,数据备份支持)

  • 单个节点上的大小合适的块(二维数据分区),以确保即使在大数据量时即可快速读取。

  • 块之间和服务器之间的并行操作

劣势:

  • 因为TimescaleDB没有使用列存技术,它对时序数据的压缩效果不太好,压缩比最高在4X左右

  • 目前暂时不完全支持分布式的扩展(正在开发相关功能),所以会对服务器单机性能要求较高

其实大家都可以去深入了解一下这个数据库。对RDBMS我们都很熟悉,了解这个可以让我们对RDBMS有更深入的了解,了解其实现机制,存储机制。在对时间序列的特殊化处理之中,我们又可以学到时间序列数据的特点,并学习到如何针对时间序列模型去优化RDBMS。

之后我们也可以写一篇文章来深入的了解一下这个数据库的特点和实现。

Influxdb

Influxdb是业界比较流行的一个时间序列数据库,特别是在IOT和监控领域十分常见。其使用go语言开发,突出特点是性能。

特性:

  • 高效的时间序列数据写入性能。自定义TSM引擎,快速数据写入和高效数据压缩。

  • 无额外存储依赖。

  • 简单,高性能的HTTP查询和写入API。

  • 以插件方式支持许多不同协议的数据摄入,如:graphite,collectd,和openTSDB

  • SQL-like查询语言,简化查询和聚合操作。

  • 索引Tags,支持快速有效的查询时间序列。

  • 保留策略有效去除过期数据。

  • 连续查询自动计算聚合数据,使频繁查询更有效。

Influxdb已经将分布式版本转为闭源。所以在分布式集群这块是一个弱点,需要自己实现。

OpenTSDB

The Scalable Time Series Database. 打开OpenTSDB官网,第一眼看到的就是这句话。其将Scalable作为其重要的特点。OpenTSDB运行在Hadoop和HBase上,其充分利用HBase的特性。通过独立的Time Series Demon(TSD)提供服务,所以它可以通过增减服务节点来轻松扩缩容。

tsdb-architecture

  • Opentsdb是一个基于Hbase的时间序列数据库(新版也支持Cassandra)。

    其基于Hbase的分布式列存储特性实现了数据高可用,高性能写的特性。受限于Hbase,存储空间较大,压缩不足。依赖整套HBase, ZooKeeper

  • 采用无模式的tagset数据结构(sys.cpu.user 1436333416 23 host=web01 user=10001)

    结构简单,多value查询不友好

  • HTTP-DSL查询

OpenTSDB在HBase上针对TSDB的表设计和RowKey设计是值得我们深入学习的一个特点。有兴趣的同学可以找一些详细的资料学习学习。

Druid

Druid是一个实时在线分析系统(LOAP)。其架构融合了实时在线数据分析,全文检索系统和时间序列系统的特点,使其可以满足不同使用场景的数据存储需求。

  • 采用列式存储:支持高效扫描和聚合,易于压缩数据。

  • 可伸缩的分布式系统:Druid自身实现可伸缩,可容错的分布式集群架构。部署简单。

  • 强大的并行能力:Druid各集群节点可以并行地提供查询服务。

  • 实时和批量数据摄入:Druid可以实时摄入数据,如通过Kafka。也可以批量摄入数据,如通过Hadoop导入数据。

  • 自恢复,自平衡,易于运维:Druid自身架构即实现了容错和高可用。不同的服务节点可以根据响应需求添加或减少节点。

  • 容错架构,保证数据不丢失:Druid数据可以保留多副本。另外可以采用HDFS作为深度存储,来保证数据不丢失。

  • 索引:Druid对String列实现反向编码和Bitmap索引,所以支持高效的filter和groupby。

  • 基于时间分区:Druid对原始数据基于时间做分区存储,所以Druid对基于时间的范围查询将更高效。

  • 自动预聚合:Druid支持在数据摄入期就对数据进行预聚合处理。

Druid架构蛮复杂的。其按功能将整个系统细分为多种服务,query、data、master不同职责的系统独立部署,对外提供统一的存储和查询服务。其以分布式集群服务的方式提供了一个底层数据存储的服务。

druid-architecture

Druid在架构上的设计很值得我们学习。如果你不仅仅对时间序列存储感兴趣,对分布式集群架构也有兴趣,不妨看看Druid的架构。另外Druid在segment(Druid的数据存储结构)的设计也是一大亮点,既实现了列式存储,又实现了反向索引。

Elasticsearch

Elasticsearch 是一个分布式的开源搜索和分析引擎,适用于所有类型的数据,包括文本、数字、地理空间、结构化和非结构化数据。Elasticsearch 在 Apache Lucene 的基础上开发而成,由 Elasticsearch N.V.(即现在的 Elastic)于 2010 年首次发布。Elasticsearch 以其简单的 REST 风格 API、分布式特性、速度和可扩展性而闻名。

Elasticsearch以ELK stack被人所熟知。许多公司基于ELK搭建日志分析系统和实时搜索系统。之前我们在ELK的基础上开始开发metric监控系统。即想到了使用Elasticsearch来存储时间序列数据库。对Elasticserach的mapping做相应的优化,使其更适合存储时间序列数据模型,收获了不错的效果,完全满足了业务的需求。后期发现Elasticsearch新版本竟然也开始发布Metrics组件和APM组件,并大量的推广其全文检索外,对时间序列的存储能力。真是和我们当时的想法不谋而合。

Elasticsearch的时序优化可以参考一下这篇文章:《elasticsearch-as-a-time-series-data-store》

也可以去了解一下Elasticsearch的Metric组件:Elastic Metrics

Beringei

Beringei是Facebook在2017年最新开源的一个高性能内存时序数据存储引擎。其具有快速读写和高压缩比等特性。

2015年Facebook发表了一篇论文《Gorilla: A Fast, Scalable, In-Memory Time Series Database 》,Beringei正是基于此想法实现的一个时间序列数据库。

Beringei使用Delta-of-Delta算法存储数据,使用XOR编码压缩数值。使其可以用很少的内存即可存储下大量的数据。

如何选择一个适合自己的时间序列数据库

  • Data model

    时间序列数据模型一般有两种,一种无schema,具有多tag的模型,还有一种name、timestamp、value型。前者适合多值模式,对复杂业务模型更适合。后者更适合单维数据模型。

  • Query language

    目前大部分TSDB都支持基于HTTP的SQL-like查询。

  • Reliability

    可用性主要体现在系统的稳定高可用上,以及数据的高可用存储上。一个优秀的系统,应该有一个优雅而高可用的架构设计。简约而稳定。

  • Performance

    性能是我们必须考虑的因素。当我们开始考虑更细分领域的数据存储时,除了数据模型的需求之外,很大的原因都是通用的数据库系统在性能上无法满足我们的需求。大部分时间序列库倾向写多读少场景,用户需要平衡自身的需求。下面会有一份各库的性能对比,大家可以做一个参考。

  • Ecosystem

    我一直认为生态是我们选择一个开源组件必须认真考虑的问题。一个生态优秀的系统,使用的人多了,未被发现的坑也将少了。另外在使用中遇到问题,求助于社区,往往可以得到一些比较好的解决方案。另外好的生态,其周边边界系统将十分成熟,这让我们在对接其他系统时会有更多成熟的方案。

  • Operational management

    易于运维,易于操作。

  • Company and support

    一个系统其背后的支持公司也是比较重要的。背后有一个强大的公司或组织,这在项目可用性保证和后期维护更新上都会有较大的体验。

性能对比

 

Timescale

InfluxDB

OpenTSDB

Druid

Elasticsearch

Beringei

write(single node)

15K/sec

470k/sec

32k/sec

25k/sec

30k/sec

10m/sec

write(5 node)

 

 

128k/sec

100k/sec

120k/sec

 

总结

可以按照以下需求自行选择合适的存储:

  • 小而精,性能高,数据量较小(亿级): InfluxDB

  • 简单,数据量不大(千万级),有联合查询、关系型数据库基础:timescales

  • 数据量较大,大数据服务基础,分布式集群需求: opentsdb、KairosDB

  • 分布式集群需求,olap实时在线分析,资源较充足:druid

  • 性能极致追求,数据冷热差异大:Beringei

  • 兼顾检索加载,分布式聚合计算: elsaticsearch

  • 如果你兼具索引和时间序列的需求。那么Druid和Elasticsearch是最好的选择。其性能都不差,同时满足检索和时间序列的特性,并且都是高可用容错架构。

最后

之后我们可以来深入了解一两个TSDB,比如Influxdb,OpenTSDB,Druid,Elasticsearch等。并可以基于此学习一下行存储与列存储的不同,LSM的实现原理,数值数据的压缩,MMap提升读写性能的知识等。

系列推荐

Mysql:小主键,大问题
Mysql大数据量问题与解决
你应该知道一些其他存储——列式存储
时间序列数据库(TSDB)初识与选择(InfluxDB、OpenTSDB、Druid、Elasticsearch对比)
十分钟了解Apache Druid(集数据仓库、时间序列、全文检索于一体的存储方案)
Apache Druid 底层存储设计(列存储与全文检索)
Apache Druid 的集群设计与工作流程

关注公众号,深入了解TSDB,Druid。获取更多技术内容。

公众号

时间序列数据库(TSDB)初识与选择

机器学习库sklearn的K-Means聚类算法的使用方法 - 知乎

Excerpt

1,本Notebook背景介绍之前我们已经发布过一篇有关K-means算法实验的Notebook:《 Jupyter Notebook使用Python做K-Means聚类分析》,在那篇Notebook里,我们从K-Means的基础原理和实现方法开始讨论,实际上K-Means…


1,本Notebook背景介绍

之前我们已经发布过一篇有关K-means算法实验的Notebook:《Jupyter Notebook使用Python做K-Means聚类分析》,在那篇Notebook里,我们从K-Means的基础原理和实现方法开始讨论,实际上K-Means作为一个常用的算法,在众多机器学习程序库中都有现成的函数可以调用。今天这篇notebook主要演示怎样调用sklearn的K-Means函数。

我们先简单回顾一下上一篇notebook的内容,罗列如下:

1.什么是K-means聚类算法

2.K-means聚类算法应用场景

3.K-means聚类算法步骤

4.K-means不适合的数据集

5.准备测试数据

6.基于python原生代码做K-Means聚类分析实验

7.使用matplotlib进行可视化输出

面对这么多内容,有同学反馈给我说,他只想使用K-Means做一些社会科学计算,不想费脑筋搞明白K-Means是怎么实现的。

好吧,调用机器学习库中的函数是最合适的,只要按照要求准备好样本数据,调用一个函数就把问题解决了。那么,我们今天就另发布一个使用机器学习库sklearn的k-means聚类算法的Notebook。

1.1 sklearn库简介

转载知乎文章《sklearn库主要模块功能简介》的介绍如下:

sklearn,全称scikit-learn,是python中的机器学习库,建立在numpy、scipy、matplotlib等数据科学包的基础之上,涵盖了机器学习中的样例数据、数据预处理、模型验证、特征选择、分类、回归、聚类、降维等几乎所有环节,功能十分强大,目前sklearn版本是0.23。与深度学习库存在pytorch、TensorFlow等多种框架可选不同,sklearn是python中传统机器学习的首选库,不存在其他竞争者。

1.2 基本原理

K-means是无监督学习的代表。主要目的是聚类,聚类的依据就是样本之间的距离。比如要分为K类。步骤是:

1. 随机选取K个点。

2. 计算每个点到K个质心的距离,分成K个簇。

3. 计算K个簇样本的平均值作新的质心

4. 循环2、3

5. 位置不变,距离完成

2, 第三方库

本notebook使用了sklearn库做k-means算法实验。

如果未安装,请先使用下面的命令安装sklearnm库,再运行实验本notebook:

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple sklearn #国内安装使用清华的源,速度快

3,本notebook所做的测试

基于测试数据和sklearn官网的例子,在Jupyter Notebook中使用Python做K-Means算法实验。

导入sklearn下的K-means模块。sklearn,全称scikit-learn,是python中的机器学习库,建立在numpy、scipy、matplotlib等数据科学包的基础之上,涵盖了机器学习中的样例数据、数据预处理、模型验证、特征选择、分类、回归、聚类、降维等几乎所有环节,功能十分强大,目前sklearn版本是0.23。

1
# coding:utf-8 from sklearn.cluster import KMeans

5,引入matplotlib库

matplotlib是一款命令式、较底层、可定制性强、图表资源丰富、简单易用、出版质量级别的python 2D绘图库。

matplotlib算是python绘图的元老级库,类似编程语言里的C语言。很多其它的python绘图库是基于matplotlib开发的,比如seaborn、ggplot、plotnine、holoviews、basemap等。

matplotlib可用于python脚本、python shell、jupyter notebook、web等。

最适合来运行matplotlib绘图的工具是jupyter notebook,本Notebook也是基于该工具做可视化实验:交互式操作,在浏览器上运行代码,能直接显示运行结果和图表,

1
import matplotlib.pyplot as plt

6,生成模拟数据

参看官网网页Generated Datasets,sklearn提供了一些方法,可以生成测试用数据集,生成过程中可以控制多个参数,便于验证算法。参看《sklearn中的make_blobs()函数详解》。

下面我们生成一个测试用数据集,含有500个样本,每个样本2个特征。那么函数返回的结果数据有:

X:是一个数组,存储生成的样本,格式是[n_samples, n_features],详见下面的测试用数据集的查看结果

y:是一个数组,里面是整数,表示每个样本所属的类别,格式是[n_samples]

下面我们生成一个含有2个特征的样本数据,两个特征便于在平面上显示出来,两个特征对应画图坐标轴x和y

1
from sklearn.datasets import make_blobs import matplotlib.pyplot as plt X, y = make_blobs(n_samples=500, # 500个样本 n_features=2, # 每个样本2个特征 centers=4, # 4个中心 random_state=1 #控制随机性 )

查看一下X和y两个数组的格式

输出结果:

array([[-6.92324165e+00, -1.06695320e+01],

[-8.63062033e+00, -7.13940564e+00],

[-9.63048069e+00, -2.72044935e+00],

…….

[-8.96014913e+00, -8.06349899e+00],

[-7.66603898e+00, -7.59715459e+00],

[-6.46534407e+00, -2.85544633e+00]])

输出结果:

array([2, 2, 1, 0, 3, 0, 3, 3, 1, 3, 2, 2, 3, 0, 3, 2, 1, 2, 0, 3, 1, 1,

3, 0, 3, 3, 0, 0, 1, 3, 2, 0, 3, 2, 3, 2, 1, 1, 2, 1, 3, 1, 0, 3,

3, 2, 1, 3, 0, 0, 0, 1, 1, 3, 2, 1, 1, 1, 1, 3, 0, 0, 1, 3, 0, 3,

……

1, 0, 0, 2, 1, 3, 0, 3, 3, 2, 2, 3, 2, 1, 1, 2, 1, 2, 1, 0, 2, 0,

1, 3, 0, 1, 3, 0, 2, 3, 0, 0, 1, 3, 1, 3, 2, 0, 2, 3, 0, 2, 2, 2,

1, 0, 3, 2, 3, 3, 1, 1, 2, 3, 3, 3, 3, 3, 3, 2, 3, 1, 2, 3, 0, 3,

0, 3, 1, 3, 0, 0, 0, 1, 3, 1, 2, 1, 0, 3, 2, 0, 2, 0, 2, 3, 0, 0,

2, 1, 3, 2, 1, 1, 1, 2, 3, 0, 1, 3, 2, 2, 2, 3])

7,使用matplotlib画图

对于上面已生成的模拟数据,使用matplotlib.pyplot画出图像

1
color = ['red', 'pink','orange','gray'] fig, axi1=plt.subplots(1) for i in range(4): axi1.scatter(X[y==i, 0], X[y==i,1], marker='o', s=8, c=color[i] ) plt.show()

8,K-means聚类之一:k=3

使用sklearn的KMeans模块进行聚类分析,可以设置要聚几类。

此处k设置为3

1
from sklearn.cluster import KMeans n_clusters=3 cluster = KMeans(n_clusters=n_clusters,random_state=0).fit(X)

9,查看聚类后的质心

k=3的情况下,会有3个质心

1
centroid=cluster.cluster_centers_ centroid

输出结果:

array([[-8.09286791, -3.50997357],

[-1.54234022, 4.43517599],

[-7.0877462 , -8.08923534]])

10,使用matplotlib画图

使用matplotlib.pyplot画出聚类后的图像

1
y_pred = cluster.labels_#获取训练后对象的每个样本的标签 centtrod = cluster.cluster_centers_ color=['red','pink','orange','gray'] fig, axi1=plt.subplots(1) for i in range(n_clusters): axi1.scatter(X[y_pred==i, 0], X[y_pred==i, 1], marker='o', s=8, c=color[i]) axi1.scatter(centroid[:,0],centroid[:,1],marker='x',s=100,c='black')

11,K-means聚类之一:k=4

使用sklearn的KMeans模块进行聚类分析

此处k设置为4

1
n_clusters=4 cluster2 = KMeans(n_clusters=n_clusters,random_state=0).fit(X)

12,查看聚类后的质心

k=4的情况下,会有4个质心

1
centroid=cluster2.cluster_centers_ centroid

输出结果:

array([[-10.00969056, -3.84944007],

[ -1.54234022, 4.43517599],

[ -6.08459039, -3.17305983],

[ -7.09306648, -8.10994454]])

13,使用matplotlib画图

使用matplotlib.pyplot画出聚类后的图像

1
y_pred = cluster2.labels_#获取训练后对象的每个样本的标签 centtrod = cluster2.cluster_centers_ color=['red','pink','orange','gray'] fig, axi1=plt.subplots(1) for i in range(n_clusters): axi1.scatter(X[y_pred==i, 0], X[y_pred==i, 1], marker='o', s=8, c=color[i]) axi1.scatter(centroid[:,0],centroid[:,1],marker='x',s=100,c='black')

14,总结

本文使用生成的样本数据集,如果是真实的样本数据,那么设法存成上述格式的X数组,然后交给Kmeans模型的fit()函数计算即可。我们将在后续的notebook讲解怎样利用Kmeans模型针对真实数据做计算。

数据仓储模式UnitOfWorks和Repository的实现(

网上看了相关内容关于UnitOfWorks和Repository的数据仓储模式的实现,也来动手搭建下。

ORM使用微软自己的EF来实现

建立一个项目,使用EF,我采用的是DBFirst。建立好连接,连上数据库拖入我要的表,DBFirst有时候还是挺方便的。

然后就要开始实现这个数据仓储模式了

建立泛型接口IUnitOfWorks和IRepository

具体实现代码:

1
public interface IUnitOfWorks<TContext>    {bool IsCommit { get; set; }void SetDb(TContext context);IQueryable<T> All<T>() where T : class;IQueryable<T> Where<T>(Expression<Func<T, bool>> whereLambda) where T : class;int Count<T>() where T : class;int Count<T>(Expression<Func<T, bool>> whereLambda) where T : class;int Add<T>(T model) where T : class;int Update<T>(T model) where T : class;int Update<T>(T model, params string[] proName) where T : class;int Delete<T>(T model) where T : class;int Delete<T>(Expression<Func<T, bool>> whereLambda) where T : class;int SaveChanges(bool validatonSave = true);void Dispose();    }

对数据的各种操作

1
public interface IRepository<T>where T : class    {IQueryable<T> All();IQueryable<T> Where(Expression<Func<T, bool>> whereLambda);int Count();int Count(Expression<Func<T, bool>> whereLambda);int Add(T model, bool IsCommit = false);int Update(T model, bool IsCommit = false);int Update(T model, bool IsCommit=false,params string[] proName);int Delete(T model,bool IsCommit=false);int Delete(Expression<Func<T, bool>> whereLambda,bool IsCommit=false);    }

对于IsCommit是否默认提交,设置成可选参数,默认设置成不直接提交需要最终统一提交以实现事务操作。当然也可以直接设置IsCommit为True那么可以单个操作提交.

看起来这两接口是不是挺像的

接下来是这两接口的具体实现

1
public class Repository<TContext, T> : IRepository<T>where TContext : DbContextwhere T : class    {protected TContext context;protected DbSet<T> dbSet;protected T entity;public Repository(TContext dbcontext)        {            context = dbcontext;            dbSet = dbcontext.Set<T>();        }public IQueryable<T> All()        {return dbSet.AsQueryable();        }public IQueryable<T> Where(System.Linq.Expressions.Expression<Func<T, bool>> whereLambda)        {return dbSet.Where(whereLambda);        }public int Count()        {return dbSet.Count();        }public int Count(System.Linq.Expressions.Expression<Func<T, bool>> whereLambda)        {return dbSet.Where(whereLambda).Count();        }public int Add(T model, bool IsCommit=false)        {            dbSet.Add(model);int i_flag = IsCommit ? context.SaveChanges() : 0;return i_flag;        }public int Update(T model, bool IsCommit = false)        {var entry = context.Entry<T>(model);            entry.State = EntityState.Modified;            dbSet.Attach(model);int i_flag = IsCommit ? context.SaveChanges() : 0;return i_flag;        }public int Update(T model, bool IsCommit=false,params string[] proName)        {var entry = context.Entry<T>(model);            entry.State = EntityState.Unchanged;foreach (string s in proName)            {                entry.Property(s).IsModified = true;            }int i_flag = IsCommit ? context.SaveChanges() : 0;return i_flag;        }        public int Delete(T model,bool IsCommit=false)        {            //var entry = context.Entry<T>(model);            dbSet.Attach(model);            dbSet.Remove(model);int i_flag = IsCommit ? context.SaveChanges() : 0;return i_flag;        }public int Delete(System.Linq.Expressions.Expression<Func<T, bool>> whereLambda,bool IsCommit=false)        {var enties = dbSet.Where(whereLambda).ToList();foreach (var item in enties)            {                dbSet.Remove(item);            }int i_flag = IsCommit ? context.SaveChanges() : 0;return i_flag;        }    }
1
public class UnitOfWorks<TContext> : IUnitOfWorks<TContext>where TContext : DbContext    {protected TContext dbContext;protected bool _IsCommit=false;public bool IsCommit { get { return _IsCommit; } set { _IsCommit = value; } }public UnitOfWorks()        {            dbContext = (TContext)EFContextFactory.GetDbContext();        }public void SetDb(TContext context)        {            dbContext = context;        }private IDictionary<Type, object> RepositoryDic = new Dictionary<Type, object>();protected IRepository<T> GenericRepository<T>() where T : class        {return new Repository<DbContext, T>(dbContext);        }public IRepository<T> GetRepository<T>()where T : class        {            IRepository<T> repository = null;var key = typeof(T);if (RepositoryDic.ContainsKey(key))            {                repository = (IRepository<T>)RepositoryDic[key];            }else            {                repository = GenericRepository<T>();                RepositoryDic.Add(key, repository);            }return repository;        }public IQueryable<T> All<T>() where T : class        {return GetRepository<T>().All();        }public IQueryable<T> Where<T>(Expression<Func<T, bool>> whereLambda) where T : class        {return GetRepository<T>().Where(whereLambda);        }public int Count<T>() where T : class        {return GetRepository<T>().Count();        }public int Count<T>(Expression<Func<T, bool>> whereLambda) where T : class        {return GetRepository<T>().Count(whereLambda);        }public int Add<T>(T model) where T : class        {return GetRepository<T>().Add(model, IsCommit);        }public int Update<T>(T model) where T : class        {return GetRepository<T>().Update(model, IsCommit);        }public int Update<T>(T model, params string[] proName) where T : class        {return GetRepository<T>().Update(model,IsCommit, proName);        }public int Delete<T>(T model) where T : class        {return GetRepository<T>().Delete(model, IsCommit);        }public int Delete<T>(Expression<Func<T, bool>> whereLambda) where T : class        {return GetRepository<T>().Delete(whereLambda, IsCommit);        }public int SaveChanges(bool validatonSave = true)        {if (!validatonSave)                dbContext.Configuration.ValidateOnSaveEnabled = false;return dbContext.SaveChanges();        }public void Dispose()        {if (dbContext != null)                dbContext.Dispose();            GC.SuppressFinalize(this);        }    }

具体使用方法:

使用IOC的方式,属性注入来实现

 public IUnitOfWorks uow { get; set; }

//查询

var query = uow.Where(b=>b.id==”test”);

//新增 对表employee进行新增人员

employee empa = new employee();

employee empb = new employee();

empa.id=”001”;

empa.name=”a”;

empb.id=”002”;

empb.name=”b”;

uow().add(empa);

uow().add(empb);

uow.savechange();//实现统一提交

数据库设计规范

1.【推荐】字段允许适当冗余,以提高查询性能,但必须考虑数据一致。冗余字段应遵循:

  • 不是频繁修改的字段。
  • 不是 varchar 超长字段,更不能是 text 字段。

2.【推荐】单表行数超过 500 万行或者单表容量超过 2GB,才推荐进行分库分表。 说明:如果预计2年后的数据量根本达不到这个级别,请不要在创建表时就分库分表。

3.【推荐】id必须是主键,每个表必须有主键,且保持增长趋势的, 小型系统可以依赖于 MySQL 的自增主键,大型系统或者需要分库分表时才使用内置的 ID 生成器

4.【强制】id类型没有特殊要求,必须使用bigint unsigned,禁止使用int,即使现在的数据量很小。id如果是数字类型的话,必须是8个字节。参见最后例子

  • 方便对接外部系统,还有可能产生很多废数据
  • 避免废弃数据对系统id的影响
  • 未来分库分表,自动生成id,一般也是8个字节

5.【推荐】字段尽量设置为 NOT NULL, 为字段提供默认值。 如字符型的默认值为一个空字符值串’’;数值型默认值为数值 0;逻辑型的默认值为数值 0;

6.【推荐】每个字段和表必须提供清晰的注释

7.【推荐】时间统一格式:‘YYYY-MM-DD HH:MM:SS’

8.【强制】更新数据表记录时,必须同时更新记录对应的 gmt_modified 字段值为当前时间,

命名规范

1.【强制】表达是与否概念的字段,必须使用 is_xxx 的方式命名,数据类型是 unsigned tinyint ( 1表示是,0表示否)。
说明:任何字段如果为非负数,必须是 unsigned。

正例:表达逻辑删除的字段名 is_deleted,1 表示删除,0 表示未删除。

2.【强制】表名、字段名必须使用小写字母或数字,禁止出现数字开头,禁止两个下划线中间只 出现数字。数据库字段名的修改代价很大,因为无法进行预发布,所以字段名称需要慎重考虑。 说明:MySQL 在 Windows 下不区分大小写,但在 Linux 下默认是区分大小写。因此,数据库 名、表名、字段名,都不允许出现任何大写字母,避免节外生枝。 正例:health_user,rdc_config,level3_name 反例:HealthUser,rdcConfig,level_3_name
3.【强制】表名不使用复数名词。 说明:表名应该仅仅表示表里面的实体内容,不应该表示实体数量,对应于 DO 类名也是单数 形式,符合表达习惯。
4.【强制】禁用保留字,如 desc、range、match、delayed 等,请参考 MySQL 官方保留字。
5.【强制】主键索引名为 pk_字段名;唯一索引名为 uk_字段名;普通索引名则为 idx_字段名。
说明:pk_ 即 primary key;uk_ 即 unique key;idx_ 即 index 的简称。
6.【强制】小数类型为 decimal,禁止使用 float 和 double。
说明:float 和 double 在存储的时候,存在精度损失的问题,很可能在值的比较时,得到不 正确的结果。如果存储的数据范围超过 decimal 的范围,建议将数据拆成整数和小数分开存储。
7.【强制】如果存储的字符串长度几乎相等,使用 char 定长字符串类型。
8.【强制】varchar 是可变长字符串,不预先分配存储空间,长度不要超过 5000,如果存储长 度大于此值,定义字段类型为 text,独立出来一张表,用主键来对应,避免影响其它字段索 引效率。
9.【强制】表必备三字段:id, is_delete,gmt_create, gmt_modified。 说明:其中id必为主键,类型为unsigned bigint、单表时自增、步长为1。gmt_create, gmt_modified 的类型均为 date_time 类型,前者现在时表示主动创建,后者过去分词表示被 动更新。
10.【强制】所有命名必须使用全名,有默认约定的除外,如果超过 30 个字符,使用缩写,请尽量名字易懂简短,如 description –> desc;information –> info;address –> addr 等
11.【推荐】表的命名最好是加上“业务名称_表的作用”。 正例:health_user / trade_config
12.【推荐】库名与应用名称尽量一致。如health
13.【推荐】如果修改字段含义或对字段表示的状态追加时,需要及时更新字段注释
14.【推荐】所有时间字段,都以 gmt_开始,后面加上动词的过去式,最后不要加上 time 单词,例如 gmt_create

类型规范

1.表示状态字段(0-255)的使用 TINYINT UNSINGED,禁止使用枚举 类型,注释必须清晰地说明每个枚举的含义,以及是否多选等

2.表示boolean类型的都使用TINYINT(1),因为mysql本身是没有boolean类型的,在自动生成代码的时候,DO对象的字段就是boolean类型,例如 is_delete;其余所有时候都使用TINYINT(4)

TINYINT(4),这个括号里面的数值并不是表示使用多大空间存储,而是最大显示宽度,并且只有字段指定zerofill时有用,没有zerofill,(m)就是无用的,例如id BIGINT ZEROFILL NOT NULL,所以建表时就使用默认就好了,不需要加括号了,除非有特殊需求,例如TINYINT(1)代表boolean类型。

TINYINT(1),TINYINT(4)都是存储一个字节,并不会因为括号里的数字改变。例如TINYINT(4)存储22则会显示0022,因为最大宽度为4,达不到的情况下用0来补充。

3.【参考】合适的字符存储长度,不但节约数据库表空间、节约索引存储,更重要的是提升检索速度。

类型 字节 表示范围
tinyint 1 无符号值: 0~255;有符号值: -128~127
smallint 2 无符号值: 0~65536;有符号值: -32768~32767
mediumint 3 无符号值: 0~16777215;有符号值: -8388608~8388607
int 4 无符号值: 04294967295;有符号值: -21474836482147483647
bigint 8 无符号值: 0~((2³²×²)-1);有符号值: -(2³²×²)/2 ~ (2³²×²)/2-1

4.非负的数字类型字段,都添加上 UNSINGED, 如可以使用 INT UNSINGED 字段存 IPV4

5.时间字段使用时间日期类型,不要使用字符串类型存储,日期使用DATE类型,年使用YEAR类型,日期时间使用DATETIME

6.字符串VARCHAR(N), 其中 N表示字符个数,请尽量减少 N 的大小,参考:code VARCHAR(32);name VARCHAR(32);memo VARCHAR(512);

7.Blob 和 Text 类型所存储的数据量大,删除和修改操作容易在数 据表里产生大量的碎片,避免使用 Blob 或 Text 类型

索引规范

1.【强制】业务上具有唯一特性的字段,即使是多个字段的组合,也必须建成唯一索引。

不要以为唯一索引影响了 insert 速度,这个速度损耗可以忽略,但提高查找速度是明 显的;另外,即使在应用层做了非常完善的校验控制,只要没有唯一索引,根据墨菲定律,必 然有脏数据产生。

2.【强制】超过三个表禁止 join。需要 join 的字段,数据类型必须绝对一致;多表关联查询时, 保证被关联的字段需要有索引。

即使双表 join 也要注意表索引、SQL 性能。

3.【强制】在 varchar 字段上建立索引时,必须指定索引长度,没必要对全字段建立索引,根据 实际文本区分度决定索引长度即可。 说明:索引的长度与区分度是一对矛盾体,一般对字符串类型数据,长度为 20 的索引,区分度会高达 90%以上,可以使用 count(distinct left(列名, 索引长度))/count(*)的区分度来确定。

4.【强制】页面搜索严禁左模糊或者全模糊,如果需要请走搜索引擎来解决。

索引文件具有 B-Tree 的最左前缀匹配特性,如果左边的值未确定,那么无法使用此索引。

5.【推荐】如果有 order by 的场景,请注意利用索引的有序性。order by 最后的字段是组合索引的一部分,并且放在索引组合顺序的最后,避免出现 file_sort 的情况,影响查询性能。

正例:where a=? and b=? order by c; 索引:a_b_c 反例:索引中有范围查找,那么索引有序性无法利用,如:WHERE a>10 ORDER BY b; 索引 a_b 无法排序。

6.【推荐】利用覆盖索引来进行查询操作,避免回表。
说明:如果一本书需要知道第11章是什么标题,会翻开第11章对应的那一页吗?目录浏览一下就好,这个目录就是起到覆盖索引的作用。

正例:能够建立索引的种类:主键索引、唯一索引、普通索引,而覆盖索引是一种查询的效果,用explain的结果,extra列会出现:using index。

7.【推荐】利用延迟关联或者子查询优化超多分页场景。
说明:MySQL并不是跳过 offset 行,而是取 offset+N 行,然后返回放弃前 offset 行,返回 N 行,那当 offset 特别大的时候,效率就非常的低下,要么控制返回的总页数,要么对超过特定阈值的页数进行 SQL 改写。

正例:先快速定位需要获取的 id 段,然后再关联:

SELECT a.* FROM 表 1 a, (select id from 表 1 where 条件 LIMIT 100000,20 ) b where a.id=b.id

8.【推荐】SQL 性能优化的目标:至少要达到 range 级别,要求是 ref 级别,如果可以是 consts 最好。

说明:

  • consts 单表中最多只有一个匹配行(主键或者唯一索引),在优化阶段即可读取到数据。
  • ref 指的是使用普通的索引(normal index)。
  • range 对索引进行范围检索。
    反例:explain 表的结果,type=index,索引物理文件全扫描,速度非常慢,这个 index 级 别比较 range 还低,与全表扫描是小巫见大巫。

9.【推荐】建组合索引的时候,区分度最高的在最左边。
正例:如果 where a=? and b=? ,a 列的几乎接近于唯一值,那么只需要单建 idx_a 索引即 可。

说明:存在非等号和等号混合判断条件时,在建索引时,请把等号条件的列前置。如:where a>? and b=? 那么即使 a 的区分度更高,也必须把 b 放在索引的最前列。

10【推荐】防止因字段类型不同造成的隐式转换,导致索引失效。

11.【参考】创建索引时避免有如下极端误解

  • 宁滥勿缺。认为一个查询就需要建一个索引。
  • 宁缺勿滥。认为索引会消耗空间、严重拖慢更新和新增速度。
  • 抵制惟一索引。认为业务的惟一性一律需要在应用层通过“先查后插”方式解决。

12. 总结

1
2
3
4
5
6
7
8
9
10
11
12
• 索引占磁盘空间,不要重复的索引,尽量短  
• 只给常用的查询条件加索引
• 过滤性高的列建索引,取值范围固定的列不建索引
• 唯一的记录添加唯一索引
• 频繁更新的列不要建索引
• 不要对索引列运算
• 同样过滤效果下,保持索引长度最小
• 合理利用组合索引,注意索引字段先后顺序
• 多列组合索引,过滤性高的字段最前
• order by 字段建立索引,避免 filesort
• 组合索引,不同的排序顺序不能使用索引
• <>!=无法使用索引

SQL规范

1.【强制】不要使用 count(列名)或 count(常量)来替代 count(),count()是 SQL92 定义的 标准统计行数的语法,跟数据库无关,跟 NULL 和非 NULL 无关。

count(*)会统计值为 NULL 的行,而 count(列名)不会统计此列为 NULL 值的行。

2.【强制】count(distinct col) 计算该列除 NULL 之外的不重复行数,

count(distinct col1, col2) 如果其中一列全为NULL,那么即使另一列有不同的值,也返回为0。

3.【强制】当某一列col的值全是 NULL 时,count(col)的返回结果为 0,但 sum(col)的返回结果为 NULL,因此使用 sum()时需注意 NPE 问题。

正例:可以使用如下方式来避免sum的NPE问题:SELECT IF(ISNULL(SUM(g)),0,SUM(g)) FROM table;

4.【强制】使用 ISNULL()来判断是否为 NULL 值。 说明:NULL 与任何值的直接比较都为 NULL。

  • NULL<>NULL的返回结果是NULL,而不是false。
  • NULL=NULL的返回结果是NULL,而不是true。
  • NULL<>1的返回结果是NULL,而不是true。

5.【强制】 在代码中写分页查询逻辑时,若 count 为 0 应直接返回,避免执行后面的分页语句。

6.【强制】不得使用外键与级联,一切外键概念必须在应用层解决。 说明:以学生和成绩的关系为例,学生表中的 student_id 是主键,那么成绩表中的 student_id 则为外键。如果更新学生表中的 student_id,同时触发成绩表中的 student_id 更新,即为 级联更新。外键与级联更新适用于单机低并发,不适合分布式、高并发集群;级联更新是强阻 塞,存在数据库更新风暴的风险;外键影响数据库的插入速度。

7.【强制】禁止使用存储过程,存储过程难以调试和扩展,更没有移植性。

8.【强制】数据订正时,删除和修改记录时,要先 select,避免出现误删除,确认无误才能执行更新语句。

9.【推荐】in操作能避免则避免,若实在避免不了,需要仔细评估 in 后边的集合元素数量,控
制在 1000 个之内。

10.【参考】如果有全球化需要,所有的字符存储与表示,均以 utf-8 编码,注意字符统计函数 的区别。

说明:

SELECT LENGTH(“轻松工作”); 返回为12
SELECT CHARACTER_LENGTH(“轻松工作”); 返回为4 如果需要存储表情,那么选择 utfmb4 来进行存储,注意它与 utf-8 编码的区别。

11.【参考】TRUNCATE TABLE 比 DELETE 速度快,且使用的系统和事务日志资源少,但 TRUNCATE 无事务且不触发trigger,有可能造成事故,故不建议在开发代码中使用此语句。 说明:TRUNCATE TABLE 在功能上与不带 WHERE 子句的 DELETE 语句相同。

12.【推荐】不要写一个大而全的数据更新接口。传入为 POJO 类,不管是不是自己的目标更新字 段,都进行 update table set c1=value1,c2=value2,c3=value3; 这是不对的。执行 SQL 时,不要更新无改动的字段,一是易出错;二是效率低;三是增加 binlog 存储。

13.总结

1
2
3
4
5
6
7
8
9
10
11
12
13
• 能够快速缩小结果集的 WHERE 条件写在前面,如果有恒量条 件,也尽量放在前面 ,例如 where 1=1  
• 避免使用 GROUP BY、DISTINCT 等语句的使用,避免联表查 询和子查询
• 能够使用索引的字段尽量进行有效的合理排列
• 针对索引字段使用 >, >=, =, <, <=, IF NULL 和 BETWEEN 将会 使用索引,如果对某个索引字段进行 LIKE 查询,使用 LIKE ‘%abc%’ 不能使用索引,使用 LIKE ‘abc%’ 将能够使用索引
• 如果在 SQL 里使用了 MySQL部分自带函数,索引将失效
• 避免直接使用 select *,只取需要的字段,增加使用覆盖索引使用的可能
• 对于大数据量的查询,尽量避免在 SQL 语句中使用 order by 字

• 连表查询的情况下,要确保关联条件的数据类型一致,避免嵌
套子查询
• 对于连续的数值,使用 between 代替 in
• where 语句中尽量不要使用 CASE 条件
• 当只要一行数据时使用 LIMIT 1

例子

1
2
3
4
5
6
7
8
9
CREATE TABLE `health_package` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '序号',
`package_id` int unsigned NOT NULL COMMENT '套系 id',
`module_id` int unsigned NOT NULL COMMENT '模块 id',
`is_delete` tinyint unsigned NOT NULL DEFAULT 0 COMMENT '是否删除,0-未删除,1-删除,默认为0',
`gmt_create` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time, common column by DB rules',
`gmt_modified` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modified time,common column by DB rules ',
PRIMARY KEY (`id`)
) COMMENT='This table stores module and package of health for ...';

正文

 前言:“我们有一个订单列表,希望能够根据当前登陆的不同用户看到不同类型的订单数据”、“我们希望不同的用户能看到不同时间段的扫描报表数据”、“我们系统需要不同用户查看不同的生产报表列”。诸如此类,最近经常收到项目上面的客户提出的这种问题,即所谓的“数据权限”,经过开会讨论决定:在目前的开发框架上面搭建一套通用的数据权限功能。

本文原创地址:http://www.cnblogs.com/landeanfen/p/7760803.html

有了上面的引言,自然而然就引出了今天需要和大家讨论的话题——数据权限。作为开发人员,我们肯定知道,一般的系统都离不开权限模块,它是支撑整个系统运行的基础模块。而根据项目类型和需求的不同,权限模块的设计更是大相径庭。但不管怎么变,权限模块从大的方面来说,可以分为两种大的类型:功能权限数据权限

  • 功能权限:主要控制不同的资源主体(用户、角色、组织等)有操作不同的资源的权限。比如常见的不同的角色能访问不同的页面(菜单权限),以及具有操作同一页面的不同功能(按钮权限)等等,都数据功能权限的范畴,这种设计相对比较简单,也比较为大多数系统所通用。当然网上资料、设计思路也可以找到很多。
  • 数据权限:主要控制不同的资源主体(用户、角色、组织等)有查看不同的数据信息的权限,一般来说,数据权限又分为数据行权限数据列权限,通过字面意思不难理解这两者的区别,比如上文“我们有一个订单列表,希望能够根据当前登陆的不同用户看到不同类型的订单数据” 这就是一个典型的数据行权限,而“我们系统需要不同用户查看不同的生产报表列”这就是数据列权限的范畴。由于数据权限和系统的业务逻辑关系非常密切,所以不同的系统设计差异性会非常大。从另一方面来说,由于数据权限和业务逻辑关联性非常强,如果系统的业务逻辑非常复杂,数据权限设计起来也会相对复杂,所以关于数据权限的设计一直没有一种相对通用和使用简单的设计方案。

如果你动手去设计数据权限,当你去各大平台、百度、谷歌查找设计思路的时候,你会发现很难找到有用的资料,很多设计思路局限性非常大。其实原因很简单:数据权限的设计和他人系统关系紧密,一般不太容易拿到你的项目上面直接使用。

当然也有另外部分人说“数据权限并不能作为权限模块去设计”,比如博主看到这样一条评论

从一定程度上来说,这样理解也不为过,如果你觉得你的系统灵活性和配置性不需要那么高,把数据权限的规则在代码里面写死又何妨,博主所在公司的另外一个部门就是这么干的,除了编码量大一点,其实也没什么太大的问题!其实博主说这么多无非是想表达一个观点:没有绝对通用的数据权限设计思路,关键看合不合你用!当然本文的设计思路也是一样,不强制要求,不提通用。设计思路供你参考!

关于权限设计的杂谈就告一段落,凡是点到即止,再说了多了就说烂了。到目前为止,博主找到的一篇写得相对比较好的文章 通用权限管理设计 之 数据权限。这位博主是用sql去实现的,如果是把这个运用到EF里面的话,考虑到EF复杂的导航属性,会有一些问题。接下来说说博主这边想到的设计思路。

先说说博主所在项目的情况,和数据打交道的部分采用EntityFramework+Repository的传统模式去实现的,整个项目从上到下,就是一种典型的”伪DDD”,什么是”伪DDD“?这里不做过多说明,使用过DDD的同仁应该很清楚。下面是设计思路流程图:

第一步:配置数据规则

第二步:页面使用数据规则

 

 以上是一个大致的思路图,总的来说,要实现基于EF的数据权限设计,主要分为两大步骤

1、配置数据规则

配置数据规则这里有三个大的方面:功能模块数据资源角色

  • 功能模块:为什么这里要加上功能模块的约束?是因为博主觉得我们某一个页面在查询数据的时候,会有一个查询的范围,比如订单查询页面肯定只能查询和订单有关联的实体功能,而不可能查询和它没有任何关联的业务。加这个约束更大的意义在于我们动态的构造Lambda去查询实体的时候不会产生”找不到相关联的实体“之类的错误
  • 数据资源:具体对哪种数据资源做数据权限,比如订单的状态不等于取消状态、订单的下单时间小于当前月份等等。
  • 角色:数据资源的主体,还可以是用户、部门、组织等等。

这三者配置之后得到的一个结果就是某一类角色的某一个功能模块对哪个数据资源的数据规则是什么样的。比如有一条销售总监的数据规则,配置销售总监在订单模块里面订单这个实体的订单类型是销售订单的所有数据,这就是针对销售总监在订单模块的数据规则。可能最终数据库存储得到的数据类似这样:

RoleId

FunctionCode

Rules

2

OrderQuery

{“rules”:[{“field”:”Order_Status”,”operate”:”in”,”value”:”[0,1,2]“},{“field”:”Order_Type”,”op”:”equal”,”value”:”1”}],”logicoperate”:”and”}

3

OrderQuery

{“rules”:[{“field”:”Order_Status”,”operate”:”in”,”value”:”[0]“},{“field”:”Product.Categary.Type”,”equal”:”equal”,”value”:”1”}],”logicoperate”:”and”}

5

Product

{“groups”:[

{“rules”:[{“field”:”Order_Status”,”operate”:”in”,”value”:”[0,5,10]“},{“field”:”Order_Type”,”op”:”equal”,”value”:”1 “}],”logicoperate”:”and”},

{“rules”:[{“field”:”LineName”,”operate”:”equal”,”value”:”fenzhuangxian”}]}

],”logicoperate”:”or”}

需要特别说明的是:由于EF有导航属性,这里的Rules在保存的时候如果遇到导航属性,我们的字段值需要这样保存——Product.Categary.Type。因为在我们转换成为lambda表达式的时候导航属性会是这样写:x=>x.Product.Categary.Type==1。这个我们在后面使用这个规则的时候加以说明。

2、使用数据规则

 有了上面的数据规则,接下来就是我们在取数据的时候如何使用了,这里有一点需要说明的是:我们这里需要传两个参数,一个是模块的名称,比如上面的OrderQuery、Product等;第二个是当前用户的角色id,这个可以通过当前登陆用户的id获取到角色。

要使用数据规则,之前博主分享过两篇关于动态Lambda的文章,现在派上用场了。只不过原来只是一些基础类型转lambda,现在涉及到了导航属性,不知道是否可行。博主查阅了一些资料,最终找到了解决方案。

复制代码

//遍历得到属性(包括遍历导航属性)
public Expression GetProperty(Expression source, ParameterExpression para, string Name)
{ string[] propertys = Name.Split(‘.’); if (source == null)
{
source = Expression.Property(para, typeof(Entity).GetProperty(propertys.First()));
} else {
source = Expression.Property(source, propertys.First());
} foreach (var item in propertys.Skip(1))
{
source = GetProperty(source, para, item);
} return source;
}

复制代码

然后测试如下

var oLamadaExtention = new LambdaExpression(); var left = oLamadaExtention.GetProperty(null, Expression.Parameter(typeof(Order), “x”), “Product.Categary.Type”); var value = Expression.Constant(“1”, left.Type); //动态转换类型
var right = Expression.Constant(value, left.Type);
Expression expRes = Expression.Equal(left, right);

测试得到的查询lambda结果为x=>x.Product.Categary.Type==”1”,测试成功!

3、补充一点

对于配置数据规则的时候还有一点比较麻烦的是,如果如何知道哪个功能模块使用哪些实体?不可能直接让用户去写Product.Categary.Type这些复杂的功能吧,如果是这样,谈何体验。那么只有使用另外一种解决思路了——反射EF实体。

反射EF实体的时候如果是导航属性,还得继续反射导航属性的实体,这样一层一层反射下去,最终确实是可以得到形如Product.Categary.Type这个的结构体,但界面如何展现还有待思考。比如思路如下:

以上只是一个设计思路,理论上来说是可以实现的,如有不足,欢迎斧正,谢谢。如果思路没有问题,后续博主会抽时间将这种设计的实现过程展现出来供大家参考,欢迎关注。其中的难点有两个:

1、逐级反射EF的导航属性,以及这个过程如何展现。是通过特性标记,还是开发人员配置;

2、动态Expression在构造Lambda的时候和配置数据的兼容性问题,比如数据类型的兼容性有点难控制。

本文原创出处:http://www.cnblogs.com/landeanfen/

欢迎各位转载,但是未经作者本人同意,转载文章之后必须在文章页面明显位置给出作者和原文连接,否则保留追究法律责任的权利


探索c#之Async、Await剖析 - 蘑菇先生 - 博客园

Excerpt

Async,主线程A逻辑->异步任务线程B逻辑->主线程C逻辑。
注意:这3个步骤是有可能会使用同一个线程的,也可能会使用2个,甚至3个线程。

  1. net4.5的async,抛去语法糖就是Net4.0的Task+状态机。
  2. net4.0的Task, 退化到3.5即是(Thread、Th

阅读目录:

  1. 基本介绍
  2. 基本原理剖析
  3. 内部实现剖析
  4. 重点注意的地方
  5. 总结

基本介绍

Async、Await是net4.x新增的异步编程方式,其目的是为了简化异步程序编写,和之前APM方式简单对比如下。

APM方式,BeginGetRequestStream需要传入回调函数,线程碰到BeginXXX时会以非阻塞形式继续执行下面逻辑,完成后回调先前传入的函数。

1
2
3
HttpWebRequest myReq =(HttpWebRequest)WebRequest.Create(<span>"</span><span>http://cnblogs.com/</span><span>"</span><span>);
myReq.BeginGetRequestStream();
</span><span>//</span><span>to do</span>

Async方式,使用Async标记Async1为异步方法,用Await标记GetRequestStreamAsync表示方法内需要耗时的操作。主线程碰到await时会立即返回,继续以非阻塞形式执行主线程下面的逻辑。当await耗时操作完成时,继续执行Async1下面的逻辑

复制代码

1
2
3
4
5
6
<span>static</span> <span>async</span> <span>void</span><span> Async1()
{
HttpWebRequest myReq </span>= (HttpWebRequest)WebRequest.Create(<span>"</span><span>http://cnblogs.com/</span><span>"</span><span>);
</span><span>await</span><span> myReq.GetRequestStreamAsync();
</span><span>//</span><span>to do</span>
}

复制代码

上面是net类库实现的异步,如果要实现自己方法异步。
APM方式:

1
2
3
<span>public</span> <span>delegate</span> <span>int</span> MyDelegate(<span>int</span><span> x);   
MyDelegate mathDel </span>= <span>new</span> MyDelegate((a) =&gt; { <span>return</span> <span>1</span><span>; });
mathDel.BeginInvoke(</span><span>1</span>, (a) =&gt; { },<span>null</span>);

Async方式:

复制代码

1
2
3
4
5
6
7
<span>static</span> <span>async</span> <span>void</span><span> Async2()
{
</span><span>await</span> Task.Run(() =&gt; { Thread.Sleep(<span>500</span>); Console.WriteLine(<span>"</span><span>bbb</span><span>"</span><span>); });
Console.WriteLine(</span><span>"</span><span>ccc</span><span>"</span><span>);
}
Async2();
Console.WriteLine(</span><span>"</span><span>aaa</span><span>"</span>);

复制代码

对比下来发现,async/await是非常简洁优美的,需要写的代码量更少,更符合人们编写习惯。
因为人的思维对线性步骤比较好理解的。

APM异步回调的执行步骤是:A逻辑->假C回调逻辑->B逻辑->真C回调逻辑,这会在一定程度造成思维的混乱,当一个项目中出现大量的异步回调时,就会变的难以维护。
Async、Await的加入让原先这种混乱的步骤,重新拨正了,执行步骤是:A逻辑->B逻辑->C逻辑。

基本原理剖析

作为一个程序员的自我修养,刨根问底的好奇心是非常重要的。 Async刚出来时会让人有一头雾水的感觉,await怎么就直接返回了,微软怎么又出一套新的异步模型。那是因为习惯了之前的APM非线性方式导致的,现在重归线性步骤反而不好理解。 学习Async时候,可以利用已有的APM方式去理解,以下代码纯属虚构
比如把Async2方法想象APM方式的Async3方法:

复制代码

1
2
3
4
5
6
7
8
9
<span>static</span> <span>async</span> <span>void</span><span> Async3()
{
</span><span>var</span> task= <span>await</span> Task.Run(() =&gt; { Thread.Sleep(<span>500</span>); Console.WriteLine(<span>"</span><span>bbb</span><span>"</span><span>); });
</span><span>//</span><span>注册task完成后回调</span>
task.RegisterCompletedCallBack(() =&gt;<span>
{
Console.WriteLine(</span><span>"</span><span>ccc</span><span>"</span><span>);
});
}</span>

复制代码

上面看其来就比较好理解些的,再把Async3方法想象Async4方法:

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<span>static</span>  <span>void</span><span> Async4()
{
</span><span>var</span> thread = <span>new</span> Thread(() =&gt;<span>
{
Thread.Sleep(</span><span>500</span><span>);
Console.WriteLine(</span><span>"</span><span>bbb</span><span>"</span><span>);
});
</span><span>//</span><span>注册thread完成后回调</span>
thread.RegisterCompletedCallBack(() =&gt;<span>
{
Console.WriteLine(</span><span>"</span><span>ccc</span><span>"</span><span>);
});
thread.Start();
}</span>

复制代码

这样看起来就非常简单明了,连async都去掉了,变成之前熟悉的编程习惯。虽然代码纯属虚构,但基本思想是相通的,差别在于实现细节上面。

内部实现剖析

作为一个程序员的自我修养,严谨更是不可少的态度。上面的基本思想虽然好理解了,但具体细节呢,编程是个来不得半点虚假的工作,那虚构的代码完全对不住看官们啊。

继续看Async2方法,反编译后的完整代码如下:

View Code

发现async、await不见了,原来又是编译器级别提供的语法糖优化,所以说async不算是全新的异步模型。 可以理解为async更多的是线性执行步骤的一种回归,专门用来简化异步代码编写。
从反编译后的代码看出编译器新生成一个继承IAsyncStateMachine 的状态机结构asyncd(代码中叫d__2,后面简写AsyncD),下面是基于反编译后的代码来分析的

IAsyncStateMachine最基本的状态机接口定义:

1
2
3
4
5
<span>public</span> <span>interface</span><span> IAsyncStateMachine
{
</span><span>void</span><span> MoveNext();
</span><span>void</span><span> SetStateMachine(IAsyncStateMachine stateMachine);
}</span>

既然没有了async、await语法糖的阻碍,就可以把代码执行流程按线性顺序来理解,其整个执行步骤如下:

1. 主线程调用Async2()方法
2. Async2()方法内初始化状态机状态为-1,启动AsyncD
3. MoveNext方法内部开始执行,其task.run函数是把任务扔到线程池里,返回个可等待的任务句柄。MoveNext源码剖析:

//要执行任务的委托

1
Program.CS$&lt;&gt;9__CachedAnonymousMethodDelegate1 = <span>new</span> Action(Program.&lt;Async2&gt;b__0);

//开始使用task做异步,是net4.0基于任务task的编程方式。

1
awaiter =Task.Run(Program.CS$&lt;&gt;9__CachedAnonymousMethodDelegate1).GetAwaiter();

//设置状态为0,以便再次MoveNext直接break,执行switch后面的逻辑,典型的状态机模式。

//返回调用async2方法的线程,让其继续执行主线程后面的逻辑

1
2
<span>this</span>.&lt;&gt;t__builder.AwaitUnsafeOnCompleted&lt;TaskAwaiter, Program.&lt;Async2&gt;d__2&gt;(<span>ref</span> awaiter, <span>ref</span> <span>this</span><span>);
</span><span>return</span>;

4. 这时就已经有2个线程在跑了,分别是主线程和Task.Run在跑的任务线程。

5. 执行主线程后面逻辑输出aaa,任务线程运行完成后输出bbb、在继续执行任务线程后面的业务逻辑输出ccc。

1
2
3
4
<span>Label_0090: 
awaiter.GetResult();
awaiter </span>= <span>new</span><span> TaskAwaiter();
Console.WriteLine(</span><span>"</span><span>ccc</span><span>"</span>);

这里可以理解为async把整个主线程同步逻辑,分拆成二块。 第一块是在主线程直接执行,第二块是在任务线程完成后执行, 二块中间是任务线程在跑,其源码中awaiter.GetResult()就是在等待任务线程完成后去执行第二块。
从使用者角度来看执行步骤即为: 主线程A逻辑->异步任务线程B逻辑->主线程C逻辑。

复制代码

1
2
3
4
5
6
7
<span>        Test();
Console.WriteLine(</span><span>"</span><span>A逻辑</span><span>"</span><span>);
</span><span>static</span> <span>async</span> <span>void</span><span> Test()
{
</span><span>await</span> Task.Run(() =&gt; { Thread.Sleep(<span>1000</span>); Console.WriteLine(<span>"</span><span>B逻辑</span><span>"</span><span>); });
Console.WriteLine(</span><span>"</span><span>C逻辑</span><span>"</span><span>);
}</span>

复制代码

回过头来对比下基本原理剖析小节中的虚构方法Async4(),发现区别在于一个是完成后回调,一个是等待完成后再执行,这也是实现异步最基本的两大类方式。

重点注意的地方

主线程A逻辑->异步任务线程B逻辑->主线程C逻辑。

注意:这3个步骤是有可能会使用同一个线程的,也可能会使用2个,甚至3个线程。 可以用Thread.CurrentThread.ManagedThreadId测试下得知。

复制代码

1
2
3
4
5
6
7
8
9
10
<span>     Async7();
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
</span><span>static</span> <span>async</span> <span>void</span><span> Async7()
{
</span><span>await</span> Task.Run(() =&gt;<span>
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
});
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
}</span>

复制代码

正由于此,才会有言论说Async不用开线程,也有说需要开线程的,从单一方面来讲都是对的,也都是错的。 上面源码是从简分析的,具体async内部会涉及到线程上下文切换,线程复用、调度等。 想深入的同学可以研究下ExecutionContextSwitcher、 SecurityContext.RestoreCurrentWI、ExecutionContext这几个东东。

其实具体的物理线程细节可以不用太关心,知道其【主线程A逻辑->异步任务线程B逻辑->主线程C逻辑】这个基本原理即可。 另外Async也会有线程开销的,所以要合理分业务场景去使用。

总结

从逐渐剖析Async中发现,Net提供的异步方式基本上一脉相承的,如:
1. net4.5的Async,抛去语法糖就是Net4.0的Task+状态机。
2. net4.0的Task, 退化到3.5即是(Thread、ThreadPool)+实现的等待、取消等API操作。

本文以async为起点,简单剖析了其内部原理及实现,希望对大家有所帮助。

手把手搭建K3S+Rancher - 周知非 - 博客园

Excerpt

Kubernetes发展到现在,已经在很多正式的场景落地应用,可以说现阶段使用Kubernetes是比较靠谱也比较轻松的,本文就根据官方文档手把手开始部署Kubernetes+Rancher集群。 架构 Master—192.168.15.252 Slave—192.168.15.251 本地域名—


Kubernetes发展到现在,已经在很多正式的场景落地应用,可以说现阶段使用Kubernetes是比较靠谱也比较轻松的,本文就根据官方文档手把手开始部署Kubernetes+Rancher集群。

架构
Master—192.168.15.252
Slave—192.168.15.251
本地域名—zhou.rancher.com
注意点:Master一定要有免密登录Slave权限!!!

总体思路
K3S部署Kubernetes集群,创建集群的https证书,Helm部署rancher,通过rancher的UI界面手动导入Kubernetes集群,使用Kubernetes集群。

Kubernetes部署
在rancher中文文档中推荐了一种更轻量的Kubernetes集群搭建方式:K3S,安装过程非常简单,只需要服务器能够访问互联网,执行相应的命令就可以了
Master主机执行命令,执行完成后获取master主机的K3S_TOKEN用于slave(默认路径:/var/lib/rancher/k3s/server/node-token)

1
curl -sfL http:<span>//</span><span>rancher-mirror.cnrancher.com/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn  INSTALL_K3S_EXEC="--docker" sh -s - server</span>

slave主机执行命令,加入K3S集群

1
curl -sfL http:<span>//</span><span>rancher-mirror.cnrancher.com/k3s/k3s-install.sh | INSTALL_K3S_MIRROR=cn INSTALL_K3S_EXEC="--docker" K3S_URL=</span><span>https://192.168.15.252</span><span>:6443 K3S_TOKEN=K10bb35019b1669197e06f97b6c14bb3b3c7c7076cd20afe1f550d6793d02b9eed8::server:9599c8b3ffbbd38b7721207183cd6a62 sh -</span>

http://rancher-mirror.cnrancher.com/k3s/k3s-install.sh是国内的加速地址,可以正常执行。
执行完毕后,在master服务器上验证是否安装K3S集群成功。

Https私有证书
在rancher中文文档中提供了生成私有证书的方式,执行提供的脚本后,可以获得私有的证书,并需要把证书添加到集群中去
创建证书的空间,并使用脚本生成pem文件

1
kubectl create namespace cattle-system
1
<span>sh</span> ./create_self-signed-cert.<span>sh</span> --ssl-domain=zhou.rancher.com --ssl-trusted-ip=<span>192.168</span>.<span>15.251</span>,<span>192.168</span>.<span>15.252</span> --ssl-size=<span>2048</span> --ssl-<span>date</span>=<span>3650</span>

把生成的pem证书上传到集群中去

1
kubectl -n cattle-system create secret generic tls-ca   --from-<span>file</span>=cacerts.pem=./cacerts.pem

安装rancher
安装rancher之前还需要安装helm工具,解压官网下载的文件压缩包后,拷贝二进制文件到/usr/local/bin/目录,验证是否安装成功

添加rancher的国内的安装模块,并更新helm

1
2
helm repo add rancher-stable http:<span>//</span><span>rancher-mirror.oss-cn-beijing.aliyuncs.com/server-charts/stable</span>
helm repo update

执行helm安装rancher命令

1
helm <span>install</span> rancher rancher-stable/rancher   --namespace cattle-system   --set <span>hostname</span>=zhou.rancher.com   --set ingress.tls.source=secret   --set privateCA=<span>true</span>

验证rancher是否安装成功

补充
由于是在内部搭建的rancher,只能通过修改主机的host来指定域名,还需要额外的两个步骤
添加hostname的解析

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kubectl -n cattle-<span>system \
</span><span>patch</span> deployments cattle-cluster-agent --<span>patch</span> <span>'</span><span>{</span>
<span>"</span><span>spec</span><span>"</span><span>: {
</span><span>"</span><span>template</span><span>"</span><span>: {
</span><span>"</span><span>spec</span><span>"</span><span>: {
</span><span>"</span><span>hostAliases</span><span>"</span><span>: [
{
</span><span>"</span><span>hostnames</span><span>"</span><span>:
[
</span><span>"</span><span>zhou.rancher.com</span><span>"</span><span>
],
</span><span>"</span><span>ip</span><span>"</span>: <span>"</span><span>192.168.15.252</span><span>"</span><span>
}
]
}
}
}
}</span>

复制代码

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
kubectl -n cattle-<span>system \
</span><span>patch</span> daemonsets cattle-node-agent --<span>patch</span> <span>'</span><span>{</span>
<span>"</span><span>spec</span><span>"</span><span>: {
</span><span>"</span><span>template</span><span>"</span><span>: {
</span><span>"</span><span>spec</span><span>"</span><span>: {
</span><span>"</span><span>hostAliases</span><span>"</span><span>: [
{
</span><span>"</span><span>hostnames</span><span>"</span><span>:
[
</span><span>"</span><span>zhou.rancher.com</span><span>"</span><span>
],
</span><span>"</span><span>ip</span><span>"</span>: <span>"</span><span>192.168.15.252</span><span>"</span><span>
}
]
}
}
}
}</span><span>'</span>

复制代码

最后打开我们设定的zhou.rancher.com网站,导入集群