0%

前言#

上篇介绍了go-grpc-middlewaregrpc_zapgrpc_authgrpc_recovery使用,本篇将介绍grpc_validator,它可以对gRPC数据的输入和输出进行验证。

创建proto文件,添加验证规则#

这里使用第三方插件go-proto-validators自动生成验证规则。

go get github.com/mwitkow/go-proto-validators

1.新建simple.proto文件

Copy

``syntax = “proto3”;

package proto;

import “github.com/mwitkow/go-proto-validators/validator.proto”;

message InnerMessage {
// some_integer can only be in range (1, 100).
int32 some_integer = 1 [(validator.field) = {int_gt: 0, int_lt: 100}];
// some_float can only be in range (0;1).
double some_float = 2 [(validator.field) = {float_gte: 0, float_lte: 1}];
}

message OuterMessage {
// important_string must be a lowercase alpha-numeric of 5 to 30 characters (RE2 syntax).
string important_string = 1 [(validator.field) = {regex: “^[a-z]{2,5}$”}];
// proto3 doesn’t have required, the msg_exist enforces presence of InnerMessage.
InnerMessage inner = 2 [(validator.field) = {msg_exists : true}];
}

service Simple{
rpc Route (InnerMessage) returns (OuterMessage){};
}``

代码import "github.com/mwitkow/go-proto-validators/validator.proto",文件validator.proto需要import "google/protobuf/descriptor.proto";包,不然会报错。

google/protobuf地址:https://github.com/protocolbuffers/protobuf/tree/master/src/google/protobuf/descriptor.proto

src文件夹中的protobuf目录下载到GOPATH目录下。

2.编译simple.proto文件

go get github.com/mwitkow/go-proto-validators/protoc-gen-govalidators

指令编译:protoc --govalidators_out=. --go_out=plugins=grpc:./ ./simple.proto

或者使用VSCode-proto3插件,第一篇有介绍。只需要添加"--govalidators_out=."即可。

Copy

`”protoc”: {

    "path": "C:\\Go\\bin\\protoc.exe",
    
    "compile_on_save": true,
    "options": [
        
        "--go_out=plugins=grpc:.",
        "--govalidators_out=."
    ]
},` 

编译完成后,自动生成simple.pb.gosimple.validator.pb.go文件,simple.pb.go文件不再介绍,我们看下simple.validator.pb.go文件。

Copy

``package proto

import (
fmt “fmt”
math “math”
proto “github.com/golang/protobuf/proto”
_ “github.com/mwitkow/go-proto-validators”
regexp “regexp”
github_com_mwitkow_go_proto_validators “github.com/mwitkow/go-proto-validators”
)

var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

func (this *InnerMessage) Validate() error {
if !(this.SomeInteger > 0) {
return github_com_mwitkow_go_proto_validators.FieldError(“SomeInteger”, fmt.Errorf(value '%v' must be greater than '0', this.SomeInteger))
}
if !(this.SomeInteger < 100) {
return github_com_mwitkow_go_proto_validators.FieldError(“SomeInteger”, fmt.Errorf(value '%v' must be less than '100', this.SomeInteger))
}
if !(this.SomeFloat >= 0) {
return github_com_mwitkow_go_proto_validators.FieldError(“SomeFloat”, fmt.Errorf(value '%v' must be greater than or equal to '0', this.SomeFloat))
}
if !(this.SomeFloat <= 1) {
return github_com_mwitkow_go_proto_validators.FieldError(“SomeFloat”, fmt.Errorf(value '%v' must be lower than or equal to '1', this.SomeFloat))
}
return nil
}

var _regex_OuterMessage_ImportantString = regexp.MustCompile(^[a-z]{2,5}$)

func (this *OuterMessage) Validate() error {
if !_regex_OuterMessage_ImportantString.MatchString(this.ImportantString) {
return github_com_mwitkow_go_proto_validators.FieldError(“ImportantString”, fmt.Errorf(value '%v' must be a string conforming to regex "^[a-z]{2,5}$", this.ImportantString))
}
if nil == this.Inner {
return github_com_mwitkow_go_proto_validators.FieldError(“Inner”, fmt.Errorf(“message must exist”))
}
if this.Inner != nil {
if err := github_com_mwitkow_go_proto_validators.CallValidatorIfExists(this.Inner); err != nil {
return github_com_mwitkow_go_proto_validators.FieldError(“Inner”, err)
}
}
return nil
}``

里面自动生成了message中属性的验证规则。

grpc_validator验证拦截器添加到服务端#

Copy

grpcServer := grpc.NewServer(cred.TLSInterceptor(), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_validator.StreamServerInterceptor(), grpc_auth.StreamServerInterceptor(auth.AuthInterceptor), grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), grpc_recovery.StreamServerInterceptor(recovery.RecoveryInterceptor()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_validator.UnaryServerInterceptor(), grpc_auth.UnaryServerInterceptor(auth.AuthInterceptor), grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), grpc_recovery.UnaryServerInterceptor(recovery.RecoveryInterceptor()), )), )

运行后,当输入数据验证失败后,会有以下错误返回

Copy

Call Route err: rpc error: code = InvalidArgument desc = invalid field SomeInteger: value '101' must be less than '100'

其他类型验证规则设置#

enum验证

Copy

`syntax = “proto3”;
package proto;
import “github.com/mwitkow/go-proto-validators/validator.proto”;

message SomeMsg {
Action do = 1 [(validator.field) = {is_in_enum : true}];
}

enum Action {
ALLOW = 0;
DENY = 1;
CHILL = 2;
}`

UUID验证

Copy

`syntax = “proto3”;
package proto;
import “github.com/mwitkow/go-proto-validators/validator.proto”;

message UUIDMsg {
// user_id must be a valid version 4 UUID.
string user_id = 1 [(validator.field) = {uuid_ver: 4, string_not_empty: true}];
}`

总结#

go-grpc-middlewaregrpc_validator集成go-proto-validators,我们只需要在编写proto时设好验证规则,并把grpc_validator添加到gRPC服务端,就能完成gRPC的数据验证,很简单也很方便。

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example

前言#

gRPC默认的请求的超时时间是很长的,当你没有设置请求超时时间时,所有在运行的请求都占用大量资源且可能运行很长的时间,导致服务资源损耗过高,使得后来的请求响应过慢,甚至会引起整个进程崩溃。

为了避免这种情况,我们的服务应该设置超时时间。前面的入门教程提到,当客户端发起请求时候,需要传入上下文context.Context,用于结束超时取消的请求。

本篇以简单RPC为例,介绍如何设置gRPC请求的超时时间。

客户端请求设置超时时间#

修改调用服务端方法

1.把超时时间设置为当前时间+3秒

Copy

clientDeadline := time.Now().Add(time.Duration(3 * time.Second)) ctx, cancel := context.WithDeadline(ctx, clientDeadline) defer cancel()

2.响应错误检测中添加超时检测

Copy

`res, err := grpcClient.Route(ctx, &req)
if err != nil {

    statu, ok := status.FromError(err)
    if ok {
        
        if statu.Code() == codes.DeadlineExceeded {
            log.Fatalln("Route timeout!")
        }
    }
    log.Fatalf("Call Route err: %v", err)
}

log.Println(res.Value)` 

完整的client.go代码

服务端判断请求是否超时#

