0%

官方文档(中文)

节点名称 节点IP 配置 系统版本
VIP 192.168.50.220 虚拟IP
k8s-master-221 192.168.50.221 4核 2G debian 11
k8s-master-222 192.168.50.222 4核 2G debian 11
k8s-master-223 192.168.50.223 4核 2G debian 11
k8s-node-224 192.168.50.224 4核 2G debian 11
k8s-node-225 192.168.50.225 4核 2G debian 11

主机配置

时间同步

1

配置 hostname

注意节名称不能重复

1
hostnamectl --static set-hostname k8s-master-221

配置防火墙

1
2
3
4
5
service iptables stop 

iptables -F

systemctl stop firewalld && systemctl disable firewalld

如果需要打开防火墙,执行以下配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# master节点执行
ufw allow 6443/tcp
ufw allow 2379/tcp
ufw allow 2380/tcp
ufw allow 10250/tcp
ufw allow 10251/tcp
ufw allow 10252/tcp
ufw allow 10255/tcp
ufw reload

# worker节点执行
ufw allow 10250/tcp
ufw allow 30000:32767/tcp
ufw reload

关闭交换分区

1
2
swapoff -a
set -ri 's/.*swap.*/#&/' /etc/fstab

若需允许交换分区参考官方文档 交换分区的配置

配置hosts

1
2
3
4
5
6
7
cat >> /etc/hosts << EOF
192.168.50.221 k8s-master-221
192.168.50.222 k8s-master-222
192.168.50.223 k8s-master-223
192.168.50.224 k8s-worker-224
192.168.50.225 k8s-worker-225
EOF

开启 bridge 网桥过滤功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 桥接的ipv4流量转到iptables
cat << EOF | sudo tee /etc/modules-load.d/k8s.conf
overlay
br_netfilter
EOF

sudo modprobe overlay
sudo modprobe br_netfilter

# 设置所需的 sysctl 参数,参数在重新启动后保持不变
cat << EOF | sudo tee /etc/sysctl.d/k8s.conf
net.bridge.bridge-nf-call-iptables = 1 # 开启网桥模式(必须)
net.bridge.bridge-nf-call-ip6tables = 1 # 开启网桥模式(必须)
net.ipv4.ip_forward = 1 # 转发模式(默认开启)
vm.panic_on_oom = 0 # 开启OOM(默认开启)
vm.swappiness  = 0 # 禁止使用swap空间
vm.overcommit_memory = 1 # 不检查物理内存是否够用
EOF

# 应用 sysctl 参数而不重新启动
sudo sysctl --system

配置 IPVS

1
2
3
4
5
6
7
8
9
10
11
modprobe br_netfilter

cat > /etc/sysconfig/modules/ipvs.modules << EOF
#!/bin/bash
modprobe -- ip_vs
modprobe -- ip_vs_rr
modprobe -- ip_vs_wrr
modprobe -- ip_vs_sh
modprobe -- nf_conntrack_ipv
EOF

安装工具

安装 Containerd

1
2
3
4
5
6
7
# 安装
apt update
apt install -y containerd

# 导出默认配置
containerd config default | sudo tee /etc/containerd/config.toml >/dev/null 2>&1

设置cgroupdriversystemd,编辑 /etc/containerd/config.toml 文件,找到 [plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options] 部分,添加一行内容:SystemdCgroup = true

1
sed -i 's/SystemdCgroup \= false/SystemdCgroup \= true/g' /etc/containerd/config.toml

![[Kubernetes/Kubernetes搭建/Pasted image 20240920103157.png]]

重启containerd并设置开机启动

1
2
systemctl restart containerd
systemctl enable containerd

安装 keadm,kubelete,kubectl

1
2
3
4
5
6
# 添加安装源

# 安装
apt update
apt install -y kubelet kubeadm kubectl
apt-mark hold kubelet kubeadm kubectl

部署高可用(仅 master 节点)

安装

1
apt install keepalived haproxy

修改haproxy配置

/etc/haproxy/haproxy.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
global
maxconn 2000
ulimit-n 16384
log 127.0.0.1 local0 err
stats timeout 30s

defaults
log global
mode http
option httplog
timeout connect 5000
timeout client 50000
timeout server 50000
timeout http-request 15s
timeout http-keep-alive 15s

frontend monitor-in
bind *:33305
mode http
option httplog
monitor-uri /monitor

frontend k8s-master
bind 0.0.0.0:16443
bind 127.0.0.1:16443
mode tcp
option tcplog
tcp-request inspect-delay 5s
default_backend k8s-master

backend k8s-master
mode tcp
option tcplog
option tcp-check
balance roundrobin
default-server inter 10s downinter 5s rise 2 fall 2 slowstart 60s maxconn 250 maxqueue 256 weight 100
server k8s-master1 172.16.12.111:6443 check
server k8s-master2 172.16.12.112:6443 check
server k8s-master3 172.16.12.113:6443 check

配置 keepalived

interface # 网卡名称
mcast_src_ip # 节点ip
virtual_ipaddress # vip地址

k8s-master-221配置文件/etc/keepalived/keepalived.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
! Configuration File for keepalived
global_defs {
router_id LVS_DEVEL
script_user root
enable_script_security
}
vrrp_script chk_apiserver {
script "/etc/keepalived/check_apiserver.sh" #健康检查脚本
interval 5
weight -5
fall 2
rise 1
}
vrrp_instance VI_1 {
state MASTER #高可用主1
interface eth0 #网卡名称
mcast_src_ip 192.168.50.221 #该节点 IP
virtual_router_id 51
priority 100 #设置最高级优先级
advert_int 2
authentication {
auth_type PASS
auth_pass K8SHA_KA_AUTH
}
virtual_ipaddress {
192.168.50.220 #vip地址
}
track_script {
chk_apiserver
}
}

k8s-master-222配置文件/etc/keepalived/keepalived.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
! Configuration File for keepalived
global_defs {
router_id LVS_DEVEL
script_user root
enable_script_security
}
vrrp_script chk_apiserver {
script "/etc/keepalived/check_apiserver.sh"
interval 5
weight -5
fall 2
rise 1
}
vrrp_instance VI_1 {
state BACKUP #高可用 从1
interface ens33 #网卡名称
mcast_src_ip 192.168.50.222 #该节点 IP
virtual_router_id 51
priority 50 #设置优先级
advert_int 2
authentication {
auth_type PASS
auth_pass K8SHA_KA_AUTH
}
virtual_ipaddress {
192.168.50.220 #vip地址
}
track_script {
chk_apiserver
}
}

k8s-master-222配置文件/etc/keepalived/keepalived.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
! Configuration File for keepalived
global_defs {
router_id LVS_DEVEL
script_user root
enable_script_security
}
vrrp_script chk_apiserver {
script "/etc/keepalived/check_apiserver.sh"
interval 5
weight -5
fall 2
rise 1
}
vrrp_instance VI_1 {
state BACKUP #高可用从2
interface ens33 #网卡名称
mcast_src_ip 192.168.50.223 #该节点 IP
virtual_router_id 51
priority 49 #设置优先级
advert_int 2
authentication {
auth_type PASS
auth_pass K8SHA_KA_AUTH
}
virtual_ipaddress {
192.168.50.220 #vip地址
}
track_script {
chk_apiserver
}
}

