EMQX(MQTT协议)服务和测试工具安装及整合SringBoot后的使用
创始人
2025-01-08 13:07:02
0

文章目录

  • 一、WINDOWS下搭建MQTT服务EMQX
    • 1、下载服务
    • 2、 启动服务
  • 二、测试工具
    • 1、安装
    • 2、汉化及使用
  • 三、整合SpringBoot
    • 1、导入Maven依赖
    • 2、配置
      • 2.1、配置文件
      • 2.2、常量
    • 3、MQTT服务类
    • 4、MQTT服务回调类
    • 5、MQTT配置类
    • 6、测试类
  • 总结


一、WINDOWS下搭建MQTT服务EMQX

1、下载服务

官网地址:

https://www.emqx.io/downloads

进入官网后下载window压缩包
在这里插入图片描述

2、 启动服务

解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
在这里插入图片描述
在回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:

pushd D:\Environment\emqx\bin 

启动服务,终端输入:

emqx.cmd start 

打开浏览器输入:

localhost:18083

出现如下界面便是EMQX服务已经启动。
在这里插入图片描述
EMQX初始的用户名:admin 密码:public 。登录后设置中文页面。

在这里插入图片描述
在这里插入图片描述

二、测试工具

1、安装

官网地址:

https://mqttx.app/zh/downloads

安装过于简单,自行下载安装。

2、汉化及使用

在这里插入图片描述

添加一个连接
在这里插入图片描述
刷新管理端页面
在这里插入图片描述

三、整合SpringBoot

1、导入Maven依赖

                              org.springframework.boot             spring-boot-starter-integration                               org.springframework.integration             spring-integration-stream                               org.springframework.integration             spring-integration-mqtt                   

2、配置

2.1、配置文件

# MQTT配置 mqtt:   # emqx服务器地址   host: tcp://127.0.0.1:1883   # 用户名   username: ces   # 密码   password: ces1   # 客户端id   clientId: service_${random.uuid}   # 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值   connectionTimeout: 10   # 消息服务质量   qos: 2   # 连接保持检查周期 秒   keepAliveInterval: 20   # 开启自动重连   automaticReconnect: true   # 是否清除会话session   cleanSession: true   # 默认订阅主题   defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+   # 是否保留发布消息   retained: false 

2.2、常量

package net.rakan.distributedservice.common.constant;  import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;  /**  * MQTT常量  * @author LiChangRui on 2024/4/21 16:53  */ @Data @Slf4j @Configuration @ConfigurationProperties(prefix = "mqtt") public class MqttConstants {     /**      * emqx服务器地址      */     private String host;     /**      * 用户名      */     private String username;     /**      * 密码      */     private String password;     /**      * 客户端id      */     private String clientId;     /**      * 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值      */     private int connectionTimeout;     /**      * 消息服务质量      */     private int qos;     /**      * 连接保持检查周期 秒      */     private int keepAliveInterval;     /**      * 开启自动重连      */     private Boolean automaticReconnect;     /**      * 是否清除会话session      */     private Boolean cleanSession;     /**      * 默认订阅主题      */     private String defaultSubscribeTopic;     /**      * 是否保留发布消息      */     private Boolean retained; } 

3、MQTT服务类

