【多线程】阻塞队列
创始人
2024-11-15 11:08:48
0

🏀🏀🏀来都来了,不妨点个关注!
🎧🎧🎧博客主页:欢迎各位大佬!
在这里插入图片描述

文章目录

  • 1. 阻塞队列是什么
  • 2. 简单使用阻塞队列
  • 3. 阻塞队列的应用场景——生产者消费者模型
    • 3.1 生产者消费者模型的概念
    • 3.2 生产者消费者模型的作用
      • 3.2.1 解耦合
      • 3.2.2 削峰填谷
  • 4. 模拟实现阻塞队列
    • 4.1 实现一个普通队列
    • 4.2 加上线程安全
    • 3. 加上阻塞功能

1. 阻塞队列是什么

阻塞队列是一种特殊的队列,既然是队列,也就满足“先进先出”的原则。
阻塞队列是一种线程安全的数据结构,并且具有以下特性:

  1. 当队列满的时候,继续入队列就会阻塞,直到有其他线程从队列中取走元素
  2. 当队列空的时候,继续出队列也会阻塞,直到有其他线程往队列中插入元素

如下图:

在这里插入图片描述
这个阻塞队列是十分有用的,尤其在写多线程代码的时候,多个线程之间进行数据交互,可以使用阻塞队列简化代码的编写,其中阻塞队列的一个典型应用场景就是 “生产者消费者模型”,这是一种非常典型的开发模型!

2. 简单使用阻塞队列

在Java标准库中为我们提供了阻塞队列的使用,当我们需要用到阻塞队列的时候,直接使用Java标准库为我们提供的BlockingQueue即可,BlockingQueue是一个接口,它的具体实现类常用的有数组形式的ArrayBlockingQueue(在实例化的时候需要指定数组的大小capacity)和链表形式的LinkedBlockingQueue(默认capacity为Integer.MAX_VALUE,也可以在实例化的时候指定capacity的大小),这里我们用链表形式的进行举例。
如下代码示例,我们简单的进行添加元素和取走元素:

import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque;   //阻塞队列 public class ThreadDemo20 {     public static void main(String[] args) throws InterruptedException {         BlockingDeque blockingDeque = new LinkedBlockingDeque<>();         blockingDeque.put("hello1");         blockingDeque.put("hello2");         blockingDeque.put("hello3");         blockingDeque.put("hello4");         blockingDeque.put("hello5");         String result = null;         result = blockingDeque.take();         System.out.println(result);         result = blockingDeque.take();         System.out.println(result);         result = blockingDeque.take();         System.out.println(result);         result = blockingDeque.take();         System.out.println(result);         result = blockingDeque.take();         System.out.println(result);          result = blockingDeque.take();         System.out.println(result);     } } 

在上述代码中,我们向阻塞队列中添加了5个元素,然后进行了6次取元素的操作,当第6次取元素的操作的时候线程就会发现此时阻塞队列为空,就会进行阻塞等待。
运行结果如下:
在这里插入图片描述

3. 阻塞队列的应用场景——生产者消费者模型

3.1 生产者消费者模型的概念

生产者消费者模式就是通过一个容器(阻塞队列)来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。这里我们拿一个实际生活中事情来举例,逢年过节,大家都会包饺子,吃饺子。我们就拿包饺子举例:
假设现在有小丁小万来包饺子,但只有一个擀面杖,于是就有了下面两种包法:
包法1:小丁擀一个饺子皮,包一个饺子,小万再擀一个饺子皮,包一个饺子。但这样效率并不高,当小丁擀饺子皮的时候,小万只能在旁边干看着,并且小丁小万擀完饺子皮再去包饺子这样切换状态也是比较费时费力的。
包法2:小丁负责擀饺子皮,擀好一个就放在盖帘(主要用于摆放饺子、粉条子等面食,避免它们粘连在一起)上,而小万负责包饺子,这样实现的效果就是,如果小丁擀的比较快将盖帘都摆放满了饺子皮,他就休息一下等待小万包饺子之后再继续擀,而当小万包的比较快,此时盖帘上没有饺子皮,她就可以等待小丁将饺子皮擀好放在盖帘上再继续包。这就和我们上面说的阻塞队列添加和取走元素对应起来了。这也就是我们所说的生产者消费者模型,这里擀饺子皮的小丁就是生产者,其中放饺子皮的盖帘则是阻塞队列,而包饺子的小万就是消费者。
在这里插入图片描述

3.2 生产者消费者模型的作用

生产者消费者模型解决的是什么问题呢?除了我们上面说的效率外,还有两个主要的作用。