当请求超时后,服务端应该停止正在进行的操作,避免资源浪费。

Copy

`func (s *SimpleService) Route(ctx context.Context, req *pb.SimpleRequest) (*pb.SimpleResponse, error) {
data := make(chan *pb.SimpleResponse, 1)
go handle(ctx, req, data)
select {
case res := <-data:
return res, nil
case <-ctx.Done():
return nil, status.Errorf(codes.Canceled, “Client cancelled, abandoning.”)
}
}

func handle(ctx context.Context, req *pb.SimpleRequest, data chan<- *pb.SimpleResponse) {
select {
case <-ctx.Done():
log.Println(ctx.Err())
runtime.Goexit()
case <-time.After(4 * time.Second):
res := pb.SimpleResponse{
Code: 200,
Value: “hello “ + req.Data,
}

    data <- &res
}

}`

一般地,在写库前进行超时检测,发现超时就停止工作。

完整server.go代码

运行结果#

服务端:

Copy

:8000 net.Listing... goroutine still running

客户端:

Copy

Route timeout!

总结#

超时时间的长短需要根据自身服务而定,例如返回一个hello grpc,可能只需要几十毫秒,然而处理大量数据的同步操作则可能要很长时间。需要考虑多方面因素来决定这个超时时间,例如系统间端到端的延时,哪些RPC是串行的,哪些是可以并行的等等。

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example
参考:https://grpc.io/blog/deadlines/

Golang logrus日志包实战及日志切割的实现_logru 多进程日志拆分-CSDN博客

Excerpt

文章浏览阅读1.7k次,点赞2次,收藏2次。本文主要介绍 Golang 中…_logru 多进程日志拆分


本文主要介绍 Golang 中最佳日志解决方案,包括常用日志包logrus 的基本使用,如何结合file-rotatelogs 包实现日志文件的轮转切割两大话题。

Golang 关于日志处理有很多包可以使用,标准库提供的 log 包功能比较少,不支持日志级别的精确控制,自定义添加日志字段等。在众多的日志包中,更推荐使用第三方的 logrus 包,完全兼容自带的 log 包。logrus 是目前 Github 上 star 数量最多的日志库,logrus 功能强大,性能高效,而且具有高度灵活性,提供了自定义插件的功能。

很多开源项目,如 docker,prometheus,dejavuzhou/ginbro 等,都是用了 logrus 来记录其日志。

logrus 特性

  • 完全兼容 golang 标准库日志模块:logrus 拥有六种日志级别:debug、info、warn、error、fatal 和 panic,这是 golang 标准库日志模块的 API 的超集。
  • logrus.Debug(“Useful debugging information.”)
  • logrus.Info(“Something noteworthy happened!”)
  • logrus.Warn(“You should probably take a look at this.”)
  • logrus.Error(“Something failed but I’m not quitting.”)
  • logrus.Fatal(“Bye.”) //log之后会调用os.Exit(1)
  • logrus.Panic(“I’m bailing.”) //log之后会panic()
  • 可扩展的 Hook 机制:允许使用者通过 hook 的方式将日志分发到任意地方,如本地文件系统、标准输出、logstash、elasticsearch 或者 mq 等,或者通过 hook 定义日志内容和格式等。
  • 可选的日志输出格式:logrus 内置了两种日志格式,JSONFormatter 和 TextFormatter,如果这两个格式不满足需求,可以自己动手实现接口 Formatter 接口来定义自己的日志格式。
  • Field 机制:logrus 鼓励通过 Field 机制进行精细化的、结构化的日志记录,而不是通过冗长的消息来记录日志。
  • logrus 是一个可插拔的、结构化的日志框架。
  • Entry: logrus.WithFields 会自动返回一个 *Entry,Entry里面的有些变量会被自动加上
  • time:entry被创建时的时间戳
  • msg:在调用.Info()等方法时被添加
  • level,当前日志级别

logrus 基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (

"os"

"github.com/sirupsen/logrus"

  log "github.com/sirupsen/logrus"

)

var logger *logrus.Entry

func init() {

  log.SetFormatter(&log.JSONFormatter{})

  log.SetOutput(os.Stdout)

  log.SetLevel(log.InfoLevel)

  logger = log.WithFields(log.Fields{"request_id": "123444", "user_ip": "127.0.0.1"})

}

func main() {

  logger.Info("hello, logrus....")

  logger.Info("hello, logrus1....")

}

基于 logrus 和 file-rotatelogs 包实现日志切割

很多时候应用会将日志输出到文件系统,对于访问量大的应用来说日志的自动轮转切割管理是个很重要的问题,如果应用不能妥善处理日志管理,那么会带来很多不必要的维护开销:外部工具切割日志、人工清理日志等手段确保不会将磁盘打满。

file-rotatelogs: When you integrate this to to you app, it automatically write to logs that are rotated from within the app: No more disk-full alerts because you forgot to setup logrotate!

logrus 本身不支持日志轮转切割功能,需要配合 file-rotatelogs 包来实现,防止日志打满磁盘。file-rotatelogs 实现了 io.Writer 接口,并且提供了文件的切割功能,其实例可以作为 logrus 的目标输出,两者能无缝集成,这也是 file-rotatelogs 的设计初衷:

It’s normally expected that this library is used with some other logging service, such as the built-in log library, or loggers such as github.com/lestrrat-go/apache-logformat.

示例代码:

应用日志文件 /Users/opensource/test/go.log,每隔 1 分钟轮转一个新文件,保留最近 3 分钟的日志文件,多余的自动清理掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (

"time"

 rotatelogs "github.com/lestrrat-go/file-rotatelogs"

 log "github.com/sirupsen/logrus"

)

func init() {

 path := "/Users/opensource/test/go.log"

 writer, _ := rotatelogs.New(

 path+".%Y%m%d%H%M",

 rotatelogs.WithLinkName(path),

 rotatelogs.WithMaxAge(time.Duration(180)*time.Second),

 rotatelogs.WithRotationTime(time.Duration(60)*time.Second),

 )

 log.SetOutput(writer)

}

func main() {

for {

 log.Info("hello, world!")

 time.Sleep(time.Duration(2) * time.Second)

 }

}

Golang 标准日志库 log 使用

虽然 Golang 标准日志库功能少,但是可以选择性的了解下,下面为基本使用的代码示例,比较简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package main

import (

"fmt"

"log"

)

func init() {

  log.SetPrefix("【UserCenter】")             

  log.SetFlags(log.LstdFlags | log.Lshortfile | log.LUTC)

}

func main() {

  log.Println("log...")

  log.Fatalln("Fatal Error...")

  fmt.Println("Not print!")

}

自定义日志输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package main

import (

"io"

"log"

"os"

)

var (

  Info  *log.Logger

  Warning *log.Logger

  Error  *log.Logger

)

