Springboot整合物联网IOT的MQTT协议
创始人
2024-12-13 17:06:01
0

准备工作 (下载EMQX服务端,相关客户端工具)

1. 服务端工具:

https://www.emqx.io/downloads?os=Windows

2. 客户端工具:

https://mqttx.app/zh

                       org.springframework.boot             spring-boot-starter-web                               org.springframework.boot             spring-boot-starter-integration                                        org.springframework.integration             spring-integration-stream                               org.springframework.integration             spring-integration-mqtt                               org.projectlombok             lombok          

自定义yml配置

server:   port: 8989 #mqtt properties mqtt:   #uris 可以有多个 所以是个数组   uris:     - tcp://127.0.0.1:1883   clientId: mqtt_test1   topics:     - demo     - test   username: admin   password: 123456   timeout: 30   keepalive: 60   qos: 1 

增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)

package com.huawen.mqtt.config;  import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;  /**  * @author:xjl  * @date:2022/5/5 17:27  * @Description: MQTT的配置类  **/ @Component @ConfigurationProperties(prefix = "mqtt") @Data public class MqttConfiguration {      /**      * uris 服务器地址配置      */     private String[] uris;      /**      * clientId      */     private String clientId;      /**      * 话题      */     private String[] topics;      /**      * 用户名      */     private String username;      /**      * 密码      */     private String password;      /**      * 连接超时时长      */     private Integer timeout;      /**      * keep Alive时间      */     private Integer keepalive;      /**      * 遗嘱消息 QoS      */     private Integer qos; }  

消费者配置

package com.huawen.mqtt.config;  import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:06  * @Description: MQTT 消费端的配置  **/ @Configuration @Slf4j public class MqttInBoundConfiguration {     @Resource     private MqttConfiguration mqttProperties;      //==================================== 消费消息==========================================//      /**      * 入站通道      *      * @return 消息通道对象 {@link MessageChannel}      */     @Bean("input")     public MessageChannel mqttInputChannel() {         //直连通道         return new DirectChannel();     }       /**      * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置      *      * @return MQTT客户端工厂 {@link MqttPahoClientFactory}      */     @Bean     public MqttPahoClientFactory inClientFactory() {         //设置连接属性         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();         MqttConnectOptions options = new MqttConnectOptions();         options.setServerURIs(mqttProperties.getUris());         options.setUserName(mqttProperties.getUsername());         options.setPassword(mqttProperties.getPassword().toCharArray());         options.setConnectionTimeout(mqttProperties.getTimeout());         options.setKeepAliveInterval(mqttProperties.getKeepalive());         // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话         options.setCleanSession(false);         //设置断开后重新连接         options.setAutomaticReconnect(true);         factory.setConnectionOptions(options);         return factory;     }       /**      * 入站      *      * @return 消息提供者 {@link MessageProducer}      */     @Bean     public MessageProducer producer() {         // Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听         //clientId 加后缀 不然会报retrying 不能重复         MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());         adapter.setCompletionTimeout(5000);         // Paho消息转换器         DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();         // 按字节接收消息         // defaultPahoMessageConverter.setPayloadAsBytes(true);         adapter.setConverter(defaultPahoMessageConverter);         // 设置QoS         adapter.setQos(mqttProperties.getQos());         adapter.setOutputChannel(mqttInputChannel());         return adapter;     }      /**      * 通过通道获取数据      * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。      * tips:      * 异步处理      *      * @return 消息处理 {@link MessageHandler}      */     @Bean     @ServiceActivator(inputChannel = "input")     public MessageHandler handler() {         return message -> {             log.info("收到的完整消息为--->{}", message);             log.info("----------------------");             log.info("message:" + message.getPayload());             log.info("Id:" + message.getHeaders().getId());             log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));             String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);             log.info("topic:" + topic);             log.info("----------------------");         };     } }  

生产者配置

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

创建一个通用接口 用于发送数据

package com.huawen.mqtt.inter;  import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header;  /**  * @author:xjl  * @date:2022/5/6 9:20  * @Description: 接口MqttGateway  **/ @MessagingGateway(defaultRequestChannel = "out") public interface MqttGateway {     /**      * 定义重载方法,用于消息发送      *      * @param payload 负载      */     void sendToMqtt(String payload);      /**      * 指定topic进行消息发送      *      * @param topic   topic话题      * @param payload 负载      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);      /**      * 指定topic和qos进行消息发送      *      * @param topic   topic话题      * @param qos     qos      * @param payload 负载 (字符串类型)      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);      /**      * 指定topic和qos进行消息发送      *      * @param topic   topic话题      * @param qos     qos      * @param payload 负载 (字节数组类型)      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }  

生产者测试controller

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

该文章参考 https://blog.csdn.net/m0_46689235/article/details/124606005

相关内容

热门资讯

据统计!凑一桌游戏关春天破解透... 据统计!凑一桌游戏关春天破解透视版(辅助)一贯有挂辅助器(有挂透视)-哔哩哔哩1、实时凑一桌游戏关春...
经调查!阿拉斗牌辅助免费(辅助... 经调查!阿拉斗牌辅助免费(辅助)本来存在有辅助插件(有挂技术)-哔哩哔哩1、阿拉斗牌辅助免费透视辅助...
现有说明如下!广西微乐小程序脚... 现有说明如下!广西微乐小程序脚本(辅助)本来真的是有辅助神器(有挂详细)-哔哩哔哩广西微乐小程序脚本...
据公告内容!山西扣点子的技巧(... 据公告内容!山西扣点子的技巧(辅助)果然真的有辅助神器(有挂秘籍)-哔哩哔哩运山西扣点子的技巧辅助工...
据了解!破解大菠萝的辅助器(辅... 据了解!破解大菠萝的辅助器(辅助)一贯真的有辅助修改器(确实有挂)-哔哩哔哩1、实时破解大菠萝的辅助...
此事备受玩家关注!多乐游戏辅助... 此事备受玩家关注!多乐游戏辅助脚本下载(辅助)总是真的有辅助软件(真的有挂)-哔哩哔哩1、多乐游戏辅...
这一现象值得深思!多乐破解游戏... 这一现象值得深思!多乐破解游戏盒子破解版(辅助)一贯真的有辅助插件(有挂实锤)-哔哩哔哩1、多乐破解...
此事迅速冲上热搜!兴动休闲辅助... 此事迅速冲上热搜!兴动休闲辅助(辅助)好像是有辅助安装(有挂工具)-哔哩哔哩1、首先打开兴动休闲辅助...
现就发布提示!斗棋有bug吗(... 现就发布提示!斗棋有bug吗(辅助)原来有挂辅助平台(了解有挂)-哔哩哔哩1、每一步都需要思考,不同...
经调查!随意玩辅助软件(辅助)... 经调查!随意玩辅助软件(辅助)一贯存在有辅助安装(有挂详细)-哔哩哔哩1、打开软件启动之后找到中间准...