由于项目开发过程中需要对接物联网设备 , 并且MQTT协议在物联网应用中有着及其广泛的应用, 这里首先简单实现一个demo级别的mqtt-server

核心在于理解MQTT协议的工作方式以及具体的流程。

本文不涉及任何业务代码

完整代码请参考 https://github.com/adorabled4/demo0828com.dhx.dem0828.mqtt

关于MQTT协议

对于后端开发人员来说MQTT协议并不模式, 常用的消息队列如RabbitMQ , ActiveMQ等都支持MQTT协议。

MQTTMessage Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅publish/subscribe)模式的“轻量级”通讯协议

该协议构建于TCP/IP协议上,由IBM在1999年发布。

MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,MQTT协议在物联网应用上有着非常广阔的应用场景。

MQTT协议的主要特点包括

  • **轻量级:**开销低、报文小,消耗资源更少,即使在有限的能力下也能实现高效的通信。

  • **可靠:**MQTT 支持多种 QoS 等级、会话感知和持久连接,即使在困难的条件下也能保证消息的可靠传递。

    QoS 0(最多一次,At most once)

    • 这是最低的服务质量级别。消息被发布后,不进行确认,也不进行重新传输。发布者发送消息后,不会知道消息是否被成功接收。
    • 消息会尽力传递,但可能会丢失或重复。这种级别适用于对消息的可靠性要求不高的情况

    QoS 1(至少一次,At least once)

    • 在这个级别下,发布者会收到一个消息确认(PUBACK)来确认消息已经被接收,如果没有确认,发布者会尝试重新发送消息。
    • 这个级别保证消息至少传递一次但可能会导致消息重复。适用于需要确保消息到达,但可以容忍重复的情况

    QoS 2(仅一次,Exactly once)

    • 最高的服务质量级别,确保消息只传递一次且不会重复。它通过使用两个确认(PUBREC 和 PUBCOMP)来实现。
    • 发布者发送消息后会等待完整的确认过程,确保消息不会丢失,也不会重复传递。适用于对消息的可靠性要求非常高的情况,但它会引入一些额外的开销。

    选择适当的服务质量级别取决于应用程序的需求和性能要求。

    通常情况下,QoS 0用于需要低延迟和可以容忍消息丢失的场景,QoS 1用于需要保证消息传递至少一次的场景,而QoS 2则用于对消息传递的可靠性要求非常高的场景。

  • 持久性会话:MQTT客户端可以选择创建持久性会话,以便在重新连接时接收未传递的消息。这对于确保不会丢失重要消息非常有用。

  • 会话状态保持:MQTT代理服务器可以维护客户端的会话状态,以确保客户端能够重新连接并保持之前的订阅和消息处理状态。

  • 安全性:MQTT可以与TLS/SSL一起使用,以加密和保护消息传递过程中的数据安全性。也支持基本的用户名和密码身份验证。

以下几个概念是理解MQTT协议的重点 , 接下来会做简单的介绍 , 具体内容以 https://www.runoob.com/w3cnote/mqtt-intro.html 为准

  • MQTT 客户端
  • MQTT Broker
  • 发布-订阅模式

MQTT客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端

  • 客户端可以是传感器、嵌入式设备、移动应用程序、服务器或任何能够实现 MQTT 协议的实体
  • 客户端可以发布消息订阅主题接收消息或执行其他与 MQTT 协议相关的操作。

实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。

其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

MQTT服务端

MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

  • 接受来自客户的网络连接;
  • 接受客户发布的应用信息;
  • 处理来自客户端的订阅和退订请求;
  • 向订阅的客户转发应用程序消息。

发布-订阅模式

