项目实战--SpringBoot整合RabbitMQ:实现邮件大批量异步推送
创始人
2025-01-07 04:33:39
0

项目实战--SpringBoot整合RabbitMQ:实现邮件大批量异步推送

  • 一、背景
  • 二、实现方案
    • 2.1 环境:
    • 2.2 方案
  • 三、前期工作
  • 四、具体实现
    • 4.1 创建spring boot项目
    • 4.2 配置rabbitMQ、mail
    • 4.3 RabbitConfig配置类
    • 4.4 Mail 邮件实体类
    • 4.5 Mail SendMailUtil邮件发送类
    • 4.6 ProduceServiceImpl 生产者类
    • 4.7 ConsumerMailService 消费者类
    • 4.8 TestController 控制层类
  • 五、测试
  • 六、消息发送失败处理
    • 6.1 创建消息投递日志表
    • 6.2 编写 MsgLog 相关服务类
    • 6.3 改写服务逻辑
    • 6.4 使用定数任务对消息投递失败进行补偿

一、背景

由于项目前期使用SpringBoot 整合 mail 实现各类邮件的自动推送服务,但是这个服务越来越不稳定,出现网络异常的时候,会导致邮件推送失败造成堆积,同时业务需要大批量的同步推送邮件,可靠性也不高。故采用RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖 RabbitMQ 很多知识点:

生产者和消费者模型 消息发送确认机制 消费确认机制 消息的重新投递 消费幂等性 ...... 

RabbitMQ 处理流程图:
在这里插入图片描述

二、实现方案

2.1 环境:

springboot版本:2.1.5.RELEASE
RabbitMQ版本:3.6.5
SendMailUtil:发送邮件工具类
ProduceServiceImpl:生产者,发送消息
ConsumerMailService:消费者,消费消息,发送邮件

2.2 方案

1.搭建一台 Linux 服务器,并安装 RabbitMQ
2.开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
3.创建邮件发送项目并编写代码
4.发送邮件测试
5.消息发送失败处理

三、前期工作

获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:
在这里插入图片描述
点击开启按钮,然后发送短信,即可获取授权码,该授权码就是配置文件spring.mail.password需要的密码:
在这里插入图片描述

四、具体实现

4.1 创建spring boot项目

名为smail的 Springboot 项目,pom文件中加入amqp和mail:

                   org.springframework.boot         spring-boot-starter                        org.springframework.boot         spring-boot-starter-test         test                        org.springframework.boot         spring-boot-starter-web                        org.springframework.boot         spring-boot-devtools         true                        org.springframework.boot         spring-boot-starter-mail                        org.springframework.boot         spring-boot-starter-amqp                        org.apache.commons         commons-lang3         3.4                        org.projectlombok         lombok         1.16.10       

4.2 配置rabbitMQ、mail

在application.properties文件中,配置amqp和mail,其中,spring.mail.password前面获取的授权码,同时username和from要一致:

#rabbitmq spring.rabbitmq.host=192.168.0.103 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启confirms回调 P -> Exchange spring.rabbitmq.publisher-confirms=true # 开启returnedMessage回调 Exchange -> Queue spring.rabbitmq.publisher-returns=true # 设置手动确认(ack) Queue -> C spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100  # mail spring.mail.default-encoding=UTF-8 spring.mail.host=smtp.qq.com spring.mail.username=xxxx@qq.com spring.mail.password=获取的邮箱授权码 spring.mail.from=xxxx@qq.com spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true 

4.3 RabbitConfig配置类

