0%

Influx集群解决方案(Influx Proxy篇)_influxdb集群方案-CSDN博客

Excerpt

文章浏览阅读2.1k次,点赞14次,收藏25次。github地址Influx Proxy 是一个基于高可用、一致性哈希的 InfluxDB 集群代理服务,实现了 InfluxDB 高可用集群的部署方案,具有动态扩/缩容、故障恢复、数据同步等能力。连接到 Influx Proxy 和连接原生的 InfluxDB Server 没有显著区别(支持的查询语句列表),对上层客户端是透明的,上层应用可以像使用单机的 InfluxDB 一样使用,Influx Proxy会处理请求的转发,并对各个 InfluxDB 集群节点进行管理。_influxdb集群方案


InFluxDB 集群搭建

本次搭建使用influx proxy

介绍

github地址:https://github.com/chengshiwen/influx-proxy/

Influx Proxy 是一个基于高可用、一致性哈希的 InfluxDB 集群代理服务,实现了 InfluxDB 高可用集群的部署方案,
具有动态扩/缩容、故障恢复、数据同步等能力。连接到 Influx Proxy 和连接原生的 InfluxDB Server 没有显著区别
(支持的查询语句列表),对上层客户端是透明的,上层应用可以像使用单机的 InfluxDB 一样使用,Influx Proxy
会处理请求的转发,并对各个 InfluxDB 集群节点进行管理。Influx Proxy 基于饿了么开源的 Influx-Proxy,
并进一步开发和优化,支持了更多的特性,移除了 Python、Redis 依赖,解决了受限于一个数据库、需要额外配置
KEYMAPS 、数据负载不均衡的问题。

架构说明

  • 在改造我们的系统中我们相当于要实现以下步骤

    在这里插入图片描述

实现步骤

Influx1.8环境

Influx1.8+Influx Proxy +SpringBoot +Ngnix