MQTT是基于发布(Publish)/**订阅(Subscribe)**模式来进行通信及数据交换的,

与HTTP的***请求(Request)应答(Response)***的模式有本质的不同
订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。

成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者
主题(Topic)以’/‘为分隔符区分不同的层级,包含通配符’+’ 或 ‘#’的主题又称为主题过滤器(Topic Filters); 不含通配符的成为主题名(Topic Names)

  • +:表示单层通配符,例如 a/+ 匹配 a/xa/y
  • #:表示多层通配符,例如 a/# 匹配 a/xa/b/c/d
  • 注意:通配符主题只能用于订阅,不能用于发布。

环境

item version
JDK 1.8
springboot 2.5.6
Mqtt 3.13

安装Broker

对于上图的结构 , 我们需要安装一个Broker用来在 消息发布者与 消息订阅者之间传播消息。

这里使用EMQX , 并通过Docker来安装

官网 : https://www.emqx.io/docs/zh/v4/getting-started/getting-started.html#安装-emqx

1
2
docker pull emqx/emqx:latest
docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest

mqtt-server实现

首先 , 我们需要明确的是 , 对于MQTT协议中真正的组成部分是

  • publisher
  • broker
  • subscriber

前文提到的 客户端 , 服务端只是具体的实现 , 不可脱离本质。

那么接下来实现的mqtt-server , 兼顾 publisher以及 subscriber的功能

这里可以简单设想一个场景

假定有一个智能药盒 , 每隔一段时间需要发送消息 , 通知用户吃药。

业务服务器中定义一个broker , 用来通知用户吃药 , 同时我们也需要在业务服务器中定义一个subscriber ,用来执行相关的业务操作

那么接下来通过springboot来实现一个mqtt-server , 主要的功能有

  • 转发
  • 订阅消息

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version> <!-- 请根据需要选择最新版本 -->
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>

进行MQTT相关配置

创建MQTT服务端连接工具类MqttConfig,用于加载配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
@ConfigurationProperties(MqttConfig.PREFIX)
@Getter
@Setter
public class MqttConfig {
//指定配置文件application-local.properties中的属性名前缀
public static final String PREFIX = "mqtt";
private String host;
private String clientid;
private String username;
private String password;
private boolean cleansession;
private String default_topic;
private int timeout;
private int keepalive;
private int connectionTimeout;
}

在application.properties文件中配置如下内容 :

注意, MQTT协议的URI通常使用tcp而不是mqtt来指定协议,因此正确的形式是tcp://localhost:1883而不是mqtt://localhost:1883

在大多数情况下,使用tcp://来指定MQTT Broker的URI是标准做法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#MQTT-服务端地址
mqtt.host=tcp://localhost:1883
#MQTT-服务端用户名
mqtt.username=admin
#MQTT-服务端密码
mqtt.password=admin
#MQTT-是否清理session
mqtt.cleansession=false
#MQTT-当前客户端的唯一标识
mqtt.clientid=mqtt_publish
#当前客户端的默认主题(大多数时候没什么用)
mqtt.default_topic=测试
#发送超时时间
mqtt.timeout=1000
#心跳时间
mqtt.keepalive = 10
#连接超时时间
mqtt.connectionTimeout=3000

配置MQTT连接

建立MQTT服务端连接类MqttConnect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Component
public class MqttConnect {

@Autowired
private MqttConfig config;

public MqttConnect(MqttConfig config) {
this.config = config;
}

public MqttConnectOptions getOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(config.isCleansession());
options.setUserName(config.getUsername());
options.setPassword(config.getPassword().toCharArray());
options.setConnectionTimeout(config.getConnectionTimeout());
//设置心跳
options.setKeepAliveInterval(config.getKeepalive());
return options;
}

public MqttConnectOptions getOptions(MqttConnectOptions options) {

options.setCleanSession(options.isCleanSession());
options.setUserName(options.getUserName());
options.setPassword(options.getPassword());
options.setConnectionTimeout(options.getConnectionTimeout());
options.setKeepAliveInterval(options.getKeepAliveInterval());
return options;
}

}

创建mqtt-server

这里在server中设置了两个MqttClient是为了让发布者订阅者分开

如果订阅者和发布者都用一个MqttClient链接对象,则会出现两方都订阅了某个主题后谁发送了消息,都会自己接收到自己发的消息,

所以分开,里面主要就是回调类的设置setCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@Service
@Slf4j
public class MqttServer {
/* 订阅者客户端对象 */
private MqttClient subscribeClient;
/*发布者客户端对象 */
private MqttClient publishClient;
/* 主题对象 */
public MqttTopic topic;
/* 消息内容对象 */
public MqttMessage message;
@Autowired
private MqttConnect mqttConnect;
@Autowired
private MqttConfig config;
public MqttServer() {log.info("mqtt-server启动!");}
/**
* 发布者客户端和服务端建立连接
*/
public MqttClient publishConnect() {
//防止重复创建MQTTClient实例
try {
if (publishClient == null) {
//先让客户端和服务器建立连接,MemoryPersistence设置clientid的保存形式,默认为以内存保存
publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());
//发布消息不需要回调连接
//client.setCallback(new PushCallback());
}
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!publishClient.isConnected()) {
publishClient.connect(options);
log.info("---------------------连接成功");
} else {//这里的逻辑是如果连接成功就重新连接
publishClient.disconnect();
publishClient.connect(mqttConnect.getOptions(options));
log.info("---------------------连接成功");
}
} catch (MqttException e) {
log.info(e.toString());
}
return publishClient;
}
/**
* 订阅端的链接方法,关键是回调类的设置,要对订阅的主题消息进行处理
* 断线重连方法,如果是持久订阅,重连时不需要再次订阅
* 如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true);
* true为非持久订阅
*/
public void subscribeConnect() {
try {
//防止重复创建MQTTClient实例
if (subscribeClient == null) {
//clientId不能和其它的clientId一样,否则会出现频繁断开连接和重连的问题
subscribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence());// MemoryPersistence设置clientid的保存形式,默认为以内存保存
//如果是订阅者则添加回调类,发布不需要,PushCallback类在后面,继续往下看
subscribeClient.setCallback(new PushCallback(MqttServer.this));
}
MqttConnectOptions options = mqttConnect.getOptions();
//判断拦截状态,这里注意一下,如果没有这个判断,是非常坑的
if (!subscribeClient.isConnected()) {
subscribeClient.connect(options);
} else {//这里的逻辑是如果连接成功就重新连接
subscribeClient.disconnect();
subscribeClient.connect(mqttConnect.getOptions(options));
}
log.info("----------客户端连接成功");
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}

