EMQX(MQTT协议)服务和测试工具安装及整合SringBoot后的使用
创始人
2025-01-08 13:07:02
0

文章目录

  • 一、WINDOWS下搭建MQTT服务EMQX
    • 1、下载服务
    • 2、 启动服务
  • 二、测试工具
    • 1、安装
    • 2、汉化及使用
  • 三、整合SpringBoot
    • 1、导入Maven依赖
    • 2、配置
      • 2.1、配置文件
      • 2.2、常量
    • 3、MQTT服务类
    • 4、MQTT服务回调类
    • 5、MQTT配置类
    • 6、测试类
  • 总结


一、WINDOWS下搭建MQTT服务EMQX

1、下载服务

官网地址:

https://www.emqx.io/downloads

进入官网后下载window压缩包
在这里插入图片描述

2、 启动服务

解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
在这里插入图片描述
在回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:

pushd D:\Environment\emqx\bin 

启动服务,终端输入:

emqx.cmd start 

打开浏览器输入:

localhost:18083

出现如下界面便是EMQX服务已经启动。
在这里插入图片描述
EMQX初始的用户名:admin 密码:public 。登录后设置中文页面。

在这里插入图片描述
在这里插入图片描述

二、测试工具

1、安装

官网地址:

https://mqttx.app/zh/downloads

安装过于简单,自行下载安装。

2、汉化及使用

在这里插入图片描述

添加一个连接
在这里插入图片描述
刷新管理端页面
在这里插入图片描述

三、整合SpringBoot

1、导入Maven依赖

                              org.springframework.boot             spring-boot-starter-integration                               org.springframework.integration             spring-integration-stream                               org.springframework.integration             spring-integration-mqtt                   

2、配置

2.1、配置文件

# MQTT配置 mqtt:   # emqx服务器地址   host: tcp://127.0.0.1:1883   # 用户名   username: ces   # 密码   password: ces1   # 客户端id   clientId: service_${random.uuid}   # 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值   connectionTimeout: 10   # 消息服务质量   qos: 2   # 连接保持检查周期 秒   keepAliveInterval: 20   # 开启自动重连   automaticReconnect: true   # 是否清除会话session   cleanSession: true   # 默认订阅主题   defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+   # 是否保留发布消息   retained: false 

2.2、常量

package net.rakan.distributedservice.common.constant;  import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;  /**  * MQTT常量  * @author LiChangRui on 2024/4/21 16:53  */ @Data @Slf4j @Configuration @ConfigurationProperties(prefix = "mqtt") public class MqttConstants {     /**      * emqx服务器地址      */     private String host;     /**      * 用户名      */     private String username;     /**      * 密码      */     private String password;     /**      * 客户端id      */     private String clientId;     /**      * 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值      */     private int connectionTimeout;     /**      * 消息服务质量      */     private int qos;     /**      * 连接保持检查周期 秒      */     private int keepAliveInterval;     /**      * 开启自动重连      */     private Boolean automaticReconnect;     /**      * 是否清除会话session      */     private Boolean cleanSession;     /**      * 默认订阅主题      */     private String defaultSubscribeTopic;     /**      * 是否保留发布消息      */     private Boolean retained; } 

3、MQTT服务类