SpringBoot搭建
  • 引入依赖

    1
    <dependency> <groupId>org.influxdb</groupId> <artifactId>influxdb-java</artifactId> <version>2.6</version> </dependency>
  • java代码(参考https://github.com/influxdata/influxdb-java)

    1
    // Create an object to handle the communication with InfluxDB. // (best practice tip: reuse the 'influxDB' instance when possible) final String serverURL = "http://127.0.0.1:8086", username = "root", password = "root"; final InfluxDB influxDB = InfluxDBFactory.connect(serverURL, username, password); // Create a database... // https://docs.influxdata.com/influxdb/v1.7/query_language/database_management/ String databaseName = "NOAA_water_database"; influxDB.query(new Query("CREATE DATABASE " + databaseName)); influxDB.setDatabase(databaseName); // ... and a retention policy, if necessary. // https://docs.influxdata.com/influxdb/v1.7/query_language/database_management/ String retentionPolicyName = "one_day_only"; influxDB.query(new Query("CREATE RETENTION POLICY " + retentionPolicyName + " ON " + databaseName + " DURATION 1d REPLICATION 1 DEFAULT")); influxDB.setRetentionPolicy(retentionPolicyName); // Enable batch writes to get better performance. influxDB.enableBatch( BatchOptions.DEFAULTS .threadFactory(runnable -> { Thread thread = new Thread(runnable); thread.setDaemon(true); return thread; }) ); // Close it if your application is terminating or you are not using it anymore. Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); // Write points to InfluxDB. influxDB.write(Point.measurement("h2o_feet") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("location", "santa_monica") .addField("level description", "below 3 feet") .addField("water_level", 2.064d) .build()); influxDB.write(Point.measurement("h2o_feet") .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("location", "coyote_creek") .addField("level description", "between 6 and 9 feet") .addField("water_level", 8.12d) .build()); // Wait a few seconds in order to let the InfluxDB client // write your points asynchronously (note: you can adjust the // internal time interval if you need via 'enableBatch' call). Thread.sleep(5_000L); // Query your data using InfluxQL. // https://docs.influxdata.com/influxdb/v1.7/query_language/data_exploration/#the-basic-select-statement QueryResult queryResult = influxDB.query(new Query("SELECT * FROM h2o_feet")); System.out.println(queryResult); // It will print something like: // QueryResult [results=[Result [series=[Series [name=h2o_feet, tags=null, // columns=[time, level description, location, water_level], // values=[ // [2020-03-22T20:50:12.929Z, below 3 feet, santa_monica, 2.064], // [2020-03-22T20:50:12.929Z, between 6 and 9 feet, coyote_creek, 8.12] // ]]], error=null]], error=null]
Ngnix搭建(搭建中)

请参考 custom.conf

服务器搭建与数据库部署

使用docker来搭建对应的influxdb信息

本地存在两台服务器(请确保influx1可以访问到influx2 同一局域网)

influx1: 192.168.137.130

influx2: 192.168.137.131

  1. docker-compose代码

    1
    version: "3.5" services: influx-proxy: image: chengshiwen/influx-proxy:latest container_name: influx-proxy ports: - 7076:7076 environment: - TZ=Asia/Shanghai volumes: - ./proxy.json:/etc/influx-proxy/proxy.json restart: unless-stopped networks: - influx_net influxdb-1: image: influxdb:1.8 container_name: influxdb-1 restart: unless-stopped networks: - influx_net volumes: - ./influxdb1/influxdb.conf:/etc/influxdb/influxdb.conf - ./influxdb1/meta:/var/lib/influxdb/meta - ./influxdb1/data:/var/lib/influxdb/data - ./influxdb1/wal:/var/lib/influxdb/wal ports: - "8086:8086" command: ["influxd", "--config", "/etc/influxdb/influxdb.conf"] influxdb-2: image: influxdb:1.8 container_name: influxdb-2 restart: unless-stopped networks: - influx_net volumes: - ./influxdb2/influxdb.conf:/etc/influxdb/influxdb.conf - ./influxdb2/meta:/var/lib/influxdb/meta - ./influxdb2/data:/var/lib/influxdb/data - ./influxdb2/wal:/var/lib/influxdb/wal ports: - "8087:8086" command: ["influxd", "--config", "/etc/influxdb/influxdb.conf"] networks: influx_net:
  2. proxy.json代码

    1
    { "circles": [ { "name": "circle-1", "backends": [ { "name": "influxdb-1-1", "url": "http://192.168.137.130:8086", "username": "", "password": "" }, { "name": "influxdb-1-2", "url": "http://192.168.137.130:8087", "username": "", "password": "" } ] }, { "name": "circle-2", "backends": [ { "name": "influxdb-2-1", "url": "http://192.168.137.131:8086", "username": "", "password": "" }, { "name": "influxdb-2-2", "url": "http://192.168.137.131:8087", "username": "", "password": "" } ] } ], "listen_addr": ":7076", "db_list": [], "data_dir": "data", "tlog_dir": "log", "hash_key": "idx", "flush_size": 10000, "flush_time": 1, "check_interval": 1, "rewrite_interval": 10, "conn_pool_size": 20, "write_timeout": 10, "idle_timeout": 10, "username": "", "password": "", "write_tracing": false, "query_tracing": false, "pprof_enabled": false, "https_enabled": false, "https_cert": "", "https_key": "" }
  3. influx.conf配置

    1
    #这里只放出几处需要修改的 其他按照默认即可 如果生产环境可以考虑把internal禁掉 # Determines whether the Flux query endpoint is enabled. flux-enabled = true (如果需要支持flux语句 请设置为true)
  4. 测试

    1
    curl -XPOST 'http://127.0.0.1:7076/query' --data-urlencode 'q=CREATE DATABASE "testdb"' sudo curl -XPOST 'http://127.0.0.1:7076/api/v2/write?bucket=testdb&precision=s' --data-binary 'mem,host=host1 used_percent=25 1700469476' sudo curl -XPOST 'http://127.0.0.1:7076/api/v2/write?bucket=testdb&precision=s' --data-binary 'mem2,host=host2 used_percent=23 1700469476' sudo curl -XPOST 'http://127.0.0.1:7076/api/v2/write?bucket=testdb&precision=s' --data-binary 'mem3,host=host3 used_percent=24 1700531670' sudo curl -XPOST 'http://127.0.0.1:7076/api/v2/query' \ -H 'Accept:application/csv' \ -H 'Content-type:application/vnd.flux' \ -d 'from(bucket:"testdb") |> range(start:-5m) |> filter(fn:(r) => r._measurement == "mem")'

Influx2.5环境

SpringBoot搭建
  • 引入依赖

    1
    <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>6.3.0</version> </dependency>
  • Java代码

    1
    public class influxProxyTest { public static void main(String[] args) { String url = "http://192.168.137.130:7076"; String token = "testinfo"; String org = "admin"; String bucket = "analyse"; InfluxDBClient client = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket); QueryApi queryApi = client.getQueryApi(); String flux = "from(bucket:\"analyse\")\n" + "|> range(start:-5d)\n" + "|> filter(fn:(r) => r._measurement == \"mem\")"; List<FluxTable> list = queryApi.query(flux, org); for (FluxTable fluxTable:list){ List<FluxRecord> records = fluxTable.getRecords(); for (FluxRecord fluxRecord:records){ System.out.println(fluxRecord.getValue()); } } }
Ngnix搭建(实现中)

请参考 custom.conf

服务器搭建与数据库部署

使用docker来搭建对应的influxdb信息

本地存在两台服务器(请确保influx1可以访问到influx2 同一局域网)

influx1: 192.168.137.130

influx2: 192.168.137.131

  1. docker-compose代码

    1
    version: "3.5" services: influx-proxy: image: chengshiwen/influx-proxy:3.0.0-preview container_name: influx-proxy ports: - 7076:7076 environment: - TZ=Asia/Shanghai volumes: - ./proxy.json:/etc/influx-proxy/proxy.json restart: unless-stopped networks: - influx_net influxdb-1: image: influxdb:2.5.1 container_name: influxdb-1 restart: unless-stopped ports: - "8086:8086" networks: - influx_net volumes: - ./influxdb1:/var/lib/influxdb2 influxdb-2: image: influxdb:2.5.1 container_name: influxdb-2 restart: unless-stopped ports: - "8087:8086" networks: - influx_net volumes: - ./influxdb2:/var/lib/influxdb2 networks: influx_net:
  2. proxy.json代码

    1
    { "circles": [ { "name": "circle-1", "backends": [ { "name": "influxdb-1-1", "url": "http://192.168.137.130:8086", "token": "" }, { "name": "influxdb-1-2", "url": "http://192.168.137.130:8087", "token": "" } ] }, { "name": "circle-2", "backends": [ { "name": "influxdb-2-1", "url": "http://192.168.137.131:8086", "token": "" }, { "name": "influxdb-2-2", "url": "http://192.168.137.131:8087", "token": "" } ] } ], "dbrp": { "separator": "/", "mapping": {"mydb": "admin/analyse", "mydb/myrp": "admin/analyse"} }, "listen_addr": ":7076", "data_dir": "data", "flush_size": 10000, "flush_time": 1, "check_interval": 1, "rewrite_interval": 10, "conn_pool_size": 20, "write_timeout": 10, "write_tracing": false, "query_tracing": false, "token": "", "pprof_enabled": false, "https_enabled": false, "https_cert": "", "https_key": "" }
  3. 测试

    1
    数据写入 sudo curl -XPOST 'http://192.168.137.130:7076/api/v2/write?org=admin&bucket=analyse&precision=s' --data-binary 'mem,host=host3 used_percent=241 1700531671'

InfluxDB简介:

  1. InfluxDB 是一个开源分布式时序、事件和指标数据库。使用Go语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展。
           它有三大特性:
           1. Time Series (时间序列):你可以使用与时间有关的相关函数(如最大,最小,求和等);
           2. Metrics(度量):你可以实时对大量数据进行计算;
           3. Eevents(事件):它支持任意的事件数据。
           详细请参考官网:https://influxdata.com/

InfluxDB 安装

  1. 下载地址,

    64bit:https://dl.influxdata.com/influxdb/releases/influxdb-1.7.4_windows_amd64.zip

    chronograf:https://dl.influxdata.com/chronograf/releases/chronograf-1.7.8_windows_amd64.zip

        2.解压安装包

         

修改配置文件

       InfluxDB 的数据存储主要有三个目录。默认情况下是 meta, wal 以及 data 三个目录,服务器运行后会自动生成。

meta 用于存储数据库的一些元数据,meta 目录下有一个 meta.db 文件。

wal 目录存放预写日志文件,以 .wal 结尾。

data 目录存放实际存储的数据文件,以 .tsm 结尾。

如果不使用influxdb.conf配置的话,那么直接双击打开influxd.exe就可以使用influx,此时上面三个文件夹的目录则存放在Windows系统的C盘User目录下的.Influx目录下,默认端口为8086,以下为修改文件夹地址,以及端口号方法。

1.修改以下部分的路径

 

2. 如果需要更改端口号,则修改以下部分配置

 

3. 修改配置后启动方式

InfluxDB 使用时需要首先打开Influxd.exe,直接打开会使用默认配置,需要使用已配置的配置文件的话,需要指定conf文件进行启动,启动命令如下:

influxd.exe -config influxdb.conf(cmd目录为influxDB目录)

启动可写成bat文件,内容如下:

 

打开成功画面:

 

Influxd成功启动后,即可打开influx.exe,若使用默认配置,则直接打开即可,使用配置文件的情况下,在cmd中输入influx命令(cmd目录为influxDB目录),启动可写成bat文件,文件内容如下:

 

-port是使用特定port号启动

启动成功画面显示如下:

 

备注:运行influx.exe 时,influxd.exe不可关闭

配置文件具体内容详解:

官方介绍:https://docs.influxdata.com/influxdb/v1.2/administration/config/

转自:https://www.cnblogs.com/guyeshanrenshiwoshifu/p/9188368.html

 全局配置

1

2

reporting-disabled = false  # 该选项用于上报influxdb的使用信息给InfluxData公司,默认值为``false

bind-address = ":8088"  # 备份恢复时使用,默认值为8088

1、meta相关配置

1

2

3

4

[meta]

dir = "/var/lib/influxdb/meta"  # meta数据存放目录

retention-autocreate = true  # 用于控制默认存储策略,数据库创建时,会自动生成autogen的存储策略,默认值:``true

logging-enabled = true  # 是否开启meta日志,默认值:``true

2、data相关配置

1

2

3

4

5

6

7

8

9

10

[data]

dir = "/var/lib/influxdb/data"  # 最终数据(TSM文件)存储目录

wal-dir = "/var/lib/influxdb/wal"  # 预写日志存储目录

query-log-enabled = true  # 是否开启tsm引擎查询日志,默认值: true

cache-max-memory-size = 1048576000  # 用于限定shard最大值,大于该值时会拒绝写入,默认值:1000MB,单位:``byte

cache-snapshot-memory-size = 26214400  # 用于设置快照大小,大于该值时数据会刷新到tsm文件,默认值:25MB,单位:``byte

cache-snapshot-write-cold-duration = "10m"  # tsm引擎 snapshot写盘延迟,默认值:10Minute

compact-full-write-cold-duration = "4h"  # tsm文件在压缩前可以存储的最大时间,默认值:4Hour

max-series-per-database = 1000000  # 限制数据库的级数,该值为0时取消限制,默认值:1000000

max-values-per-tag = 100000  # 一个tag最大的value数,0取消限制,默认值:100000

3、coordinator查询管理的配置选项

1

2

3

4

5

6

7

8

[coordinator]

write-timeout = "10s"  # 写操作超时时间,默认值: 10s

max-concurrent-queries = 0  # 最大并发查询数,0无限制,默认值: 0

query-timeout = "0s  # 查询操作超时时间,0无限制,默认值:0s

log-queries-after = "0s"  # 慢查询超时时间,0无限制,默认值:0s

max-``select``-point = 0  # SELECT语句可以处理的最大点数(points),0无限制,默认值:0

max-``select``-series = 0  # SELECT语句可以处理的最大级数(series),0无限制,默认值:0

max-``select``-buckets = 0  # SELECT语句可以处理的最大``"GROUP BY time()"``的时间周期,0无限制,默认值:0

4、retention旧数据的保留策略

1

2

3

[retention]

enabled = true  # 是否启用该模块,默认值 : true

check-interval = "30m"  # 检查时间间隔,默认值 :``"30m"

5、shard-precreation分区预创建

1

2

3

4

[shard-precreation]

enabled = true  # 是否启用该模块,默认值 : true

check-interval = "10m"  # 检查时间间隔,默认值 :``"10m"

advance-period = "30m"  # 预创建分区的最大提前时间,默认值 :``"30m"

6、monitor 控制InfluxDB自有的监控系统。 默认情况下,InfluxDB把这些数据写入_internal 数据库,如果这个库不存在则自动创建。 _internal 库默认的retention策略是7天,如果你想使用一个自己的retention策略,需要自己创建。

1

2

3

4

[monitor]

store-enabled = true  # 是否启用该模块,默认值 :``true

store-database = "_internal"  # 默认数据库:``"_internal"

store-interval = "10s  # 统计间隔,默认值:"``10s"

7、admin web管理页面

1

2

3

4

5

[admin]

enabled = true  # 是否启用该模块,默认值 : false

bind-address = ":8083"  # 绑定地址,默认值 :``":8083"

https-enabled = false  # 是否开启https ,默认值 :``false

https-certificate = "/etc/ssl/influxdb.pem"  # https证书路径,默认值:``"/etc/ssl/influxdb.pem"

8、http API

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

[http]

enabled = true  # 是否启用该模块,默认值 :``true

bind-address = ":8086"  # 绑定地址,默认值:``":8086"

auth-enabled = false  # 是否开启认证,默认值:``false

realm = "InfluxDB"  # 配置JWT realm,默认值: "InfluxDB"

log-enabled = true  # 是否开启日志,默认值:``true

write-tracing = false  # 是否开启写操作日志,如果置成``true``,每一次写操作都会打日志,默认值:``false

pprof-enabled = true  # 是否开启pprof,默认值:``true

https-enabled = false  # 是否开启https,默认值:``false

https-certificate = "/etc/ssl/influxdb.pem"  # 设置https证书路径,默认值:``"/etc/ssl/influxdb.pem"

https-``private``-key = ""  # 设置https私钥,无默认值

shared-secret = ""  # 用于JWT签名的共享密钥,无默认值

max-row-limit = 0  # 配置查询返回最大行数,0无限制,默认值:0

max-connection-limit = 0  # 配置最大连接数,0无限制,默认值:0

unix-socket-enabled = false  # 是否使用unix-socket,默认值:``false

bind-socket = "/var/run/influxdb.sock"  # unix-socket路径,默认值:``"/var/run/influxdb.sock"

9、subscriber 控制Kapacitor接受数据的配置

1

2

3

4

5

6

7

[subscriber]

enabled = true  # 是否启用该模块,默认值 :``true

http-timeout = "30s"  # http超时时间,默认值:``"30s"

insecure-skip-verify = false  # 是否允许不安全的证书

ca-certs = ""  # 设置CA证书

write-concurrency = 40  # 设置并发数目,默认值:40

write-buffer-size = 1000  # 设置buffer大小,默认值:1000

10、graphite 相关配置

1

2

3

4

5

6

7

8

9

10

11

12

[[graphite]]

enabled = false  # 是否启用该模块,默认值 :``false

database = "graphite"  # 数据库名称,默认值:``"graphite"

retention-policy = ""  # 存储策略,无默认值

bind-address = ":2003"  # 绑定地址,默认值:``":2003"

protocol = "tcp"  # 协议,默认值:``"tcp"

consistency-level = "one"  # 一致性级别,默认值:"one

batch-size = 5000  # 批量size,默认值:5000

batch-pending = 10  # 配置在内存中等待的batch数,默认值:10

batch-timeout = "1s"  # 超时时间,默认值:``"1s"

udp-read-buffer = 0  # udp读取buffer的大小,0表示使用操作系统提供的值,如果超过操作系统的默认配置则会出错。 该配置的默认值:0

separator = "."  # 多个measurement间的连接符,默认值: "."

11、collectd

1

2

3

4

5

6

7

8

9

10

11

[[collectd]]

enabled = false  # 是否启用该模块,默认值 :``false

bind-address = ":25826"  # 绑定地址,默认值: ":25826"

database = "collectd"  # 数据库名称,默认值:``"collectd"

retention-policy = ""  # 存储策略,无默认值

typesdb = "/usr/local/share/collectd"  # 路径,默认值:``"/usr/share/collectd/types.db"

auth-file = "/etc/collectd/auth_file"

batch-size = 5000

batch-pending = 10

batch-timeout = "10s"

read-buffer = 0  # udp读取buffer的大小,0表示使用操作系统提供的值,如果超过操作系统的默认配置则会出错。默认值:0

12、opentsdb

1

2

3

4

5

6

7

8

9

10

11

12

[[opentsdb]]

enabled = false  # 是否启用该模块,默认值:``false

bind-address = ":4242"  # 绑定地址,默认值:``":4242"

database = "opentsdb"  # 默认数据库:``"opentsdb"

retention-policy = ""  # 存储策略,无默认值

consistency-level = "one"  # 一致性级别,默认值:``"one"

tls-enabled = false  # 是否开启tls,默认值:``false

certificate= "/etc/ssl/influxdb.pem"  # 证书路径,默认值:``"/etc/ssl/influxdb.pem"

log-point-errors = true  # 出错时是否记录日志,默认值:``true

batch-size = 1000

batch-pending = 5

batch-timeout = "1s"

13、udp

1

2

3

4

5

6

7

8

9

[[udp]]

enabled = false  # 是否启用该模块,默认值:``false

bind-address = ":8089"  # 绑定地址,默认值:``":8089"

database = "udp"  # 数据库名称,默认值:``"udp"

retention-policy = ""  # 存储策略,无默认值

batch-size = 5000

batch-pending = 10

batch-timeout = "1s"

read-buffer = 0  # udp读取buffer的大小,0表示使用操作系统提供的值,如果超过操作系统的默认配置则会出错。 该配置的默认值:0

14、continuous_queries

1

2

3

4

[continuous_queries]

enabled = true  # enabled 是否开启CQs,默认值:``true

log-enabled = true  # 是否开启日志,默认值:``true

run-interval = "1s"  # 时间间隔,默认值:``"1s"

InfluxDB数据库常用命令

   1、显示所有数据库

       show databases

  

2、 创建数据库

   create database test

 

3、 使用某个数据库

use test

 

4、 显示所有表

show measurements

 

没有表则无返回。

 

5、新建表和插入数据

新建表没有具体的语法,只是增加第一条数据时,会自动建立表

insert results,hostname=index1 value=1

 

这里的时间看不懂,可以设置一下时间显示格式

precision rfc3339

 

6、 查询数据

表名有点号时,输入双引号

 

和sql语法相同,区别:

measurement 数据库中的表

points 表里面的一行数据,Point由时间戳(time)、数据(field)、标签(tags)组成。

7、 用户显示

a. 显示所有用户

show users

 

b.新增用户

_--_普通用户

create user “user” with password ‘user’

_--_管理员用户

create user “admin” with password ‘admin’ with all privileges

 

c.删除用户

drop user “user”

 

Chronograf 使用

1、解压文件后,直接进入安装目录,执行chronograf.exe后;

2、输入:http://localhost:8888(chronograf默认是8888端口)

3、influxDB数据源连接

 

InfluxDB基本概念

1、数据格式

在 InfluxDB 中,我们可以粗略的将要存入的一条数据看作**一个虚拟的 key 和其对应的 value(field value)**。格式如下:

1

cpu_usage,host``=``server01,region``=``us``-``west value``=``0.64 1434055562000000000

虚拟的 key 包括以下几个部分: database, retention policy, measurement, tag sets, field name, timestamp。

  • database: 数据库名,在 InfluxDB 中可以创建多个数据库,不同数据库中的数据文件是隔离存放的,存放在磁盘上的不同目录。
  • retention policy: 存储策略,用于设置数据保留的时间,每个数据库刚开始会自动创建一个默认的存储策略 autogen,数据保留时间为永久,之后用户可以自己设置,例如保留最近2小时的数据。插入和查询数据时如果不指定存储策略,则使用默认存储策略,且默认存储策略可以修改。InfluxDB 会定期清除过期的数据。
  • measurement: 测量指标名,例如 cpu_usage 表示 cpu 的使用率。
  • tag sets: tags 在 InfluxDB 中会按照字典序排序,不管是 tagk 还是 tagv,只要不一致就分别属于两个 key,例如 host=server01,region=us-west 和 host=server02,region=us-west 就是两个不同的 tag set。
  • tag–标签,在InfluxDB中,tag是一个非常重要的部分,表名+tag一起作为数据库的索引,是“key-value”的形式。
  • field name: 例如上面数据中的 value 就是 fieldName,InfluxDB 中支持一条数据中插入多个 fieldName,这其实是一个语法上的优化,在实际的底层存储中,是当作多条数据来存储。
  • timestamp: 每一条数据都需要指定一个时间戳,在 TSM 存储引擎中会特殊对待,以为了优化后续的查询操作。

2、与传统数据库中的名词做比较

influxDB中的名词

传统数据库中的概念

database

数据库

measurement

数据库中的表

points

表里面的一行数据

3、Point

Point由时间戳(time)、数据(field)、标签(tags)组成。

Point相当于传统数据库里的一行数据,如下表所示:

Point属性

传统数据库中的概念

time

每个数据记录时间,是数据库中的主索引(会自动生成)

fields

各种记录值(没有索引的属性)

tags

各种有索引的属性

4、Series

Series 相当于是 InfluxDB 中一些数据的集合,在同一个 database 中,retention policy、measurement、tag sets 完全相同的数据同属于一个 series,同一个 series 的数据在物理上会按照时间顺序排列存储在一起。

5、Shard

Shard 在 InfluxDB 中是一个比较重要的概念,它和 retention policy 相关联。每一个存储策略下会存在许多 shard,每一个 shard 存储一个指定时间段内的数据,并且不重复,例如 7点-8点 的数据落入 shard0 中,8点-9点的数据则落入 shard1 中。每一个 shard 都对应一个底层的 tsm 存储引擎,有独立的 cache、wal、tsm file。

6、组件

TSM 存储引擎主要由几个部分组成: cache、wal、tsm file、compactor。

1)Cache:cache 相当于是 LSM Tree 中的 memtabl。插入数据时,实际上是同时往 cache 与 wal 中写入数据,可以认为 cache 是 wal 文件中的数据在内存中的缓存。当 InfluxDB 启动时,会遍历所有的 wal 文件,重新构造 cache,这样即使系统出现故障,也不会导致数据的丢失。

cache 中的数据并不是无限增长的,有一个 maxSize 参数用于控制当 cache 中的数据占用多少内存后就会将数据写入 tsm 文件。如果不配置的话,默认上限为 25MB,每当 cache 中的数据达到阀值后,会将当前的 cache 进行一次快照,之后清空当前 cache 中的内容,再创建一个新的 wal 文件用于写入,剩下的 wal 文件最后会被删除,快照中的数据会经过排序写入一个新的 tsm 文件中。

