微服务基础篇:MQ(MessageQueue)消息队列(同步异步通讯,RabbitMQ快速入门,SpringAMQP,简单队列模型,工作队列模型,发布订阅模型,消息转换器)
创始人
2025-01-09 10:34:48
0

目录

  • 1.初识MQ
    • 1.同步通讯
      • 1.同步调用存在的问题
      • 2.优点
    • 2.异步通讯
      • 1.事件驱动优势
      • 2.异步通信的缺点
    • 3.MQ常见框架
  • 2.RabbitMQ快速入门
    • 1.单机部署
    • 2.常见消息模型
  • 3.SpringAMQP
    • 1.Basic Queue简单队列模型
    • 2.Work Queue工作队列模型
    • 3.发布、订阅模型-Fanout
      • 1.发布订阅模式
      • 2.Fanout Exchange
    • 4.发布、订阅模型-Direct
    • 5.发布、订阅模型-Topic
    • 6.消息转换器

1.初识MQ

1.同步通讯

同步通讯是指通信双方在进行数据交流时,必须按照一定的顺序同步进行,数据的发送方必须等待接收方对前一条数据的接收和处理完成后,才能发送下一条数据,确保数据的顺序和一致性
在同步通讯中,通信双方会相互等待对方的响应,直到收到对方的确认信号才会进行下一步操作。
同步通讯常见的应用有电话通话、视频聊天和实时游戏等。

1.同步调用存在的问题

耦合度高
每次加入新的需求,都要修改原来的代码。
性能下降
调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
资源浪费
调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源。
级联失败
如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障。

2.优点

时效性较强,可以立即得到结果

2.异步通讯

异步通讯是指通信双方在进行数据交流时,不需要即时等待对方的响应就能够继续进行下一步的操作。
发送方在发送数据后,不会立即等待接收方的确认信号,而是可以立即进行其他操作。
接收方在接收数据后,可以根据需要决定何时对数据进行处理,无需强制按照发送方的时序进行处理。
异步通讯能够提高通信的效率和并发性,常见的应用有电子邮件、消息队列等。

异步调用常见实现就是事件驱动模式

在这里插入图片描述

1.事件驱动优势

服务解耦
性能提升,吞吐量提高
服务没有强依赖,不担心级联失败问题
流量削峰

2.异步通信的缺点

①依赖于Broker的可靠性、安全性、吞吐能力
②架构复杂,业务没有明显的流程线,不好追踪管理

3.MQ常见框架

MQ(MessageQueue),中文是消息队列,字面来看就是存放消息的队列。
也就是事件驱动架构中的Broker

在这里插入图片描述

2.RabbitMQ快速入门

官方网址:

https://www.rabbitmq.com/ 

1.单机部署

在Centos7虚拟机中使用Docker来安装:
1.在线拉取:

docker pull rabbitmq:3-management 

2.本地导入:
在这里插入图片描述

docker load -i mq.tar 

在这里插入图片描述
3.执行下面的命令来运行MQ容器:

docker run \  -e RABBITMQ_DEFAULT_USER=root \  -e RABBITMQ_DEFAULT_PASS=123456 \  --name mq \  --hostname mq1 \  -p 15672:15672 \  -p 5672:5672 \  -d \  rabbitmq:3-management 

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

RabbitMQ中的几个概念:
channel:操作MQ的工具.
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

2.常见消息模型

1.基本消息队列(BasicQueue )
2.工作消息队列( WorkQueue)
发布订阅( Publish、Subscribe),又根据交换机类型不同分为三种:
3.Fanout Exchange:广播
4.Direct Exchange:路由
5.Topic Exchange:主题

消息发送流程:

①建立connection
②创建channel
③利用channel声明队列
④利用channel向队列发送消息

消息接收流程:

①建立connection
②创建channel
③利用channel声明队列
④定义consumer的消费行为handleDelivery()
⑤利用channel将消费者与队列绑定

3.SpringAMQP

官方地址:https://spring.io/projects/spring-amqp

AMQP:Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。
该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。
包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

1.Basic Queue简单队列模型

案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能:

1.在父工程中引入spring-amqp的依赖
在这里插入图片描述