3.2.1 解耦合

生产者和消费者之间的操作是独立的,它们之间通过共享的数据结构(如队列)进行通信,而不是直接调用对方的方法。这种解耦使得生产者和消费者可以独立地设计、开发和优化,而不需要关心对方的实现细节。
耦合:指的是两个模板之间的关联程度的高低,关联程度越高耦合越高,关联程度越低耦合越低。
内聚:指的是模板内部各个元素之间联系的紧密程度,元素之间紧密程度越高内聚越高,元素之间紧密程度越低内聚越低。
接着我们看下面的例子思考为什么生产者消费者模型可以解耦合:
在这里插入图片描述
此时有两个服务器A和B,它们之间是直接通信的,这种就是耦合比较高的情况,如果此时服务器B挂了,对服务器A就会有直接的影响。
引入生产者消费者模型:
在这里插入图片描述
此时服务器AB都不知道互相之间的存在,它们通过一个阻塞队列服务器进行数据的交换,此时如果服务器B挂了,对服务器A的影响就很小,这就是低耦合的状态。
消息队列:由于阻塞队列的功能十分好用, 所以也就有大佬把阻塞队列的功能单独拿出来扩充了很多功能并做成了单独的服务器也就是消息队列。

3.2.2 削峰填谷

继续通过上述的服务器AB进行举例:
在这里插入图片描述
A收到的请求数量是和用户正相关的,由于用户请求的数量是随机的,所以在某个时间可能会出现峰值

如果A和B是直接调用的关系,A收到了请求峰值,B也同样会有这个请求峰值,假设A平时收到的请求是每秒钟 1w个,而在某个时间点突然收到了每秒钟 3w个,对于B来说,请求也是每秒钟 3w个,但是对于服务器A来说它只负责转发请求,消耗的资源比较小,而服务器B需要根据请求做出实际的操作,很可能就挂了(服务器处理每个请求,都需要消耗资源硬件,包括不限于CPU、内存、硬盘、带宽,如果某个硬件资源达到瓶颈,此时服务器就挂了)这就给系统的稳定性带来了风险!

此时引入生产者消费者模型,风险大大降低,稳定性提高,如下图:
在这里插入图片描述
此时服务器A不直接向服务器B发生请求,而是通过阻塞队列进行转发请求,所以如果用户此时有大量的请求发送过来,当阻塞队列满了会进行阻塞队列,而服务器B依然可以按照之前的速率进行取数据——削峰。
而在波峰之后会有波谷,此时用户的请求变少,服务器B依然可以按照之前的速率进行取数据——填谷。

4. 模拟实现阻塞队列

实现阻塞队列分为三步:

  1. 先实现一个普通队列
  2. 加上线程安全
  3. 加上阻塞功能

4.1 实现一个普通队列

  1. 定义三个变量:head 头, tail 尾 ,size 记录数组元素的个数
    实现入队里面的操作 :
  2. 判断数组是否满了,满了直接return,没有满则将新元素放在数组尾部,并尾部后移动一个 tail++,如果 tail 达到数组末尾,则将 tail 从头开始,记录数组元素的个数 size++
    实现出队里面的操作:
  3. 判断数组里面是否有元素,数组为空则直接返回 null,不为空,则队列头部 head 的元素放在一个变量 value 里,并将头部后移动一位 head++,如果 head 到达数组末尾,则将 head 从头开始,记录数组元素的个数 size减去1,同时返回 value.

在这里插入图片描述