func init() {

  errFile, err := os.OpenFile("errors.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)

if err != nil {

    log.Fatalln("打开日志文件失败:", err)

  }

  Info = log.New(os.Stdout, "Info:", log.Ldate|log.Ltime|log.Lshortfile)

  Warning = log.New(os.Stdout, "Warning:", log.Ldate|log.Ltime|log.Lshortfile)

  Error = log.New(io.MultiWriter(os.Stderr, errFile), "Error:", log.Ldate|log.Ltime|log.Lshortfile)

}

func main() {

  Info.Println("Info log...")

  Warning.Printf("Warning log...")

  Error.Println("Error log...")

}

Go语言的优劣,这里就不介绍了,下面直接讲Go 的安装:

Go 的官方网站http://golang.org/(需要翻墙软件)

国内下载地址http://www.golangtc.com/download

下载对应平台的安装包。注意区分32位还是64位操作系统。

安装包下载完成之后,安装过程很简单,傻瓜式下一步到底就好了。

安装go 的时候,安装程序会自动把相关目录写到系统环境。但是如果是zip 的安装,需要自己手动添加。

主要配置以下几个:

  • GOROOT:Go 安装后的根目录(例如:D:\Go),安装过程中会由安装程序自动写入系统环境变量中。
  • GOBIN:Go 的二进制文件存放目录(%GOROOT%\bin)
  • PATH:需要将 %GOBIN% 加在 PATH 变量的最后,方便在命令行下运行。

当环境变量都配置完成之后,Go 就已经安装完毕了。打开命令行,运行 go 命令,就可以看到如下的提示了。

GOPATH : Go 的工作空间,就是我们的开发和依赖包的目录(例如:我的是 D:\Go_Path\go) ,此目录需要手动配置到系统环境变量

GOPATH 工作空间是一个目录层次结构,其根目录包含三个子目录:

  • src:包含 Go 源文件,注意:你自己创建依赖的package,也要放到GOPATH 目录下,这样才能够被引用到。
  • pkg:包含包对象,编译好的库文件
  • bin:包含可执行命令

注意:

1. 需要将GOPATH 路径,手动写入到系统环境变量。

2. 不要把 GOPATH 设置成 Go 的安装路径

3. 你自己创建依赖的package,也要放到GOPATH 目录下,这样才能够被引用到。

配置好之后,通过 go env 命令来查看go环境是否配置正确:

现在,一起来 Hello World 吧!

复制代码

package main
import ( “fmt” )

func main() {
fmt.Println(“Hello World!”)
}

复制代码

将上面的程序保存成 helloworld.go,然后在命令行中执行:

1. IDE 的下载安装这里就不说,大家直接去这个地址下载就行。

  Goland:https://www.jetbrains.com/go/download/#section=windows

  LiteIDE: https://studygolang.com/dl 这个是最新的1.10.3,免费的IDE 

2. 我用的是Atom 编辑器。配置有点麻烦,不建议大家使用。

3. 后面直接讲Go语言的如何使用。

4. 能翻的最好翻墙,因为很多package 在golang官网,不翻墙下载不下来。

背景

在之前的文章《漫谈微服务》我已经简单的介绍过微服务,微服务特性是轻量级跨平台和跨语言的服务,也列举了比较了集中微服务通信的手段的利弊,本文将通过RPC通信的方式实现一个增删查Redis的轻量级微服务示例,大部分内容翻译自文章《Microservice in golang, using Redis and gRPC》,中间加上自己的实践和理解。

实验环境

Mac OS

go version go1.12.4 darwin/amd64

Docker version 18.09.2, build 6247962

代码仓库

https://github.com/felipeinfantino/microservice-golang

微服务实践

gRPC代码生成

选用gRPC的原因是因为gRPC本身是一款开源且高性能的RPC框架,支持跨平台,支持golang,java,c,C++ 等10多种编程语言。因为我们要实现一个通过gRPC通信的基于Redis 数据库的增删改微服务,所以我们首先需要定义一个gRPC的通信描述文件server.proto:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

syntax = "proto3"``;

package proto;

message SetRequest{

string key = 1;

string value = 2;

}

message GetRequest{

string key = 1;

}

message DeleteRequest{

string key = 1;

}

message ServerResponse{

bool success = 1;

string value = 2;

string error = 3;

}

service BasicService{

rpc Set(SetRequest) returns (ServerResponse);

rpc Get(GetRequest) returns (ServerResponse);

rpc Delete(DeleteRequest) returns (ServerResponse);

}

想要将上面的server.proto文件转换为golang代码需要安装protocol buffer的编译器:

1.下载https://github.com/protocolbuffers/protobuf/releases/tag/v3.11.0中的protoc-3.11.0-osx-x86_64.zip包。

2.解压拷贝里面的二进制protoc及google子目录到该示例工程目录下。

3.通过上面定义的server.proto 生成golang代码,可以看到proto目录下生成了service.pb.go文件。

1

./protoc --proto_path=proto --proto_path=google --go_out=plugins=grpc:proto service.proto

如果还不行可以参考protocol buffer官方安装详细步骤 https://github.com/protocolbuffers/protobuf

工程代码目录结构如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

.

├── Dockerfile

├── README.md

├── database

│   ├── database.model.go

│   ├── errors.go

│   └── redis.go

├── docker-compose.yml

├── go.mod

├── go.``sum

├── google

│   └── protobuf

│       ├── any.proto

│       ├── api.proto

│       ├── compiler

│       │   └── plugin.proto

│       ├── descriptor.proto

│       ├── duration.proto

│       ├── empty.proto

│       ├── field_mask.proto

│       ├── source_context.proto

│       ├── struct.proto

│       ├── timestamp.proto

│       ├── type``.proto

│       └── wrappers.proto

├── main

├── main.go

├── proto

│   ├── service.pb.go

│   └── service.proto

└── protoc

Redis服务

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

package database

type Database interface {

Set(key string, value string) (string, error)

Get(key string) (string, error)

Delete(key string) (string, error)

}

func Factory(databaseName string) (Database, error) {

switch databaseName {

case "redis"``:

return createRedisDatabase()

default``:

return nil, &NotImplementedDatabaseError{databaseName}

}

}

定义了一个Database的接口,里面含有增删查三种方法,只要实现了这三种方法的数据库都可以作为该微服务的数据库,所以提供一个工厂函数供用户后续扩展,目前只实现了Redis一种存储。Redis的实现直接引用了开源第三方Redis操作用github/go-redis/redis 然后封装了上面增删查三种方法。这里就不展开讲redis实现了。

主程序

然后就是我们的golang主程序,程序逻辑为开启gRPC服务端,提供增删查三个接口及响应。代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

func main() {

listener, err := net.Listen(``"tcp"``, ":4040"``)

if err != nil {

panic(err)

}

srv := grpc.NewServer()

databaseImplementation := os.Args[1]

db, err = database.Factory(databaseImplementation)

if err != nil {

panic(err)

}

proto.RegisterBasicServiceServer(srv, &server{})

fmt.Println(``"Prepare to serve"``)

if e := srv.Serve(listener); e != nil {

panic(err)

}

}

func (s *server) Set(ctx context.Context, in *proto.SetRequest) (*proto.ServerResponse, error) {

value, err := db.Set(in.GetKey(), in.GetValue())

return generateResponse(value, err)

}

func (s *server) Get(ctx context.Context, in *proto.GetRequest) (*proto.ServerResponse, error) {

value, err := db.Get(in.GetKey())

return generateResponse(value, err)

}

func (s *server) Delete(ctx context.Context, in *proto.DeleteRequest) (*proto.ServerResponse, error) {

value, err := db.Delete(in.GetKey())

return generateResponse(value, err)

}

func generateResponse(value string, err error) (*proto.ServerResponse, error) {

if err != nil {

return &proto.ServerResponse{Success: false, Value: value, Error: err.Error()}, nil

}

return &proto.ServerResponse{Success: true, Value: value, Error: ""``}, nil

}

启动服务

为了测试方便用docker-compose定义了我们的微服务,对docker-compose不太熟悉的朋友可以简单的看下我之前写的《利用Docker Compose快速搭建本地测试环境》这篇文章。该服务的docker-compose.yaml内容如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

version: "3.7"

services:

server:

build: .

ports:

- "4040:4040"

depends_on:

- redis

redis:

container_name: redis_container

image: redis

可以看出我们通过暴露4040端口提供我们的服务,服务依赖于redis,所以redis服务会在我们服务之前以容器的方式被拉起来。微服务的启动命令可以从Dockerfile中获取:

1

2

3

4

5

6

FROM golang:latest

RUN mkdir /app

ADD . /app/

WORKDIR /app

EXPOSE 4040

CMD ["go", "run", "main.go", "redis"]

拉起服务:  

1

2

3

4

5

6

7

8

9

10

11

12

13

14

docker-compose up

Starting redis_container ... done

Starting microservice-golang_server_1 ... done

Attaching to redis_container, microservice-golang_server_1

redis_container | 1:C 22 Dec 2019 13:11:10.761 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo

redis_container | 1:C 22 Dec 2019 13:11:10.761 # Redis version=5.0.7, bits=64, commit=00000000, modified=0, pid=1, just started

redis_container | 1:C 22 Dec 2019 13:11:10.761 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf

redis_container | 1:M 22 Dec 2019 13:11:10.763 * Running mode=standalone, port=6379.

redis_container | 1:M 22 Dec 2019 13:11:10.763 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.

redis_container | 1:M 22 Dec 2019 13:11:10.763 # Server initialized

redis_container | 1:M 22 Dec 2019 13:11:10.763 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.

redis_container | 1:M 22 Dec 2019 13:11:10.764 * DB loaded from disk: 0.000 seconds

redis_container | 1:M 22 Dec 2019 13:11:10.764 * Ready to accept connections

server_1  | Prepare to serve

通过docker ps看到启动了两个容器,一个是redis,一个是我们的主程序:

测试

测试的客户端用的gRPC的图形化工具BloomRPC,安装方法比较简单:brew cask install bloomrpc

然后导入我们的gRPC定义文件server.proto就能点击测试:

总结

本文从工程实践的角度带读者实现了一个通过gRPC通信的增删查Redis的微服务,希望对读者有所启发。

本文章是介绍和记录如何创建GraphQL项目,以及如何使用GraphQL进行数据的相关操作。项目参照GraphQL .Net 的官方文档进行实践

一、项目结构:

  为了更好的和原有的项目结合在一起,尽可能减少对原项目的修改。我对项目结构做了如下分层。

二、项目结构分层说明

  Contracts层: 项目的接口层,重点存放项目的一些接口。和原项目的分层结构的Contracts一致

  Entities层: 实体模型层,存放实体模型。与原有项目的分层结构Entites层一致

  GraphQLDemo: 是使用Console控制台应用程序对GraphQL的调用实例

  GraphQLs: 使用GraphQL 的模型定义和查询、变更等操作的定义

  Services: 提供服务的具体实现层,和原有项目分层中的Services 层一致

  Tests: 使用Unit Test 测试调用GraphQL

在这里重点关注 标红的部分的介绍

三、GraphQLs项目介绍:

  GraphQLs重点是存储项目的GraphQL操作相关的内容

  1.在项目解决方案中,新建程序集,命名为GraphQLs

  2. 安装Graphql

  3.创建GraphQL 的相关概念

  GraphQL有两种方式创建Schema,

    • 一种是使用Schema First,也就是使用GraphQL Schema Language创建Schema. 可以对比EntityFramework的DB First
    • 一种是使用Graph Type定义Schema,可以对比EntityFramework 的Code First

  在这里适用Code First定义数据模型,可以与原有的数据服务应用一起使用。可分为以下步骤:

  1)定义数据模型:

  假设原有的数据模型Book的结构是这样的:

