0%

该篇blog将讲述如何通过Edgex的SDK接口来创建device service,分为两部分,第一部分是configuration.toml与profile.yml的配置参数讲解,第二部分是有关SDK接口的实现。

首先到github上下载device-sdk-go,进入example目录下,接下来将主要对这个示例进行分析讲解。

configuration & Profile

configuration.toml

configuration.toml主要是用来配置device service的地址和端口号,注册到EdgeX服务中以及设备的预定义。

进入cmd/device-simple/res/目录下,打开configuration.toml,如果你的EdgeX部署在服务器上,则需要将[Service]中的Host修改成服务器的ip地址,[Registry], [Clients], [Device], [Logging]等可以不需要修改。

找到[[DeviceList]]该部分是用来预定义设备的

1
2
3
4
5
Name是你创建的Device的名字
Profile是用来创建Device的配置文件,即profile.yml中name的值
[[DeviceList.AutoEvents]]是用来自动发送Command请求的,对应profile.yml是调用get命令,对应SDK接口是调用HandleReadCommand函数
Frequency 是发送请求的频率
Resource 是发送请求对应的数据类型,与proflie.yml中的deviceResources对应。

Profile.yml

Profile.yml是用来定义Device的数据类型以及EdgeX如何通过core-Command来对device发送命令,先讲解deviceResources,deviceCommands与coreCommands的关系,deviceResources中的值是定义Device上传到deviceService中数据的类型,coreComands定义了核心服务core-command能对设备进行的操作,包括读(get)和写(put)。deviceCommands是联系coreCommands与deviceResources的桥梁,他定义了coreCommands操作设备时对哪些数值类型进行操作。
如图所示:
在这里插入图片描述

1
2
3
第一行name对应configuration.toml中DeviceList中的Name,是这个Profile的“名字”
deviceResources中name: “SwitchButton”代表这个设备的一个数据类型的名称
下面的 properties定义了SwitchButton的基本数值类型

在这里插入图片描述

1
2
3
corecommands 定义了core-command对设备操作的两种方法,一个是读(get),一个是写(put)
get中 expectedValues表示希望读取到的值的类型,与deviceResources中name目录下的名称一致
put中parameterNames表示传入的参数的值的类型,也与deviceResources中name目录下名称一致

在这里插入图片描述

1
2
3
4
deviceCommands连接deviceResources与coreCommands
name与 coreCommands中name一致
在coreCommands中定义了get则需要get, 定义了put则需要定义set。
deviceResource与deviceResources中name一致。

通过Configuration.toml的预定义Device和Simple.yaml文件就可以生成一个Device并且注册到core-metadata中去,当然也可以自己通过Simple.yaml手动注册一个Device

通过SDK创建设备服务

创建一个设备服务主要是要实现ProtocolDriver中给出的接口,在这个实例中进入到driver目录,主要是实现Initialize、HandleReadCommands、HandleWriteCommands三个函数。

Initialize

1
Initialize(lc logger.LoggingClient, asyncCh chan<- *dsModels.AsyncValues, deviceCh chan<- []dsModels.DiscoveredDevice) error 

lc应该是日志系统,asynCh是一个通道,我们通过将我们想上传的值赋予给asynCh就可以上传至coredata,deviceCh没有使用过,应该是用来自动发现设备的。
在Initialize函数中要定实现创建的设备服务的服务器,比如像mqtt服务器、http服务器等等,这些服务能监听某个Broker或者某个URL,当真实物理设备向这个Borker或者URL发送数据时,该服务就能取得数据,包装成asynCh,发送至coredata。具体如何实现服务设备,这个example中没有讲到,在device-rest-go和device-mqtt-go中有实现服务设备,我下篇博客也会讲如何具体实现。

HandleReadCommands

1
func (s *SimpleDriver) HandleReadCommands(deviceName string, protocols map[string]contract.ProtocolProperties, reqs []dsModels.CommandRequest) (res []*dsModels.CommandValue, err error)

该函数的作用是在用户通过core-command发送get命令时调用,行为是像物理设备发送一次读命令请求。

1
2
3
4
deviceName:发送get命令时的物理设备的名称
protocals:不清楚有什么作用
reqs:命令的详细内容,可以通过该参数获取请求的数值类型等等
返回值res:将命令需要返回的值封装到res中,会自动包装成asyncCh发送到coredata中去

下面分析这个示例的handleReadCommands函数
在这里插入图片描述

首先判断reqs的长度,即需要读取的deviceResources中变量的个数,
然后创建变量now,作为时间戳,之后通过调用reqs[0].DeviceResourceName来获得数据类型,进入到if之后通过dsModels.*Value(reqs[0].DeviceResourceName, now, value)来创建一个dsModels.CommandValue类型的变量,其中now是之前创建的时间戳,value是要上传到coredata的值,赋值给res,就可以通过res上传到coredata完成数据上传。

HandleWriteCommands

1
2
func (s *SimpleDriver) HandleWriteCommands(deviceName string, protocols map[string]contract.ProtocolProperties, reqs []dsModels.CommandRequest,
params []*dsModels.CommandValue) error {

用法基本与HandleRandomCommends一致,通过post上传的参数放到了params里面,其他就不多说了。

EdgeX 相关概念

本文将介绍以下内容:

  1. EdgeX 编译和EdgeX docker image 编译
  2. EdgeX 部署的基本环境准备
  3. EdgeX CLI
  4. EdgeX 连接 Modbus设备
  5. EdgeX 连接MQTT设备
  6. 分布式部署Device Service
  7. 如何设定定时任务
  8. 基于Release1.3.0(Hanoi)版本

实验设备

本文将同时基于x86-64架构设备和ARM64(AARCH64)设备进行实验,下面是硬件信息表

x86-64 设备

CPU

Intel® Core™ i5-7267U CPU @ 3.10GHz 单CPU双核四线程

Mem

7.7Gi

Disk

110GB

NIC

1000Mb/s

OS

Ubuntu 20.04.2 LTS (Focal Fossa)

kernel

5.8.0-43-generic

ARM64 设备

CPU

AArch64 Processor rev 12 (aarch64) Qualcomm Technologies, Inc SDA845双CPU八核八线程

Mem

3.5G

Disk

64GB

NIC

1000Mb/s

OS

Ubuntu 16.04.4 LTS (Xenial Xerus)

kernel

4.9.103

基础软件环境准备

  1. 安装docker
    a. 参考:https://docs.docker.com/engine/install/ubuntu/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

sudo apt install -y docker.io

sudo apt remove docker docker-engine docker.io containerd runc
sudo apt update
sudo apt install -y\
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
sudo apt-key fingerprint 0EBFCD88
sudo add-apt-repository \
"deb [arch=amd64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"

sudo add-apt-repository \
"deb [arch=arm64] https://download.docker.com/linux/ubuntu \
$(lsb_release -cs) \
stable"
sudo apt update
apt-cache madison docker-ce

sudo apt install docker-ce docker-ce-cli containerd.io

sudo apt install docker-ce=5:20.10.0~3-0~ubuntu-focal \
docker-ce-cli=5:20.10.0~3-0~ubuntu-focal \
containerd.io

sudo apt install docker-ce=5:20.10.0~3-0~ubuntu-xenial \
docker-ce-cli=5:20.10.0~3-0~ubuntu-xenial \
containerd.io
  1. 安装docker-compose
    a. 参考:https://docs.docker.com/compose/install/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19



sudo curl -L "https://github.com/docker/compose/releases/download/1.28.4/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose






export LC_ALL=C
sudo apt install -y libssl-dev libffi-dev build-essential python3-dev \
protobuf-compiler libprotoc-dev zlib1g-dev gcc g++ make \
libxml2-dev libxslt1-dev
sudo apt install python3-pip
sudo pip3 install --upgrade pip
sudo pip3 install docker-compose
  1. 安装go语言环境(如果需要编译源码)
    a. 参考:https://golang.org/dl/
    b. 下载&安装合适版本golang安装包(当前需要 >=1.15.x)
1
2
3
4
5
wget https://golang.org/dl/go1.15.8.linux-amd64.tar.gz
sudo tar -C /usr/local -xzf ./go1.15.8.linux-amd64.tar.gz

wget https://golang.org/dl/go1.15.8.linux-arm64.tar.gz
sudo tar -C /usr/local -xzf ./go1.15.8.linux-arm64.tar.gz

c. 配置环境变量

1
2
3
4
5
6
7
8
9


cat >> ~/.bashrc << EOF
export PATH=\$PATH:/usr/local/go/bin
export GOROOT=/usr/local/go
export GOPATH=/{your gopath}/gopath
EOF
source ~/.bashrc
go version

d. 配置国内依赖下载资源

1
2
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.io,direct

e. 其他依赖(如果需要编译源码)

1
2
3
sudo apt install -y libtool pkg-config build-essential \
autoconf automake uuid-dev
sudo apt install -y libzmq3-dev libzmq5

EdgeX 编译源码

  1. 下载源码
1
2
sudo apt install -y git
git clone https://github.com/edgexfoundry/edgex-go.git
  1. 编译
1
2
3
4
5
6
7
cd edgex-go
go get github.com/rjeczalik/pkgconfig/cmd/pkg-config
go get github.com/edgexfoundry/edgex-go
make build


sudo make docker

部署EdgeX

部署方式

  1. snap
  2. native binaries
  3. docker-compose
  4. kubernetes

native binaries

在上述编译源码完成的基础上,在edgex-go/目录下:(注意需要提前配置好数据库)

1
make run

docker-compose

docker-compose 部署需要edgex各微服务的容器镜像,在上述源码编译中最后一步(make docker)可以生成docker 镜像,也可以使用 Docker Hub上有已经编译好的docker 镜像。

可以使用github上提供的edgex部署脚本中的docker-compose的yaml文件进行部署,方法最为简便,方法如下:

  1. 下载配置文件
1
git clone https://github.com/edgexfoundry/developer-scripts.git
  1. 选择对应的release版本
1
2
3
4
5
6
7
8
9
10
11
12

cd developer-scripts/releases/hanoi/compose-files/


sudo make pull
sudo make run

sudo docker-compose -pedgex -f docker-compose-hanoi.yml ps

sudo make pull arm64
sudo make run arm64
sudo docker-compose -pedgex -f docker-compose-hanoi-arm64.yml ps

部署成功
3. UI

1
2
3
4
 
sudo make run-ui

sudo make run-ui arm64
  • 当前版本的UI只是一个开发测试版本,并不是商业版本
  • 此时可通过浏览器访问本机4000端口访问到UI页面(可以尝试用github上最新的ui代码编译镜像)
    ui

edgex-cli 是用以和edgex微服务交互的命令行接口(工具)。取代以往使用curl命令的方式访问edgex内部的微服务接口。

  1. 安装
1
sudo snap install edgex-cli

CLI

Modbus 设备

部署Modbus设备服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
vim add-device-modbus.yml

"""
version: '3.7'
networks:
edgex-network:
external: true
name: edgex_edgex-network
services:
device-modbus:
container_name: edgex-device-modbus
environment:
SERVICE_HOST: edgex-device-modbus
CLIENTS_COMMAND_HOST: edgex-core-command
CLIENTS_COREDATA_HOST: edgex-core-data
CLIENTS_DATA_HOST: edgex-core-data
CLIENTS_METADATA_HOST: edgex-core-metadata
CLIENTS_NOTIFICATIONS_HOST: edgex-support-notifications
CLIENTS_RULESENGINE_HOST: edgex-kuiper
CLIENTS_SCHEDULER_HOST: edgex-support-scheduler
CLIENTS_VIRTUALDEVICE_HOST: edgex-device-random
DATABASES_PRIMARY_HOST: edgex-redis
EDGEX_SECURITY_SECRET_STORE: "false"
REGISTRY_HOST: edgex-core-consul
hostname: edgex-device-modbus
image: edgexfoundry/docker-device-modbus-go:1.3.1
# For ARM64
# image: edgexfoundry/docker-device-modbus-go-arm64:1.3.1
networks:
edgex-network: {}
ports:
- "127.0.0.1:49991:49991"
"""
sudo docker-compose -p edgex -f add-device-modbus.yml up -d

edgex-cli deviceservice list

获取数据的例子(GET)—— RST5900

设备介绍

RST5900 是一个温湿度一体的传感器,有温度值和湿度值两个数据源。具体信息如下:

  1. 协议和连接信息
    rst5900-1述
  2. Modbus通讯协议功能代码
    rst5900-2
  3. 获取温湿度请求报文
    rst5900-3
  4. 传感器应答数据
    rst5900-4

设备调试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>

int main(int argc, char *argv[])
{
int client_socket,ct;
unsigned int SERVER_ADDR=0x0a0014c8;
struct sockaddr_in server_addr;
int err;
client_socket=socket(AF_INET,SOCK_STREAM,0);
if(client_socket<0)
{
printf("socket error\n");
return -1;
}
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htonl(SERVER_ADDR);
server_addr.sin_port=htons(502);
ct=connect(client_socket,(struct sockaddr *)&server_addr,sizeof(struct sockaddr));
if(ct<0)
{
printf("connect error\n");
return -1;
}
modbusTCP_request(client_socket);
close(client_socket);
return 0;
}

int modbusTCP_request(int client_socket)
{
int recv,send;
unsigned char buff[13];
char request[12];

request[5]=0x06;
request[7]=0x04;
request[11]=0x02;
while(1)
{
send=write(client_socket,request,sizeof(request));
if (send >= 0){
printf("send: ");
for (int i=0; i<send; i++){
printf("%02x\t",request[i]);
}
printf("\n");
}
else {
printf("send error\n");
return -1;
}
recv=read(client_socket,buff,13);
if(recv>0)
{
printf("recive: ");
for (int i=0; i<recv; i++)
{
printf("%02x\t",buff[i]);
}
printf("\n--------------------------------\n");
}
request[0]++;
sleep(1);
}
return 0;
}

编译运行

1
2
gcc rst_5900_test.c -o rst_5900_test
./rst_5900_test

在这里插入图片描述

创建 profile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

name: "RST5900"
manufacturer: "TS Beijing"
model: "RST5900"
description: "Temperature and Humidity Senser."
labels:
- "modbus"
- "senser"
deviceResources:
-
name: "Temperature"
description: "Sensor Temperature"
attributes:
{ primaryTable: "INPUT_REGISTERS", startingAddress: "1", rawType: "INT16"}
properties:
value:
{ type: "FLOAT32", readWrite: "R",scale: "0.1", floatEncoding: "eNotation"}
units:
{ type: "String", readWrite: "R", defaultValue: "°C"}
-
name: "Humidity"
description: "Sensor Relative Humidity %"
attributes:
{ primaryTable: "INPUT_REGISTERS", startingAddress: "2", rawType: "INT16" }
properties:
value:
{ type: "FLOAT32", readWrite: "R",scale: "0.1", floatEncoding: "eNotation"}
units:
{ type: "String", readWrite: "R", defaultValue: "%RH"}
deviceCommands:
-
name: "Data"
get:
- { index: "1", operation: "get", object: "Temperature", parameter: "Temperature" }
- { index: "2", operation: "get", object: "Humidity", parameter: "Humidity" }
coreCommands:
-
name: "Data"
get:
path: "/api/v1/device/{deviceId}/Data"
responses:
-
code: "200"
description: "Get the Data"
expectedValues: ["Temperature","Humidity"]
-
code: "503"
description: "service unavailable"
expectedValues: []

关于profile的介绍:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
关于profile的介绍:
1. name:profile标识。添加设备时,设备需要绑定一个profile名称以明确所支持的指令和数据源。
2. deviceResources:一个设备提供的数据源(如温湿度传感器的温度、湿度为两个resouce)
1. name:数据源的名称标识
2. attributes:数据源的特征值。在modbus device service中用来标识功能寄存器类型、起始地址等
1. primaryTable:寄存器类型
1. HOLDING_REGISTERS
2. INPUT_REGISTERS
3. COILS
4. DISCRETES_INPUT
2. startingAddress:寄存器起始地址(这里是从1开始,通常对应寄存器手册的0位)
3. rawType:(可选)解析二进制数据的原始数据类型。如果没有设置,默认使用properties-value-type
[ 避免精度丢失:如果一个精度为0.01的温度值存储在INT16类型数据中,如果实际温度 26.53,
则读取值为2653。但是,在转换之后,值是26]
4. properties:定义数据源的数据类型、精度值、单位等
3. deviceCommands:定义一个命令将数据源组合
1. name:命令名称
2. get:get方法,组合可以get的数据源
3. set:set(post)方法,组合可以控制的数据源
4. coreCommands:定义外部调用的API接口
  • (引)For example, a Modbus device stores the temperature and humidity in an INT16 data type with a float scale of 0.01. If the temperature is 26.53, the read value is 2653. However, following transformation, the value is 26.

设备接入

  1. 上传profile
1
2
edgex-cli profile add -f rst_5900_profile.yaml

在这里插入图片描述
2. 添加设备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

edgex-cli device add --description "Temperature & Humidity Senser " \
--profileName RST5900 --operatingStatus ENABLED -i


curl -X POST -d \
'{
"description": "Temperature & Humidity Senser ",
"name": "RST5900",
"adminState": "unlocked",
"operatingState": "enabled",
"protocols":{
"modbus-tcp":{
"name":"senser",
"Address":"10.0.20.200",
"Port":"502",
"UnitID":"1"
}
},
"labels": [
],
"location": null,
"service": {
"name": "edgex-device-modbus"
},
"profile": {
"name": "RST5900"
}
}' \
http://localhost:48081/api/v1/device




获取数据

  1. 查看设备
1
2
3
edgex-cli device list
edgex-cli device list --name RST5900
edgex-cli device list --name RST5900 -v

在这里插入图片描述
2. 获取数据

1
2

curl -X GET http://localhost:48082/api/v1/device/name/RST5900/command/Data

在这里插入图片描述
在这里插入图片描述
3. 获取历史数据

1
2
3
4
5
6

edgex-cli reading list -d RST5900


curl -X GET "http://localhost:48080/api/v1/reading/device/RST5900/10" \
-H "accept: */*"|jq .

在这里插入图片描述

下发控制的例子(SET)—— Patlite LA6

设备介绍

Patlite LA6 是一组有5个LED灯和一个蜂鸣器的报警器。LED灯和蜂鸣器可以单独控制或群组控制,具体操作需要查看说明书的寄存器手册。本例只使用5个LED的单独控制作为例子,演示对Modbus设备的控制。

  1. 协议和连接信息
    在这里插入图片描述
  2. Modbus通讯协议
    根据说明书寄存器手册可知Holding 寄存器管理单个LED/蜂鸣器,功能码03为读,06为写。
1
2
3
4
5
1. 读取单个LED/蜂鸣器的状态
1. 请求数据
1. 起始地址表示从哪一位开始查
2. 起始地址 0~4 对应 LED1~5
2. 响应数据

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

调试设备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define PORT 502

int main(int argc, char *argv[])
{
int client_socket,ct;
unsigned int SERVER_ADDR=0x0a0014c9;
struct sockaddr_in server_addr;
int err;
client_socket=socket(AF_INET,SOCK_STREAM,0);
if(client_socket<0)
{
printf("socket error\n");
return -1;
}
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htonl(SERVER_ADDR);
server_addr.sin_port=htons(PORT);
ct=connect(client_socket,(struct sockaddr *)&server_addr,sizeof(struct sockaddr));
if(ct<0)
{
printf("connect error\n");
return -1;
}
modbusTCP_request_read(client_socket);
modbusTCP_request_write(client_socket);
modbusTCP_request_read(client_socket);
close(client_socket);
return 0;
}


#if 1

int modbusTCP_request_read(int client_socket)
{
int recv,send;
char request[12];
int BUFFERSIZE = 21;
unsigned char buff[BUFFERSIZE];
request[0]=0x00;
request[1]=0x01;
request[2]=0x00;
request[3]=0x00;
request[4]=0x00;
request[5]=0x06;
request[6]=0x01;
request[7]=0x03;

request[8]=0x00;

request[9]=0x00;

request[10]=0x00;

request[11]=0x06;

send=write(client_socket,request,sizeof(request));
if (send >= 0){
printf("Send:%d\n",send);
for (int i=0; i<send; i++){
printf("%02x\t",request[i]);
}
}else{
printf("send error\n");
return -1;
}
recv=read(client_socket,buff,BUFFERSIZE);
if(recv>0)
{
printf("\nRecv:%d\n",recv);
for (int i=0; i<recv; i++)
{
printf("%02x\t",buff[i]);
}
printf("\n--------------------------------\n");
}
return 0;
}
#endif

#if 1

int modbusTCP_request_write(int client_socket)
{
int recv,send;
char request[12];
int BUFFERSIZE = 12;
unsigned char buff[BUFFERSIZE];
request[0]=0x00;
request[1]=0x03;
request[2]=0x00;
request[3]=0x00;
request[4]=0x00;
request[5]=0x06;
request[6]=0xFF;
request[7]=0x06;
request[8]=0x00;



request[9]=0x02;



request[10]=0x01;
request[11]=0x01;
send=write(client_socket,request,sizeof(request));
if (send >= 0){
printf("Send:%d\n",send);
for (int i=0; i<send; i++){
printf("%02x\t",request[i]);
}
}else{
printf("send error\n");
return -1;
}
recv=read(client_socket,buff,BUFFERSIZE);
if(recv>0)
{
printf("\nrecv:%d\n",recv);
for (int i=0; i<BUFFERSIZE; i++)
{
printf("%02x\t",buff[i]);
}
printf("\n--------------------------------\n");
}
return 0;
}
#endif

编译运行

1
2
gcc patlite_la6_test.c -o patlite_la6_test
./patlite_la6_test

创建profile

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
name: "LA6-POE"
manufacturer: "PATLATE Japan(Profile by TS Beijing)"
model: "LA6-POE"
description: "Signal Tower LA6-POE."
labels:
- "modbus"
- "led"
deviceResources:
-
name: "LED1"
description: "First LED Light RED, (0-ignore, 1-ctrl | 0-off, 1-on, 02-flashing, 09-no-chang)"
attributes:
{ primaryTable: "HOLDING_REGISTERS", startingAddress: "1" }
properties:
value:
{ type: "INT16", readWrite: "RW", scale: "1", minimum: "0", maximum: "65535", defaultValue: "0"}
units:
{ type: "String", readWrite: "R", defaultValue: "Operation Mode"}
-
name: "LED2"
description: "Second LED Light YELLOW, (0-ignore, 1-ctrl | 0-off, 1-on, 02-flashing, 09-no-chang)"
attributes:
{ primaryTable: "HOLDING_REGISTERS", startingAddress: "2" }
properties:
value:
{ type: "INT16", readWrite: "RW", scale: "1", minimum: "0", maximum: "65535", defaultValue: "0"}
units:
{ type: "String", readWrite: "R", defaultValue: "Operation Mode"}
-
name: "LED3"
description: "3rd LED Light RED, (0-ignore, 1-ctrl | 0-off, 1-on, 02-flashing, 09-no-chang)"
attributes:
{ primaryTable: "HOLDING_REGISTERS", startingAddress: "3" }
properties:
value:
{ type: "INT16", readWrite: "RW", scale: "1", minimum: "0", maximum: "65535", defaultValue: "0"}
units:
{ type: "String", readWrite: "R", defaultValue: "Operation Mode"}
-
name: "LED4"
description: "4th LED Light RED, (0-ignore, 1-ctrl | 0-off, 1-on, 02-flashing, 09-no-chang)"
attributes:
{ primaryTable: "HOLDING_REGISTERS", startingAddress: "4" }
properties:
value:
{ type: "INT16", readWrite: "RW", scale: "1", minimum: "0", maximum: "65535", defaultValue: "0"}
units:
{ type: "String", readWrite: "R", defaultValue: "Operation Mode"}
-
name: "LED5"
description: "5th LED Light RED, (0-ignore, 1-ctrl | 0-off, 1-on, 02-flashing, 09-no-chang)"
attributes:
{ primaryTable: "HOLDING_REGISTERS", startingAddress: "5" }
properties:
value:
{ type: "INT16", readWrite: "RW", scale: "1", minimum: "0", maximum: "65535", defaultValue: "0"}
units:
{ type: "String", readWrite: "R", defaultValue: "Operation Mode"}
deviceCommands:
-
name: "CUSTOM"
set:
- { index: "1", operation: "set", object: "LED1", parameter: "LED1" }
- { index: "2", operation: "set", object: "LED2", parameter: "LED2" }
- { index: "3", operation: "set", object: "LED3", parameter: "LED3" }
- { index: "4", operation: "set", object: "LED4", parameter: "LED4" }
- { index: "5", operation: "set", object: "LED5", parameter: "LED5" }
get:
- { index: "1", operation: "get", object: "LED1", parameter: "LED1" }
- { index: "2", operation: "get", object: "LED2", parameter: "LED2" }
- { index: "3", operation: "get", object: "LED3", parameter: "LED3" }
- { index: "4", operation: "get", object: "LED4", parameter: "LED4" }
- { index: "5", operation: "get", object: "LED5", parameter: "LED5" }
coreCommands:
-
name: "CUSTOM"
get:
path: "/api/v1/device/{deviceId}/CUSTOM"
responses:
-
code: "200"
description: "Get the Configuration"
expectedValues: ["LED1","LED2","LED3","LED4","LED5"]
-
code: "503"
description: "service unavailable"
expectedValues: []
put:
path: "/api/v1/device/{deviceId}/CUSTOM"
parameterNames: ["LED1","LED2","LED3","LED4","LED5" ]
responses:
-
code: "204"
description: "Set the Configuration"
expectedValues: []
-
code: "503"
description: "service unavailable"
expectedValues: []
-
name: "LED1"
get:
path: "/api/v1/device/{deviceId}/LED1"
responses:
-
code: "200"
description: "Get the Configuration"
expectedValues: ["LED1"]
-
code: "503"
description: "service unavailable"
expectedValues: []
put:
path: "/api/v1/device/{deviceId}/LED1"
parameterNames: ["LED1" ]
responses:
-
code: "204"
description: "Set the Configuration"
expectedValues: []
-
code: "503"
description: "service unavailable"
expectedValues: []

