认识消息队列:Spring Boot 实现 MQ 消息队列,解锁异步、削峰、广播等高级功能!
创始人
2024-11-05 13:08:37
0

在现代微服务架构中,消息队列(Message Queue,MQ)扮演着至关重要的角色,它能够帮助我们实现异步处理、流量削峰、系统解耦等功能。本文将以 Spring Boot 为例,深入探讨如何利用 MQ 实现这些强大的特性,并结合实际场景,给出详细的代码实现和步骤说明。

一、项目准备

首先,我们需要创建一个 Spring Boot 项目,并引入相关依赖:

              org.springframework.boot         spring-boot-starter-web                   org.springframework.boot         spring-boot-starter-amqp                   org.projectlombok         lombok         true                   org.springframework.boot         spring-boot-starter-test         test                   org.springframework.amqp         spring-rabbit-test         test       

这里我们引入了 spring-boot-starter-amqp 来支持 RabbitMQ。

二、异步处理

传统的同步调用方式,调用方需要等待被调用方执行完成后才能继续执行。而使用 MQ 进行异步处理,调用方只需将消息发送到队列,然后继续执行其他任务,从而提高系统响应速度。

2.1 定义消息模型
@Data public class OrderMessage {     private Long orderId;     private String productName;     private BigDecimal price; } 
2.2 配置消息队列和交换机
@Configuration public class RabbitMqConfig {      public static final String ORDER_QUEUE = "order.queue";     public static final String ORDER_EXCHANGE = "order.exchange";      @Bean     public Queue orderQueue() {         return new Queue(ORDER_QUEUE, true);     }      @Bean     public DirectExchange orderExchange() {         return new DirectExchange(ORDER_EXCHANGE);     }      @Bean     public Binding bindingOrder() {         return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.routing.key");     } } 

这里我们定义了一个名为 order.queue 的队列,一个名为 order.exchange 的 Direct 交换机,并将它们绑定在一起。

2.3 发送消息
@Service public class OrderService {      @Autowired     private RabbitTemplate rabbitTemplate;      public void createOrder(OrderMessage message) {         // ... 处理订单逻辑 ...          rabbitTemplate.convertAndSend(RabbitMqConfig.ORDER_EXCHANGE, "order.routing.key", message);     } } 

在 OrderService 中,我们使用 RabbitTemplate 将订单消息发送到 order.exchange 交换机。

2.4 消费消息
@Component @RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE) public class OrderConsumer {      @RabbitHandler     public void handleOrderMessage(OrderMessage message) {         // ... 处理订单消息 ...         System.out.println("Received order message: " + message);     } } 

使用 @RabbitListener 注解监听 order.queue 队列,当有消息到达时,handleOrderMessage() 方法会被自动调用。

三、流量削峰

当系统面临突发流量冲击时,MQ 可以充当缓冲区,将请求先存储在队列中,然后逐步消费,避免系统过载。

3.1 配置消息监听器
@Component @RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE, concurrency = "5-10") public class OrderConsumer {     // ... } 

通过设置 concurrency 属性,我们可以控制消息监听器的并发数量,从而控制消息的消费速度,达到流量削峰的目的。

四、消息总线

消息总线(Message Bus)是一种设计模式,它允许不同的组件通过共享的消息通道进行通信。

4.1 创建主题交换机
@Configuration public class RabbitMqConfig {      public static final String EVENT_EXCHANGE = "event.exchange";      @Bean     public TopicExchange eventExchange() {         return new TopicExchange(EVENT_EXCHANGE);     } } 

这里我们创建了一个名为 event.exchange 的主题交换机,用于实现消息总线。

4.2 发布消息
@Service public class EventPublisher {      @Autowired     private RabbitTemplate rabbitTemplate;      public void publishEvent(String event, Object data) {         rabbitTemplate.convertAndSend(RabbitMqConfig.EVENT_EXCHANGE, event, data);     } } 
4.3 订阅消息
@Component @RabbitListener(bindings = @RabbitListenerBinding(         value = @QueueBinding(                 value = @Queue(value = "user.queue", durable = "true"),                 exchange = @Exchange(value = RabbitMqConfig.EVENT_EXCHANGE, type = ExchangeTypes.TOPIC),                 key = "user.#"         ) )) public class UserEventListener {      @RabbitHandler     public void handleUserEvent(UserEvent event) {         // ... 处理用户事件 ...     } } 

使用 user.# 作为路由键,UserEventListener 将会接收到所有以 user. 开头的事件消息。

五、延迟队列

延迟队列允许我们设置消息的延迟时间,消息在延迟时间到达后才会被消费者消费。

5.1 创建延迟队列
@Configuration public class RabbitMqConfig {      public static final String DELAY_QUEUE = "delay.queue";     public static final String DELAY_EXCHANGE = "delay.exchange";      @Bean     public Queue delayQueue() {         return QueueBuilder.durable(DELAY_QUEUE)                 .withArgument("x-dead-letter-exchange", "dead.letter.exchange") // 设置死信交换机                 .withArgument("x-dead-letter-routing-key", "dead.letter.routing.key") // 设置死信路由键                 .build();     }      @Bean     public DirectExchange delayExchange() {         return new DirectExchange(DELAY_EXCHANGE);     }      @Bean     public Binding bindingDelay() {         return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.routing.key");     } } 
5.2 发送延迟消息
@Service public class DelayMessageSender {      @Autowired     private RabbitTemplate rabbitTemplate;      public void sendDelayMessage(Object message, long delayTime) {         rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, "delay.routing.key", message, msg -> {             msg.getMessageProperties().setExpiration(String.valueOf(delayTime));             return msg;         });     } } 
5.3 处理延迟消息

我们需要创建一个新的队列和交换机来处理死信消息:

@Configuration public class RabbitMqConfig {      // ...      @Bean     public Queue deadLetterQueue() {         return new Queue("dead.letter.queue", true);     }      @Bean     public DirectExchange deadLetterExchange() {         return new DirectExchange("dead.letter.exchange");     }      @Bean     public Binding bindingDeadLetter() {         return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dead.letter.routing.key");     } }  @Component @RabbitListener(queues = "dead.letter.queue") public class DeadLetterConsumer {      @RabbitHandler     public void handleDeadLetterMessage(String message) {         // ... 处理延迟消息 ...         System.out.println("Received dead letter message: " + message);     } } 

六、广播消息推送

广播消息允许我们将同一条消息发送给多个消费者。

6.1 创建扇形交换机
@Configuration public class RabbitMqConfig {      public static final String FANOUT_EXCHANGE = "fanout.exchange";      @Bean     public FanoutExchange fanoutExchange() {         return new FanoutExchange(FANOUT_EXCHANGE);     } } 
6.2 发送广播消息
@Service public class BroadcastMessageSender {      @Autowired     private RabbitTemplate rabbitTemplate;      public void sendBroadcastMessage(Object message) {         rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", message);     } } 
6.3 接收广播消息

每个消费者都需要创建一个独立的队列,并绑定到 fanout.exchange 交换机:

@Component @RabbitListener(bindings = @RabbitListenerBinding(         value = @QueueBinding(                 value = @Queue(value = "broadcast.queue.1", durable = "true"),                 exchange = @Exchange(value = RabbitMqConfig.FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT)         ) )) public class BroadcastConsumer1 {      @RabbitHandler     public void handleBroadcastMessage(String message) {         // ... 处理广播消息 ...         System.out.println("Consumer 1 received broadcast message: " + message);     } }  @Component @RabbitListener(bindings = @RabbitListenerBinding(         value = @QueueBinding(                 value = @Queue(value = "broadcast.queue.2", durable = "true"),                 exchange = @Exchange(value = RabbitMqConfig.FANOUT_EXCHANGE, type = ExchangeTypes.FANOUT)         ) )) public class BroadcastConsumer2 {      @RabbitHandler     public void handleBroadcastMessage(String message) {         // ... 处理广播消息 ...         System.out.println("Consumer 2 received broadcast message: " + message);     } } 

七、总结

本文介绍了如何使用 Spring Boot 和 RabbitMQ 实现异步处理、流量削峰、消息总线、延迟队列和广播消息推送等功能。希望这篇文章能够帮助你更好地理解 MQ 的强大功能,并在实际项目中灵活运用。

相关内容

热门资讯

黑科技挂(欢乐棋牌)外挂透明挂... 您好,欢乐棋牌这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在这款游戏...
黑科技辅助(WEPoke)外挂... 黑科技辅助(WEPoke)外挂透明挂辅助挂(透视)存在挂教程(2024已更新)(哔哩哔哩)是一款可以...
黑科技辅助!wepoke有挂(... 您好,wepoke有挂这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩家在...
黑科技辅助!wepoke苹果版... 黑科技辅助!wepoke苹果版外挂(透视)好像有挂(真的有挂)-哔哩哔哩;1、让任何用户在无需AI插...
黑科技安卓版(德州之星ai辅助... 【福星临门,好运相随】;黑科技安卓版(德州之星ai辅助)外挂透明挂辅助软件(透视)原生存在有挂(了解...
黑科技辅助(xpoker)外挂... 黑科技辅助(xpoker)外挂透明挂辅助安装(透视)必赢教程(2025已更新)(哔哩哔哩);亲真的是...
黑科技辅助!nzt德州辅助软件... 黑科技辅助!nzt德州辅助软件(透视)软件透明挂ai辅助工具(素来真的有挂)-哔哩哔哩;1、黑科技辅...
黑科技辅助!德州wpk辅助(透... 黑科技辅助!德州wpk辅助(透视)最初真的有挂(有挂功能)-哔哩哔哩;大家肯定在之前德州wpk辅助或...
黑科技ai代打(WePoKea... 您好,WePoKeai插件这款游戏可以开挂的,确实是有挂的,需要了解加微【136704302】很多玩...
黑科技辅助(来玩app德州)外... 黑科技辅助(来玩app德州)外挂透明挂辅助安装(透视)爆料教程(2021已更新)(哔哩哔哩);一、来...