使用 RocketMQ 实现消息的顺序消费
创始人
2025-01-08 00:32:55
0

在分布式系统中,保持消息的顺序性是一个常见且重要的问题。RocketMQ 提供了一种有效的方式来确保消息的顺序消费。本文将通过代码示例,介绍如何使用 RocketMQ 实现消息的顺序生产和消费。

环境准备

在开始之前,请确保您已经配置好 RocketMQ 环境,并且在 MqConstant 类中定义了 RocketMQ 的 NameServer 地址。

顺序消息的生产

首先,我们需要编写生产者代码来发送顺序消息。我们会创建两个示例,一个简单的顺序生产示例,另一个则是基于业务逻辑(如订单流程)的顺序生产示例。

简单的顺序生产者

package com.takumilove.demo;  import com.takumilove.constant.MqConstant; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.junit.Test;  public class FOrderlyTest {      @Test     public void orderlyProducer() throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");         producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);         producer.start();         for (int i = 0; i < 10; i++) {             Message message = new Message("orderlyTopic", ("我是第" + i + "个消息").getBytes());             producer.send(message);         }         producer.shutdown();         System.out.println("发送完毕:");     } } 

基于业务逻辑的顺序生产者

在这个示例中,我们假设有一个 Order 类表示订单,订单包含了 idorderNumberpricedatestatus 等信息。

package com.takumilove.demo;  import com.takumilove.constant.MqConstant; import com.takumilove.domain.Order; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Test;  import java.util.Arrays; import java.util.Date; import java.util.List;  public class FOrderlyTest {      @Test     public void orderlyProducer() throws Exception {         DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");         producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);         producer.start();         List orderList = Arrays.asList(                 new Order(1, 111, 59D, new Date(), "下订单"),                 new Order(2, 111, 59D, new Date(), "物流"),                 new Order(3, 111, 59D, new Date(), "签收"),                 new Order(4, 112, 89D, new Date(), "下订单"),                 new Order(5, 112, 89D, new Date(), "物流"),                 new Order(6, 112, 89D, new Date(), "拒收")         );         orderList.forEach(order -> {             Message message = new Message("orderlyTopic", order.toString().getBytes());             try {                 producer.send(message, new MessageQueueSelector() {                     @Override                     public MessageQueue select(List list, Message message, Object o) {                         int queueNumber = list.size();                         Integer i = (Integer) o;                         return list.get(i % queueNumber);                     }                 }, order.getOrderNumber());             } catch (Exception e) {                 System.out.println("发送失败" + e.getMessage());             }         });         producer.shutdown();         System.out.println("发送完毕:");     } } 

顺序消息的消费

接下来,我们编写消费者代码来消费这些顺序消息。我们将分别展示简单顺序消费者和基于业务逻辑的顺序消费者。

简单的顺序消费者

package com.takumilove.demo;  import com.takumilove.constant.MqConstant; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.junit.Test;  import java.util.List;  public class FOrderlyTest {      @Test     public void orderlyConsumer() throws Exception {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");         consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);         consumer.subscribe("orderlyTopic", "*");         consumer.registerMessageListener(new MessageListenerConcurrently() {             @Override             public ConsumeConcurrentlyStatus consumeMessage(List list,                                                             ConsumeConcurrentlyContext consumeConcurrentlyContext) {                 System.out.println(new String(list.get(0).getBody()));                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });         consumer.start();         System.in.read();     } } 

基于业务逻辑的顺序消费者

package com.takumilove.demo;  import com.takumilove.constant.MqConstant; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import org.junit.Test;  import java.util.List;  public class FOrderlyTest {      @Test     public void orderlyConsumer() throws Exception {         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");         consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);         consumer.subscribe("orderlyTopic", "*");         consumer.registerMessageListener(new MessageListenerOrderly() {             @Override             public ConsumeOrderlyStatus consumeMessage(List list,                                                        ConsumeOrderlyContext consumeOrderlyContext) {                 MessageExt messageExt = list.get(0);                 System.out.println(new String(messageExt.getBody()));                 return ConsumeOrderlyStatus.SUCCESS;             }         });         consumer.start();         System.in.read();     } } 

总结

通过以上示例,我们展示了如何使用 RocketMQ 实现消息的顺序生产和消费。无论是简单的消息还是基于业务逻辑的消息,都可以通过 RocketMQ 提供的顺序消费机制来保证消息的有序性。这对于订单系统等需要严格顺序的场景尤为重要。

相关内容

热门资讯

透视透视“阿当比鸡破解版2.0... 透视透视“阿当比鸡破解版2.0.0”拱趴大菠萝万能挂(带开挂辅助下载透视教程) 了解更多开挂安装加(...
教程辅助“九游破解辅助插件”有... 九游破解辅助插件开挂教程视频分享装挂详细步骤在当今的网络游戏中,九游破解辅助插件作为一种经典的娱乐方...
我来教教大家“蛮王大厅辅助”约... 我来教教大家“蛮王大厅辅助”约局吧是否有挂(带开挂辅助软件详细教程!)蛮王大厅辅助ai黑科技系统规律...
教程辅助“阿拉游戏中心辅助”发... 您好:阿拉游戏中心辅助这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的牌...
实操分享“约局吧破解器”拱趴大... 实操分享“约局吧破解器”拱趴大菠萝有挂吗(带开挂辅助神器可靠教程) 了解更多开挂安装加(136704...
教程辅助“柳州八一字牌辅助”有... 教程辅助“柳州八一字牌辅助”有挂规律开挂辅助神器新2026版您好:柳州八一字牌辅助这款游戏可以开挂,...
必备科技“佛手在线辅助器苹果版... 必备科技“佛手在线辅助器苹果版”wpk软件是真的吗(带开挂辅助平台曝光教程);亲,佛手在线辅助器苹果...
教程辅助“微信小程序旺旺福建辅... 【亲,微信小程序旺旺福建辅助 这款游戏可以开挂的,确实是有挂的,很多玩家在这款微信小程序旺旺福建辅助...
透视讲解“西部透视辅助”hhp... 透视讲解“西部透视辅助”hhpoker是正规的吗(带开挂辅助工具详细教程);亲,西部透视辅助这款游戏...
教程辅助“天天贵阳智能辅助器”... 您好:天天贵阳智能辅助器这款游戏可以开挂的,确实是有挂的,很多玩家在这款游戏中打牌都会发现很多用户的...