class MyBlockingQueue {      int[] elems = new int[1000];      int head = 0;      int tail = 0;      int size = 0;            public void put(int elem) {          //判断数组是否满了          if (size == elems.length) {              return ;          }          elems[tail++] = elem;          size++;          if (tail == elems.length) {             tail = 0;          }      }      public Integer take() {           if (size == 0) {             return null;           }           int value = elems[head++];           size--;           if (head == elems.length) {               head = 0;           }           return value;      }    } 

4.2 加上线程安全

synchronized:关于线程安全我们之前介绍过,在这里需要进行加锁,用 synchronized 关键字修饰 put()方法和 take()方法,不难分析,在多线程情况下,put 和 take 方法内部有判定条件和修改操作,修改操作不是原子的,线程抢占式执行,因此,加锁即可解决线程不安全问题
volatile:针对head,tail,size这三个变量涉及到多个线程读取操作,为避免读取的时候出现问题,我们这里加上volitale关键字。
代码如下:

class MyBlockingQueue {      int[] elems = new int[1000];      volatile int head = 0;      volatile int tail = 0;      volatile int size = 0;            synchronized public void put(int elem) {          //判断数组是否满了          if (size == elems.length) {              return ;          }          elems[tail++] = elem;          size++;          if (tail == elems.length) {             tail = 0;          }      }      synchronized public Integer take() {           if (size == 0) {             return null;           }           int value = elems[head++];           size--;           if (head == elems.length) {               head = 0;           }           return value;      }    } 

3. 加上阻塞功能

我们要实现一个阻塞队列,需要满足阻塞的特性,即队列满时,继续 put() 操作会阻塞,直到有线程有 take() 操作使队列不满;队列为空时,继续 take() 操作会阻塞,直到有线程 put() 操作,使队列不空

通过 wait() 方法 和 notify() 方法配合使用就可以实现上述阻塞功能!其中需要注意调用 wait() 方法 和 notify() 方法的对象必须是和加锁的对象一致,共享一个锁对象,否则会抛出 IllegalMonitorStateException 异常,由于 synchronized 直接加在成员方法上,锁对象就是 this,那么 wait() 方法 和 notify() 方法的对象也就是 this,代码如下:

class MyBlockingQueue {      int[] elems = new int[1000];      volatile int head = 0;      volatile int tail = 0;      volatile int size = 0;            synchronized public void put(int elem) {          //判断数组是否满了          while (size == elems.length) {              this.wait();          }          elems[tail++] = elem;          size++;          if (tail == elems.length) {             tail = 0;          }          this.notify();      }      synchronized public Integer take() {           while (size == 0) {             this.wait();           }           int value = elems[head++];           size--;           if (head == elems.length) {               head = 0;           }           this.notify();           return value;      }  } 

使用自己实现的阻塞队列,并实现生产者消费者模型代码:

class MyBlockingQueue {     private int[] elems = new int[1000];     //加volatile的原因是在多个线程的调度中可能会出现错误     volatile private int head = 0;     volatile private int tail = 0;     volatile private int size = 0;      synchronized public void put(int elem) throws InterruptedException {        //用while而不用if的原因是在别的线程中可能会有interrupt,导致wait被提前唤醒          while (size == elems.length) {              this.wait();          }          elems[tail] = elem;          tail++;          if (tail == elems.length) {              tail = 0;          }            size++;            this.notify();      }      synchronized public Integer take() throws InterruptedException {         while (size == 0) {             this.wait();;         }         int value = elems[head];         head++;          if (head == elems.length) {             head = 0;         }         size--;         this.notify();         return value;      }   } public class ThreadDemo22 {     public static void main(String[] args) {         MyBlockingQueue myBlockingQueue = new MyBlockingQueue();          Thread t1 = new Thread( () -> {             while (true) {                 try {                     int value = myBlockingQueue.take();                     System.out.println("消费者" + value);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         });          Thread t2 = new Thread( () -> {             int count = 0;             while (true) {                 try {                     System.out.println("生产者" + count);                     myBlockingQueue.put(count);                     Thread.sleep(1000);                     count++;                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         });         t1.start();         t2.start();     } } 

本次的分享就结束了,感谢支持!

相关内容

热门资讯

小牛圈服务器的构造与功能特性解... 小牛圈的服务器是高性能、高稳定性的硬件设备,具备强大的处理能力和存储容量。这些服务器通常配置有先进的...
如何充分利用购买的云服务器和域... 购买云服务器和域名主要用于搭建网站或应用,提供在线服务。云服务器作为数据存储和处理的中心,保证网站稳...
【制作100个unity游戏之... 最终效果系列导航制作100个unity游戏之27】使用unity复刻经典游戏《植物大战僵尸》...
曙光服务器电源持续报警的原因是... 曙光服务器电源持续报警可能由多个因素引起,包括电源单元故障、电压不稳定、过载或内部连接问题。建议检查...
选择搭建FRP时,应考虑哪些服... 搭建frp(fast reverse proxy)通常需要租用一台具有公网IP地址的服务器,以便能够...
【UE5/UE4】超详细教程接... 【UE5/UE4】超详细教程接入科大讯飞语音唤醒SDK并初始持久监听(10102错误码...
Unity接入IAP内购(An... 从0开始接入Unity IAP欢迎进入Unity内购系列整体流程介绍第一篇(内购接入)接入环境一、创...
双人成行UE4-nuts ga... 双人成行(It Takes Two)出现“UE4-nuts game已崩...
ffmpeg 调用gpu进行转... ffmpeg -hwaccel cuvid -i rtsp://admin:xxxx@192...
(二)springboot2.... 引入官方流程设计器1. activiti-webapp-explorer2-5.23.0.war项目...