健康检查脚本 /etc/keepalived/check_apiserver.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/bin/bash
err=0
for k in $(seq 1 3);do
check_code=$(pgrep haproxy)
if [[ $check_code == "" ]]; then
err=$(expr $err + 1)
sleep 1
continue
else
err=0
break
fi
done

if [[ $err != "0" ]]; then
echo "systemctl stop keepalived"
/usr/bin/systemctl stop keepalived
exit 1
else
exit 0
fi

给监测脚本添加执行权限

1
chmod +x /etc/keepalived/check_apiserver.sh

启动keepalive和haproxy

1
2
3
4
5
6
systemctl daemon-reload
# 启动并设置开机启动
# systemctl enable --now haproxy
systemctl start haproxy && systemctl enable haproxy
# systemctl enable --now keepalived
systemctl start keepalived && systemctl enbale keepalived

测试vip漂移

1
2
3
4
5
# 查看ip与vip
hostname -I

# 测试vip的16443端口是否通
nc -v 192.168.50.220 16443

初始化集群

拉取镜像

1
2
3
4
5
# 查看需要的镜像文件
kubeadm config images list

# 拉取镜像
kubeadm config images pull

master 节点初始化

1
2
3
4
5
# 导出默认初始化配置
kubeadm config print init-defaults > kubeadm-config.yaml

# token过期后生成信息token
kubeadm token create --print-join-command

master 节点加入集群

1
2
3
4
5
6
# master节点需要生成certificate-key
kubeadm init --control-plane-endpoint=192.168.50.220:16443

kubeadm join 192.168.50.220:16443 --token {token} \
--discovery-token-ca-cert-hash {} \
--control-plane --certificate-key {}

worker 节点加入集群

1
2
kubeadm join 192.168.50.220:16643 --token {token} \
--discovery-token-ca-cert-hash {}

从集群种移除节点

1
kubectl delete node {node-name}

配置环境变量,用于访问集群

1
2
3
4
5
cat << EOF >> ~/.bashrc
export KUBECONFIG=/etc/kubernetes/admin/conf
EOF

source ~/.bashrc

查看集群节点状态

1
2
3
4
5
6
# 查看节点状态
kubectl get nodes

# 查看系统组件
kubectl get all -n kube-system -o wide

安装网络组件(只在master-221节点操作)

Calico
Flannel

去除 master节点污点

如果你打算让Master节点也参与到平常的Pod调度(生产环境一般不会这样做,以保证master节点的稳定性),那么你需要使用以下命令将Master节点上的 taint(污点标记)解除

1
kubectl taint nodes --all node-role.kubernetes.io/master-

最后我们使用以下命令查看当前集群的状态,发现Scheduler和Controller Manager组件处理不健康状态:

1
kubectl get cs

解决上述问题需要将每个Master节点上的 /etc/kubernetes/manifests/kube-scheduler.yaml 和 /etc/kubernetes/manifests/kube-controller-manager.yaml 文件中的- –port=0注释掉,然后重启一下各Master节点上的kubelet即可.

测试集群

1
2
3
4
kubectl create deployment nginx --image nginx --replicas 2
kubectl expose deployment nginx --name nginx --type NodePort --port 80 --target-port 80 --node-port 8080

curl http://192.168.50.220:8080

参考
如何用 Kubeadm 在 Debian 11 上安装 Kubernetes 集群 | Linux 中国 - 知乎 (zhihu.com)
Kubernetes多主多从高可用集群部署 - 个人文章 - SegmentFault 思否
搭建多主节点k8s高可用集群(三主两从一VIP)_kubernetes部署多主多从集群-CSDN博客
github - 基于Ubuntu22.04部署KubeEdge-v1.18.0环境 - 云原生_KubeEdge - SegmentFault 思否

原文链接:Git应用详解第十讲:Git子库:submodule与subtree

一个中大型项目往往会依赖几个模块,git提供了子库的概念。可以将这些子模块存放在不同的仓库中,通过submodulesubtree实现仓库的嵌套。本讲为Git应用详解的倒数第二讲,胜利离我们不远了!

一、submodule

submodule:子模块的意思,表示将一个版本库作为子库引入到另一个版本库中:

1.引入子库

需要使用如下命令:

git submodule add 子库地址 保存目录

比如:

1
git submodule add git@github.com:AhuntSun/git_child.git mymodule

执行上述命令会将地址对应的远程仓库作为子库,保存到当前版本库的mymodule目录下:

img

随后查看当前版本库的状态:

image-20200329203048016

可以发现新增了两个文件。查看其中的.gitmodules文件:

image-20200329203507411

可以看到当前文件的路径和子模块的url,随后将这两个新增文件添加提交推送。在当前仓库git_parent对应的远程仓库中多出了两个文件:

image-20200329203746236

其中mymodule文件夹上的3bd7f76 对应的是子仓库git_child中的最新提交

image-20200329203905051

点击mymodule文件夹,会自动跳转到子仓库中:

image-20200329203957392

通过上述分析,可以得出结论:两个仓库已经关联起来了,并且仓库git_child为仓库git_parent的子仓库;

2.同步子库变化

当被依赖的子版本库发生变化时:在子版本库git_child中新增文件world.txt并提交到远程仓库:

image-20200329204252524

这个时候依赖它的父版本库git_parent要如何感知这一变化呢?

方法一

这个时候git_parent只需要进入存放子库git_child的目录mymodule,执行git pull就能将子版本库git_child的更新拉取到本地:

image-20200330102106961

方法二

当父版本库git_parent依赖的多个子版本库都发生变化时,可以采用如下方法遍历更新所有子库:首先回到版本库主目录,执行以下指令:

1
git submodule foreach git pull

该命令会遍历当前版本库所依赖的所有子版本库,并将它们的更新拉取到父版本库git_parent

image-20200330102642607

拉取完成后,查看状态,发现mymodule目录下文件发生了变化,所以需要执行一次添加、提交、推送操作:

image-20200330102914556

3.复制父版本库

如果将使用了submodule添加依赖了子库的父版本库git_parent,克隆一份到本地的话。在克隆出来的新版本库git_parent2中,原父版本库存放依赖子库的目录虽在,但是内容不在:

image-20200330103417911

进入根据git_parent复制出来的仓库git_parent2,会发现mymodule目录为空:

image-20200330103502848

解决方法:可采用多条命令的分步操作,也可以通过参数将多步操作进行合并。

分步操作

这是在执行了clone操作后的额外操作,还需要做两件事:

  • 手动初始化submodule

    1
    git submodule init
  • 手动拉取依赖的子版本库;:

    1
    git submodule update --recursive

image-20200330103803762

执行完两步操作后,子版本库中就有内容了。由此完成了git_parent的克隆;

合并操作

分步操作相对繁琐,还可以通过添加参数的方式,将多步操作进行合并。通过以下指令基于git_parent克隆一份git_parent3

1
git clone git@github.com:AhuntSun/git_parent.git git_parent3 --recursive

image-20200330104210732

--recursive表示递归地克隆git_parent依赖的所有子版本库。

4.删除子版本库

git没有提供直接删除submodule子库的命令,但是我们可以通过其他指令的组合来达到这一目的,分为三步:

  • submodule从版本库中删除:

    1
    git rm --cache mymodule

