由于项目开发过程中需要对接物联网设备 , 并且MQTT协议在物联网应用中有着及其广泛的应用, 这里首先简单实现一个demo级别的mqtt-server
核心在于理解MQTT协议的工作方式以及具体的流程。
本文不涉及任何业务代码
完整代码请参考 https://github.com/adorabled4/demo0828 下 com.dhx.dem0828.mqtt
关于MQTT协议
对于后端开发人员来说MQTT协议并不模式, 常用的消息队列如RabbitMQ , ActiveMQ等都支持MQTT协议。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅
(publish/subscribe
)模式的“轻量级”通讯协议
该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,MQTT协议在物联网应用上有着非常广阔的应用场景。
MQTT协议的主要特点包括
-
**轻量级:**开销低、报文小,消耗资源更少,即使在有限的能力下也能实现高效的通信。
-
**可靠:**MQTT 支持多种 QoS 等级、会话感知和持久连接,即使在困难的条件下也能保证消息的可靠传递。
QoS 0(最多一次,At most once):
- 这是最低的服务质量级别。消息被发布后,不进行确认,也不进行重新传输。发布者发送消息后,不会知道消息是否被成功接收。
- 消息会尽力传递,但可能会丢失或重复。这种级别适用于对消息的可靠性要求不高的情况。
QoS 1(至少一次,At least once):
- 在这个级别下,发布者会收到一个消息确认(PUBACK)来确认消息已经被接收,如果没有确认,发布者会尝试重新发送消息。
- 这个级别保证消息至少传递一次,但可能会导致消息重复。适用于需要确保消息到达,但可以容忍重复的情况。
QoS 2(仅一次,Exactly once):
- 最高的服务质量级别,确保消息只传递一次且不会重复。它通过使用两个确认(PUBREC 和 PUBCOMP)来实现。
- 发布者发送消息后会等待完整的确认过程,确保消息不会丢失,也不会重复传递。适用于对消息的可靠性要求非常高的情况,但它会引入一些额外的开销。
选择适当的服务质量级别取决于应用程序的需求和性能要求。
通常情况下,QoS 0用于需要低延迟和可以容忍消息丢失的场景,QoS 1用于需要保证消息传递至少一次的场景,而QoS 2则用于对消息传递的可靠性要求非常高的场景。
-
持久性会话:MQTT客户端可以选择创建持久性会话,以便在重新连接时接收未传递的消息。这对于确保不会丢失重要消息非常有用。
-
会话状态保持:MQTT代理服务器可以维护客户端的会话状态,以确保客户端能够重新连接并保持之前的订阅和消息处理状态。
-
安全性:MQTT可以与TLS/SSL一起使用,以加密和保护消息传递过程中的数据安全性。也支持基本的用户名和密码身份验证。
以下几个概念是理解MQTT协议的重点 , 接下来会做简单的介绍 , 具体内容以 https://www.runoob.com/w3cnote/mqtt-intro.html 为准
- MQTT 客户端
- MQTT Broker
- 发布-订阅模式
MQTT客户端
任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端
- 客户端可以是传感器、嵌入式设备、移动应用程序、服务器或任何能够实现 MQTT 协议的实体。
- 客户端可以发布消息、订阅主题、接收消息或执行其他与 MQTT 协议相关的操作。
实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。
其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT服务端
MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
- 接受来自客户的网络连接;
- 接受客户发布的应用信息;
- 处理来自客户端的订阅和退订请求;
- 向订阅的客户转发应用程序消息。
发布-订阅模式
MQTT是基于发布(Publish)/**订阅(Subscribe)**模式来进行通信及数据交换的,
与HTTP的***请求(Request)应答(Response)***的模式有本质的不同
订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。
成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者
主题(Topic)以’/‘为分隔符区分不同的层级,包含通配符’+’ 或 ‘#’的主题又称为主题过滤器(Topic Filters); 不含通配符的成为主题名(Topic Names)
+
:表示单层通配符,例如 a/+
匹配 a/x
或 a/y
。
#
:表示多层通配符,例如 a/#
匹配 a/x
、a/b/c/d
。
- 注意:通配符主题只能用于订阅,不能用于发布。
环境
item |
version |
JDK |
1.8 |
springboot |
2.5.6 |
Mqtt |
3.13 |
安装Broker
对于上图的结构 , 我们需要安装一个Broker用来在 消息发布者与 消息订阅者之间传播消息。
这里使用EMQX , 并通过Docker来安装
官网 : https://www.emqx.io/docs/zh/v4/getting-started/getting-started.html#安装-emqx
1 2
| docker pull emqx/emqx:latest docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest
|
mqtt-server实现
首先 , 我们需要明确的是 , 对于MQTT协议中真正的组成部分是
- publisher
- broker
- subscriber
前文提到的 客户端 , 服务端只是具体的实现 , 不可脱离本质。
那么接下来实现的mqtt-server , 兼顾 publisher以及 subscriber的功能
这里可以简单设想一个场景
假定有一个智能药盒 , 每隔一段时间需要发送消息 , 通知用户吃药。
业务服务器中定义一个broker , 用来通知用户吃药 , 同时我们也需要在业务服务器中定义一个subscriber ,用来执行相关的业务操作
那么接下来通过springboot来实现一个mqtt-server , 主要的功能有
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
|
进行MQTT相关配置
创建MQTT服务端连接工具类MqttConfig,用于加载配置参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Configuration @ConfigurationProperties(MqttConfig.PREFIX) @Getter @Setter public class MqttConfig { public static final String PREFIX = "mqtt"; private String host; private String clientid; private String username; private String password; private boolean cleansession; private String default_topic; private int timeout; private int keepalive; private int connectionTimeout; }
|
在application.properties文件中配置如下内容 :
注意, MQTT协议的URI通常使用tcp
而不是mqtt
来指定协议,因此正确的形式是tcp://localhost:1883
而不是mqtt://localhost:1883
。
在大多数情况下,使用tcp://
来指定MQTT Broker的URI是标准做法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| mqtt.host=tcp://localhost:1883
mqtt.username=admin
mqtt.password=admin
mqtt.cleansession=false
mqtt.clientid=mqtt_publish
mqtt.default_topic=测试
mqtt.timeout=1000
mqtt.keepalive = 10
mqtt.connectionTimeout=3000
|
配置MQTT连接
建立MQTT服务端连接类MqttConnect
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Component public class MqttConnect {
@Autowired private MqttConfig config;
public MqttConnect(MqttConfig config) { this.config = config; }
public MqttConnectOptions getOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(config.isCleansession()); options.setUserName(config.getUsername()); options.setPassword(config.getPassword().toCharArray()); options.setConnectionTimeout(config.getConnectionTimeout()); options.setKeepAliveInterval(config.getKeepalive()); return options; }
public MqttConnectOptions getOptions(MqttConnectOptions options) {
options.setCleanSession(options.isCleanSession()); options.setUserName(options.getUserName()); options.setPassword(options.getPassword()); options.setConnectionTimeout(options.getConnectionTimeout()); options.setKeepAliveInterval(options.getKeepAliveInterval()); return options; }
}
|
创建mqtt-server
这里在server中设置了两个MqttClient
是为了让发布者和订阅者分开
如果订阅者和发布者都用一个MqttClient
链接对象,则会出现两方都订阅了某个主题后谁发送了消息,都会自己接收到自己发的消息,
所以分开,里面主要就是回调类的设置setCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
| @Service @Slf4j public class MqttServer { private MqttClient subscribeClient; private MqttClient publishClient; public MqttTopic topic; public MqttMessage message; @Autowired private MqttConnect mqttConnect; @Autowired private MqttConfig config; public MqttServer() {log.info("mqtt-server启动!");}
public MqttClient publishConnect() { try { if (publishClient == null) { publishClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence()); } MqttConnectOptions options = mqttConnect.getOptions(); if (!publishClient.isConnected()) { publishClient.connect(options); log.info("---------------------连接成功"); } else { publishClient.disconnect(); publishClient.connect(mqttConnect.getOptions(options)); log.info("---------------------连接成功"); } } catch (MqttException e) { log.info(e.toString()); } return publishClient; }
public void subscribeConnect() { try { if (subscribeClient == null) { subscribeClient = new MqttClient(config.getHost(), config.getClientid(), new MemoryPersistence()); subscribeClient.setCallback(new PushCallback(MqttServer.this)); } MqttConnectOptions options = mqttConnect.getOptions(); if (!subscribeClient.isConnected()) { subscribeClient.connect(options); } else { subscribeClient.disconnect(); subscribeClient.connect(mqttConnect.getOptions(options)); } log.info("----------客户端连接成功"); } catch (MqttException e) { log.info(e.getMessage(), e); } }
public boolean publish(MqttTopic topic, MqttMessage message) {
MqttDeliveryToken token = null; try { token = topic.publish(message); token.waitForCompletion(); boolean flag = token.isComplete();
StringBuffer sbf = new StringBuffer(200); sbf.append("给主题为'" + topic.getName()); sbf.append("'发布消息:"); if (flag) { sbf.append("成功!消息内容是:" + new String(message.getPayload())); } else { sbf.append("失败!"); } log.info(sbf.toString()); } catch (MqttException e) { log.info(e.toString()); } return token.isComplete(); }
public void sendMQTTMessage(String topic, String data, int qos) {
try { this.publishClient = publishConnect(); this.topic = this.publishClient.getTopic(topic); message = new MqttMessage(); message.setQos(qos); message.setRetained(false); message.setPayload(data.getBytes()); publish(this.topic, message); } catch (Exception e) { log.info(e.toString()); e.printStackTrace(); } }
public void init(String topic, int qos) { subscribeConnect(); try { subscribeClient.subscribe(topic, qos); } catch (MqttException e) { log.info(e.getMessage(), e); } }
public void unionInit(String topic) { subscribeConnect(); try { subscribeClient.unsubscribe(topic); } catch (MqttException e) { log.info(e.getMessage(), e); } } }
|
消息接收回调-PushCallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
@Slf4j public class PushCallback implements MqttCallback {
private MqttServer MqttServer;
public PushCallback(MqttServer MqttServer) { this.MqttServer = MqttServer; } public void connectionLost(Throwable cause) { log.info("---------------------连接断开,可以做重连"); MqttServer.subscribeConnect();
while (true) { try { Thread.sleep(1000); break; } catch (Exception e) { continue; } }
}
public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); }
public void messageArrived(String topic, MqttMessage message) throws Exception { String result = new String(message.getPayload(), StandardCharsets.UTF_8); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); System.out.println("接收消息内容 : " + result); }
}
|
控制器代码
定义两个接口 ,分别用来发送消息以及订阅topic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @RequestMapping("/mqtt") @RestController public class MqttController {
@Resource private MqttServer mqttServer;
@GetMapping(value = "/send") public String testRec(String topic, String message, int qos) { mqttServer.sendMQTTMessage(topic, message, 0); String data = "发送了一条主题是‘" + topic + "’,内容是:" + message + ",消息级别 " + qos; return data; }
@GetMapping(value = "/sub") public String testSub(String topic) { mqttServer.init(topic, 2); mqttServer.subscribeConnect(); String data = "订阅主题: " + topic + " , 成功"; return data; } }
|
到这里基本的server已经实现完毕了 , 下面我们通过MQTTX来进行模拟。
通过MQTTX模拟mqtt客户端&测试
创建客户端
下载地址 : https://mqttx.app/zh/downloads
首先我们需要添加一个客户端
item |
val |
名称 |
任意 |
Client_id |
任意 |
服务端地址 |
填写springboot的运行地址(本机直接localhost即可) |
端口 |
对应springboot的application.properties中的端口配置 |
用户名 |
对应springboot的application.properties中的端口配置 |
密码 |
对应springboot的application.properties中的端口配置 |
接着我们需要进行subscribe , 这里选择订阅接口topic
测试
重复上面的操作 , 添加一个相同的Client , 并且订阅主题为test
接着启动springboot项目 , 访问
http://localhost:8080/mqtt/sub?topic=test
订阅test
接着我们通过MQTTX来发布消息
查看控制台以及另一个Client
可以看到都接收到了client_1发送的消息.
接着访问http://localhost:8080/mqtt/pub?topic=test&message=This%20is%20springboot_interface%20,%20can%20ACK?&qos=1
, 通过server来发布消息
MQTTX中的两个客户端都接收到了消息。
到这里我们的代码的主要功能基本测试完毕了, 通过简单的功能结合业务代码,可以帮助我们更好的构建项目。
参考