@Configuration @Slf4j public class RabbitConfig {      // 发送邮件     public static final String MAIL_QUEUE_NAME = "mail.queue";     public static final String MAIL_EXCHANGE_NAME = "mail.exchange";     public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";      @Autowired     private CachingConnectionFactory connectionFactory;      @Bean     public RabbitTemplate rabbitTemplate() {         RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);         rabbitTemplate.setMessageConverter(converter());          // 消息是否成功发送到Exchange         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {             if (ack) {                 log.info("消息成功发送到Exchange");             } else {                 log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);             }         });          // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调         rabbitTemplate.setMandatory(true);         // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {             log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);         });          return rabbitTemplate;     }      @Bean     public Jackson2JsonMessageConverter converter() {         return new Jackson2JsonMessageConverter();     }      @Bean     public Queue mailQueue() {         return new Queue(MAIL_QUEUE_NAME, true);     }      @Bean     public DirectExchange mailExchange() {         return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);     }      @Bean     public Binding mailBinding() {         return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);     } } 

4.4 Mail 邮件实体类

@Getter @Setter @NoArgsConstructor @AllArgsConstructor public class Mail {      @Pattern(regexp = "^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$", message = "邮箱格式不正确")     private String to;      @NotBlank(message = "标题不能为空")     private String title;      @NotBlank(message = "正文不能为空")     private String content;      private String msgId;// 消息id } 

4.5 Mail SendMailUtil邮件发送类

@Component @Slf4j public class SendMailUtil {      @Value("${spring.mail.from}")     private String from;      @Autowired     private JavaMailSender mailSender;      /**      * 发送简单邮件      *      * @param mail      */     public boolean send(Mail mail) {         String to = mail.getTo();// 目标邮箱         String title = mail.getTitle();// 邮件标题         String content = mail.getContent();// 邮件正文          SimpleMailMessage message = new SimpleMailMessage();         message.setFrom(from);         message.setTo(to);         message.setSubject(title);         message.setText(content);          try {             mailSender.send(message);             log.info("邮件发送成功");             return true;         } catch (MailException e) {             log.error("邮件发送失败, to: {}, title: {}", to, title, e);             return false;         }     } } 

4.6 ProduceServiceImpl 生产者类

@Service public class ProduceServiceImpl implements ProduceService {      @Autowired     private RabbitTemplate rabbitTemplate;      @Override     public boolean send(Mail mail) {         //创建uuid         String msgId = UUID.randomUUID().toString().replaceAll("-", "");         mail.setMsgId(msgId);                  //发送消息到rabbitMQ         CorrelationData correlationData = new CorrelationData(msgId);         rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);          return true;     } } 

4.7 ConsumerMailService 消费者类

@Component @Slf4j public class ConsumerMailService {      @Autowired     private SendMailUtil sendMailUtil;      @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)     public void consume(Message message, Channel channel) throws IOException {         //将消息转化为对象         String str = new String(message.getBody());         Mail mail = JsonUtil.strToObj(str, Mail.class);         log.info("收到消息: {}", mail.toString());          MessageProperties properties = message.getMessageProperties();         long tag = properties.getDeliveryTag();          boolean success = sendMailUtil.send(mail);         if (success) {             channel.basicAck(tag, false);// 消费确认         } else {             channel.basicNack(tag, false, true);         }     } } 

4.8 TestController 控制层类

@RestController @RequestMapping("/test") @Slf4j public class TestController {      @Autowired     private ProduceService testService;      @PostMapping("send")     public boolean sendMail(Mail mail) {         return testService.send(mail);     } } 

五、测试

启动 SpringBoot 服务之后,模拟请求接口。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

六、消息发送失败处理

若 rabbitMQ 突然崩溃、邮件发送失败、重启 rabbitMQ 服务器出现消息重复消费,的怎处理。

6.1 创建消息投递日志表

CREATE TABLE `msg_log` (   `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识',   `msg` text COMMENT '消息体, json格式化',   `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',   `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',   `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',   `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',   `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间',   `create_time` datetime DEFAULT NULL COMMENT '创建时间',   `update_time` datetime DEFAULT NULL COMMENT '更新时间',   PRIMARY KEY (`msg_id`),   UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志'; 

6.2 编写 MsgLog 相关服务类

public interface MsgLogService {      /**      * 插入消息日志      * @param msgLog      */     void insert(MsgLog msgLog);      /**      * 更新消息状态      * @param msgId      * @param status      */     void updateStatus(String msgId, Integer status);      /**      * 查询消息      * @param msgId      * @return      */     MsgLog selectByMsgId(String msgId); } 

6.3 改写服务逻辑

生产服务类中,新增数据写入:
在这里插入图片描述
在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑:
在这里插入图片描述
消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据:
在这里插入图片描述

6.4 使用定数任务对消息投递失败进行补偿

当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递:
在这里插入图片描述
利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功

相关内容

热门资讯

我来教大家(wpkai辅助)w... 我来教大家(wpkai辅助)wepoke辅助机器人(透视辅助)其实真的有挂我来教大家(wpkai辅助...
技术分享(德州ai辅助工具)a... 技术分享(德州ai辅助工具)aapoker辅助工具存在(透视)其实真的有挂;暗藏猫腻,小编详细说明a...
分享给玩家(wpk有没有作弊)... 分享给玩家(wpk有没有作弊)wepoke辅助挂(透视辅助)其实真的有挂;1、不需要AI权限,帮助你...
玩家亲测(众合推扑克辅助器)德... 玩家亲测(众合推扑克辅助器)德扑起手牌胜率图(透视)其实真的有挂;1、游戏颠覆性的策略玩法,独创攻略...
必看攻略(wepoke有辅助吗... 必看攻略(wepoke有辅助吗)智星德州菠萝外挂(透视辅助)其实真的有挂1、智星德州菠萝外挂系统规律...
热门推荐(aa扑克不能用模拟器... 热门推荐(aa扑克不能用模拟器)aapoker透视辅助(透视)其实真的有挂aapoker透视辅助是一...
来一盘(智星德州菠萝安全吗)a... 来一盘(智星德州菠萝安全吗)aapoker透明挂(透视)其实真的有挂;所有人都在同一条线上,像星星一...
技术分享(wpk德州透视辅助)... 技术分享(wpk德州透视辅助)wpk发牌逻辑(透视)原来真的有挂1、wpk德州透视辅助系统规律教程、...
我来分享(德州wpk到底有没有... 我来分享(德州wpk到底有没有外挂)德州之星有辅助挂(辅助)原来真的有挂1、玩家可以在德州之星有辅助...
重大发现(aapoker下载教... 重大发现(aapoker下载教程)德州之星插件(透明黑科技)其实真的有挂运德州之星插件辅助工具,进入...