2)WAL:wal 文件的内容与内存中的 cache 相同,其作用就是为了持久化数据,当系统崩溃后可以通过 wal 文件恢复还没有写入到 tsm 文件中的数据。

3)TSM File:单个 tsm file 大小最大为 2GB,用于存放数据。

4)Compactor:compactor 组件在后台持续运行,每隔 1 秒会检查一次是否有需要压缩合并的数据。

主要进行两种操作,一种是 cache 中的数据大小达到阀值后,进行快照,之后转存到一个新的 tsm 文件中。

另外一种就是合并当前的 tsm 文件,将多个小的 tsm 文件合并成一个,使每一个文件尽量达到单个文件的最大大小,减少文件的数量,并且一些数据的删除操作也是在这个时候完成。

7、目录与文件结构

InfluxDB 的数据存储主要有三个目录。默认情况下是 meta, wal 以及 data 三个目录。

meta 用于存储数据库的一些元数据,meta 目录下有一个 meta.db 文件。

wal 目录存放预写日志文件,以 .wal 结尾。

data 目录存放实际存储的数据文件,以 .tsm 结尾。

上面几张图中,_internal为数据库名,monitor为存储策略名称,再下一层目录中的以数字命名的目录是 shard 的 ID 值。

存储策略下有两个 shard,ID 分别为 1 和 2,shard 存储了某一个时间段范围内的数据。再下一级的目录则为具体的文件,分别是 .wal 和 .tsm 结尾的文件。

InfluxDB基本操作

InfluxDB提供多种操作方式:

1)客户端命令行方式

2)HTTP API接口

3)各语言API库

4)基于WEB管理页面操作

客户端命令行方式操作

进入命令行

1

influx -``precision rfc3339

1、InfluxDB数据库操作

  • 显示数据库

  • 新建数据库

1

create database shhnwangjian

  • 删除数据库

1

drop database shhnwangjian

  • 使用指定数据库

2、InfluxDB数据表操作

在InfluxDB当中,并没有表(table)这个概念,取而代之的是MEASUREMENTS,MEASUREMENTS的功能与传统数据库中的表一致,因此我们也可以将MEASUREMENTS称为InfluxDB中的表。

  • 显示所有表

  • 新建表

InfluxDB中没有显式的新建表的语句,只能通过insert数据的方式来建立新表。

1

insert disk_free,hostname``=``server01 value``=``442221834240i

其中 disk_free 就是表名,hostname是索引(tag),value=xx是记录值(field),记录值可以有多个,系统自带追加时间戳

或者添加数据时,自己写入时间戳

1

insert disk_free,hostname``=``server01 value``=``442221834240i 1435362189575692182

  • 删除表

1

drop measurement disk_free

3、数据保存策略(Retention Policies)

influxDB是没有提供直接删除数据记录的方法,但是提供数据保存策略,主要用于指定数据保留时间,超过指定时间,就删除这部分数据。

  • 查看当前数据库Retention Policies

1

show retention policies on "db_name"

  • 创建新的Retention Policies

1

create retention policy "rp_name" on "db_name" duration 3w replication 1 default

rp_name:策略名;

db_name:具体的数据库名;

3w:保存3周,3周之前的数据将被删除,influxdb具有各种事件参数,比如:h(小时),d(天),w(星期);

replication 1:副本个数,一般为1就可以了;

default:设置为默认策略

  • 修改Retention Policies

1

alter retention policy "rp_name" on "db_name" duration 30d default

  • 删除Retention Policies

1

drop retention policy "rp_name" on "db_name"

4、连续查询(Continuous Queries)

InfluxDB的连续查询是在数据库中自动定时启动的一组语句,语句中必须包含 SELECT 关键词和 GROUP BY time() 关键词。

InfluxDB会将查询结果放在指定的数据表中。

目的:使用连续查询是最优的降低采样率的方式,连续查询和存储策略搭配使用将会大大降低InfluxDB的系统占用量。而且使用连续查询后,数据会存放到指定的数据表中,这样就为以后统计不同精度的数据提供了方便。

  • 新建连续查询

1

2

3

4

5

CREATE CONTINUOUS QUERY <cq_name> ON <database_name>

[RESAMPLE [EVERY <interval>] [FOR <interval>]]

BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement>

FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>]

END

样例:

1

CREATE CONTINUOUS QUERY wj_30m ON shhnwangjian BEGIN SELECT mean(connected_clients), MEDIAN(connected_clients), MAX``(connected_clients), MIN``(connected_clients) INTO redis_clients_30m FROM redis_clients GROUP BY ip,port,time(``30m``) END

在shhnwangjian库中新建了一个名为 wj_30m 的连续查询,每三十分钟取一个connected_clients字段的平均值、中位值、最大值、最小值 redis_clients_30m 表中。使用的数据保留策略都是 default。

不同database样例:

1

CREATE CONTINUOUS QUERY wj_30m ON shhnwangjian_30 BEGIN SELECT mean(connected_clients), MEDIAN(connected_clients), MAX``(connected_clients), MIN``(connected_clients) INTO shhnwangjian_30.autogen.redis_clients_30m FROM shhnwangjian.autogen.redis_clients GROUP BY ip,port,time(``30m``) END

  • 显示所有已存在的连续查询

  • 删除Continuous Queries

1

DROP CONTINUOUS QUERY <cq_name> ON <database_name>

参考文章:

http://blog.fatedier.com/2016/08/05/detailed-in-influxdb-tsm-storage-engine-one/

http://www.linuxdaxue.com/noun-interpretation-of-influxdb.html

IdentityServer4:IdentityServer4+API+Client实践OAuth2.0客户端模式(1) - huoit - 博客园

Excerpt

一、OAuth2.0 1、OAuth2.0概念 OAuth2.0(Open Authorization)是一个开放授权协议;第三方应用不需要接触到用户的账户信息(如用户名密码),通过用户的授权访问用户资源 OAuth的步骤一般如下: 1、客户端要求用户给予授权2、用户同意给予授权3、根据上一步获得的


  huoit  阅读(1137)  评论()  编辑  收藏  举报

一、OAuth2.0

1、OAuth2.0概念

OAuth2.0(Open Authorization)是一个开放授权协议;第三方应用不需要接触到用户的账户信息(如用户名密码),通过用户的授权访问用户资源

OAuth的步骤一般如下:

1、客户端要求用户给予授权
2、用户同意给予授权
3、根据上一步获得的授权,向认证服务器请求令牌(token)
4、认证服务器对授权进行认证,确认无误后发放令牌
5、客户端使用令牌向资源服务器请求资源
6、资源服务器使用令牌向认证服务器确认令牌的正确性,确认无误后提供资源

该协议的参与者至少包含:

RO (resource owner): 资源所有者:用户。

RS (resource server): 资源服务器:数据中心;它存储资源,并处理对资源的访问请求。如:API资源,相册服务器、博客服务器。

AS (authorization server): 授权服务器

Client: 第三方应用

2、授权模式

四种模式:

1、授权码模式(authorization code)
2、简化模式(implicit)
3、密码模式(resource owner password credentials)
4、客户端模式(client credentials)

二、IdentityServer + API+Client演示客户端模式

客户端模式(ClientCredentials):经常运用于服务器对服务器中间通讯使用;步骤如下:

1、客户端直接用自身的信息向授权服务器请求token:

HTTP请求:

granttype:授权类型

scope:授权范围

复制代码

1
2
3
4
5
6
POST /token HTTP/<span>1.1</span><span>
Host: server.example.com
Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW
Content</span>-Type: application/x-www-form-<span>urlencoded

grant_type</span>=client_credentials&amp;scope=api001

复制代码

2、授权服务器验证信息后返回token

复制代码

1
2
3
4
5
6
7
8
9
10
11
HTTP/<span>1.1</span> <span>200</span><span> OK
Content</span>-Type: application/json;charset=UTF-<span>8</span><span>
Cache</span>-Control: no-<span>store
Pragma: no</span>-<span>cache

{
</span><span>"</span><span>access_token</span><span>"</span>:<span>"</span><span>2YotnFZFEjr1zCsicMWpAA</span><span>"</span><span>,
</span><span>"</span><span>token_type</span><span>"</span>:<span>"</span><span>example</span><span>"</span><span>,
</span><span>"</span><span>expires_in</span><span>"</span>:<span>3600</span><span>,
</span><span>"</span><span>example_parameter</span><span>"</span>:<span>"</span><span>example_value</span><span>"</span><span>
}</span>

复制代码

下面通过一个快速示例理解;快速示例将通过服务器与服务器直接通过api访问数据;

1、授权服务端;

这里将通过IdnetityServer4实现一个标准的Oauth2.0协议的服务端;

引用IdentityServer4包

新建ASP.NET Core Web Application ——Empty项目;这里通过程序包控制台添加IdentityServer4引用包

Install-Package IdentityServer4

定义API资源、定义客户端

新建类Config.cs;定义资源Scopes、Client;

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<span>using</span><span> IdentityServer4.Models;
</span><span>using</span><span> System;
</span><span>using</span><span> System.Collections.Generic;
</span><span>using</span><span> System.Linq;
</span><span>using</span><span> System.Threading.Tasks;

</span><span>namespace</span><span> Practice.IdentityServer
{
</span><span>public</span> <span>class</span><span> Config
{
</span><span>//</span><span>scopes定义</span>
<span>public</span> <span>static</span> IEnumerable&lt;ApiResource&gt;<span> GetApiResource()
{
</span><span>return</span> <span>new</span> List&lt;ApiResource&gt;<span>
{
</span><span>//</span><span>给api资源定义一个scopes</span>
<span>new</span> ApiResource(<span>"</span><span>api1</span><span>"</span>,<span>"</span><span>my api</span><span>"</span><span>)
};

}

</span><span>//</span><span>客户端注册,客户端能够访问的资源(通过:AllowedScopes)</span>
<span>public</span> <span>static</span> IEnumerable&lt;Client&gt;<span> GetClient()
{
</span><span>return</span> <span>new</span> List&lt;Client&gt;<span>
{
</span><span>new</span><span> Client
{
ClientId</span>=<span>"</span><span>client</span><span>"</span><span>,
AllowedGrantTypes</span>=<span>GrantTypes.ClientCredentials,
ClientSecrets</span>={<span>new</span> Secret(<span>"</span><span>secrect</span><span>"</span><span>.Sha256())},
AllowedScopes</span>={<span>"</span><span>api</span><span>"</span><span>}
}
};
}
}
}</span>

复制代码

把资源和客户端、存储方式、添加到service container(DI system)

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<span>using</span><span> System;
</span><span>using</span><span> System.Collections.Generic;
</span><span>using</span><span> System.Linq;
</span><span>using</span><span> System.Threading.Tasks;
</span><span>using</span><span> Microsoft.AspNetCore.Builder;
</span><span>using</span><span> Microsoft.AspNetCore.Hosting;
</span><span>using</span><span> Microsoft.AspNetCore.Http;
</span><span>using</span><span> Microsoft.Extensions.DependencyInjection;
</span><span>using</span><span> Microsoft.Extensions.Logging;

</span><span>namespace</span><span> Practice.IdentityServer
{
</span><span>public</span> <span>class</span><span> Startup
{
</span><span>//</span><span> 添加服务到容器(add services to the container)DI系统.
</span>
<span>public</span> <span>void</span><span> ConfigureServices(IServiceCollection services)
{
services.AddIdentityServer()
.AddTemporarySigningCredential()
.AddInMemoryApiResources(Config.GetApiResource())
.AddInMemoryClients(Config.GetClient());
}

</span><span>//</span><span>配置HTTP request 管道(pipeline).</span>
<span>public</span> <span>void</span><span> Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(LogLevel.Debug);

</span><span>if</span><span> (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}

app.UseIdentityServer();

</span><span>//</span><span>app.Run(async (context) =&gt;
</span><span>//</span><span>{
</span><span>//</span><span> await context.Response.WriteAsync("Hello World!");
</span><span>//</span><span>});</span>
<span> }
}
}</span>

复制代码

配置

注意:使用自宿的方式调试;会把日志输出到控制台;自宿的配置方式:

方法1:

方法2:

 配置地址:

