
Building an MQTT Server with Spring Boot + Netty (Basic Demo)

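This article shows how to build an MQTT server with the Spring Boot and Netty frameworks. It adds the required dependencies, creates the startup class, defines the Netty MQTT handler class and the response methods, and implements basic MQTT connect, subscribe, and publish handling. It also uses the Eclipse Paho tool as a test client and shows sample packets for the different MQTT packet types.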


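Netty is one of the most popular NIO frameworks in the industry, and combining it with Spring Boot allows rapid development.

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol based on the publish/subscribe model and built on top of TCP/IP. It can be used in IoT, on small devices, and in mobile applications.

Netty can also implement MQTT: it ships classes that wrap the protocol's message objects.

Basic demo code for an MQTT server built with Spring Boot + Netty

With Netty + Spring Boot you can quickly develop a server for the MQTT protocol (mainly MQTT 3.1 and MQTT 3.1.1).

Create the Spring Boot + Netty project and add the dependencies to pom.xml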

<?xml version="1.0"?>
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath />
    </parent>

    <groupId>boot.base.mqtt.server</groupId>
    <artifactId>boot-example-base-mqtt-server-2.0.5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-example-base-mqtt-server-2.0.5</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Version is managed by spring-boot-starter-parent -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Package as an executable jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

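Spring Boot startup class. It starts the Netty MQTT server from its CommandLineRunner callback, right after the application (which also includes the web application) has started.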

package boot.example.mqtt.server;

import boot.example.mqtt.server.netty.BootNettyMqttServerThread;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootNettyMqttApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(BootNettyMqttApplication.class);
        app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        // 1883 is the standard port for MQTT over plain TCP.
        int port = 1883;
        // The server blocks while it is running, so it gets its own thread
        // and does not hold up Spring Boot startup.
        BootNettyMqttServerThread bootNettyMqttServerThread = new BootNettyMqttServerThread(port);
        bootNettyMqttServerThread.start();
    }
}

Netty MQTT startup class

package boot.example.mqtt.server.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;

public class BootNettyMqttServer {

    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workGroup;

    public void startup(int port) {
        // One boss thread accepts connections; the worker group handles channel I/O.
        bossGroup = new NioEventLoopGroup(1);
        workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_REUSEADDR, true)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .option(ChannelOption.SO_RCVBUF, 10485760);
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline channelPipeline = ch.pipeline();
                    // Fires an idle event after 600 s without reads or writes (1200 s for both).
                    channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
                    // Netty's built-in MQTT codec; the encoder is stateless and shared.
                    channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
                    channelPipeline.addLast("decoder", new MqttDecoder());
                    channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
                }
            });
            ChannelFuture f = bootstrap.bind(port).sync();
            if (f.isSuccess()) {
                System.out.println("startup success port = " + port);
                // Block this thread until the server channel is closed.
                f.channel().closeFuture().sync();
            } else {
                System.out.println("startup fail port = " + port);
            }
        } catch (Exception e) {
            System.out.println("start exception" + e.toString());
        } finally {
            // Release the event loop threads when the server stops or fails to start.
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public void shutdown() throws InterruptedException {
        if (workGroup != null && bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
            workGroup.shutdownGracefully().sync();
            System.out.println("shutdown success");
        }
    }
}

MQTT server I/O read/write handler class

package boot.example.mqtt.server.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAdapter {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (!(msg instanceof MqttMessage)) {
                return;
            }
            MqttMessage mqttMessage = (MqttMessage) msg;
            // A packet the decoder could not parse has no usable fixed header: drop the connection.
            if (mqttMessage.decoderResult().isFailure()) {
                log.warn("invalid MQTT packet", mqttMessage.decoderResult().cause());
                ctx.close();
                return;
            }
            log.info("info--" + mqttMessage);
            Channel channel = ctx.channel();
            switch (mqttMessage.fixedHeader().messageType()) {
                case CONNECT:
                    BootNettyMqttMsgBack.connack(channel, mqttMessage);
                    break;
                case PUBLISH:
                    BootNettyMqttMsgBack.puback(channel, mqttMessage);
                    break;
                case PUBREL:
                    BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                    break;
                case SUBSCRIBE:
                    BootNettyMqttMsgBack.suback(channel, mqttMessage);
                    break;
                case UNSUBSCRIBE:
                    BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
                    break;
                case PINGREQ:
                    BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
                    break;
                case DISCONNECT:
                    // No response is defined for DISCONNECT; just close the connection.
                    ctx.close();
                    break;
                default:
                    break;
            }
        } finally {
            // This is the last inbound handler, so it must release the message
            // (a PUBLISH message holds a reference-counted payload buffer).
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.warn("channel exception", cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // No traffic within the IdleStateHandler window: treat the client as gone.
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    // The remaining lifecycle callbacks (channelRegistered, channelUnregistered,
    // channelReadComplete, channelWritabilityChanged) keep the adapter's defaults.
}

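Building the responses that are returned after an MQTT client sends a packet. The responses follow the MQTT protocol, so this class relies on the main parts of the protocol.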

package boot.example.mqtt.server.netty;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;

public class BootNettyMqttMsgBack {

    private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);

    /** Responds to CONNECT with CONNACK. */
    public static void connack(Channel channel, MqttMessage mqttMessage) {
        // The demo stores no session state, so Session Present is always false.
        // MQTT 3.1.1 also requires it to be 0 whenever the client sets CleanSession.
        MqttConnAckVariableHeader mqttConnAckVariableHeaderBack =
                new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
        // CONNACK carries no flags, and its remaining length is always 2.
        MqttFixedHeader mqttFixedHeaderBack =
                new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
        MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
        log.info("back--" + connAck);
        channel.writeAndFlush(connAck);
    }

    /** Responds to PUBLISH according to its QoS: nothing (QoS 0), PUBACK (QoS 1) or PUBREC (QoS 2). */
    public static void puback(Channel channel, MqttMessage mqttMessage) {
        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
        MqttQoS qos = mqttPublishMessage.fixedHeader().qosLevel();
        // Copy the payload out of the buffer. MQTT payloads are raw bytes; UTF-8 text is assumed here.
        byte[] payloadBytes = new byte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(payloadBytes);
        String data = new String(payloadBytes, StandardCharsets.UTF_8);
        System.out.println("publish data--" + data);
        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack =
                MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
        switch (qos) {
            case AT_MOST_ONCE:
                // QoS 0 needs no acknowledgement.
                break;
            case AT_LEAST_ONCE: {
                // Acknowledgement flags are fixed at 0; they are not copied from the PUBLISH.
                MqttFixedHeader mqttFixedHeaderBack =
                        new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
                MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
                log.info("back--" + pubAck);
                channel.writeAndFlush(pubAck);
                break;
            }
            case EXACTLY_ONCE: {
                // PUBREC flag bits are reserved as 0 (only PUBREL uses the 0010 pattern).
                MqttFixedHeader mqttFixedHeaderBack =
                        new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
                MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
                log.info("back--" + mqttMessageBack);
                channel.writeAndFlush(mqttMessageBack);
                break;
            }
            default:
                break;
        }
    }

    /** Responds to PUBREL with PUBCOMP, the last packet of the QoS 2 exchange. */
    public static void pubcomp(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        MqttFixedHeader mqttFixedHeaderBack =
                new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack =
                MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
        log.info("back--" + mqttMessageBack);
        channel.writeAndFlush(mqttMessageBack);
    }

    /** Responds to SUBSCRIBE with SUBACK, granting each subscription the QoS it requested. */
    public static void suback(Channel channel, MqttMessage mqttMessage) {
        MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
        MqttMessageIdVariableHeader variableHeaderBack =
                MqttMessageIdVariableHeader.from(mqttSubscribeMessage.variableHeader().messageId());
        // One return code per subscription, in request order (duplicates included).
        List<MqttTopicSubscription> subscriptions = mqttSubscribeMessage.payload().topicSubscriptions();
        List<Integer> grantedQoSLevels = new ArrayList<>(subscriptions.size());
        for (MqttTopicSubscription subscription : subscriptions) {
            grantedQoSLevels.add(subscription.qualityOfService().value());
        }
        MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
        // Remaining length: 2-byte packet identifier plus one byte per return code.
        MqttFixedHeader mqttFixedHeaderBack =
                new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + grantedQoSLevels.size());
        MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack, variableHeaderBack, payloadBack);
        log.info("back--" + subAck);
        channel.writeAndFlush(subAck);
    }

    /** Responds to UNSUBSCRIBE with UNSUBACK. */
    public static void unsuback(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        MqttMessageIdVariableHeader variableHeaderBack =
                MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        MqttFixedHeader mqttFixedHeaderBack =
                new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
        MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack, variableHeaderBack);
        log.info("back--" + unSubAck);
        channel.writeAndFlush(unSubAck);
    }

    /** Responds to PINGREQ with PINGRESP. */
    public static void pingresp(Channel channel, MqttMessage mqttMessage) {
        MqttFixedHeader fixedHeader =
                new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
        log.info("back--" + mqttMessageBack);
        channel.writeAndFlush(mqttMessageBack);
    }
}

Thread class that starts the MQTT server

package boot.example.mqtt.server.netty;

public class BootNettyMqttServerThread extends Thread {

    private final int port;

    public BootNettyMqttServerThread(int port) {
        this.port = port;
    }

    @Override
    public void run() {
        // startup() blocks until the server channel closes, so it runs on this dedicated thread.
        BootNettyMqttServer bootNettyMqttServer = new BootNettyMqttServer();
        bootNettyMqttServer.startup(this.port);
    }
}

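With Spring Boot + Netty it is easy to stand up an MQTT server in very little time. A simple demo application needs only a few classes. This demo is based on Netty 4.x.

With the Netty-based MQTT server demo in place, an MQTT client is needed for testing. I used the Eclipse Paho tool, which has a Windows desktop build (org.eclipse.paho.ui.app-1.1.1-win32.win32.x86_64.zip). "UI" means it runs as a desktop application. It requires a Java JDK and is available from the official site.

Screenshot of the test results

A demo is only a demo. It supports just the client-to-server direction: connecting, subscribing, unsubscribing, and publishing messages. It acknowledges those packets but does not forward published messages to subscribers. The main point is to see the MQTT exchange and the data involved.

The classes use logging. With Spring Boot's default logging configuration you get the data exchanged during the MQTT session.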

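An MQTT packet consists of three main parts

  • Fixed header (MqttFixedHeader): present in every MQTT packet. It carries the packet type, the flags for that type, and the size of the rest of the packet
  • Variable header (variableHeader): present in some MQTT packets. Its content depends on the packet type
  • Payload (payload): present in some MQTT packets. It holds the actual data, which is what the business logic really uses

For the full details of the MQTT protocol, refer to the specification.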
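To make the fixed header more concrete, here is a minimal sketch in plain Java (no Netty) of how its first byte and its Remaining Length field are laid out in MQTT 3.1.1. The class and method names (`FixedHeaderSketch`, `packetType`, `publishQos`, `encodeRemainingLength`, `decodeRemainingLength`) are made up for this illustration. In the demo itself, Netty's `MqttDecoder` and `MqttEncoder` do this work.

```java
import java.io.ByteArrayOutputStream;

class FixedHeaderSketch {

    /** The packet type is the high nibble of byte 1 (1 = CONNECT, 3 = PUBLISH, 12 = PINGREQ, ...). */
    static int packetType(byte first) {
        return (first & 0xF0) >> 4;
    }

    /** For PUBLISH packets, bits 2-1 of byte 1 carry the QoS level. */
    static int publishQos(byte first) {
        return (first & 0x06) >> 1;
    }

    /** Encodes a Remaining Length (0..268435455) as 1-4 bytes: 7 data bits per byte, low group first. */
    static byte[] encodeRemainingLength(int length) {
        if (length < 0 || length > 268_435_455) {
            throw new IllegalArgumentException("Remaining Length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // continuation bit: more length bytes follow
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    /** Decodes the Remaining Length that starts at offset; assumes the bytes are all present. */
    static int decodeRemainingLength(byte[] packet, int offset) {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < 4; i++) {
            int digit = packet[offset + i] & 0xFF;
            value += (digit & 0x7F) * multiplier;
            if ((digit & 0x80) == 0) {
                return value;
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("Remaining Length is longer than 4 bytes");
    }
}
```

A length below 128, such as the remainingLength=35 of a small CONNECT packet, fits in a single byte; larger packets spill into up to three more bytes.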

We use Netty's MQTT classes (package io.netty.handler.codec.mqtt). Their toString() methods produce the packet information shown in the samples below.

1. Connect to the server (CONNECT) and acknowledge the connection request (CONNACK)

After the network connection from client to server is established, the first packet the client sends to the server must be a CONNECT packet.

MqttConnectMessage[
    fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=35],
    variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=20],
    payload=MqttConnectPayload[clientIdentifier=paho1614153936872000000, willTopic=null, willMessage=null, userName=null, password=null]
]

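The CONNECT packet carries a lot of information: protocol name, protocol level, session flag, will, user name, password, and so on. The packet captured here is only a basic reference.

The server responds to the CONNECT packet it received with a CONNACK packet, and the first packet the server sends to the client must be CONNACK. This sample is also only a reference: the actual response has to be built from the CONNECT packet. Note that this capture shows sessionPresent=true although the client set isCleanSession=true. MQTT 3.1.1 requires Session Present to be 0 when CleanSession is 1, so a compliant server would return sessionPresent=false here.

MqttConnAckMessage[
    fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=true],
    payload=]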
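On the wire, a CONNACK packet in MQTT 3.1.1 is always four bytes. The sketch below builds those bytes by hand to show where the Session Present flag and the return code go. `ConnAckSketch`, `connAck` and the `hasStoredSession` parameter are hypothetical names for this illustration; the demo builds the packet through Netty's `MqttConnAckMessage` instead.

```java
class ConnAckSketch {

    static final int CONNECTION_ACCEPTED = 0x00;

    /**
     * Builds the raw CONNACK bytes.
     *
     * @param cleanSession     the CleanSession flag from the CONNECT packet
     * @param hasStoredSession whether the server holds stored state for this client id (assumed input)
     * @param returnCode       the connect return code, 0 meaning accepted
     */
    static byte[] connAck(boolean cleanSession, boolean hasStoredSession, int returnCode) {
        // Session Present may be 1 only when the connection is accepted,
        // CleanSession is 0 and the server really has stored session state.
        boolean sessionPresent = returnCode == CONNECTION_ACCEPTED && !cleanSession && hasStoredSession;
        return new byte[] {
            0x20,                             // packet type 2 (CONNACK), flags 0
            0x02,                             // remaining length is always 2
            (byte) (sessionPresent ? 1 : 0),  // connect acknowledge flags
            (byte) returnCode                 // connect return code
        };
    }
}
```

For a client that connects with CleanSession set, as in the captured CONNECT packet, this yields the bytes 0x20 0x02 0x00 0x00.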

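2. Subscribe to topics (SUBSCRIBE) and acknowledge the subscription (SUBACK)

The client sends a SUBSCRIBE packet to the server to create one or more subscriptions. Each subscription registers the client's interest in one or more topics. The server sends PUBLISH packets to the client to forward application messages that were published to topics matching those subscriptions. The SUBSCRIBE packet also specifies, for each subscription, the maximum QoS at which the server may send application messages to the client. The demo only shows the subscription exchange itself; real business logic is not this simple. A sample SUBSCRIBE packet:

MqttSubscribeMessage[
    fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=56],
    variableHeader=MqttMessageIdVariableHeader[messageId=1],
    payload=MqttSubscribePayload[
        MqttTopicSubscription[topicFilter=test/netty/post, qualityOfService=AT_MOST_ONCE],
        MqttTopicSubscription[topicFilter=test/netty/get, qualityOfService=AT_LEAST_ONCE],
        MqttTopicSubscription[topicFilter=test/netty/event, qualityOfService=EXACTLY_ONCE]
    ]
]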
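The demo does not route messages, but a server that did would have to decide which subscriptions a published topic matches. The following is a minimal sketch of MQTT topic filter matching with the `+` (single level) and `#` (multi level) wildcards. `TopicFilterSketch` and `matches` are hypothetical names, and the sketch assumes the filter is already valid (for example, `#` appears only as the last level).

```java
class TopicFilterSketch {

    /** Returns true if the topic name of a PUBLISH matches the subscription's topic filter. */
    static boolean matches(String filter, String topic) {
        // Filters that start with a wildcard do not match topics beginning with '$' (such as $SYS/...).
        if (topic.startsWith("$") && (filter.startsWith("+") || filter.startsWith("#"))) {
            return false;
        }
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return true; // matches the parent level and any number of child levels
            }
            if (i >= topicLevels.length) {
                return false; // the filter has more levels than the topic
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false; // literal level differs
            }
        }
        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.length == topicLevels.length;
    }
}
```

With the three filters from the sample SUBSCRIBE packet, a message published to test/netty/get would match only the second subscription, while a filter such as test/netty/# would match all three topics.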

The server sends a SUBACK packet to the client to confirm that it has received and is processing the SUBSCRIBE packet. The SUBACK packet contains a list of return codes that specify the maximum QoS granted to each subscription in the SUBSCRIBE request. A sample SUBACK packet:

MqttSubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=SUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=5],
    variableHeader=MqttMessageIdVariableHeader[messageId=1],
    payload=MqttSubAckPayload[grantedQoSLevels=[0, 1, 2]]
]
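The return code list can be sketched as follows: one code per requested subscription, in request order, where a code is either the granted QoS (0, 1 or 2) or 0x80 for a refused subscription. `SubAckSketch`, `serverMaxQos` and `permitted` are assumptions introduced for this illustration; the demo simply grants whatever QoS the client requested.

```java
import java.util.ArrayList;
import java.util.List;

class SubAckSketch {

    /** SUBACK return code for a subscription the server refuses. */
    static final int FAILURE = 0x80;

    /** Grants at most serverMaxQos, or returns FAILURE when the subscription is not permitted. */
    static int returnCode(int requestedQos, int serverMaxQos, boolean permitted) {
        return permitted ? Math.min(requestedQos, serverMaxQos) : FAILURE;
    }

    /** One return code per requested subscription, in the order of the SUBSCRIBE payload. */
    static List<Integer> returnCodes(List<Integer> requestedQos, int serverMaxQos) {
        List<Integer> codes = new ArrayList<>(requestedQos.size());
        for (int qos : requestedQos) {
            codes.add(returnCode(qos, serverMaxQos, true));
        }
        return codes;
    }

    /** Remaining length of the SUBACK: 2-byte packet identifier plus one byte per return code. */
    static int remainingLength(List<Integer> codes) {
        return 2 + codes.size();
    }
}
```

For the three subscriptions in the sample (QoS 0, 1 and 2) and a server that supports QoS 2, this gives the codes [0, 1, 2] and a remaining length of 5, which matches the captured SUBACK.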

3. Unsubscribe (UNSUBSCRIBE) and acknowledge the unsubscription (UNSUBACK)

The client sends an UNSUBSCRIBE packet to the server to unsubscribe from topics. Sample packet:

MqttUnsubscribeMessage[
    fixedHeader=MqttFixedHeader[messageType=UNSUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=53],
    variableHeader=MqttMessageIdVariableHeader[messageId=2],
    payload=MqttUnsubscribePayload[topicName = test/netty/post, topicName = test/netty/get, topicName = test/netty/event]
]

The server sends an UNSUBACK packet to the client to confirm receipt of the UNSUBSCRIBE packet. Sample packet:

MqttUnsubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=UNSUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttMessageIdVariableHeader[messageId=2],
    payload=
]

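4. Ping request (PINGREQ) and ping response (PINGRESP)

The client sends a PINGREQ packet to the server for three purposes: to tell the server that the client is alive when no other control packets are being sent from client to server, to ask the server to respond and confirm that it is alive, and to exercise the network to confirm that the connection is still active. Sample packet:

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
    variableHeader=,
    payload=
]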

The server sends a PINGRESP packet in response to the client's PINGREQ packet, indicating that the server is alive.

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PINGRESP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
    variableHeader=,
    payload=
]
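The ping exchange is tied to the Keep Alive value in the CONNECT packet (keepAliveTimeSeconds=20 in the capture above). In MQTT 3.1.1 the server should drop a client from which it has received no control packet within one and a half times the Keep Alive period, and a value of 0 turns the mechanism off. The demo uses a fixed IdleStateHandler window instead; the sketch below shows how a per-client timeout could be derived. `KeepAliveSketch` and its methods are hypothetical names.

```java
class KeepAliveSketch {

    /** Server-side idle timeout in milliseconds, or 0 when keep-alive is disabled. */
    static long idleTimeoutMillis(int keepAliveSeconds) {
        if (keepAliveSeconds < 0 || keepAliveSeconds > 65535) {
            throw new IllegalArgumentException("Keep Alive is a 16-bit value: " + keepAliveSeconds);
        }
        // 1.5 times the Keep Alive period, converted from seconds to milliseconds.
        return keepAliveSeconds * 1500L;
    }

    /** True if the client has been silent for longer than its keep-alive allowance. */
    static boolean expired(int keepAliveSeconds, long lastPacketMillis, long nowMillis) {
        long timeout = idleTimeoutMillis(keepAliveSeconds);
        return timeout > 0 && nowMillis - lastPacketMillis > timeout;
    }
}
```

A client with a Keep Alive of 20 seconds would therefore be considered gone after 30 seconds of silence, which is why Paho sends PINGREQ on its own when nothing else is being published.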

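5. Disconnect (DISCONNECT): the client disconnects on its own initiative

DISCONNECT is the last control packet the client sends to the server. It indicates that the client is disconnecting cleanly. The server does not need to send anything back and only has to run its own business logic.

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=DISCONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
    variableHeader=,
    payload=
]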
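One piece of business logic that belongs here is the will message. In MQTT 3.1.1 the server discards the client's will when it receives DISCONNECT, and publishes it only if the connection ends without one. The sketch below models that rule for a single connection. `WillSketch` and its methods are hypothetical; the demo itself does not store will messages.

```java
class WillSketch {

    // Will message taken from the CONNECT payload, or null if the client set none.
    private String willMessage;

    WillSketch(String willMessage) {
        this.willMessage = willMessage;
    }

    /** Called when a DISCONNECT packet arrives: a clean disconnect discards the will. */
    void onDisconnectPacket() {
        willMessage = null;
    }

    /**
     * Called when the network connection closes for any reason.
     * Returns the will message to publish, or null if there is nothing to publish.
     */
    String onConnectionClosed() {
        String toPublish = willMessage;
        willMessage = null; // a will is published at most once
        return toPublish;
    }
}
```

In a Netty handler, `onDisconnectPacket` would correspond to the DISCONNECT case of the message switch and `onConnectionClosed` to `channelInactive`.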

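6. Publish and subscribe, by QoS level

  • Publish message (PUBLISH): a PUBLISH control packet carries an application message from client to server or from server to client
  • Publish acknowledgement (PUBACK): a PUBACK packet is the response to a PUBLISH packet with QoS 1
  • Publish received (PUBREC): a PUBREC packet is the response to a PUBLISH packet with QoS 2. It is the second packet of the QoS 2 exchange
  • Publish release (PUBREL): a PUBREL packet is the response to a PUBREC packet. It is the third packet of the QoS 2 exchange
  • Publish complete (PUBCOMP): a PUBCOMP packet is the response to a PUBREL packet. It is the fourth and final packet of the QoS 2 exchange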
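The list above boils down to a small lookup: given the packet that just arrived and, for PUBLISH, its QoS, which packet goes back? A minimal sketch, with `QosFlowSketch`, its `Packet` enum and `responseTo` as hypothetical names:

```java
class QosFlowSketch {

    enum Packet { PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, NONE }

    /** Returns the packet that answers the received one; qos is only relevant for PUBLISH. */
    static Packet responseTo(Packet received, int qos) {
        switch (received) {
            case PUBLISH:
                if (qos == 1) {
                    return Packet.PUBACK;   // QoS 1: single acknowledgement
                }
                if (qos == 2) {
                    return Packet.PUBREC;   // QoS 2: second packet of the exchange
                }
                return Packet.NONE;         // QoS 0: no acknowledgement
            case PUBREC:
                return Packet.PUBREL;       // sender releases the message
            case PUBREL:
                return Packet.PUBCOMP;      // receiver completes the exchange
            default:
                return Packet.NONE;         // PUBACK and PUBCOMP end their exchanges
        }
    }
}
```

The demo server only ever plays the receiver role, so it answers PUBLISH and PUBREL; the PUBREC case applies to whichever side sent the original PUBLISH.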

1). QoS 0: at most once

Client -> server PUBLISH. The server does not send any acknowledgement to the client, which is what "at most once" means. Sample packet:

MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=25],
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=-1],
    payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 496))
]

The payload data can be extracted with the following code

MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
// Copy the readable bytes out of the payload ByteBuf.
byte[] payloadBytes = new byte[mqttPublishMessage.payload().readableBytes()];
mqttPublishMessage.payload().readBytes(payloadBytes);
// MQTT payloads are raw bytes; UTF-8 text is assumed here (java.nio.charset.StandardCharsets).
String data = new String(payloadBytes, StandardCharsets.UTF_8);

2). QoS 1: at least once, the server sends back an acknowledgement

Client -> server PUBLISH, sample packet

MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=26],
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/get, packetId=4],
    payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 480))]

Server -> client PUBACK, sample packet

MqttPubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttMessageIdVariableHeader[messageId=4],
    payload=]
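The only variable content of a PUBACK is the 16-bit packet identifier copied from the PUBLISH it acknowledges, sent most significant byte first. The sketch below builds and reads the raw bytes; `PubAckSketch`, `pubAck` and `packetId` are hypothetical names, and in the demo Netty's `MqttPubAckMessage` produces the same bytes.

```java
class PubAckSketch {

    /** Builds the four raw bytes of a PUBACK for the given packet identifier (1..65535). */
    static byte[] pubAck(int packetId) {
        if (packetId < 1 || packetId > 0xFFFF) {
            throw new IllegalArgumentException("Packet identifier must be 1..65535: " + packetId);
        }
        return new byte[] {
            0x40,                    // packet type 4 (PUBACK), flags 0
            0x02,                    // remaining length: just the packet identifier
            (byte) (packetId >> 8),  // identifier, most significant byte
            (byte) packetId          // identifier, least significant byte
        };
    }

    /** Reads the packet identifier back out of a PUBACK. */
    static int packetId(byte[] pubAck) {
        return ((pubAck[2] & 0xFF) << 8) | (pubAck[3] & 0xFF);
    }
}
```

For the captured exchange with messageId=4, the acknowledgement on the wire is 0x40 0x02 0x00 0x04.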

3). QoS 2: exactly once (four packets in total)

Client -> server PUBLISH, first packet

MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=28],
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=5],
    payload=PooledSlicedByteBuf(ridx: 0, widx: 9, cap: 9/9, unwrapped: PooledUnsafeDirectByteBuf(ridx: 30, widx: 30, cap: 496))]

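Server -> client PUBREC, second packet. The capture shows qosLevel=AT_LEAST_ONCE because the demo version that produced it built the header that way. MQTT 3.1.1 reserves the PUBREC flag bits as 0, so AT_MOST_ONCE is the compliant value; only PUBREL carries the 0010 flag pattern.

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBREC, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttMessageIdVariableHeader[messageId=5],
    payload=]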

Client -> server PUBREL, third packet

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBREL, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttMessageIdVariableHeader[messageId=5],
    payload=]

Server -> client PUBCOMP, fourth packet

MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBCOMP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
    variableHeader=MqttMessageIdVariableHeader[messageId=5],
    payload=]
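What makes QoS 2 "exactly once" is that the receiver remembers the packet identifier between PUBLISH and PUBREL, so a retransmitted PUBLISH is acknowledged again but not delivered twice. The demo answers each packet without keeping that state. The sketch below shows the bookkeeping for one client connection, following the variant in which the message is delivered when the PUBLISH arrives. `Qos2ReceiverSketch` and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

class Qos2ReceiverSketch {

    // Packet identifiers that were answered with PUBREC and still await PUBREL.
    private final Set<Integer> awaitingRelease = new HashSet<>();
    private int delivered = 0;

    /**
     * Handles a QoS 2 PUBLISH. The caller answers with PUBREC in every case.
     * Returns true if the message was handed to the application, false for a duplicate.
     */
    boolean onPublish(int packetId) {
        boolean firstTime = awaitingRelease.add(packetId);
        if (firstTime) {
            delivered++; // deliver to the application only once per identifier
        }
        return firstTime;
    }

    /** Handles PUBREL. The caller answers with PUBCOMP; the identifier becomes reusable. */
    void onPubRel(int packetId) {
        awaitingRelease.remove(packetId);
    }

    int deliveredCount() {
        return delivered;
    }
}
```

In the captured exchange, a second PUBLISH with packetId=5 arriving before the PUBREL would be treated as a retransmission, while the same identifier after PUBCOMP starts a new message.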

This is only a demo and involves no business logic. It is mainly based on the Chinese edition of the MQTT protocol specification (第一章 - MQTT介绍 · MQTT协议中文版).