package net.rakan.distributedservice.common.service;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;  import java.util.Arrays;  /**  * MQTT服务  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Component public class MqttService {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT连接      */     private MqttClient mqttClient;       private void setMqttClient(MqttClient mqttClient) {         this.mqttClient = mqttClient;     }      /**      * 推送消息      * @author LiChangRui on 2024/4/21 18:02      */     public void publish(String topic, String msg) {         try {             mqttClient.publish(topic, msg.getBytes(), mqttConstants.getQos(), mqttConstants.getRetained());         } catch (MqttException e) {             log.error("MQTT推送消息失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String topic) {         log.info("开始订阅主题:" + topic + "。");         try {             mqttClient.subscribe(topic, mqttConstants.getQos());         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * 订阅消息      * @author LiChangRui on 2024/4/21 21:54      */     public void subscribe(String[] topic) {         int[] qos = new int[topic.length];         Arrays.fill(qos, mqttConstants.getQos());         log.info("开始订阅主题:" + String.join(",", topic) + "。");         try {             mqttClient.subscribe(topic, qos);         } catch (MqttException e) {             log.error("MQTT订阅主题失败!");         }     }      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:48      */     public void connect(String host, String clientId, MqttCallback mqttCallback, MqttConnectOptions options) {         MqttClient mqttClient;         try {             // 设置连接参数             mqttClient = new MqttClient(host, clientId);             // 设置回调             mqttClient.setCallback(mqttCallback);             // 连接             mqttClient.connect(options);         } catch (MqttException e) {             log.error("连接失败:" + e.getMessage() + "!");             return;         }         this.setMqttClient(mqttClient);         log.info("连接成功!");     } } 

4、MQTT服务回调类

package net.rakan.distributedservice.common.callback;  import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallbackExtended; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT回调  * @author LiChangRui on 2024/4/21 17:03  */ @Slf4j @Configuration public class MqttCallback implements MqttCallbackExtended {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;       /**      * 客户端断开后触发      * @author LiChangRui on 2024/4/21 18:19      */     @Override     public void connectionLost(Throwable throwable) {         log.info("客户端连接断开!");         // 已经设置断线重新连接 所以这里不用写重连的逻辑 只需要写断开连接的业务逻辑     }      /**      * 客户端收到消息触发      * @author LiChangRui on 2024/4/21 18:21      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) {         log.info("接收消息主题 : " + topic);         log.info("接收消息Qos : " + mqttMessage.getQos());         log.info("接收消息内容 : " + new String(mqttMessage.getPayload()));     }      /**      * 发布消息成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         String[] topics = iMqttDeliveryToken.getTopics();         for (String topic : topics) {             log.info("向主题:" + topic + "发送消息成功!");         } //        try { //            // 在消息被传递之前,正在传递的消息将被返回。一旦消息被传递,将返回null //            MqttMessage message = iMqttDeliveryToken.getMessage(); //            byte[] payload = message.getPayload(); //            String s = new String(payload, StandardCharsets.UTF_8); //            log.info("消息的内容是:" + s + "。"); //        } catch (MqttException e) { //            log.error("获取发送消息内容失败!"); //        }     }      /**      * 客户端连接成功      * @author LiChangRui on 2024/4/21 18:23      */     @Override     public void connectComplete(boolean reconnect, String serverURI) {         log.info("客户端连接成功!");         String[] topic = mqttConstants.getDefaultSubscribeTopic().split(",");         mqttService.subscribe(topic);     } } 

5、MQTT配置类

package net.rakan.distributedservice.common.config;  import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;  /**  * MQTT配置  * @author LiChangRui on 2024/4/21 22:26  */ @Slf4j @Configuration public class MqttConfig {      /**      * MQTT常量      */     @Autowired     private MqttConstants mqttConstants;     /**      * MQTT服务      */     @Autowired     private MqttService mqttService;     /**      * MQTT回调      */     @Autowired     private MqttCallback mqttCallback;      /**      * MQTT连接      * @author LiChangRui on 2024/4/22 9:51      */     @PostConstruct     public void mqttClient() {         MqttConnectOptions options = new MqttConnectOptions();         // 用户名         options.setUserName(mqttConstants.getUsername());         // 密码         options.setPassword(mqttConstants.getPassword().toCharArray());         // 设置连接超时时间         options.setConnectionTimeout(mqttConstants.getConnectionTimeout());         // 开启自动重连         options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect());         // 是否清除会话session         options.setCleanSession(mqttConstants.getCleanSession());         // 设置心跳间隔时间         options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval());         mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options);     } } 

6、测试类

package net.rakan.distributedservice.testserver.controller;  import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import net.rakan.distributedservice.common.dto.CancelOrderDTO; import net.rakan.distributedservice.common.dto.PublishDTO; import net.rakan.distributedservice.common.dto.SendMailDTO; import net.rakan.distributedservice.common.service.MqttService; import net.rakan.distributedservice.common.service.RabbitMqService; import net.rakan.distributedservice.common.vo.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController;  /**  * 测试  * @author LiChangRui on 2024/3/15 10:14  */ @Tag(name = "测试") @RestController @RequestMapping("/test") public class TestController {      @Autowired     private MqttService mqttService;      /**      * 推送消息      * @author LiChangRui on 2024/3/15 13:59      */     @Operation(summary = "推送消息")     @PostMapping("/publish")     public Result publish(@RequestBody @Validated PublishDTO dto) {          mqttService.publish(dto.getTopic(), dto.getMsg());          return Result.ok();     }  }  

在这里插入图片描述
控制台消息:
在这里插入图片描述

总结

如果您发现错误,还望及时提醒,共同进步。

相关内容

热门资讯

专业讨论!德扑之星真破解套路(... 专业讨论!德扑之星真破解套路(辅助挂)软件透明挂(有挂了解)-哔哩哔哩;人气非常高,ai更新快且高清...
每日必看!智星德州菠萝外挂检测... 每日必看!智星德州菠萝外挂检测(辅助挂)软件透明挂(有挂教学)-哔哩哔哩1、玩家可以在智星德州菠萝外...
透视透明挂!轰趴十三水有后台(... 轰趴十三水有后台赢率提升策略‌;透视透明挂!轰趴十三水有后台(辅助挂)软件透明挂(有挂详情)-哔哩哔...
发现玩家!德扑ai助手软件(辅... 发现玩家!德扑ai助手软件(辅助挂)透视辅助(有挂教学)-哔哩哔哩;玩家在德扑ai助手软件中需先进行...
一分钟了解!x-poker辅助... 一分钟了解!x-poker辅助软件(辅助挂)辅助透视(有挂攻略)-哔哩哔哩1、每一步都需要思考,不同...
一分钟揭秘!德州最新辅助器(辅... 一分钟揭秘!德州最新辅助器(辅助挂)透视辅助(有挂攻略)-哔哩哔哩;德州最新辅助器最新版本免费下载安...
玩家攻略推荐!德州辅助(辅助挂... 玩家攻略推荐!德州辅助(辅助挂)辅助透视(有挂了解)-哔哩哔哩是由北京得德州辅助黑科技有限公司精心研...
揭秘真相!pokernow德州... 《揭秘真相!pokernow德州(辅助挂)辅助透视(有挂介绍)-哔哩哔哩》 pokernow德州软件...
五分钟了解!德州之星辅助器(辅... 五分钟了解!德州之星辅助器(辅助挂)辅助透视(有挂透明)-哔哩哔哩1、很好的工具软件,可以解锁游戏的...
推荐一款!pokermaste... 1、推荐一款!pokermaster有外挂(辅助挂)透视辅助(有挂教学)-哔哩哔哩;详细教程。2、p...