Springboot整合物联网IOT的MQTT协议
创始人
2024-12-13 17:06:01
0

准备工作 (下载EMQX服务端,相关客户端工具)

1. 服务端工具:

https://www.emqx.io/downloads?os=Windows

2. 客户端工具:

https://mqttx.app/zh

                       org.springframework.boot             spring-boot-starter-web                               org.springframework.boot             spring-boot-starter-integration                                        org.springframework.integration             spring-integration-stream                               org.springframework.integration             spring-integration-mqtt                               org.projectlombok             lombok          

自定义yml配置

server:   port: 8989 #mqtt properties mqtt:   #uris 可以有多个 所以是个数组   uris:     - tcp://127.0.0.1:1883   clientId: mqtt_test1   topics:     - demo     - test   username: admin   password: 123456   timeout: 30   keepalive: 60   qos: 1 

增加config配置读取yml文件 (使用了Lombok 需要自行添加pom依赖)

package com.huawen.mqtt.config;  import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;  /**  * @author:xjl  * @date:2022/5/5 17:27  * @Description: MQTT的配置类  **/ @Component @ConfigurationProperties(prefix = "mqtt") @Data public class MqttConfiguration {      /**      * uris 服务器地址配置      */     private String[] uris;      /**      * clientId      */     private String clientId;      /**      * 话题      */     private String[] topics;      /**      * 用户名      */     private String username;      /**      * 密码      */     private String password;      /**      * 连接超时时长      */     private Integer timeout;      /**      * keep Alive时间      */     private Integer keepalive;      /**      * 遗嘱消息 QoS      */     private Integer qos; }  

消费者配置

package com.huawen.mqtt.config;  import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:06  * @Description: MQTT 消费端的配置  **/ @Configuration @Slf4j public class MqttInBoundConfiguration {     @Resource     private MqttConfiguration mqttProperties;      //==================================== 消费消息==========================================//      /**      * 入站通道      *      * @return 消息通道对象 {@link MessageChannel}      */     @Bean("input")     public MessageChannel mqttInputChannel() {         //直连通道         return new DirectChannel();     }       /**      * 创建MqttPahoClientFactory 设置MQTT的broker的连接属性 如果使用ssl验证 也需要此处设置      *      * @return MQTT客户端工厂 {@link MqttPahoClientFactory}      */     @Bean     public MqttPahoClientFactory inClientFactory() {         //设置连接属性         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();         MqttConnectOptions options = new MqttConnectOptions();         options.setServerURIs(mqttProperties.getUris());         options.setUserName(mqttProperties.getUsername());         options.setPassword(mqttProperties.getPassword().toCharArray());         options.setConnectionTimeout(mqttProperties.getTimeout());         options.setKeepAliveInterval(mqttProperties.getKeepalive());         // 接受离线消息  告诉代理客户端是否要建立持久会话   false为建立持久会话         options.setCleanSession(false);         //设置断开后重新连接         options.setAutomaticReconnect(true);         factory.setConnectionOptions(options);         return factory;     }       /**      * 入站      *      * @return 消息提供者 {@link MessageProducer}      */     @Bean     public MessageProducer producer() {         // Paho客户端消息驱动通道适配器,主要用来订阅主题  对inboundTopics主题进行监听         //clientId 加后缀 不然会报retrying 不能重复         MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_customer", inClientFactory(), mqttProperties.getTopics());         adapter.setCompletionTimeout(5000);         // Paho消息转换器         DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();         // 按字节接收消息         // defaultPahoMessageConverter.setPayloadAsBytes(true);         adapter.setConverter(defaultPahoMessageConverter);         // 设置QoS         adapter.setQos(mqttProperties.getQos());         adapter.setOutputChannel(mqttInputChannel());         return adapter;     }      /**      * 通过通道获取数据      * ServiceActivator注解表明:当前方法用于处理MQTT消息,inputChannel参数指定了用于消费消息的channel。      * tips:      * 异步处理      *      * @return 消息处理 {@link MessageHandler}      */     @Bean     @ServiceActivator(inputChannel = "input")     public MessageHandler handler() {         return message -> {             log.info("收到的完整消息为--->{}", message);             log.info("----------------------");             log.info("message:" + message.getPayload());             log.info("Id:" + message.getHeaders().getId());             log.info("receivedQos:" + message.getHeaders().get(MqttHeaders.RECEIVED_QOS));             String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);             log.info("topic:" + topic);             log.info("----------------------");         };     } }  

生产者配置

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

创建一个通用接口 用于发送数据

package com.huawen.mqtt.inter;  import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header;  /**  * @author:xjl  * @date:2022/5/6 9:20  * @Description: 接口MqttGateway  **/ @MessagingGateway(defaultRequestChannel = "out") public interface MqttGateway {     /**      * 定义重载方法,用于消息发送      *      * @param payload 负载      */     void sendToMqtt(String payload);      /**      * 指定topic进行消息发送      *      * @param topic   topic话题      * @param payload 负载      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);      /**      * 指定topic和qos进行消息发送      *      * @param topic   topic话题      * @param qos     qos      * @param payload 负载 (字符串类型)      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);      /**      * 指定topic和qos进行消息发送      *      * @param topic   topic话题      * @param qos     qos      * @param payload 负载 (字节数组类型)      */     void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); }  