复制代码

复制代码

public class User
{
    public int Id { get; set; }

    public string Name { get; set; }

    public int Age { get; set; }

    public string Gender { get; set; }
}

复制代码

复制代码

  那么定义对应的GraphQL的数据模型可以是这样的:

复制代码

复制代码

public class UserType:ObjectGraphType<User>// 继承自ObjectGraphType,并传递范型User
{
    public UserType()// 在构造函数中,对属性作影射
    {
        Name = "User";

        Field(x => x.Id);
        Field(x => x.Name);
        Field(x => x.Age);
        Field(x => x.Gender);
    }
}

复制代码

复制代码

  2)定义操作模型: 

  GraphQL的操作分为: Query(Select), Mutation(Create,Update,Delete),Subscription(订阅)

  • 定义Query操作

复制代码

复制代码

public class Query : ObjectGraphType// 定义Query
{
    private IWrapper wrapper = new Wrapper();
    IEnumerable<User> users = null;
    public Query()
    {
        Field<ListGraphType<UserType>>(//在构造函数中定义查询操作
            name: "users", //注意这个名字,后边查询的时候需要对应
            arguments: new QueryArguments //定义查询参数
            {
                new QueryArgument<StringGraphType>
                {
                    Name = "name",
                    Description = "The name for the user"
                },
                new QueryArgument<IntGraphType>
                {
                    Name = "age",
                    Description = "The age for the user"
                },
                new QueryArgument<StringGraphType>
                {
                    Name = "gender",
                    Description = "The gender for user"
                }
            },
            resolve: context =>// 定义查询操作的执行
            {
                var usercontext = context.UserContext;// 获取上下文,可在此作用户验证操作
                users = wrapper.User.Find(u => true);
                var name = context.GetArgument<string>("name");
                users = users.Where(u => name == null || u.Name == name);
                var age = context.GetArgument<int?>("age");
                users = users.Where(u => age == null || u.Age == age);
                var gender = context.GetArgument<string>("gender");
                users = users.Where(u => gender == null || u.Gender == gender);
                return users;
            });  

    }
}

复制代码

复制代码

  • 定义Mutation操作

复制代码

复制代码

public class Mutation:ObjectGraphType
{
    private IWrapper wrapper = new Wrapper();
    IEnumerable<User> users = null;
    public Mutation()
    {
        Field<UserType>(
            name: "createUser",
            arguments: new QueryArguments(
                new QueryArgument<NonNullGraphType<UserInputType>>
                {
                    Name = "user"
                }
            ),
            resolve: context =>
            {
                var user = context.GetArgument<User>("user");
                return wrapper.User.Add(user);
            }
        );
    }
}

复制代码

复制代码

  3. 定义GraphSchema

  定义GraphSchema就是定义Schema的Query、Mutation、Subscription操作

复制代码

复制代码

public class GraphSchema:Schema
{
    public GraphSchema()
    {
        Query = new Query();
        Mutation = new Mutation();
    }
}

复制代码

复制代码

  4. 附.

  为了检验查询、修改操作,这里定义一个GraphQLQuery来定义操作,并定义一个查询操作类

复制代码

复制代码

public class GraphQLQuery
{
    public string OperationName { get; set; }
    public string NamedQuery { get; set; }
    public string Query { get; set; }

    public object UserContext { get; set; }
    public JObject Variables { get; set; }
}

复制代码

复制代码

复制代码

复制代码