2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
在这里插入图片描述
在这里插入图片描述
查看消息队列情况:
在这里插入图片描述

3.在consumer服务中编写消费逻辑,监听simple.queue这个队列
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

2.Work Queue工作队列模型

Work queue,工作队列,
①可以提高消息处理速度,避免队列消息堆积
②多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
③通过设置prefetch来控制消费者预取的消息数量

案例:模拟WorkQueue,实现一个队列绑定多个消费者

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

可以看到MQ默认的消息预取机制是相对平均的,并没有考虑到不同消费者的消费能力的问题

所以我们可以做一个消费预取限制:

修改application.yml文件,设置preFetch这个值,可以控制预取消息的上限:
在这里插入图片描述

重启消费者consumer类,并运行生产者测试类:

在这里插入图片描述

3.发布、订阅模型-Fanout

1.发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。

1.交换机的作用:

  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失

2.常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

注意:exchange负责消息路由,而不是存储,路由失败则消息丢失

2.Fanout Exchange

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

在这里插入图片描述

案例:利用SpringAMQP演示FanoutExchange的使用

1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定

在这里插入图片描述
在这里插入图片描述

2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

在这里插入图片描述

3.在publisher中编写测试方法,向itcast.fanout发送消息
在这里插入图片描述

在这里插入图片描述

4.发布、订阅模型-Direct

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例:利用SpringAMQP演示DirectExchange的使用

1.利用@RabbitListener声明Exchange.Queue、RoutingKey

2.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
在这里插入图片描述

启动消费者类,可以在MQ服务器上看到新增的队列和路由:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.在publisher中编写测试方法,向itcast. direct发送消息

在这里插入图片描述
查看控制台信息:
在这里插入图片描述
队列1确实也是绑定的blue

描述下Direct交换机与Fanout交换机的差异?
①Fanout交换机将消息路由给每一个与之绑定的队列
②Direct交换机根据RoutingKey判断路由给哪个队列
③如果多个队列具有相同的RoutingKey,则与Fanout功能类似

5.发布、订阅模型-Topic

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

Queue与Exchange指定BindingKey时可以使用通配符:

#:代指0个或多个单词
*:代指一个单词

案例:利用SpringAMQP演示TopicExchange的使用
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey

2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.在publisher中编写测试方法,向itcast. topic发送消息
在这里插入图片描述

6.消息转换器

案例 : 测试发送Object类型消息:
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
消息转换器:

Spring的对消息对象的处理是由org.springframework.amqp.support.converter.MessageConverter来处理的。
而默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter类型的Bean即可。

推荐用JSON方式序列化,步骤如下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
consumer模块接收消息的形式:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

SpringAMQP中消息的序列化和反序列化是怎么实现的?

  • 利用MessageConverter实现的,默认是JDK的序列化
  • 注意发送方与接收方必须使用相同的MessageConverter

相关内容

热门资讯

基于Hadoop的网上购物行为... 有需要本项目的可以私信博主,提供部署和讲解服务!!...
Flink-时间语义 1时间语义flink种设计时间的不同概念:1 Event Time:事件时间ÿ...
Java中的StringBui... 一、StringBuilder的使用方法1. 什么是StringBuilderStringBuild...
Spring Boot集成Sh... Spring Boot集成ShardingSphere详解随着数据量的不断增长,单一数...
踏入大数据的第一天,我先入入门 【转行原因】目前数据每天已亿计算,作为 Java程序员的我已经无法使用常规工具对数据进...
Lua 运算符 Lua 运算符Lua 是一种轻量级的编程语言,广泛用于游戏开发、脚本编写和其他应用程序...
使用Postman调用微信小程... 引言微信小程序已经成为企业连接用户的重要渠道之一。为了吸引用户进入小程序,开发者常常需...
Flink 运行时[Runti... 一、基本组件栈在Flink整个软件架构体系中,同样遵循着分层的架构设计理念࿰...
Lua协程(同步的多线程) 1.coroutine.create( func )创建一个协程,返回co࿰...
camtasia怎么剪掉不用的... 有时我们录制的屏幕内容,并不一定全部需要。那么,屏幕录制的视频怎么裁剪上...