百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

突击并发编程JUC系列-阻塞队列 BlockingQueue

toyiye 2024-06-21 12:20 10 浏览 0 评论

突击并发编程JUC系列演示代码地址:

https://github.com/mtcarpenter/JavaTutorial

作者:故人

出处:https://segmentfault.com/a/1190000037574304

什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

插入和移除操作的4种处理方式



  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出 IllegalStateException ("Queue full")异常。当队列空时,从队列里获取元素会抛出 NoSuchElementException 异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回 null 。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里 take 元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

如果是无界阻塞队列,队列不可能会出现满的情况,所以使用 put 或 offer 方法永远不会被阻塞,而且使用offer方法时,该方法永远返回 true。

ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。

阻塞式写方法

ArrayBlockingQueue 中提供了两个阻塞式写方法,分别如下(在该队列中,无论是阻塞式写方法还是非阻塞式写方法,都不允许写入null)。

void put(E e)
boolean offer(E e, long timeout, TimeUnit unit)

put() 方法示例

public class ArrayBlockingQueueExample1 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        try {
            queue.put("class 1");
            queue.put("class 2");
            queue.put("class 3");
            // 超过指定得容量当前线程阻塞
            queue.put("class 4");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

非阻塞式写方法

当队列已满时写入数据,如果不想使得当前线程进入阻塞,那么就可以使用非阻塞式的写操作方法。

boolean add(E e)
boolean offer(E e)

add() 方法示例

public class ArrayBlockingQueueExample2 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        //  超过指定容量 抛出异常
        queue.add("class 4");
    }
}
// 抛出异常

阻塞式读方法

E take()
E poll(long timeout, TimeUnit unit)

take() 方法示例

public class ArrayBlockingQueueExample3 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        queue.add("class 1");
        queue.add("class 2");
        queue.add("class 3");
        try {
            // 取出对头元素
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 队列大小 
        System.out.println(queue.size());
    }
}
//class 1
// 2

非阻塞式读方法

E poll()
E peek()
public class ArrayBlockingQueueExample4 {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 队列无元素 直接返回 null
        System.out.println(queue.poll( ));
        System.out.println(queue.peek( ));
    }
}
// null
// null

部分源码

public void put(E e) throws InterruptedException {
        // 检查元素
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lockInterruptibly();
        try {
            // 元素满 一直阻塞,队列非满时,被唤醒
            while (count == items.length)
                notFull.await();
            // 入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
         // 获取锁
        lock.lockInterruptibly();
        try {
            // 队列为空 等待
            while (count == 0)
                notEmpty.await();
            // 出队
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

LinkedBlockingQueue

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE 。此队列按照先进先出的原则对元素进行排序。

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

public class PriorityBlockingQueueExample1 {
    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue();
        queue.offer(1);
        queue.offer(12);
        queue.offer(21);
        queue.offer(6);
        // 内部排序
        System.out.println(queue.poll()); // 1
        System.out.println(queue.poll()); // 6
        System.out.println(queue.poll()); // 12
        System.out.println(queue.poll()); //21

    }
}

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。

DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景。

  • 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue ,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。

DelayQueue 队列的元素必须实现 Delayed 接口。我们可以参考 ScheduledThreadPoolExecutorScheduledFutureTask 类的实现。

public class DelayQueueExample1 {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        // 延期3秒 处理
        queue.put(new DelayedEntry("A", 30000L));
        // 延期10 秒处理
        queue.add(new DelayedEntry("B", 10000L));
        // 延期 20 秒处理
        queue.add(new DelayedEntry("C", 20000L));
        int size = queue.size();
        System.out.println("当前时间是:" + LocalDateTime.now());
        // 从延时队列中获取元素, 将输出 A,B,C
        for (int i = 0; i < size; i++) {
            System.out.println(queue.take() + " ------ " + LocalDateTime.now());
        }
    }
}

/**
 * 继承 Delayed 接口
 */
class DelayedEntry implements Delayed {
    /**
     * 元素数据内容
     */
    private final String value;
    /**
     * 用于计算失效时间
     */
    private final long exeTime;

    DelayedEntry(String value, long exeTime) {
        this.value = value;
        this.exeTime = exeTime + System.currentTimeMillis();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return exeTime - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        DelayedEntry t = (DelayedEntry) o;
        if (this.exeTime < t.exeTime) {
            return -1;
        } else if (this.exeTime > t.exeTime) {
            return 1;
        } else {
            return 0;
        }

    }

    @Override
    public String toString() {
        return "DelayedEntry{" +
                "value=" + value +
                ", exeTime=" + exeTime +
                '}';
    }
}

//当前时间是:2020-10-15T16:26:37.167
//DelayedEntry{value=B, exeTime=1602750407104} ------ 2020-10-15T16:26:47.117
// DelayedEntry{value=C, exeTime=1602750417104} ------ 2020-10-15T16:26:57.105
//DelayedEntry{value=A, exeTime=1602750427104} ------ 2020-10-15T16:27:07.104

SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。

它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的 SynchronousQueue ,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfertransfer 方法。

  • transfer方法如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的poll()方法时), transfer 方法可以把生产者传入的元素立刻 transfer (传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。 transfer 方法的关键代码如下
  • tryTransfer方法tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法是必须等到消费者消费了才返回。对于带有时间限制的 tryTransfer(E e,long timeout,TimeUnit unit) 方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回 true。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列, LinkedBlockingDeque 多了 addFirstaddLastofferFirstofferLastpeekFirstpeekLast 等方法,以 First 单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以 Last 单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法 add 等同于 addLast ,移除方法 remove 等效于 removeFirst 。但是 take 方法却等同于 takeFirst ,不知道是不是 JDK 的 bug,使用时还是用带有 FirstLast 后缀的方法更清楚。

在初始化 LinkedBlockingDeque 时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。


突击并发编程JUC系列演示代码地址:

https://github.com/mtcarpenter/JavaTutorial

作者:故人

出处:https://segmentfault.com/a/1190000037574304

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码