public class ActionExecute
{
private IDocumentExecuter executer;
private IDocumentWriter writer;
private ISchema schema;

    public ActionExecute()
    {
        executer = new DocumentExecuter();
        writer = new DocumentWriter();
        schema = new GraphSchema();
    }

    public async Task<ExecutionResult> ExecuteAction(GraphQLQuery query)
    {
        var result = await executer.ExecuteAsync(\_ =>
        {
            \_.Schema = schema;
            \_.Query = query.Query;
            \_.Inputs = query.Variables.ToInputs();// 查询变量的输入
            \_.OperationName = query.OperationName;// 操作名称
            \_.UserContext = query.UserContext;// 添加用户上下文对象
            \_.ValidationRules = DocumentValidator.CoreRules(); // 添加自定义查询验证 逻辑 
            \_.ExposeExceptions = true;// 是否追踪错误
            \_.FieldMiddleware.Use<ErrorHandlerMiddleware>(); // 使用中间件
            \_.EnableMetrics = true;// 是否使用查询度量

            \_.ComplexityConfiguration = new ComplexityConfiguration // 防止恶意查询
            {
                MaxComplexity = 12,
                MaxDepth = 15 // 允许查询总最大嵌套数
            };
        });
        return result;
    }

    public async Task<string> Execute(GraphQLQuery query)
    {
        var result = await ExecuteAction(query).ConfigureAwait(false);

        var json = await writer.WriteToStringAsync(result);

        return json;
    }
}

复制代码

复制代码

四、 测试和检验

  一切准备就绪,下边对创建的GraphQL进行测试

  1. 查询测试:

复制代码

复制代码

public class QueryTest
{
    private ActionExecute execute = new ActionExecute();
    \[Fact\]
    public void TestMethod1()
    {
        Assert.True(1 == 1);
    }
    \[Theory\]
    \[InlineData(16, "Male")\]
    \[InlineData(18, "FeMale")\]
    public async void QueryUsers(int age, string gender)
    {
        var queryStr = @"{users(age:" + age + ",gender:" + "\\"" + gender + "\\"" + "){id name gender age}}";
        var result = await execute.ExecuteAction(new GraphQLQuery { Query = queryStr,UserContext= "Add Role" });
        var data = result.Data;
        Assert.Null(result.Errors?.Count);
    }
}

复制代码

复制代码

  为了检验GraphQL的查询优越性,你可以修改一下queryStr=@”{users{id name gender age}}”; 或queryStr=@”{users{gender age}}”;queryStr=@”{users{ name age}}”;注意这里的@和{}只是C# 对字符串操作的一种方式。

  发现了什么?

  如果我们在前端(Web、微信小程序、手机APP),在web端,作为后台管理系统,我可能需要获取用户的所有信息,那么我可能需要使用queryStr=@”{users{id name gender age}}”。在微信小程序端,我只要根据用户的id查询用户名字就可以了,那么我只用变动查询语句:queryStr=@”{users(id){ name}}”;

  意味着什么?

  意味着我们只需要提供一个API接口,该端口接受传递的查询字符串就可以了。所有的实体都可以只用这一个接口了。想查询什么,由前端决定了,再也不需要追着后端接口开发工程师要数据了。我想这样以来,前端和后端只需要一个接口沟通,会比REST API来的更方便了。

2.变更测试:

复制代码

复制代码

public class MutationTest
{
    private ActionExecute execute = new ActionExecute();

    \[Theory\]
    \[InlineData(16, "Test1")\]
    \[InlineData(18, "Test2")\]
    public async void CreateUser(int age, string name)
    {
        var queryStr = @"{query: mutation ($user: UserInput!){createUser(user:$user){id name age}},variables:{user:{name: " + name + @",age:" + age + @"}}}";

        var query = new GraphQLQuery
        {
            Query = "mutation ($user: UserInput!){createUser(user:$user){id name age}}",
            Variables = JObject.Parse("{user:{\\"name\\": \\"" + name + "\\",\\"age\\":" + age + "}}")
        };
        var result = await execute.ExecuteAction(query);
        Assert.Null(result.Errors.Count);
    }
}

复制代码

复制代码

  发现了什么?

  同样的。我们只需要传递查询的参数,传递对应的参数Variables 就能完成修改动作。同时,该变更和查询的操作字符串语句很像,只是多了一个mutation。

五、后续

  这篇文章只是介绍了使用控制台和UnitTest测试使用了GraphQL,后续会更新在Asp.Net Core MVC 中使用GraphQL,也可以学习杨旭的文章。很好的博主https://www.cnblogs.com/cgzl/p/9691323.html

Nginx本身是一个非常出色的HTTP服务器,FFMPEG是非常好的音视频解决方案.这两个东西通过一个nginx的模块nginx-rtmp-module,组合在一起即可以搭建一个功能相对比较完善的流媒体服务器.

这个流媒体服务器可以支持RTMP和HLS(Live Http Stream)

从安装开始

Nginx的安装参照我之前的这个: http://blog.csdn.net/redstarofsleep/article/details/45092127

在configure的时候需要增加nginx-rtmp-module的支持,下载好nginx-rtmp-module后解压,然后nginx安装时增加这个模块(–add-module),其它都是一样的.

./configure –prefix=/usr/local/nginx –with-pcre=/home/user/pcre/pcre-8.32 –with-zlib=/home/user/zlib/zlib-1.2.8 –with-openssl=/home/user/openssl/openssl-1.0.1i –add-module=/home/user/nginx-rtmp-module

## Mac 系统可以使用Homebrew, Homebrew是以最简单,最灵活的方式来安装苹果公司在MacOS中不包含的UNIX工具

FFMPEG的安装

ubuntu 安装: http://blog.csdn.net/redstarofsleep/article/details/45092145

Liunx下yum 安装: http://www.cnblogs.com/dennisit/archive/2012/12/27/2835089.html

nginx配合ffmpeg做流媒体服务器的原理是: nginx通过rtmp模块提供rtmp服务, ffmpeg推送一个rtmp流到nginx,然后客户端通过访问nginx来收看实时视频流. HLS也是差不多的原理,只是最终客户端是通过HTTP协议来访问的,但是ffmpeg推送流仍然是rtmp的。

 在整个 http{} 之后添加 rtmp的配置内容 

官方rtmp 手册https://github.com/arut/nginx-rtmp-module/wiki/Directives

中文手册 :视频直播点播nginx-rtmp开发手册中文版

复制代码