在program.cs添加一句:.UseUrls(“http://localhost:5000“) 设置调试url;

运行

运行、通过http://localhost:5000/.well-known/openid-configuration访问 ;可以看到是一个restful的api;

2、API资源

 新建ASP.NET Core Web API 项目;添加中间件IdentityServer4.AccessTokenValidation 包引用

配置api的地址

添加控制器

复制代码

1
2
3
4
5
6
7
8
9
10
[Route(<span>"</span><span>identity</span><span>"</span><span>)]
[Authorize]
</span><span>public</span> <span>class</span><span> IdentityController : Controller
{
[HttpGet]
</span><span>public</span><span> IActionResult Get()
{
</span><span>return</span> <span>new</span> JsonResult(<span>from</span> a <span>in</span> User.Claims <span>select</span> <span>new</span><span> { a.Type,a.Value});
}
}</span>

复制代码

配置

把授权中间件配置到api host里;IdentityServer4.AccessTokenValidation这里的主要作用

1、验证token令牌,确保token令牌的Issuer发行者是经过注册认证可信任的发行者;

2、验证token令牌,确保这个令牌的授权范围(scope)包括授权使用这个api

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    <span>public</span> <span>class</span><span> Startup
{
</span><span>public</span><span> Startup(IHostingEnvironment env)
{
</span><span>var</span> builder = <span>new</span><span> ConfigurationBuilder()
.SetBasePath(env.ContentRootPath)
.AddJsonFile(</span><span>"</span><span>appsettings.json</span><span>"</span>, optional: <span>false</span>, reloadOnChange: <span>true</span><span>)
.AddJsonFile($</span><span>"</span><span>appsettings.{env.EnvironmentName}.json</span><span>"</span>, optional: <span>true</span><span>)
.AddEnvironmentVariables();
Configuration </span>=<span> builder.Build();
}

</span><span>public</span> IConfigurationRoot Configuration { <span>get</span><span>; }

</span><span>//</span><span> This method gets called by the runtime. Use this method to add services to the container.</span>
<span>public</span> <span>void</span><span> ConfigureServices(IServiceCollection services)
{
</span><span>//</span><span> Add framework services.</span>
<span> services.AddMvcCore()
.AddAuthorization()
.AddJsonFormatters();
}

</span><span>//</span><span> This method gets called by the runtime. Use this method to configure the HTTP request pipeline.</span>
<span>public</span> <span>void</span><span> Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory)
{
loggerFactory.AddConsole(Configuration.GetSection(</span><span>"</span><span>Logging</span><span>"</span><span>));
loggerFactory.AddDebug();
app.UseIdentityServerAuthentication(</span><span>new</span><span> IdentityServerAuthenticationOptions {
Authority </span>= <span>"</span><span>http://localhost:5000</span><span>"</span><span>,
RequireHttpsMetadata</span>=<span>false</span><span>,
ApiName</span>=<span>"</span><span>api1</span><span>"</span><span>
});
app.UseMvc();
}
}</span>

复制代码

运行后,直接浏览器访问http://localhost:5001/identity会被拒绝说明成功;访问这个api需要在http请求的header加入token才可以访问;

3、Client客户端

新建.Net Core——控制台应用;添加中间件

 IdentityModel是官方提供给我们的一个Client类库;当然用户也可以自行构建原始的http协议访问API;

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<span>public</span> <span>class</span><span> Program
{
</span><span>public</span> <span>static</span> <span>void</span> Main(<span>string</span>[] args) =&gt;<span> MainAsync().GetAwaiter().GetResult();


</span><span>private</span> <span>static</span> <span>async</span><span> Task MainAsync()
{
</span><span>//
</span> <span>var</span> dico = <span>await</span> DiscoveryClient.GetAsync(<span>"</span><span>http://localhost:5000</span><span>"</span><span>);

</span><span>//</span><span>token</span>
<span>var</span> tokenClient = <span>new</span> TokenClient(dico.TokenEndpoint, <span>"</span><span>client</span><span>"</span>, <span>"</span><span>secret</span><span>"</span><span>);
</span><span>var</span> tokenresp = <span>await</span> tokenClient.RequestClientCredentialsAsync(<span>"</span><span>api1</span><span>"</span><span>);
</span><span>if</span><span> (tokenresp.IsError)
{
Console.WriteLine(tokenresp.Error);
</span><span>return</span><span>;

}

Console.WriteLine(tokenresp.Json);
Console.WriteLine(</span><span>"</span><span>\n\n</span><span>"</span><span>);


</span><span>var</span> client = <span>new</span><span> HttpClient();
client.SetBearerToken(tokenresp.AccessToken);

</span><span>var</span> resp = <span>await</span> client.GetAsync(<span>"</span><span>http://localhost:5000/identity</span><span>"</span><span>);
</span><span>if</span> (!<span>resp.IsSuccessStatusCode)
{
Console.WriteLine(resp.StatusCode);
}
</span><span>else</span><span>
{
</span><span>var</span> content = <span>await</span><span> resp.Content.ReadAsStringAsync();
Console.WriteLine(JArray.Parse(content));
}


}
}</span>

复制代码

