在之前的文章中:基于ESP32搭建物联网服务器十一(用WEB页面控制引脚(GPIO)功能)_esp32webserver 控制io_你的幻境的博客-CSDN博客
已经简单地介绍了MQTT协议,对比于其它网络协议,MQTT协议在物联网的开发中,它的特点使它适用于大多数受限的环境。例如网络代价昂贵,带宽低、不可靠,在嵌入设备中运行,处理器和内存资源有限。
下面深入了解一下MQTT协议的特点和优势,下图是一个MQTT的概念图:
如图所示,MQTT基于一个MQTT服务器(MQTT Broker),所有设备或客户端都可以是一个发布设备同时也可以是一个订阅设备,所以,只要你的设备可以连接在同一个MQTT服务器,都可以给其它设备进行发布任务或接收其它设备发布的数据,实现一对多的消息发布,完美地解决设备或应用程序的耦合。
MQTT发布的消息有三种服务质量:
QoS0:最多一次,可能导至发布的消息丢失
QoS1:至少一次,可能导至发布的消息多次发布
QoS2:确保只有一次,保证消息到达对方并且只到达一次
QoS等级越高,系统消耗也越高,在应用时可以根据需求选择合适的QoS等级。
服务器与客户端通信时,当遇到异常或客户端心跳超时的情况,MQTT服务器会替客户端发布一个遗嘱消息。当然如果服务器收到来自客户端的断开的消息(如自主选择断开连接),则不会触发遗嘱消息的发送。 在重要的应用或设备上使用该标记,可以在发生网络故障或网络波动,设备在保持连接周期内未能通讯,连接被服务端关闭,设备意外掉电,设备尝试进行不被允许的操作而被服务端关闭连接,例如订阅自身权限以外的主题等异常时通知第三方。
客户端可以设置一个心跳时间间隔,客户端会周期性地给服务发送一个心跳请求(PINGREQ),服务器收到请求后会回复并响应心跳请求(PINGRESP)。如果客户端在发送心跳请求(PINGREQ)后,没有收到服务端的心跳响应(PINGRESP),那么客户端就会认为自己与服务端的连接已经被断开了。
更多MQTT的介绍可以到首页 | MQTT中文网了解
在ESP32上使用MQTT协议,可以用的库比较多,这里选择pubsubclient ,该库可以找到的资料与文档说明都比较详细,唯一的不足是该库只能发布QoS0的消息和订阅Qos0和Qos1的消息。
该库的项目地址:GitHub - knolleary/pubsubclient: A client library for the Arduino Ethernet Shield that provides support for MQTT.
API文档(英文)地址:Arduino Client for MQTT
创建一个完全配置的客户端实例。
参数:
server IPAddress, uint8_t[] 或 const char[] -服务器地址
port int - 要连接的端口
callback function* (可选) -一个指向消息回调函数的指针,消息到达客户端时调用该函数
client -使用的网络客户端,例如WiFiClient
stream Stream (可选) - 将接收到的消息写入的流
客户端连接到服务器
参数:
clientID const char[] - 连接到服务器时使用的客户端ID
Credentials - (可选)
username const char[] - 要使用的用户名。如果为NULL,则不使用用户名或密码
password const char[] - 要使用的密码。如果为NULL,表示不使用密码
Will - (可选)
willTopic const char[] - 遗嘱消息要使用的主题
willQoS int: 0,1 or 2 - 遗嘱消息将要使用的服务质量
willRetain boolean - 遗嘱是否应以保留标记公布
willMessage const char[] - 遗嘱消息的有效负载
cleanSession boolean (可选) - 是否连接clean-session
返回值:
false - 连接失败
true - 连接成功
检查客户端是否连接到服务器端。
返回
false - 未连接
true - 已连接
需要循环调用该函数,以便客户端用来处理传入消息并维护与服务器的连接。
返回值:
false - 客户端不在连接状态
true - 客户端处于连接状态
将消息发布到指定的主题。
参数:
topic const char[] - 要发布的主题
payload const char[], byte[] - 要发布的消息
length unsigned int (可选) - 有效载荷的长度。如果有效负载是byte[],则为必选项。
retained boolean (可选) - 是否保留消息
false - 不保留
true - 保留
返回:
false - 发布失败,要么连接丢失,要么消息长度超长
true - 发布成功
定阅发送到主题的消息
参数:
topic const char[] - 订阅的主题
qos int: 只能选择 0 或 1 (可选) - 订阅消息的服务质量
返回:
false - 订阅失败,要么连接丢失,要么消息超长
true - 订阅成功
pubsubclient这个库的头文件为PubSubClient.h,同时,该库需要使用一个网络客户端,这里直接使用WiFi库来创建一个网络客户端实例,所以同时要引入WiFi.h这个头文件。
同时,我们创建几个变量,来保存连接服务器所需要的数据,如服务器地址和连接端口。
#include #include const char* mqttServer = "broker.emqx.io"; //MQTT服务器地址,一个公共的免费MQTT服务器 const int mqttPort = 1883; //连接的端口 /*********************************************************************************** * 创建一个网络客户端,用来创建MQTT客户端实例 ***********************************************************************************/ WiFiClient espClient; /*********************************************************************************** * 创建一个完全配置的客户端实例。 * 参数 * mqttServer - 服务器地址 * mqttPort - 要连接的端口 * callback - 一个指向消息回调函数的指针,当消息到达此客户端创建的订阅时调用该函数 * espClient - 使用的网络客户端,例如WiFiClient ***********************************************************************************/ PubSubClient mqtt_client(mqttServer,mqttPort,callback,espClient);
客户端已经设置好,后面我们需要利用connect ()来连接服务器,正常情况下,连接服务器前,我们先确定ESP32是否已连接网络,因为这里我们是用WIFI来连接网络的,所以这里写一个函数来方便调用
/*********************************************************************************** * 函数:连接mqtt服务器 * 返回: * 返回连接状态 ***********************************************************************************/ boolean connect_MQTT(){ if(WiFi.status() == WL_CONNECTED){ //WIFI是否连接 if(mqtt_client.connect("ESP32Client")){ //连接服务器,客户端ID可以自定义 Serial.println("连接MQTT服务器成功"); //输出连接状态 mqtt_client.publish("ESP32", "ESP32已连接MQTT服务器"); //发布一个消息到ESP32主题 mqtt_client.subscribe("ESP32"); //订阅一个ESP32主题 } } return mqtt_client.connected(); //返回连接状态 }
当设备连接上服务器后,大多数情况下,我们会把设备的一些数据发布,比如ESP32的引脚状态,从传感器获取到的数据等。所以,我们还需要创建一个定时发布消息的函数。
/*********************************************************************************** * 函数:定时发布消息到指定主题 ***********************************************************************************/ long lastSendTime = 0; //最后发送时间,该变量为全局变量 void regularPublish(){ long publishNow = millis(); //得到当前时间 if (publishNow - lastSendTime > 5000){ //当前时间-最后发送时间>5000时 lastSendTime = publishNow; //把最后发送时间设为当前时间 mqtt_client.publish("ESP32", "来自esp32定时发布的消息"); //发布一个消息到ESP32主题 } }
除了发布消息,我们也需要ESP32响应其它客户端发布的消息,来控制ESP32或响应对方的指令,如从手机端或电脑发送一个指令来控制ESP32的引脚状态,从而实现多远端来实现远程控制。该函数设定为当ESP32收到一个第一个字符为1的消息后,设置ESP32的2号引脚设为高电平,而收到的字符为0时2号引脚设为低电平。
/*********************************************************************************** * 函数:回调函数,收到消息时的动作,参数是固定的 * 参数: * char* topic:主题 * byte* payload:消息内容 * unsigned int length:消息内容长度 ***********************************************************************************/ void callback(char* topic, byte* payload, unsigned int length) { if(payload[0]=='1'){ digitalWrite(2,1); //2号引脚设为高电平 Serial.println("2号引脚设为高"); }else if(payload[0]=='0'){ Serial.println("2号引脚设为低"); digitalWrite(2,0); //2号引脚设为低电平 }else{ return; } }
该回调函数对应之前创建客户端实例里第三个参数。
PubSubClient mqtt_client(mqttServer,mqttPort,callback,espClient);
对于在线设备,我们需要考虑设备或网络异常的情况,让设备出现意外掉线等情况下自动可以重新连接服务器,这里用一个函数来调用之前连接MQTT服务器的函数,该函数在设备在线的情况下运行loop()函数,来接收已订阅的主题所收到的消息,如果设备出现异常,会每5秒调用之前连接MQTT服务器的函数来重新连接服务器。
/*********************************************************************************** * 函数:mqtt循环 * 如果连接断开,每5秒进行一次重连 ***********************************************************************************/ long lastAttemptTime = 0; //该变量为全局变量,用来保存最后连接的时间 void mqtt_loop(){ if(!mqtt_client.connected()){ //如果未连接MQTT服务器 long now = millis(); //得到当前时间 if (now - lastAttemptTime > 5000){ //当前时间-最后连接时间>5000时 lastAttemptTime = now; //把最后连接时间设为当前时间 mqtt_client.setKeepAlive(60); //心跳时间设为60秒 if(connect_MQTT()){ //如果连接服务器成功 lastAttemptTime = 0; //最后连接时间设为0 } } }else{ regularPublish(); //定时发布消息 mqtt_client.loop(); //MQTT循环 } }
ESP32_web_server_12_mqtt.ino
#include "ESPAsyncWebServer.h" AsyncWebServer server(80); //创建一个服务器对象,WEB服务器端口:80 void setup() { Serial.begin(9600); //串口波特率初始化 LittleFS_begin(); //LittleFS文件系统初始化 connect_NET(); //网络初始化 web_server(); //WEB服务器初始化 GPIO_begin(); //引脚初始化 connect_MQTT(); //连接MQTT服务器 } void loop() { DNS_request_loop(); //DNS服务请求处理 mqtt_loop(); //mqtt服务循环 }
mqtt_server.ino
#include #include /*********************************************************************************** * 库:https://github.com/knolleary/pubsubclient * 该库只能发布QoS0消息,可以订阅QoS0或QoS1消息 * 最大消息,包括头,默认为256个字节,可以通过PubSubClient::setBufferSize(size)重新配置 * 心跳间隔默认为15秒,可以通过PubSubClient::setKeepAlive(keepAlive)重新配置 * 默认mqtt版本为3.1.1。 ***********************************************************************************/ const char* mqttServer = "broker.emqx.io"; //MQTT服务器地址,一个公共的免费MQTT服务器 const int mqttPort = 1883; //连接的端口 const char* mqttUser = "yourMQTTuser"; //公共的MQTT服务器一般都不要求帐号登陆,这里可以不设置 const char* mqttPassword = "yourMQTTpassword"; //公共的MQTT服务器一般都不要求帐号登陆,这里可以不设置 long lastAttemptTime = 0; //最后更新时间 long lastSendTime = 0; //最后发送时间 /*********************************************************************************** * 创建一个网络客户端,用来创建MQTT客户端实例 ***********************************************************************************/ WiFiClient espClient; /*********************************************************************************** * 创建一个完全配置的客户端实例。 * 参数 * mqttServer - 服务器地址 * mqttPort - 要连接的端口 * callback - 一个指向消息回调函数的指针,当消息到达此客户端创建的订阅时调用该函数 * espClient - 使用的网络客户端,例如WiFiClient ***********************************************************************************/ PubSubClient mqtt_client(mqttServer,mqttPort,callback,espClient); /*********************************************************************************** * 函数:定时发布消息到指定主题 ***********************************************************************************/ void regularPublish(){ long publishNow = millis(); //得到当前时间 if (publishNow - lastSendTime > 5000){ //当前时间-最后发送时间>5000时 lastSendTime = publishNow; //把最后发送时间设为当前时间 char dataJson[40]; double data = 3.1415926; char str[] = "{\"来自Esp32发布的定时消息\":%7.lf}"; sprintf(dataJson, str, data ); mqtt_client.publish("ESP32", dataJson); //发布一个消息到ESP32主题 } } /*********************************************************************************** * 函数:回调函数,收到消息时的动作,参数是固定的 * 参数: * char* topic:主题 * byte* payload:消息内容 * unsigned int length:消息内容长度 ***********************************************************************************/ void callback(char* topic, byte* payload, unsigned int length) { /* String str; payload[length]='\0'; //缓冲区最后加入结束字符 str = String((char *)payload); //转为字符串 Serial.print("Message arrived ["); Serial.print(topic); // 打印主题信息 Serial.print("] "); Serial.print(str); Serial.println(); Serial.print("收到的指令:"); Serial.println(payload[0]); */ if(payload[0]=='1'){ digitalWrite(2,1); //2号引脚设为高电平 Serial.println("2号引脚设为高"); }else if(payload[0]=='0'){ Serial.println("2号引脚设为低"); digitalWrite(2,0); //2号引脚设为低电平 }else{ return; } } /*********************************************************************************** * 函数:连接mqtt服务器 * 返回: * 返回连接状态 ***********************************************************************************/ boolean connect_MQTT(){ if(WiFi.status() == WL_CONNECTED){ //WIFI是否连接 if(mqtt_client.connect("mqttx_72b06547",mqttUser,mqttPassword)){ //连接服务器,如果成功 Serial.println("连接MQTT服务器成功"); //输出连接状态 mqtt_client.publish("ESP32", "ESP32已连接MQTT服务器"); //发布一个消息到ESP32主题 mqtt_client.subscribe("ESP32"); //订阅一个ESP32主题 } } return mqtt_client.connected(); //返回连接状态 } /*********************************************************************************** * 函数:mqtt循环 * 如果连接断开,每5秒进行一次重连 ***********************************************************************************/ void mqtt_loop(){ if(!mqtt_client.connected()){ //如果未连接MQTT服务器 long now = millis(); //得到当前时间 if (now - lastAttemptTime > 5000){ //当前时间-最后尝试时间>5000时 lastAttemptTime = now; //把最后尝试时间设为当前时间 mqtt_client.setKeepAlive(60); //心跳时间设为60秒 if(connect_MQTT()){ //如果连接服务器成功 lastAttemptTime = 0; //最后尝试时间设为0 } } }else{ regularPublish(); //定时发布消息 mqtt_client.loop(); //MQTT循环 } }
web_server.ino
#include "ESPAsyncWebServer.h" #include #include /*********************************************************************************** * 函数:引脚初始化 ***********************************************************************************/ void GPIO_begin(){ pinMode(2, OUTPUT); //引脚2设置为输出模式 } /*********************************************************************************** * 函数:响应按键回调函数 ***********************************************************************************/ void GPIO_button(AsyncWebServerRequest *request){ int pin_state = digitalRead(2); String state; digitalWrite(2,(!pin_state)); //每次按下循环地改变引脚状态 if(digitalRead(2)){ state = "开"; }else{ state = "关"; } request->send(200,"text/plain",state); //把状态发送回页面 Serial.print("引脚状态改变为:"); Serial.println(pin_state); } /************************************************************************************** * 函数:字符串写入文件,文件如果存在,将被清零并新建,文件不存在,将新建该文件 * path: 文件的绝对路径 * str: 要写入的字符串 *************************************************************************************/ void str_write(String path, String str){ Serial.println("写入文件"); File wf = LittleFS.open(path,"w"); //以写入模式打开文件 if(!wf){ //如果无法打开文件 Serial.println("打开文件写入时错误"); //显示错误信息 return; //无法打开文件直接返回 } wf.print(str); //字符串写入文件 wf.close(); //关闭文件 File rf = LittleFS.open(path,"r"); //以读取模式打开文件 Serial.print("FILE:");Serial.println(rf.readString()); //读取文件 rf.close(); //关闭文件 } // /********************************************************************************** // * 函数:把收到的POST数据格式化为JSON格式的字符串 // *********************************************************************************/ //String format_json(AsyncWebParameter* post_data , int len){ // String json_name = post_data->name().c_str(); //得到名称 // String json_value = post_data->value.c_str(); //得到值 // StaticJsonDocument json_obj; //创建一个JSON对象 // json_obj[json_name] = json_value; //写入一个名称和值 // String json_str; // serializeJson(wifi_json, wifi_json_str); //生成JOSN的字符串 // return json_str; //返回JOSN字符串 //} /********************************************************************************** * 函数:响应网站/setwifi目录的POST请求,收到请求后,运行get_WIFI_set_CALLback回调函数 * 获取并格式化收到的POST数据 *********************************************************************************/ void get_WIFI_set_CALLback(AsyncWebServerRequest *request){ Serial.println("收到设置WIFI按钮"); if(request->hasParam("wifiname",true)){ AsyncWebParameter* wifiname = request->getParam("wifiname",true); //获取POST数据 AsyncWebParameter* wifipassword = request->getParam("wifipassword",true); //获取POST数据 String wn = wifiname->name().c_str(); String wnv = wifiname->value().c_str(); String wp = wifipassword->name().c_str(); String wpv = wifipassword->value().c_str(); //把SSID和password写成一个JSON格式 StaticJsonDocument<200> wifi_json; //创建一个JSON对象,wifi_json wifi_json[wn] = wnv; //写入一个建和值 wifi_json[wp] = wpv; //写入一个键和值 String wifi_json_str; //定义一个字符串变量 serializeJson(wifi_json, wifi_json_str); //生成JOSN的字符串 str_write("/WIFIConfig.conf",wifi_json_str); //字符串写入 } } /********************************************************************************** * 函数:从文件path中读取字符串 * path: 文件的绝对路径 * return: 返回读取的字符串 *********************************************************************************/ String str_read(String path){ Serial.println("读取文件"); File rf = LittleFS.open(path,"r"); //以读取模式打开文件 if(!rf){ //如果无法打开文件 Serial.println("打开文件读取时错误"); //显示错误信息 return ""; //无法打开文件直接返回 } String str = rf.readString(); //读取字符串 rf.close(); //关闭文件 return str; } /*************************************************************************************** * 函数:解析JSON字符串,从JSON字符串名称得到该值 * str: JSON字符串 * Name: JSON集合的名称 * return: 返回值的字符串 ***************************************************************************************/ String analysis_json(String str, String Name){ DynamicJsonDocument doc(str.length()*2); //定义一个JSON对象 DeserializationError error = deserializeJson(doc, str); //尝试反序列数据,如果失败生成error错误码,成功时将字符串str生成一个JSON对象doc if(error){ //如果反序列数据失败 Serial.print("JSON解析失败:"); //输出错误信息 Serial.println(error.f_str()); //输出错误信息 return ""; //返回 } if(!doc.containsKey(Name)){ //键是否存在 Serial.println("不存在键"); return ""; //返回 } return doc[Name].as(); //返回读取到的字符串 } /*********************************************************************************** * 函数:/WIFIConfig.conf文件中读取设置数据并连接WIFI ***********************************************************************************/ void wifi_connect(){ Serial.println("在conf文件中读取数据并连接WIFI"); String str = str_read("/WIFIConfig.conf"); //读取文件内容 String wifiname = analysis_json(str,"wifiname"); //解析WIFI名称 String wifipassword = analysis_json(str,"wifipassword"); //解析WIFI名称 connect_WIFI(wifiname, wifipassword); //连接WIFI } /*********************************************************************************** * web服务器初始化 ***********************************************************************************/ void web_server(){ Serial.println("初始化WEB服务器"); server.serveStatic("/", LittleFS, "/").setDefaultFile("index.html"); //响应网站根目录的GET请求,返回文件index.html server.on("/setwifi" ,HTTP_POST , get_WIFI_set_CALLback); //响应设置WIFI按钮的请求 server.on("/GPIO2", HTTP_GET, GPIO_button); //响应改变引脚按钮的请求 server.begin(); //初始化 } /******************************************************************************** * LittleFS文件系统初始化 *********************************************************************************/ void LittleFS_begin(){ Serial.println(); Serial.println("初始化文件系统"); if(!LittleFS.begin(true)){ Serial.println("An Error has occurred while mounting LittleFS"); return; } }
wifi_connect.ino
/*WIFI连接*/ #include #include DNSServer dnsserver; /*********************************************************************************** * 函数:连接WIFI * ssid: WIFI名称 * password: WIFI密码 * return: 连接成功返回true ***********************************************************************************/ void connect_WIFI(String ssid, String password){ WiFi.begin(ssid.c_str(), password.c_str()); //连接WIFI Serial.print("连接WIFI"); //循环,10秒后连接不上跳出循环 int i = 0; while(WiFi.status() != WL_CONNECTED){ Serial.print("."); delay(500); i++; if(i>20){ Serial.println(); Serial.println("WIFI连接失败"); return; } } Serial.println(); IPAddress local_IP = WiFi.localIP(); Serial.print("WIFI连接成功,本地IP地址:"); //连接成功提示 Serial.println(local_IP); } /*********************************************************************************** * 设置AP和STA共存模式,设置DNS服务器 ***********************************************************************************/ void connect_NET(){ const byte DNS_PORT = 53; //DNS端口 const String url = "ESPAP.com"; //域名 IPAddress APIp(10,0,10,1); //AP IP IPAddress APGateway(10,0,10,1); //AP网关 IPAddress APSubnetMask(255,255,255,0); //AP子网掩码 const char* APSsid = "esp32_AP"; //AP SSID const char* APPassword = "12345678"; //AP wifi密码 wifi_connect(); //连接WIFI WiFi.mode(WIFI_AP_STA); //打开AP和STA共存模式 WiFi.softAPConfig(APIp, APGateway, APSubnetMask); //设置AP的IP地址,网关和子网掩码 WiFi.softAP(APSsid, APPassword, 6); //设置AP模式的登陆名和密码 dnsserver.start(DNS_PORT, url, APIp); //设置DNS的端口、网址、和IP Serial.print("AP模式IP地址为:"); Serial.println(WiFi.softAPIP()); } /*********************************************************************************** * DNS处理请求的循环 ***********************************************************************************/ void DNS_request_loop(){ dnsserver.processNextRequest(); }
以下文件需要用插件上传到SPIFFS文件系统
data\index.html
EPS32教程 关
data\mystyle.css
div{ background-color:red; }
data\WIFIConfig.conf
该文件需要改动以符合你的WIFI设备,或以AP用手机登陆来生成该文件(生成需要重启设备),详细的操作方法可以找之前的文章:https://blog.csdn.net/m0_50114967/category_12014624.html
{"wifiname":"你的WIFI名称","wifipassword":"你的WIFI密码"}
以上代码上传到ESP32后,在WIFI配置正确的情况下,会自动连接到MQTT服务器(broker.emqx.io),上传前注意打开串口监视器,查看是否正确连接MQTT服务器。
连接成功后,这里我们需要别一个客户端来接收或给ESP32发送指令。
用如果电脑端,我们可以用MQTTX来把电脑做为一个客户端,下载地址:免费下载、试用 EMQ 产品
下载和安装这个工具,运行后,我们新建一个连接
如上图设置好,点击右上角的连接。
连接成功后,我们给该连接添加一个订阅
设置好后点确定来订阅,这个时候,我们应该会每五秒收到一条从ESP32发布的消息
我们在ESP32的2号引脚上连接一个LED,来测试在电脑上发送一个指令来控制该LED的亮灭,同时如果控制成功,在串口监视器上也可以看到提示。
我们用MQTTX发送一个数字"1"或"0",在代码设计中,当ESP32收到时会分别把2号引脚设置为高电平或低电平
发布的内容可以为"1"或"0",发布其它内容ESP32也可以接收到,但不一定能控制引脚的状态
点击发送按钮后,我们观察2号脚上的LED或观察串口监视器,
如上图的状态,就说明我们已经可以通过电脑远端控制ESP32了。
如果有空,大家也可以在手机上下载可以使用MQTT的APP来测试控制ESP32。
以上就是ESP32利用MQTT协议与其它客户端互动的简单实现,结合之前的文章基于ESP32搭建物联网服务器十一(用WEB页面控制引脚(GPIO)功能)_esp32webserver 控制io_你的幻境的博客-CSDN博客内容,我们现在可以用WEB页面和MQTT协议来与ESP32进行互动。
在之后的文章里,将会在安全性、可控范围上继续完备这两种互动方式。