本篇是在做BI项目时尝试引入MQ来优化项目时发现,MQ引入到项目中做法比较类似。变的只是谁发消息给谁,谁去监听消息。至于MQ的可靠性(比如生产者可靠性、消费者可靠性、消息可靠性)都是通过固定参数进行配置。
因此将引入MQ到BI项目的过程抽象出来,变成通用的方法~
砍掉耗时久的业务,缩短单个业务时间,监听者异步执行耗时久任务
做完之后,你可以完成springboot项目中MQ基本配置,并且MQ具有一定可靠性~
spring: rabbitmq: # 连接信息 host: xxx # 你的IP地址 port: 5672 # 用户相关信息建议在rabbitMQ中设置好 virtual-host: /bi # 可在UI界面创建独属项目的虚拟机与用户名 username: xxx # 用户名 password: xxx # 密码 connection-timeout: 200ms # max waited time # 生产者(消息发送者) # 生产者确认机制 - 默认取消,消耗性能 publisher-confirm-type: none publisher-returns: false template: # 生产者重连机制 retry: enabled: true initial-interval: 1000ms multiplier: 1 max-attempts: 3 # 消费者(监听者) listener: simple: prefetch: 1 # (能者多劳)每次只能获取一条信息,处理完才能获取下一条 acknowledge-mode: auto # 消费者确认 - 自动模式 retry: enabled: true # 失败消息处理策略
失败者消息处理策略实现 - 消息发到error交换机 /** * 失败者消息处理策略实现 */ @Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){ return new RepublishMessageRecoverer(rabbitTemplate, BI_ERROR_EXCHANGE, BI_ERROR_ROUTING_KEY); }
@Configuration public class ErrorConfiguration { @Bean public Queue errorQueue() { return QueueBuilder.durable(BI_ERROR_QUEUE).build(); } @Bean public DirectExchange errorExchange() { return ExchangeBuilder.directExchange(BI_ERROR_EXCHANGE).build(); } @Bean public Binding errorBinding() { return BindingBuilder.bind(errorQueue()).to(errorExchange()).with(BI_ERROR_ROUTING_KEY); } }
直接在@RabbitListener注解中定义并绑定 @RabbitListener(bindings = @QueueBinding( // 队列: // name - 队列名字 // durable - 队列持久化,不会随着MQ关闭而消失 // arguments:使队列为Lazy queue将消息尽快写入磁盘 value = @Queue( name = BI_QUEUE_NAME, durable = "true", arguments = @Argument(name = "x-queue-mode", value = "lazy")), // 交换机:指定交换机的名字与类型(默认direct) exchange = @Exchange(name = BI_EXCHANGE_NAME, type = ExchangeTypes.DIRECT), // 按交换机类型(Direct、Topic),设置Key key = BI_ROUTING_KEY )) public void receiveMessage(String msg) {
b. JSON消息转换器(替换掉原生的JDK) /** * 消息转换器 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
@RabbitListener(bindings = @QueueBinding( // 队列: // name - 队列名字 // durable - 队列持久化,不会随着MQ关闭而消失 // arguments:使队列为Lazy queue将消息尽快写入磁盘 value = @Queue( name = BI_QUEUE_NAME, durable = "true", arguments = @Argument(name = "x-queue-mode", value ="lazy")), // 交换机:指定交换机的名字与类型(默认direct) exchange = @Exchange(name = BI_EXCHANGE_NAME, type = ExchangeTypes.DIRECT), // 按交换机类型(Direct、Topic),设置Key key = BI_ROUTING_KEY )) public void receiveMessage(Long chatId) { // 0. 业务幂等性判断 - 基于乐观锁改造 boolean update = chartService.lambdaUpdate() .set(Chart::getStatus, RUNNING_STATUS) .eq(Chart::getId, chatId) .eq(Chart::getStatus, WAIT_STATUS) .update(); if (!update) { handleChartUpdateError(chatId, "该图表正在生成中!请耐心等待"); return; }
当然,还可以对其进行拓展,比如对error队列进行监听,针对错误消息进行特殊业务处理等等~在Spring Boot项目中快速集成RabbitMQ以增强系统性能和可靠性,主要涉及以下几个关键步骤:
application.yml
中配置RabbitMQ服务器的连接细节,包括主机、端口、虚拟主机、用户名、密码及连接超时时间。application.yml
中设置RabbitMQ相关参数,包括连接信息、生产者和消费者的配置。@RabbitListener
注解中定义队列、交换机和绑定关系。通过上述步骤,可以实现在Spring Boot项目中高效、可靠地使用RabbitMQ作为消息中间件,优化业务流程并提升系统整体性能。