1 rtmp {
2 server {
3 listen 1935; #端口 4      RTMP 直播流配置
5 application rtmplive {
6 live on;
7 }
       HLS 直播流配置 8 application hls {
9 live on; 10 hls on; #开启hls 11 hls_path /tmp/hls;
          hls_fragment 5s #一个ts 文件的时长 5s 12 } 13 } 14 }

复制代码

需要在http里面增加一个location配置

复制代码

1 location /hls { 2 types { 3 application/vnd.apple.mpegurl m3u8; 4 video/mp2t ts; 5 } 6 root /tmp; 7 add_header Cache-Control no-cache; 8 }

复制代码

注意:修改nginx.conf之后,需重启nginx服务,才会生效:$nginx -s reload。

     再次在浏览器中测试:http://localhost:8080,以确认nginx开启的状态。

保存完配置文件后,启动nginx,通过netstat -ltn命令可以看到增加了一个1935端口的监听.8080是nginx默认的http监听端口。

复制代码

# netstat -ltn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 127.0.1.1:53 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:631 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:1935 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:8080 0.0.0.0:* LISTEN
tcp6 0 0 :::22 :::* LISTEN
tcp6 0 0 ::1:631 :::* LISTEN

复制代码

然后用ffmpeg推流到nginx:

RTMP流,推流至rtmplive:

ffmpeg -re -i “D:\download\film\aqgy\02.mp4” -vcodec libx264 -vprofile baseline -acodec aac -ar 44100 -strict -2 -ac 1 -f flv -s 1280x720 -q 10 rtmp://server:1935/rtmplive/test1

HLS流,推流至hls:

ffmpeg -re -i “D:\download\film\aqgy\02.mp4” -vcodec libx264 -vprofile baseline -acodec aac -ar 44100 -strict -2 -ac 1 -f flv -s 1280x720 -q 10 rtmp://ip:1935/hls/test2

其中,HLS流表现较明显,在nginx的临时目录下,直观的可看到m3u8索引文件和N多个.ts文件。m3u8列表会实时更新,且会动态更改当前播放索引切片(.ts)。这种实时更新的机制,不会使得.ts文件长时间存在于Nginx服务器上,且当推流结束之后,该目录下的内容会被全部清除,这样无形中减缓了nginx服务器的压力。HLS协议在服务器端将直播数据流存储为连续的、很短时长的媒体文件(MPEG-TS格式),而客户端则不断的下载并播放这些小文件,因为服务器端总是会将最新的直播数据生成新的小文件,这样客户端只要不停的按顺序播放从服务器获取到的文件,就实现了直播。由此可见,基本上可以认为,HLS是以点播的技术方式来实现直播。由于数据通过HTTP协议传输,所以完全不用考虑防火墙或者代理的问题,而且分段文件的时长很短,客户端可以很快的选择和切换码率,以适应不同带宽条件下的播放。不过HLS的这种技术特点,决定了它的延迟一般总是会高于普通的流媒体直播协议。

m3u8索引文件

复制代码

#EXTM3U m3u文件头,必须放在第一行
#EXT-X-MEDIA-SEQUENCE 第一个TS分片的序列号 #当前索引
#EXT-X-TARGETDURATION 每个分片TS的最大的时长
#EXT-X-ALLOW-CACHE 是否允许cache
#EXT-X-ENDLIST m3u8文件结束符
#EXTINF extra info,分片TS的信息,如时长,带宽等

复制代码

 

现在我们的流媒体服务器有两个实时流了,一个是rtmp的,另一个是hls的,用流媒体播放器播放一下,流媒体播放器可以用vlc也可以用ffmpeg带的ffplay.手机也是可以播放的。

第一个就是推送的地址: rtmp://serverIp:1935/myapp/test1

第二个是HTTP地址: http://serverIp:8080/hls/test2.m3u8

播放rtmp流或hls流

最简单的测试,可通过VLC播放器,建立网络任务实现播放。所谓的播放,就是从Nginx服务器取到视频流并播放,也称之为“拉流”。需注意的是,HLS是基于HTTP的流媒体传输协议,端口为8080 ,hls的话用hls on开启hls,并且为hls设置一个临时文件目录hls_path /tmp/hls;其它更高级的配置可以参看nginx-rtmp-module的readme;而RTMP本身即为实时消息传输协议,端口为1935。由此决定了客户端访问直播流的方式,见下图:(客户端拉流过程)

拉流地址:

RTMP流:rtmp://localhost:1935/rtmplive/test

HLS流:http://localhost:8080/hls/test.m3u8

文章引用:HLS-搭建Nginx流媒体服务器(3)

        nginx+nginx-rtmp-module+ffmpeg搭建流媒体服务器

Helm Chart 部署 Redis 的完美指南 - 知乎

Excerpt

一、Helm介绍Helm是一个Kubernetes的包管理工具,就像Linux下的包管理器,如yum/apt等,可以很方便的将之前已经打包好的yaml文件部署到kubernetes上。 三个基本概念: Chart:Chart 代表着 helm 包。它包含在 Kube…


一、Helm介绍

Helm是一个Kubernetes的包管理工具,就像Linux下的包管理器,如yum/apt等,可以很方便的将之前已经打包好的yaml文件部署到kubernetes上。

三个基本概念:

  1. Chart:Chart 代表着 helm 包。它包含在 Kubernetes 集群内部运行应用程序,工具或服务所需的所有资源定义。
  2. Repository:是 chart 的存储库。例如:https://charts.bitnami.com/bitnami
  3. Release:Release 是运行在 Kubernetes 集群中的 chart 的实例。一个 chart 通常可以在同一个集群中安装多次。每一次安装都会创建一个新的 release。以 MySQL chart为例,如果你想在你的集群中运行两个数据库,你可以安装该chart两次。每一个数据库都会拥有它自己的 release 和 release name。

二、安装Helm

1
wget https://get.helm.sh/helm-v3.13.2-linux-amd64.tar.gz tar -xvf helm-v3.13.2-linux-amd64.tar.gz mv linux-amd64/helm /usr/local/bin/helm

安装完后可以使用 helm version 查看版本

三、配置Helm的repository

1
# 添加仓库 helm repo add bitnami https://charts.bitnami.com/bitnami# 也可以添加国内的一些库(阿里云等) helm repo add aliyun https://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts

添加完成后可以使用 helm list 查看repo列表:

四、部署chart(以部署redis为例)

1. 搜索chart

1
# 查找redis helm search repo redis

搜索出来可以看到有单机的redis或者集群的redis-cluster,APP VERSION表示redis的版本。

2. 拉取chart

1
# 拉取redis helm pull bitnami/redis

拉取下来是一个tgz的压缩包,需要进行解压 tar -xvf redis-18.5.0.tgz ,解压后会得到一个redis目录,里面包含了redis的各种配置文件和启动文件。

Chart.yaml # 包含了chart的一些基本信息 charts # 该目录保存其他依赖的 chart templates # chart 配置模板,用于渲染最终的 Kubernetes YAML 文件 NOTES.txt # 用户运行 helm install 时候的提示信息 values.yaml # 定义 chart 模板中的自定义配置的默认值,可以在执行 helm install 或 helm update 的

3. 修改values.yaml的一些配置(简单演示一下基本的配置)

  • global部分修改redis密码,如果有动态存储可以加上你提前建好的storgeClass,保证数据持久性;

  • architecture部分表示这个redis是集群还是单节点运行,如果集群改为replication,单节点改为standalone

  • service部分修改redis的服务,包括端口,是否对外访问,这里我们不对外,就改为ClusterIP

4. 启动chart

1
helm install redis-cluster ./redis/ -n redis# redis-cluster表示部署的名称 # ./redis/ 表示chart的本地路径,这里为当前目录下的reids # -n redis 表示命名空间,可以提前创建一个redis的命令空间

启动后会有一些提示,包括告诉你如何进入redis容器内,对外暴露端口等一些操作。

1
# 查看redis是否启动成功 kubectl get all -n redis

可以看到redis这个chart的所有状态都为running,一主三从,进入redis终端内验证,使用刚刚配置的密码登录redis:

kubectl exec -it redis-master-0 -n redis bash

创建一些数据 set name kubernetes ,然后在从节点验证数据是否存在。

可以看到从节点已经获取到刚刚创建的name,但是在从节点创建数据是不行的,从节点只能读取数据。

5. 升级和回滚

a. 升级

我们在刚刚已经创建了一个redis的chart,实际生产环境中,可能需要修改一些配置,然后基于这个chart对我们应用进行升级。

例如,我们这里修改一些redis的密码,然后升级这个chart,我们编辑一下values.yaml把开头创建的密码“redis123”改成“redis456”。

1
# 升级redis helm upgrade redis-cluster ./redis/ -n redis# redis-cluster表示部署的名称 # ./redis/ 表示chart的本地路径,这里为当前目录下的reids # -n redis 表示命名空间,可以提前创建一个redis的命令空间

可以从时间看到我们的pod刚刚进行了更新,并且状态都处于running状态,然后进入容器进行验证,当我们输入之前的密码“redis123”,会报错说密码不正确,输入更新后的密码“redis456”,正常进入redis,如果一开始使用了数据持久化,这时候创建的数据应该也不会丢失。

b. 回滚

1
# 先看看我们历史的chart helm history redis -n redis

可以看到第一个状态为superseded是我们最开始创建的chart,第二个状态为deployed是我们刚刚升级的chart,就是正在运行的,我们将版本回滚到第一个版本。

1
helm rollback redis 1 -n redis

可以看到我们已经回滚到第一个版本,并且pod正在更新中,等到创建成功,可以验证一下输入“redis123”是否能进入到redis。

验证成功,已经回滚到版本1,使用旧密码成功登录redis。

当我们深入探索 Helm Chart 部署 Redis 的过程时,不仅仅是学习了如何利用 Helm 简化复杂的部署任务,更是领略到了在容器化世界中管理和维护应用的便捷之处。通过 Helm Chart,我们可以轻松地定义、配置和部署复杂的应用程序,使得整个过程更加灵活、可维护性更高。

在未来,我们可以继续深入学习 Helm 的更多高级功能,探索更多复杂应用场景下的部署和管理方法。通过不断学习和实践,我们能够更好地应对日益复杂的容器化环境,提高工作效率,确保应用的可靠性和稳定性。

希望这篇博客能够帮助你更好地理解 Helm Chart 部署 Redis,并在你的容器化旅程中提供一些有益的经验。让我们一起迎接技术的挑战,不断进步,探索更多的可能性。感谢你的阅读,期待在未来的技术探索中再次相遇!

前言#

上篇介绍了gRPC中TLS认证和自定义方法认证,最后还简单介绍了gRPC拦截器的使用。gRPC自身只能设置一个拦截器,所有逻辑都写一起会比较乱。本篇简单介绍go-grpc-middleware的使用,包括grpc_zapgrpc_authgrpc_recovery

go-grpc-middleware简介#

go-grpc-middleware封装了认证(auth), 日志( logging), 消息(message), 验证(validation), 重试(retries) 和监控(retries)等拦截器。

  • 安装 go get github.com/grpc-ecosystem/go-grpc-middleware
  • 使用

Copy

import "github.com/grpc-ecosystem/go-grpc-middleware" myServer := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_ctxtags.StreamServerInterceptor(), grpc_opentracing.StreamServerInterceptor(), grpc_prometheus.StreamServerInterceptor, grpc_zap.StreamServerInterceptor(zapLogger), grpc_auth.StreamServerInterceptor(myAuthFunction), grpc_recovery.StreamServerInterceptor(), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_ctxtags.UnaryServerInterceptor(), grpc_opentracing.UnaryServerInterceptor(), grpc_prometheus.UnaryServerInterceptor, grpc_zap.UnaryServerInterceptor(zapLogger), grpc_auth.UnaryServerInterceptor(myAuthFunction), grpc_recovery.UnaryServerInterceptor(), )), )

