Rabbitmq的几种工作模式
创始人
2024-11-06 11:10:56
0

工具类

public class RabbitMQConnection {     public static Connection getConnection() throws Exception{         //1.创建connectionFactory         ConnectionFactory connectionFactory = new ConnectionFactory();         //2.配置Host         connectionFactory.setHost("127.0.0.1");         //3.设置Port         connectionFactory.setPort(5672);         //4.设置账户和密码         connectionFactory.setUsername("guest");         connectionFactory.setPassword("guest");         //5.设置VirtualHost         connectionFactory.setVirtualHost("0517");         return connectionFactory.newConnection();     } } 

点对点(简单)的队列

图解:

        

生产者代码
public class Producer {     private final static String QUEUE_NAME = "hello";     public static void main(String[] args) throws Exception {         //创建一个连接工厂         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setUsername("guest");         factory.setPassword("guest");         //channel 实现了自动 close 接口 自动关闭 不需要显示关闭         try(Connection connection = factory.newConnection(); Channel channel =                 connection.createChannel()) {             /**              * 生成一个队列              * 1.队列名称              * 2.队列里面的消息是否持久化 默认消息存储在内存中              * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费              * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除              * 5.其他参数              */             channel.queueDeclare(QUEUE_NAME,false,false,false,null);             String message="hello world";             /**              * 发送一个消息              * 1.发送到那个交换机              * 2.路由的 key 是哪个              * 3.其他的参数信息              * 4.发送消息的消息体              */             channel.basicPublish("",QUEUE_NAME,null,message.getBytes());             System.out.println("消息发送完毕");         }     } }
消费者代码
public class Consumer {     private final static String QUEUE_NAME = "hello";     public static void main(String[] args) throws Exception {         ConnectionFactory factory = new ConnectionFactory();         factory.setHost("127.0.0.1");         factory.setUsername("guest");         factory.setPassword("guest");         Connection connection = factory.newConnection();         Channel channel = connection.createChannel();         System.out.println("等待接收消息....");         //推送的消息如何进行消费的接口回调         DeliverCallback deliverCallback=(consumerTag, delivery)->{             String message= new String(delivery.getBody());             System.out.println(message);         };         //取消消费的一个回调接口 如在消费的时候队列被删除掉了         CancelCallback cancelCallback=(consumerTag)->{             System.out.println("消息消费被中断");         };         /**          * 消费者消费消息          * 1.消费哪个队列          * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答          * 3.消费者未成功消费的回调          */         channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);     } }
重点解析:点对点没什么可说的,就是生产者产生消息给到消息队列,第一次已推送的形式给到消费者,之后就是消费者端主动的拉取,需要在生产端创建好队列或在图形化页面创建好队列

工作(公平性)队列模式

图解:

       

生产者代码
public class Task01 {     private static final String QUEUE_NAME="hello";     public static void main(String[] args) throws Exception {         try(Channel channel=RabbitMqUtils.getChannel();) {             channel.queueDeclare(QUEUE_NAME,false,false,false,null);             //从控制台当中接受信息             Scanner scanner = new Scanner(System.in);             while (scanner.hasNext()){                 String message = scanner.next();                 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());                 System.out.println("发送消息完成:"+message);             }         }     } }
消费者代码

消费者1:

public class Consumer1 {     private final static String QUEUE_NAME = "hello";     public static void main(String[] args) throws Exception {         Channel channel = RabbitMqUtils.getChannel();          //推送的消息如何进行消费的接口回调         DeliverCallback deliverCallback=(consumerTag, delivery)->{             String receivedMessage = new String(delivery.getBody());             System.out.println("接收到消息:"+receivedMessage);         };         CancelCallback cancelCallback=(consumerTag)->{             System.out.println(consumerTag+"消费者取消消费接口回调逻辑");         };         System.out.println("C1 消费者启动等待消费......");         channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);     } } 

消费者2:

public class Consumer2 {     private final static String QUEUE_NAME = "hello";     public static void main(String[] args) throws Exception {          Channel channel = RabbitMqUtils.getChannel();          //推送的消息如何进行消费的接口回调         DeliverCallback deliverCallback=(consumerTag, delivery)->{             String receivedMessage = new String(delivery.getBody());             System.out.println("接收到消息:"+receivedMessage);         };         CancelCallback cancelCallback=(consumerTag)->{             System.out.println(consumerTag+"消费者取消消费接口回调逻辑");         };         System.out.println("C2 消费者启动等待消费......");         channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);     } }
重点解析:
        工作(公平性)队列模式,和点对点差不多,就是生产者将消息直接存放到队列中,然后队列默认采用轮询的形式选择消费者进行消费

        当然也可以设置channel.basicQos(i)的形式进行公平分发(谁处理快,谁做的多)