image-20200330105131697

git rm的作用为删除版本库中的文件,并将这一操作纳入暂存区;

  • submodule从工作区中删除;

image-20200330105226923

  • 最后将.gitmodules目录删除;

image-20200330105542069

完成三步操作后,再进行添加,提交,推送即可完成删除子库的操作:

image-20200330105614793

二、subtree

1.简介

subtreesubmodule的作用是一样的,但是subtree出现得比submodule晚,它的出现是为了弥补submodule存在的问题:

  • 第一:submodule不能在父版本库中修改子版本库的代码,只能在子版本库中修改,是单向的;
  • 第二:submodule没有直接删除子版本库的功能;

subtree则可以实现双向数据修改。官方推荐使用subtree替代submodule

2.创建子库

首先创建两个版本库:git_subtree_parentgit_subtree_child然后在git_subtree_parent中执行git subtree会列出该指令的一些常见的参数:

image-20200330112616987

3.建立关联

首先需要给git_subtree_parent添加一个子库git_subtree_child:

第一步:添加子库的远程地址:

1
git remote add subtree-origin git@github.com:AhuntSun/git_subtree_child.git

添加完成后,父版本库中就有两个远程地址了:

image-20200330113223780

这里的subtree-origin就代表了远程仓库git_subtree_child的地址。

第二步:建立依赖关系:

1
2
git subtree add --prefix=subtree subtree-origin master --squash
//其中的--prefix=subtree可以写成:--p subtree 或 --prefix subtree

该命令表示将远程地址为subtree-origin的,子版本库上master分支的,文件克隆到subtree目录下;

注意:是在某一分支(如master)上将subtree-origin代表的远程仓库的某一分支(如master)作为子库拉取到subtree文件夹中。可切换到其他分支重复上述操作,也就是说子库的实质就是子分支。

--squash是可选参数,它的含义是合并,压缩的意思。

  • 如果不增加这个参数,则会把远程的子库中指定的分支(这里是master)中的提交一个一个地拉取到本地再去创建一个合并提交;
  • 如果增加了这个参数,会将远程子库指定分支上的多次提交合并压缩成一次提交再拉取到本地,这样拉取到本地的,远程子库中的,指定分支上的,历史提交记录就没有了。

image-20200330114203889

拉取完成后,父版本库中会增添一个subtree目录,里面是子库的文件,相当于把依赖的子库代码拉取到了本地:

image-20200330114316257

此时查看一下父版本库的提交历史:
image-20200330114500554

会发现其中没有子库李四的提交信息,这是因为--squash参数将他的提交压缩为一次提交,并由父版本库张三进行合并和提交。所以父版本库多出了两次提交。

随后,我们在父版本库中进行一次推送:

image-20200330114730534

结果远程仓库中多出了一个存放子版本库文件的subtree目录,并且完全脱离了版本库git_subtree_child,仅仅是属于父版本库git_subtree_parent的一个目录。而不像使用submodule那样,是一个点击就会自动跳转到依赖子库的指针

  • subtree的远程父版本库:

image-20200330115004586

  • submodule的远程父版本库:

image-20200329203746236

submodulesubtree子库的区别为:

image-20200408224805624

4.同步子库变化

在子库中创建一个新文件world并推送到远程子库:
image-20200330115440136

在父库中通过如下指令更新依赖的子库内容:

1
git subtree pull --prefix=subtree subtree-origin master --squash

image-20200330115726052

此时查看一下提交历史:

image-20200330115755340

发现没有子库李四的提交信息,这都是--squash的作用。子库的修改交由父库来提交。

5.参数--squash

该参数的作用为:防止子库指定分支上的提交历史污染父版本库。比如在子库的master分支上进行了三次提交分别为:abc,并推送到远程子库。

首先,复习一下合并分支时遵循的三方合并原则:

image-20200408003842196

当提交46需要合并的时候,git会先寻找二者的公共父提交节点,如图中的2,然后在提交2的基础上进行246的三方合并,合并后得到提交7

父仓库执行pull操作时:如果添加参数--squash,就会把远程子库master分支上的这三次提交合并为一次新的提交abc;随后再与父仓库中子库的master分支进行合并,又产生一次提交X。整个pull的过程一共产生了五次提交,如下图所示:

image-20200420103912282

存在的问题:

由于--squash指令的合并操作,会导致远程master分支上的合并提交abc与本地master分支上的最新提交2,找不到公共父节点,从而合并失败。同时push操作也会出现额外的问题。

最佳实践:要么全部操作都使用--squash指令,要么全部操作都不使用该参数,这样就不会出错。

错误示范:

为了验证,重新创建两个仓库AB,并通过subtreeB设置为A的子库。这次全程都没有使用参数--squash,重复上述操作:

  • 首先,修改子库文件;
  • 然后,通过下列指令,在不使用参数--squash的情况下,将远程子库A变化的文件拉取到本地:
1
git subtree pull --prefix=subtree subtree-origin master

image-20200330141920474

此时查看提交历史:

image-20200330142000915

可以看到子库儿子的提交信息污染了父版本库的提交信息,验证了上述的结论。

所以要么都使用该指令,要么都不使用才能避免错误;如果不需要子库的提交日志,推荐使用--squash指令。

补充:echo 'new line' >> test.txt:表示在test.txt文件末尾追加文本new line;如果是一个>表示替换掉test.txt内的全部内容。

6.修改子库

subtree的强大之处在于,它可以在父版本库中修改依赖的子版本库。以下为演示:

进入父版本库存放子库的subtree目录,修改子库文件child.txt,并推送到远程父仓库:

image-20200330121429186

此时远程父版本库中存放子库文件的subtree目录发生了变化,但是独立的远程子库git_subtree_child并没有发生变化。

  • 修改独立的远程子库:

    可执行以下命令,同步地修改远程子版本库:

    1
    git subtree push --prefix=subtree subtree-origin master

    如下图所示,父库中的子库文件child.txt新增的child2内容,同步到了独立的远程子库中:

    image-20200330125911158

  • 修改独立的本地子库:

    回到本地子库git_subtree_child,将对应的远程子库进行的修改拉取到本地进行合并同步:

    image-20200330144044823

    由此无论是远程的还是本地的子库都被修改了。

实际上使用subtree后,在外部看起来父仓库和子仓库是一个整体的仓库。执行clone操作时,不会像submodule那样需要遍历子库来单独克隆。而是可以将整个父仓库和它所依赖的子库当做一个整体进行克隆。

存在的问题

父版本库拉取远程子库进行更新同步会出现的问题:

  • 子仓库第一次修改:

    经历了上述操作,本地子库与远程子库的文件达到了同步,其中文件child.txt的内容都是child~4。在此基础上本地子库为该文件添加child5~6

    image-20200330145702019

    然后推送到远程子库。

  • 父仓库第一次拉取:

    随后父版本库通过下述指令,拉取远程子库,与本地父仓库git_subtree_parent中的子库进行同步:

    1
    git subtree pull --p subtree subtree-origin master --squash

    结果出现了合并失败的情况:

    image-20200330145839093

    我们查看冲突产生的文件:

    image-20200330145922152

    发现父版本库中的子库与远程子库内容上并无冲突,但是却发生了冲突,这是为什么呢?

    探究冲突产生的原因之前我们先解决冲突,先删除多余的内容:

    image-20200330150141430

    随后执行git add命令和git commit命令标识解决了冲突:

    image-20200330150312944

    image-20200330150406317

    解决完冲突后将该文件推送到独立的远程子库,发现文件并没有发生更新,也就是说git认为我们并没有解决冲突:

    image-20200330150747452

  • 子仓库第二次修改与父仓库第二次拉取:

    再次修改本地子库的文件并推送到对应的远程仓库,父版本库再次将远程子库更新的文件拉取到本地进行同步:

    image-20200330151140092

    这次却成功了!为什么同样的操作,有的时候成功有的时候失败呢?