grpc.StreamInterceptor中添加流式RPC的拦截器。
grpc.UnaryInterceptor中添加简单RPC的拦截器。

grpc_zap日志记录#

1.创建zap.Logger实例

Copy

func ZapInterceptor() *zap.Logger { logger, err := zap.NewDevelopment() if err != nil { log.Fatalf("failed to initialize zap logger: %v", err) } grpc_zap.ReplaceGrpcLogger(logger) return logger }

2.把zap拦截器添加到服务端

Copy

grpcServer := grpc.NewServer( grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), )), )

3.日志分析


各个字段代表的意思如下:

Copy

`{
“level”: “info”,
“msg”: “finished unary call”,

  "grpc.code": "OK",						
  "grpc.method": "Ping",					/ string  method name
  "grpc.service": "mwitkow.testproto.TestService",              
  "grpc.start_time": "2006-01-02T15:04:05Z07:00",               
  "grpc.request.deadline": "2006-01-02T15:04:05Z07:00",         
  "grpc.request.value": "something",				
  "grpc.time_ms": 1.345,					

  "peer.address": {
    "IP": "127.0.0.1",						
    "Port": 60216,						
    "Zone": ""							
  },
  "span.kind": "server",					
  "system": "grpc",						

  "custom_field": "custom_value",				
  "custom_tags.int": 1337,					
  "custom_tags.string": "something"				

}`

4.把日志写到文件中

上面日志是在控制台输出的,现在我们把日志写到文件中,修改ZapInterceptor方法。

Copy

`import (
grpc_zap “github.com/grpc-ecosystem/go-grpc-middleware/logging/zap”
“go.uber.org/zap”
“go.uber.org/zap/zapcore”
“gopkg.in/natefinch/lumberjack.v2”
)

func ZapInterceptor() *zap.Logger {
w := zapcore.AddSync(&lumberjack.Logger{
Filename: “log/debug.log”,
MaxSize: 1024,
LocalTime: true,
})

config := zap.NewProductionEncoderConfig()
config.EncodeTime = zapcore.ISO8601TimeEncoder
core := zapcore.NewCore(
    zapcore.NewJSONEncoder(config),
    w,
    zap.NewAtomicLevel(),
)

logger := zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
grpc_zap.ReplaceGrpcLogger(logger)
return logger

}`

grpc_auth认证#

go-grpc-middleware中的grpc_auth默认使用authorization认证方式,以authorization为头部,包括basic, bearer形式等。下面介绍bearer token认证。bearer允许使用access key(如JSON Web Token (JWT))进行访问。

1.新建grpc_auth服务端拦截器

Copy

`type TokenInfo struct {
ID string
Roles []string
}

func AuthInterceptor(ctx context.Context) (context.Context, error) {
token, err := grpc_auth.AuthFromMD(ctx, “bearer”)
if err != nil {
return nil, err
}
tokenInfo, err := parseToken(token)
if err != nil {
return nil, grpc.Errorf(codes.Unauthenticated, “ %v”, err)
}

newCtx := context.WithValue(ctx, tokenInfo.ID, tokenInfo)

return newCtx, nil

}

func parseToken(token string) (TokenInfo, error) {
var tokenInfo TokenInfo
if token == “grpc.auth.token” {
tokenInfo.ID = “1”
tokenInfo.Roles = []string{“admin”}
return tokenInfo, nil
}
return tokenInfo, errors.New(“Token无效: bearer “ + token)
}

func userClaimFromToken(tokenInfo TokenInfo) string {
return tokenInfo.ID
}`

代码中的对token进行简单验证并返回模拟数据。