设备接入

  1. 上传profile
1
2
edgex-cli profile add -f patlite_la6_led_profile.yaml

在这里插入图片描述
2. 添加设备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

curl -X POST -d \
'{
"description": "Patlite LA6 POE",
"name": "patlite",
"adminState": "unlocked",
"operatingState": "enabled",
"protocols":{
"modbus-tcp":{
"name":"patlite_la6",
"Address":"10.0.20.201",
"Port":"502",
"UnitID":"255"
}
},
"labels": [
],
"location": null,
"service": {
"name": "edgex-device-modbus"
},
"profile": {
"name": "LA6-POE"
}
}' \
http://localhost:48081/api/v1/device

通过edgex控制

  1. 查看设备
1
2
3
edgex-cli device list
edgex-cli device list --name patlite
edgex-cli device list --name patlite -v

在这里插入图片描述
2. 控制设备
通过coreCommands——deviceResource(LED1)

1
2
3
4
5




curl -X PUT "http://localhost:48082/api/v1/device/name/patlite/command/LED1" -d '{"LED1":"257"}'

在这里插入图片描述
3. 通过coreCommands——deviceCommands——deviceResources组合(LED1~LED5)

1
2
3


curl -X PUT "http://localhost:48082/api/v1/device/name/patlite/command/CUSTOM" -d '{"LED1":"256", "LED2":"257", "LED3":"256", "LED4":"258", "LED5":"256"}'

在这里插入图片描述
4. 获取设备状态

1
curl -X GET "http://localhost:48082/api/v1/device/name/patlite/command/CUSTOM"|jq .

在这里插入图片描述

MQTT设备

部署MQTT设备服务

  1. Broker
    MQTT Device Service依赖MQTT Broker, 使用时可以单独部署一个MQTT Broker或使用外部的MQTT Broker。在我们本次的实验中,将MQTT Device Service和Broker放在一个yaml文件中部署使用。
  2. docker-compose.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
sudo vim add-device-mqtt.yml
"""
version: '3.7'
networks:
edgex-network:
external: true
name: edgex_edgex-network

services:
mqtt-broker:
container_name: edgex-mqtt-broker
hostname: edgex-mqtt-broker
image: "eclipse-mosquitto"
networks:
edgex-network: {}
ports:
- "1883:1883"

device-mqtt:
container_name: edgex-device-mqtt
environment:
CLIENTS_COMMAND_HOST: edgex-core-command
CLIENTS_COREDATA_HOST: edgex-core-data
CLIENTS_DATA_HOST: edgex-core-data
CLIENTS_METADATA_HOST: edgex-core-metadata
CLIENTS_NOTIFICATIONS_HOST: edgex-support-notifications
CLIENTS_RULESENGINE_HOST: edgex-kuiper
CLIENTS_SCHEDULER_HOST: edgex-support-scheduler
CLIENTS_VIRTUALDEVICE_HOST: edgex-device-random
DATABASES_PRIMARY_HOST: edgex-redis
EDGEX_SECURITY_SECRET_STORE: "false"
REGISTRY_HOST: edgex-core-consul
SERVICE_HOST: edgex-device-mqtt
DRIVER_INCOMINGHOST: edgex-mqtt-broker
DRIVER_INCOMINGTOPIC: MyDataTopic
DRIVER_RESPONSEHOST: edgex-mqtt-broker
DRIVER_RESPONSETOPIC: MyResponseTopic
depends_on:
- mqtt-broker
hostname: edgex-device-mqtt
image: edgexfoundry/docker-device-mqtt-go:1.3.1
# image: edgexfoundry/docker-device-mqtt-go-arm64:1.3.1
networks:
edgex-network: {}
ports:
- "127.0.0.1:49982:49982"
"""


sudo docker-compose -p edgex -f add-device-mqtt.yml up -d

edgex-cli deviceservice list --name edgex-device-mqtt

MQTT Device Service 的几种模式

在这里插入图片描述

设备主动上报数据

在这里插入图片描述

设备服务请求

在这里插入图片描述

虚拟一个EdgeX MQTT设备

模拟设备程序

该设备有一个resource : time; cmd为:localtime;支持get / set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65






import paho.mqtt.client as mqtt
import time
import json

def on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe(topic="MyCommandTopic")
'''
MyCommandTopic 是该设备订阅的CommandTopic
'''

def on_message(client, userdata, msg):
print("topic:",msg.topic , "\nmessage:" + str(msg.payload))
'''
'''
try:
rcv_data = json.loads(msg.payload.decode('utf-8'))
except:
print("error")
ret_data = rcv_data
'''
返回信息中需要:uuid、cmd、method等信息
'''
print("method:",rcv_data.get("method",None))

if rcv_data.get("method",None) == "get":
print ("cmd:",rcv_data.get("cmd",None))
if rcv_data.get("cmd",None) == "time":
ret_data["time"] = "{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
print ("MyResponseTopic publish:",json.dumps(ret_data))

elif rcv_data.get("method",None) == "set":
print ("cmd:",rcv_data.get("cmd",None))
print ("set resource value:",rcv_data.get(rcv_data.get("cmd",None)))
client.publish(topic="MyResponseTopic", payload=json.dumps(ret_data), qos=0)


def mqtt_connect(client, username,password,broker_ip,broker_port=1883,keepalive=60):
ret = 0
try:
client.on_connect = on_connect
client.on_message = on_message
client.username_pw_set(username, password)
ret = client.connect(broker_ip, broker_port, keepalive)
client.loop_start()
except:
ret = -1
return ret

if __name__ == "__main__":
client = mqtt.Client()
ret = mqtt_connect(client,"admin","public","10.0.20.29",1883,60)
print("mqtt_connect:",ret)
while True:

ret_data = { "name": "mqtt_device", "cmd": "time", "time": "{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) }
print ("MyDataTopic publish:",json.dumps(ret_data))
client.publish(topic="MyDataTopic", payload=json.dumps(ret_data), qos=0)
time.sleep(6000)
profile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
name: "Test.Device.MQTT.Profile"
manufacturer: "TS Beijing"
model: "MQTT-DEVICE"
description: "Test device profile"
labels:
- "mqtt"
- "test"
deviceResources:
-
name: time
description: "local time"
properties:
value:
{ type: "String", size: "0", readWrite: "W" ,scale: "", offset: "", base: "" }
units:
{ type: "String", readWrite: "R", defaultValue: "" }

deviceCommands:
-
name: localtime
get:
- { index: "1", operation: "get", object: "time", parameter: "time" }
set:
- { index: "1", operation: "set", object: "time", parameter: "time" }

coreCommands:
-
name: localtime
get:
path: "/api/v1/device/{deviceId}/localtime"
responses:
-
code: "200"
description: "get the local time"
expectedValues: ["time"]
-
code: "503"
description: "service unavailable"
expectedValues: []
put:
path: "/api/v1/device/{deviceId}/localtime"
parameterNames: ["time"]
responses:
-
code: "204"
description: "set the local time."
expectedValues: []
-
code: "503"
description: "service unavailable"
expectedValues: []

创建设备

  1. 上传profile
1
2
edgex-cli profile add -f mqtt_device_profile.yaml

  1. 添加设备
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

curl -X POST -d \
'{
"Description": "mqtt device",
"Name": "mqtt_device",
"adminState": "unlocked",
"operatingState": "enabled",
"Protocols":{
"mqtt":{
"Schema": "tcp",
"Host": "edgex-mqtt-broker",
"Port": "1883",
"ClientId": "mqtt_device_test",
"User": "admin",
"Password": "public",
"Topic": "MyCommandTopic"
}
},
"labels": [
],
"location": null,
"service": {
"name": "edgex-device-mqtt"
},
"profile": {
"name": "Test.Device.MQTT.Profile"
}
}' \
http://localhost:48081/api/v1/device

启动模拟的设备程序

1
2
pip3 install paho.mqtt
python3 edgex_mqtt_device.py

查看主动上报的数据

1
edgex-cli reading list -d mqtt_device

在这里插入图片描述

get

1
curl -X GET http://localhost:48082/api/v1/device/name/mqtt_device/command/localtime|jq .

在这里插入图片描述

set

1
curl -X PUT http://localhost:48082/api/v1/device/name/mqtt_device/command/localtime -d '{"time":"12345"}'|jq .

在这里插入图片描述

分布式部署Device Service

DeviceService的分布式部署是该版本(1.3)的一个新feature,下面进行验证。

注销已有的设备服务

以Modbus device service为例

1
2
3
4
5

sudo docker-compose -p edgex -f add-device-modbus.yml stop device-modbus
sudo docker-compose -p edgex -f add-device-modbus.yml rm device-modbus
edgex-cli deviceservice rm --name edgex-device-modbus
edgex-cli deviceservice list

在这里插入图片描述
解除服务对localhost的限制

1
2
3
4
5
6
7
8
9
10
11
vim docker-compose-hanoi.yml

"""例如
ports:
# - 127.0.0.1:48100:48100/tcp
- 48100:48100/tcp
"""

sudo make down

sudo make run

在这里插入图片描述

删除device service的旧记录

Consul 作为保存了所有微服务的注册信息,如果不删除,将会继续使用旧的配置

1
2
3
4



curl --request DELETE http://127.0.0.1:8500/v1/kv/edgex/devices/1.0/edgex-device-modbus

在其他节点重新启动device service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
需要修改device service的docker-compose file。有几点需要注意:
1. 环境变量
1. 环境变量中SERVICE_HOST需要设置为运行device service的主机IP
2. 其他HOST信息需要写运行edgex其他服务的主机IP
2. 设置command
1. 原来镜像的dockerfile中指定了consul://edgex-core-consul 这里需要改为IP
3. 端口映射要允许除localhost外的访问
4. network_mode: "host"
1. 原因
1. SERVICE_HOST一方面被用为服务启动时指定的IP,另一方面要注册到consul和metadata
2. 如果写为宿主机IP则容器内网络不同,web server will stop
3. 如果写成localhost则其他微服务如consul,command 调用时调用不通
2. 解决
1. 修改程序引入其他环境变量区分
2. 设备服务侧使用host部署/host network
3. 使用kubernetes的部署方式,引入service概念解决集群的DNS问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
vim add-device-modbus.yml
"""
version: '3.7'
services:
device-modbus:
container_name: edgex-device-modbus
environment:
SERVICE_HOST: 10.0.20.58
CLIENTS_COMMAND_HOST: 10.0.20.29
CLIENTS_COREDATA_HOST: 10.0.20.29
CLIENTS_DATA_HOST: 10.0.20.29
CLIENTS_METADATA_HOST: 10.0.20.29
CLIENTS_NOTIFICATIONS_HOST: 10.0.20.29
CLIENTS_RULESENGINE_HOST: 10.0.20.29
CLIENTS_SCHEDULER_HOST: 10.0.20.29
CLIENTS_VIRTUALDEVICE_HOST: 10.0.20.29
DATABASES_PRIMARY_HOST: 10.0.20.29
EDGEX_SECURITY_SECRET_STORE: "false"
REGISTRY_HOST: 10.0.20.29
hostname: edgex-device-modbus
image: edgexfoundry/docker-device-modbus-go:1.3.1
# For ARM64
# image: edgexfoundry/docker-device-modbus-go-arm64:1.3.1
ports:
- "49991:49991"
network_mode: "host"
command: ["--cp=consul://10.0.20.29:8500", "--registry", "--confdir=/res"]
"""
sudo docker-compose -p edgex -f add-device-modbus.yml up -d

设置定时任务 —— Scheduler & event

Scheduler 微服务提供了内部时钟(计时器),和事件event(触发器)以定时发送请求。下面我们创建一个每10秒读一次温湿度数据的任务。

1
2
3
4
5

curl -X POST -H "Content-Type: application/json" \
-H "Cache-Control: no-cache" \
-d '{"name": "every_10_seconds", "start": "","frequency": "PT10S"}' \
http://localhost:48085/api/v1/interval
  • name - 唯一名称
  • start - 生效时间,以ISO 8601 YYYYMMDD’T’hhmmss格式表示。空意味着现在。
  • end - 失效时间,同上。
  • frequency - 频度,以ISO 8601 PxYxMxD’T’xHxMxS格式表示。空意味着没有频率
1
2
3
4
5
6
7
8
9
10
11
12
13
14

curl -X POST -H "Content-Type: application/json" \
-H "Cache-Control: no-cache" -d \
'{
"name":"get-temp-humy-events",
"interval":"every_10_seconds",
"target":"core-data",
"protocol":"http",
"httpMethod":"GET",
"address":"edgex-core-command",
"port":48082,
"path":"/api/v1/device/name/RST5900/command/Data"
}' \
http://localhost:48085/api/v1/intervalaction
  • name - 动作唯一名称
  • interval - 间隔唯一名称,与Scheduler名称对应
  • target - 间隔 操作接收者名称(ergo service or name).
  • protocol - 通讯协议 (example HTTP).
  • httpMethod - HTTP protocol verb.
  • address - IP.
  • port -端口.
  • path - url 路径.
  • parameters - (可选)参数例如 http post的 data
1
2
3

curl -X GET http://localhost:48085/api/v1/interval|jq .
curl -X GET http://localhost:48085/api/v1/intervalaction |jq .

在这里插入图片描述
在这里插入图片描述

RulesEngine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39


curl -X POST \
http://127.0.0.1:48075/streams \
-H 'Content-Type: application/json' \
-d '{ "sql": "create stream demo() WITH (FORMAT=\"JSON\", TYPE=\"edgex\")"}'

curl -X GET http://127.0.0.1:48075/streams
curl -X GET http://127.0.0.1:48075/streams/{规则流名称}

curl -X DELETE http://127.0.0.1:48075/streams/{规则流名称}


curl -X POST \
http://127.0.0.1:48075/rules \
-H 'Content-Type: application/json' \
-d '{
"id": "rule-hum-led1",
"sql": "SELECT uint8 FROM demo WHERE Humidity > 50.0",
"actions": [
{
"rest": {
"url": "http://edgex-core-command:48082/api/v1/device/name/patlite/command/LED1",
"method": "put",
"retryInterval": -1,
"dataTemplate": "{\"LED1\":\"257\"}",
"sendSingle": true
}
},
{
"log":{}
}
]
}'

curl -X GET http://127.0.0.1:48075/rules
curl -X GET http://127.0.0.1:48075/rules/{规则名称}

curl -X DELETE http://127.0.0.1:48075/rules/{规则名称}

在这里插入图片描述
配置完成后当湿度大于50%,LED1会亮灯
在这里插入图片描述

Elasticsearch,Kibana,Logstash,NLog实现ASP.NET Core 分布式日志系统 - 雨~桐 - 博客园

Excerpt

Elasticsearch - 简介 Elasticsearch 作为核心的部分,是一个具有强大索引功能的文档存储库,并且可以通过 REST API 来搜索数据。它使用 Java 编写,基于 Apache Lucene,尽管这些细节隐藏在 API 中。通过被索引的字段,可以用许多不同的聚合


Elasticsearch - 简介

Elasticsearch 作为核心的部分,是一个具有强大索引功能的文档存储库,并且可以通过 REST API 来搜索数据。它使用 Java 编写,基于 Apache Lucene,尽管这些细节隐藏在 API 中。通过被索引的字段,可以用许多不同的聚合方式找到任何被存储(索引)的文档。但是,ElasticSearch不仅仅只提供对这些被索引文档的强大搜索功能。快速、分布式、水平扩展,支持实时文档存储和分析,支持数百台服务器和 PB 级索引数据。同时作为 Elastic stack (aka ELK) 的核心,提供了诸如 LogStash、Kibana 和更多的强大应用。

Kibana 是 Elasticsearch 中专门提供强有力的可视化查询Web应用程序。使用Kibana,能非常简单地为 Elasticsearch 中索引的数据创建查询、图表和仪表盘。
Elasticsearch开放了一个 REST API,你会发现许多文档示例是 HTTP 调用,你可以尝试使用 curl 或 postman 等工具。当然,这个 API 的客户端已经用许多不同的语言编写,包括.Net、Java、Python、Ruby和JavaScript等。

Logstash是一个具备实时处理能力的开源的数据收集引擎。可以动态地从不同的来源收集数据,将数据处理(过滤、变形)过之后统一输出到某个特定地址,为将来更多样化的数据分析做准备。

如果你想阅读更多,Elasticsearch 官方网站 可能是最好的地方。

注意:文章可能比较长,如果对Elasticsearch安装以及了解的,可以直接略过,看ASP.NET Core和Nlog部分,本文所有linux命令均在root用户下操作。

Java环境安装

懒人一键安装

查看JDK版本信息

Elasticsearch文档

安装

  官方网站有很多安装方式,我这里采用的rpm安装,大家可以按照自己习惯的方式进行安装即可,下载地址

1

2

3

4

5

6

7

8

9

10

cd /usr/local

mkdir elasticsearch

cd elasticsearch

wget https:

rpm -ivh elasticsearch-5.5.0.rpm 

配置

1

2

3

4

5

6

whereis elasticsearch

cd /etc/elasticsearch

vi elasticsearch.yml

主要配置Network.host(本机ip)和http.port(默认9200)(目前单节点模式,其他参数请参考官方文档)

 

启动服务

1

2

3

4

5

6

7

8

9

firewall-cmd --add-port=9200/tcp --permanent

firewall-cmd --reload

systemctl enable elasticsearch

systemctl start elasticsearch

 在浏览器打开http://192.168.30.128:9200,如下图所示表示启动成功了

Kibana文档

安装

官方下载地址官方安装教程

1

2

3

4

5

6

cd /usr/local/elasticsearch

wget https:

rmp -ivh kibana-5.5.0-x86_64.rpm

配置

1

2

3

4

cd /etc/kibana

vi kibana.yml

 设置端口号:5601,Host地址:”192.168.30.128” ,elasticsearch服务地址为:”http://192.168.30.128:9200

启动服务

1

2

3

4

5

6

7

8

9

firewall-cmd --add-port=5601/tcp --permanent

firewall-cmd --reload

systemctl enable kibana

systemctl start kibana

在浏览器打开http://192.168.30.128:5601,将进入到Kibana管理界面

LogStash文档

安装

官方下载地址 官方安装教程

1

2

3

4

5

6

cd /usr/local/elasticsearch

wget https:

rpm -ivh logstash-5.5.0.rpm

配置

1

2

3

4

5

6

cd /etc/logstash

cd conf.d

vi nlog.conf

 input:采用TCP监控本机8001端口的消息

 filter:使用grok 插件,自定义消息格式,推荐使用grokdebug在线进行调试

 output:使用elasticsearch作为数据存储

 注意:官方有非常丰富的插件进行消息处理,具体可以查看官方文档。

启动服务

1

2

3

4

5

6

7

8

9

firewall-cmd --add-port=8001/tcp --permanent

firewall-cmd --reload

systemctl enable logstash

systemctl start logstash

ASP.ENT Core结合Nlog进行日志记录

 下面是本文介绍的重点内容了,通过Nlog记录日志,将消息发送到logstash,logstash将转换后的消息存储到elasticsearch,并供在kibana中查询使用。

创建ASP.NET Core项目

本文通过VS2017创建的.NETCore 1.1 项目 Elasticsearch.QuickStart

通过Nuget安装Nlog依赖包

NLog.Web.AspNetCore

 

 Nlog.Extensions.Logging (pre版本)

 

在Startup.cs添加Nlog服务

新增Nlog配置(Web根目录)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

<?xml version="1.0" encoding="utf-8" ?>

<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"

      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

      autoReload="true"

      internalLogLevel="Warn"

      internalLogFile="internal-nlog.txt">

  <extensions>

    <!--enable NLog.Web for ASP.NET Core-->

    <add assembly="NLog.Web.AspNetCore"/>

  </extensions>

  <!-- define various log targets -->

  <!--定义日志文件目录-->

  <variable name="logDirectory" value="${basedir}/logs/${shortdate}"/>

  <variable name="nodeName" value="node1"/>

  <targets async="true">

    <!-- 全部日志target -->

    <target xsi:type="File"

            name="allfile"

            fileName="${logDirectory}/nlog-all/${shortdate}.log"

            layout="#node1#${longdate}#${logger}#${uppercase:${level}}#${callsite}#${callsite-linenumber}#${aspnet-request-url}#${aspnet-request-method}#${aspnet-mvc-controller}#${aspnet-mvc-action}#${message}#${exception:format=ToString}#"

            keepFileOpen="false"

            />

    <!-- 本地文件日志target -->

    <target xsi:type="File"

            name="ownLog-file"

            fileName="${logDirectory}/nlog-${level}/${shortdate}.log"

            layout="#${longdate}#${nodeName}#${logger}#${uppercase:${level}}#${callsite}#${callsite-linenumber}#${aspnet-request-url}#${aspnet-request-method}#${aspnet-mvc-controller}#${aspnet-mvc-action}#${message}#${exception:format=ToString}#"

            keepFileOpen="false"

            />

    <!-- Tcp日志target -->

    <target xsi:type="Network"

            name="ownLog-tcp"

            keepConnection="false"

            address ="tcp://192.168.30.128:8001"

            layout="#${longdate}#${nodeName}#${logger}#${uppercase:${level}}#${callsite}#${callsite-linenumber}#${aspnet-request-url}#${aspnet-request-method}#${aspnet-mvc-controller}#${aspnet-mvc-action}#${message}#${exception:format=ToString}#"

            />

    <!--grok 规则-->

    <!--%#{DATA:request_time}#%{DATA:node_name}#%{DATA:class_name}#%{DATA:log_level}#%{DATA:call_site}#%{DATA:line_number}#%{DATA:request_url}#%{DATA:request_method}#%{DATA:container_name}#%{DATA:action_name}#%{DATA:log_info}#%{DATA:exception_msg}#-->

    <!--空白-->

    <target xsi:type="Null" name="blackhole" />

  </targets>

  <!--日志级别 Trace -》Debug-》 Info -》Warn-》 Error-》 Fatal-->

  <!--日志规则-->

  <rules>

    <!--全部日志, 包括Microsoft日志-->

    <logger name="*" minlevel="Trace" writeTo="allfile" />

    <!--自定义日志,排除Microsoft日志-->

    <logger name="Microsoft.*" minlevel="Trace" writeTo="blackhole" final="true" />

    <logger name="*" minlevel="Debug" writeTo="ownLog-file" />

    <logger name="*" minlevel="Info" writeTo="ownLog-tcp" />

  </rules>

</nlog>

 注意:Tcp target中的address指向在logstash中监听的地址,在注释中也给出来了grok的模板信息。

测试Nlog日志记录

在Kibana中查看最终效果

总结

本文只是一个示例教程,希望能起到一个抛砖引玉的作用,详细功能需要大家参考官方文档。Elasticsearch,Kibana,logstash功能非常强大,我也是刚刚接触到,如果有不对的地方,还望大家多多包涵和指正。如果这篇文档对大家有帮助,请点一个赞,谢谢了。

参考

1:LogStash+ElasticSearch简单使用(CentOS)

2:使用ElasticSearch,Kibana,ASP.NET Core和Docker可视化数据

3:Elastic Stack and Product Documentation

4:Elasticsearch在Centos 7上的安装与配置

5:Nlog 官方文档

6:从零开始搭建一个ELKB日志收集系统

EMQX-5.3.1单机集群部署并基于Nginx实现负载均衡-CSDN博客

Excerpt

文章浏览阅读1.6k次,点赞10次,收藏9次。有特殊需求,希望同一个客户端连接至同一个服务器,则可以使用 IP Hash 策略。部署的服务器存在性能差异,我们可以通过配置权重 weight 来修改轮询的几率。本例单机集群部署使用三个节点,分别为node1、node2、node3。3 创建docker-compose.yml文件。最小连接数策略,优先分配给获得连接数较少的服务器。随机轮询就是从待选列表中随机分配连接。6 配置Nginx的负载均衡策略。默认用户名: admin。默认密码: public。7 扩展其他负载均衡策略。IP Hash 策略。


Hoking 于 2023-11-25 17:15:32 发布

本例单机集群部署使用三个节点,分别为node1、node2、node3

一、安装与配置

1 创建数据目录

1
2
3
4
5
mkdir -p node1/data node1/logs

mkdir -p node2/data node2/logs

mkdir -p mode3/data node3/logs

2 数据目录授权

1
2
3
4
5
6
7
chown 1000 node1/ node2/ node3/

chown 1000 node1/data/ node1/logs/

chown 1000 node2/data/ node2/logs/

chown 1000 node3/data/ node3/logs/

3 创建docker-compose.yml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
version: '3'

services:

emqx1:

image: emqx:5.3.1

container_name: emqx1

environment:

- "EMQX_NODE_NAME=emqx@node1.emqx.io"

- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"

- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]"

healthcheck:

test: ["CMD", "/opt/emqx/bin/emqx ctl", "status"]

interval: 5s

timeout: 25s

retries: 5

networks:

emqx-bridge:

aliases:

- node1.emqx.io

ports:

- 1883:1883

- 8083:8083

- 8084:8084

- 8883:8883

- 18083:18083

volumes:

- /etc/localtime:/etc/localtime:ro

- ./node1/logs:/opt/emqx/log

- ./node1/data:/opt/emqx/data

emqx2:

image: emqx:5.3.1

container_name: emqx2

environment:

- "EMQX_NODE_NAME=emqx@node2.emqx.io"

- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"

- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]"

healthcheck:

test: ["CMD", "/opt/emqx/bin/emqx ctl", "status"]

interval: 5s

timeout: 25s

retries: 5

networks:

emqx-bridge:

aliases:

- node2.emqx.io

ports:

- 1873:1883

- 8073:8083

- 8074:8084

- 8873:8883

- 18073:18083

volumes:

- /etc/localtime:/etc/localtime:ro

- ./node2/logs:/opt/emqx/log

- ./node2/data:/opt/emqx/data

emqx3:

image: emqx:5.3.1

container_name: emqx3

environment:

- "EMQX_NODE_NAME=emqx@node3.emqx.io"

- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"

- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io,emqx@node3.emqx.io]"

healthcheck:

test: ["CMD", "/opt/emqx/bin/emqx ctl", "status"]

interval: 5s

timeout: 25s

retries: 5

networks:

emqx-bridge:

aliases:

- node3.emqx.io

ports:

- 1863:1883

- 8063:8083

- 8064:8084

- 8863:8883

- 18063:18083

volumes:

- /etc/localtime:/etc/localtime:ro

- ./node3/logs:/opt/emqx/log

- ./node3/data:/opt/emqx/data

networks:

emqx-bridge:

driver: bridge

4 启动运行EMQX

1
docker-compose up -d

5 EMQX的Dashboard访问地址 http://${ip}:${port}   参考:http://192.168.1.xxx:18083/

默认用户名: admin

默认密码: public

登录后提示修改密码

6 配置Nginx的负载均衡策略

nginx.conf内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
user nginx;

worker_processes auto;

error_log /var/log/nginx/error.log warn;

pid /var/run/nginx.pid;

events {

worker_connections 1024;

}

stream {

upstream emqx_tcp_cluster {

server 10.10.1.100:1883 weight=1 max_fails=3 fail_timeout=30s;

server 10.10.1.110:1873 weight=1 max_fails=3 fail_timeout=30s;

server 10.10.1.120:1863 weight=1 max_fails=3 fail_timeout=30s;

}

server {

listen 1893;

proxy_pass emqx_tcp_cluster;

proxy_buffer_size 8k;

tcp_nodelay on;

}

}

7 扩展其他负载均衡策略

随机轮询

随机轮询就是从待选列表中随机分配连接

1
2
3
4
5
6
7
8
9
10
11
12
13
upstream emqx_tcp_cluster {

random;

# emqx的三个实例

server 10.10.1.100:1883;

server 10.10.1.110:1873;

server 10.10.1.120:1863;

}

带权轮询

部署的服务器存在性能差异,我们可以通过配置权重 weight 来修改轮询的几率

1
2
3
4
5
6
7
8
9
10
11
upstream emqx_tcp_cluster {

# emqx的三个实例

server 10.10.1.100:1883 weight=1;

server 10.10.1.110:1873 weight=2;

server 10.10.1.120:1863 weight=3;

}

least_conn 策略

最小连接数策略,优先分配给获得连接数较少的服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
upstream emqx_tcp_cluster {

least_conn;

# emqx的三个实例

server 10.10.1.100:1883;

server 10.10.1.110:1873;

server 10.10.1.120:1863;

}

IP Hash 策略

有特殊需求,希望同一个客户端连接至同一个服务器,则可以使用 IP Hash 策略

1
2
3
4
5
6
7
8
9
10
11
12
13
upstream emqx_tcp_cluster {

ip_hash;

# emqx的三个实例

server 10.10.1.100:1883;

server 10.10.1.110:1873;

server 10.10.1.120:1863;

}

本章节开始了解EF的各种关系。如果你对EF里实体间的各种关系还不是很熟悉,可以看看我的思路,能帮你更快的理解。

I.实体间一对一的关系

添加一个PersonPhoto类,表示用户照片类

复制代码

/// <summary>
/// 用户照片类 /// </summary>
public class PersonPhoto
{
    \[Key\] public int PersonId { get; set; } public byte\[\] Photo { get; set; } public string Caption { get; set; }  //标题
    public Person PhotoOf { get; set; }
}

复制代码

当然,也需要给Person类添加PersonPhoto的导航属性,表示和PersonPhoto一对一的关系:

public PersonPhoto Photo { get; set; }

直接运行程序会报一个错:

Unable to determine the principal end of an association between the types ‘Model.Per-sonPhoto’ and ‘Model.Person’. The principal end of this association must be explicitly configured using either the relationship fluent API or data annotations.

思考:为何第一节的Destination和Lodging类直接在类里加上导航属性就可以生成主外键关系,现在的这个不行呢?
解答:之前文章里的Destination和Lodging是一对多关系,既然是一对多,EF自然就知道设置Destination类的DestinationId为主键,同时设置Lodging类里的DestinationId为外键;但是现在的这个Person类和PersonPhoto类是一对一的关系,如果不手动指定,那么EF肯定不知道设置哪个为主键哪个为外键了,这个其实不难理解。按照逻辑Person类的PersonId肯定是主键了,直接标注[ForeignKey(“PhotoOf”)]即可,这是Data Annotation方式配置,自然也可以Fluent API一下,博主个人更喜欢这个方式。

在演示Fluent API如何配置Person类和PersonPhoto的一对一关系之前,先系统的学习下EF里实体关系配置的方法。EF里的实体关系配置分为Has和With系列的方法:Optional 可选的、Required 必须的、Many 多个。举例:

A.HasRequired(a => a.B).WithOptional(b => b.A);

这里的a=>a.B是lambda表示写法,就是找到A类里的导航属性B。命名a不固定,可以随意,q=>q.B也是可以的。但是B是A类的属性,故习惯用小写a。

Has方法:

  1. HasOptional:前者包含后者一个实例或者为null
  2. HasRequired:前者(A)包含后者(B)一个不为null的实例
  3. HasMany:前者包含后者实例的集合

With方法:

  1. WithOptional:后者(B)可以包含前者(A)一个实例或者null
  2. WithRequired:后者包含前者一个不为null的实例
  3. WithMany:后者包含前者实例的集合

摘自这里 这是较为好的理解方式。上面一句配置意思就是A类包含B类一个不为null的实例,B类包含A类一个实例,也可以不包含。最标准的一对一配置。ok,现在试着写下上面Person类和PersonPhoto类的一对一的关系如何配置:

this.HasRequired(p => p.PhotoOf).WithOptional(p => p.Photo);

再跑下程序,数据库就生成了,是一对一的关系。Person表可以没有对应的PersonPhoto表数据,但是PersonPhoto表每一条数据都必须对应一条Person表数据。意思就是人可以没有照片,但是有的照片必须属于某个人。关系配置是这样的效果,其实可以随便改,也可以配置成每个人都必须有对应的照片。把上面的WithOptional改成WithRequired,对应到数据库里就是null变成了not null。

思考:这里并没有像之前一样添加一个实体类就同时添加到BreakAwayContext类中,但是为何照样能在数据库中生成PersonPhotos表?
解答:添加到BreakAwayContext类中是让数据库上下文能跟踪到这个类,方便进行CRUD(增查改删)。这里不把PersonPhoto类添加到BreakAwayContext类中是因为程序中一般并不会去单独增删改查PersonPhoto类,对PersonPhoto类的操作都是先找Person类,然后通过一对一的关系找到PersonPhoto类,这个比较符合实际情况。数据库中能生成PersonPhotos就更好理解了,因为有这个实体类嘛。
思考:如果只需要加入主表类到BreakAwayContext类中,那么其他什么一对多,多对多的关系是不是都只要加主表类到BreakAwayContext类中呢?
解答:还是需要根据实际情况考虑,上面的PersonPhoto类已经解释过了,实际情况中不太可能单独操作PersonPhoto类。一对多关系里Logding住宿类是从表类,Destination是其主表。这个想想也知道必须要让数据库上下文跟踪到Lodging住宿类,因为太可能直接操作Lodging了。比如前台添加一个搜索住宿的功能,那是不是需要直接操作此从表了呢?肯定需要了。所以还是需要根据实际情况考虑。这里仅是个人观点,如有瑕疵,恳请指正。

II.实体间一对多的关系

之前的文章里,景点类Destination和住宿类Lodging是一对多的关系,这个很好理解:一个景点那有多个住宿的地方,而一个住宿的地方只属于一个景点。当然也可以没有,一个景点那一个住宿的地方就没有,一个住宿的地方不属于任何景点,这个也是可以的。之前的程序实现的就是互相不属于,全部可空。现在来配置下住宿的地方必须属于某个景点:

Data Annotations
直接在住宿类Lodging的导航属性上添加[Required]标注即可:

[Required] public Destination Destination { get; set; }

Fluent API

this.HasMany(d => d.Lodgings).WithRequired(l => l.Destination).Map(l => l.MapKey(“DestinationId”));

这行是在DestinationMap类里写的,对应到上面的描述,前者就是Destination,后者是Lodging。整句的意思就是:Destination类包含多个(HasMany)Lodging类实例的集合,Lodging类包含前者一个不为null(WithRequired)的实例。.MapKey是指定外键名的。此处如果住宿类不必须属于某个景点,那么直接把WithRequired换成WithOptional即可。查询的时候前者使用Inner join,后者使用Left join。不懂Inner、Left和Cross Join区别的点这里

上面是以Destination为前者的,当然也可以以Lodging为前者,去LodgingMap里写下如下配置,其实是一个意思:

this.HasRequired(d => d.Destination).WithMany(l => l.Lodgings).Map(l => l.MapKey(“DestinationId”));

重跑下程序,生成的数据库Lodging表的外键已经设置成为了不可空,并外键名是指定的“DestinationId”:

 官方给出的一对多的解释是这样的,其实还没我解释的通俗易懂,发个图你们感受下吧:

ok,上面说了一对多的关系,是标准的一对多关系,两个表里分别有导航属性。但是如果有列不遵循这个规则呢?
继续添加一个新类InternetSpecial,记录一些跟平常住宿价格不一样的类,节假日等。这个类不仅有导航属性Accommodation,还有主键列AccommodationId:

复制代码

/// <summary>
/// 住宿特殊价格类(节假日等) /// </summary>
public class InternetSpecial
{ public int InternetSpecialId { get; set; } public int Nights { get; set; }  //几晚 public decimal CostUSD { get; set; }  //价钱 public DateTime FromDate { get; set; } public DateTime ToDate { get; set; } 
    public int AccommodationId { get; set; }
    public Lodging Accommodation { get; set; }
}

复制代码

同时给住宿类Lodging添加一个InternetSpecial类的导航属性:

public List InternetSpecials { get; set; }

配置好了跑下程序,生成的数据库表:

由表可见,不仅有AccommodationId列,还有个外键列Accommodation_LodgingId,明显这个是因为没有设置外键的原因,EF不知道要给哪个属性当外键。现在分别使用Data Annotation和Fluent API设置试试

Data Annotation:

[ForeignKey(“Accommodation”)] public int AccommodationId { get; set; }

或者这样:

[ForeignKey(“AccommodationId”)] public Lodging Accommodation { get; set; }

Fluent API:

this.HasRequired(s => s.Accommodation)
.WithMany(l => l.InternetSpecials)
.HasForeignKey(s => s.AccommodationId); //外键 //如果实体类没定义AccommodationId,那么可以使用Map方法直接指定外键名:.Map(s => s.MapKey(“AccommodationId”))

这个就不详细解释了,如果还看不懂,看看文章开头我分析的Has和With系列方法。配置好重新跑下程序,外键就是AccommodationId了,没有多余的Accommodation_LodgingId列了。

III.实体间多对多的关系

添加一个活动类Activity,跟旅行类Trip是多对多的关系。这个也不难理解:一个旅行有多个活动,一个活动可以属于多个旅行。

复制代码

/// <summary>
/// 活动类 /// </summary>
public class Activity
{ public int ActivityId { get; set; } //\[Required, MaxLength(50)\]
    public string Name { get; set; } public List<Trip> Trips { get; set; }    //和Trip类是多对多关系
}

复制代码

跟之前的一样在BreakAwayContext类里添加Activity类,让数据库上下文知道Activity类:

public DbSet<CodeFirst.Model.Activity> Activitys { get; set; }

同时在Trip旅行类里添加上导航属性,形成跟Activity活动类的多对多关系

public List Activitys { get; set; }

ok,已经可以了,跑下程序得到如下数据库:

可以看出,EF里的多对多关系是由第三张表来连接两个表的。ActivityTrips表连接了Activityes表和Trips表。表名列名都是默认命名,都可以自己配置。文章的开头已经说了那么多了,多对多肯定是用HasMany和WithMany方法,在ActivityMap类里写下如下Fluent API:

复制代码

        this.HasMany(a => a.Trips).WithMany(t => t.Activitys).Map(m => {
                m.ToTable("TripActivities");      //中间关系表表名
                m.MapLeftKey("ActivityId");        //设置Activity表在中间表主键名
                m.MapRightKey("TripIdentifier");   //设置Trip表在中间表主键名
            });

复制代码

同样也可以在TripMap里配置,顺序不一样罢了:

复制代码

        this.HasMany(t => t.Activities).WithMany(a => a.Trips).Map(m => {
            m.ToTable("TripActivities");    //中间关系表表名
            m.MapLeftKey("TripIdentifier");   //设置Activity表在中间表的主键名
            m.MapRightKey("ActivityId");    //设置Trip表在中间表的主键名
        });

复制代码

两种配置任选其一就可以了,重新跑下程序就可以了。都配置好了在程序里如何读取这个对多对的数据呢,简单写一句:

var tripWithActivities = context.Trips.Include(“Activities”).FirstOrDefault();

很明显,用到了Include贪婪加载把相关的外键表数据(如果有)也拿到了内存中:

是不是也需要考虑性能的问题呢?如果只需要修改主表的某个列,那贪婪加载出相关联的从表数据做什么?会发送很多冗余的sql到数据库。当然如果要根据主表找从表数据的话,这么加载也是好事,超级方便。EF小组的原话是:Entity Framework took care of the joins to get across the join table without you having to be aware of its presence. In the same way, any time you do inserts, updates, or deletes within this many-to-many relationship, Entity Framework will work out the proper SQL for the join without you having to worry about it in your code.
意思就是如果你配置好了主外键关系,EF会帮你生成合适的连表查询(join)sql,不会你再多费心。关于一对多、多对多的EF查询和效率问题,后续会有专门系列文章讲解。

IV.级联删除

EF配置的外键关系除了配置为Optional(可选的,也就是可空),其他默认都是级联删除的,意思就是删除主表的某个数据,相关联的从表数据都自动删除:

为了演示添加一个方法:

复制代码

    //级联删除(服务端延迟加载)
    private static void DeleteDestinaInMemoryAndDbCascade()
    { int destinationId; using (var context = new CodeFirst.DataAccess.BreakAwayContext())
        { var destination = new CodeFirst.Model.Destination
            {
                Name \= "Sample Destination",
                Lodgings \= new List<CodeFirst.Model.Lodging> { new CodeFirst.Model.Lodging {Name="Lodging One"}, new CodeFirst.Model.Lodging {Name="Lodging Two"}
                }
            };
            context.Destinations.Add(destination);  //添加测试数据
            context.SaveChanges();
            destinationId \= destination.DestinationId;  //记住主键id
        } using (var context = new CodeFirst.DataAccess.BreakAwayContext())
        { //这里用了贪婪加载,把主键和相关的外键记录都加载到内存中了
            var destination = context.Destinations.Include("Lodgings").Single(d => d.DestinationId == destinationId); var aLodging = destination.Lodgings.FirstOrDefault();
            context.Destinations.Remove(destination);

context.SaveChanges();
}
}

复制代码

很简单,添加了一条主键数据Sample Destination,同时添加了以此主键为基础的两条外键数据:Lodging One和Lodging Two,即:添加了一个旅游景点,又添加了此旅游景点下的两个住宿的地方。之后延迟加载出主表数据和相关联的两条从表数据并删除,使用sql profiler能监测到如下sql:

第一条是删除主表的数据,后两条是删除相关联从表数据的sql。这种级联删除稍显麻烦,同时加载了相关联从表的数据到内存中再发送删除命令到数据库。其实只需要加载要删除的主表记录到内存中就可以了,因为数据库已经打开了级联删除,只需要发送删除主表数据的指令到数据库,数据库会自动删除相关联的从表记录。可以监控到如下sql:

exec sp_executesql N’SELECT
[Project2].[DestinationId] AS [DestinationId],
[Project2].[Name] AS [Name],
[Project2].[Country] AS [Country],
[Project2].[Description] AS [Description],
[Project2].[image] AS [image],
[Project2].[C1] AS [C1],
[Project2].[LodgingId] AS [LodgingId],
[Project2].[Name1] AS [Name1],
[Project2].[Owner] AS [Owner],
[Project2].[IsResort] AS [IsResort],
[Project2].[MilesFromNearestAirport] AS [MilesFromNearestAirport],
[Project2].[PrimaryContact_PersonId] AS [PrimaryContact_PersonId],
[Project2].[SecondaryContact_PersonId] AS [SecondaryContact_PersonId],
[Project2].[DestinationId1] AS [DestinationId1]
FROM ( SELECT
[Limit1].[DestinationId] AS [DestinationId],
[Limit1].[Name] AS [Name],
[Limit1].[Country] AS [Country],
[Limit1].[Description] AS [Description],
[Limit1].[image] AS [image],
[Extent2].[LodgingId] AS [LodgingId],
[Extent2].[Name] AS [Name1],
[Extent2].[Owner] AS [Owner],
[Extent2].[IsResort] AS [IsResort],
[Extent2].[MilesFromNearestAirport] AS [MilesFromNearestAirport],
[Extent2].[PrimaryContact_PersonId] AS [PrimaryContact_PersonId],
[Extent2].[SecondaryContact_PersonId] AS [SecondaryContact_PersonId],
[Extent2].[DestinationId] AS [DestinationId1],
CASE WHEN ([Extent2].[LodgingId] IS NULL) THEN CAST(NULL AS int) ELSE 1 END AS [C1]
FROM (SELECT TOP (2)
[Extent1].[DestinationId] AS [DestinationId],
[Extent1].[Name] AS [Name],
[Extent1].[Country] AS [Country],
[Extent1].[Description] AS [Description],
[Extent1].[image] AS [image]
FROM [dbo].[Destinations] AS [Extent1]
WHERE [Extent1].[DestinationId] = @p__linq__0 ) AS [Limit1]
LEFT OUTER JOIN [dbo].[Lodgings] AS [Extent2] ON [Limit1].[DestinationId] = [Extent2].[DestinationId]
) AS [Project2]
ORDER BY [Project2].[DestinationId] ASC, [Project2].[C1] ASC’,N‘@p__linq__0 int’,@p__linq__0=3

View Code

直接复制到数据库执行查询,发现它会返回一条主表数据和两条相关联的从表数据。除非必须查出外键记录才使用Include贪婪加载,否则千万不要,EF中跟手写ado不一样,很容易生成很冗余的sql。这里其实只需要主键的记录就可以了,修改下方法:

复制代码

    //级联删除(仅加载主键记录)
private static void DeleteDestinationInMemeryAndDbCascade()
{ int destinationId; using (var context = new CodeFirst.DataAccess.BreakAwayContext())
{ var destination = new CodeFirst.Model.Destination
{
Name = “Sample Destination”,
Lodgings = new List<CodeFirst.Model.Lodging> { new CodeFirst.Model.Lodging {Name=”Lodging One”}, new CodeFirst.Model.Lodging {Name=”Lodging Two”}
}
};
context.Destinations.Add(destination);
context.SaveChanges();
destinationId = destination.DestinationId;
} using (var context = new CodeFirst.DataAccess.BreakAwayContext())
{ var destination = context.Destinations
.Single(d => d.DestinationId == destinationId); //只取一条主键记录
context.Destinations.Remove(destination); //然后移除主键记录,外键记录又数据库级联删除
context.SaveChanges();
}
}

复制代码

监控的sql干干净净,只会查出主表数据。

复制代码

exec sp_executesql N’SELECT TOP (2)
[Extent1].[DestinationId] AS [DestinationId],
[Extent1].[Name] AS [Name],
[Extent1].[Country] AS [Country],
[Extent1].[Description] AS [Description],
[Extent1].[image] AS [image]
FROM [dbo].[Destinations] AS [Extent1]
WHERE [Extent1].[DestinationId] = @p__linq__0’,N‘@p__linq__0 int’,@p__linq__0=1

复制代码

补充:这里只查一条记录却使用SELECT TOP (2)… 是保证能查到记录。

删除sql更干净,只删除主表数据,相关联的从表数据删除由数据库级联删除完成:

exec sp_executesql N’delete [dbo].[Destinations]
where ([DestinationId] = @0)’,N‘@0 int’,@0=1

级联删除虽然方便,但是并不常用。试想我们在博客园写了很多随笔,为不同随笔加了不同的标签好区分和管理。某一天突然发现之前定的某个标签并不合理,但是这个标签已经在很多随笔里用了,如果此时删除标签,数据库级联的把标注此标签的随笔都删了,这个肯定不合适。应该是标签删了,之前贴过此标签的文章没了这个标签,这个才符合逻辑。

数据库里可以可视化的设置不级联删除,Fluent API配置此外键关系时可以设置不级联删除:

this.HasMany(d => d.Lodgings).WithRequired(l => l.Destination)
.Map(l => l.MapKey(“DestinationId”)) //一对多并指定外键名
.WillCascadeOnDelete(false); // 关闭级联删除

再跑下程序,去看下数据库本外键自然就没了级联删除。

园友提供了一个很好的建议:考虑到EF中的级联删除并不常用,所以可以在全局里关掉所有主外键关系的级联删除,如果需要可以打开某个主外键的级联删除。

@郭明锋:好文章,很久没有看到这么好的EF文章了,推荐
EF默认开启级联删除,确实是挺操蛋的设置,所以我的做法是在上下文的OnModelCreating方法中
modelBuilder.Conventions.Remove();
移除这个默认约定,再在需要开启级联删除的FluentAPI关系映射中用. WillCascadeOnDelete(true) 单独开启

ok,本文就到此结束,后续还有更通俗易懂的文章介绍EF,请保持关注。本章源码

EF Code First 系列文章导航

  1. EF Code First 初体验
  2. EF里的默认映射以及如何使用Data Annotations和Fluent API配置数据库的映射  本节源码
  3. EF里Guid类型数据的自增长、时间戳和复杂类型的用法  本节源码
  4. EF里一对一、一对多、多对多关系的配置和级联删除  本节源码
  5. EF里的继承映射关系TPH、TPT和TPC的讲解以及一些具体的例子  本节源码

ELKB5.2.2集群环境部署及优化终极文档-腾讯云开发者社区-腾讯云

Excerpt

本人陆陆续续接触了ELK的1.4,2.0,2.4,5.0,5.2版本,可以说前面使用当中一直没有太多感触,最近使用5.2才慢慢有了点感觉,可见认知事务的艰难,本次文档尽量详细点,现在写文档越来越喜欢简洁了,不知道是不是不太好。不扯了看正文(注意这里的配置是优化前配置,正常使用没问题,量大时需要优化)。


ELKB5.2.2集群环境部署

本人陆陆续续接触了ELK的1.4,2.0,2.4,5.0,5.2版本,可以说前面使用当中一直没有太多感触,最近使用5.2才慢慢有了点感觉,可见认知事务的艰难,本次文档尽量详细点,现在写文档越来越喜欢简洁了,不知道是不是不太好。不扯了看正文(注意这里的配置是优化前配置,正常使用没问题,量大时需要优化)。

备注:

本次属于大版本变更,有很多修改,部署重大修改如下:

1,filebeat直接输出kafka,并drop不必要的字段如beat相关的

2,elasticsearch集群布局优化:分三master节点6data节点

3,logstash filter 加入urldecode支持url、reffer、agent中文显示

4,logstash fileter加入geoip支持客户端ip区域城市定位功能

5, logstash mutate替换字符串并remove不必要字段如kafka相关的

5,elasticsearch插件需要另外部署node.js,不能像以前一样集成一起

6,nginx日志新增request参数、请求方法

一,架构

可选架构

filebeat–elasticsearch–kibana

filebeat–logstash–kafka–logstash–elasticsearch–kibana

filebeat–kafka–logstash–elasticsearch–kibana

由于filebeat5.2.2支持多种输出logstash、elasticsearch、kafka、redis、syslog、file等,为了优化资源使用率且能够支持大并发场景选择

