由于项目开发过程中需要对接物联网设备 , 并且MQTT协议在物联网应用中有着及其广泛的应用, 这里首先简单实现一个demo级别的mqtt-server
核心在于理解MQTT协议的工作方式以及具体的流程。
本文不涉及任何业务代码
完整代码请参考 https://github.com/adorabled4/demo0828 下 com.dhx.dem0828.mqtt
关于MQTT协议
对于后端开发人员来说MQTT协议并不模式, 常用的消息队列如RabbitMQ , ActiveMQ等都支持MQTT协议。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅
(publish/subscribe
)模式的“轻量级”通讯协议
该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,MQTT协议在物联网应用上有着非常广阔的应用场景。
MQTT协议的主要特点包括
-
**轻量级:**开销低、报文小,消耗资源更少,即使在有限的能力下也能实现高效的通信。
-
**可靠:**MQTT 支持多种 QoS 等级、会话感知和持久连接,即使在困难的条件下也能保证消息的可靠传递。
QoS 0(最多一次,At most once):
- 这是最低的服务质量级别。消息被发布后,不进行确认,也不进行重新传输。发布者发送消息后,不会知道消息是否被成功接收。
- 消息会尽力传递,但可能会丢失或重复。这种级别适用于对消息的可靠性要求不高的情况。
QoS 1(至少一次,At least once):
- 在这个级别下,发布者会收到一个消息确认(PUBACK)来确认消息已经被接收,如果没有确认,发布者会尝试重新发送消息。
- 这个级别保证消息至少传递一次,但可能会导致消息重复。适用于需要确保消息到达,但可以容忍重复的情况。
QoS 2(仅一次,Exactly once):
- 最高的服务质量级别,确保消息只传递一次且不会重复。它通过使用两个确认(PUBREC 和 PUBCOMP)来实现。
- 发布者发送消息后会等待完整的确认过程,确保消息不会丢失,也不会重复传递。适用于对消息的可靠性要求非常高的情况,但它会引入一些额外的开销。
选择适当的服务质量级别取决于应用程序的需求和性能要求。
通常情况下,QoS 0用于需要低延迟和可以容忍消息丢失的场景,QoS 1用于需要保证消息传递至少一次的场景,而QoS 2则用于对消息传递的可靠性要求非常高的场景。
-
持久性会话:MQTT客户端可以选择创建持久性会话,以便在重新连接时接收未传递的消息。这对于确保不会丢失重要消息非常有用。
-
会话状态保持:MQTT代理服务器可以维护客户端的会话状态,以确保客户端能够重新连接并保持之前的订阅和消息处理状态。
-
安全性:MQTT可以与TLS/SSL一起使用,以加密和保护消息传递过程中的数据安全性。也支持基本的用户名和密码身份验证。
以下几个概念是理解MQTT协议的重点 , 接下来会做简单的介绍 , 具体内容以 https://www.runoob.com/w3cnote/mqtt-intro.html 为准
- MQTT 客户端
- MQTT Broker
- 发布-订阅模式
MQTT客户端
任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端
- 客户端可以是传感器、嵌入式设备、移动应用程序、服务器或任何能够实现 MQTT 协议的实体。
- 客户端可以发布消息、订阅主题、接收消息或执行其他与 MQTT 协议相关的操作。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。
其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT服务端
MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
- 接受来自客户的网络连接;
- 接受客户发布的应用信息;
- 处理来自客户端的订阅和退订请求;
- 向订阅的客户转发应用程序消息。
发布-订阅模式
MQTT是基于发布(Publish)/**订阅(Subscribe)**模式来进行通信及数据交换的,
与HTTP的***请求(Request)应答(Response)***的模式有本质的不同
订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。
成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者
主题(Topic)以’/‘为分隔符区分不同的层级,包含通配符’+’ 或 ‘#’的主题又称为主题过滤器(Topic Filters); 不含通配符的成为主题名(Topic Names)
+
:表示单层通配符,例如 a/+
匹配 a/x
或 a/y
。
#
:表示多层通配符,例如 a/#
匹配 a/x
、a/b/c/d
。
- 注意:通配符主题只能用于订阅,不能用于发布。
环境
item |
version |
JDK |
1.8 |
springboot |
2.5.6 |
Mqtt |
3.13 |
安装Broker
对于上图的结构 , 我们需要安装一个Broker用来在 消息发布者与 消息订阅者之间传播消息。
这里使用EMQX , 并通过Docker来安装
官网 : https://www.emqx.io/docs/zh/v4/getting-started/getting-started.html#安装-emqx
1 2
| docker pull emqx/emqx:latest docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
|
mqtt-server实现
首先 , 我们需要明确的是 , 对于MQTT协议中真正的组成部分是
- publisher
- broker
- subscriber
前文提到的 客户端 , 服务端只是具体的实现 , 不可脱离本质。
那么接下来实现的mqtt-server , 兼顾 publisher以及 subscriber的功能
这里可以简单设想一个场景
假定有一个智能药盒 , 每隔一段时间需要发送消息 , 通知用户吃药。
业务服务器中定义一个broker , 用来通知用户吃药 , 同时我们也需要在业务服务器中定义一个subscriber ,用来执行相关的业务操作
那么接下来通过springboot来实现一个mqtt-server , 主要的功能有
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
|
进行MQTT相关配置
创建MQTT服务端连接工具类MqttConfig,用于加载配置参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration @ConfigurationProperties(MqttConfig.PREFIX) @Getter @Setter public class MqttConfig { public static final String PREFIX = "mqtt"; private String host; private String clientid; private String username; private String password; private boolean cleansession; private String default_topic; private int timeout; private int keepalive; private int connectionTimeout; }
|
在application.properties文件中配置如下内容 :
注意, MQTT协议的URI通常使用tcp
而不是mqtt
来指定协议,因此正确的形式是tcp://localhost:1883
而不是mqtt://localhost:1883
。
在大多数情况下,使用tcp://
来指定MQTT Broker的URI是标准做法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| mqtt.host=tcp://localhost:1883
mqtt.username=admin
mqtt.password=admin
mqtt.cleansession=false
mqtt.clientid=mqtt_publish
mqtt.default_topic=测试
mqtt.timeout=1000
mqtt.keepalive = 10
mqtt.connectionTimeout=3000
|
配置MQTT连接
建立MQTT服务端连接类MqttConnect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Component public class MqttConnect {
@Autowired private MqttConfig config;
public MqttConnect(MqttConfig config) { this.config = config; }
public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(config.isCleansession()); options.setUserName(config.getUsername()); options.setPassword(config.getPassword().toCharArray()); options.setConnectionTimeout(config.getConnectionTimeout()); options.setKeepAliveInterval(config.getKeepalive()); return options; }
public MqttConnectOptions getOptions(MqttConnectOptions options) {
options.setCleanSession(options.isCleanSession()); options.setUserName(options.getUserName()); options.setPassword(options.getPassword()); options.setConnectionTimeout(options.getConnectionTimeout()); options.setKeepAliveInterval(options.getKeepAliveInterval()); return options; }
}
|
创建mqtt-server
这里在server中设置了两个MqttClient
是为了让发布者和订阅者分开
如果订阅者和发布者都用一个MqttClient
链接对象,则会出现两方都订阅了某个主题后谁发送了消息,都会自己接收到自己发的消息,
所以分开,里面主要就是回调类的设置setCallback

| @Service @Slf4j public class MqttServer { private MqttClient subscribeClient; private MqttClient publishClient; public MqttTopic topic; public MqttMessage message; @Autowired private MqttConnect mqttConnect; @Autowired private MqttConfig config; public MqttServer() {log.info("mqtt-server启动!");}
public MqttClient publishConnect() { try { if (publishClient == null) { publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence()); } MqttConnectOptions options = mqttConnect.getOptions(); if (!publishClient.isConnected()) { publishClient.connect(options); log.info("---------------------连接成功"); } else { publishClient.disconnect(); publishClient.connect(mqttConnect.getOptions(options)); log.info("---------------------连接成功"); } } catch (MqttException e) { log.info(e.toString()); } return publishClient; }
public void subscribeConnect() { try { if (subscribeClient == null) { subscribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence()); subscribeClient.setCallback(new PushCallback(MqttServer.this)); } MqttConnectOptions options = mqttConnect.getOptions(); if (!subscribeClient.isConnected()) { subscribeClient.connect(options); } else { subscribeClient.disconnect(); subscribeClient.connect(mqttConnect.getOptions(options)); } log.info("----------客户端连接成功"); } catch (MqttException e) { log.info(e.getMessage(), e); } }
public boolean publish(MqttTopic topic, MqttMessage message) {
MqttDeliveryToken token = null; try { token = topic.publish(message); token.waitForCompletion(); boolean flag = token.isComplete();
StringBuffer sbf = new StringBuffer(200); sbf.append("给主题为'" + topic.getName()); sbf.append("'发布消息:"); if (flag) { sbf.append("成功!消息内容是:" + new String(message.getPayload())); } else { sbf.append("失败!"); } log.info(sbf.toString()); } catch (MqttException e) { log.info(e.toString()); } return token.isComplete(); }
public void sendMQTTMessage(String topic, String data, int qos) {
try { this.publishClient = publishConnect(); this.topic = this.publishClient.getTopic(topic); message = new MqttMessage(); message.setQos(qos); message.setRetained(false); message.setPayload(data.getBytes()); publish(this.topic, message); } catch (Exception e) { log.info(e.toString()); e.printStackTrace(); } }
public void init(String topic, int qos) { subscribeConnect(); try { subscribeClient.subscribe(topic, qos); } catch (MqttException e) { log.info(e.getMessage(), e); } }
public void unionInit(String topic) { subscribeConnect(); try { subscribeClient.unsubscribe(topic); } catch (MqttException e) { log.info(e.getMessage(), e); } } }
|
消息接收回调-PushCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
@Slf4j public class PushCallback implements MqttCallback {
private MqttServer MqttServer;
public PushCallback(MqttServer MqttServer) { this.MqttServer = MqttServer; } public void connectionLost(Throwable cause) { log.info("---------------------连接断开,可以做重连"); MqttServer.subscribeConnect();
while (true) { try { Thread.sleep(1000); break; } catch (Exception e) { continue; } }
}
public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); }
public void messageArrived(String topic, MqttMessage message) throws Exception { String result = new String(message.getPayload(), StandardCharsets.UTF_8); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + result); }
}
|
控制器代码
定义两个接口 ,分别用来发送消息以及订阅topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RequestMapping("/mqtt") @RestController public class MqttController {
@Resource private MqttServer mqttServer;
@GetMapping(value = "/send") public String testRec(String topic, String message, int qos) { mqttServer.sendMQTTMessage(topic, message, 0); String data = "发送了一条主题是‘" + topic + "’,内容是:" + message + ",消息级别 " + qos; return data; }
@GetMapping(value = "/sub") public String testSub(String topic) { mqttServer.init(topic, 2); mqttServer.subscribeConnect(); String data = "订阅主题: " + topic + " , 成功"; return data; } }
|
到这里基本的server已经实现完毕了 , 下面我们通过MQTTX来进行模拟。
通过MQTTX模拟mqtt客户端&测试
创建客户端
下载地址 : https://mqttx.app/zh/downloads
首先我们需要添加一个客户端
item |
val |
名称 |
任意 |
Client_id |
任意 |
服务端地址 |
填写springboot的运行地址(本机直接localhost即可) |
端口 |
对应springboot的application.properties中的端口配置 |
用户名 |
对应springboot的application.properties中的端口配置 |
密码 |
对应springboot的application.properties中的端口配置 |
接着我们需要进行subscribe , 这里选择订阅接口topic
测试
重复上面的操作 , 添加一个相同的Client , 并且订阅主题为test
接着启动springboot项目 , 访问
http://localhost:8080/mqtt/sub?topic=test
订阅test
接着我们通过MQTTX来发布消息
查看控制台以及另一个Client
可以看到都接收到了client_1发送的消息.
接着访问http://localhost:8080/mqtt/pub?topic=test&message=This%20is%20springboot_interface%20,%20can%20ACK?&qos=1
, 通过server来发布消息
MQTTX中的两个客户端都接收到了消息。
到这里我们的代码的主要功能基本测试完毕了, 通过简单的功能结合业务代码,可以帮助我们更好的构建项目。
参考