解决方案

原因出现在--squash指令中。实际上,--squash指令把子库中的提交信息合并了,导致父仓库在执行git pull操作时找不到公共的父节点,从而导致即使文件没有冲突的内容,也会出现合并冲突的情况。其实不使用--squash也会有这种问题,问题的根本原因仍然是三方合并时找不到公共父节点。我们打开gitk

image-20200330154944300

从图中不难看出,当使用subtree时,子库与父库之间是没有公共节点的,所以时常会因为找不到公共节点而出现合并冲突的情况,此时只需要解决冲突,手动合并即可。

不使用subtree时,普通的版本库中的各分支总会有一个公共节点:

image-20200330160206258

再次强调:使用--squash指令时一定要小心,要么都使用它,要么都不使用。

7.抽离子库

git subtree split

当开发过程中出现某些子库完全可以复用到其他项目中时,我们希望将它独立出来。

  • 方法一:可以手动将文件拷贝出来。缺点是,这样会丢失关于该子库的提交记录;

  • 方法二:

    使用

    1
    git subtree split

    指令,该指令会把关于独立出来的子库的每次提交都记录起来。但是,这样存在弊端:

    • 比如该独立子库为company.util,当一次提交同时修改了company.utilcompany.server两个子库时。
    • 通过上述命令独立出来的子库util只会记录对自身修改的提交,而不会记录对company.server的修改,这样在别人看来这次提交就只修改了util,这是不完整的。

来源:https://blog.guoqianfan.com/2019/11/24/timestamp-in-csharp/

什么是时间戳

时间戳默认是Unix时间戳

首先要清楚JavaScript与Unix的时间戳的区别:

JavaScript时间戳:是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总毫秒数

Unix时间戳:是指格林威治时间1970年01月01日00时00分00秒(北京时间1970年01月01日08时00分00秒)起至现在的总秒数

可以看出JavaScript时间戳是总毫秒数,Unix时间戳是总秒数

比如同样是的 2016/11/03 12:30:00 ,转换为JavaScript时间戳为 1478147400000;转换为Unix时间戳为 1478147400。

从上面也可以看出时间戳与时区无关

Unix时间戳相互转换

C# DateTime转换为Unix时间戳

.NET 4.6新方法

只能在 .NET 4.6及更高版本里才能使用。

1
2
long timeStamp = DateTimeOffset.Now.ToUnixTimeSeconds(); 
Console.WriteLine(timeStamp);

通用的老方法

1
2
3
System.DateTime startTime = TimeZone.CurrentTimeZone.ToLocalTime(new System.DateTime(1970, 1, 1)); 
long timeStamp = (long)(DateTime.Now - startTime).TotalSeconds;
System.Console.WriteLine(timeStamp);

Unix时间戳转换为C# DateTime

.NET 4.6新方法

由时间戳转换的DateTimeOffset的时区默认是+00:00,此时我们需要转为本地时区,否则后续使用可能会有问题。

转为本地时区:DateTimeOffset.LocalDateTime

示例代码如下:

1
2
3
4
5
6

DateTimeOffset dto = DateTimeOffset.FromUnixTimeMilliseconds(1573696406184);

DateTime dt01 = dto.DateTime;

DateTime dt02 = dto.LocalDateTime;

通用的老方法

1
2
3
4
long unixTimeStamp = 1478162177;
System.DateTime startTime = TimeZone.CurrentTimeZone.ToLocalTime(new System.DateTime(1970, 1, 1));
DateTime dt = startTime.AddSeconds(unixTimeStamp);
System.Console.WriteLine(dt.ToString("yyyy/MM/dd HH:mm:ss:ffff"));

备注

DateTimeOffset使用Now还是UtcNow

对于DateTimeOffset,发现有2个获取当前时间的属性:DateTimeOffset.NowDateTimeOffset.UtcNow

如果只是获取时间戳,这2个使用哪个都可以,得到的值是一样的。

因为DateTimeOffset里面有时区信息,获取时间戳时会使用时区进行转换的,所以获得的时间戳一样。

而也是因为时区的原因,DateTimeOffset的其他操作可能会不一样。例如DateTimeOffset.DateTime就不一样,此时推荐使用DateTimeOffset.LocalDateTime来获得本地时区的时间。

测试代码如下:

1
2
3
4
5
6
7
8
9

Console.WriteLine("none:{0}", DateTimeOffset.Now);

Console.WriteLine("utc:{0}", DateTimeOffset.UtcNow);


Console.WriteLine("none:{0}", DateTimeOffset.Now.ToUnixTimeSeconds());

Console.WriteLine("utc:{0}", DateTimeOffset.UtcNow.ToUnixTimeSeconds());

DateTime转换为DateTimeOffset

可以直接把DateTime赋值给DateTimeOffset,内部会自动进行隐式转换。这里涉及到时区,请往下看。

DateTime的时区信息(Kind属性)

DateTime时区信息存放在Kind属性里。Kind属性的数据类型是DateTimeKind枚举,只有3个值:

  • Unspecified:未指定/未规定
  • UtcUTC时间
  • Local:本地时区

不同情况下得到的DateTimeKind是不同的,具体如下:

  • DateTime.NowDateTime.Kind是 **Local(本地时区)**。

  • DateTime.UtcNowDateTime.Kind是 **Utc**。

  • DateTime.Parse()

    • 默认】在未指定时区时,DateTime.KindUnspecified

    • 指定时区:指定时区后DateTime.Kind就是相对应的值。

      指定时区有2种方式:

      • 默认+优先待转换的字符串里有时区信息。例如:2019/11/24 17:40:32 +08:00
      • 使用DateTimeStyles参数来指定时区。DateTimeStyles是枚举类型,更多信息自己查看定义,这里不再多说。

LocalUtc都会把相应的时区传递过去。对于 Unspecified(未指定),会被当做本地时区来处理(结果已验证,源码没看懂)。

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

DateTime dtNow = DateTime.Now;

DateTime dtUtcNow = DateTime.UtcNow;

DateTime dtParse = DateTime.Parse("2019-11-24 17:40:13");


DateTimeOffset dtoNow = dtNow;

DateTimeOffset dtoUtcNow = dtUtcNow;

DateTimeOffset dtoParse = dtParse;

Console.WriteLine("DateTime:");
Console.WriteLine("dtNow:{0}(Kind:{1})", dtNow, dtNow.Kind);
Console.WriteLine("dtUtcNow:{0}(Kind:{1})", dtUtcNow, dtUtcNow.Kind);
Console.WriteLine("dtParse:{0}(Kind:{1})", dtParse, dtParse.Kind);