package net.rakan.distributedservice.common.service;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.Arrays;  /**  * MQTT服务  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Component public class MqttService {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT连接      */     private MqttClient mqttClient;       private void setMqttClient(MqttClient mqttClient) {         this.mqttClient = mqttClient;     }      /**      * 推送消息      * @author LiChangRui on 2024/4/21 18:02      */     public void publish(String topic, String msg) {         try {             mqttClient.publish(topic, msg.getBytes(), mqttConstants.getQos(), mqttConstants.getRetained());         } catch (MqttException e) {             log.error("MQTT推送消息失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String topic) {         log.info("开始订阅主题:" + topic + "。");         try {             mqttClient.subscribe(topic, mqttConstants.getQos());         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String[] topic) {         int[] qos = new int[topic.length];         Arrays.fill(qos, mqttConstants.getQos());         log.info("开始订阅主题:" + String.join(",", topic) + "。");         try {             mqttClient.subscribe(topic, qos);         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:48      */     public void connect(String host, String clientId, MqttCallback mqttCallback, MqttConnectOptions options) {         MqttClient mqttClient;         try {             // 设置连接参数             mqttClient = new MqttClient(host, clientId);             // 设置回调             mqttClient.setCallback(mqttCallback);             // 连接             mqttClient.connect(options);         } catch (MqttException e) {             log.error("连接失败:" + e.getMessage() + "!");             return;         }         this.setMqttClient(mqttClient);         log.info("连接成功!");     } } 

4、MQTT服务回调类

package net.rakan.distributedservice.common.callback;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT回调  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Configuration public class MqttCallback implements MqttCallbackExtended {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;       /**      * 客户端断开后触发      * @author LiChangRui on 2024/4/21 18:19      */     @Override     public void connectionLost(Throwable throwable) {         log.info("客户端连接断开!");         // 已经设置断线重新连接 所以这里不用写重连的逻辑 只需要写断开连接的业务逻辑     }      /**      * 客户端收到消息触发      * @author LiChangRui on 2024/4/21 18:21      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) {         log.info("接收消息主题 : " + topic);         log.info("接收消息Qos : " + mqttMessage.getQos());         log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));     }      /**      * 发布消息成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         String[] topics = iMqttDeliveryToken.getTopics();         for (String topic : topics) {             log.info("向主题:" + topic + "发送消息成功!");         } //        try { //            // 在消息被传递之前,正在传递的消息将被返回。一旦消息被传递,将返回null //            MqttMessage message = iMqttDeliveryToken.getMessage(); //            byte[] payload = message.getPayload(); //            String s = new String(payload, StandardCharsets.UTF_8); //            log.info("消息的内容是:" + s + "。"); //        } catch (MqttException e) { //            log.error("获取发送消息内容失败!"); //        }     }      /**      * 客户端连接成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void connectComplete(boolean reconnect, String serverURI) {         log.info("客户端连接成功!");         String[] topic = mqttConstants.getDefaultSubscribeTopic().split(",");         mqttService.subscribe(topic);     } } 

5、MQTT配置类

package net.rakan.distributedservice.common.config;  import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT配置  * @author LiChangRui on 2024/4/21 22:26  */ @Slf4j @Configuration public class MqttConfig {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;     /**      * MQTT回调      */     @Autowired     private MqttCallback mqttCallback;      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:51      */     @PostConstruct     public void mqttClient() {         MqttConnectOptions options = new MqttConnectOptions();         // 用户名         options.setUserName(mqttConstants.getUsername());         // 密码         options.setPassword(mqttConstants.getPassword().toCharArray());         // 设置连接超时时间         options.setConnectionTimeout(mqttConstants.getConnectionTimeout());         // 开启自动重连         options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect());         // 是否清除会话session         options.setCleanSession(mqttConstants.getCleanSession());         // 设置心跳间隔时间         options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval());         mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options);     } } 

6、测试类

package net.rakan.distributedservice.testserver.controller;  import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import net.rakan.distributedservice.common.dto.CancelOrderDTO; import net.rakan.distributedservice.common.dto.PublishDTO; import net.rakan.distributedservice.common.dto.SendMailDTO; import net.rakan.distributedservice.common.service.MqttService; import net.rakan.distributedservice.common.service.RabbitMqService; import net.rakan.distributedservice.common.vo.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;  /**  * 测试  * @author LiChangRui on 2024/3/15 10:14  */ @Tag(name = "测试") @RestController @RequestMapping("/test") public class TestController {      @Autowired     private MqttService mqttService;      /**      * 推送消息      * @author LiChangRui on 2024/3/15 13:59      */     @Operation(summary = "推送消息")     @PostMapping("/publish")     public Result publish(@RequestBody @Validated PublishDTO dto) {          mqttService.publish(dto.getTopic(), dto.getMsg());          return Result.ok();     }  }  

在这里插入图片描述
控制台消息:
在这里插入图片描述

总结

如果您发现错误,还望及时提醒,共同进步。

相关内容

热门资讯

透视机巧!hhpoker到底可... 透视机巧!hhpoker到底可以作必弊吗,aapoker怎么选牌(透视)一贯有挂(哔哩哔哩)1)hh...
透视指南!hhpoker智能辅... 透视指南!hhpoker智能辅助插件,hhpoker透视工具(透视)竟然有透视app(哔哩哔哩)1、...
透视方针!wepoker科技辅... 透视方针!wepoker科技辅助器,wpk辅助购买(透视)本来是真的挂(哔哩哔哩)1)wepoker...
透视模块!aapoker怎么设... 透视模块!aapoker怎么设置提高好牌几率,aapoker辅助插件工具(透视)切实真的有透视器(哔...
透视办法!智星德州有脚本吗,w... 透视办法!智星德州有脚本吗,wpk透视怎么安装(透视)本来有挂(哔哩哔哩)1、金币登录送、破产送、升...
透视法门!德普辅助软件,德普之... 透视法门!德普辅助软件,德普之星透视辅助插件(透视)竟然存在有脚本技巧(哔哩哔哩)1、这是跨平台的德...
透视阶段!德州透视竞技联盟,约... 透视阶段!德州透视竞技联盟,约局吧透视挂下载(透视)竟然真的有挂(哔哩哔哩)1、金币登录送、破产送、...
透视方式!微扑克微乐辅助,wp... 透视方式!微扑克微乐辅助,wpk有作必弊吗(透视)其实真的有透视技巧(哔哩哔哩)1、许多玩家不知道微...
透视方式!如何下载wpk透视版... 透视方式!如何下载wpk透视版,有哪些免费的wpk作必弊码(透视)真是是有挂(哔哩哔哩)一、有哪些免...
透视技法!德普之星私人局透视,... 透视技法!德普之星私人局透视,德普之星私人局辅助免费(透视)本来存在有透视工具(哔哩哔哩)德普之星私...