DiscoveryClient类:IdentityModel提供给我们通过基础地址(如:http://localhost:5000)就可以访问令牌服务端;当然可以根据上面的restful api里面的url自行构建;上面就是通过基础地址,获取一个TokenClient;(对应restful的url:token_endpoint   “http://localhost:5000/connect/token")

RequestClientCredentialsAsync方法:请求令牌;

获取令牌后,就可以通过构建http请求访问API接口;这里使用HttpClient构建请求,获取内容;

运行效果:

我们换一种原始的方式模拟这个流程

打开Postman:按照restful api页面的说明,依次进行下面的步骤操作,一个很原始的http流程就熟悉了;自行查看原图

资料:

 http://wiki.connect.qq.com/%E4%BD%BF%E7%94%A8implicit\_grant%E6%96%B9%E5%BC%8F%E8%8E%B7%E5%8F%96access\_token

上一篇:《IDDD 实现领域驱动设计-SOA、REST 和六边形架构

阅读目录:

  1. CQRS-命令查询职责分离

  2. EDA-事件驱动架构

    1. Domin Event-领域事件

    2. Long-Running Process(Saga)-长时处理过程

    3. Event Sourcing-事件溯源

  3. CQRS Journey-微软示例项目

  4. ENode-netfocus 实践项目

存在即是理由,每一种架构的产生都会有一种特定的场景,或者解决某一种实际应用问题,经验的累积促成了某一种架构的产生。

1. CQRS-命令查询职责分离

说明:本图摘自 MSDN

CQRS(Command & Query Responsibility Segregation)命令查询职责分离,和 REST 同属于架构风格,如果单纯理解 CQRS,是比较容易的,另一种方式解释就是,一个方法要么是执行某种动作的命令,要么是返回数据的查询,命令的体现是对系统状态的修改,而查询则不会,职责的分离更加有利于领域模型的提炼,系统的灵活性和可扩展性也得到进一步加强。

为什么要进行命令和查询职责分离?

如果你有时间,可以先阅读下上面几篇博文及相关评论。

我们都知道 Repository 的职责就是管理聚合根(Aggregate)对象,一般是一一对应关系,领域层中的业务逻辑要对某种聚合根对象进行操作,必须要通过 Repository,而应用层接受用户请求获取数据对象显示,也必须要通过 Repository 进行聚合根对象转换,这个一般没有涉及到领域业务操作,仅仅只是获取聚合根对象数据。领域层中的业务逻辑要求 Repository 实现对聚合根状态的管理,所以我们一般会在领域层 IRepository 接口中定义 Add、Update、GetById 等方法,然后在基础设施层中的 Repository 进行实现,而来自应用层的要求,需要获取聚合根对象数据,所以在 Repository 中还需要添加一些 GetList 等操作,而根据 IRepository 的接口契约,返回的类型必须是聚合根,而在这种场景中,是不需要获取聚合根对象的,只需要获取数据(DTO)就可以了。。。

我大致列一下上面描述中,所出现的一系列问题:

  1. Repository 职责变得飘忽不定。
  2. IRepository 会被污染,导致的结果是领域层也会被污染。
  3. Repository 会出现本不应该出现的 DTO 概念。
  4. Repository 会被大量 GetList 操作所吞没。
  5. Repository 最后会变得“人不像人,鬼不像鬼”。

如果你带着这些问题去理解 CQRS,就会有这样的感慨:“天哪,这简直就是老天派下的一个救星啊!”。

回到一开始的那张图上,看起来感觉很简单的样子,来自用户 UI 的请求分为 Query(查询)和 Command(命令),这些请求操作都会被 Service Interfaces(服务接口,只是一个统称)接收,然后再进行分发处理,对于命令操作会更新 Update Data store,因为读与写分离,为了保持数据的一致性,我们还需要把数据更新应用到 Read Data store。对于一般的应用系统来说,查询会占很大的比重,因为读与写分离了,所以我们可以针对查询进行进一步性能优化,而且还可以保持查询的灵活性和独立性,这种方式在应对大型业务系统来说是非常重要的,从这种层面上来说,CQRS 不用于 DDD 架构好像也是可以的,因为它是一种风格,并不局限于一种架构实现,所以你可以把它有价值的东西进行提炼,应用到合适的一个架构系统中也是可以的。

如果 CQRS 中包含有 Domain(领域)的概念,会是怎样的一种情形呢?

说明:本图摘自 AxonFramework

上面图中包含有很多的概念,但本质是和第一张图是一样的,只不过在其基础上进行了扩展和延伸,先列举一下所涉及的概念:

  • Command Bus(命令总线):图中没有,应该放在 Command Handler 之前,可以看作是 Command 发布者。
  • Command Handler(命令处理器):处理来自 Command Bus 分发的请求,可以看作是 Command 订阅者、处理者。
  • Event Bus(事件总线):一般在 Command Handler 完成之后,可以看作是 Event 发布者。
  • Event Handler(事件处理器):处理来自 Event Bus 分发的请求,可以看作是 Event 订阅者、处理者。
  • Event Store(事件存储):对应概念 Event Sourcing(事件溯源),可以用于事件回放处理,还原指定对象状态。

上面有些是 EDA(事件驱动架构)中的概念,这个在后面会有详细说明,我简单描述一下处理流程,首先抽离两个重要概念:Command(命令)和 Event(事件),Command 是一种命令的语气(本身就是命令的意思,呵呵),它的效果就是对某种对象状态的修改,Command Bus 收集来自 UI 的 Command 命令,并根据具体命令分发给具体的 Command Handler 进行处理,这时候就会产生一些领域操作,并对相应的领域对象进行修改,Command Handler 只是修改操作,并不会涉及到修改之后的操作(比如保存、事件发布等),Command Handler 完成之后并不表示这个 Command 命令就此结束,它需要把接下来的操作交给 Event Bus(完成之后的操作),并分发给相应的 Event Handler 订阅者进行处理,一般是数据保存、事件存储等。

我们来看 IDDD 中的一段代码(P126):

1
2
3
4
5
6
7
8
9
10
11
12
13
public void commitBacklogItemToSprint(
String aTenantId, String aBacklogItemId, String aSprintId) {

TenantId tenantId = new TenantId(aTenantId);

BacklogItem backlogItem = backlogItemRepository().backlogItemOfId(
tenantId, new BacklogItemId(aBacklogItemId));

Sprint sprint = sprintRepository().backlogItemOfId(
tenantId, new SprintId(aSprintId));

backlogItem.commitTo(sprint);
}

commitBacklogItemToSprint 就可以看作是一个 Command Handler,注意其命名(commitXXXXToXXXX),一眼看过去就是命令的意思,commitTo 之后的操作是提交给 Event Bus,然后分发给相应 Event Handler 订阅者,来完成状态修改后确定的操作,这样一个领域对象状态的变更才算完成。

关于 Event Handler 保存领域状态操作,其实说简单也简单,说复杂会很复杂,对于它的实现一般会采用异步的方式,也就是说领域状态的保存操作不会延时领域中的业务操作,数据的一致性使用 Unit of Work,具体的领域状态保存用 Repository 实现。

梳理 Command 整个流程,你会发现一个关键词:状态(Status),在上一篇博文讲 REST 概念时,也有一个相似的概念:应用状态(Application State),REST 其中的一个含义就是状态转换,从客气端的发起请求开始,到服务端响应请求结束,应用状态在其过程中会进行不断的转换,请求响应的整个过程也就是应用状态转换的过程,对于 Command 处理流程来说,领域对象的状态和应用状态其实是相类似。我举一个例子,在 REST 架构风格中,应用状态是不会保存到服务端的,客户端发起请求(包含应用状态信息),服务端做出相应处理,此时的状态会转换成资源状态呈现给客户端,这就是表现层状态转换的意思,回到 Command 处理流程上,Command Bus 接收来自 UI 的请求,分发给相应的 Command Handler 进行处理,在处理过程中,就会对领域对象进行修改操作,但它不会保存修改之后的状态信息,而是交给 Event Handler 进行保存状态信息。

和 Command 相比,Query 的处理流程就简单很多了,Query Service 接收来自 UI 的查询请求,这个查询处理可以用各种方式实现,你可以使用 ORM,也可以直接写 SQL 代码,反正是:怎么能提高性能,就怎么来!返回的结果类型一般是 DTO(数据传输对象),根据 UI 进行设计,可以减少不必要的数据传输。

2. EDA-事件驱动架构

Event-Driven Architecture(事件驱动架构),来自解道的定义:

事件代表过去发生的事件,事件既是技术架构概念,也是业务概念,以事件为驱动的编程模型称为事件驱动架构 EDA。

EDA 架构的三个特性:

  1. 异步
  2. 实时
  3. 彻底解耦

EDA 架构的核心是基于消息的发布订阅模式,通过发布订阅模式实现事件的一对多灵活分发。消息消费方对发送方而言完全透明,消息发送方只管把消息发送到消息中间件,其它事情全部不用关心,由于消息中间件中的 MQ 等技术,即使发送消息时候,消息接收方不可用,但仍然可以正常发送,这才叫彻底解耦。其次一对多的发布订阅模式也是一个核心重点,对于消息的订阅方和订阅机制,可以在消息中间件灵活的进行配置和管理,而对于消息发送方和发送逻辑基本没有任何影响。

EDA 要求我们的是通过业务流程,首先要识别出有价值的业务事件,这些事件符合异步、实时和发布订阅等基本的事件特征;其次是对事件进行详细的分析和定义,对事件对应的消息格式进行定义,对事件的发布订阅机制进行定义等,最后才是基于消息事件模式的开发和测试等工作。

在上一篇博文中有讲到 SOA,我们知道分为客户端和服务端,客户端发起请求给服务端,服务端做出相应的响应,也就是说客户端是主动的,服务端是被动的,这种情况就会造成服务的分散,也就是说,我们一般在设计服务的时候,会根据客户端的响应而被迫的切分业务逻辑,最后导致的情况是各个业务模块所属的服务,被分散在各个业务系统中,这种设计就会导致很多问题的发生。而对于 EDA 架构来说,订阅者向 Event Bus 订阅事件,告诉事件总线我要这个,而 Event Bus 接收订阅后,并不会立即进行处理,而是想什么时候处理就什么时候处理,主动权在 Event Bus 手中,当 Event Bus 想进行处理的时候,一般是接受来自 Command Handler 的请求,然后就分别向指定订阅者发布通知,告诉它们我已经处理了,你们可以接着做下面的事了。

从上面的描述中,我们可以看到 SOA 和 EDA 的明显区别,相对于 SOA 来说,EDA 更加有利于领域的聚合,主动权在领域手中,我们就可以从容面对很多的情形,简单画了一张图:

另外,需要注意的一点,CQRS 可以结合 EDA,也可以不结合,但反过来对于 EDA 来说,则必须结合 CQRS 使用。

2.1 Domin Event-领域事件

领域事件和 Domain Service(领域服务)一样,同属于 DDD 战术模式,这部分内容在 IDDD 第八章有详细介绍,因为我还没学习到那部分,这边就简单说明一下。在 EDA 的定义中说到:事件代表过去发生的事件,换句话说它是代表已完成的事件,准备来说,还应该包含正在完成的事件,既然是属于 DDD 战术模式的一种,那在领域设计中必然有所用武之地。

我用大白话来描述下领域事件在领域中的作用:我们知道行军打仗需要做出抉择,也就是说,需要指挥部商量后下达作战命令,然后把命令交给各个负责的作战中心,有陆军、海军、空军、导弹部队等,它们是命令的实施者,而指挥部是命令的决策者,这个和领域事件是一样的,领域中处理一些业务逻辑后,就会对领域对象的状态做出一些改变,这个就相当于作战命令,然后根据作战命令分配的作战中心进行完成,也就是领域事件的订阅者去完成领域对象状态改变之后的操作,简单而言,领域事件就是领域中的“跑腿者”。

在上面 EDA 的介绍中,有这样的一段代码:backlogItem.commitTo(sprint);,用通用语言表述就是:待定项提交到冲刺,这是领域中完成的一个操作,由 Command Handler 进行委派完成,backlogItem 是一个聚合根对象,commitTo 是聚合根中的一个操作,这个操作完成后,backlogItem 聚合根对象的状态就会被修改了,那在 commitTo 中具体有怎么的操作呢?看下示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void commitTo(Sprint aSprint)
{
this.assertArgumentNotNull(aSprint, "Sprint must not be null.");
this.assertArgumentEquals(aSprint.tenantId(), this.tenantId(), "Sprint must be of same tenant.");
this.assertArgumentEquals(aSprint.productId(), this.productId(), "Sprint must be of same product.");

if (!this.isScheduledForRelease())
{
throw new IllegalStateException("Must be scheduled for release to commit to sprint.");
}

if (this.isCommittedToSprint())
{
if (!aSprint.sprintId().equals(this.sprintId()))
{
this.uncommitFromSprint();
}
}

this.elevateStatusWith(BacklogItemStatus.COMMITTED);

this.setSprintId(aSprint.sprintId());

DomainEventPublisher
.instance()
.publish(new BacklogItemCommitted(
this.tenantId(),
this.backlogItemId(),
this.sprintId()));
}

注意 commitTo 所处在 BacklogItem 聚合根内,前面都是对聚合根对象的一些状态操作,在后面我们会看到 DomainEventPublisher(领域事件发布者),BacklogItemCommitted 继承自 DomainEvent,BacklogItemCommitted 对应的领域事件,在 BacklogItemApplicationService 中进行订阅,一般是聚合根对象在初始化的时候。

根据上面这个代码示例,然后结合 EDA 的三个特性就可以很好理解了,首先对于领域事件的处理操作一般是异步完成,这样就不会影响聚合根中的其他业务操作,当领域事件发布的时候,会实时的告知订阅者进行处理,因为它不管订阅者的具体处理情况,订阅者和发布者的规范在 DomainEvent 中,而不是像接口定义和实现那么强制,所以,当领域事件发布的时候,就说明订阅者已经被告知并进行了处理,所以他们直接的关系可以彻底的解耦。

在之前的短消息项目中,我没用到领域事件,对它也不是很深入的了解,在后面的博文中,再进行详细说明。

2.2 Long-Running Process(Saga)-长时处理过程

来自 IDDD 中的定义:

事件驱动的、分布式的并行处理模式。

关于 Saga 的相关博文,国内几乎没有(netfocus 有一篇相关说明),长时处理过程,说明它是一个需要耗时、多任务并行的处理过程,那在领域中,什么时候会有它的用武之地呢?比如一个看似简单的业务逻辑,可能会涉及到领域中很复杂的业务操作,而且对于这些处理需要耗费很长的时间。

在电子商城提交一个订单操作,用户看来可能会非常简单,但在领域进行处理的时候,会涉及到订单操作、客户操作、商品操作、库存操作、消息通知操作等等,有些会进行及时的处理,但有些则不会,比如消息通知操作等等,我们可以把这个业务操作分离一下,对于一些耗时比较长的操作精拣一下,商品的减少对应库存的减少,减少之后会进行警戒线判断,如果低于警戒下,则会给库存管理人员发送消息,商品减少了对应的商品统计也要进行更新,客户购买之后也要进行发送消息通知,我们可以把这些用一个 Saga 进行处理,因为是基于事件驱动,所以一个 Saga 中会订阅多个事件,Saga 会对这些事件进行跟踪,对于一些事件处理失败,也要进行做出相应的弥补措施,当所有的操作完成后,Saga 会返回一个状态给领域,也许这个返回操作已经在开始的几天以后了。

说明:本图摘自 MSDN

上图描述的是:一个会议购买座位的业务过程,中间的 Order Process Manager 就是一个 Saga,在 CQRS 架构中的表现就是 Process Manager(过程管理),我们一般会用它处理多个聚合根交互的业务逻辑,比如 netfocus 博文中列举的 TransferProcessCommandHandlers 操作,还有上图中的购买座位业务操作,那我们应该怎么设计 Saga 呢?或者说在设计的时候,应该需要注意哪些地方呢?我大致列一下:

  • 将 Saga 尽量设计成组合任务的形式,你可以把它看作是一个任务的结合体,并对内部每个任务进行跟踪操作。
  • Saga 也可以用一组聚合的形式体现,也就是上面的图中示例。
  • 无状态处理,因为是基于事件驱动,状态信息会包裹在事件中,对于 Sage 整个处理过程来说,可以看作是无状态处理。
  • 可以适用于分布式设计。

2.3 Event Sourcing-事件溯源

字面上的理解:

事件即 Event,溯是一个动词,可以理解为追溯的意思,源代表原始、源头的意思,合起来表示一个事件追踪过程。

我们都知道在源代码管理的时候,可以使用 SVN、Git、CVS 进行对代码修改的跟踪操作,从一个代码文件的创建开始,我们可以查看各个版本对代码的修改情况,甚至可以指定版本进行还原操作,这就是对代码跟踪所带来的好处。而对于领域对象来说,我们也应该知晓其整个生命周期的演变过程,这样有利于查看并还原某一“时刻”的领域对象,在 EDA 架构中,对于领域对象状态的保存是通过领域事件进行完成,所以我们要想记录领域对象的状态记录,就需要把领域对象所经历的所有事件进行保存下来,这就是 Event Store(事件存储),这个东西就相当于 Git 代码服务器,存储所有领域对象所经历的事件,对于某一事件来说,可以看作是对应领域对象的“快照”。

总的来说,ES 事件溯源可以概括为两点:

  1. 记录
  2. 还原

最后,贴一张 CQRS、EDA、Saga、ES 结合图:

说明:本图来自 netfocus

CQRS 参考资料:

EDA 参考资料:


未完成的两点:

  • 3. CQRS Journey-微软示例项目
  • 4. ENode-netfocus 实践项目

本来还想把这两个项目分析一下,至少可以看懂一个业务流程,比如 Conference 项目中的 AssignSeats、ConferencePublish 等,ENode 项目中的 BankTransferSample 示例,但分析起来,真的有些吃力,有时候概念是一方面,实践又是另一方面,后面有时间理解了,再把这两点内容进行补充下。

这篇博文内容有点虚,大家借鉴有用的地方就行,也欢迎拍砖斧正。

上一篇:《IDDD 实现领域驱动设计-CQRS(命令查询职责分离)和 EDA(事件驱动架构)

学习架构知识,需要有一些功底和经验,要不然你会和我一样吃力,CQRS、EDA、ES、Saga 等等,这些是实践 DDD 所必不可少的架构,所以,如果你不懂这些,是很难看懂上篇所提到的 CQRS Journey 和 ENode 项目,那怎么办呢?我们可以从简单的 Demo 一点一滴开始。

代码地址:https://github.com/yuezhongxin/CQRS.Sample


说明:一张很丑陋的图

CQRS.Sample 所描述的一个流程是 SendMessage(发消息),也就是之前 MessageManager 中的一个业务示例,这个业务流程用到 CQRS 示例中,可能会有些不太准确,或者是有些牵强,但我主要的目的是想做一次 Commond->Event 的过程,熟悉它们到底是怎么交互的?所以,你查看代码的时候,尽量忽略这个业务流程,当然,如果你有针对这个业务流程更好的具体应用,我是非常欢迎交流。

首先,我们根据上面的流程图一步一步进行,UI 创建一个 Commond,然后交给 CommandBus 进行 Send(发送),也就是下面这段代码:

1
2
3
4
5
6
7
8
9
var commond = new SendMessageCommond()
{
Title = "this is title",
Content = "this is content",
SenderLoginName = "this is senderLoginName",
RecipientDisplayName = "this is recipientDisplayName"
};
var commandBus = IocContainer.Default.Resolve<CommandBus>();
commandBus.Send(commond);

项目中所有的类型映射都是通过 IoC 进行注入,ICommandBus 接口定义为:void Send<TCommand>(TCommand cmd) where TCommand : ICommand;,CommandBus 的实现主要是在 Send 方法中,解析 ICommandHandler<TCommand> 注入的类型对象,然后调用 ICommandHandler 接口定义的 Handle 方法,传入 TCommand 参数对象。

SendMessageCommond 的定义很简单,主要是一些来自 UI 的参数,所以,我们一般会定义一些属性字段,有时候我们会进行数据验证,可以使用 Validate,方法签名是: IEnumerable<ValidationResult> Validate(ValidationContext validationContext),不过需要继承 IValidatableObject 接口,这样我们就可以在 MVC View 前端进行输出验证结果,使用起来非常方便,SendMessageCommond 继承一个无实现的 ICommand 接口,主要用来约束类型,SendMessageCommond 对应 SendMessageCommondHandler,实现代码:

1
2
3
4
5
6
7
8
9
10
11
public class SendMessageCommondHandler : 
ICommandHandler<SendMessageCommond>
{
public void Handle(SendMessageCommond command)
{
var sender = VerifySenderService.Verify(command.SenderLoginName);
var receiver = VerifyReceiverService.Verify(command.RecipientDisplayName);
var message = new Message(command.Title, command.Content, sender, receiver);
message.Send();
}
}

CommondHandler 的功能有点类似于经典分层架构中的 Application(应用层),从它的具体实现中,我们就可以看出领域到底在做哪些工作,它的主要工作就是协调这些工作的流程,领域中的代码我就不贴了,这里我简单说明一下,在之前的 SendMessage 实现中,设计了一个 SendMessageService 领域服务,里面主要进行的工作是验证收发件人,以及消息是否符合规则,后来我就想,SendMessageService 和它实际进行的工作不相符,发消息所蕴含的实际业务意义,我也一直没有想明白,但是在具体实现中,发消息所做的工作是验证,那验证是不是发消息真正的业务含义呢?所以,稀里糊涂就有了上面的代码,VerifySenderService 和 VerifyReceiverService 是用来验证收发件人信息的,成功的话就返回一个 Contact 对象,具体的验证逻辑可以查看下实现代码。

下面说一下 message.Send();,这个可能有很大的问题,在 CQRS Journey 项目中,有很多类似于这样的操作,就是在 CommondHandler 中,创建一个聚合根对象,然后执行聚合根中的一个行为,比如我搜刮的 order.Confirm(),订单可以提交自己吗?消息可以发送自己吗?这样做的含义是什么?其实我也搞不太清楚,在 Message 聚合根中的 Send 方法中,主要是事件的发布,

1
2
3
4
public void Send()
{
eventBus.Publish(new MessageSentEvent(this));
}

先抛开 Send 的合理性,看下 EventBus 是如何 Publish 的?我的实现中和 CommondBus 很相似,但我觉得可能有些问题,Commond 和 CommondHandler 是一一对应关系,我们知道事件发布和事件订阅是一对多关系,也就是说一个事件可能有很对的订阅者,这些订阅者所处理的过程可能会有些不同,比如用户注册的一个事件,可能会有邮件通知订阅者,也可能会有统计数据更新订阅者,在 CQRS Journey 项目中,EventBus 的 Publish 大概是这样实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void Publish(Envelope<IEvent> @event)
{
var message = this.BuildMessage(@event);

this.sender.Send(message);
}
private Message BuildMessage(Envelope<IEvent> @event)
{
using (var payloadWriter = new StringWriter())
{
this.serializer.Serialize(payloadWriter, @event.Body);
return new Message(payloadWriter.ToString(), correlationId: @event.CorrelationId);
}
}

而我的实现则是和 CommondBus 一样,都是用 IoC 注入的,所以肯定有问题,我们来看下 MessageSentEventHandler 中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MessageSentEventHandler : 
IEventHandler<MessageSentEvent>,
IEventHandler<MailSentEvent>
{
private IEventBus eventBus;

public void Handle(MessageSentEvent @event)
{
eventBus = IocContainer.Default.Resolve<IEventBus>();
new MessageRepository().Save<Message>(@event.Message);
eventBus.Publish(new MailSentEvent
{
Title = @event.Message.Title,
Content = @event.Message.Title,
});
}

public void Handle(MailSentEvent @event)
{
var mailMessage = new System.Net.Mail.MailMessage();
mailMessage.Subject = @event.Title;
mailMessage.Body = @event.Content;
mailMessage.IsBodyHtml = true;
mailMessage.BodyEncoding = System.Text.Encoding.UTF8;
mailMessage.Priority = System.Net.Mail.MailPriority.Normal;
System.Net.Mail.SmtpClient smtpClient = new System.Net.Mail.SmtpClient();
Task.Run(() => { smtpClient.SendAsync(mailMessage, mailMessage.Body); });
}
}

消息发送之后,进行持久化操作,然后再进行邮件通知,这样一整个 SendMessage 的流程就走完了。

这个流程中,只有 CQRS 的 Commond,并没有 Query,也没有 ES、Saga 的概念,主要是它们太深奥了,我搞不来。CQRS.Sample 只是一个简单的示例,发消息的业务含义所表达的可能不是很准确,本来是想用用户注册、订单提交业务示例,但还是想想用发消息进行尝试下,除去 EventBus 的实现有问题,CQRS 的简化版基本上都能表现出来了。

另外,从简单分层架构改造成 CQRS 确实有很多挑战,但有时候想想,领域模型都设计的有问题,那用什么架构实现都毫无意义,如果在现有的项目中,你发现经典分层架构实现起来有很多别扭的地方,比如 Domain 引用了 DTO,你可以尝试先把 Repositories 进行分离下,创建一个 Query 项目,把一些无业务逻辑的查询发到里面(主要是应用层调用的),这样使 Repositories 更加简化,如果可以简化成只有一个 GetById 方法,那么就达到 CQRS 的标准了,因为 Repositories 的接口定义在领域层,因为 Query 项目的分离,所以,Domain 就可以去除 DTO 的引用了,应用层也就直接调用 Query,这只是一个中和方案。

领域模型需要一点一滴设计,架构也需要一点一滴设计,但后者需要建立在前者的基础上。

一个对我非常有帮助的 CQRS 系列(初级):

public interface INavigationAware

{

bool IsNavigationTarget(NavigationContext navigationContext);

void OnNavigatedTo(NavigationContext navigationContext);

void OnNavigatedFrom(NavigationContext navigationContext);

}

当从本页面转到其它页面的时候,会调用OnNavigatedFrom方法,navigationContext会包含目标页面的URI。

当从其它页面导航至本页面的时候,首先会调用IsNavigationTarget,IsNavigationTarget返回一个bool值,简单地说这个方法的作用就是告诉Prism,是重复使用这个视图的实例还是再创建一个。然后调用OnNavigatedTo方法。在导航到本页面的时候,就可以从navigationContext中取出传递过来的参数。

在prism中要实现页面导入时传入参数中比如MyView?param1=abc&param2=123,如何获得这些参数呢?这就要实现INavigationAware接口。

上面那个视图View叫MyView,假设它的ViewModel叫MyViewModel,那个这个ViewModel必须实现INavigationAware接口。

INavigationAware接口源码

public interface INavigationAware
{ bool IsNavigationTarget(NavigationContext navigationContext); void OnNavigatedTo(NavigationContext navigationContext); void OnNavigatedFrom(NavigationContext navigationContext);
}

View Code

这三个方法都很重要:

  • IsNavigationTarget方法:当前的视图模型是否可以处理请求的导航行为,通常用来指定当前的视图/模型是否可以重用。例如,一个显示客户明细的视图可以显示客户a,客户b….等的信息,他们重用同一个视图。
  • OnNavigatedTo方法:当前的页面被导航到以后发生,这个函数可以用来处理URI的参数。
  • OnNavigatedFrom方法:当前的页面导航到其他页面的时候发生。

这里给出一个实现了INavigationAware接口的ViewModel例子:

[Export(“MyViewModel “, typeof(MyViewModel ))]
[PartCreationPolicy(CreationPolicy.NonShared)] public class MyViewModel : INavigationAware
{ private IRegionNavigationJournal navigationJournal; bool INavigationAware.IsNavigationTarget(NavigationContext navigationContext)
{ return true;
} void INavigationAware.OnNavigatedFrom(NavigationContext navigationContext)
{ // Intentionally not implemented.
} void INavigationAware.OnNavigatedTo(NavigationContext navigationContext)
{ var pram1 = navigationContext.Parameters[“param1”];

      UpdateDataAsync(pram1 ); this.navigationJournal = navigationContext.NavigationService.Journal;
    }

}

View Code

在上面的例子中,OnNavigatedTo方法获取了URI的参数,当参数发生变化自动重新获取数据刷新ViewModel。这个场景可以用在:例如,一个显示客户明细的视图可以显示客户a,客户b….等的信息,他们重用同一个视图。当我们navigate导航到URI:MyView?param1=abc&param2=123的时候,导航到同一个视图但客户ID变化,放在URI的参数中传递,OnNavigatedTo方法获取了URI的参数,重用当前视图,同时刷新数据。整个过程很清晰。

IConfirmNavigationRequest接口

有些时候当我们导航到其他页面的时候,需要弹出一个框提示用户“是否放弃修改?保存、放弃、取消”,这就需要实现IConfirmNavigationRequest接口。还是上面那个例子,那个ViewModel还要实现IConfirmNavigationRequest接口。这样这个ViewModel既实现了INavigationAware接口又实现了IConfirmNavigationRequest接口,触发的顺序如下:

  • 导航发生的时候,如果目标也实现了IConfirmNavigationRequest接口,那么先自动调用ConfirmNavigationRequest.
  • ViewModel触发interaction来打开一个“是否放弃修改?保存、放弃、取消” 确认的UI。
  • 当用户选择了”保存、放弃、取消”以后interaction的callback自动触发
  • 根据用户的选择决定是否继续导航

来一个例子:

ViewModel实现了IConfirmNavigationRequest接口 public class ComposeEmailViewModel : NotificationObject, IConfirmNavigationRequest
{ private readonly InteractionRequest confirmExitInteractionRequest; public ComposeEmailViewModel(IEmailService emailService)
{ this.confirmExitInteractionRequest = new InteractionRequest();
} public IInteractionRequest ConfirmExitInteractionRequest
{ get { return this.confirmExitInteractionRequest; }
}
}

XAML定义Interaction <UserControl.Resources>
<DataTemplate x:Name=”ConfirmExitDialogTemplate”>
<TextBlock HorizontalAlignment=”Center” VerticalAlignment=”Center” Text=“{Binding}”/>

</UserControl.Resources>
<Grid x:Name=”LayoutRoot” Background=”White”>
ei:Interaction.Triggers
<prism:InteractionRequestTrigger
SourceObject=“{Binding ConfirmExitInteractionRequest}”>
<prism:PopupChildWindowAction
ContentTemplate=“{StaticResource ConfirmExitDialogTemplate}”/>

</ei:Interaction.Triggers> …

用户确认 void IConfirmNavigationRequest.ConfirmNavigationRequest(
NavigationContext navigationContext, Action<bool> continuationCallback)
{ this.confirmExitInteractionRequest.Raise( new Confirmation {Content = “…”, Title = “…”},
c => {continuationCallback(c.Confirmed);});
}

View Code

一、什么是IOC

学习IOC之前先来了解一个依赖导致原则(DIP),依赖导致原则是IOC的核心原理。

依赖导致:即上层模块不应该依赖于低层模块,二者应该通过抽象来依赖。依赖于抽象,而不是依赖于细节。

首先来看下面的例子:

1、定义一个接口,封装数据库的基本CRUD操作,接口定义如下:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.Data; 7
8 namespace DataBase.Interface 9 { 10 ///


11 /// 数据访问接口 12 ///

13 public interface IDbInterface 14 { 15 string Insert(); 16 string Delete(); 17 string Update(); 18 string Query(); 19 } 20 }

复制代码

 2、定义一个MSSQL类实现该接口,用来模仿SQLServer操作,MSSQL类定义如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7
8 namespace DataBase.MSSQL 9 { 10 public class DbMSSQL : IDbInterface 11 { 12 public string Delete() 13 { 14 return “MSSQL执行删除”; 15 } 16
17 public string Insert() 18 { 19 return “MSSQL执行插入”; 20 } 21
22 public string Query() 23 { 24 return “MSSQL执行查询”; 25 } 26
27 public string Update() 28 { 29 return “MSSQL执行更新”; 30 } 31 } 32 }

复制代码

 3、定义一个Oracle类实现该接口,模仿Oracle的操作,Oracle类定义如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7
8 namespace DataBase.Oracle 9 { 10 public class DbOracle : IDbInterface 11 { 12 public string Delete() 13 { 14 return “Oracle执行删除”; 15 } 16
17 public string Insert() 18 { 19 return “Oracle执行插入”; 20 } 21
22 public string Query() 23 { 24 return “Oracle执行查询”; 25 } 26
27 public string Update() 28 { 29 return “Oracle执行更新”; 30 } 31 } 32 }

复制代码

 4、定义一个控制台应用程序来调用:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using DataBase.Interface; 7 using DataBase.MSSQL; 8
9 namespace IOCConApp 10 { 11 class Program 12 { 13 static void Main(string[] args) 14 { 15 // 常规做法,即程序的上端,依赖于下端,依赖于细节
16 DbMSSQL mssql = new DbMSSQL(); 17 } 18 } 19 }

复制代码

 常规做法是添加引用,然后直接实例化类,但是这样会依赖于细节实现,现将代码修改如下:

1

2

// 通过抽象来依赖

IDbInterface dbInterface = new DbMSSQL();

 但是这样修改以后,虽然左边是抽象了,但是右边还是依赖于细节。

那就究竟什么是IOC呢?

IOC(Inversion of Control)即控制反转,是一个重要的面向对象编程的法则来消减程序之间的耦合问题,把程序中上层对下层依赖,转移到一个第三方容器中来装配。IOC是程序设计的目标,实现方式包含依赖注入和依赖查找,在.net中只有依赖注入。

说到IOC,就不能不说DI。DI:即依赖注入,是IOC的实现手段。

二、使用Unity实现IOC

Unity是一个IoC容器,用来实现依赖注入(Dependency Injection,DI),减少耦合的,Unity出自于伟大的微软。

unity组件网址:http://unity.codeplex.com/

unity能够做什么呢,列举部分如下:

1.Unity支持简单对象创建,特别是分层对象结构和依赖,以简化程序代码。其包含一个编译那些可能存在依赖于其他对象的对象实例机制。
2.Unity支持必要的抽象,其允许开发者在运行时或配置去指定依赖关系同时可以简单的管理横切点(AOP)。
3.Unity增加了推迟到容器组件配置的灵活性。其同样支持一个容器层次的结构。
4.Unity拥有服务定位能力,对于一个程序在许多情况下重复使用组件来分离和集中功能是非常有用的。
5.Unity允许客户端储存或缓存容器。对于在ASP.NET Web applications中开发者将容器持久化于ASP.NET中的session或application中特别有效。
6.Unity拥有拦截能力,其允许开发者通过创建并执行handlers(在方法或属性被调用到达之前)来为已存在的组件增加一个函数,并再次为返回调用结果。
7.Unity可以从标准配置系统中读取配置信息,例如:XML文件,同时使用配置文件来配置容器。
8.Unity支持开发者实现自定义容器扩展,例如:你可以实现方法来允许额外的对象构造和容器特征,例如缓存。
9.Unity允许架构师和开发者在现代化的程序中更简单的实现通用设计模式。

什么情况下要使用unity呢?

1.所构建的系统依赖于健全的面向对象原则,但是大量不同的代码交织在一起而难以维护。
2.构建的对象和类需要依赖其他对象或类。
3.依赖于复杂的或需要抽象的对象。
4.希望利用构造函数、方法或属性的调用注入优势。
5.希望管理对象实例的生命周期。
6.希望能够在运行时管理并改变依赖关系。
7.希望在拦截方法或属性调用的时候生成一个策略链或管道处理容器来实现横切(AOP)任务。
8.希望在Web Application中的回发操作时能够缓存或持久化依赖关系。

1、程序中安装Unity

使用管理NuGet程序包来安装Unity,在项目上右键,选择管理NuGet程序包:

在搜索框里面输入Unity,点击右侧安装按钮进行安装:

出现以下信息表示安装成功:

2、使用Unity实现DI

先来看看最简单的Unity实现方式:

IUnityContainer container = new UnityContainer();//1、定义一个空容器
container.RegisterType<IDbInterface, DbMSSQL>();//2、注册类型,表示遇到IDbInterface的类型,创建DbMSSQL的实例
var db = container.Resolve();
Console.WriteLine(db.Insert());
Console.ReadKey();

 结果:

从结果中可以看出,db是DbMSSQL类型的实例。

除了使用RegisterType注册类型以外,还可以注册一个实例,例如:

// 使用RegisterInstance注册IDbInterface的实例:new DbMSSQL()
container.RegisterInstance(new DbMSSQL());

3、三种注入方式

三种注入方式:构造函数注入、属性注入、方法注入。

3.1 定义IHeadphone接口,代码如下:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6
7 namespace DataBase.Interface 8 {
9 public interface IHeadphone 10 { 11
12 } 13 }

复制代码

 3.2 定义IMicrophone接口,代码如下:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6
7 namespace DataBase.Interface 8 {
9 public interface IMicrophone 10 { 11
12 } 13 }

复制代码

 3.3 定义IPower接口,代码如下:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6
7 namespace DataBase.Interface 8 {
9 public interface IPower 10 { 11
12 } 13 }

复制代码

 3.4 定义IPhone接口,代码如下:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6
7 namespace DataBase.Interface 8 {
9 public interface IPhone 10 { 11 void Call(); 12 IMicrophone iMicrophone { get; set; } 13 IHeadphone iHeadphone { get; set; } 14 IPower iPower { get; set; } 15 } 16 }

复制代码

 3.5 分别实现上面定义的接口

IPhone接口的实现如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7 using Unity.Attributes; 8
9 namespace DataBase.MSSQL 10 { 11 public class ApplePhone : IPhone 12 { 13 [Dependency]//属性注入
14 public IMicrophone iMicrophone { get; set; } 15 public IHeadphone iHeadphone { get; set; } 16 public IPower iPower { get; set; } 17
18 [InjectionConstructor]//构造函数注入
19 public ApplePhone(IHeadphone headphone) 20 { 21 this.iHeadphone = headphone; 22 Console.WriteLine(“{0}带参数构造函数”, this.GetType().Name); 23 } 24
25 public void Call() 26 { 27 Console.WriteLine(“{0}打电话”, this.GetType().Name); ; 28 } 29
30 [InjectionMethod]//方法注入
31 public void Init1234(IPower power) 32 { 33 this.iPower = power; 34 } 35 } 36 }

复制代码

 IHeadphone接口的实现如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7
8 namespace DataBase.MSSQL 9 { 10 public class Headphone : IHeadphone 11 { 12 public Headphone() 13 { 14 Console.WriteLine(“Headphone 被构造”); 15 } 16 } 17 }

复制代码

 IMicrophone接口的实现如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7
8 namespace DataBase.MSSQL 9 { 10 public class Microphone : IMicrophone 11 { 12 public Microphone() 13 { 14 Console.WriteLine(“Microphone 被构造”); 15 } 16 } 17 }

复制代码

 IPower接口的实现如下:

复制代码

1 using DataBase.Interface; 2 using System; 3 using System.Collections.Generic; 4 using System.Linq; 5 using System.Text; 6 using System.Threading.Tasks; 7
8 namespace DataBase.MSSQL 9 { 10 public class Power : IPower 11 { 12 public Power() 13 { 14 Console.WriteLine(“Power 被构造”); 15 } 16 } 17 }

复制代码

 控制台程序调用:

复制代码

1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using DataBase.Interface; 7 using DataBase.MSSQL; 8 using Unity; 9
10 namespace IOCConApp 11 { 12 ///


13 /// IOC():控制反转,把程序上层对下层的依赖,转移到第三方的容器来装配 14 /// 是程序设计的目标,实现方式包含了依赖注入和依赖查找(.net里面只有依赖注入) 15 /// DI:依赖注入,是IOC的实习方式。 16 ///

17 class Program 18 { 19 static void Main(string[] args) 20 { 21 #region MyRegion
22 //// 常规做法,即程序的上端,依赖于下端,依赖于细节
23 //DbMSSQL mssql = new DbMSSQL();
24
25 //// 通过抽象来依赖
26 //IDbInterface dbInterface = new DbMSSQL(); 27
28 //IUnityContainer container = new UnityContainer();//1、定义一个空容器 29 //container.RegisterType<IDbInterface, DbMSSQL>();//2、注册类型,表示遇到IDbInterface的类型,创建DbMSSQL的实例 30 //var db = container.Resolve();
31
32 //// 使用RegisterInstance注册IDbInterface的实例:new DbMSSQL()
33 //container.RegisterInstance(new DbMSSQL()); 34 //Console.WriteLine(db.Insert());
35 #endregion
36
37 IUnityContainer container = new UnityContainer(); 38 container.RegisterType<IPhone, ApplePhone>(); 39 container.RegisterType<IMicrophone, Microphone>(); 40 container.RegisterType<IHeadphone, Headphone>(); 41 container.RegisterType<IPower, Power>(); 42
43 IPhone phone = container.Resolve(); 44
45 Console.WriteLine($”phone.iHeadphone==null? {phone.iHeadphone == null}”); 46 Console.WriteLine($”phone.iMicrophone==null? {phone.iMicrophone == null}”); 47 Console.WriteLine($”phone.iPower==null? {phone.iPower == null}”); 48
49 Console.ReadKey(); 50 } 51 } 52 }

复制代码

 输出结果:

从输出结果中可以看出三种注入方式的执行顺序:先执行构造函数注入,在执行属性注入,最后执行方法注入。

注意:默认情况下如果构造函数上面没有使用特性,那么默认找参数最多的构造函数执行注入。

4、一个接口多个实现进行注册

如果多个不同的实例实现同一个接口,这种情况该怎么注册呢?先来看看下面的代码:

1 IUnityContainer container = new UnityContainer();//1、定义一个空容器
2 container.RegisterType<IDbInterface, DbMSSQL>();//2、注册类型,表示遇到IDbInterface的类型,创建DbMSSQL的实例
3 container.RegisterType<IDbInterface, DbOracle>();//表示遇到IDbInterface的类型,创建DbMSSQL的实例
4 var db = container.Resolve(); 5 Console.WriteLine(db.Insert());

运行结果:

从运行结果中可以看出,后面注册的类型会把前面注册的类型给覆盖掉,那么该如何解决呢?可以通过参数的方式来解决,代码如下:

复制代码

1 IUnityContainer container = new UnityContainer();//1、定义一个空容器
2 container.RegisterType<IDbInterface, DbMSSQL>(“sql”);//2、注册类型,表示遇到IDbInterface的类型,创建DbMSSQL的实例
3 container.RegisterType<IDbInterface, DbOracle>(“oracle”);//表示遇到IDbInterface的类型,创建DbMSSQL的实例
4 var sql = container.Resolve(“sql”); 5 var oracle = container.Resolve(“oracle”); 6 Console.WriteLine(sql.Insert()); 7 Console.WriteLine(oracle.Insert());

复制代码

运行结果:

5、生命周期

生命周期及一个对象从创建到释放中间经过的时间。先看下面的代码:

复制代码

1 IUnityContainer container = new UnityContainer(); 2 container.RegisterType<IDbInterface, DbMSSQL>(); 3 IDbInterface db1 = container.Resolve(); 4 IDbInterface db2 = container.Resolve(); 5 Console.WriteLine(“HashCode:”+db1.GetHashCode().ToString()); 6 Console.WriteLine(“HashCode:” + db2.GetHashCode().ToString()); 7 Console.WriteLine(object.ReferenceEquals(db1,db2));

复制代码

 结果:

表明db1和db2是两个不同的实例,即默认情况下生命周期是瞬时的,每次都是创建一个新的实例。

container.RegisterType<IDbInterface, DbMSSQL>(new TransientLifetimeManager());表示是瞬时生命周期,默认情况下即这种。

在看下面的代码:

复制代码

1 IUnityContainer container = new UnityContainer(); 2 container.RegisterType<IDbInterface, DbMSSQL>(new ContainerControlledLifetimeManager()); 3 IDbInterface db1 = container.Resolve(); 4 IDbInterface db2 = container.Resolve(); 5 Console.WriteLine(“HashCode:” + db1.GetHashCode().ToString()); 6 Console.WriteLine(“HashCode:” + db2.GetHashCode().ToString()); 7 Console.WriteLine(object.ReferenceEquals(db1, db2));

复制代码

结果:

上图的结果可以看出,db1和db2是同一个实例。

container.RegisterType<IDbInterface, DbMSSQL>(new ContainerControlledLifetimeManager())表示是容器单例,每次都是同一个实例。

6、使用配置文件实现

在上面的例子中,所有的例子都是一直在依赖于细节,那么怎么解决不依赖于细节呢?答案是只能使用配置文件,配置文件如下:

复制代码

1
2
3


4
5
6
7
8 <container name=”testContainer”>
9
10
11
12
13

14

15

复制代码

 注意:这个一个单独的配置文件,要把属性里面的复制到输出目录改为始终复制,那么这个配置文件才会生成到Debug目录里面。

程序如下:

复制代码

1 ExeConfigurationFileMap fileMap = new ExeConfigurationFileMap(); 2 fileMap.ExeConfigFilename = Path.Combine(AppDomain.CurrentDomain.BaseDirectory + “CfgFiles\\Unity.Config”);//找配置文件的路径
3 Configuration configuration = ConfigurationManager.OpenMappedExeConfiguration(fileMap, ConfigurationUserLevel.None); 4 UnityConfigurationSection section = (UnityConfigurationSection)configuration.GetSection(UnityConfigurationSection.SectionName); 5 IUnityContainer container = new UnityContainer(); 6 section.Configure(container, “testContainer”); 7 IDbInterface db = container.Resolve(“sql”); 8 Console.WriteLine(db.Insert());

复制代码

结果:

观察上面的代码,会发现,如果改成使用配置文件的方式实现的话,代码里面就不会依赖于细节了,只要一个接口类型。既然没有细节了,那么对项目进行如下的改造:把引用里面对细节的引用都去掉(即去掉DataBase.MSSQL和DataBase.Oracle),然后Debug文件夹里面没有这两个DLL了,但是这时需要把这两个DLL复制到Debug目录下面,否则程序运行的时候会找不到具体实现的类型。这样就意味着程序架构只依赖于接口。

引用里面只要对接口的引用了,没有对具体实现的引用。去掉了对细节的依赖。

注意:使用配置文件实现时,必须把接口的具体实现类复制到程序目录下面。

如果有额外添加了一种数据库,那么只需要修改配置文件,把新的实现类复制到程序目录下面即可实现程序的升级。

使用配置文件的时候,需要把UnityContainer容器定义为静态的,这样只需要读取一次配置文件即可。

IT项目中,常见的风险都在这里了! - 知乎

Excerpt

项目的风险无非体现在以下四个方面:需求、技术、成本和进度。IT项目开发中常见的风险有如下几类: 1、需求风险① 需求已经成为项目基准,但需求还在继续变化; ② 需求定义欠佳,而进一步的定义会扩展项目范畴; ③ …


项目的风险无非体现在以下四个方面:需求、技术、成本和进度。IT项目开发中常见的风险有如下几类:

1、需求风险

① 需求已经成为项目基准,但需求还在继续变化;

② 需求定义欠佳,而进一步的定义会扩展项目范畴;

③ 添加额外的需求;

④ 产品定义含混的部分比预期需要更多的时间;

⑤ 在做需求中客户参与不够;

⑥ 缺少有效的需求变化管理过程。

2、计划编制风险

① 计划、资源和产品定义全凭客户或上层领导口头指令,并且不完全一致;

② 计划是优化的,是”最佳状态”,但计划不现实,只能算是”期望状态”;

③ 计划基于使用特定的小组成员,而那个特定的小组成员其实指望不上;

④ 产品规模(代码行数、功能点、与前一产品规模的百分比)比估计的要大;

⑤ 完成目标日期提前,但没有相应地调整产品范围或可用资源;

⑥ 涉足不熟悉的产品领域,花费在设计和实现上的时间比预期的要多。

3、组织和管理风险

① 仅由管理层或市场人员进行技术决策,导致计划进度缓慢,计划时间延长;

② 低效的项目组结构降低生产率;

③ 管理层审查 决策的周期比预期的时间长;

④ 预算削减,打乱项目计划;

⑤ 管理层作出了打击项目组织积极性的决定;

⑥ 缺乏必要的规范,导至工作失误与重复工作;

⑦ 非技术的第三方的工作(预算批准、设备采购批准、法律方面的审查、安全保证等)时间比预期的延长。

4、人员风险

① 作为先决条件的任务(如培训及其他项目)不能按时完成;

② 开发人员和管理层之间关系不佳,导致决策缓慢,影响全局;

③ 缺乏激励措施,士气低下,降低了生产能力;

④ 某些人员需要更多的时间适应还不熟悉的软件工具和环境;

⑤ 项目后期加入新的开发人员,需进行培训并逐渐与现有成员沟通,从而使现有成员的工作效率降低;

⑥ 由于项目组成员之间发生冲突,导致沟通不畅、设计欠佳、接口出现错误和额外的重复工作;

⑦ 不适应工作的成员没有调离项目组,影响了项目组其他成员的积极性;⑧没有找到项目急需的具有特定技能的人

5、开发环境风险

① 设施未及时到位;

② 设施虽到位,但不配套,如没有电话、网线、办公用品等;

③ 设施拥挤、杂乱或者破损;

开发工具未及时到位;

⑤ 开发工具不如期望的那样有效,开发人员需要时间创建工作环境或者切换新的工具;

⑥ 新的开发工具的学习期比预期的长,内容繁多。

6、客户风险

① 客户对于最后交付的产品不满意,要求重新设计和重做;

② 客户的意见未被采纳,造成产品最终无法满足用户要求,因而必须重做;

③ 客户对规划、原型和规格的审核 决策周期比预期的要长;

④ 客户没有或不能参与规划、原型和规格阶段的审核,导致需求不稳定和产品生产周期的变更;

⑤ 客户答复的时间(如回答或澄清与需求相关问题的时间)比预期长;

⑥ 客户提供的组件质量欠佳,导致额外的测试、设计和集成工作,以及额外的客户关系管理工作。

7产品风险

① 矫正质量低下的不可接受的产品,需要比预期更多的测试、设计和实现工作;

② 开发额外的不需要的功能(镀金),延长了计划进度;

③ 严格要求与现有系统兼容,需要进行比预期更多的测试、设计和实现工作;

④ 要求与其他系统或不受本项目组控制的系统相连,导致无法预料的设计、实现和测试工作;

⑤ 在不熟悉或未经检验的软件和硬件环境中运行所产生的未预料到的问题;

⑥ 开发一种全新的模块将比预期花费更长的时间;

⑦ 依赖正在开发中的技术将延长计划进度。

8、设计和实现风险

① 设计质量低下,导致重复设计;

② 一些必要的功能无法使用现有的代码和库实现,开发人员必须使用新的库或者自行开发新的功能;

③ 代码和库质量低下,导致需要进行额外的测试,修正错误,或重新制作;

④ 过高估计了增强型工具对计划进度的节省量;

⑤ 分别开发的模块无法有效集成,需要重新设计或制作。

9、过程风险

① 大量的纸面工作导致进程比预期的慢;

② 前期的质量保证行为不真实,导致后期的重复工作;

③ 太不正规(缺乏对软件开发策略和标准的遵循),导致沟通不足,质量欠佳,甚至需重新开发;

④ 过于正规(教条地坚持软件开发策略和标准),导致过多耗时于无用的工作;

⑤ 向管理层撰写进程报告占用开发人员的时间比预期的多;

风险管理粗心,导致未能发现重大的项目风险。

一.前言

大家好,许久没有更新博客了,最近从重庆来到了成都,换了个工作环境,前面都比较忙没有什么时间,这次趁着清明假期有时间,又可以分享一些知识给大家。在QQ群里有许多人都问过IdentityServer4怎么用Role(角色)来控制权限呢?还有关于Claim这个是什么呢?下面我带大家一起来揭开它的神秘面纱!

二.Claim详解

我们用过IdentityServer4或者熟悉ASP.NET Core认证的都应该知道有Claim这个东西,Claim我们通过在线翻译有以下解释:

(1)百度翻译

(2)谷歌翻译

这里我理解为声明,我们每个用户都有多个Claim,每个Claim声明了用户的某个信息比如:Role=Admin,UserID=1000等等,这里Role,UserID每个都是用户的Claim,都是表示用户信息的单元 ,我们不妨把它称为用户信息单元

建议阅读杨总的Claim相关的解析 http://www.cnblogs.com/savorboard/p/aspnetcore-identity.html

三.测试环境中添加角色Claim

这里我们使用IdentityServer4的QuickStart中的第二个Demo:ResourceOwnerPassword来进行演示(代码地址放在文末),所以项目的创建配置就不在这里演示了。

这里我们需要自定义IdentityServer4(后文简称id4)的验证逻辑,然后在验证完毕之后,将我们自己需要的Claim加入验证结果。便可以向API资源服务进行传递。id4定义了IResourceOwnerPasswordValidator接口,我们实现这个接口就行了。

Id4为我们提供了非常方便的In-Memory测试支持,那我们在In-Memory测试中是否可以实现自定义添加角色Claim呢,答案当时是可以的。

1.首先我们需要在定义TestUser测试用户时,定义用户Claims属性,意思就是为我们的测试用户添加额外的身份信息单元,这里我们添加角色身份信息单元:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
new TestUser
{
SubjectId = "1",
Username = "alice",
Password = "password",
Claims = new List<Claim>(){new Claim(JwtClaimTypes.Role,"superadmin") }
},
new TestUser
{
SubjectId = "2",
Username = "bob",
Password = "password",
Claims = new List<Claim>(){new Claim(JwtClaimTypes.Role,"admin") }
}

JwtClaimTypes是一个静态类在IdentityModel程序集下,里面定义了我们的jwt token的一些常用的Claim,JwtClaimTypes.Role是一个常量字符串public const string Role = "role";如果JwtClaimTypes定义的Claim类型没有我们需要的,那我们直接写字符串即可。

2.分别启动 QuickstartIdentityServer、Api、ResourceOwnerClient 查看 运行结果:

可以看见我们定义的API资源通过HttpContext.User.Claims并没有获取到我们为测试用户添加的Role Claim,那是因为我们为API资源做配置。

3.配置API资源需要的Claim

在QuickstartIdentityServer项目下的Config类的GetApiResources做出如下修改:

1
2
3
4
5
6
7
8
public static IEnumerable<ApiResource> GetApiResources()
{
return new List<ApiResource>
{

new ApiResource("api1", "My API",new List<string>(){JwtClaimTypes.Role})
};
}

我们添加了一个Role Claim,现在再次运行(需要重新QuickstartIdentityServer方可生效)查看结果。

可以看到,我们的API服务已经成功获取到了Role Claim。

这里有个疑问,为什么需要为APIResource配置Role Claim,我们的API Resource才能获取到呢,我们查看ApiResource的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public ApiResource(string name, string displayName, IEnumerable<string> claimTypes)
{
if (name.IsMissing()) throw new ArgumentNullException(nameof(name));

Name = name;
DisplayName = displayName;

Scopes.Add(new Scope(name, displayName));

if (!claimTypes.IsNullOrEmpty())
{
foreach (var type in claimTypes)
{
UserClaims.Add(type);
}
}
}

从上面的代码可以分析出,我们自定义的Claim添加到了一个名为UserClaims的属性中,查看这个属性:

1
2
3
4



public ICollection<string> UserClaims { get; set; } = new HashSet<string>();

根据注释我们便知道了原因:请求此资源时应包含的相关用户身份单元信息列表。

四.通过角色控制API访问权限

我们在API项目下的IdentityController做出如下更改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[Route("[controller]")]

public class IdentityController : ControllerBase
{
[Authorize(Roles = "superadmin")]
[HttpGet]
public IActionResult Get()
{
return new JsonResult(from c in HttpContext.User.Claims select new { c.Type, c.Value });
}

[Authorize(Roles = "admin")]
[Route("{id}")]
[HttpGet]
public string Get(int id)
{
return id.ToString();
}
}

我们定义了两个API通过Authorize特性赋予了不同的权限(我们的测试用户只添加了一个角色,通过访问具有不同角色的API来验证是否能通过角色来控制)

我们在ResourceOwnerClient项目下,Program类最后添加如下代码:

1
2
3
4
5
6
7
8
9
10
11
response = await client.GetAsync("http://localhost:5001/identity/1");
if (!response.IsSuccessStatusCode)
{
Console.WriteLine(response.StatusCode);
Console.WriteLine("没有权限访问 http://localhost:5001/identity/1");
}
else
{
var content = response.Content.ReadAsStringAsync().Result;
Console.WriteLine(content);
}

这里我们请求第二个API的代码,正常情况应该会没有权限访问的(我们使用的用户只具有superadmin角色,而第二个API需要admin角色),运行一下:

可以看到提示我们第二个,无权访问,正常。

五.如何使用已有用户数据自定义Claim

我们前面的过程都是使用的TestUser来进行测试的,那么我们正式使用时肯定是使用自己定义的用户(从数据库中获取),这里我们可以实现IResourceOwnerPasswordValidator接口,来定义我们自己的验证逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48



public class CustomResourceOwnerPasswordValidator: IResourceOwnerPasswordValidator
{





private readonly TestUserStore _users;
private readonly ISystemClock _clock;

public CustomResourceOwnerPasswordValidator(TestUserStore users, ISystemClock clock)
{
_users = users;
_clock = clock;
}






public Task ValidateAsync(ResourceOwnerPasswordValidationContext context)
{

if (_users.ValidateCredentials(context.UserName, context.Password))
{
var user = _users.FindByUsername(context.UserName);






context.Result = new GrantValidationResult(
user.SubjectId ?? throw new ArgumentException("Subject ID not set", nameof(user.SubjectId)),
OidcConstants.AuthenticationMethods.Password, _clock.UtcNow.UtcDateTime,
user.Claims);
}
else
{

context.Result = new GrantValidationResult(TokenRequestErrors.InvalidGrant, "invalid custom credential");
}
return Task.CompletedTask;
}

在Startup类里配置一下我们自定义的验证器:

实现了IResourceOwnerPasswordValidator还不够,我们还需要实现IProfileService接口,他是专门用来装载我们需要的Claim信息的,比如在token创建期间和请求用户信息终结点是会调用它的GetProfileDataAsync方法来根据请求需要的Claim类型,来为我们装载信息,下面是一个简单实现:

这里特别说明一下:本节讲的是“如何使用已有用户数据自定义Claim”,实现 IResourceOwnerPasswordValidator 是为了对接已有的用户数据,然后才是实现 IProfileService 以添加自定义 claim,这两步共同完成的是 “使用已有用户数据自定义Claim”,并不是自定义 Claim 就非得把两个都实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class CustomProfileService: IProfileService
{



protected readonly ILogger Logger;




protected readonly TestUserStore Users;






public CustomProfileService(TestUserStore users, ILogger<TestUserProfileService> logger)
{
Users = users;
Logger = logger;
}






public virtual Task GetProfileDataAsync(ProfileDataRequestContext context)
{
context.LogProfileRequest(Logger);


if (context.RequestedClaimTypes.Any())
{

var user = Users.FindBySubjectId(context.Subject.GetSubjectId());
if (user != null)
{


context.AddRequestedClaims(user.Claims);
}
}

context.LogIssuedClaims(Logger);

return Task.CompletedTask;
}






public virtual Task IsActiveAsync(IsActiveContext context)
{
Logger.LogDebug("IsActive called from: {caller}", context.Caller);

var user = Users.FindBySubjectId(context.Subject.GetSubjectId());
context.IsActive = user?.IsActive == true;

return Task.CompletedTask;
}

同样在Startup类里启用我们自定义的ProfileServiceAddProfileService<CustomProfileService>()

值得注意的是如果我们直接将用户的所有Claim加入 context.IssuedClaims集合,那么用户所有的Claim都将会无差别返回给请求方。比如默认情况下请求用户终结点(http://Identityserver4地址/connect/userinfo)只会返回sub(用户唯一标识)信息,如果我们在此处直接 context.IssuedClaims=User.Claims,那么所有Claim都将被返回,而不会根据请求的Claim来进行筛选,这样做虽然省事,但是损失了我们精确控制的能力,所以不推荐。

上述说明配图:

如果直接 context.IssuedClaims=User.Claims,那么返回结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
         




public virtual Task GetProfileDataAsync(ProfileDataRequestContext context)
{
var user = Users.FindBySubjectId(context.Subject.GetSubjectId());
if (user != null)
context.IssuedClaims .AddRange(user.Claims);

return Task.CompletedTask;
}

用户的所有Claim都将被返回。这样降低了我们控制的能力,我们可以通过下面的方法来实现同样的效果,但却不会丢失控制的能力。

(1).自定义身份资源资源

身份资源的说明:身份资源也是数据,如用户ID,姓名或用户的电子邮件地址。 身份资源具有唯一的名称,您可以为其分配任意身份信息单元(比如姓名、性别、身份证号和有效期等都是身份证的身份信息单元)类型。 这些身份信息单元将被包含在用户的身份标识(Id Token)中。 客户端将使用scope参数来请求访问身份资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static IEnumerable<IdentityResource> GetIdentityResourceResources()
{
var customProfile = new IdentityResource(
name: "custom.profile",
displayName: "Custom profile",
claimTypes: new[] { "role"});

return new List<IdentityResource>
{
new IdentityResources.OpenId(),
new IdentityResources.Profile(),
customProfile
};
}

(2).配置Scope
通过上面的代码,我们自定义了一个名为“customProfile“的身份资源,他包含了”role” Claim(可以包含多个Claim),然后我们还需要配置Scope,我们才能访问到:

1
2
3
4
5
6
7
8
9
10
11
12
new Client
{
ClientId = "ro.client",
AllowedGrantTypes = GrantTypes.ResourceOwnerPassword,

ClientSecrets =
{
new Secret("secret".Sha256())
},
AllowedScopes = { "api1" ,IdentityServerConstants.StandardScopes.OpenId,
IdentityServerConstants.StandardScopes.Profile,"custom.profile"}
}

我们在Client对象的AllowedScopes属性里加入了我们刚刚定义的身份资源,下载访问用户信息终结点将会得到和上面一样的结果。

六. Client Claims

新增于2018.12.14

在定义 Client 资源的时候发现,Client也有一个Claims属性,根据注释得知,在此属性上设置的值将会被直接添加到AccessToken,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
new Client
{
ClientId = "client",
AllowedGrantTypes = GrantTypes.ClientCredentials,

ClientSecrets =
{
new Secret("secret".Sha256())
},
AllowedScopes =
{
"api1", IdentityServerConstants.StandardScopes.OpenId,
IdentityServerConstants.StandardScopes.Profile
},
Claims = new List<Claim>
{
new Claim(JwtClaimTypes.Role, "admin")
}
};

只用在客户端资源这里设置就行,其他地方不用设置,然后请求AccessToken就会被带入。

值得注意的是Client这里设置的Claims默认都会被带一个client_前缀。如果像前文一样使用 [Authorize(Roles =”admin”)] 是行的,因为 [Authorize(Roles =”admin”)] 使用的Claim是role而不是client_role

七.总结

写这篇文章,简单分析了一下相关的源码,如果因为有本文描述不清楚或者不明白的地方建议阅读一下源码,或者加下方QQ群在群内提问。如果我们的根据角色的权限认证没有生效,请检查是否正确获取到了角色的用户信息单元。我们需要接入已有用户体系,只需实现IProfileServiceIResourceOwnerPasswordValidator接口即可,并且在Startup配置Service时不再需要AddTestUsers,因为将使用我们自己的用户信息。

Demo地址:https://github.com/stulzq/IdentityServer4.Samples/tree/master/Practice/01_RoleAndClaim