生产者测试controller

package com.huawen.mqtt.controller;  import com.huawen.mqtt.bean.MyMessage; import com.huawen.mqtt.inter.MqttGateway; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController;  import javax.annotation.Resource;  /**  * @author:xjl  * @date:2022/5/6 9:17  * @Description: mqtt发布消息controller  **/ @RestController public class MqttPublishController {     @Resource     private MqttGateway mqttGateWay;      @PostMapping("/send")     public String send(@RequestBody MyMessage myMessage) {         // 发送消息到指定主题         mqttGateWay.sendToMqtt(myMessage.getTopic(), 1, myMessage.getContent());         return "send topic: " + myMessage.getTopic() + ", message : " + myMessage.getContent();     } }  

该文章参考 https://blog.csdn.net/m0_46689235/article/details/124606005

相关内容

热门资讯

aapoker外挂!aapok... aapoker外挂!aapoker插件,(AAPOkER)真是是真的有挂(详细辅助曝光教程)aapo...
透视工具(WEPOKE)透明挂... 透视工具(WEPOKE)透明挂辅助app(WePoKe外挂)原本真的是有挂(详细透视黑科技教程),支...
aapoker辅助!aapok... aapoker辅助!aapoke辅助工具存在吗,(AaPOKER)真是是真的有挂(详细辅助解密教程)...
aapoker外挂!aapok... aapoker外挂!aapoker是谁开发的,(AAPoKER)果然存在有挂(详细辅助力荐教程)aa...
透视模拟器(来玩德州app)智... 透视模拟器(来玩德州app)智星德州菠萝有挂吗(透视)竟然存在有挂(详细辅助揭秘攻略)1、上手简单,...
aapoker猫腻!aa扑克发... aapoker猫腻!aa扑克发牌机制,(aa poker)一直有挂(详细辅助软件教程)1、aapok...
透视辅助(wEPoke)外挂透... 透视辅助(wEPoke)外挂透明挂辅助代打(wepoke辅助德之星)总是存在有挂(详细透视2025新...
透视脚本(智星德州)德扑之星a... 透视脚本(智星德州)德扑之星ai代打(透视)确实存在有挂(详细辅助揭秘教程)暗藏猫腻,小编详细说明德...
aapoker辅助!aapok... aapoker辅助!aapoker线上规律,(aaPoker)竟然是有挂(详细辅助力荐教程)1、起透...
透视科技(wePOke)透明挂... 透视科技(wePOke)透明挂辅助代打(wepokeai辅助)从来真的有挂(详细透视大神讲解);支持...