Console.WriteLine();

Console.WriteLine("DateTimeOffset:");
Console.WriteLine("dtoNow:{0}", dtoNow);
Console.WriteLine("dtoUtcNow:{0}", dtoUtcNow);
Console.WriteLine("dtoParse:{0}", dtoParse);

输出结果如下:

1
2
3
4
5
6
7
8
9
DateTime:
dtNow:2019/11/24 17:40:32(Kind:Local)
dtUtcNow:2019/11/24 9:40:32(Kind:Utc)
dtParse:2019/11/24 17:40:13(Kind:Unspecified)

DateTimeOffset:
dtoNow:2019/11/24 17:40:32 +08:00
dtoUtcNow:2019/11/24 9:40:32 +00:00
dtoParse:2019/11/24 17:40:13 +08:00

DateTimeOffset.Parse的默认时区

DateTimeOffset.Parse的默认时区是当前时区

1
2

Console.WriteLine("parse:{0}", DateTimeOffset.Parse("2019-6-14 15:38:49"));

参考

  1. C# DateTime与时间戳转换:https://www.cnblogs.com/polk6/p/6024892.html
  2. 如何将Unix时间戳转换为DateTime,反之亦然?:https://stackoverflow.com/questions/249760/how-can-i-convert-a-unix-timestamp-to-datetime-and-vice-versa
  3. DateTimeOffset源码:https://source.dot.net/#System.Private.CoreLib/DateTimeOffset.cs

DotNetty完全教程(一)

Excerpt

写本系列文章的目的我一直以来都在从事.NET相关的工作,做过工控,做过网站,工作初期维护过别人写的网络库,后来自己写网络库,我发现在使用C#编程的程序员中,能否写出高性能的网络库一直都是考验一个程序员能力的标杆。为了写出高性能的网络库,我查阅了很多资料,发现Java的Netty有着得天独厚的设计以及实现优势,Java也因为Netty的存在,在开发大吞吐量的应用程序中得心应手。我想,.NET程序…


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

写本系列文章的目的

我一直以来都在从事.NET相关的工作,做过工控,做过网站,工作初期维护过别人写的网络库,后来自己写网络库,我发现在使用C#编程的程序员中,能否写出高性能的网络库一直都是考验一个程序员能力的标杆。为了写出高性能的网络库,我查阅了很多资料,发现Java的Netty有着得天独厚的设计以及实现优势,Java也因为Netty的存在,在开发大吞吐量的应用程序中得心应手。

我想,.NET程序员为什么不能使用这么好的应用程序框架。好在,Azure团队写出了DotNetty,使得.NET程序员也可以迅速的,便捷的搭建一个高性能的网络应用程序,但是,DotNetty并没有多少资料,项目代码中也没有多少注释,这对我们的学习以及使用带来了极大的障碍。

我通过对于Netty的研究,一步步的使用DotNetty来创建应用程序,分析DotNetty实现了哪些,没有实现哪些,实现的有何不同,希望通过最简单的描述,让读者能够了解DotNetty,无论是在工作学习中快速搭建网络应用程序还是通过分析Netty的思想,为自己写的网络库添砖加瓦都是十分有意义的。

本系列文章参考了《Netty实战》,感兴趣的同学可以去看看这本书。

Netty是什么

Netty 是一款用于创建高性能网络应用程序的高级框架。

Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器
和客户端

DotNetty是什么

DotNetty是微软的Azure团队仿造Netty编写的网络应用程序框架。

优点

  1. 关注点分离——业务和网络逻辑解耦;
  2. 模块化和可复用性;
  3. 可测试性作为首要的要求

历史

  1. 阻塞Socket通信特点:
    1. 建立连接要阻塞线程,读取数据要阻塞线程
    2. 如果要管理多个客户端,就需要为每个客户端建立不同的线程
    3. 会有大量的线程在休眠状态,等待接收数据,资源浪费
    4. 每个线程都要占用系统资源
    5. 线程的切换很耗费系统资源
  2. 非阻塞Socket(NIO)特点:
    1. 如图,每个Socket如果需要读写操作,都通过事件通知的方式通知选择器,这样就实现了一个线程管理多个Socket的目的。
    2. 选择器甚至可以在所有的Socket空闲的时候允许线程先去干别的事情
    3. 减少了线程数量导致的资源占用,减少了线程切换导致的资源消耗
  3. Netty特点
    在这里插入图片描述

Netty设计的关键点

异步和事件驱动是Netty设计的关键

核心组件

  • Channel:一个连接就是一个Channel
  • 回调:通知的基础
1
2
3
4
5
6
7
8
9
10
11
12
public class ConnectHandler : SimpleChannelInboundHandler<string>
{
public override void ChannelActive(IChannelHandlerContext context)
{
// 新的连接建立的时候会触发这个回调
base.ChannelActive(context);
}
protected override void ChannelRead0(IChannelHandlerContext ctx, string msg)
{
throw new NotImplementedException();
}
}
  • Future:通知的另一种方式,可以认为ChannelFuture是包装了一系列Channel事件的对象。回调和Future相互补充,相互结合同时也可以理解Future是一种更加精细的回调。

    但是ChannelFuture在DotNetty中被Task取代

  • 事件和ChannelHandler
    ChannelHandler是事件处理器,负责处理入站事件和出站事件。通常每一个事件都由一系列的Handler处理。

本文参考资料以及截图来自《Netty实战》

DotNetty完全教程(七)

Excerpt

ChannelPipeline和ChannelHandleContext介绍ChannelPipeline是一系列ChannelHandler连接的实例链,这个实例链构成了应用程序逻辑处理的核心。下图反映了这种关联:ChannelHandlerContext提供了一个ChannelPipeline的上下文,用于ChannelHandler在Pipeline中的交互,这种交互十分的灵活,不仅…


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

ChannelPipeline和ChannelHandleContext

介绍

ChannelPipeline是一系列ChannelHandler连接的实例链,这个实例链构成了应用程序逻辑处理的核心。下图反映了这种关联:

ChannelHandlerContext提供了一个ChannelPipeline的上下文,用于ChannelHandler在Pipeline中的交互,这种交互十分的灵活,不仅是信息可以交互,甚至可以改变其他Handler在Pipeline中的位置。

特性

  1. 每一个Channel都会被分配到一个ChannelPipeline,这种关联是永久性的。在Netty中是关联,在DotNetty中这种关联被进一步的强绑定,变成了一个Channel中存在一个Pipeline。
  2. 对于Pipeline来说,入站口被当作Pipeline的头部,出站口被当作尾部。虽然我们看到有两条线,但是在Pipeline中其实是线性的,在事件传播的时候,如果Pipeline发现这个事件的属性(入站出站)跟下一个Handler不匹配,就会跳过这个Handler,前进到下一个。
  3. 一个Handler可以既作为入站处理器也作为出站处理器。
  4. 修改Pipeline

  5. 为了保证ChannelHandler处理事件的高效性,在Handler中不能有阻塞代码,但是如果遇到了一些阻塞API,就需要用到DefaultEventExecutorGroup,其功能是把这个事件的处理从原先的EventLoop中移除,送到一个专门的执行事件处理器中进行处理,从而不阻塞Pipeline。

ChanelPipeline的事件