        这里公平的意思是谁做的多,谁处理的多,并不是平均分配的意思

发布订阅模式

图解:

       

生产者代码
public class ProducerFanout {     //定义交换机名称     private static final String EXCHANGE_NAME = "fanout_exchange";      public static void main(String[] args) throws  Exception{         //创建连接         Connection connection = RabbitMQConnection.getConnection();         //创建通道         Channel channel = connection.createChannel();         //通道关联交换机(创建交换机)(fanout类型会自动创建)         //channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);         //channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true);         String msg = "程子强你好";         channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());         channel.close();         connection.close();     } }
消费者代码

消费者1:

public class MailConsumer {     /**      * 定义邮件队列      */     private static final String QUEUE_NAME = "fanout_email_queue";     /**      * 定义交换机的名称      */     private static final String EXCHANGE_NAME = "fanout_exchange";      public static void main(String[] args) throws  Exception{         System.out.println("邮件消费者...");         // 创建我们的连接         Connection connection = RabbitMQConnection.getConnection();         // 创建我们通道         final Channel channel = connection.createChannel();         // 关联队列消费者关联队列         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {             @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                 String msg = new String(body, "UTF-8");                 System.out.println("邮件消费者获取消息:" + msg);             }         };         // 开始监听消息 自动签收         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);      } }

消费者2:

public class SmsConsumer {     /**      * 定义短信队列      */     private static final String QUEUE_NAME = "fanout_email_sms";     /**      * 定义交换机的名称      */     private static final String EXCHANGE_NAME = "fanout_exchange";     public static void main(String[] args) throws Exception{         System.out.println("短信消费者...");     // 创建我们的连接     Connection connection = RabbitMQConnection.getConnection();     // 创建我们通道     final Channel channel = connection.createChannel();     // 关联队列消费者关联队列         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");     DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {         @Override         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {             String msg = new String(body, "UTF-8");             System.out.println("短信消费者获取消息:" + msg);         }     };     // 开始监听消息 自动签收         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);  } }
重点解析:
       发布订阅模式,和前两种模式不同这里用到了一个fanout类型的交换机(具体交换机的类型和概念小伙伴们可以自行查阅下,这里主要讲工作模式),生产者将消息发送给这个交换机,这个交换机把消息发送给每一个和其绑定的队列(注意fanout类型的交换机不需要key所以生产者传递直接传""就好)

路由模式Routing

图解:

       

生产者代码
public class ReceiveLogsDirect {     private static final String EXCHANGE_NAME = "direct_logs";     public static void main(String[] args) throws Exception {         try (Connection connection = RabbitMQConnection.getConnection(); Channel channel =                 connection.createChannel()) {             //创建交换机             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);             //创建多个 bindingKey             Map bindingKeyMap = new HashMap<>();             bindingKeyMap.put("info","普通 info 信息");             bindingKeyMap.put("warning","警告 warning 信息");             bindingKeyMap.put("error","错误 error 信息");             //debug 没有消费这接收这个消息 所有就丢失了             bindingKeyMap.put("debug","调试 debug 信息");              for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){                 String bindingKey = bindingKeyEntry.getKey();                 String message = bindingKeyEntry.getValue();                 channel.basicPublish(EXCHANGE_NAME,bindingKey, null,                         message.getBytes("UTF-8"));                 System.out.println("生产者发出消息:" + message);             }         }     } }
消费者代码

消费者1:

public class ReceiveLogsDirect01 {     private static final String EXCHANGE_NAME = "direct_logs";      public static void main(String[] args) throws  Exception{         Connection connection = RabbitMQConnection.getConnection();         Channel channel = connection.createChannel();         //写不写都可以,如果代码创建在生产端写,如果是浏览器创建,就不需要写这段代码         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);         String queueName = "disk";         channel.queueBind(queueName, EXCHANGE_NAME, "error");         System.out.println("等待接收消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;             File file = new File("E:\\xxx\\rabbitmq_info.txt");//路径任意写             FileUtils.writeStringToFile(file,message,"UTF-8");             System.out.println("错误日志已经接收");         };         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {         });     } }

消费者2:

public class ReceiveLogsDirect02 {     private static final String EXCHANGE_NAME = "direct_logs";     public static void main(String[] argv) throws Exception {         Connection connection = RabbitMQConnection.getConnection();         Channel channel = connection.createChannel();         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,true);         String queueName = "console";         channel.queueBind(queueName, EXCHANGE_NAME, "info");         channel.queueBind(queueName, EXCHANGE_NAME, "warning");         System.out.println("等待接收消息.....");         DeliverCallback deliverCallback = (consumerTag, delivery) -> {             String message = new String(delivery.getBody(), "UTF-8");             System.out.println(" 接收绑定键 :"+delivery.getEnvelope().getRoutingKey()+", 消息:"+message);         };         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {         });     } }
重点解析:
       路由模式,用到了direct类型的交换机,简单讲就是队列通过key和交换机进行绑定,生产者那边传入的key和消息给交换机,如果该队列绑定的key与其传入的key相同则,交换机讲该消息传给对应的队列,一个队列可以绑定多个key

通配符模式Topics(主题)

图解:

       

生产者代码
public class ProducerTopic {     /**      * 定义交换机的名称      */     private static final String EXCHANGE_NAME = "topic_exchange";      public static void main(String[] args) throws Exception{         //  创建Connection         Connection connection = RabbitMQConnection.getConnection();         // 创建Channel         Channel channel = connection.createChannel();         // 通道关联交换机         channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);         String msg = "我是xxx";         channel.basicPublish(EXCHANGE_NAME, "czq.hhh.aa", null, msg.getBytes());         channel.close();         connection.close();     } } 
消费者代码

消费者1:

public class SmsConsumer {     /**      * 定义短信队列      */     private static final String QUEUE_NAME = "topic_sms_queue";     /**      * 定义交换机的名称      */     private static final String EXCHANGE_NAME = "topic_exchange";      public static void main(String[] args) throws Exception{         System.out.println("短信消费者...");         //  创建Connection         Connection connection = RabbitMQConnection.getConnection();         // 创建Channel         Channel channel = connection.createChannel();         // 关联队列消费者关联队列         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "czq.#");         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {             @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                 String msg = new String(body, "UTF-8");                 System.out.println("短信消费者获取消息:" + msg);             }         };         // 开始监听消息 自动签收         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);      } } 

消费者2:

public class MailConsumer {     /**      * 定义邮件队列      */     private static final String QUEUE_NAME = "topic_email_queue";     /**      * 定义交换机的名称      */     private static final String EXCHANGE_NAME = "topic_exchange";      public static void main(String[] args) throws Exception{         System.out.println("邮件消费者...");         //  创建Connection         Connection connection = RabbitMQConnection.getConnection();         // 创建Channel         Channel channel = connection.createChannel();         // 关联队列消费者关联队列         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.boyatop.#");         DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {             @Override             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                 String msg = new String(body, "UTF-8");                 System.out.println("邮件消费者获取消息:" + msg);             }         };         // 开始监听消息 自动签收         channel.basicConsume(QUEUE_NAME, true, defaultConsumer);      } } 
重点解析:
       通配符模式,用到了topic类型的交换机,简单讲与通配符模式原理大致,差别在于根据队列绑定的路由建模糊转发到具体的队列中存放。其中#号表示支持匹配多个词;*号表示只能匹配一个词,假如同一个队列与交换机直接设置的多个模糊的key都符合传入的,那么也只传送一次

相关内容

热门资讯

据权威媒体报道!手游挂机辅助免... 据权威媒体报道!手游挂机辅助免费版(辅助)果然是真的有辅助挂(讲解有挂);1、许多玩家不知道手游挂机...
练习辅助!都莱大菠萝怎么设置!... 练习辅助!都莱大菠萝怎么设置!曝光存在有辅助神器(果真有挂)1、实时都莱大菠萝怎么设置透视辅助更新:...
于此同时!宝宝游戏辅助(辅助)... 于此同时!宝宝游戏辅助(辅助)原来真的是有辅助器(果真有挂);宝宝游戏辅助透视方法中分为三种模型:宝...
演示辅助!哈糖菠萝怎么挂!教你... 演示辅助!哈糖菠萝怎么挂!教你真的是有辅助神器(存在有挂)1、完成哈糖菠萝怎么挂有辅助插件,帮助玩家...
黑科技辅助挂!随意玩房卡代理有... 黑科技辅助挂!随意玩房卡代理有挂吗(辅助)切实真的有辅助app(有挂技巧)1、首先打开随意玩房卡代理...
演示辅助!推饼游戏小程序辅助器... 演示辅助!推饼游戏小程序辅助器!解谜是有辅助app(有挂透视)一、推饼游戏小程序辅助器可以开透视的定...
昨日!拱趴辅助(辅助)总是是真... 昨日!拱趴辅助(辅助)总是是真的有辅助工具(果真有挂)所有人都在同一条线上,像星星一样排成一排,每一...
烘培辅助!牛总管辅助免费版!关... 烘培辅助!牛总管辅助免费版!关于有辅助方法(详细教程)1、下载好牛总管辅助免费版脚本下载之后点击打开...
长期以来!财神十三张有挂辅助吗... 长期以来!财神十三张有挂辅助吗(辅助)原来确实有辅助攻略(有挂猫腻)1、每一步都需要思考,不同水平的...
教程书辅助!七千在线十三道辅助... 教程书辅助!七千在线十三道辅助!教你有辅助技巧(果真有挂)1、上手简单,内置详细流程视频教学,新手小...