MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅模式的轻量级消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用而设计,可以用极少的代码为联网设备提供实时可靠的消息服务。
介绍
- MQTT 教程
**强烈推荐** - 特点
发布/订阅(publish/subscribe)模式- Topic
- 提供一对多的消息发布,解除应用程序耦合
- 轻量高效(MQTT 的最小报文仅为 2 个字节,比 HTTP 占用更少的网络开销),节省带宽
- 可靠的消息传递
- 海量连接支持
- 在线状态感知
- 使用 TCP/IP 提供网络连接,支持 TLS/SSL 安全双向通信
- QoS(Quality of Service levels)
QoS 0消息会发生丢失或重复QoS 1消息将至少传送一次给订阅者QoS 2保证消息仅传送到目的地一次,在发送方和接收方之间需要两个流- 三种消息发布服务质量:
至多一次消息发布完全依赖底层 TCP/IP 网络,会发生消息丢失或重复- 一般用于如:环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
至少一次确保消息到达,但消息重复可能会发生只有一次确保消息到达一次- 在一些要求比较严格的计费系统中,可以使用此级别,在计费系统中,消息重复或丢失会导致不正确的结果
- 这种最高质量的消息发布服务还可以用于即时通讯类的 APP 的推送,确保用户收到且只会收到一次
- 使用
Last Will和Testament特性通知有关各方客户端异常中断的机制Last Will(遗言机制)用于通知同一主题下的其他设备发送遗言的设备已经断开了连接Testament(遗嘱机制)功能类似于 Last Will
- 保留(Retain)消息
- 每个主题最多可以储存一条保留消息
- 在发送 PUBLISH 的时候把 Retain 设置为 1 或者 True,表示当前消息是保留消息,保留消息进入服务端
- 默认情况下,当保留消息当成普通消息向订阅者转发的时候,保留消息中的 retain 标识会被清除也就是设置为 0,只有当新的订阅建立的时候,发送保留消息的 retain 会设置为 1,表示这是一个保留消息
- Clean Session
主题
- 更多参考
- MQTT 协议根据主题来转发消息
- 每个 Topic 都是一个 UTF-8 的字符串,主题通过
/来区分层级,不能包括控制,类似于 URL 路径,例如:
abc/xyz/1
sensor/1/temperature- MQTT 主题支持以下两种通配符:
+和#+表示单层通配符,例如a/+匹配a/x、a/y#表示多层通配符,例如a/#匹配a/x、a/b/c/d
- 通配符主题只能用于订阅,不能用于发布
$开头的主题必须由服务端来决定其使用方式和场景- 以
$SYS/开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据 - 共享订阅
$share/<group>/<topic> $delay前缀用于实现消息的延迟发布
- 以
- 使用斜杠
/分隔不同的层级,例如home/livingroom/temperature。 - 避免前导斜杠,例如
/myhome/temperature。这会引入一个空的顶级层级,没有实际意义且可能导致混淆 - 避免空层级,例如
home//temperature - 区分大小写,例如
home/temperature和Home/Temperature是不同的 Topic。建议统一使用小写字母,以避免混淆 - Topic 长度应尽量短而精炼,因为 Topic 会包含在每个消息中,过长的 Topic 会增加网络开销,尤其对于资源受限的设备。
- MQTT 规范允许 Topic 长度最大为 65535 字节,但实际应用中应远低于此
- 区分数据 Topic 和命令 Topic:
- 明确区分发布数据(状态)和发送命令的 Topic
- 常见的做法是在命令 Topic 中加入
/cmd或/set,在状态 Topic 中加入/status或/state - 例如:
home/livingroom/light/state(发布灯的状态)home/livingroom/light/set(发送命令控制灯)
- 嵌入唯一标识符,在 Topic 中嵌入设备 ID 或其他唯一标识符,以便定位特定设备或服务。例如:
devices/sensor001/temperature
智能家居/物联网设备
-
设备状态/数据上报:
home/livingroom/temperature/current(客厅当前温度)home/bedroom/light/state(卧室灯状态:on/off)home/frontdoor/lock/status(前门锁状态:locked/unlocked)home/garden/soil/moisture(花园土壤湿度)device/12345/sensor/temperature(某个设备的温度传感器数据)
-
设备控制/命令下发:
home/livingroom/light/set(设置客厅灯状态,Payload:{"state": "on", "brightness": 100})home/bedroom/thermostat/cmd(发送命令控制卧室恒温器,Payload:{"target_temp": 22.5})device/12345/cmd/reboot(命令设备重启)
-
设备属性/配置:
device/12345/config/update(更新设备配置)device/12345/firmware/version(获取设备固件版本)
-
报警/事件:
home/security/alarm/triggered(安全警报触发)home/kitchen/smoke_detector/alert(厨房烟雾报警)
车辆物联网 (Connected Car)
-
车辆数据:
vehicle/VIN123/location/latitudevehicle/VIN123/location/longitudevehicle/VIN123/speedvehicle/VIN123/engine/statusvehicle/VIN123/fuel_level
-
车辆控制:
vehicle/VIN123/control/lock(远程锁车)vehicle/VIN123/control/engine/start(远程启动引擎)
-
诊断信息:
vehicle/VIN123/diagnostics/error_codevehicle/VIN123/maintenance/status
MQTT 协议实现
- MQTT 协议中有三种身份:
发布者(Publish)代理(Broker),也称为 MQTT 服务器订阅者(Subscribe)
publish <-> MQTT Broker <-> multi Subscribe- 消息的发布者和订阅者都是
客户端,消息代理是服务器,消息发布者可以同时是订阅者- 客户端
- 发布其他客户端可能会订阅的信息
- 订阅其它客户端发布的消息
- 退订或删除应用程序的消息
- 断开与服务器连接
- 服务器
- 接受来自客户的网络连接
- 接受客户发布的应用信息
- 处理来自客户端的订阅和退订请求
- 向订阅的客户转发应用程序消息
- 客户端
- MQTT 传输的消息分为:
主题(Topic)即消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)- 连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端
- 目的:消息路由,类似与 url,使用
/分割层级,支持通配符+表示通配一个层级,例如a/+匹配a/x、a/y#表示通配多个层级,例如a/#匹配a/x、a/b/c/d
chat/room/1
sensor/1/temperature负载(payload)即消息的内容,是指订阅者接收的内容- 核心概念
- 订阅包含
主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联- 一个会话可以包含多个订阅,每一个会话中的每个订阅都有一个不同的主题筛选器
会话(Session)每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互- 会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接
主题筛选器(Topic Filter)一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题
- 订阅包含
- MQTT 协议中的方法
Connect等待与服务器建立连接,使用Clean Session标志设置会话0表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息
1表示创建一个新的临时会话,在客户端断开时,会话自动销毁
Disconnect等待 MQTT 客户端完成所做的工作,并与服务器断开 TCP/IP 会话Subscribe等待完成订阅UnSubscribe等待服务器取消客户端的一个或多个 topics 订阅PublishMQTT 客户端发送消息请求,发送完成后返回应用程序线程
MQTT 软件实现
-
Mosquitto Broker
-
EMQX
-
NanoMQ
客户端库
参考 https://github.com/mqtt/mqtt.org/wiki/libraries
采集
使用 Telegraf 可以采集 mqtt 信息到 influxdb 数据库