我们可以看到fire方法都是调用下一个Handler中的方法,我们可以在合适的时机调用下一个Handler中的方法以实现数据的流动。

这里我们注意一下,Write方法并不会将消息写入Socket中,而是写入消息队列中,等待Flush将数据冲刷下去。

Context的API支持


Pipeline和Context

我们可以发现,Pipeline上也有fire–的方法,Context也有类似的方法,他们的差别在于,Pipeline或者Channel上的这些方法引发的事件流将从Pipeline的头部开始移动,而Context上的方法会让事件从当前Handler开始移动,所以为了更短的事件流,我们应该尽可能的使用Context的方法。

使用ChannelHandlerContext

  1. 获取当前Channel

    1
    2
    IChannelHandlerContext ctx = ...;
    IChannel channel = ctx.Channel
  2. 获取当前pipeline

    1
    2
    3
    4
    5
    6
    7
    // 注意一下在Netty中可以直接通过context获取pipeline,在DotNetty中需要从Channel中获取
    // Netty
    IChannelHandlerContext ctx = ...;
    IChannel channel = ctx.pipeline
    // DotNetty
    IChannel channel = ctx.Channel;
    IChannelPipeline pipeline = channel.Pipeline;
  3. 写入pipeline让事件从尾端开始移动

    1
    2
    3
    4
    IChannel channel = ctx.Channel;
    IChannelPipeline pipeline = channel.Pipeline;
    channel.WriteAndFlushAsync("Hello World!");
    pipeline.WriteAndFlushAsync("Hello World!");

注意,Write是出站事件,他的流动方向是从末尾到头部,这个一定要注意。在pipeline或者channel中写入事件,都是从最末尾开始流动,在Context中写入是从当前Handler中开始移动,这个我们已经在很多地方都说明了这样的不同。

应用

  1. 协议切换
    因为我们可以通过Context获取Pipeline的引用,获取了pipeline之后又可以动态的加载和删除Handler,利用这个特性我们可以实现协议的切换,
  2. 随时随地使用Context
    这里我们补充一个知识,Context和Handler的关系是一对一的,而不是一个Context对应多个Handler,这就让我们可以缓存下Context的引用,在任何时候进行使用,这里的任何时候可以是不同的线程。举个例子就是我们之前写的回声程序是在收到信息之后发送,但是复杂一点我们需要在按下按钮的时候发送一条数据,这时候我们可以在连接之后缓存Context的引用,在按下按钮的时候使用Ctx.Write();方法来发送一条数据。

线程安全

在Netty中,如果想要将一个Handler用于多个Pipeline中,需要标注Shared,同时需要保证线程安全,因为这里可能有多线程的重入问题。

异常处理

  1. 入站异常无论在何时引发,都会顺着Pipeline继续向下流动,如果最后的Handler没有处理,则会被标记为未处理。所以为了处理所有的入站异常,我们可以在pipeline的尾端通过复写ExceptionCaught来处理所有pipeline上的异常。
  2. 在出站Handler中获取异常在Netty中需要使用ChannelFuture以及ChannelPromise这里先不做叙述

DotNetty完全教程(三)

Excerpt

组件介绍ChannelChannel是Socket的封装,提供绑定,读,写等操作,降低了直接使用Socket的复杂性。EventLoop我们之前就讲过EventLoop这里回顾一下:一个 EventLoopGroup 包含一个或者多个 EventLoop;一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;所有由 EventLoop 处理的 I/O 事件都将在它…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

组件介绍

Channel

Channel是Socket的封装,提供绑定,读,写等操作,降低了直接使用Socket的复杂性。

EventLoop

我们之前就讲过EventLoop这里回顾一下:

  1. 一个 EventLoopGroup 包含一个或者多个 EventLoop;
  2. 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定;
  3. 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理;
  4. 一个 Channel 在它的生命周期内只注册于一个 EventLoop;
  5. 一个 EventLoop 可能会被分配给一个或多个 Channel。

ChannelFuture

本身是Channel中消息的回调,在DotNetty中被Task取代。

ChannelHandler

ChannelHandler是处理数据的逻辑容器

ChannelInboundHandler是接收并处理入站事件的逻辑容器,可以处理入站数据以及给客户端以回复。

ChannelPipeline

ChannelPipeline是将ChannelHandler穿成一串的的容器。

需要说明的是:

  1. ChannelInboundHandler只处理入站事件,ChannelOutboundHandler只处理出站事件
  2. ChannelInboundHandler和ChannelOutboundHandler可以注册在同一个ChannelPipeline中

(尝试一下)在 Netty 中,有两种发送消息的方式。你可以直接写到 Channel 中,也可以 写到和 ChannelHandler相关联的ChannelHandlerContext对象中。前一种方式将会导致消息从ChannelPipeline 的尾端开始流动,而后者将导致消息从 ChannelPipeline 中的下一个 ChannelHandler 开始流动。

编码器和解码器

Netty中内置了一些编码器和解码器,用来进行处理字节流数据,编码器用来将消息编码为字节流,解码器用来将字节流解码为另一种格式(字符串或一个对象)。

需要注意的是,编码器和解码器都实现了ChannelInboundHandler和 ChannelOutboundHandler接口用于处理入站或出站数据。

Bootstrap引导类

  1. Bootstrap用于引导客户端,ServerBootstrap用于引导服务器
  2. 客户端引导类只需要一个EventLoopGroup服务器引导类需要两个EventLoopGroup。但是在简单使用中,也可以公用一个EventLoopGroup。为什么服务器需要两个EventLoopGroup呢?是因为服务器的第一个EventLoopGroup只有一个EventLoop,只含有一个SeverChannel用于监听本地端口,一旦连接建立,这个EventLoop就将Channel控制权移交给另一个EventLoopGroup,这个EventLoopGroup分配一个EventLoop给Channel用于管理这个Channel。

DotNetty完全教程(九)

Excerpt

引导Bootstrap引导一个应用程序是指对他进行配置并且使他运行的过程。体系结构注意,DotNetty没有实现Cloneable的接口,而是直接实现了一个Clone方法。Netty实现这个接口是为了创建两个有着相同配置的应用程序,可以把一个配置整体应用到另一个上面,需要注意的是EventLoopGroup是一个浅拷贝,这就导致了拷贝的Bootstrap都会使用同一个EventLoopGr…


引导Bootstrap

引导一个应用程序是指对他进行配置并且使他运行的过程。

体系结构


注意,DotNetty没有实现Cloneable的接口,而是直接实现了一个Clone方法。Netty实现这个接口是为了创建两个有着相同配置的应用程序,可以把一个配置整体应用到另一个上面,需要注意的是EventLoopGroup是一个浅拷贝,这就导致了拷贝的Bootstrap都会使用同一个EventLoopGroup,这在每个Channel生命周期很短的时候是没有太大影响的。

服务器引导和普通引导有什么区别呢?区别在于,服务器接收到客户端的连接请求,会用一个Channel接受连接,然后用另一个Channel与客户端进行交流,但是客户端只需要一个Channel就可以与服务器进行交互。

关于链式调用