2.客户端请求添加bearer token

实现和上篇的自定义认证方法大同小异。gRPC 中默认定义了 PerRPCCredentials,是提供用于自定义认证的接口,它的作用是将所需的安全认证信息添加到每个RPC方法的上下文中。其包含 2 个方法:

  • GetRequestMetadata:获取当前请求认证所需的元数据
  • RequireTransportSecurity:是否需要基于 TLS 认证进行安全传输

接下来我们实现这两个方法

Copy

`type Token struct {
Value string
}

const headerAuthorize string = “authorization”

func (t *Token) GetRequestMetadata(ctx context.Context, uri …string) (map[string]string, error) {
return map[string]string{headerAuthorize: t.Value}, nil
}

func (t *Token) RequireTransportSecurity() bool {
return true
}`

注意:这里要以authorization为头部,和服务端对应。

发送请求时添加token

Copy

`creds, err := credentials.NewClientTLSFromFile(“../tls/server.pem”, “go-grpc-example”)
if err != nil {
log.Fatalf(“Failed to create TLS credentials %v”, err)
}

token := auth.Token{
    Value: "bearer grpc.auth.token",
}

conn, err := grpc.Dial(Address, grpc.WithTransportCredentials(creds), grpc.WithPerRPCCredentials(&token))` 

注意:Token中的Value的形式要以bearer token值形式。因为我们服务端使用了bearer token验证方式。

3.把grpc_auth拦截器添加到服务端

Copy

grpcServer := grpc.NewServer(cred.TLSInterceptor(), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_auth.StreamServerInterceptor(auth.AuthInterceptor), grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_auth.UnaryServerInterceptor(auth.AuthInterceptor), grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), )), )

写到这里,服务端都会拦截请求并进行bearer token验证,使用bearer token是规范了与HTTP请求的对接,毕竟gRPC也可以同时支持HTTP请求。

grpc_recovery恢复#

把gRPC中的panic转成error,从而恢复程序。

1.直接把grpc_recovery拦截器添加到服务端

最简单使用方式

Copy

grpcServer := grpc.NewServer(cred.TLSInterceptor(), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_auth.StreamServerInterceptor(auth.AuthInterceptor), grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), grpc_recovery.StreamServerInterceptor, )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_auth.UnaryServerInterceptor(auth.AuthInterceptor), grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), grpc_recovery.UnaryServerInterceptor(), )), )

2.自定义错误返回

panic时候,自定义错误码并返回。

Copy

func RecoveryInterceptor() grpc_recovery.Option { return grpc_recovery.WithRecoveryHandler(func(p interface{}) (err error) { return grpc.Errorf(codes.Unknown, "panic triggered: %v", p) }) }

添加grpc_recovery拦截器到服务端

Copy

grpcServer := grpc.NewServer(cred.TLSInterceptor(), grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( grpc_auth.StreamServerInterceptor(auth.AuthInterceptor), grpc_zap.StreamServerInterceptor(zap.ZapInterceptor()), grpc_recovery.StreamServerInterceptor(recovery.RecoveryInterceptor()), )), grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( grpc_auth.UnaryServerInterceptor(auth.AuthInterceptor), grpc_zap.UnaryServerInterceptor(zap.ZapInterceptor()), grpc_recovery.UnaryServerInterceptor(recovery.RecoveryInterceptor()), )), )

总结#

本篇介绍了go-grpc-middleware中的grpc_zapgrpc_authgrpc_recovery拦截器的使用。go-grpc-middleware中其他拦截器可参考GitHub学习使用。

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example

前言#

上一篇介绍了客户端流式RPC,客户端不断的向服务端发送数据流,在发送结束或流关闭后,由服务端返回一个响应。本篇将介绍双向流式RPC

双向流式RPC:客户端和服务端双方使用读写流去发送一个消息序列,两个流独立操作,双方可以同时发送和同时接收。

情景模拟:双方对话(可以一问一答、一问多答、多问一答,形式灵活)。

新建proto文件#

新建both_stream.proto文件

1.定义发送信息

Copy

// 定义流式请求信息 message StreamRequest{ //流请求参数 string question = 1; }

2.定义接收信息

Copy

// 定义流式响应信息 message StreamResponse{ //流响应数据 string answer = 1; }

3.定义服务方法Conversations

双向流式rpc,只要在请求的参数前和响应参数前都添加stream即可

Copy

service Stream{ // 双向流式rpc,同时在请求参数前和响应参数前加上stream rpc Conversations(stream StreamRequest) returns(stream StreamResponse){}; }

4.编译proto文件

进入both_stream.proto所在目录,运行指令:

protoc --go_out=plugins=grpc:./ ./both_stream.proto

创建Server端#

1.定义我们的服务,并实现RouteList方法

这里简单实现对话中一问一答的形式

Copy

`type StreamService struct{}

func (s *StreamService) Conversations(srv pb.Stream_ConversationsServer) error {
n := 1
for {
req, err := srv.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
err = srv.Send(&pb.StreamResponse{
Answer: “from stream server answer: the “ + strconv.Itoa(n) + “ question is “ + req.Question,
})
if err != nil {
return err
}
n++
log.Printf(“from stream client question: %s”, req.Question)
}
}`

2.启动gRPC服务器

Copy

`const (

Address string = ":8000"

Network string = "tcp"

)

func main() {

listener, err := net.Listen(Network, Address)
if err != nil {
    log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")

grpcServer := grpc.NewServer()

pb.RegisterStreamServer(grpcServer, &StreamService{})


err = grpcServer.Serve(listener)
if err != nil {
    log.Fatalf("grpcServer.Serve err: %v", err)
}

}`

运行服务端

Copy

go run server.go :8000 net.Listing...

创建Client端#

1.创建调用服务端Conversations方法

Copy

`func conversations() {

stream, err := streamClient.Conversations(context.Background())
if err != nil {
    log.Fatalf("get conversations stream err: %v", err)
}
for n := 0; n < 5; n++ {
    err := stream.Send(&pb.StreamRequest{Question: "stream client rpc " + strconv.Itoa(n)})
    if err != nil {
        log.Fatalf("stream request err: %v", err)
    }
    res, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("Conversations get stream err: %v", err)
    }
    
    log.Println(res.Answer)
}

err = stream.CloseSend()
if err != nil {
    log.Fatalf("Conversations close stream err: %v", err)
}

}`

2.启动gRPC客户端

Copy

`const Address string = “:8000”

var streamClient pb.StreamClient

func main() {

conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
    log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()


streamClient = pb.NewStreamClient(conn)
conversations()

}`

运行客户端,获取到服务端的应答

Copy

go run client.go from stream server answer: the 1 question is stream client rpc 0 from stream server answer: the 2 question is stream client rpc 1 from stream server answer: the 3 question is stream client rpc 2 from stream server answer: the 4 question is stream client rpc 3 from stream server answer: the 5 question is stream client rpc 4

服务端获取到来自客户端的提问

Copy

from stream client question: stream client rpc 0 from stream client question: stream client rpc 1 from stream client question: stream client rpc 2 from stream client question: stream client rpc 3 from stream client question: stream client rpc 4

总结#

本篇介绍了双向流式RPC的简单使用。

教程源码地址:https://github.com/Bingjian-Zhu/go-grpc-example
参考:gRPC官方文档中文版