filebeat(18)–kafka(3)–logstash(3)–elasticsearch(3)–kibana(3–nginx负载均衡

共3台物理机、12台虚拟机、系统CentOS6.8、具体划分如下:

1
服务器一(192.168.188.186) kafka1 32G700G4CPU logstash8G 100G 4CPU elasticsearch1 40G1.4T 8CPU elasticsearch2 40G 1.4T 8CPU 服务器二(192.168.188.187) kafka2 32G700G4CPU logstash8G 100G 4CPU elasticsearch3 40G1.4T 8CPU elasticsearch4 40G 1.4T 8CPU 服务器三(192.168.188.188) kafka3 32G700G4CPU logstash8G 100G 4CPU elasticsearch5 40G1.4T 8CPU elasticsearch6 40G 1.4T 8CPU 磁盘分区 Logstach 100G SWAP 8G /boot 200M 剩下/ Kafka 700G SWAP 8G /boot 200M /30G 剩下 /data Elasticsearch 1.4T SWAP 8G /boot 200M /30G 剩下 /data IP分配 Elasticsearch1-6 192.168.188.191-196 kibana1-3 192.168.188.191/193/195 kafka1-3 192.168.188.237-239 logstash 192.168.188.238/198/240

二,环境准备

1
yum -y remove java-1.6.0-openjdk yum -y remove java-1.7.0-openjdk yum -y remove perl-* yum -y remove sssd-* yum -y install java-1.8.0-openjdk java -version yum update reboot

设置host环境kafka需要用到

cat /etc/hosts

1
192.168.188.191 ES191(master和data) 192.168.188.192 ES192(data) 192.168.188.193 ES193(master和data) 192.168.188.194 ES194(data) 192.168.188.195 ES195(master和data) 192.168.188.196 ES196(data) 192.168.188.237 kafka237 192.168.188.238 kafka238 192.168.188.239 kafka239 192.168.188.197 logstash197 192.168.188.198 logstash198 192.168.188.240 logstash240

三,部署elasticsearch集群

mkdir /data/esnginx

mkdir /data/eslog

rpm -ivh /srv/elasticsearch-5.2.2.rpm 

chkconfig –add elasticsearch

chkconfig postfix off

rpm -ivh /srv/kibana-5.2.2-x86_64.rpm 

chown  elasticsearch:elasticsearch /data/eslog -R

chown  elasticsearch:elasticsearch /data/esnginx -R

配置文件(3master+6data)

[root@ES191 elasticsearch]# cat elasticsearch.yml|grep -Ev ‘^#|^$’

1
cluster.name: nginxlog node.name: ES191 node.master: true node.data: true node.attr.rack: r1 path.data: /data/esnginx path.logs: /data/eslog bootstrap.memory_lock: true network.host: 192.168.188.191 http.port: 9200 transport.tcp.port: 9300 discovery.zen.ping.unicast.hosts: ["192.168.188.191","192.168.188.192","192.168.188.193","192.168.188.194","192.168.188.195","192.168.188.196"] discovery.zen.minimum_master_nodes: 2 gateway.recover_after_nodes: 5 gateway.recover_after_time: 5m gateway.expected_nodes: 6 cluster.routing.allocation.same_shard.host: true script.engine.groovy.inline.search: on script.engine.groovy.inline.aggs: on indices.recovery.max_bytes_per_sec: 30mb http.cors.enabled: true http.cors.allow-origin: "*" bootstrap.system_call_filter: false#内核3.0以下的需要,centos7内核3.10不需要

特别注意

1
/etc/security/limits.conf elasticsearch soft memlock unlimited elasticsearch hard memlock unlimited elasticsearch soft nofile 65536 elasticsearch hard nofile 131072 elasticsearch soft nproc 2048 elasticsearch hard nproc 4096 /etc/elasticsearch/jvm.options # Xms represents the initial size of total heap space # Xmx represents the maximum size of total heap space -Xms20g -Xmx20g

启动集群

service elasticsearch start

健康检查

1
http://192.168.188.191:9200/_cluster/health?pretty=true { "cluster_name" : "nginxlog", "status" : "green", "timed_out" : false, "number_of_nodes" : 6, "number_of_data_nodes" : 6, "active_primary_shards" : 0, "active_shards" : 0, "relocating_shards" : 0, "initializing_shards" : 0, "unassigned_shards" : 0, "delayed_unassigned_shards" : 0, "number_of_pending_tasks" : 0, "number_of_in_flight_fetch" : 0, "task_max_waiting_in_queue_millis" : 0, "active_shards_percent_as_number" : 100.0 }

elasticsearch-head插件

http://192.168.188.215:9100/

连接上面192.168.188.191:9200任意一台即可

设置分片

官方建议生成索引时再设置

curl -XPUT ‘http://192.168.188.193:9200/\_all/\_settings?preserve\_existing=true‘ -d ‘{

  “index.number_of_replicas” : “1”,

  “index.number_of_shards” : “6”

}’

没有生效,后来发现这个分片设置可以在模版创建时指定,目前还是使用默认1副本,5分片。

其他报错(这个只是参考,优化时有方案)

bootstrap.system_call_filter: false   # 针对 system call filters failed to install,

参见 https://www.elastic.co/guide/en/elasticsearch/reference/current/system-call-filter-check.html

[WARN ][o.e.b.JNANatives ] unable to install syscall filter: 

java.lang.UnsupportedOperationException: seccomp unavailable: requires kernel 3.5+ with CONFIG_SECCOMP and CONFIG_SECCOMP_FILTER compiled in

四、部署kafka集群

kafka集群搭建

1,zookeeper集群

1
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz tar zxvf zookeeper-3.4.10.tar.gz -C /usr/local/ ln -s /usr/local/zookeeper-3.4.10/ /usr/local/zookeeper mkdir -p /data/zookeeper/data/ vim /usr/local/zookeeper/conf/zoo.cfg tickTime=2000 initLimit=5 syncLimit=2 dataDir=/data/zookeeper/data clientPort=2181 server.1=192.168.188.237:2888:3888 server.2=192.168.188.238:2888:3888 server.3=192.168.188.239:2888:3888 vim /data/zookeeper/data/myid 1 /usr/local/zookeeper/bin/zkServer.sh start

2,kafka集群

wget http://mirrors.hust.edu.cn/apache/kafka/0.10.0.1/kafka\_2.11-0.10.0.1.tgz

tar zxvf kafka_2.11-0.10.0.1.tgz -C /usr/local/

ln -s /usr/local/kafka_2.11-0.10.0.1 /usr/local/kafka

diff了下server.properties和zookeeper.properties变动不大可以直接使用

vim /usr/local/kafka/config/server.properties

1
broker.id=237 port=9092 host.name=192.168.188.237 num.network.threads=4 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/data/kafkalog num.partitions=3 num.recovery.threads.per.data.dir=1 log.retention.hours=24 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=192.168.188.237:2181,192.168.188.238:2181,192.168.188.239:237 zookeeper.connection.timeout.ms=6000 producer.type=async broker.list=192.168.188.237:9092,192.168.188.238:9092,192.168.188.239:9092

mkdir /data/kafkalog

修改内存使用大小

vim /usr/local/kafka/bin/kafka-server-start.sh

    export KAFKA_HEAP_OPTS=”-Xmx16G -Xms16G”

启动kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

创建六组前端topic

/usr/local/kafka/bin/kafka-topics.sh –create –topic ngx1-168 –replication-factor 1 –partitions 3 –zookeeper 192.168.188.237:2181,192.168.188.238:2181,192.168.188.239:2181

/usr/local/kafka/bin/kafka-topics.sh –create –topic ngx2-178 –replication-factor 1 –partitions 3 –zookeeper  192.168.188.237:2181,192.168.188.238:2181,192.168.188.239:2181

/usr/local/kafka/bin/kafka-topics.sh –create –topic ngx3-188 –replication-factor 1 –partitions 3 –zookeeper  192.168.188.237:2181,192.168.188.238:2181,192.168.188.239:2181

检查topic

/usr/local/kafka/bin/kafka-topics.sh –list –zookeeper  192.168.188.237:2181,192.168.188.238:2181,192.168.188.239:2181

ngx1-168

ngx2-178

ngx3-188

3,开机启动

cat /etc/rc.local

/usr/local/zookeeper/bin/zkServer.sh start

/usr/local/kafka/bin/kafka-server-start.sh  -daemon /usr/local/kafka/config/server.properties 

注意:开机启动如果设置在rc.local里,java安装又不是用yum安装的openjdk-1.8.0时,需要指定JAVA_HOME,否则java环境不生效,受java环境影响的zookeeper与kafka服务也启动不了,因为java环境一般配置在/etc/profile里,它的生效时间在rc.local后。 

五,部署配置logstash

安装

rpm -ivh logstash-5.2.2.rpm

mkdir /usr/share/logstash/config

#1. 复制配置文件到logstash home

cp /etc/logstash /usr/share/logstash/config

#2. 配置路径

vim /usr/share/logstash/config/logstash.yml

修改前:

path.config: /etc/logstash/conf.d

修改后:

path.config: /usr/share/logstash/config/conf.d

#3.修改 startup.options

修改前:

LS_SETTINGS_DIR=/etc/logstash

修改后:

LS_SETTINGS_DIR=/usr/share/logstash/config

修改startup.options需要执行/usr/share/logstash/bin/system-install 生效

配置

消费者输出端3个logstash只负责一部分

in-kafka-ngx1-out-es.conf

in-kafka-ngx2-out-es.conf

in-kafka-ngx3-out-es.conf

[root@logstash197 conf.d]# cat in-kafka-ngx1-out-es.conf 

1
input { kafka { bootstrap_servers => "192.168.188.237:9092,192.168.188.238:9092,192.168.188.239:9092" group_id => "ngx1" topics => ["ngx1-168"] codec => "json" consumer_threads => 3 decorate_events => true } } filter { mutate { gsub => ["message", "\\x", "%"] remove_field => ["kafka"] } json { source => "message" remove_field => ["message"] } geoip { source => "clientRealIp" } urldecode { all_fields => true } } output { elasticsearch { hosts => ["192.168.188.191:9200","192.168.188.192:9200","192.168.188.193:9200","192.168.188.194:9200","192.168.188.195:9200","192.168.188.196:9200"] index => "filebeat-%{type}-%{+YYYY.MM.dd}" manage_template => true template_overwrite => true template_name => "nginx_template" template => "/usr/share/logstash/templates/nginx_template" flush_size => 50000 idle_flush_time => 10 } }

nginx 模版

[root@logstash197 logstash]# cat /usr/share/logstash/templates/nginx_template 