我们发现Bootstrap类可以通过流式语法进行链式调用,这要归功于Bootstrap类的特殊定义。下面我们来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 定义
public abstract class AbstractBootstrap<TBootstrap, TChannel>
where TBootstrap : AbstractBootstrap<TBootstrap, TChannel>
where TChannel : IChannel
// 定义子类
public class Bootstrap : AbstractBootstrap<Bootstrap, IChannel>
// 方法实现
public virtual TBootstrap Group(IEventLoopGroup group)
{
Contract.Requires(group != null);

if (this.group != null)
{
throw new InvalidOperationException("group has already been set.");
}
this.group = group;
return (TBootstrap)this;
}
// 使用
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoClientHandler());
}));

API


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KF8h1wnu-1572421399290)(https://ws1.sinaimg.cn/large/007hF5Quly1g1o4mltliij30jr05mabe.jpg)]

客户端引导

1
2
3
4
5
6
7
8
9
10
11
12
13
var group = new MultithreadEventLoopGroup();
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoClientHandler());
}));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("10.10.10.158"), 3000));
Console.ReadLine();
await clientChannel.CloseAsync();

服务器引导

API:

注意上面箭头指示的是与Bootstrap不一样的方法。
为什么会有子Channel的概念呢,我们看下面这个图:

因为服务器是一对多的,所以有子Channel的概念。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
IEventLoopGroup eventLoop;
eventLoop = new MultithreadEventLoopGroup();
try
{
// 服务器引导程序
var bootstrap = new ServerBootstrap();
bootstrap.Group(eventLoop);
bootstrap.Channel<TcpServerSocketChannel>();
bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new EchoServerHandler());
}));
IChannel boundChannel = await bootstrap.BindAsync(3000);
Console.ReadLine();
await boundChannel.CloseAsync();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
finally
{
await eventLoop.ShutdownGracefullyAsync();
}

从Channel中引导客户端

  • 场景

    如果我们的服务器需要去第三方获取数据,这时候服务器就需要充当客户端去第三方取数据,这时候就需要在Channel中再开一个客户端获取数据。

  • 方式

    我们最好是从Channel中获取当前EventLoop,这样新开的客户端就跟当前Channel在一个线程中,减少了线程切换带来的开销,尽可能的重用了EventLoop

  • 实现

    1
    2
    3
    // 从Context创建客户端引导
    var bootstrap = new Bootstrap();
    bootstrap.Group(ctx.Channel.EventLoop);

初始化Pipeline

如果要添加的Handler不止一个,我们就需要用到ChannelInitializer,在DotNetty中,我们有十分简单的方法可以初始化一个pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;

if (cert != null)
{
pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
}
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast("echo", new EchoClientHandler());
}));

ChannelOption

ChannelOption可以在引导的时候将设置批量的设置到所有Channel上,而不必要在每一个Channel建立的时候手动的去指定它的配置,应用场景是比如设置KeepAlive或者设置超时时间。

1
2
bootstrap.Option(ChannelOption.SoKeepalive, true)
.Option(ChannelOption.ConnectTimeout, new TimeSpan(5000));

面向无连接的用户数据报文

UDP的全称是“User Datagram Protocol”,在DotNetty中实现了SocketDatagramChannel来创建无连接的引导,需要注意的是无连接的引导不需要Connect只需要bind即可,代码如下:

1
2
3
4
5
6
7
8
9
10
11
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<SocketDatagramChannel>()
.Option(ChannelOption.SoBroadcast, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast("Quote", new QuoteOfTheMomentClientHandler());
}));

IChannel clientChannel = await bootstrap.BindAsync(IPEndPoint.MinPort);

关闭

Channel的关闭:

1
await clientChannel.CloseAsync();

EventLoopGroup的关闭:

1
await group.ShutdownGracefullyAsync();

DotNetty完全教程(二)

Excerpt

第一个DotNetty应用程序准备工作NuGet包介绍DotNetty由九个项目构成,在NuGet中都是单独的包,可以按需引用,其中比较重要的几个是以下几个:DotNetty.Common 是公共的类库项目,包装线程池,并行任务和常用帮助类的封装DotNetty.Transport 是DotNetty核心的实现DotNetty.Buffers 是对内存缓冲区管理的封装DotNett…


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

第一个DotNetty应用程序

准备工作

NuGet包介绍

DotNetty由九个项目构成,在NuGet中都是单独的包,可以按需引用,其中比较重要的几个是以下几个:

  • DotNetty.Common 是公共的类库项目,包装线程池,并行任务和常用帮助类的封装
  • DotNetty.Transport 是DotNetty核心的实现
  • DotNetty.Buffers 是对内存缓冲区管理的封装
  • DotNetty.Codes 是对编码器解码器的封装,包括一些基础基类的实现,我们在项目中自定义的协议,都要继承该项目的特定基类和实现
  • DotNetty.Handlers 封装了常用的管道处理器,比如Tls编解码,超时机制,心跳检查,日志等,如果项目中没有用到可以不引用,不过一般都会用到

开始一个项目

  1. 新建一个解决方案
  2. 新建一个项目
  3. 到NuGet中引用 DotNetty.Common DotNetty.Transport DotNetty.Buffers
  4. 开始编写实例代码

编写测试程序

