Building an MQTT Server with Spring Boot + Netty (Basic Demo)
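This article shows how to build an MQTT server with the Spring Boot and Netty frameworks. By adding the required dependencies, creating a startup class, and defining Netty's MQTT handler class and message reply methods, it implements basic MQTT functions such as connect, subscribe, and publish. It also covers client testing with the Eclipse Paho tool and shows sample messages for the different MQTT packet types.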
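Netty is one of the most popular NIO frameworks in the industry; combined with Spring Boot, it supports rapid development.
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol based on the publish/subscribe model and built on top of TCP/IP. It is well suited to the Internet of Things, small devices, and mobile applications.
Netty supports MQTT as well: its io.netty.handler.codec.mqtt package provides the MQTT message classes together with an encoder and a decoder.
Basic demo code for an MQTT server built with Spring Boot + Netty
With Netty + Spring Boot you can quickly develop a server for the MQTT protocol (mainly MQTT 3.1 and MQTT 3.1.1).
Create the Spring Boot + Netty project and import the dependencies in pom.xml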
| <?xml version="1.0"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath />
</parent>
<groupId>boot.base.mqtt.server</groupId>
<artifactId>boot-example-base-mqtt-server-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-example-base-mqtt-server-2.0.5</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Package as an executable jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
|
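The Spring Boot startup class. It starts the Netty MQTT service from the CommandLineRunner callback once the application (including the web application) is up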
| package boot.example.mqtt.server;
import boot.example.mqtt.server.netty.BootNettyMqttServerThread;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import boot.example.mqtt.server.netty.BootNettyMqttServer;
@SpringBootApplication
public class BootNettyMqttApplication implements CommandLineRunner {
public static void main( String[] args ) {
SpringApplication app = new SpringApplication(BootNettyMqttApplication.class);
app.run(args);
}
@Override
public void run(String... args) throws Exception {
int port = 1883;
BootNettyMqttServerThread bootNettyMqttServerThread = new BootNettyMqttServerThread(port);
bootNettyMqttServerThread.start();
}
}
|
The Netty MQTT server class
| package boot.example.mqtt.server.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
public class BootNettyMqttServer {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
public void startup(int port) {
try {
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
channelPipeline.addLast("decoder", new MqttDecoder());
channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
}
});
ChannelFuture f = bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("startup success port = " + port);
f.channel().closeFuture().sync();
} else {
System.out.println("startup fail port = " + port);
}
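} catch (Exception e) {
System.out.println("start exception"+e.toString());
} finally {
// Free the event loop threads once the server channel closes or startup fails
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workGroup != null) {
workGroup.shutdownGracefully();
}
}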
}
public void shutdown() throws InterruptedException {
if (workGroup != null && bossGroup != null) {
bossGroup.shutdownGracefully().sync();
workGroup.shutdownGracefully().sync();
System.out.println("shutdown success");
}
}
}
|
The I/O handler class for the MQTT server
| package boot.example.mqtt.server.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ChannelHandler.Sharable
public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (msg instanceof MqttMessage) {
MqttMessage mqttMessage = (MqttMessage) msg;
// MqttDecoder reports a malformed packet through decoderResult(); close the connection in that case
if (mqttMessage.decoderResult().isFailure()) {
ctx.close();
return;
}
log.info("info--"+mqttMessage.toString());
MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
Channel channel = ctx.channel();
if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
BootNettyMqttMsgBack.connack(channel, mqttMessage);
}
switch (mqttFixedHeader.messageType()){
case PUBLISH:
BootNettyMqttMsgBack.puback(channel, mqttMessage);
break;
case PUBREL:
BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
break;
case SUBSCRIBE:
BootNettyMqttMsgBack.suback(channel, mqttMessage);
break;
case UNSUBSCRIBE:
BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
break;
case PINGREQ:
BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
break;
case DISCONNECT:
break;
default:
break;
}
}
} finally {
// ChannelInboundHandlerAdapter does not release messages, so release here to avoid leaking PUBLISH payload buffers
io.netty.util.ReferenceCountUtil.release(msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
super.userEventTriggered(ctx, evt);
ctx.close();
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
}
}
|
The replies sent back after an MQTT client sends a message. They follow the MQTT protocol, so you need to know its main points
| package boot.example.mqtt.server.netty;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
public class BootNettyMqttMsgBack {
private static final Logger log = LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
public static void connack (Channel channel, MqttMessage mqttMessage) {
MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
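// NOTE: the demo echoes the client's cleanSession flag as sessionPresent. MQTT 3.1.1 requires
// sessionPresent=false when cleanSession is true, so a real broker would derive it from stored session state.
MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());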
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
log.info("back--"+connAck.toString());
channel.writeAndFlush(connAck);
}
public static void puback (Channel channel, MqttMessage mqttMessage) {
MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
// Copy the payload bytes out of the ByteBuf and decode them as UTF-8 text
byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
mqttPublishMessage.payload().readBytes(headBytes);
String data = new String(headBytes, java.nio.charset.StandardCharsets.UTF_8);
System.out.println("publish data--"+data);
switch (qos) {
case AT_MOST_ONCE:
break;
case AT_LEAST_ONCE:
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
// The PUBACK flag bits are reserved and must be 0, so DUP and RETAIN are not copied from the PUBLISH
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02);
MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
log.info("back--"+pubAck.toString());
channel.writeAndFlush(pubAck);
break;
case EXACTLY_ONCE:
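// NOTE: MQTT 3.1.1 reserves the PUBREC flag bits as 0, which corresponds to MqttQoS.AT_MOST_ONCE; the demo sends AT_LEAST_ONCE
MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);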
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
log.info("back--"+mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
break;
default:
break;
}
}
public static void pubcomp (Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
log.info("back--"+mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
}
public static void suback(Channel channel, MqttMessage mqttMessage) {
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
// One granted QoS entry per requested subscription, in request order (the demo grants whatever was requested).
// Iterating the list directly keeps the count right even when the same topic filter appears twice.
List<Integer> grantedQoSLevels = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.qualityOfService().value()).collect(Collectors.toList());
MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+grantedQoSLevels.size());
MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
log.info("back--"+subAck.toString());
channel.writeAndFlush(subAck);
}
public static void unsuback(Channel channel, MqttMessage mqttMessage) {
MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
log.info("back--"+unSubAck.toString());
channel.writeAndFlush(unSubAck);
}
public static void pingresp (Channel channel, MqttMessage mqttMessage) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
log.info("back--"+mqttMessageBack.toString());
channel.writeAndFlush(mqttMessageBack);
}
}
|
The thread class that starts the MQTT server
| package boot.example.mqtt.server.netty;
public class BootNettyMqttServerThread extends Thread {
private final int port;
public BootNettyMqttServerThread(int port){
this.port = port;
}
public void run() {
BootNettyMqttServer bootNettyMqttServer = new BootNettyMqttServer();
bootNettyMqttServer.startup(this.port);
}
}
|
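With Spring Boot + Netty it is easy to build an MQTT server in very little time; a simple demo needs only a few classes. This demo is based on Netty 4.x.
With the Netty-based MQTT server demo in place, an MQTT client is needed for testing. I used the Eclipse Paho tool, which has a Windows desktop version; the build I used is org.eclipse.paho.ui.app-1.1.1-win32.win32.x86_64.zip (the "ui" in the name means it is a desktop application). It needs a Java JDK to run and is available from the official site.
Screenshots of the test results
A demo is only a demo: it supports just the client-to-server side of connecting, subscribing, unsubscribing, and publishing. The main point is to see the MQTT exchange and the data involved.
The classes write logs through SLF4J, so with Spring Boot's default logging configuration you can see the data exchanged during an MQTT session.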
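An MQTT packet consists of three main parts
- Fixed header (MqttFixedHeader): present in every MQTT packet; it carries the packet type, the flags for that type, and the packet size (the remaining length)
- Variable header (variableHeader): present in some MQTT packets, depending on the packet type defined by the protocol
- Payload (payload): present in some MQTT packets; it carries the actual data (the part the business logic really uses)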
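The "packet size" in the fixed header is the MQTT 3.1.1 Remaining Length field, which uses a variable-length encoding of one to four bytes. The sketch below illustrates that encoding in plain Java. Netty's MqttEncoder and MqttDecoder already do this internally, and the class name `RemainingLengthCodec` is made up for illustration.

```java
import java.io.ByteArrayOutputStream;

// Illustrative sketch of the MQTT 3.1.1 Remaining Length encoding:
// 7 data bits per byte, top bit set when another length byte follows.
final class RemainingLengthCodec {
    static final int MAX_REMAINING_LENGTH = 268_435_455; // largest value that fits in 4 bytes

    static byte[] encode(int length) {
        if (length < 0 || length > MAX_REMAINING_LENGTH) {
            throw new IllegalArgumentException("length out of range: " + length);
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(4);
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) {
                digit |= 0x80; // continuation bit: more length bytes follow
            }
            out.write(digit);
        } while (length > 0);
        return out.toByteArray();
    }

    static int decode(byte[] bytes) {
        int value = 0;
        int multiplier = 1;
        for (int i = 0; i < bytes.length; i++) {
            if (i >= 4) {
                throw new IllegalArgumentException("more than 4 length bytes");
            }
            int digit = bytes[i] & 0xFF;
            value += (digit & 0x7F) * multiplier;
            if ((digit & 0x80) == 0) {
                return value; // last length byte reached
            }
            multiplier *= 128;
        }
        throw new IllegalArgumentException("truncated remaining length");
    }

    public static void main(String[] args) {
        System.out.println(encode(35).length);     // a length of 35 fits in one byte
        System.out.println(decode(encode(321)));   // round trip through two bytes
    }
}
```

Lengths up to 127 take a single byte, which is why all the small packets in this demo have a two-byte fixed header.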
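For the protocol details, refer to the MQTT specification.
We use the MQTT classes that Netty provides in io.netty.handler.codec.mqtt; their toString() output shows the packet contents, as in the samples below.
1. Connecting to the server (CONNECT) and acknowledging the connection (CONNACK)
After the network connection from client to server is established, the first packet the client sends to the server must be a CONNECT packet.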
| MqttConnectMessage[
fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=35],
variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=20],
payload=MqttConnectPayload[clientIdentifier=paho1614153936872000000, willTopic=null, willMessage=null, userName=null, password=null]
]
|
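The CONNECT packet carries a lot of information: protocol name, protocol level, session, will, user name, password, and so on. The packet captured here is only a basic reference.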
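The demo handler does not enforce the rule that CONNECT comes first. A minimal sketch of such a check, assuming one guard object per connection, might look like the following. The class `ConnectGate` is hypothetical and takes the packet type as a string to stay independent of Netty; in the demo's handler the type would come from `mqttFixedHeader.messageType()`.

```java
// Hypothetical per-connection guard: the first packet must be CONNECT,
// and a second CONNECT on the same connection is a protocol violation.
final class ConnectGate {
    private boolean connected;

    /** Returns true if the packet may be processed, false if the connection should be closed. */
    boolean accept(String messageType) {
        boolean isConnect = "CONNECT".equals(messageType);
        if (!connected) {
            connected = isConnect; // only CONNECT opens the gate
            return isConnect;
        }
        return !isConnect; // once connected, another CONNECT is rejected
    }

    public static void main(String[] args) {
        ConnectGate gate = new ConnectGate();
        System.out.println(gate.accept("SUBSCRIBE")); // false: nothing is allowed before CONNECT
        System.out.println(gate.accept("CONNECT"));   // true
        System.out.println(gate.accept("SUBSCRIBE")); // true
    }
}
```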
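The server answers the client's CONNECT with a CONNACK packet; the first packet the server sends to the client must be CONNACK. This is again only a reference, and the actual reply depends on the CONNECT. Note that the demo copies the client's cleanSession flag into sessionPresent, which is why the capture shows sessionPresent=true; MQTT 3.1.1 requires Session Present to be 0 when the client connects with CleanSession set to 1.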
| MqttConnAckMessage[
fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=true],
payload=]
|
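To make the CONNACK layout concrete, here is a small sketch of the four bytes an MQTT 3.1.1 CONNACK occupies on the wire, together with the Session Present rule. It is an illustration only: the demo sends CONNACK through Netty's MqttConnAckMessage, and the class `ConnAckSketch` and the `hasStoredSession` parameter are assumptions made for this example.

```java
// Illustrative sketch of the MQTT 3.1.1 CONNACK wire format (always 4 bytes).
final class ConnAckSketch {
    /** Session Present may only be set when the client asked to resume (cleanSession=false) and state exists. */
    static boolean sessionPresent(boolean cleanSession, boolean hasStoredSession) {
        return !cleanSession && hasStoredSession;
    }

    static byte[] connAck(boolean sessionPresent, int returnCode) {
        return new byte[] {
            0x20,                              // packet type CONNACK (2) in the high nibble, flags 0
            0x02,                              // remaining length is always 2
            (byte) (sessionPresent ? 1 : 0),   // acknowledge flags: bit 0 is Session Present
            (byte) returnCode                  // 0 means connection accepted
        };
    }

    public static void main(String[] args) {
        boolean present = sessionPresent(true, false);
        for (byte b : connAck(present, 0)) {
            System.out.printf("%02X ", b);
        }
        System.out.println();
    }
}
```

The specification also requires Session Present to be 0 whenever the return code is non-zero, which a caller of this sketch would have to respect.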
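2. Subscribing to topics (SUBSCRIBE) and subscription acknowledgment (SUBACK)
The client sends a SUBSCRIBE packet to the server to create one or more subscriptions. Each subscription registers the client's interest in one or more topics. The server then sends PUBLISH packets to the client to forward application messages published to topics that match those subscriptions. The SUBSCRIBE packet also specifies, for each subscription, the maximum QoS level at which the server may send application messages to the client. The demo only shows the subscribe exchange itself; real business logic is not this simple. A sample SUBSCRIBE packet: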
| MqttSubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=56],
variableHeader=MqttMessageIdVariableHeader[messageId=1],
payload=MqttSubscribePayload[
MqttTopicSubscription[topicFilter=test/netty/post, qualityOfService=AT_MOST_ONCE],
MqttTopicSubscription[topicFilter=test/netty/get, qualityOfService=AT_LEAST_ONCE],
MqttTopicSubscription[topicFilter=test/netty/event, qualityOfService=EXACTLY_ONCE]
]]
|
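The demo acknowledges subscriptions but never matches published topics against them. As a rough sketch of what that matching involves, the code below checks a topic name against a topic filter with the MQTT wildcards `+` (one level) and `#` (all remaining levels). The class `TopicFilterMatcher` is hypothetical, and the sketch leaves out the special handling of topics that start with `$`.

```java
// Illustrative sketch of MQTT topic filter matching with '+' and '#' wildcards.
final class TopicFilterMatcher {
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches the parent and any child levels
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("test/netty/+", "test/netty/post")); // true
        System.out.println(matches("test/#", "test/netty/event"));      // true
        System.out.println(matches("test/netty/get", "test/netty/post")); // false
    }
}
```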
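The server sends a SUBACK packet to the client to confirm that it has received and is processing the SUBSCRIBE packet. SUBACK contains a list of return codes that give the maximum QoS level granted for each subscription in the SUBSCRIBE request. A sample SUBACK packet: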
| MqttSubAckMessage[
fixedHeader=MqttFixedHeader[messageType=SUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=5],
variableHeader=MqttMessageIdVariableHeader[messageId=1],
payload=MqttSubAckPayload[grantedQoSLevels=[0, 1, 2]]
]
|
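The demo simply grants whatever QoS the client requested. A broker that caps the QoS it supports would grant the lower of the requested level and its own maximum, one return code per subscription and in the same order. A minimal sketch, with the hypothetical class `GrantedQos` and parameter `serverMaxQos`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: compute SUBACK return codes by capping each requested QoS.
final class GrantedQos {
    static List<Integer> grant(List<Integer> requestedQos, int serverMaxQos) {
        List<Integer> granted = new ArrayList<>(requestedQos.size());
        for (int qos : requestedQos) {
            granted.add(Math.min(qos, serverMaxQos)); // never grant more than was requested
        }
        return granted;
    }

    public static void main(String[] args) {
        List<Integer> requested = new ArrayList<>();
        requested.add(0);
        requested.add(1);
        requested.add(2);
        System.out.println(grant(requested, 1)); // [0, 1, 1]
    }
}
```

MQTT 3.1.1 also defines the return code 0x80 for a subscription the server refuses; the sketch does not model that case.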
3. Unsubscribing (UNSUBSCRIBE) and unsubscribe acknowledgment (UNSUBACK)
The client sends an UNSUBSCRIBE packet to the server to unsubscribe from topics. A sample packet:
| MqttUnsubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=53],
variableHeader=MqttMessageIdVariableHeader[messageId=2],
payload=MqttUnsubscribePayload[topicName = test/netty/post, topicName = test/netty/get, topicName = test/netty/event]
]
|
The server sends an UNSUBACK packet to the client to confirm receipt of the UNSUBSCRIBE packet. A sample packet:
| MqttUnsubAckMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=2],
payload=
]
|
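4. Ping request (PINGREQ) and ping response (PINGRESP)
The client sends a PINGREQ packet to the server for three purposes: to tell the server the client is still alive when no other control packets are being sent, to ask the server to respond and so confirm that it is alive, and to exercise the network to verify that the connection has not dropped. A sample packet: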
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
|
The server sends a PINGRESP packet in response to the client's PINGREQ, indicating that the server is alive
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGRESP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
|
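The demo uses fixed idle times (600, 600, 1200 seconds) rather than the keep-alive value from CONNECT. MQTT 3.1.1 says that, for a non-zero keep-alive, the server must disconnect a client from which it has received no control packet within one and a half times that period. A sketch of that calculation follows; the class name `KeepAliveTimeout` is made up, and wiring the result into the pipeline (for example as the reader idle time of an IdleStateHandler) is left as an assumption.

```java
// Illustrative sketch: derive the server-side idle timeout from the CONNECT keep-alive value.
final class KeepAliveTimeout {
    /** Returns the idle timeout in seconds, or 0 when keep-alive is disabled (keepAlive == 0). */
    static int idleTimeoutSeconds(int keepAliveSeconds) {
        if (keepAliveSeconds < 0 || keepAliveSeconds > 65535) {
            throw new IllegalArgumentException("keep-alive must fit in 16 bits: " + keepAliveSeconds);
        }
        return (keepAliveSeconds * 3 + 1) / 2; // 1.5x, rounded up to a whole second
    }

    public static void main(String[] args) {
        // keepAliveTimeSeconds=20 is the value in the CONNECT capture shown earlier
        System.out.println(idleTimeoutSeconds(20)); // 30
    }
}
```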
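5. Disconnecting (DISCONNECT): the client closes the connection
DISCONNECT is the last control packet the client sends to the server and indicates a clean disconnect. The server does not send a reply; it only needs to run its own cleanup logic.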
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=DISCONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0],
variableHeader=,
payload=
]
|
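6. Publishing, by QoS level
- Publish (PUBLISH): a PUBLISH control packet carries an application message from client to server or from server to client
- Publish acknowledgment (PUBACK): PUBACK is the response to a QoS 1 PUBLISH packet
- Publish received (PUBREC): PUBREC is the response to a QoS 2 PUBLISH packet; it is the second packet of the QoS 2 exchange
- Publish release (PUBREL): PUBREL is the response to a PUBREC packet; it is the third packet of the QoS 2 exchange
- Publish complete (PUBCOMP): PUBCOMP is the response to a PUBREL packet; it is the fourth and final packet of the QoS 2 exchange
1) QoS 0: at most once
Client -> server PUBLISH. The server sends no acknowledgment back, which is why delivery is at most once. A sample packet: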
| MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=25],
variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=-1],
payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 496))
]
|
The payload data can be read with the following code
| MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
MqttQoS qos = mqttFixedHeaderInfo.qosLevel();
// Copy the payload bytes out of the ByteBuf and decode them as UTF-8 text
byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
mqttPublishMessage.payload().readBytes(headBytes);
String data = new String(headBytes, java.nio.charset.StandardCharsets.UTF_8);
|
2) QoS 1: at least once; the server sends an acknowledgment
Client -> server PUBLISH, sample packet
| MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=26],
variableHeader=MqttPublishVariableHeader[topicName=test/netty/get, packetId=4],
payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 480))]
|
Server -> client PUBACK, sample packet
| MqttPubAckMessage[
fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=4],
payload=]
|
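On the wire, a PUBACK is four bytes: the fixed header followed by the packet identifier of the PUBLISH being acknowledged, most significant byte first. The sketch below builds those bytes by hand for illustration; the demo itself relies on Netty's MqttEncoder, and the class `PubAckSketch` is hypothetical.

```java
// Illustrative sketch of the MQTT 3.1.1 PUBACK wire format (always 4 bytes).
final class PubAckSketch {
    static byte[] pubAck(int packetId) {
        if (packetId < 1 || packetId > 0xFFFF) {
            throw new IllegalArgumentException("packet id must be 1..65535: " + packetId);
        }
        return new byte[] {
            0x40,                     // packet type PUBACK (4) in the high nibble, flags 0
            0x02,                     // remaining length: the 2-byte packet identifier
            (byte) (packetId >> 8),   // packet identifier, most significant byte
            (byte) packetId           // packet identifier, least significant byte
        };
    }

    public static void main(String[] args) {
        // packetId=4 matches the QoS 1 capture above
        for (byte b : pubAck(4)) {
            System.out.printf("%02X ", b);
        }
        System.out.println();
    }
}
```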
3) QoS 2: exactly once (four packets in total)
Client -> server PUBLISH, the first packet
| MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=28],
variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=5],
payload=PooledSlicedByteBuf(ridx: 0, widx: 9, cap: 9/9, unwrapped: PooledUnsafeDirectByteBuf(ridx: 30, widx: 30, cap: 496))]
|
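Server -> client PUBREC, the second packet. The capture shows qosLevel=AT_LEAST_ONCE because the demo builds the PUBREC header that way; MQTT 3.1.1 reserves the PUBREC flag bits as 0.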
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREC, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
|
Client -> server PUBREL, the third packet
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREL, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
|
Server -> client PUBCOMP, the fourth packet
| MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBCOMP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2],
variableHeader=MqttMessageIdVariableHeader[messageId=5],
payload=]
|
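The demo answers each QoS 2 packet but keeps no state, so a retransmitted PUBLISH would be processed twice. A receiver that wants the exactly-once guarantee has to remember packet identifiers between PUBLISH and PUBREL. The following is a minimal sketch of that bookkeeping for a single connection; the class `Qos2InboundTracker` is hypothetical and is not part of the demo.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of receiver-side QoS 2 bookkeeping for one client connection.
final class Qos2InboundTracker {
    private final Set<Integer> pendingRelease = new HashSet<>();

    /**
     * Call for every QoS 2 PUBLISH. Returns true when the message is new and should be
     * delivered, false when it is a retransmission. PUBREC is sent back in both cases.
     */
    boolean onPublish(int packetId) {
        return pendingRelease.add(packetId);
    }

    /**
     * Call for every PUBREL. Forgets the packet id so the client can reuse it.
     * PUBCOMP is sent back whether or not the id was still tracked.
     */
    boolean onPubRel(int packetId) {
        return pendingRelease.remove(packetId);
    }

    public static void main(String[] args) {
        Qos2InboundTracker tracker = new Qos2InboundTracker();
        System.out.println(tracker.onPublish(5)); // true: first delivery
        System.out.println(tracker.onPublish(5)); // false: duplicate, do not deliver again
        System.out.println(tracker.onPubRel(5));  // true: exchange finished
    }
}
```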
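This is only a demo and contains no business logic. It is based mainly on the Chinese edition of the MQTT specification (Chapter 1 - MQTT Introduction · MQTT Protocol Chinese Edition).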