官网地址:
https://www.emqx.io/downloads
进入官网后下载 Windows 版压缩包(zip)。
解压zip文件,修改白名单。在etc目录下找到acl.conf文件。
再回到上一级目录,进入bin目录,在终端输入pushd + bin目录地址,例:
pushd D:\Environment\emqx\bin
启动服务,终端输入:
emqx.cmd start
打开浏览器输入:
localhost:18083
出现如下界面便是EMQX服务已经启动。
EMQX初始用户名:admin,密码:public。登录后可将页面语言设置为中文。
官网地址:
https://mqttx.app/zh/downloads
安装过于简单,自行下载安装。
添加一个连接
刷新管理端页面
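<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>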
# MQTT配置
mqtt:
  # emqx服务器地址
  host: tcp://127.0.0.1:1883
  # 用户名
  username: ces
  # 密码
  password: ces1
  # 客户端id
  clientId: service_${random.uuid}
  # 定义连接超时时间 默认为10秒 如果未在属性中指定 则使用默认值
  connectionTimeout: 10
  # 消息服务质量
  qos: 2
  # 连接保持检查周期 秒
  keepAliveInterval: 20
  # 开启自动重连
  automaticReconnect: true
  # 是否清除会话session
  cleanSession: true
  # 默认订阅主题
  defaultSubscribeTopic: washingMachine/online/+,airConditioner/online/+
  # 是否保留发布消息
  retained: false
package net.rakan.distributedservice.common.constant;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * MQTT settings bound from configuration properties under the {@code mqtt} prefix.
 *
 * <p>Field initializers act as the defaults used when a property is absent, so the
 * getters of the {@code Boolean} settings do not return {@code null} for an
 * unconfigured property and can be auto-unboxed safely by callers.
 *
 * <p>NOTE(review): this is a plain properties holder; {@code @Component} (or
 * {@code @EnableConfigurationProperties}) would describe it better than
 * {@code @Configuration}. Left unchanged here to keep the bean definition as is.
 *
 * @author LiChangRui on 2024/4/21 16:53
 */
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttConstants {

    /**
     * EMQX broker address, e.g. {@code tcp://127.0.0.1:1883}.
     */
    private String host;

    /**
     * User name used to authenticate against the broker; may be left unset.
     */
    private String username;

    /**
     * Password used to authenticate against the broker; may be left unset.
     */
    private String password;

    /**
     * Client id presented to the broker.
     */
    private String clientId;

    /**
     * Connection timeout in seconds. Defaults to 10 when the property is not specified.
     */
    private int connectionTimeout = 10;

    /**
     * Quality-of-service level used for publishing and subscribing.
     */
    private int qos;

    /**
     * Keep-alive check interval in seconds.
     */
    private int keepAliveInterval;

    /**
     * Whether automatic reconnect is enabled. Defaults to {@code false} when not configured.
     */
    private Boolean automaticReconnect = Boolean.FALSE;

    /**
     * Whether the session is cleared on connect. Defaults to {@code true} when not configured.
     */
    private Boolean cleanSession = Boolean.TRUE;

    /**
     * Comma-separated list of topics subscribed by default.
     */
    private String defaultSubscribeTopic;

    /**
     * Whether published messages are retained. Defaults to {@code false} when not configured.
     */
    private Boolean retained = Boolean.FALSE;
}
package net.rakan.distributedservice.common.service;

import lombok.extern.slf4j.Slf4j;
import net.rakan.distributedservice.common.callback.MqttCallback;
import net.rakan.distributedservice.common.constant.MqttConstants;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Thin wrapper around a single Paho {@link MqttClient}: connects it and offers
 * best-effort publish/subscribe operations that log failures instead of throwing.
 *
 * @author LiChangRui on 2024/4/21 17:03
 */
@Slf4j
@Component
public class MqttService {

    /**
     * MQTT settings (QoS, retained flag, ...).
     */
    @Autowired
    private MqttConstants mqttConstants;

    /**
     * The active client, or {@code null} until {@link #connect} has been called.
     * Volatile because it is written by the thread calling {@link #connect} and read
     * by whichever thread calls publish/subscribe.
     */
    private volatile MqttClient mqttClient;

    private void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }

    /**
     * Publishes a message using the configured QoS and retained flag.
     * Failures are logged and not propagated.
     *
     * @param topic topic to publish to
     * @param msg   message text; sent as UTF-8 bytes
     * @author LiChangRui on 2024/4/21 18:02
     */
    public void publish(String topic, String msg) {
        MqttClient client = this.mqttClient;
        if (client == null) {
            log.error("MQTT client is not connected; cannot publish to topic {}", topic);
            return;
        }
        // A missing "retained" setting is treated as false instead of failing on unboxing.
        boolean retained = Boolean.TRUE.equals(mqttConstants.getRetained());
        try {
            // Explicit charset so the payload does not depend on the platform default.
            client.publish(topic, msg.getBytes(StandardCharsets.UTF_8), mqttConstants.getQos(), retained);
        } catch (MqttException e) {
            log.error("MQTT推送消息失败!", e);
        }
    }

    /**
     * Subscribes to one topic with the configured QoS.
     * Failures are logged and not propagated.
     *
     * @param topic topic filter to subscribe to
     * @author LiChangRui on 2024/4/21 21:54
     */
    public void subscribe(String topic) {
        MqttClient client = this.mqttClient;
        if (client == null) {
            log.error("MQTT client is not connected; cannot subscribe to topic {}", topic);
            return;
        }
        log.info("开始订阅主题:{}。", topic);
        try {
            client.subscribe(topic, mqttConstants.getQos());
        } catch (MqttException e) {
            log.error("MQTT订阅主题失败!", e);
        }
    }

    /**
     * Subscribes to several topics, all with the configured QoS.
     * A {@code null} or empty array is ignored. Failures are logged and not propagated.
     *
     * @param topic topic filters to subscribe to
     * @author LiChangRui on 2024/4/21 21:54
     */
    public void subscribe(String[] topic) {
        if (topic == null || topic.length == 0) {
            log.warn("No MQTT topics given; nothing to subscribe to");
            return;
        }
        MqttClient client = this.mqttClient;
        if (client == null) {
            log.error("MQTT client is not connected; cannot subscribe to topics {}", String.join(",", topic));
            return;
        }
        int[] qos = new int[topic.length];
        Arrays.fill(qos, mqttConstants.getQos());
        log.info("开始订阅主题:{}。", String.join(",", topic));
        try {
            client.subscribe(topic, qos);
        } catch (MqttException e) {
            log.error("MQTT订阅主题失败!", e);
        }
    }

    /**
     * Creates a client, registers the callback and connects to the broker.
     * On failure the error is logged and the previously stored client (if any) is kept.
     *
     * @param host         broker URI
     * @param clientId     client id presented to the broker
     * @param mqttCallback callback that receives connection and message events
     * @param options      connection options
     * @author LiChangRui on 2024/4/22 9:48
     */
    public void connect(String host, String clientId, MqttCallback mqttCallback, MqttConnectOptions options) {
        MqttClient previous = this.mqttClient;
        MqttClient candidate = null;
        try {
            candidate = new MqttClient(host, clientId);
            candidate.setCallback(mqttCallback);
            // Store the client BEFORE connecting: MqttCallback.connectComplete calls
            // subscribe(...) on this service, which needs the field to be set.
            // NOTE(review): Paho appears to deliver connectComplete on its own thread,
            // possibly before connect() returns here; confirm for the Paho version in use.
            this.setMqttClient(candidate);
            candidate.connect(options);
        } catch (MqttException e) {
            // Roll back to whatever client was stored before this attempt.
            this.setMqttClient(previous);
            log.error("连接失败:{}!", e.getMessage(), e);
            closeQuietly(candidate);
            return;
        }
        log.info("连接成功!");
    }

    /**
     * Releases a client that failed to connect; a failure to close is only logged.
     */
    private void closeQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (MqttException closeFailure) {
            log.debug("Could not close MQTT client after failed connect", closeFailure);
        }
    }
}
package net.rakan.distributedservice.common.callback;

import lombok.extern.slf4j.Slf4j;
import net.rakan.distributedservice.common.constant.MqttConstants;
import net.rakan.distributedservice.common.service.MqttService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * MQTT callback: logs connection and message events and subscribes to the
 * configured default topics whenever a connection is established.
 *
 * <p>NOTE(review): this class defines no beans; {@code @Component} would describe it
 * better than {@code @Configuration}. Left unchanged to keep the bean definition as is.
 *
 * @author LiChangRui on 2024/4/21 17:03
 */
@Slf4j
@Configuration
public class MqttCallback implements MqttCallbackExtended {

    /**
     * MQTT settings (default subscribe topics).
     */
    @Autowired
    private MqttConstants mqttConstants;

    /**
     * MQTT service used to subscribe after connecting.
     */
    @Autowired
    private MqttService mqttService;

    /**
     * Called when the connection to the broker is lost.
     * No reconnect logic lives here; only disconnect-related business logic belongs in this method.
     *
     * @param throwable reason for the connection loss
     * @author LiChangRui on 2024/4/21 18:19
     */
    @Override
    public void connectionLost(Throwable throwable) {
        // Keep the cause in the log so a disconnect can be diagnosed.
        log.warn("客户端连接断开!", throwable);
    }

    /**
     * Called when a message arrives; logs its topic, QoS and payload.
     *
     * @param topic       topic the message was published to
     * @param mqttMessage received message; the payload is decoded as UTF-8
     * @author LiChangRui on 2024/4/21 18:21
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        log.info("接收消息主题 : {}", topic);
        log.info("接收消息Qos : {}", mqttMessage.getQos());
        // Explicit charset so decoding does not depend on the platform default.
        log.info("接收消息内容 : {}", new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
    }

    /**
     * Called when delivery of a published message has completed; logs each topic of the token.
     *
     * @param iMqttDeliveryToken token of the completed delivery
     * @author LiChangRui on 2024/4/21 18:23
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        String[] topics = iMqttDeliveryToken.getTopics();
        if (topics == null) {
            // Nothing to report when the token carries no topics.
            return;
        }
        for (String topic : topics) {
            log.info("向主题:{}发送消息成功!", topic);
        }
    }

    /**
     * Called when the client has connected; subscribes to the configured
     * comma-separated default topics.
     *
     * @param reconnect whether this connection is a reconnect
     * @param serverURI URI of the server the connection was made to
     * @author LiChangRui on 2024/4/21 18:23
     */
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("客户端连接成功!");
        String configured = mqttConstants.getDefaultSubscribeTopic();
        if (configured == null || configured.isBlank()) {
            // No default topics configured: nothing to subscribe to.
            return;
        }
        // Tolerate spaces around the commas and skip empty entries ("a, b,,c").
        String[] topics = Arrays.stream(configured.split(","))
                .map(String::strip)
                .filter(entry -> !entry.isEmpty())
                .toArray(String[]::new);
        if (topics.length == 0) {
            return;
        }
        mqttService.subscribe(topics);
    }
}
package net.rakan.distributedservice.common.config; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import net.rakan.distributedservice.common.callback.MqttCallback; import net.rakan.distributedservice.common.constant.MqttConstants; import net.rakan.distributedservice.common.service.MqttService; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; /** * MQTT配置 * @author LiChangRui on 2024/4/21 22:26 */ @Slf4j @Configuration public class MqttConfig { /** * MQTT常量 */ @Autowired private MqttConstants mqttConstants; /** * MQTT服务 */ @Autowired private MqttService mqttService; /** * MQTT回调 */ @Autowired private MqttCallback mqttCallback; /** * MQTT连接 * @author LiChangRui on 2024/4/22 9:51 */ @PostConstruct public void mqttClient() { MqttConnectOptions options = new MqttConnectOptions(); // 用户名 options.setUserName(mqttConstants.getUsername()); // 密码 options.setPassword(mqttConstants.getPassword().toCharArray()); // 设置连接超时时间 options.setConnectionTimeout(mqttConstants.getConnectionTimeout()); // 开启自动重连 options.setAutomaticReconnect(mqttConstants.getAutomaticReconnect()); // 是否清除会话session options.setCleanSession(mqttConstants.getCleanSession()); // 设置心跳间隔时间 options.setKeepAliveInterval(mqttConstants.getKeepAliveInterval()); mqttService.connect(mqttConstants.getHost(), mqttConstants.getClientId(), mqttCallback, options); } }
package net.rakan.distributedservice.testserver.controller; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; import net.rakan.distributedservice.common.dto.CancelOrderDTO; import net.rakan.distributedservice.common.dto.PublishDTO; import net.rakan.distributedservice.common.dto.SendMailDTO; import net.rakan.distributedservice.common.service.MqttService; import net.rakan.distributedservice.common.service.RabbitMqService; import net.rakan.distributedservice.common.vo.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 测试 * @author LiChangRui on 2024/3/15 10:14 */ @Tag(name = "测试") @RestController @RequestMapping("/test") public class TestController { @Autowired private MqttService mqttService; /** * 推送消息 * @author LiChangRui on 2024/3/15 13:59 */ @Operation(summary = "推送消息") @PostMapping("/publish") public Result> publish(@RequestBody @Validated PublishDTO dto) { mqttService.publish(dto.getTopic(), dto.getMsg()); return Result.ok(); } }
控制台消息:
如果您发现错误,还望及时提醒,共同进步。