/**
* 把组装好的消息发出去
*
* @param topic
* @param message
* @return
*/
public boolean publish(MqttTopic topic, MqttMessage message) {

MqttDeliveryToken token = null;
try {
//把消息发送给对应的主题
token = topic.publish(message);
token.waitForCompletion();
//检查发送是否成功
boolean flag = token.isComplete();

StringBuffer sbf = new StringBuffer(200);
sbf.append("给主题为'" + topic.getName());
sbf.append("'发布消息:");
if (flag) {
sbf.append("成功!消息内容是:" + new String(message.getPayload()));
} else {
sbf.append("失败!");
}
log.info(sbf.toString());
} catch (MqttException e) {
log.info(e.toString());
}
return token.isComplete();
}

/**
* MQTT发送指令:主要是组装消息体
*
* @param topic 主题
* @param data 消息内容
* @param qos 消息级别
*/
public void sendMQTTMessage(String topic, String data, int qos) {

try {
this.publishClient = publishConnect();
this.topic = this.publishClient.getTopic(topic);
message = new MqttMessage();
message.setQos(qos);
//如果重复消费,则把值改为true,然后发送一条空的消息,之前的消息就会覆盖,然后在改为false
message.setRetained(false);
message.setPayload(data.getBytes());
//将组装好的消息发出去
publish(this.topic, message);
} catch (Exception e) {
log.info(e.toString());
e.printStackTrace();
}
}

/**
* 订阅端订阅消息
*
* @param topic 要订阅的主题
* @param qos 订阅消息的级别
*/
public void init(String topic, int qos) {
//建立连接
subscribeConnect();
//以某个消息级别订阅某个主题
try {
subscribeClient.subscribe(topic, qos);
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}

/**
* 订阅端取消订阅消息
*
* @param topic 要订阅的主题
*/
public void unionInit(String topic) {
//建立连接
subscribeConnect();
//取消订阅某个主题
try {
//MQTT 协议中订阅关系是持久化的,因此如果不需要订阅某些 Topic,需要调用 unsubscribe 方法取消订阅关系。
subscribeClient.unsubscribe(topic);
} catch (MqttException e) {
log.info(e.getMessage(), e);
}
}
}

消息接收回调-PushCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 主要用来接收和处理订阅主题的消息
*/
@Slf4j
public class PushCallback implements MqttCallback {

private MqttServer MqttServer;

public PushCallback(MqttServer MqttServer) {
this.MqttServer = MqttServer;
}

public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
log.info("---------------------连接断开,可以做重连");
MqttServer.subscribeConnect();

while (true) {
try {
//如果没有发生异常说明连接成功,如果发生异常,则死循环
Thread.sleep(1000);
break;
} catch (Exception e) {
continue;
}
}

}

/**
* 发送消息,消息到达后处理方法
*
* @param token
*/
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}

/**
* 接收所订阅的主题的消息并处理
*
* @param topic
* @param message
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
String result = new String(message.getPayload(), StandardCharsets.UTF_8);
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + result);
//这里可以针对收到的消息做处理,比如持久化
}

}

控制器代码

定义两个接口 ,分别用来发送消息以及订阅topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@RequestMapping("/mqtt")
@RestController
public class MqttController {

@Resource
private MqttServer mqttServer;

@GetMapping(value = "/send")
public String testRec(String topic, String message, int qos) {
mqttServer.sendMQTTMessage(topic, message, 0);
String data = "发送了一条主题是‘" + topic + "’,内容是:" + message + ",消息级别 " + qos;
return data;
}

@GetMapping(value = "/sub")
public String testSub(String topic) {
mqttServer.init(topic, 2);
mqttServer.subscribeConnect();
String data = "订阅主题: " + topic + " , 成功";
return data;
}
}

到这里基本的server已经实现完毕了 , 下面我们通过MQTTX来进行模拟。

通过MQTTX模拟mqtt客户端&测试

创建客户端

下载地址 : https://mqttx.app/zh/downloads

首先我们需要添加一个客户端

item val
名称 任意
Client_id 任意
服务端地址 填写springboot的运行地址(本机直接localhost即可)
端口 对应springboot的application.properties中的端口配置
用户名 对应springboot的application.properties中的端口配置
密码 对应springboot的application.properties中的端口配置

接着我们需要进行subscribe , 这里选择订阅接口topic

测试

重复上面的操作 , 添加一个相同的Client , 并且订阅主题为test

接着启动springboot项目 , 访问

http://localhost:8080/mqtt/sub?topic=test订阅test


接着我们通过MQTTX来发布消息

查看控制台以及另一个Client

可以看到都接收到了client_1发送的消息.


接着访问http://localhost:8080/mqtt/pub?topic=test&message=This%20is%20springboot_interface%20,%20can%20ACK?&qos=1 , 通过server来发布消息

MQTTX中的两个客户端都接收到了消息。

到这里我们的代码的主要功能基本测试完毕了, 通过简单的功能结合业务代码,可以帮助我们更好的构建项目。

参考