回声测试应用程序编写 源码下载

  1. 新建一个解决方案 名字叫NettyTest

  2. 新建一个项目 名字叫EchoServer

  3. 到NuGet中引用 DotNetty.Common DotNetty.Transport DotNetty.Buffers

  4. 新建一个类 EchoServerHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoServer
    {
    /// <summary>
    /// 因为服务器只需要响应传入的消息,所以只需要实现ChannelHandlerAdapter就可以了
    /// </summary>
    public class EchoServerHandler : ChannelHandlerAdapter
    {
    /// <summary>
    /// 每个传入消息都会调用
    /// 处理传入的消息需要复写这个方法
    /// </summary>
    /// <param name="ctx"></param>
    /// <param name="msg"></param>
    public override void ChannelRead(IChannelHandlerContext ctx, object msg)
    {
    IByteBuffer message = msg as IByteBuffer;
    Console.WriteLine("收到信息:" + message.ToString(Encoding.UTF8));
    ctx.WriteAsync(message);
    }
    /// <summary>
    /// 批量读取中的最后一条消息已经读取完成
    /// </summary>
    /// <param name="context"></param>
    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
    context.Flush();
    }
    /// <summary>
    /// 发生异常
    /// </summary>
    /// <param name="context"></param>
    /// <param name="exception"></param>
    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
    Console.WriteLine(exception);
    context.CloseAsync();
    }
    }
    }

    上面的代码注释已经非常详细了,相信看注释你就能明白这个类大致干了些什么,但是突如其来的一个类还是有点难以理解,那么本着认真负责的精神我会再详细解释一下没有学过Netty的同学难以理解的点:

    1. 问:EchoServerHandler 是干什么用的?回答:Netty帮我们封装了底层的通信过程让我们不需要再关心套接字等网络底层的问题,更加专注于处理业务,何为业务?就是数据来了之后我要怎么办,Handler就是一个处理数据的工厂,那么上面的Handler中我们做了什么事情呢?稍加分析就能发现,我们在接到消息之后打印在了控制台上,之后将消息再发送回去。
    2. 问:WriteAsync 是在干什么?Flush 又是在干什么?答:由于是初学,不灌输太多,大家现在只需要知道数据写入之后并不会直接发出去,Flush的时候才会发出去。
  5. 在自动生成的Program.cs中写入服务器引导程序。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Threading.Tasks;

    namespace EchoServer
    {
    public class Program
    {
    static async Task RunServerAsync()
    {
    IEventLoopGroup eventLoop;
    eventLoop = new MultithreadEventLoopGroup();
    try
    {
    // 服务器引导程序
    var bootstrap = new ServerBootstrap();
    bootstrap.Group(eventLoop);
    bootstrap.Channel<TcpServerSocketChannel>();
    bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
    {
    IChannelPipeline pipeline = channel.Pipeline;
    pipeline.AddLast(new EchoServerHandler());
    }));
    IChannel boundChannel = await bootstrap.BindAsync(3000);
    Console.ReadLine();
    await boundChannel.CloseAsync();
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    }
    finally
    {
    await eventLoop.ShutdownGracefullyAsync();
    }
    }
    static void Main(string[] args) => RunServerAsync().Wait();
    }
    }

    这个程序中同样有很多需要解释的,但是对于初学者来说,先明白这些概念就好了:

    1. bootstrap是启动引导的意思,Netty中的bootstrap的意思就是启动一个网络应用程序,那在启动之前我们肯定需要设置很多参数,bootstrap可以接收参数,引导用户启动Netty应用。
    2. EventLoopGroup 是一系列EventLoop的集合
    3. EventLoop 就对应了一个选择器(选择器看上一节的图)
    4. 一个Channel都需要绑定到一个选择器(EventLoop)上
    5. 每一个选择器(EventLoop)和一个线程绑定
    6. 我们可以把Handler串起来处理数据,这个我们后面再讲,这里的做法是把Handler串到pipeline上。
  6. 再新建一个项目取名叫EchoClient

  7. 新建一个类 EchoClientHandler

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    using DotNetty.Buffers;
    using DotNetty.Transport.Channels;
    using System;
    using System.Text;

    namespace EchoClient
    {
    public class EchoClientHandler : SimpleChannelInboundHandler<IByteBuffer>
    {
    /// <summary>
    /// Read0是DotNetty特有的对于Read方法的封装
    /// 封装实现了:
    /// 1. 返回的message的泛型实现
    /// 2. 丢弃非该指定泛型的信息
    /// </summary>
    /// <param name="ctx"></param>
    /// <param name="msg"></param>
    protected override void ChannelRead0(IChannelHandlerContext ctx, IByteBuffer msg)
    {
    if (msg != null)
    {
    Console.WriteLine("Receive From Server:" + msg.ToString(Encoding.UTF8));
    }
    ctx.WriteAsync(Unpooled.CopiedBuffer(msg));
    }
    public override void ChannelReadComplete(IChannelHandlerContext context)
    {
    context.Flush();
    }
    public override void ChannelActive(IChannelHandlerContext context)
    {
    Console.WriteLine("发送Hello World");
    context.WriteAndFlushAsync(Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes("Hello World!")));
    }

    public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
    {
    Console.WriteLine(exception);
    context.CloseAsync();
    }
    }
    }

    Handler的编写方法于上面服务器的Handler基本一致,这里我们还是需要解释一些问题:

    1. SimpleChannelInboundHandler 继承自 ChannelHandlerAdapter,前者更强大的地方是对于资源的自动释放(这是一个伏笔)
    2. Read0方法在代码的注释中已经解释过了,有兴趣的同学可以看一下源码。这里我就不贴出来了
    3. ctx.WriteAsync(Unpooled.CopiedBuffer(msg));如果这里直接将msg发送出去,大家就会发现,实验失败了,这是为什么呢?简单解释就是因为引用计数器机制,IByteBuffer只能使用一次,而在我们使用Read0方法接收这个消息的时候,这个消息的引用计数就被归零了,这时候我们再次使用就会报出异常,所以这里需要将源消息再复制一份。当然,如果你使用的Read方法则不会有这样的问题。原则上来说,我们不应该存储指向任何消息的引用供未来使用,因为这些引用都会自动失效(意思就是消息收到了处理完就丢掉,消息不应该被长久保存)。
  8. 编写客户端引导程序

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    using DotNetty.Transport.Bootstrapping;
    using DotNetty.Transport.Channels;
    using DotNetty.Transport.Channels.Sockets;
    using System;
    using System.Net;
    using System.Threading.Tasks;

    namespace EchoClient
    {
    class Program
    {
    static async Task RunClientAsync()
    {
    var group = new MultithreadEventLoopGroup();
    try
    {
    var bootstrap = new Bootstrap();
    bootstrap
    .Group(group)
    .Channel<TcpSocketChannel>()
    .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
    {
    IChannelPipeline pipeline = channel.Pipeline;
    pipeline.AddLast(new EchoClientHandler());
    }));
    IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse("10.10.10.158"), 3000));
    Console.ReadLine();
    await clientChannel.CloseAsync();
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex);
    }
    finally
    {
    await group.ShutdownGracefullyAsync();
    }
    }
    static void Main(string[] args) => RunClientAsync().Wait();
    }
    }

写在最后

项目的完整代码我放在了码云上,你可以点击这里可以下载。我相信很多完全没有接触过Netty的同学在跟着写完了第一个项目之后还是很懵,虽然解释了很多,但是还是感觉似懂非懂,这很正常。就如同我们写完HelloWorld之后,仍然会纠结一下static void Main(string[] args)为什么要这么写。我要说的是,只要坚持写完了第一个应用程序,你就是好样的,关于Netty我们还有很多很多要讲,相信你学了之后的知识以后,回过头来再看这个实例,会有恍然大悟的感觉。如果你坚持看完了文章并且敲了程序并且试验成功了,恭喜你,晚饭加个鸡腿,我们还有很多东西要学。

DotNetty完全教程(五)

Excerpt

ChannelHandler本篇文章着重介绍ChannelHandlerChannel的生命周期我们复习一下,Channel是Socket的抽象,可以被注册到一个EventLoop上,EventLoop相当于Selector,每一个EventLoop又有自己的处理线程。复习了这部分的知识,我们就知道在Channel的生命中,有以下这么几个关键的时间节点。ChannelHandler的生…

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

ChannelHandler

本篇文章着重介绍ChannelHandler

Channel的生命周期

我们复习一下,Channel是Socket的抽象,可以被注册到一个EventLoop上,EventLoop相当于Selector,每一个EventLoop又有自己的处理线程。复习了这部分的知识,我们就知道在Channel的生命中,有以下这么几个关键的时间节点。

ChannelHandler的生命周期

我们复习一下,ChannelHandler是定义了如何处理数据的处理器,被串在ChannelPipeline中用于入站或者出站数据的处理。既然是处理Channel中的数据,就需要关注很多的时间节点,比如Channel被激活,比如,读取到了数据,所以,ChannelHandler不仅需要关心数据何时来,还需要关注Channel处于一个什么样的状态,所以ChannelHandler的生命周期如下:
在这里插入图片描述

使用适配器类创建自己的Handler

你可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter
类作为自己的 ChannelHandler 的起始点。使用的时候我们只需要扩展使用这些适配器类,然后重新我们需要的方法即可。

注意适配器类都有IsSharable属性,标识这个Hanlder能不能被添加到多个Pipeline中。