1
{ "template" : "filebeat-*", "settings" : { "index.refresh_interval" : "10s" }, "mappings" : { "_default_" : { "_all" : {"enabled" : true, "omit_norms" : true}, "dynamic_templates" : [ { "string_fields" : { "match_pattern": "regex", "match" : "(agent)|(status)|(url)|(clientRealIp)|(referrer)|(upstreamhost)|(http_host)|(request)|(request_method)|(upstreamstatus)", "match_mapping_type" : "string", "mapping" : { "type" : "string", "index" : "analyzed", "omit_norms" : true, "fields" : { "raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 512} } } } } ], "properties": { "@version": { "type": "string", "index": "not_analyzed" }, "geoip" : { "type": "object", "dynamic": true, "properties": { "location": { "type": "geo_point" } } } } } } }

启动

/usr/share/logstash/bin/logstash -f /usr/share/logstash/config/conf.d/in-kafka-ngx1-out-es.conf  &

默认logstash开机启动

参考

/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.5/DEVELOPER.md

报错处理

[2017-05-08T12:24:30,388][ERROR][logstash.inputs.kafka    ] Unknown setting ‘zk_connect’ for kafka

[2017-05-08T12:24:30,390][ERROR][logstash.inputs.kafka    ] Unknown setting ‘topic_id’ for kafka

[2017-05-08T12:24:30,390][ERROR][logstash.inputs.kafka    ] Unknown setting ‘reset_beginning’ for kafka

[2017-05-08T12:24:30,395][ERROR][logstash.agent           ] Cannot load an invalid configuration {:reason=>”Something is wrong with your configuration.”}

验证日志 

[root@logstash197 conf.d]# cat /var/log/logstash/logstash-plain.log 

1
[2017-05-09T10:43:20,832][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.188.191:9200/, http://192.168.188.192:9200/, http://192.168.188.193:9200/, http://192.168.188.194:9200/, http://192.168.188.195:9200/, http://192.168.188.196:9200/]}} [2017-05-09T10:43:20,838][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.191:9200/, :path=>"/"} [2017-05-09T10:43:20,919][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x59d1baad URL:http://192.168.188.191:9200/>} [2017-05-09T10:43:20,920][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.192:9200/, :path=>"/"} [2017-05-09T10:43:20,922][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x39defbff URL:http://192.168.188.192:9200/>} [2017-05-09T10:43:20,924][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.193:9200/, :path=>"/"} [2017-05-09T10:43:20,927][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x6e2b7f40 URL:http://192.168.188.193:9200/>} [2017-05-09T10:43:20,927][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.194:9200/, :path=>"/"} [2017-05-09T10:43:20,929][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x208910a2 URL:http://192.168.188.194:9200/>} [2017-05-09T10:43:20,930][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.195:9200/, :path=>"/"} [2017-05-09T10:43:20,932][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x297a8bbd URL:http://192.168.188.195:9200/>} [2017-05-09T10:43:20,933][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.188.196:9200/, :path=>"/"} [2017-05-09T10:43:20,935][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>#<URI::HTTP:0x3ac661af URL:http://192.168.188.196:9200/>} [2017-05-09T10:43:20,936][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>"/usr/share/logstash/templates/nginx_template"} [2017-05-09T10:43:20,970][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"filebeat-*", "settings"=>{"index.refresh_interval"=>"10s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "omit_norms"=>true}, "dynamic_templates"=>[{"string_fields"=>{"match_pattern"=>"regex", "match"=>"(agent)|(status)|(url)|(clientRealIp)|(referrer)|(upstreamhost)|(http_host)|(request)|(request_method)", "match_mapping_type"=>"string", "mapping"=>{"type"=>"string", "index"=>"analyzed", "omit_norms"=>true, "fields"=>{"raw"=>{"type"=>"string", "index"=>"not_analyzed", "ignore_above"=>512}}}}}]}}}} [2017-05-09T10:43:20,974][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/nginx_template [2017-05-09T10:43:21,009][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>[#<URI::Generic:0x65ed1af5 URL://192.168.188.191:9200>, #<URI::Generic:0x2d2a52a6 URL://192.168.188.192:9200>, #<URI::Generic:0x6e79e44b URL://192.168.188.193:9200>, #<URI::Generic:0x531436ae URL://192.168.188.194:9200>, #<URI::Generic:0x5e23a48b URL://192.168.188.195:9200>, #<URI::Generic:0x2163628b URL://192.168.188.196:9200>]} [2017-05-09T10:43:21,010][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-filter-geoip-4.0.4-java/vendor/GeoLite2-City.mmdb"} [2017-05-09T10:43:21,022][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500} [2017-05-09T10:43:21,037][INFO ][logstash.pipeline ] Pipeline main started [2017-05-09T10:43:21,086][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

六,部署配置filebeat

安装

rpm -ivh filebeat-5.2.2-x86_64.rpm

nginx日志格式需要为json的

1
log_format access '{ "@timestamp": "$time_iso8601", ' '"clientRealIp": "$clientRealIp", ' '"size": $body_bytes_sent, ' '"request": "$request", ' '"method": "$request_method", ' '"responsetime": $request_time, ' '"upstreamhost": "$upstream_addr", ' '"http_host": "$host", ' '"url": "$uri", ' '"referrer": "$http_referer", ' '"agent": "$http_user_agent", ' '"status": "$status"} ';

配置filebeat

vim /etc/filebeat/filebeat.yml

1
filebeat.prospectors: - input_type: log paths: - /data/wwwlogs/*.log document_type: ngx1-168 tail_files: true json.keys_under_root: true json.add_error_key: true output.kafka: enabled: true hosts: ["192.168.188.237:9092","192.168.188.238:9092","192.168.188.239:9092"] topic: '%{[type]}' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000 worker: 3 processors: - drop_fields: fields: ["input_type", "beat.hostname", "beat.name", "beat.version", "offset", "source"] logging.to_files: true logging.files: path: /var/log/filebeat name: filebeat rotateeverybytes: 10485760 # = 10MB keepfiles: 7

filebeat详细配置参考官网

https://www.elastic.co/guide/en/beats/filebeat/5.2/index.html

采用kafka作为日志输出端

https://www.elastic.co/guide/en/beats/filebeat/5.2/kafka-output.html

output.kafka:

  # initial brokers for reading cluster metadata

  hosts: [“kafka1:9092”, “kafka2:9092”, “kafka3:9092”]

  # message topic selection + partitioning

  topic: ‘%{[type]}’

  partition.round_robin:

    reachable_only: false

  required_acks: 1

  compression: gzip

  max_message_bytes: 1000000

启动

chkconfig filebeat on

/etc/init.d/filebeat start

报错处理

[root@localhost ~]# tail -f /var/log/filebeat/filebeat

2017-05-09T15:21:39+08:00 ERR Error decoding JSON: invalid character ‘x’ in string escape code

使用$uri 可以在nginx对URL进行更改或重写,但是用于日志输出可以使用$request_uri代替,如无特殊业务需求,完全可以替换

参考

http://www.mamicode.com/info-detail-1368765.html

七,验证

1,kafka消费者查看

/usr/local/kafka/bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic ngx1-168 

2,elasticserch head查看Index及分片信息

八,部署配置kibana

1,配置启动

cat /etc/kibana/kibana.yml

server.port: 5601

server.host: “192.168.188.191”

elasticsearch.url: “http://192.168.188.191:9200

chkconfig –add kibana

/etc/init.d/kibana start

2,字段格式

1
{ "_index": "filebeat-ngx1-168-2017.05.10", "_type": "ngx1-168", "_id": "AVvvtIJVy6ssC9hG9dKY", "_score": null, "_source": { "request": "GET /qiche/奥迪A3/ HTTP/1.1", "agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36", "geoip": { "city_name": "Jinhua", "timezone": "Asia/Shanghai", "ip": "122.226.77.150", "latitude": 29.1068, "country_code2": "CN", "country_name": "China", "continent_code": "AS", "country_code3": "CN", "region_name": "Zhejiang", "location": [ 119.6442, 29.1068 ], "longitude": 119.6442, "region_code": "33" }, "method": "GET", "type": "ngx1-168", "http_host": "www.niubi.com", "url": "/qiche/奥迪A3/", "referrer": "http://www.niubi.com/qiche/奥迪S6/", "upstreamhost": "172.17.4.205:80", "@timestamp": "2017-05-10T08:14:00.000Z", "size": 10027, "beat": {}, "@version": "1", "responsetime": 0.217, "clientRealIp": "122.226.77.150", "status": "200" }, "fields": { "@timestamp": [ 1494404040000 ] }, "sort": [ 1494404040000 ] }

3,视图仪表盘

1),添加高德地图

编辑kibana配置文件kibana.yml,最后面添加

tilemap.url: ‘http://webrd02.is.autonavi.com/appmaptile?lang=zh\_cn&size=1&scale=1&style=7&x={x}&y={y}&z={z}

ES 模版的调整,Geo-points 不适用 dynamic mapping 因此这类项目需要显式的指定:

需要将 geoip.location 指定为 geo_point 类型,则在模版的 properties 中增加一个项目,如下所示:

       “properties”: {

         “@version”: { “type”: “string”, “index”: “not_analyzed” },

         “geoip”  : {

           “type”: “object”,

             “dynamic”: true,

             “properties”: {

               “location”: { “type”: “geo_point” }

             }

         }

       }

4,安装x-pack插件

参考

https://www.elastic.co/guide/en/x-pack/5.2/installing-xpack.html#xpack-installing-offline

https://www.elastic.co/guide/en/x-pack/5.2/setting-up-authentication.html#built-in-users

注意要修改密码

http://192.168.188.215:5601/app/kibana#/dev\_tools/console?load\_from=https://www.elastic.co/guide/en/x-pack/5.2/snippets/setting-up-authentication/1.json

http://192.168.188.215:5601/app/kibana#/dev\_tools/console?load\_from=https://www.elastic.co/guide/en/x-pack/5.2/snippets/setting-up-authentication/2.json

http://192.168.188.215:5601/app/kibana#/dev\_tools/console?load\_from=https://www.elastic.co/guide/en/x-pack/5.2/snippets/setting-up-authentication/3.json

或者

curl -XPUT ‘localhost:9200/_xpack/security/user/elastic/_password?pretty’ -H ‘Content-Type: application/json’ -d’

{

  “password”: “elasticpassword”

}

curl -XPUT ‘localhost:9200/_xpack/security/user/kibana/_password?pretty’ -H ‘Content-Type: application/json’ -d’

{

  “password”: “kibanapassword”

}

curl -XPUT ‘localhost:9200/_xpack/security/user/logstash_system/_password?pretty’ -H ‘Content-Type: application/json’ -d’

{

  “password”: “logstashpassword”

}

下面是官网x-pack安装升级卸载文档,后发现注册版本的x-pack,只具有监控功能,就没安装

1
Installing X-Pack on Offline Machines The plugin install scripts require direct Internet access to download and install X-Pack. If your server doesn’t have Internet access, you can manually download and install X-Pack. To install X-Pack on a machine that doesn’t have Internet access: Manually download the X-Pack zip file: https://artifacts.elastic.co/downloads/packs/x-pack/x-pack-5.2.2.zip (sha1) Transfer the zip file to a temporary directory on the offline machine. (Do NOT put the file in the Elasticsearch plugins directory.) Run bin/elasticsearch-plugin install from the Elasticsearch install directory and specify the location of the X-Pack zip file. For example: bin/elasticsearch-plugin install file:///path/to/file/x-pack-5.2.2.zip Note You must specify an absolute path to the zip file after the file:// protocol. Run bin/kibana-plugin install from the Kibana install directory and specify the location of the X-Pack zip file. (The plugins for Elasticsearch, Kibana, and Logstash are included in the same zip file.) For example: bin/kibana-plugin install file:///path/to/file/x-pack-5.2.2.zip Run bin/logstash-plugin install from the Logstash install directory and specify the location of the X-Pack zip file. (The plugins for Elasticsearch, Kibana, and Logstash are included in the same zip file.) For example: bin/logstash-plugin install file:///path/to/file/x-pack-5.2.2.zip Enabling and Disabling X-Pack Features By default, all X-Pack features are enabled. You can explicitly enable or disable X-Pack features in elasticsearch.yml and kibana.yml: SettingDescription xpack.security.enabled Set to false to disable X-Pack security. Configure in both elasticsearch.yml and kibana.yml. xpack.monitoring.enabled Set to false to disable X-Pack monitoring. Configure in both elasticsearch.yml and kibana.yml. xpack.graph.enabled Set to false to disable X-Pack graph. Configure in both elasticsearch.yml and kibana.yml. xpack.watcher.enabled Set to false to disable Watcher. Configure in elasticsearch.yml only. xpack.reporting.enabled Set to false to disable X-Pack reporting. Configure in kibana.yml only.

九、Nginx负载均衡

1,配置负载

[root@~# cat /usr/local/nginx/conf/nginx.conf 

server

1
{ listen 5601; server_name 192.168.188.215; index index.html index.htm index.shtml; location / { allow 192.168.188.0/24; deny all; proxy_pass http://kibanangx_niubi_com; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection 'upgrade'; proxy_set_header Host $host; proxy_cache_bypass $http_upgrade; auth_basic "Please input Username and Password"; auth_basic_user_file /usr/local/nginx/conf/.pass_file_elk; } access_log /data/wwwlogs/access_kibanangx.niubi.com.log access; } upstream kibanangx_niubi_com { ip_hash; server 192.168.188.191:5601; server 192.168.188.193:5601; server 192.168.188.195:5601; }

2,访问

http://192.168.188.215:5601/app/kibana#

-————————————————————————————————

完美的分割线

-————————————————————————————————

优化文档

ELKB5.2集群优化方案

一,优化效果

优化前 

收集日志请求达到1万/s,延时10s内,默认设置数据10s刷新。

优化后

收集日志请求达到3万/s,延时10s内,默认设置数据10s刷新。(预估可以满足最大请求5万/s)

缺点:CPU处理能力不足,在dashboard大时间聚合运算是生成仪表视图会有超时现象发生;另外elasticsarch结构和搜索语法等还有进一步优化空间。

二,优化步骤

1,内存和CPU重新规划

1),es                       16CPU  48G内存

2),kafka                 8CPU   16G内存

3),logstash            16CPU  12G内存

2,kafka优化

kafka manager 监控观察消费情况

kafka heap size需要修改

logstash涉及kafka的一个参数修改

1),修改jvm内存数

vi /usr/local/kafka/bin/kafka-server-start.sh

if [ “x$KAFKA_HEAP_OPTS” = “x” ]; then

    export KAFKA_HEAP_OPTS=”-Xmx8G -Xms8G”

    export JMX_PORT=”8999”

fi

2),Broker参数配置

配置优化都是修改server.properties文件中参数值

网络和io操作线程配置优化

# broker处理消息的最大线程数(默认3,可以为CPU核数) 

num.network.threads=4   

# broker处理磁盘IO的线程数 (默认4,可以为CPU核数2倍左右)

num.io.threads=8     

3),安装kafka监控

/data/scripts/kafka-manager-1.3.3.4/bin/kafka-manager

http://192.168.188.215:8099/clusters/ngxlog/consumers

3,logstah优化

logstas需要修改2个配置文件

1),修改jvm参数

vi /usr/share/logstash/config/jvm.options 

-Xms2g

-Xmx6g

2),修改logstash.yml

vi /usr/share/logstash/config/logstash.yml

path.data: /var/lib/logstash

pipeline.workers: 16#cpu核心数

pipeline.output.workers: 4#这里相当于output elasticsearch里面的workers数

pipeline.batch.size: 5000#根据qps,压力情况等填写

pipeline.batch.delay: 5

path.config: /usr/share/logstash/config/conf.d

path.logs: /var/log/logstash

3),修改对应的logstash.conf文件

input文件

vi /usr/share/logstash/config/in-kafka-ngx12-out-es.conf 

1
input { kafka { bootstrap_servers => "192.168.188.237:9092,192.168.188.238:9092,192.168.188.239:9092" group_id => "ngx1" topics => ["ngx1-168"] codec => "json" consumer_threads => 3 auto_offset_reset => "latest" #添加这行 #decorate_events => #true 这行去掉 } }

filter文件

1
filter { mutate { gsub => ["message", "\\x", "%"] #这个是转义,url里面的加密方式和request等不一样,用于汉字显示 #remove_field => ["kafka"]这行去掉 decorate events 默认false后就不添加kafka.{}字段了,这里也及不需要再remove了 }

output文件

修改前

    flush_size => 50000

    idle_flush_time => 10

修改后

4秒集齐8万条一次性输出

    flush_size => 80000

    idle_flush_time => 4

启动后logstash输出(pipeline.max_inflight是8万)

1
[2017-05-16T10:07:02,552][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>5000, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>80000} [2017-05-16T10:07:02,553][WARN ][logstash.pipeline ] CAUTION: Recommended inflight events max exceeded! Logstash will run with up to 80000 events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently 5000), or changing the number of pipeline workers (currently 16)

4,elasticsearch优化

1),修改jvm参加

vi /etc/elasticsearch/jvm.options

调整为24g,最大为虚拟机内存的50%

-Xms24g

-Xmx24g

2),修改GC方法(待定,后续观察,该参数不确定时不建议修改)

elasticsearch默认使用的GC是CMS GC

如果你的内存大小超过6G,CMS是不给力的,容易出现stop-the-world

建议使用G1 GC

注释掉:

JAVA_OPTS=”$JAVA_OPTS -XX:+UseParNewGC”

JAVA_OPTS=”$JAVA_OPTS -XX:+UseConcMarkSweepGC”

JAVA_OPTS=”$JAVA_OPTS -XX:CMSInitiatingOccupancyFraction=75″

JAVA_OPTS=”$JAVA_OPTS -XX:+UseCMSInitiatingOccupancyOnly”

修改为:

JAVA_OPTS=”$JAVA_OPTS -XX:+UseG1GC”

JAVA_OPTS=”$JAVA_OPTS -XX:MaxGCPauseMillis=200″

3),安装elasticsearch集群监控工具Cerebro

https://github.com/lmenezes/cerebro

Cerebro 时一个第三方的 elasticsearch 集群管理软件,可以方便地查看集群状态:

https://github.com/lmenezes/cerebro/releases/download/v0.6.5/cerebro-0.6.5.tgz

安装后访问地址

http://192.168.188.215:9000/

4),elasticsearch搜索参数优化(难点问题)

发现没事可做的,首先默认配置已经很好了,其次bulk,刷新等配置里都写好了 

5),elasticsarch集群角色优化

es191,es193,es195只做master节点+ingest节点

es192,es194,es196只做data节点(上面是虚拟机2个虚拟机共用一组raid5磁盘,如果都做data节点性能表现不好)

再加2个data节点,这样聚合计算性能提升很大

5,filebeat优化

1),使用json格式输入,这样logstash就不需要dcode减轻后端压力

  json.keys_under_root: true

  json.add_error_key: true

2),drop不必要的字段如下

vim /etc/filebeat/filebeat.yml 

processors:

- drop_fields:

    fields: [“input_type”, “beat.hostname”, “beat.name”, “beat.version”, “offset”, “source”]

3),计划任务删索引

index默认保留5天

cat /data/scripts/delindex.sh 

1
#!/bin/bash OLDDATE=`date -d -5days +%Y.%m.%d` echo $OLDDATE curl -XDELETE http://192.168.188.193:9200/filebeat-ngx1-168-$OLDDATE curl -XDELETE http://192.168.188.193:9200/filebeat-ngx2-178-$OLDDATE curl -XDELETE http://192.168.188.193:9200/filebeat-ngx3-188-$OLDDATE

介绍

当谈及集中日志到 Elasticsearch 时,首先想到的日志传输(log shipper)就是 Logstash。开发者听说过它,但是不太清楚它具体是干什么事情的:

当深入这个话题时,我们才明白集中存储日志通常隐含着很多的事情,Logstash 也不是唯一的日志传输工具(log shipper)

  • 从数据源获取数据:文件、UNIX socket、TCP、UDP 等等
  • 处理:添加时间戳、解析非结构化数据、根据 IP 添加地理位置信息
  • 传输:到目标存储。比如,Elasticsearch 。由于 Elasticsearch 可能会宕机,或正存在性能问题,或网络存在问题,那么传输工具最好就需要有能力提供缓冲以及重试。

本篇博文旨在比较 Logstash 已经它的五种替代方案(Filebeat、Fluentd、rsyslog、syslog-ng 以及 Logagent),这样就可以知道它们各适合于何种场景。

分析

Logstash

Logstash 不是这个列表里最老的传输工具(最老的应该是 syslog-ng ,讽刺的是它也是唯一一个名字里带有 new 的),但 Logstash 绝对可以称得上最有名的。因为它有很多插件:输入、编解码器、过滤器以及输出。基本上,可以获取并丰富任何数据,然后将它们推送到多种目标存储。

优势

Logstash 主要的有点就是它的灵活性,这还主要因为它有很多插件。然后它清楚的文档已经直白的配置格式让它可以再多种场景下应用。这样的良性循环让我们可以在网上找到很多资源,几乎可以处理任何问题。以下是一些例子:

劣势

Logstash 致命的问题是它的性能以及资源消耗(默认的堆大小是 1GB)。尽管它的性能在近几年已经有很大提升,与它的替代者们相比还是要慢很多的。这里有 Logstash 与 rsyslog 性能对比以及Logstash 与 filebeat 的性能对比。它在大数据量的情况下会是个问题。

另一个问题是它目前不支持缓存,目前的典型替代方案是将 Redis 或 Kafka 作为中心缓冲池:

典型应用场景

因为 Logstash 自身的灵活性以及网络上丰富的资料,Logstash 适用于原型验证阶段使用,或者解析非常的复杂的时候。在不考虑服务器资源的情况下,如果服务器的性能足够好,我们也可以为每台服务器安装 Logstash 。我们也不需要使用缓冲,因为文件自身就有缓冲的行为,而 Logstash 也会记住上次处理的位置。

如果服务器性能较差,并不推荐为每个服务器安装 Logstash ,这样就需要一个轻量的日志传输工具,将数据从服务器端经由一个或多个 Logstash 中心服务器传输到 Elasticsearch:

随着日志项目的推进,可能会因为性能或代价的问题,需要调整日志传输的方式(log shipper)。当判断 Logstash 的性能是否足够好时,重要的是对吞吐量的需求有着准确的估计,这也决定了需要为 Logstash 投入多少硬件资源。

Filebeat

作为 Beats 家族的一员,Filebeat 是一个轻量级的日志传输工具,它的存在正弥补了 Logstash 的缺点:Filebeat 作为一个轻量级的日志传输工具可以将日志推送到中心 Logstash。

在版本 5.x 中,Elasticsearch 具有解析的能力(像 Logstash 过滤器)— Ingest。这也就意味着可以将数据直接用 Filebeat 推送到 Elasticsearch,并让 Elasticsearch 既做解析的事情,又做存储的事情。也不需要使用缓冲,因为 Filebeat 也会和 Logstash 一样记住上次读取的偏移:

如果需要缓冲(例如,不希望将日志服务器的文件系统填满),可以使用 Redis/Kafka,因为 Filebeat 可以与它们进行通信:

优势

Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少,尽管它还十分年轻,正式因为它简单,所以几乎没有什么可以出错的地方,所以它的可靠性还是很高的。它也为我们提供了很多可以调节的点,例如:它以何种方式搜索新的文件,以及当文件有一段时间没有发生变化时,何时选择关闭文件句柄。

劣势

Filebeat 的应用范围十分有限,所以在某些场景下我们会碰到问题。例如,如果使用 Logstash 作为下游管道,我们同样会遇到性能问题。正因为如此,Filebeat 的范围在扩大。开始时,它只能将日志发送到 Logstash 和 Elasticsearch,而现在它可以将日志发送给 Kafka 和 Redis,在 5.x 版本中,它还具备过滤的能力。

典型应用场景

Filebeat 在解决某些特定的问题时:日志存于文件,我们希望

  • 将日志直接传输存储到 Elasticsearch。这仅在我们只是抓去(grep)它们或者日志是存于 JSON 格式(Filebeat 可以解析 JSON)。或者如果打算使用 Elasticsearch 的 Ingest 功能对日志进行解析和丰富。
  • 将日志发送到 Kafka/Redis。所以另外一个传输工具(例如,Logstash 或自定义的 Kafka 消费者)可以进一步丰富和转发。这里假设选择的下游传输工具能够满足我们对功能和性能的要求。

Logagent

Logagent 是 Sematext 提供的传输工具,它用来将日志传输到 Logsene(一个基于 SaaS 平台的 Elasticsearch API),因为 Logsene 会暴露 Elasticsearch API,所以 Logagent 可以很容易将数据推送到 Elasticsearch 。

优势

可以获取 /var/log 下的所有信息,解析各种格式(Elasticsearch,Solr,MongoDB,Apache HTTPD等等),它可以掩盖敏感的数据信息,例如,个人验证信息(PII),出生年月日,信用卡号码,等等。它还可以基于 IP 做 GeoIP 丰富地理位置信息(例如,access logs)。同样,它轻量又快速,可以将其置入任何日志块中。在新的 2.0 版本中,它以第三方 node.js 模块化方式增加了支持对输入输出的处理插件。重要的是 Logagent 有本地缓冲,所以不像 Logstash ,在数据传输目的地不可用时会丢失日志。

劣势

尽管 Logagent 有些比较有意思的功能(例如,接收 Heroku 或 CloudFoundry 日志),但是它并没有 Logstash 灵活。

典型应用场景

Logagent 作为一个可以做所有事情的传输工具是值得选择的(提取、解析、缓冲和传输)。

rsyslog

绝大多数 Linux 发布版本默认的 syslog 守护进程,rsyslog 可以做的不仅仅是将日志从 syslog socket 读取并写入 /var/log/messages 。它可以提取文件、解析、缓冲(磁盘和内存)以及将它们传输到多个目的地,包括 Elasticsearch 。可以从此处找到如何处理 Apache 以及系统日志。

优势

rsyslog 是经测试过的最快的传输工具。如果只是将它作为一个简单的 router/shipper 使用,几乎所有的机器都会受带宽的限制,但是它非常擅长处理解析多个规则。它基于语法的模块(mmnormalize)无论规则数目如何增加,它的处理速度始终是线性增长的。这也就意味着,如果当规则在 20-30 条时,如解析 Cisco 日志时,它的性能可以大大超过基于正则式解析的 grok ,达到 100 倍(当然,这也取决于 grok 的实现以及 liblognorm 的版本)。

它同时也是我们能找到的最轻的解析器,当然这也取决于我们配置的缓冲。

劣势

rsyslog 的配置工作需要更大的代价(这里有一些例子),这让两件事情非常困难:

  • 文档难以搜索和阅读,特别是那些对术语比较陌生的开发者。
  • 5.x 以上的版本格式不太一样(它扩展了 syslogd 的配置格式,同时也仍然支持旧的格式),尽管新的格式可以兼容旧格式,但是新的特性(例如,Elasticsearch 的输出)只在新的配置下才有效,然后旧的插件(例如,Postgres 输出)只在旧格式下支持。

尽管在配置稳定的情况下,rsyslog 是可靠的(它自身也提供多种配置方式,最终都可以获得相同的结果),它还是存在一些 bug 。

典型应用场景

rsyslog 适合那些非常轻的应用(应用,小VM,Docker容器)。如果需要在另一个传输工具(例如,Logstash)中进行处理,可以直接通过 TCP 转发 JSON ,或者连接 Kafka/Redis 缓冲。

rsyslog 还适合我们对性能有着非常严格的要求时,特别是在有多个解析规则时。那么这就值得为之投入更多的时间研究它的配置。

syslog-ng

可以将 syslog-ng 当作 rsyslog 的替代品(尽管历史上它们是两种不同的方式)。它也是一个模块化的 syslog 守护进程,但是它可以做的事情要比 syslog 多。它可以接收磁盘缓冲并将 Elasticsearch HTTP 作为输出。它使用 PatternDB 作为语法解析的基础,作为 Elasticsearch 的传输工具,它是一个不错的选择。

优势

和 rsyslog 一样,作为一个轻量级的传输工具,它的性能也非常好。它曾经比 rsyslog 慢很多,但是 2 年前能达到 570K Logs/s 的性能并不差。并不像 rsyslog ,它有着明确一致的配置格式以及完好的文档。

劣势

Linux 发布版本转向使用 rsyslog 的原因是 syslog-ng 高级版曾经有很多功能在开源版中都存在,但是后来又有所限制。我们这里只关注与开源版本,所有的日志传输工具都是开源的。现在又有所变化,例如磁盘缓冲,曾经是高级版存在的特性,现在开源版也有。但有些特性,例如带有应用层的通知的可靠传输协议(reliable delivery protocol)还没有在开源版本中。

典型应用场景

和 rsyslog 类似,可能将 syslog-ng 部署在资源受限的环境中,但仍希望它能在处理复杂计算时有着良好的性能。如果使用 rsyslog ,它可以输出至 Kafka ,以 Kafka 作为一个中心队列,并以 Logstash 作为一个自定义消费者:

不同的是,syslog-ng 使用起来比 rsyslog 更容易,性能没有 rsyslog 那么极致:例如,它只对输出进行缓冲,所以它所有的计算处理在缓冲之前就完成了,这也意味着它会给日志流带来压力。

Fluentd

Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出,所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样,它为几乎所有的语言都提供库,这也意味着,我们可以将它插入到我们自定义的程序中。

优势

和多数 Logstash 插件一样,Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多,几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。

劣势

因为在多数应用场景下,我们会通过 Fluentd 得到结构化的数据,它的灵活性并不好。但是我们仍然可以通过正则表达式,来解析非结构化的数据。尽管,性能在大多数场景下都很好,但它并不是最好的,和 syslog-ng 一样,它的缓冲只存在与输出端,单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的,不过,它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备,可能需要看看 Fluent Bit,它和 Fluentd 的关系与 Filebeat 和 Logstash 之间的关系类似

典型应用场景

Fluentd 在日志的数据源和目标存储各种各样时非常合适,因为它有很多插件。而且,如果大多数数据源都是自定义的应用,所以可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多。特别是在我们的应用是多种语言编写的时候,即我们使用了多种日志库,日志的行为也不太一样。

结论

工具的选择由使用场景决定。

参考

参考来源:

5 Logstash Alternatives

结束

ELK实时日志分析平台环境部署–完整记录 - 散尽浮华 - 博客园

Excerpt

在日常运维工作中,对于系统和业务日志的处理尤为重要。今天,在这里分享一下自己部署的ELK(+Redis)-开源实时日志分析平台的记录过程(仅依据本人的实际操作为例说明,如有误述,敬请指出)~ 概念介绍 日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、


在日常运维工作中,对于系统和业务日志的处理尤为重要。今天,在这里分享一下自己部署的ELK(+Redis)-开源实时日志分析平台的记录过程(仅依据本人的实际操作为例说明,如有误述,敬请指出)~

================概念介绍================
日志主要包括系统日志、应用程序日志和安全日志。系统运维和开发人员可以通过日志了解服务器软硬件信息、检查配置过程中的错误及错误发生的原因。经常分析日志可以了解服务器的负荷,性能安全性,从而及时采取措施纠正错误。

通常,日志被分散在储存不同的设备上。如果你管理数十上百台服务器,你还在使用依次登录每台机器的传统方法查阅日志。这样是不是感觉很繁琐和效率低下。当务之急我们使用集中化的日志管理,例如:开源的syslog,将所有服务器上的日志收集汇总。

集中化管理日志后,日志的统计和检索又成为一件比较麻烦的事情,一般我们使用grep、awk和wc等Linux命令能实现检索和统计,但是对于要求更高的查询、排序和统计等要求和庞大的机器数量依然使用这样的方法难免有点力不从心。

通过我们需要对日志进行集中化管理,将所有机器上的日志信息收集、汇总到一起。完整的日志数据具有非常重要的作用:
1)信息查找。通过检索日志信息,定位相应的bug,找出解决方案。
2)服务诊断。通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态,找出耗时请求进行优化等等。
3)数据分析。如果是格式化的log,可以做进一步的数据分析,统计、聚合出有意义的信息,比如根据请求中的商品id,找出TOP10用户感兴趣商品。

开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch、Logstash和Kiabana三个开源工具组成:
1)ElasticSearch是一个基于Lucene的开源分布式搜索服务器。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。
在elasticsearch中,所有节点的数据是均等的。
2)Logstash是一个完全开源的工具,它可以对你的日志进行收集、过滤、分析,支持大量的数据获取方法,并将其存储供以后使用(如搜索)。说到搜索,logstash带有一个web界面,搜索和展示所有日志。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
3)Kibana 是一个基于浏览器页面的Elasticsearch前端展示工具,也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助您汇总、分析和搜索重要数据日志。

为什么要用到ELK?
一般我们需要进行日志分析场景是:直接在日志文件中 grep、awk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。
一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。

一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一套集中式日志系统,可以提高定位问题的效率。
一个完整的集中式日志系统,需要包含以下几个主要特点:
1)收集-能够采集多种来源的日志数据
2)传输-能够稳定的把日志数据传输到中央系统
3)存储-如何存储日志数据
4)分析-可以支持 UI 分析
5)警告-能够提供错误报告,监控机制

ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。目前主流的一种日志系统。

ELK工作原理展示图:

如上图:Logstash收集AppServer产生的Log,并存放到ElasticSearch集群中,而Kibana则从ES集群中查询数据生成图表,再返回给Browser。

Logstash工作原理:
Logstash事件处理有三个阶段:inputs → filters → outputs。是一个接收,处理,转发日志的工具。支持系统日志,webserver日志,错误日志,应用日志,总之包括所有可以抛出来的日志类型。

Input:输入数据到logstash。

一些常用的输入为:
file:从文件系统的文件中读取,类似于tial -f命令
syslog:在514端口上监听系统日志消息,并根据RFC3164标准进行解析
redis:从redis service中读取
beats:从filebeat中读取
Filters:数据中间处理,对数据进行操作。

一些常用的过滤器为:
grok:解析任意文本数据,Grok 是 Logstash 最重要的插件。它的主要作用就是将文本格式的字符串,转换成为具体的结构化的数据,配合正则表达式使用。内置120多个解析语法。
mutate:对字段进行转换。例如对字段进行删除、替换、修改、重命名等。
drop:丢弃一部分events不进行处理。
clone:拷贝 event,这个过程中也可以添加或移除字段。
geoip:添加地理信息(为前台kibana图形化展示使用)
Outputs:outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。

一些常见的outputs为:
elasticsearch:可以高效的保存数据,并且能够方便和简单的进行查询。
file:将event数据保存到文件中。
graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。
Codecs:codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。

一些常见的codecs:
json:使用json格式对数据进行编码/解码。
multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息。

======================ELK整体方案=======================
ELK中的三个系统分别扮演不同的角色,组成了一个整体的解决方案。Logstash是一个ETL工具,负责从每台机器抓取日志数据,对数据进行格式转换和处理后,输出到Elasticsearch中存储。Elasticsearch是一个分布式搜索引擎和分析引擎,用于数据存储,可提供实时的数据查询。Kibana是一个数据可视化服务,根据用户的操作从Elasticsearch中查询数据,形成相应的分析结果,以图表的形式展现给用户。
ELK的安装很简单,可以按照”下载->修改配置文件->启动”方法分别部署三个系统,也可以使用docker来快速部署。具体的安装方法这里不详细介绍,下面来看一个常见的部署方案,如下图所示,部署思路是:
1)在每台生成日志文件的机器上,部署Logstash,作为Shipper的角色,负责从日志文件中提取数据,但是不做任何处理,直接将数据输出到Redis队列(list)中;
2)需要一台机器部署Logstash,作为Indexer的角色,负责从Redis中取出数据,对数据进行格式化和相关处理后,输出到Elasticsearch中存储;
3)部署Elasticsearch集群,当然取决于你的数据量了,数据量小的话可以使用单台服务,如果做集群的话,最好是有3个以上节点,同时还需要部署相关的监控插件;
4)部署Kibana服务,提供Web服务。

在前期部署阶段,主要工作是Logstash节点和Elasticsearch集群的部署,而在后期使用阶段,主要工作就是Elasticsearch集群的监控和使用Kibana来检索、分析日志数据了,当然也可以直接编写程序来消费Elasticsearch中的数据。

在上面的部署方案中,我们将Logstash分为Shipper和Indexer两种角色来完成不同的工作,中间通过Redis做数据管道,为什么要这样做?为什么不是直接在每台机器上使用Logstash提取数据、处理、存入Elasticsearch?

首先,采用这样的架构部署,有三点优势:第一,降低对日志所在机器的影响,这些机器上一般都部署着反向代理或应用服务,本身负载就很重了,所以尽可能的在这些机器上少做事;第二,如果有很多台机器需要做日志收集,那么让每台机器都向Elasticsearch持续写入数据,必然会对Elasticsearch造成压力,因此需要对数据进行缓冲,同时,这样的缓冲也可以一定程度的保护数据不丢失;第三,将日志数据的格式化与处理放到Indexer中统一做,可以在一处修改代码、部署,避免需要到多台机器上去修改配置。 

其次,我们需要做的是将数据放入一个消息队列中进行缓冲,所以Redis只是其中一个选择,也可以是RabbitMQ、Kafka等等,在实际生产中,Redis与Kafka用的比较多。由于Redis集群一般都是通过key来做分片,无法对list类型做集群,在数据量大的时候必然不合适了,而Kafka天生就是分布式的消息队列系统。

1)配置nginx日志格式
首先需要将nginx日志格式规范化,便于做解析处理。在nginx.conf文件中设置:

1

2

log_format main '$remote_addr "$time_iso8601" "$request" $status $body_bytes_sent "$http_user_agent" "$http_referer" "$http_x_forwarded_for" "$request_time" "$upstream_response_time" "$http_cookie" "$http_Authorization" "$http_token"';

access_log  /var/log/nginx/example.access.log  main;

2)nginx日志–>>Logstash–>>消息队列
这部分是Logstash Shipper的工作,涉及input和output两种插件。input部分,由于需要提取的是日志文件,一般使用file插件,该插件常用的几个参数是:
path:指定日志文件路径。
type:指定一个名称,设置type后,可以在后面的filter和output中对不同的type做不同的处理,适用于需要消费多个日志文件的场景。
start_position:指定起始读取位置,“beginning”表示从文件头开始,“end”表示从文件尾开始(类似tail -f)。
sincedb_path:与Logstash的一个坑有关。通常Logstash会记录每个文件已经被读取到的位置,保存在sincedb中,如果Logstash重启,那么对于同一个文件,会继续从上次记录的位置开始读取。如果想重新从头读取文件,需要删除sincedb文件,sincedb_path则是指定了该文件的路径。为了方便,我们可以根据需要将其设置为“/dev/null”,即不保存位置信息。

1

2

3

4

5

6

7

8

9

input {

    file {

        type => "example_nginx_access"

        path => ["/var/log/nginx/example.access.log"]

        start_position => "beginning"

        sincedb_path => "/dev/null"

    }

}

output部分,将数据输出到消息队列,以redis为例,需要指定redis server和list key名称。另外,在测试阶段,可以使用stdout来查看输出信息。

1

2

3

4

5

6

7

8

9

10

11

12

output {

    if [type] == "example_nginx_access" {

        redis {

            host => "127.0.0.1"

            port => "6379"

            data_type => "list"

            key => "logstash:example_nginx_access"

        }

    }

}

3)消息队列–>>Logstash–>>Elasticsearch
这部分是Logstash Indexer的工作,涉及input、filter和output三种插件。在input部分,我们通过redis插件将数据从消息队列中取出来。在output部分,我们通过elasticsearch插件将数据写入Elasticsearch。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

input {

    redis {

            host => "127.0.0.1"

            port => "6379"

            data_type => "list"

            key => "logstash:example_nginx_access"

    }

}

output {

    elasticsearch {

        index => "logstash-example-nginx-%{+YYYY.MM}"

        hosts => ["127.0.0.1:9200"]

    }

}

这里,需要重点关注filter部分,下面列举几个常用的插件,实际使用中根据自身需求从官方文档中查找适合自己业务的插件并使用即可,当然也可以编写自己的插件。
grok:是Logstash最重要的一个插件,用于将非结构化的文本数据转化为结构化的数据。grok内部使用正则语法对文本数据进行匹配,为了降低使用复杂度,其提供了一组pattern,我们可以直接调用pattern而不需要自己写正则表达式,参考源码grok-patterns。grok解析文本的语法格式是%{SYNTAX:SEMANTIC},SYNTAX是pattern名称,SEMANTIC是需要生成的字段名称,使用工具Grok Debugger可以对解析语法进行调试。例如,在下面的配置中,我们先使用grok对输入的原始nginx日志信息(默认以message作为字段名)进行解析,并添加新的字段request_path_with_verb(该字段的值是verb和request_path的组合),然后对request_path字段做进一步解析。
kv:用于将某个字段的值进行分解,类似于编程语言中的字符串Split。在下面的配置中,我们将request_args字段值按照“&”进行分解,分解后的字段名称以“request_args_”作为前缀,并且丢弃重复的字段。
geoip:用于根据IP信息生成地理位置信息,默认使用自带的一份GeoLiteCity database,也可以自己更换为最新的数据库,但是需要数据格式需要遵循Maxmind的格式(参考GeoLite),似乎目前只能支持legacy database,数据类型必须是.dat。下载GeoLiteCity.dat.gz后解压, 并将文件路径配置到source中即可。
translate:用于检测某字段的值是否符合条件,如果符合条件则将其翻译成新的值,写入一个新的字段,匹配pattern可以通过YAML文件来配置。例如,在下面的配置中,我们对request_api字段翻译成更加易懂的文字描述。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

filter {

    grok {

        match => {"message" => "%{IPORHOST:client_ip} \"%{TIMESTAMP_ISO8601:timestamp}\" \"%{WORD:verb} %{NOTSPACE:request_path} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_status:int} %{NUMBER:response_body_bytes:int} \"%{DATA:user_agent}\" \"%{DATA:http_referer}\" \"%{NOTSPACE:http_x_forwarder_for}\" \"%{NUMBER:request_time:float}\" \"%{DATA:upstream_resopnse_time}\" \"%{DATA:http_cookie}\" \"%{DATA:http_authorization}\" \"%{DATA:http_token}\""}

        add_field => {"request_path_with_verb" => "%{verb} %{request_path}"}

    }

    grok {

        match => {"request_path" => "%{URIPATH:request_api}(?:\?%{NOTSPACE:request_args}|)"}

        add_field => {"request_annotation" => "%{request_api}"}

    }

    kv {

        prefix => "request_args_"

        field_split => "&"

        source => "request_args"

        allow_duplicate_values => false

    }

    geoip {

        source => "client_ip"

        database => "/home/elktest/geoip_data/GeoLiteCity.dat"

    }

   translate {

        field => request_path

        destination => request_annotation

        regex => true

        exact => true

        dictionary_path => "/home/elktest/api_annotation.yaml"

        override => true

    }

}

Elasticsearch
Elasticsearch承载了数据存储和查询的功能,其基础概念和使用方法可以参考另一篇博文Elasticsearch使用总结,这里主要介绍些实际生产中的问题和方法:
1)关于集群配置,重点关注三个参数:第一,discovery.zen.ping.unicast.hosts,Elasticsearch默认使用Zen Discovery来做节点发现机制,推荐使用unicast来做通信方式,在该配置项中列举出Master节点。第二,discovery.zen.minimum_master_nodes,该参数表示集群中可工作的具有Master节点资格的最小数量,默认值是1。为了提高集群的可用性,避免脑裂现象(所谓脑裂,就是同一个集群中的不同节点,对集群的状态有不一致的理解。),官方推荐设置为(N/2)+1,其中N是具有Master资格的节点的数量。第三,discovery.zen.ping_timeout,表示节点在发现过程中的等待时间,默认值是3秒,可以根据自身网络环境进行调整,一定程度上提供可用性。

1

2

3

discovery.zen.ping.unicast.hosts: ["master1", "master2", "master3"]

discovery.zen.minimum_master_nodes: 2

discovery.zen.ping_timeout: 10

2)关于集群节点,第一,节点类型包括:候选Master节点、数据节点和Client节点。通过设置两个配置项node.master和node.data为true或false,来决定将一个节点分配为什么类型的节点。第二,尽量将候选Master节点和Data节点分离开,通常Data节点负载较重,需要考虑单独部署。
3)关于内存,Elasticsearch默认设置的内存是1GB,对于任何一个业务部署来说,这个都太小了。通过指定ES_HEAP_SIZE环境变量,可以修改其堆内存大小,服务进程在启动时候会读取这个变量,并相应的设置堆的大小。建议设置系统内存的一半给Elasticsearch,但是不要超过32GB。参考官方文档。
4)关于硬盘空间,Elasticsearch默认将数据存储在/var/lib/elasticsearch路径下,随着数据的增长,一定会出现硬盘空间不够用的情形,此时就需要给机器挂载新的硬盘,并将Elasticsearch的路径配置到新硬盘的路径下。通过“path.data”配置项来进行设置,比如“path.data: /data1,/var/lib/elasticsearch,/data”。需要注意的是,同一分片下的数据只能写入到一个路径下,因此还是需要合理的规划和监控硬盘的使用。
5)关于Index的划分和分片的个数,这个需要根据数据量来做权衡了,Index可以按时间划分,比如每月一个或者每天一个,在Logstash输出时进行配置,shard的数量也需要做好控制。
6)关于监控,笔者使用过head和marvel两个监控插件,head免费,功能相对有限,marvel现在需要收费了。另外,不要在数据节点开启监控插件。

Kibana
Kibana提供的是数据查询和显示的Web服务,有丰富的图表样板,能满足大部分的数据可视化需求,这也是很多人选择ELK的主要原因之一。UI的操作没有什么特别需要介绍的,经常使用就会熟练,这里主要介绍经常遇到的三个问题。
a)查询语法
在Kibana的Discover页面中,可以输入一个查询条件来查询所需的数据。查询条件的写法使用的是Elasticsearch的Query String语法,而不是Query DSL,参考官方文档query-string-syntax,这里列举其中部分常用的:
.单字段的全文检索,比如搜索args字段中包含first的文档,写作 args:first;
.单字段的精确检索,比如搜索args字段值为first的文档,写作 args: “first”;
.多个检索条件的组合,使用 NOT, AND 和 OR 来组合,注意必须是大写,比如 args:(“first” OR “second”) AND NOT agent: “third”;
.字段是否存在,_exists_:agent表示要求agent字段存在,_missing_:agent表示要求agent字段不存在;
.通配符:用 ? 表示单字母,* 表示任意个字母。
b)错误“Discover: Request Timeout after 30000ms”
这个错误经常发生在要查询的数据量比较大的情况下,此时Elasticsearch需要较长时间才能返回,导致Kibana发生Timeout报错。解决这个问题的方法,就是在Kibana的配置文件中修改elasticsearch.requestTimeout一项的值,然后重启Kibana服务即可,注意单位是ms。
c)疑惑“字符串被分解了”
经常碰到这样一个问题:为什么查询结果的字段值是正确的,可是做图表时却发现字段值被分解了,不是想要的结果?如下图所示的client_agent_info字段。

得到这样一个不正确结果的原因是使用了Analyzed字段来做图表分析,默认情况下Elasticsearch会对字符串数据进行分析,建立倒排索引,所以如果对这么一个字段进行terms聚合,必然会得到上面所示的错误结果了。那么应该怎么做才对?默认情况下,Elasticsearch还会创建一个相对应的没有被Analyzed的字段,即带“.raw”后缀的字段,在这样的字段上做聚合分析即可。
又会有很多人问这样的问题:为什么我的Elasticsearch没有自动创建带“.raw”后缀的字段?然而在Logstash中输出数据时,设置index名称前缀为“logstash-”就有了这个字段。这个问题的根源是Elasticsearch的dynamic template在捣鬼,dynamic temlate用于指导Elasticsearch如何为插入的数据自动建立Schema映射关系,默认情况下,Logstash会在Elasticsearch中建立一个名为“logstash”的模板,所有前缀为“logstash-”的index都会参照这个模板来建立映射关系,在该模板中申明了要为每个字符串数据建立一个额外的带“.raw”后缀的字段。可以向Elasticsearch来查询你的模板,使用API:GET http://localhost:9200/\_template。

以上便是对ELK日志系统的总结介绍,还有一个重要的功能没有提到,就是如何将日志数据与自身产品业务的数据融合起来。举个例子,在nginx日志中,通常会包含API请求访问时携带的用户Token信息,由于Token是有时效性的,我们需要及时将这些Token转换成真实的用户信息存储下来。这样的需求通常有两种实现方式,一种是自己写一个Logstash filter,然后在Logstash处理数据时调用;另一种是将Logstash Indexer产生的数据再次输出到消息队列中,由我们自己的脚本程序从消息队列中取出数据,做相应的业务处理后,输出到Elasticsearch中。

==================ELK环境部署==================

(0)基础环境介绍

系统: Centos7.1
防火墙: 关闭
Sellinux: 关闭

机器环境: 两台
elk-node1: 192.168.1.160       #master机器
elk-node2:192.168.1.161      #slave机器

注明:
master-slave模式:
master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据);同时,master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。
如果master宕机了,那么客户端在日志采集配置中将elasticsearch主机指向改为slave,就可以保证ELK日志的正常采集和web展示。

==========================================================================
由于elk-node1和elk-node2两台是虚拟机,没有外网ip,所以访问需要通过宿主机进行代理转发实现。

有以下两种转发设置:(任选其一)

通过访问宿主机的19200,19201端口分别转发到elk-node1,elk-node2的9200端口
通过访问宿主机的15601端口转发到elk-node1的5601端口

宿主机:112.110.115.10(内网ip为192.168.1.7)  (为了不让线上的真实ip暴露,这里任意给了一个ip做记录)

a)通过宿主机的haproxy服务进行代理转发,如下是宿主机上的代理配置:

[root@kvm-server conf]# pwd
/usr/local/haproxy/conf
[root@kvm-server conf]# cat haproxy.cfg
……….
……….
listen node1-9200 0.0.0.0:19200
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:9200 weight 1 check inter 1s rise 2 fall 2

listen node2-9200 0.0.0.0:19201
mode tcp
option tcplog
balance roundrobin
server 192.168.1.161 192.168.1.161:9200 weight 1 check inter 1s rise 2 fall 2

listen node1-5601 0.0.0.0:15601
mode tcp
option tcplog
balance roundrobin
server 192.168.1.160 192.168.1.160:5601 weight 1 check inter 1s rise 2 fall 2

重启haproxy服务
[root@kvm-server conf]# /etc/init.d/haproxy restart

设置宿主机防火墙
[root@kvm-server conf]# cat /etc/sysconfig/iptables
………
-A INPUT -p tcp -m state –state NEW -m tcp –dport 19200 -j ACCEPT
-A INPUT -p tcp -m state –state NEW -m tcp –dport 19201 -j ACCEPT
-A INPUT -p tcp -m state –state NEW -m tcp –dport 15601 -j ACCEPT

[root@kvm-server conf]# /etc/init.d/iptables restart

b)通过宿主机的NAT端口转发实现

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp –dport 19200 -j DNAT –to-destination 192.168.1.160:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp –sport 9200 -j SNAT –to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state –state NEW -m tcp –dport 19200 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp –dport 19201 -j DNAT –to-destination 192.168.1.161:9200
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.161/32 -p tcp -m tcp –sport 9200 -j SNAT –to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state –state NEW -m tcp –dport 19201 -j ACCEPT

[root@kvm-server conf]# iptables -t nat -A PREROUTING -p tcp -m tcp –dport 15601 -j DNAT –to-destination 192.168.1.160:5601
[root@kvm-server conf]# iptables -t nat -A POSTROUTING -d 192.168.1.160/32 -p tcp -m tcp –sport 5601 -j SNAT –to-source 192.168.1.7
[root@kvm-server conf]# iptables -t filter -A INPUT -p tcp -m state –state NEW -m tcp –dport 15601 -j ACCEPT

[root@kvm-server conf]# service iptables save
[root@kvm-server conf]# service iptables restart

提醒一点:
nat端口转发设置成功后,/etc/sysconfig/iptables文件里要注释掉下面两行!不然nat转发会有问题!一般如上面在nat转发规则设置好并save和restart防火墙之后就会自动在/etc/sysconfig/iptables文件里删除掉下面两行内容了。
[root@kvm-server conf]# vim /etc/sysconfig/iptables
……….
#-A INPUT -j REJECT –reject-with icmp-host-prohibited
#-A FORWARD -j REJECT –reject-with icmp-host-prohibited
[root@linux-node1 ~]# service iptables restart

=============================================================

(1)Elasticsearch安装配置

基础环境安装(elk-node1和elk-node2同时操作)

1)下载并安装GPG Key
[root@elk-node1 ~]# rpm –import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum仓库
[root@elk-node1 ~]# vim /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearch repository for 2.x packages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安装elasticsearch
[root@elk-node1 ~]# yum install -y elasticsearch

4)安装相关测试软件
#提前先下载安装epel源:epel-release-latest-7.noarch.rpm,否则yum会报错:No Package…..
[root@elk-node1 ~]# wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
[root@elk-node1 ~]# rpm -ivh epel-release-latest-7.noarch.rpm
#安装Redis
[root@elk-node1 ~]# yum install -y redis
#安装Nginx
[root@elk-node1 ~]# yum install -y nginx
#安装java
[root@elk-node1 ~]# yum install -y java

安装完java后,检测
[root@elk-node1 ~]# java -version
openjdk version “1.8.0_102”
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)

配置部署(下面先进行elk-node1的配置)

1)配置修改配置文件
[root@elk-node1 ~]# mkdir -p /data/es-data
[root@elk-node1 ~]# vim /etc/elasticsearch/elasticsearch.yml                               【将里面内容情况,配置下面内容】
cluster.name: huanqiu                            # 组名(同一个组,组名必须一致)
node.name: elk-node1                            # 节点名称,建议和主机名一致
path.data: /data/es-data                         # 数据存放的路径
path.logs: /var/log/elasticsearch/             # 日志存放的路径
bootstrap.mlockall: true                         # 锁住内存,不被使用到交换分区去
network.host: 0.0.0.0                            # 网络设置
http.port: 9200                                    # 端口

2)启动并查看
[root@elk-node1 ~]# chown -R elasticsearch.elasticsearch /data/
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
CGroup: /system.slice/elasticsearch.service
└─3005 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSI…

注意:上面可以看出elasticsearch设置的内存最小256m,最大1g

=====================温馨提示:  Elasticsearch启动出现”could not find java”===================

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

yum方法安装elasticsearch, 使用"systemctl start elasticsearch"启动服务失败.

"systemctl status elasticsearch"查看, 发现报错说could not find java

但是"java -version" 查看发现java已经安装了

这是因为elasticsearch在启动过程中, 引用的java路径找不到

解决办法: 在elasticsearch配置文件中定义java全路径

[root@elk-node01 ~]

java version "1.8.0_131"

Java(TM) SE Runtime Environment (build 1.8.0_131-b11)

Java HotSpot(TM) 64-Bit Server VM (build 25.131-b11, mixed mode

[root@elk-node01 ~]

/var/lib/alternatives/java

/usr/share/swig/2.0.10/java

/usr/java

/usr/java/jdk1.8.0_131/bin/java

/usr/java/jdk1.8.0_131/jre/bin/java

/usr/bin/java

/etc/pki/java

/etc/pki/ca-trust/extracted/java

/etc/alternatives/java

[root@elk-node01 ~]

添加JAVA_HOME环境变量的配置

JAVA_HOME=/usr/java/jdk1.8.0_131

[root@linux-node1 src]# netstat -antlp |egrep “9200|9300”
tcp6 0 0 :::9200 :::* LISTEN 3005/java
tcp6 0 0 :::9300 :::* LISTEN 3005/java

然后通过web访问(访问的浏览器最好用google浏览器)

http://112.110.115.10:19200/

3)通过命令的方式查看数据(在112.110.115.10宿主机或其他外网服务器上查看,如下)
[root@kvm-server src]# curl -i -XGET ‘http://192.168.1.160:9200/\_count?pretty‘ -d ‘{“query”:{“match_all”:{}}}’
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
“count” : 0,
“_shards” : {
“total” : 0,
“successful” : 0,
“failed” : 0
}
}

这样感觉用命令来查看,特别的不爽。

4)接下来安装插件,使用插件进行查看~  (下面两个插件要在elk-node1和elk-node2上都要安装)
4.1)安装head插件
==================================================================
a)插件安装方法一
[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head

b)插件安装方法二
首先下载head插件,下载到/usr/loca/src目录下
下载地址:https://github.com/mobz/elasticsearch-head

======================================================
head插件包百度云盘下载:https://pan.baidu.com/s/1boBE0qj
提取密码:ifj7
======================================================

[root@elk-node1 src]# unzip elasticsearch-head-master.zip
[root@elk-node1 src]# ls
elasticsearch-head-master elasticsearch-head-master.zip

在/usr/share/elasticsearch/plugins目录下创建head目录
然后将上面下载的elasticsearch-head-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/head下
接着重启elasticsearch服务即可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir head
[root@elk-node1 plugins]# ls
head
[root@elk-node1 plugins]# cd head
[root@elk-node1 head]# cp -r /usr/local/src/elasticsearch-head-master/* ./
[root@elk-node1 head]# pwd
/usr/share/elasticsearch/plugins/head

[root@elk-node1 head]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 head]# ll
total 40
-rw-r–r–. 1 elasticsearch elasticsearch 104 Sep 28 01:57 elasticsearch-head.sublime-project
-rw-r–r–. 1 elasticsearch elasticsearch 2171 Sep 28 01:57 Gruntfile.js
-rw-r–r–. 1 elasticsearch elasticsearch 3482 Sep 28 01:57 grunt_fileSets.js
-rw-r–r–. 1 elasticsearch elasticsearch 1085 Sep 28 01:57 index.html
-rw-r–r–. 1 elasticsearch elasticsearch 559 Sep 28 01:57 LICENCE
-rw-r–r–. 1 elasticsearch elasticsearch 795 Sep 28 01:57 package.json
-rw-r–r–. 1 elasticsearch elasticsearch 100 Sep 28 01:57 plugin-descriptor.properties
-rw-r–r–. 1 elasticsearch elasticsearch 5211 Sep 28 01:57 README.textile
drwxr-xr-x. 5 elasticsearch elasticsearch 4096 Sep 28 01:57 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 29 Sep 28 01:57 src
drwxr-xr-x. 4 elasticsearch elasticsearch 66 Sep 28 01:57 test

[root@elk-node1 _site]# systemctl restart elasticsearch
=========================================================================

插件访问(最好提前将elk-node2节点的配置和插件都安装后,再来进行访问和数据插入测试)
http://112.110.115.10:19200/\_plugin/head/

先插入数据实例,测试下
如下:打开”复合查询“,在POST选项下,任意输入如/index-demo/test,然后在下面输入数据(注意内容之间换行的逗号不要漏掉);
数据输入好之后(如下输入wangshibo;hello world内容),下面点击”验证JSON“->”提交请求“,提交成功后,观察右栏里出现的信息:有index,type,version等信息,failed:0(成功消息)

再查看测试实例,如下:
“复合查询”下,选择GET选项,在/index-demo/test/后面输入上面POST结果中的id号,不输入内容,即{}括号里为空!
然后点击”验证JSON“->”提交请求”,观察右栏内就有了上面插入的数据了(即wangshibo,hello world)

打开”基本查询”,查看下数据,如下,即可查询到上面插入的数据:

打开“数据浏览”,也能查看到插入的数据:

如下:一定要提前在elk-node2节点上也完成配置(配置内容在下面提到),否则上面插入数据后,集群状态会呈现黄色yellow状态,elk-node2完成配置加入到集群里后就会恢复到正常的绿色状态。

4.2)安装kopf监控插件
==========================================================================

a)监控插件安装方法一

[root@elk-node1 src]# /usr/share/elasticsearch/bin/plugin install lmenezes/elasticsearch-kopf

b)监控插件安装方法二

首先下载监控插件kopf,下载到/usr/loca/src目录下
下载地址:https://github.com/lmenezes/elasticsearch-kopf

====================================================
kopf插件包百度云盘下载:https://pan.baidu.com/s/1qYixSL2
提取密码:ya4t
===================================================

[root@elk-node1 src]# unzip elasticsearch-kopf-master.zip
[root@elk-node1 src]# ls
elasticsearch-kopf-master elasticsearch-kopf-master.zip

在/usr/share/elasticsearch/plugins目录下创建kopf目录
然后将上面下载的elasticsearch-kopf-master.zip解压后的文件都移到/usr/share/elasticsearch/plugins/kopf下
接着重启elasticsearch服务即可!
[root@elk-node1 src]# cd /usr/share/elasticsearch/plugins/
[root@elk-node1 plugins]# mkdir kopf
[root@elk-node1 plugins]# cd kopf
[root@elk-node1 kopf]# cp -r /usr/local/src/elasticsearch-kopf-master/* ./
[root@elk-node1 kopf]# pwd
/usr/share/elasticsearch/plugins/kopf

[root@elk-node1 kopf]# chown -R elasticsearch:elasticsearch /usr/share/elasticsearch/plugins
[root@elk-node1 kopf]# ll
total 40
-rw-r–r–. 1 elasticsearch elasticsearch 237 Sep 28 16:28 CHANGELOG.md
drwxr-xr-x. 2 elasticsearch elasticsearch 22 Sep 28 16:28 dataset
drwxr-xr-x. 2 elasticsearch elasticsearch 73 Sep 28 16:28 docker
-rw-r–r–. 1 elasticsearch elasticsearch 4315 Sep 28 16:28 Gruntfile.js
drwxr-xr-x. 2 elasticsearch elasticsearch 4096 Sep 28 16:28 imgs
-rw-r–r–. 1 elasticsearch elasticsearch 1083 Sep 28 16:28 LICENSE
-rw-r–r–. 1 elasticsearch elasticsearch 1276 Sep 28 16:28 package.json
-rw-r–r–. 1 elasticsearch elasticsearch 102 Sep 28 16:28 plugin-descriptor.properties
-rw-r–r–. 1 elasticsearch elasticsearch 3165 Sep 28 16:28 README.md
drwxr-xr-x. 6 elasticsearch elasticsearch 4096 Sep 28 16:28 _site
drwxr-xr-x. 4 elasticsearch elasticsearch 27 Sep 28 16:28 src
drwxr-xr-x. 4 elasticsearch elasticsearch 4096 Sep 28 16:28 tests

[root@elk-node1 _site]# systemctl restart elasticsearch

============================================================================

访问插件:(如下,同样要提前安装好elk-node2节点上的插件,否则访问时会出现集群节点为黄色的yellow告警状态)

http://112.110.115.10:19200/\_plugin/kopf/#!/cluster

*************************************************************************
下面进行节点elk-node2的配置  (如上的两个插件也在elk-node2上同样安装)

注释:其实两个的安装配置基本上是一样的。

[root@elk-node2 src]# mkdir -p /data/es-data
[root@elk-node2 ~]# cat /etc/elasticsearch/elasticsearch.yml
cluster.name: huanqiu
node.name: elk-node2
path.data: /data/es-data
path.logs: /var/log/elasticsearch/
bootstrap.mlockall: true
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: [“192.168.1.160”, “192.168.1.161”]

# 修改权限配置
[root@elk-node2 src]# chown -R elasticsearch.elasticsearch /data/

# 启动服务
[root@elk-node2 src]# systemctl start elasticsearch
[root@elk-node2 src]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2016-09-28 16:49:41 CST; 1 weeks 3 days ago
Docs: http://www.elastic.co
Process: 17798 ExecStartPre=/usr/share/elasticsearch/bin/elasticsearch-systemd-pre-exec (code=exited, status=0/SUCCESS)
Main PID: 17800 (java)
CGroup: /system.slice/elasticsearch.service
└─17800 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFra…

Oct 09 13:42:22 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:22,295][WARN ][transport ] [elk-node2] Transport res…943817]
Oct 09 13:42:23 elk-node2 elasticsearch[17800]: [2016-10-09 13:42:23,111][WARN ][transport ] [elk-node2] Transport res…943846]
…………….
…………….

# 查看端口
[root@elk-node2 src]# netstat -antlp|egrep “9200|9300”
tcp6 0 0 :::9200 :::* LISTEN 2928/java
tcp6 0 0 :::9300 :::* LISTEN 2928/java
tcp6 0 0 127.0.0.1:48200 127.0.0.1:9300 TIME_WAIT -
tcp6 0 0 ::1:41892 ::1:9300 TIME_WAIT -
*************************************************************************

通过命令的方式查看elk-node2数据(在112.110.115.10宿主机或其他外网服务器上查看,如下)
[root@kvm-server ~]# curl -i -XGET ‘http://192.168.1.161:9200/\_count?pretty‘ -d ‘{“query”:{“match_all”:{}}}’
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 95

{
“count” : 1,
“_shards” : {
“total” : 5,
“successful” : 5,
“failed” : 0
}

然后通过web访问elk-node2
http://112.110.115.10:19201/

访问两个插件:
http://112.110.115.10:19201/\_plugin/head/
http://112.110.115.10:19201/\_plugin/kopf/#!/cluster

 

 (2)Logstash安装配置(这个在客户机上是要安装的。elk-node1和elk-node2都安装)

基础环境安装(客户端安装logstash,收集到的数据写入到elasticsearch里,就可以登陆logstash界面查看到了

1)下载并安装GPG Key
[root@elk-node1 ~]# rpm –import https://packages.elastic.co/GPG-KEY-elasticsearch

2)添加yum仓库
[root@hadoop-node1 ~]# vim /etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstash repository for 2.1.x packages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1

3)安装logstash
[root@elk-node1 ~]# yum install -y logstash

4)logstash启动
[root@elk-node1 ~]# systemctl start elasticsearch
[root@elk-node1 ~]# systemctl status elasticsearch
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Mon 2016-11-07 18:33:28 CST; 3 days ago
Docs: http://www.elastic.co
Main PID: 8275 (java)
CGroup: /system.slice/elasticsearch.service
└─8275 /bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFrac…
……….
……….

数据的测试

1)基本的输入输出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e ‘input { stdin{} } output { stdout{} }’
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                     #输入这个
2016-11-11T06:41:07.690Z elk-node1 hello                        #输出这个
wangshibo                                                                            #输入这个
2016-11-11T06:41:10.608Z elk-node1 wangshibo               #输出这个

2)使用rubydebug详细输出
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e ‘input { stdin{} } output { stdout{ codec => rubydebug} }’
Settings: Default filter workers: 1
Logstash startup completed
hello                                                                                    #输入这个
{                                                                                         #输出下面信息
           “message” => “hello”,
           “@version” => “1”,
      “@timestamp” => “2016-11-11T06:44:06.711Z”,
                  “host” => “elk-node1”
}
wangshibo                                                                         #输入这个
{                                                                                       #输出下面信息
         “message” => “wangshibo”,
        “@version” => “1”,
   “@timestamp” => “2016-11-11T06:44:11.270Z”,
               “host” => “elk-node1”
}

  1. 把内容写到elasticsearch中
    [root@elk-node1 ~]# /opt/logstash/bin/logstash -e ‘input { stdin{} } output { elasticsearch { hosts => [“192.168.1.160:9200”]} }’
    Settings: Default filter workers: 1
    Logstash startup completed                       #输入下面的测试数据
    123456
    wangshibo
    huanqiu
    hahaha

使用rubydebug和写到elasticsearch中的区别:其实就在于后面标准输出的区别,前者使用codec;后者使用elasticsearch

写到elasticsearch中在logstash中查看,如下图:
注意:
master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据),master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。
如下,master收集到的数据放到了自己的第1,3分片上,其他的放到了slave的第0,2,4分片上。

4)即写到elasticsearch中又写在文件中一份
[root@elk-node1 ~]# /opt/logstash/bin/logstash -e ‘input { stdin{} } output { elasticsearch { hosts => [“192.168.1.160:9200”]} stdout{ codec => rubydebug}}’
Settings: Default filter workers: 1
Logstash startup completed
huanqiupc
{
           “message” => “huanqiupc”,
          “@version” => “1”,
     “@timestamp” => “2016-11-11T07:27:42.012Z”,
                 “host” => “elk-node1”
}
wangshiboqun
{
         “message” => “wangshiboqun”,
        “@version” => “1”,
   “@timestamp” => “2016-11-11T07:27:55.396Z”,
               “host” => “elk-node1”
}

以上文本可以长期保留、操作简单、压缩比大。下面登陆elasticsearch界面中查看;

 logstash的配置和文件的编写

1)logstash的配置
简单的配置方式:
[root@elk-node1 ~]# vim /etc/logstash/conf.d/01-logstash.conf
input { stdin { } }
output {
        elasticsearch { hosts => [“192.168.1.160:9200”]}
        stdout { codec => rubydebug }
}

它的执行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/01-logstash.conf
Settings: Default filter workers: 1
Logstash startup completed
beijing                                                #输入内容
{                                                       #输出下面信息
             “message” => “beijing”,
            “@version” => “1”,
       “@timestamp” => “2016-11-11T07:41:48.401Z”,
                   “host” => “elk-node1”
}

===============================================================
参考内容:
https://www.elastic.co/guide/en/logstash/current/configuration.html
https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html
===============================================================

2)收集系统日志

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

[root@elk-node1 ~]# vim  file.conf

input {

    file {

      path => "/var/log/messages"

      type => "system"

      start_position => "beginning"

    }

}

output {

    elasticsearch {

       hosts => ["192.168.1.160:9200"]

       index => "system-%{+YYYY.MM.dd}"

    }

}

执行上面日志信息的收集,如下,这个命令会一直在执行中,表示日志在监控收集中;如果中断,就表示日志不在收集!所以需要放在后台执行~
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面,查看本机系统日志的信息:

 

================================================================
参考内容:
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
================================================================

3)收集java日志,其中包含上面讲到的日志收集

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

[root@elk-node1 ~]# vim  file.conf

input {

    file {

      path => "/var/log/messages"

      type => "system"

      start_position => "beginning"

    }

}

input {

    file {

       path => "/var/log/elasticsearch/huanqiu.log"

       type => "es-error"

       start_position => "beginning"

    }

}

output {

    if [type] == "system"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "es-error"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "es-error-%{+YYYY.MM.dd}"

        }

    }

}

注意:
如果你的日志中有type字段 那你就不能在conf文件中使用type

执行如下命令收集:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面,查看数据:

====================================================================
参考内容:
https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html
====================================================================

有个问题:
每个报错都给收集成一行了,不是按照一个报错,一个事件模块收集的。

下面将行换成事件的方式展示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

[root@elk-node1 ~]# vim multiline.conf

input {

    stdin {

       codec => multiline {

          pattern => "^\["

          negate => true

          what => "previous"

        }

    }

}

output {

    stdout {

      codec => "rubydebug"

     

}

执行命令:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f multiline.conf

Settings: Default filter workers: 1

Logstash startup completed

123

456

[123

{

    "@timestamp" => "2016-11-11T09:28:56.824Z",

       "message" => "123\n456",

      "@version" => "1",

          "tags" => [

        [0] "multiline"

    ],

          "host" => "elk-node1"

}

123]

[456]

{

    "@timestamp" => "2016-11-11T09:29:09.043Z",

       "message" => "[123\n123]",

      "@version" => "1",

          "tags" => [

        [0] "multiline"

    ],

          "host" => "elk-node1"

}

在没有遇到[的时候,系统不会收集,只有遇见[的时候,才算是一个事件,才收集起来。
======================================================================
参考内容
https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html
======================================================================

(3)Kibana安装配置

1)kibana的安装:
[root@elk-node1 ~]# cd /usr/local/src
[root@elk-node1 src]# wget https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# tar zxf kibana-4.3.1-linux-x64.tar.gz
[root@elk-node1 src]# mv kibana-4.3.1-linux-x64 /usr/local/
[root@elk-node1 src]# ln -s /usr/local/kibana-4.3.1-linux-x64/ /usr/local/kibana

2)修改配置文件:
[root@elk-node1 config]# pwd
/usr/local/kibana/config
[root@elk-node1 config]# cp kibana.yml kibana.yml.bak
[root@elk-node1 config]# vim kibana.yml
server.port: 5601
server.host: “0.0.0.0”
elasticsearch.url: “http://192.168.1.160:9200
kibana.index: “.kibana”        #注意这个.Kibana索引用来存储数据,千万不要删除了它。它是将es数据通过kibana进行web展示的关键。这个配置后,在es的web界面里就会看到这个.kibana索引。

因为他一直运行在前台,要么选择开一个窗口,要么选择使用screen。
安装并使用screen启动kibana:
[root@elk-node1 ~]# yum -y install screen
[root@elk-node1 ~]# screen                          #这样就另开启了一个终端窗口
[root@elk-node1 ~]# /usr/local/kibana/bin/kibana
log [18:23:19.867] [info][status][plugin:kibana] Status changed from uninitialized to green - Ready
log [18:23:19.911] [info][status][plugin:elasticsearch] Status changed from uninitialized to yellow - Waiting for Elasticsearch
log [18:23:19.941] [info][status][plugin:kbn_vislib_vis_types] Status changed from uninitialized to green - Ready
log [18:23:19.953] [info][status][plugin:markdown_vis] Status changed from uninitialized to green - Ready
log [18:23:19.963] [info][status][plugin:metric_vis] Status changed from uninitialized to green - Ready
log [18:23:19.995] [info][status][plugin:spyModes] Status changed from uninitialized to green - Ready
log [18:23:20.004] [info][status][plugin:statusPage] Status changed from uninitialized to green - Ready
log [18:23:20.010] [info][status][plugin:table_vis] Status changed from uninitialized to green - Ready

然后按ctrl+a+d组合键,这样在上面另启的screen屏里启动的kibana服务就一直运行在前台了….
[root@elk-node1 ~]# screen -ls
There is a screen on:
15041.pts-0.elk-node1 (Detached)
1 Socket in /var/run/screen/S-root.

(3)访问kibana:http://112.110.115.10:15601/
如下,如果是添加上面设置的java日志收集信息,则在下面填写es-error*;如果是添加上面设置的系统日志信息system*,以此类型(可以从logstash界面看到日志收集项)

 然后点击上面的Discover,在Discover中查看:

查看日志登陆,需要点击“Discover”–>”message”,点击它后面的“add”
注意:
需要右边查看日志内容时带什么属性,就在左边点击相应属性后面的“add”
如下图,添加了message和path的属性:

这样,右边显示的日志内容的属性就带了message和path

点击右边日志内容属性后面隐藏的<<,就可将内容向前缩进

添加新的日志采集项,点击Settings->+Add New,比如添加system系统日志。注意后面的*不要忘了。

删除kibana里的日志采集项,如下,点击删除图标即可。

如果打开kibana查看日志,发现没有日志内容,出现“No results found”,如下图所示,这说明要查看的日志在当前时间没有日志信息输出,可以点击右上角的时间钟来调试日志信息的查看。

4)收集nginx的访问日志

修改nginx的配置文件,分别在nginx.conf的http和server配置区域添加下面内容:

##### http 标签中
          log_format json ‘{“@timestamp”:”$time_iso8601”,’
                           ‘“@version”:”1”,’
                           ‘“client”:”$remote_addr”,’
                           ‘“url”:”$uri”,’
                           ‘“status”:”$status”,’
                           ‘“domain”:”$host”,’
                           ‘“host”:”$server_addr”,’
                           ‘“size”:$body_bytes_sent,’
                           ‘“responsetime”:$request_time,’
                           ‘“referer”: “$http_referer”,’
                           ‘“ua”: “$http_user_agent”‘
‘}’;
##### server标签中
            access_log /var/log/nginx/access_json.log json;

截图如下:

启动nginx服务:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

[root@elk-node1 ~]# systemctl start nginx

[root@elk-node1 ~]# systemctl status nginx

● nginx.service - The nginx HTTP and reverse proxy server

   Loaded: loaded (/usr/lib/systemd/system/nginx.service; disabled; vendor preset: disabled)

   Active: active (running) since Fri 2016-11-11 19:06:55 CST; 3s ago

  Process: 15119 ExecStart=/usr/sbin/nginx (code=exited, status=0/SUCCESS)

  Process: 15116 ExecStartPre=/usr/sbin/nginx -t (code=exited, status=0/SUCCESS)

  Process: 15114 ExecStartPre=/usr/bin/rm -f /run/nginx.pid (code=exited, status=0/SUCCESS)

 Main PID: 15122 (nginx)

   CGroup: /system.slice/nginx.service

           ├─15122 nginx: master process /usr/sbin/nginx

           ├─15123 nginx: worker process

           └─15124 nginx: worker process

Nov 11 19:06:54 elk-node1 systemd[1]: Starting The nginx HTTP and reverse proxy server...

Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: the configuration file /etc/nginx/nginx.conf syntax is ok

Nov 11 19:06:55 elk-node1 nginx[15116]: nginx: configuration file /etc/nginx/nginx.conf test is successful

Nov 11 19:06:55 elk-node1 systemd[1]: Started The nginx HTTP and reverse proxy server.

编写收集文件
这次使用json的方式收集:

1

2

3

4

5

6

7

8

9

10

11

12

13

[root@elk-node1 ~]# vim json.conf

input {

   file {

      path => "/var/log/nginx/access_json.log"

      codec => "json"

   }

}

output {

   stdout {

      codec => "rubydebug"

   }

}

启动日志收集程序:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf        #或加个&放在后台执行

访问nginx页面(在elk-node1的宿主机上执行访问页面的命令:curl http://192.168.1.160)就会出现以下内容:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f json.conf

Settings: Default filter workers: 1

Logstash startup completed

{

      "@timestamp" => "2016-11-11T11:10:53.000Z",

        "@version" => "1",

          "client" => "192.168.1.7",

             "url" => "/index.html",

          "status" => "200",

          "domain" => "192.168.1.160",

            "host" => "192.168.1.160",

            "size" => 3700,

    "responsetime" => 0.0,

         "referer" => "-",

              "ua" => "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2",

            "path" => "/var/log/nginx/access_json.log"

}

注意:
上面的json.conf配置只是将nginx日志输出,还没有输入到elasticsearch里,所以这个时候在elasticsearch界面里是采集不到nginx日志的。

需要配置一下,将nginx日志输入到elasticsearch中,将其汇总到总文件file.conf里,如下也将nginx-log日志输入到elasticserach里:(后续就可以只用这个汇总文件,把要追加的日志汇总到这个总文件里即可)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

[root@elk-node1 ~]# cat file.conf

input {

    file {

      path => "/var/log/messages"

      type => "system"

      start_position => "beginning"

    }

    file {

       path => "/var/log/elasticsearch/huanqiu.log"

       type => "es-error"

       start_position => "beginning"

       codec => multiline {

           pattern => "^\["

           negate => true

           what => "previous"

       }

    }

    file {

       path => "/var/log/nginx/access_json.log"

       codec => json

       start_position => "beginning"

       type => "nginx-log"

    }

}

output {

    if [type] == "system"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "es-error"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "es-error-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "nginx-log"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "nignx-log-%{+YYYY.MM.dd}"

        }

    }

}

可以加上--configtest参数,测试下配置文件是否有语法错误或配置不当的地方,这个很重要!!
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf –configtest
Configuration OK

然后接着执行logstash命令(由于上面已经将这个执行命令放到了后台,所以这里其实不用执行,也可以先kill之前的,再放后台执行),然后可以再访问nginx界面测试下
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

登陆elasticsearch界面查看:

 将nginx日志整合到kibana界面里,如下:

5)收集系统日志

编写收集文件并执行。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

[root@elk-node1 ~]# cat syslog.conf

input {

    syslog {

        type => "system-syslog"

        host => "192.168.1.160"

        port => "514"

    }

}

output {

    stdout {

        codec => "rubydebug"

    }

}

对上面的采集文件进行执行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf

重新开启一个窗口,查看服务是否启动:
[root@elk-node1 ~]# netstat -ntlp|grep 514
tcp6 0 0 192.168.1.160:514 :::* LISTEN 17842/java
[root@elk-node1 ~]# vim /etc/rsyslog.conf
#*.* @@remote-host:514                                                           【在此行下面添加如下内容】
*.* @@192.168.1.160:514

[root@elk-node1 ~]# systemctl restart rsyslog

回到原来的窗口(即上面采集文件的执行终端),就会出现数据:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f syslog.conf

Settings: Default filter workers: 1

Logstash startup completed

{

           "message" => "Stopping System Logging Service...\n",

          "@version" => "1",

        "@timestamp" => "2016-11-13T10:35:30.000Z",

              "type" => "system-syslog",

              "host" => "192.168.1.160",

          "priority" => 30,

         "timestamp" => "Nov 13 18:35:30",

         "logsource" => "elk-node1",

           "program" => "systemd",

          "severity" => 6,

          "facility" => 3,

    "facility_label" => "system",

    "severity_label" => "Informational"

}

........

........

再次添加到总文件file.conf中:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

[root@elk-node1 ~]# cat file.conf

input {

    file {

      path => "/var/log/messages"

      type => "system"

      start_position => "beginning"

    }

    file {

       path => "/var/log/elasticsearch/huanqiu.log"

       type => "es-error"

       start_position => "beginning"

       codec => multiline {

           pattern => "^\["

           negate => true

           what => "previous"

       }

    }

    file {

       path => "/var/log/nginx/access_json.log"

       codec => json

       start_position => "beginning"

       type => "nginx-log"

    }

    syslog {

        type => "system-syslog"

        host => "192.168.1.160"

        port => "514"

    }

}

output {

    if [type] == "system"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "es-error"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "es-error-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "nginx-log"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "nignx-log-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "system-syslog"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-syslog-%{+YYYY.MM.dd}"

        }

    }

}

执行总文件(先测试下总文件配置是否有误,然后先kill之前在后台启动的file.conf文件,再次执行):
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf --configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f file.conf &

测试:
向日志中添加数据,看elasticsearch和kibana的变化:
[root@elk-node1 ~]# logger “hehehehehehe1”
[root@elk-node1 ~]# logger “hehehehehehe2”
[root@elk-node1 ~]# logger “hehehehehehe3”
[root@elk-node1 ~]# logger “hehehehehehe4”
[root@elk-node1 ~]# logger “hehehehehehe5”

添加到kibana界面中:

 

6)TCP日志的收集

编写日志收集文件,并执行:(有需要的话,可以将下面收集文件的配置汇总到上面的总文件file.conf里,进而输入到elasticsearch界面里和kibana里查看)
[root@elk-node1 ~]# cat tcp.conf
input {
tcp {
host => “192.168.1.160”
port => “6666”
}
}
output {
stdout {
codec => “rubydebug”
}
}

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf

开启另外一个窗口,测试一(安装nc命令:yum install -y nc):
[root@elk-node1 ~]# nc 192.168.1.160 6666 </etc/resolv.conf

回到原来的窗口(即上面采集文件的执行终端),就会出现数据:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf
Settings: Default filter workers: 1
Logstash startup completed
{
        “message” => “”,
       “@version” => “1”,
   “@timestamp” => “2016-11-13T11:01:15.280Z”,
              “host” => “192.168.1.160”,
              “port” => 49743
}

测试二:
[root@elk-node1 ~]# echo “hehe” | nc 192.168.1.160 6666
[root@elk-node1 ~]# echo “hehe” > /dev/tcp/192.168.1.160/6666

回到之前的执行端口,在去查看,就会显示出来:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f tcp.conf

Settings: Default filter workers: 1

Logstash startup completed<br>.......

{

       "message" => "hehe",

      "@version" => "1",

    "@timestamp" => "2016-11-13T11:39:58.263Z",

          "host" => "192.168.1.160",

          "port" => 53432

}

{

       "message" => "hehe",

      "@version" => "1",

    "@timestamp" => "2016-11-13T11:40:13.458Z",

          "host" => "192.168.1.160",

          "port" => 53457

}

7)使用filter
编写文件:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

[root@elk-node1 ~]# cat grok.conf

input {

    stdin{}

}

filter {

  grok {

    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

  }

}

output {

    stdout{

        codec => "rubydebug"

    }

}

执行检测:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f grok.conf

Settings: Default filter workers: 1

Logstash startup completed

55.3.244.1 GET /index.html 15824 0.043                    #输入这个,下面就会自动形成字典的形式

{

       "message" => "55.3.244.1 GET /index.html 15824 0.043",

      "@version" => "1",

    "@timestamp" => "2016-11-13T11:45:47.882Z",

          "host" => "elk-node1",

        "client" => "55.3.244.1",

        "method" => "GET",

       "request" => "/index.html",

         "bytes" => "15824",

      "duration" => "0.043"

}

其实上面使用的那些变量在程序中都有定义:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

[root@elk-node1 ~]# cd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.2/patterns/

[root@elk-node1 patterns]# ls

aws     bro   firewalls      haproxy  junos         mcollective           mongodb  postgresql  redis

bacula  exim  grok-patterns  java     linux-syslog  mcollective-patterns  nagios   rails       ruby

[root@elk-node1 patterns]# cat grok-patterns

filter {

      # drop sleep events

    grok {

        match => { "message" =>"SELECT SLEEP" }

        add_tag => [ "sleep_drop" ]

        tag_on_failure => [] # prevent default _grokparsefailure tag on real records

      }

     if "sleep_drop" in [tags] {

        drop {}

     }

     grok {

        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]

      }

      date {

        match => [ "timestamp", "UNIX" ]

        remove_field => [ "timestamp" ]

      }

}

8)mysql慢查询

收集文件:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

[root@elk-node1 ~]# cat mysql-slow.conf

input {

    file {

        path => "/root/slow.log"

        type => "mysql-slowlog"

        codec => multiline {

            pattern => "^# User@Host"

            negate => true

            what => "previous"

        }

    }

}

filter {

      # drop sleep events

    grok {

        match => { "message" =>"SELECT SLEEP" }

        add_tag => [ "sleep_drop" ]

        tag_on_failure => [] # prevent default _grokparsefailure tag on real records

      }

     if "sleep_drop" in [tags] {

        drop {}

     }

     grok {

        match => [ "message", "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @ (?:(?<clienthost>\S*) )?\[(?:%{IP:clientip})?\]\s+Id: %{NUMBER:row_id:int}\s*# Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent: %{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:int}\s*(?:use %{DATA:database};\s*)?SET timestamp=%{NUMBER:timestamp};\s*(?<query>(?<action>\w+)\s+.*)\n#\s*" ]

      }

      date {

        match => [ "timestamp", "UNIX" ]

        remove_field => [ "timestamp" ]

      }

}

output {

    stdout {

       codec =>"rubydebug"

    }

}

执行检测:
上面需要的/root/slow.log是自己上传的,然后自己插入数据保存后,会显示:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f mysql-slow.conf

Settings: Default filter workers: 1

Logstash startup completed

{

    "@timestamp" => "2016-11-14T06:53:54.100Z",

       "message" => "# Time: 161114 11:05:18",

      "@version" => "1",

          "path" => "/root/slow.log",

          "host" => "elk-node1",

          "type" => "mysql-slowlog",

          "tags" => [

        [0] "_grokparsefailure"

    ]

}

{

    "@timestamp" => "2016-11-14T06:53:54.105Z",

       "message" => "# User@Host: test[test] @  [124.65.197.154]\n# Query_time: 1.725889  Lock_time: 0.000430 Rows_sent: 0  Rows_examined: 0\nuse test_zh_o2o_db;\nSET timestamp=1479092718;\nSELECT trigger_name, event_manipulation, event_object_table, action_statement, action_timing, DEFINER FROM information_schema.triggers WHERE BINARY event_object_schema='test_zh_o2o_db' AND BINARY event_object_table='customer';\n# Time: 161114 12:10:30",

      "@version" => "1",

          "tags" => [

        [0] "multiline",

        [1] "_grokparsefailure"

    ],

          "path" => "/root/slow.log",

          "host" => "elk-node1",

          "type" => "mysql-slowlog"

}

.........

.........

======================================================================
接下来描述会遇见到的一个问题:
一旦我们的elasticsearch出现问题,就不能进行日志采集处理了!
这种情况下该怎么办呢?

解决方案;
可以在client和elasticsearch之间添加一个中间件作为缓存,先将采集到的日志内容写到中间件上,然后再从中间件输入到elasticsearch中。
这就完美的解决了上述的问题了。

(4)ELK中使用redis作为中间件,缓存日志采集内容

1)redis的配置和启动

[root@elk-node1 ~]# vim /etc/redis.conf               #修改下面两行内容
daemonize yes
bind 192.168.1.160
[root@elk-node1 ~]# systemctl start redis
[root@elk-node1 ~]# lsof -i:6379
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
redis-ser 19474 redis 4u IPv4 1344465 0t0 TCP elk-node1:6379 (LISTEN)
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
…….

2)编写从Client端收集数据的文件

1

2

3

4

5

6

7

8

9

10

11

12

13

14

[root@elk-node1 ~]# vim redis-out.conf

input {

   stdin {}

}

output {

   redis {

      host => "192.168.1.160"

      port => "6379"

      db => "6"

      data_type => "list"

      key => "demo"

   }

}

3)执行收集数据的文件,并输入数据hello redis 

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf
Settings: Default filter workers: 1
Logstash startup completed             #下面输入数据hello redis
hello redis

4)在redis中查看数据

[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
…….
…….
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                   #在最下面一行,显示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *

  1. “demo”
    192.168.1.160:6379[6]> LINDEX demo -1
    “{\“message\“:\“hello redis\“,\“@version\“:\“1\“,\“@timestamp\“:\“2016-11-14T08:04:25.981Z\“,\“host\“:\“elk-node1\“}”

5)继续随便写点数据

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-out.conf

Settings: Default filter workers: 1

Logstash startup completed

hello redis

123456

asdf

ert

wang

shi

bo

guohuihui

as

we

r

g

asdfjkdfsak

5423wer

34rt3

6y

7uj

u

io9

sdjfhsdk890

huanqiu

huanqiuchain

hqsb

asda   

6)在redis中查看

在redis中查看长度:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
…….
…….
# Keyspace
db6:keys=1,expires=0,avg_ttl=0      #显示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *

  1. “demo”
    192.168.1.160:6379[6]> LLEN demo
    (integer) 24

7)将redis中的内容写到ES中

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

[root@elk-node1 ~]# vim redis-in.conf

input {

    redis {

      host => "192.168.1.160"

      port => "6379"

      db => "6"

      data_type => "list"

      key => "demo"

   }

}

output {

    elasticsearch {

      hosts => ["192.168.1.160:9200"]

      index => "redis-in-%{+YYYY.MM.dd}"

    }

}

执行:
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf –configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f redis-in.conf &

在redis中查看,发现数据已被读出:
192.168.1.160:6379[6]> LLEN demo
(integer) 0

=============================温馨提示===========================

1

2

3

4

5

6

redis默认只有16个数据库, 也就是说最多只能有16个db, 即db01-db15

但是key值可以设置不同, 也就是针对不同日志的key前缀可以设置不同.

比如:

key => "nginx.log"的值最多可以设置16个db, 即db01-db15

key => "mysql.log"的值最多可以设置16个db, 即db01-db15

key => "tomcat.log"的值最多可以设置16个db, 即db01-db15

登陆elasticsearch界面查看:

8)接着,将收集到的所有日志写入到redis中。这了重新定义一个添加redis缓存后的总文件shipper.conf。(可以将之前执行的总文件file.conf停掉)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

[root@elk-node1 ~]# vim shipper.conf

input {

    file {

      path => "/var/log/messages"

      type => "system"

      start_position => "beginning"

    }

    file {

       path => "/var/log/elasticsearch/huanqiu.log"

       type => "es-error"

       start_position => "beginning"

       codec => multiline {

           pattern => "^\["

           negate => true

           what => "previous"

       }

    }

    file {

       path => "/var/log/nginx/access_json.log"

       codec => json

       start_position => "beginning"

       type => "nginx-log"

    }

    syslog {

        type => "system-syslog"

        host => "192.168.1.160"

        port => "514"

    }

}

output {

   if [type] == "system"{

     redis {

        host => "192.168.1.160"

        port => "6379"

        db => "6"

        data_type => "list"

        key => "system"

     }

   }

    if [type] == "es-error"{

      redis {

        host => "192.168.1.160"

        port => "6379"

        db => "6"

        data_type => "list"

        key => "demo"

        }

     }

    if [type] == "nginx-log"{   

       redis {

          host => "192.168.1.160"

          port => "6379"

          db => "6"

          data_type => "list"

          key => "nginx-log"

       }

    }

    if [type] == "system-syslog"{

       redis {

          host => "192.168.1.160"

          port => "6379"

          db => "6"

          data_type => "list"

          key => "system-syslog"

       }   

     }

}

执行上面的文件(提前将上面之前启动的file.conf文件的执行给结束掉!)
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf –configtest
Configuration OK
[root@elk-node1 ~]# /opt/logstash/bin/logstash -f shipper.conf
Settings: Default filter workers: 1
Logstash startup completed

在redis中查看:
[root@elk-node1 ~]# redis-cli -h 192.168.1.160
192.168.1.160:6379> info
# Server
redis_version:2.8.19
…….
…….
# Keyspace
db6:keys=1,expires=0,avg_ttl=0                      #显示是db6
192.168.1.160:6379> select 6
OK
192.168.1.160:6379[6]> keys *

  1. “demo”
  2. “system”
    192.168.1.160:6379[6]> keys *
  3. “nginx-log”
  4. “demo”
  5. “system”

另开一个窗口,添加点日志:
[root@elk-node1 ~]# logger “12325423”
[root@elk-node1 ~]# logger “12325423”
[root@elk-node1 ~]# logger “12325423”
[root@elk-node1 ~]# logger “12325423”
[root@elk-node1 ~]# logger “12325423”
[root@elk-node1 ~]# logger “12325423”

又会增加日志:
192.168.1.160:6379[6]> keys *

  1. “system-syslog”
  2. “nginx-log”
  3. “demo”
  4. “system”

其实可以在任意的一台ES中将数据从redis读取到ES中。
下面咱们在elk-node2节点,将数据从redis读取到ES中:

编写文件:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

[root@elk-node2 ~]# cat file.conf

input {

     redis {

        type => "system"

        host => "192.168.1.160"

        port => "6379"

        db => "6"

        data_type => "list"

        key => "system"

     }

      redis {

        type => "es-error"

        host => "192.168.1.160"

        port => "6379"

        db => "6"

        data_type => "list"

        key => "es-error"

        }

       redis {

          type => "nginx-log"

          host => "192.168.1.160"

          port => "6379"

          db => "6"

          data_type => "list"

          key => "nginx-log"

       }

       redis {

          type => "system-syslog"

          host => "192.168.1.160"

          port => "6379"

          db => "6"

          data_type => "list"

          key => "system-syslog"

       }   

}

output {

    if [type] == "system"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "es-error"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "es-error-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "nginx-log"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "nignx-log-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "system-syslog"{

        elasticsearch {

           hosts => ["192.168.1.160:9200"]

           index => "system-syslog-%{+YYYY.MM.dd}"

        }

    }

}

执行:
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf –configtest
Configuration OK
[root@elk-node2 ~]# /opt/logstash/bin/logstash -f file.conf &

去redis中检查,发现数据已经被读出到elasticsearch中了。
192.168.1.160:6379[6]> keys *
(empty list or set)

同时登陆logstash和kibana看,发现可以正常收集到日志了。

可以执行这个 去查看nginx日志
[root@elk-node1 ~]# ab -n10000 -c1 http://192.168.1.160/

也可以启动多个redis写到ES中,具体根据自己的实际情况而定。

==============logstash配置java环境===============
由于新版的ELK环境要求java1.8,但是有些服务器由于业务代码自身限制只能用java6或java7。
这种情况下,要安装Logstash,就只能单独配置Logstas自己使用的java环境了。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

操作如下:

0) 使用rpm包安装logstash

1)安装java8,参考:http://www.cnblogs.com/kevingrace/p/7607442.html

2)在/etc/sysconfig/logstash文件结尾添加下面两行内容:

[root@cx-app01 ~]

.......

JAVA_CMD=/usr/local/jdk1.8.0_172/bin

JAVA_HOME=/usr/local/jdk1.8.0_172

3)在/opt/logstash/bin/logstash.lib.sh文件添加下面一行内容:

[root@cx-app02 ~]

.......

export JAVA_HOME=/usr/local/jdk1.8.0_172

4) 然后使用logstash收集日志,就不会报java环境错误了。

==================配置范例===================

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

如下的配置范例:

192.168.10.44为elk的master节点,同时也是redis节点

[root@client-node01 opt]

/opt

[root@client-node01 opt]

input {

    file {

       path => "/usr/local/tomcat8/logs/catalina.out"

       type => "tomcat8-logs"

       start_position => "beginning"

       codec => multiline {

           pattern => "^\["           //表示收集以"["开头的日志信息

           negate => true

           what => "previous"

       }

    }

}

output {

    if [type] == "tomcat8-logs"{

       redis {

          host => "192.168.10.44"

          port => "6379"

          db => "1"

          data_type => "list"

          key => "tomcat8-logs"

       }

     }

}

[root@client-node01 opt]

input {

  file {

        path => "/var/log/messages"

        type => "systemlog"

        start_position => "beginning"

        stat_interval => "2"

  }

}

output {

  if [type] == "systemlog" {

        redis {

                data_type => "list"

                host => "192.168.10.44"

                db => "2"

                port => "6379"

                key => "systemlog"

        }

  }

}

[root@client-node01 opt]

input {

     redis {

        type => "tomcat8-logs"

        host => "192.168.10.44"

        port => "6379"

        db => "1"

        data_type => "list"

        key => "tomcat8-logs"

     }

       redis {

          type => "systemlog"

          host => "192.168.10.44"

          port => "6379"

          db => "2"

          data_type => "list"

          key => "systemlog"

       }

}

output {

    if [type] == "tomcat8-logs"{

        elasticsearch {

           hosts => ["192.168.10.44:9200"]

           index => "elk-node2-tomcat8-logs-%{+YYYY.MM.dd}"

        }

    }

    if [type] == "systemlog"{

        elasticsearch {

           hosts => ["192.168.10.44:9200"]

           index => "elk-node2-systemlog-%{+YYYY.MM.dd}"

        }

    }

}

[root@client-node01 opt]

Configuration OK

[root@client-node01 opt]

Configuration OK

[root@client-node01 opt]

Configuration OK

启动logstash

[root@client-node01 opt]

[root@client-node01 opt]

[root@client-node01 opt]

这时候,当/usr/local/tomcat8/logs/catalina.out和/var/log/messages文件里有新日志信息写入时,就会触发动作,

在redis里就能查看到相关信息,并查看写入到es里。

=========================================================================================================

温馨提示:

当客户机的日志信息收集后,经过redis刚读到es数据库里后,如果没有新数据写入,则默认在es的访问界面里是看不到

数据的,只有当日志文件里有新的日志写入后才会触发数据展示的动作,即es的访问界面(http://192.168.10.44:9200/_plugin/head/

里才能看到日志数据的展示效果。

==========================================================================================================

假设想上面两个文件里写入测试数据

[root@client-node01 opt]

[root@client-node01 opt]

到redis里发现有相关的key,很快就会读到es里。可以配置到kibana里观察。

可以先测试下日志信息是否写到redis里?然后再测试下数据是否从redis读到es里?一步步确定数据去向。

注意上面redis-in.conf文件中的下面设置,使用正则匹配,收集以哪些字符开头的日志信息:

pattern => "^\["                    表示收集以"["开头的日志信息

pattern => "^2018"                  表示收集以"2018"开头的日志信息

pattern => "^[a-zA-Z0-9]"           表示收集以字母(大小写)或数字开头的日志信息

pattern => "^[a-zA-Z0-9]|[^ ]+"     表示收集以字母(大小写)或数字或空格的日志信息

EF使用Fluent API配置映射关系 - 心存善念 - 博客园

Excerpt

定义一个继承自EntityTypeConfiguration<>泛型类的类来定义domain中每个类的数据库配置,在这个自定义类的构造函数中使用我们上次提到的那些方法配置数据库的映射。 映射实例 待读:http://www.aizhengli.com/entity-framework6-


目录导航

  定义一个继承自EntityTypeConfiguration<>泛型类的类来定义domain中每个类的数据库配置,在这个自定义类的构造函数中使用我们上次提到的那些方法配置数据库的映射。

映射实例

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
<span>this</span>.HasRequired(s =&gt; s.Company).WithMany().HasForeignKey(s =&gt;<span> s.CompanyId);
</span><span>this</span>.HasOptional(s =&gt; s.User).WithMany().HasForeignKey(s =&gt;<span> s.UserId);
</span><span>this</span>.HasRequired(s =&gt; s.User).WithOptional(s =&gt;<span> s.WXUser);
</span><span>this</span>.Property(s =&gt;<span> s.Id).HasDatabaseGeneratedOption(System.ComponentModel.DataAnnotations.Schema.DatabaseGeneratedOption.None);
</span><span>this</span>.HasRequired(p =&gt; p.User).WithOptional(p =&gt;<span> p.UserDepartment);
</span><span>this</span>.HasMany(s =&gt;<span> s.Tags);
</span><span>this</span>.HasMany(s =&gt; s.CompanyAdminUsers).WithRequired().HasForeignKey(s =&gt;<span> s.CompanyId);
HasMany(s </span>=&gt; s.Images).WithOptional().HasForeignKey(s =&gt;<span> s.FeedbackId);
modelBuilder.Entity</span>&lt;Course&gt;<span>()
.HasRequired(c </span>=&gt;<span> c.Department)
.WithMany(t </span>=&gt;<span> t.Courses)
.Map(m </span>=&gt; m.MapKey(<span>"</span><span>ChangedDepartmentID</span><span>"</span>));

复制代码

待读:http://www.aizhengli.com/entity-framework6-code-first-get-started/95/entity-framework-code-first--fluent-api.html

实体类关系的映射

实体类映射中,关系的映射配置在关系的两端都可以配置。例如,用户信息与登录信息的 一对多 关系可以在用户信息端配置:

HasMany(m => m.LoginLogs).WithRequired(n => n.Member);

等效于在登录日志信息端配置:

HasRequired(m => m.Member).WithMany(n => n.LoginLogs);

但是,如果所有的关系映射都在作为主体的用户信息端进行配置,势必造成用户信息端配置的臃肿与职责不明。所以,为了保持各个实体类型的职责单一,实体关系推荐在关系的非主体端进行映射。

用户信息映射类,用户信息是关系的主体,所有的关系都不在此映射类中进行配置

1
2
3
4
5
6
namespace GMF.Demo.Core.Data.Configurations
{
public class MemberConfiguration : EntityTypeConfiguration&lt;Member&gt;
{
}
}

用户扩展信息映射类,配置用户扩展信息与用户信息的 0:1 关系

复制代码

1
2
3
4
5
6
7
8
9
10
namespace GMF.Demo.Core.Data.Configurations
{
public class MemberExtendConfiguration : EntityTypeConfiguration&lt;MemberExtend&gt;
{
public MemberExtendConfiguration()
{
HasRequired(m =&gt; m.Member).WithOptional(n =&gt; n.Extend);
}
}
}

复制代码

OnModelCreating配置

  1. ToTable - TableAttribute:配置此实体类型映射到的表名
  2. HasColumnName - ColumnAttribute:配置用于存储属性的数据库列的名称
  3. HasForeignKey - ForeignKeyAttribute:将关系配置为使用在对象模型中的外键属性。如果未在对象模型中公开外键属性,则使用Map方法
  4. Ignore - NotMappedAttribute:从模型中排队某个属性,使该属性不会映射到数据库
  5. HasRequired:通过此实体类型配置必需关系。除非指定此关系,否则实体类型的实例将无法保存到数据库。数据库中的外键不可为null。
  6. HasOptional:从此实体类型配置可选关系。实体类型的实例将能保存到数据库,而无需指定此关系。数据库中的外键可为null。
  7. HasMany:从此实体类型配置一对多关系。
  8. WithOptional:将关系配置为required:optional。(required:0…1端的1,表示必需,不可为null;optional:0…1端的0,表示可选,可为null。下同)
  9. WithOptionalDependent:将关系配置为optional:optional。要配置的实体类型将成为依赖对象,且包含主体的外键。作为关系目标的实体类型将成为关系中的主体。
  10. WithOptionalPrincipal:将关系配置为optional:optional。要配置的实体类型将成为关系中的主体。作为关系目标的实体类型将成为依赖对象,且包含主体的外键。
  11. WithRequired:将关系的指定端配置为必需的,且在关系的另一端有导航属性。
  12. WithRequiredDependent:将关系配置为required:required。要配置的实体类型将成为依赖对象,且包含主体的外键。作为关系目标的实体类型将成为关系中的主体。
  13. WithRequiredPrincipal:将关系配置为required:required。要配置的实体类型将成为关系中的实体。作为关系目标的实体类型将成为依赖对象,且包含主体的外键。
  14. WillCascadeOnDelete:配置是否对关系启用级联删除。
  15. Map:将关系配置为使用未在对象模型中公开的外键属性。可通过指定配置操作来自定义列和表。如果指定了空的配置操作,则约定将生成列名。如果在对象模型中公开了外键属性,则使用 HasForeignKey 方法。并非所有关系都支持在对象模型中公开外键属性。
  16. MapKey:配置外键的列名。
  17. ToTable:配置外键列所在表的名称和架构。

属性映射

主要配置:主键、数值长度、配置为必须、不映射,外键等

配置主键:

1
modelBuilder.Entity&lt;ClassA&gt;().HasKey(t =&gt; t.ID);    //配置ClassA的ID属性为主键

配置联合主键:

1
modelBuilder.Entity&lt;ClassA&gt;().HasKey(t =&gt; new { t.ID, t.Name });    //配置ClassA的ID和Name为主键

设置数据非数据库生成:

1
modelBuilder.Entity&lt;ClassA&gt;().Property(t =&gt; t.Id).HasDatabaseGeneratedOption(DatabaseGeneratedOption.None);    //ClassA的Id属性不用数据库控制生成

设置字段最大长度:

1
modelBuilder.Entity&lt;ClassA&gt;().Property(t =&gt; t.Name).HasMaxLength(100);     //设置ClassA类的Name属性的最大长度为100,如果值长度100,会抛出 DbEntityValidationException异常

设置字段为必需:

1
modelBuilder.Entity&lt;ClassA&gt;().Property(t =&gt;t.Id).IsRequired();   //设置ClassA类的Id属性为必需   

属性不映射到数据库:

1
modelBuilder.Entity&lt;ClassA&gt;().Ignore(t =&gt; t.A);    //调过ClassA类的A属性,让之不映射到数据库中

将属性映射到数据库中特定列名:

1
2
3
modelBuilder.Entity&lt;ClassA&gt;() 
.Property(t =&gt; t.A)
.HasColumnName("A_a"); //将类ClassA的属性A映射到数据库中对应列名A_a

类中不指定外键,但在数据库中指定外键名:

1
2
3
4
modelBuilder.Entity&lt;Staff&gt;() 
.HasRequired(c =&gt; c.Department)
.WithMany(t =&gt; t.Staffs)
.Map(m =&gt; m.MapKey("DepartmentID")); //指定员工表中DepartmentID为Staff到Department的外键

指定属性映射的字段为Unicode类型:

1
2
3
modelBuilder.Entity&lt;ClassA&gt;() 
.Property(t =&gt; t.Name)
.IsUnicode(true);

设置属性映射的列的类型:

1
2
3
modelBuilder.Entity&lt;Department&gt;() 
.Property(p =&gt; p.Name)
.HasColumnType("varchar"); //设置列为varchar类型

设置复杂类型的属性(何为复杂类型? 没指定主键的类型):

1
2
3
modelBuilder.ComplexType&lt;Details&gt;() 
.Property(t =&gt; t.Location)
.HasMaxLength(20);
1
2
3
modelBuilder.Entity&lt;OnsiteCourse&gt;() 
.Property(t =&gt; t.Details.Location)
.HasMaxLength(20);

显示设定为复杂类型:

1
modelBuilder.ComplexType&lt;ClassA&gt;();

将属性配置为用作乐观并发令牌:

方法1、用 ConcurrencyCheck 特性或 IsConcurrencyToken 方法

1
2
3
modelBuilder.Entity&lt;OfficeAssignment&gt;() 
.Property(t =&gt; t.Timestamp)
.IsConcurrencyToken();

方法2、IsRowVersion 

1
2
3
modelBuilder.Entity&lt;OfficeAssignment&gt;() 
.Property(t =&gt; t.Timestamp)
.IsRowVersion();

忽略类型,不映射到数据库中:

1
modelBuilder.Ignore&lt;OnlineCourse&gt;();

设置索引

您必须添加引用 ︰**using System.Data.Entity.Infrastructure.Annotations;**

基本例子

在这里是一种简单的用法,加上 User.FirstName属性的索引

1
2
3
4
<span>modelBuilder 
.Entity</span>&lt;User&gt;<span>()
.Property(t </span>=&gt;<span> t.FirstName)
.HasColumnAnnotation(IndexAnnotation.AnnotationName, </span><span>new</span> IndexAnnotation(<span>new</span> IndexAttribute()));

实例 ︰

在这里是一个更现实的例子。它对多个属性添加一个 唯一索引 ︰ User.FirstName和 User.LastName,与索引名称”IX_FIrstNameLastName”

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<span>modelBuilder 
.Entity</span>&lt;User&gt;<span>()
.Property(t </span>=&gt;<span> t.FirstName)
.IsRequired()
.HasMaxLength(</span><span>60</span><span>)
.HasColumnAnnotation(
IndexAnnotation.AnnotationName,
</span><span>new</span><span> IndexAnnotation(
</span><span>new</span> IndexAttribute(<span>"</span><span>IX_FirstNameLastName</span><span>"</span>, <span>1</span>) { IsUnique = <span>true</span><span> }));

modelBuilder
.Entity</span>&lt;User&gt;<span>()
.Property(t </span>=&gt;<span> t.LastName)
.IsRequired()
.HasMaxLength(</span><span>60</span><span>)
.HasColumnAnnotation(
IndexAnnotation.AnnotationName,
</span><span>new</span><span> IndexAnnotation(
</span><span>new</span> IndexAttribute(<span>"</span><span>IX_FirstNameLastName</span><span>"</span>, <span>2</span>) { IsUnique = <span>true</span> }));

复制代码

数据库模型发生改变的处理

暴力处理:直接删除掉后重新生成

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
namespace Portal
{
public class PortalContext : DbContext
{
static PortalContext()
{
Database.SetInitializer(new DropCreateDatabaseIfModelChanges&lt;PortalContext&gt;());
}

public DbSet&lt;Province&gt; Provinces { get; set; }
public DbSet&lt;Category&gt; Categories { get; set; }

protected override void OnModelCreating(DbModelBuilder modelBuilder)
{
modelBuilder.Configurations.Add(new ProvinceMap());
modelBuilder.Configurations.Add(new CategoryMap());
}
}
}

复制代码

1
<strong>DropCreateDatabaseIfModelChanges&lt;PortalContext&gt;()太暴力了</strong>

Code First数据库迁移

改变原来类的结构后数据库将发生错误提示

1.第一次建立数据库迁移通过nugget来进行编辑

Package Manager Console-》Enable-Migrations -StartUpProjectName CodeFirst-》执行“Add-Migration FirstMigration”命令-》

执行“Update-Database”命令,更新数据库架构

你的项目中将自动生成一个名为”Migrations“的文件夹,里面包含两个文件: Configuration.cs和201308211510117_InitialCreate.cs(201308211510117是时间戳)。

Configuration.cs:是迁移配置代码,一般我们不需要修改。

复制代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
namespace CodeFirst.Migrations
{
using System;
using System.Data.Entity;
using System.Data.Entity.Migrations;
using System.Linq;

internal sealed class Configuration : DbMigrationsConfiguration&lt;CodeFirst.OrderContext&gt;
{
public Configuration()
{
AutomaticMigrationsEnabled = true;
}

protected override void Seed(CodeFirst.OrderContext context)
{
// This method will be called after migrating to the latest version.

// You can use the DbSet&lt;T&gt;.AddOrUpdate() helper extension method
// to avoid creating duplicate seed data. E.g.
//
// context.People.AddOrUpdate(
// p =&gt; p.FullName,
// new Person { FullName = "Andrew Peters" },
// new Person { FullName = "Brice Lambson" },
// new Person { FullName = "Rowan Miller" }
// );
//
}
}

复制代码

201308211510117_InitialCreate.cs:以代码的形式记录了本地数据库的表结构定义。

设置自动迁移

有以下两个参数可以对自动迁移进行设置: 1. AutomaticMigrationsEnabled:获取或设置 指示迁移数据库时是否可使用自动迁移的值。 2. AutomaticMigrationDataLossAllowed:获取或设置 指示是否可接受自动迁移期间的数据丢失的值。如果设置为false,则将在数据丢失可能作为自动迁移一部分出现时引发异常。

原文:

http://www.mamicode.com/info-detail-872834.html

http://www.cnblogs.com/liupeng/p/4797046.html

http://www.guanggua.com/question/21573550-setting-unique-constraint-with-fluent-api.html

https://msdn.microsoft.com/en-us/data/jj591617.aspx#PropertyIndex

 http://www.cnblogs.com/lyq2012/p/6183895.html

作者:心存善念
本文地址:https://www.cnblogs.com/xcsn/p/6883162.html
欢迎转载,请在明显位置给出出处及链接。

前言

在原生的EF框架中,针对批量数据操作的接口有限,EF扩展框架弥补了EF在批量操作时的接口,这些批量操作包括:批量修改、批量查询、批量删除和数据缓存,如果您想在EF中更方便的批量操作数据,这个扩展将对您来说很有用。

下载安装

这个框架支持通过NuGet包管理器进行安装,你可以在包管理器中搜索:EntityFramework.Extended,最简单的方法就是程序包管理控制台进行安装,安装命令如下:

PM > Install - Package EntityFramework.Extended

框架安装后,你需要在您的类中是引用如下命名空间:

using EntityFramework.Extensions;

批量更新与删除数据

在EF中默认提供的更新和删除操作,你必须首先将数据查询到内存中,这在有些时候,性能是非常差的,而通过EntityFramework.Extended你只需要通过LINQ生成表达式即可直接批量删除或更新,示例代码如下:

MyContext context = newMyContext();
context.Books.Where(b => b.Price >= 100).Delete();
context.SaveChanges();

以上示例演示批量删除图书价格大于等于100的所有记录

1

2

3

MyContext context = newMyContext();

context.Books.Where(b => b.Price >= 100).Update(b => newBook { Price = 88 });

context.SaveChanges();

以上示例演示将图书价格大于100所有记录的价格修改成88元

批量查询

这个扩展框架允许你将多个查询表达式包装在同一个连接进行查询,这样可以减少数据库连接数,从而提高查询性能,示例如下:

复制代码

MyContext context = newMyContext();

var books = context.Books.Where(b => b.Price >= 100).Future(); //生成第一个查询表达式
var accounts = context.Accounts.Where(a => a.Money <= 10).Future(); //生成第二个查询表达式

var bookList = books.ToList(); //在同一个数据库连接上查询上面两个表达式,只访问一次数据库

复制代码

在分页的时候,我们经常需要知道分页列表与总记录数,如果你用默认EF提供的方法进行查询,你需要访问两次数据库,在EF扩展框架中,您可以将获得列表与总记录数的查询包装在同一个数据库连接上进行,示例如下:

复制代码

复制代码

MyContext context = newMyContext();

var query = context.Books.Where(b => b.Price >= 100);
var query1 = query.Skip(pageIndex).Take(pageSize).Future();
var query2 = query.FutureCount();

var bookList = query1.ToList();
var bookTotalCount = query2.Value;

复制代码

复制代码

缓存查询结果

EF扩展框架允许缓存查询结果,用法示例如下:

MyContext context = newMyContext();

var books = context.Books.Where(b => b.Price >= 100).FromCache(); //不指定时间,使用默认的缓存时间
var books2= context.Books.Where(b => b.Price >= 100).FromCache(CachePolicy.WithDurationExpiration(TimeSpan.FromSeconds(300))); //将结果缓存300秒

当然,您也可以给缓存打上TAG标记,标记缓存的好处是,在以后的查询中可以设置指定的缓存过期或者获取指定TAG所缓存的数据,示例如下:

MyContext context = newMyContext();

var books = context.Books.Where(b => b.Price >= 100).FromCache(tags: new[] { “Books”,”100” });
CacheManager.Current.Expire(“Books”); //将标记为Books的缓存立即过期

值得注意的是,EF扩展框架默认使用MemoryCache系统内存进行缓存,如果您想第三方分布式缓存框架,只需要移除系统内存缓存,注入自己的缓存提供者即可,如下用Memcache缓存结果。

Locator.Current.Register(() => new MemcachedProvider());

以上就是EntityFramework.Extended扩展库的所有内容,来自零度分享。

http://www.xcode.me/book/entity-framework-extended