百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Disruptor-源码解读

toyiye 2024-06-27 00:39 11 浏览 0 评论

前言

Disruptor的高性能,是多种技术结合以及本身架构的结果。本文主要讲源码,涉及到的相关知识点需要读者自行去了解,以下列出:

  • 锁和CAS
  • 伪共享和缓存行
  • volatile和内存屏障

原理

此节结合demo来看更容易理解:传送门

下图来自官方文档

官方原图有点乱,我翻译一下

在讲原理前,先了解 Disruptor 定义的术语

  • Event
  • 存放数据的单位,对应 demo 中的 LongEvent
  • Ring Buffer
  • 环形数据缓冲区:这是一个首尾相接的环,用于存放 Event ,用于生产者往其存入数据和消费者从其拉取数据
  • Sequence
  • 序列:用于跟踪进度(生产进度、消费进度)
  • Sequencer
  • Disruptor的核心,用于在生产者和消费者之间传递数据,有单生产者和多生产者两种实现。
  • Sequence Barrier
  • 序列屏障,消费者之间的依赖关系就靠序列屏障实现
  • Wait Strategy
  • 等待策略,消费者等待生产者将发布的策略
  • Event Processor
  • 事件处理器,循环从 RingBuffer 获取 Event 并执行 EventHandler。
  • Event Handler
  • 事件处理程序,也就是消费者
  • Producer
  • 生产者

Ring Buffer

环形数据缓冲区(RingBuffer),逻辑上是首尾相接的环,在代码中用数组来表示Object[]。Disruptor生产者发布分两步

  • 步骤一:申请写入 n 个元素,如果可以写入,这返回最大序列号
  • 步骤二:根据序列号去 RingBuffer 中获取 Event,修改并发布
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();// 获取下一个可用位置的下标(步骤1)long sequence = ringBuffer.next();try {    // 返回可用位置的元素    LongEvent event = ringBuffer.get(sequence);    // 设置该位置元素的值    event.set(l);} finally {    // 发布    ringBuffer.publish(sequence);}

这两个步骤由 Sequencer 完成,分为单生产者和多生产者实现

Sequencer

单生产者

如果申请 2 个元素,则如下图所示(圆表示 RingBuffer)

// 一般不会有以下写法,这里为了讲解源码才使用next(2)// 向RingBuffer申请两个元素long sequence = ringBuffer.next(2);for (long i =  sequence-1; i <= sequence; i++) {    try {        // 返回可用位置的元素        LongEvent event = ringBuffer.get(i);        // 设置该位置元素的值        event.set(1);    } finally {        ringBuffer.publish(i);    }}

next 申请成功的序列,cursor 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。申请相当于占位置,发布需要一个一个按顺序发布

如果 RingBuffer 满了呢,在上图步骤二的基础上,生产者发布了3个元素,消费者消费1个。此时生产者再申请 2个元素,就会变成下图所示

只剩下 1 个空间,但是要申请 2个元素,此时程序会自旋等待空间足够。

接下来结合代码看,单生产者的 Sequencer 实现为 SingleProducerSequencer,先看看构造方法

abstract class SingleProducerSequencerPad extends AbstractSequencer{    protected long p1, p2, p3, p4, p5, p6, p7;     SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy)    {        super(bufferSize, waitStrategy);    }} abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad{    SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy)    {        super(bufferSize, waitStrategy);    }     long nextValue = Sequence.INITIAL_VALUE;    long cachedValue = Sequence.INITIAL_VALUE;} public final class SingleProducerSequencer extends SingleProducerSequencerFields{    protected long p1, p2, p3, p4, p5, p6, p7;     public SingleProducerSequencer(int bufferSize, WaitStrategy waitStrategy)    {        super(bufferSize, waitStrategy);    }}

这是 Disruptor 高性能的技巧之一,SingleProducerSequencer 需要的类变量只有 nextValue 和cachedValue,p1 ~ p7 的作用是填充缓存行,这能保证 nextValue 和cachedValue 必定在独立的缓存行,我们可以用ClassLayout打印内存布局看看

接下来看如何获取序列号(也就是步骤一)

// 调用路径// RingBuffer#next()// SingleProducerSequencer#next()public long next(int n){    if (n < 1)    {        throw new IllegalArgumentException("n must be > 0");    }     long nextValue = this.nextValue;     //生产者当前序号值+期望获取的序号数量后达到的序号值    long nextSequence = nextValue + n;    //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’    long wrapPoint = nextSequence - bufferSize;    //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’    //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。    long cachedGatingSequence = this.cachedValue;     //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’    //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76    // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)    {        cursor.setVolatile(nextValue);  // StoreLoad fence         //gatingSequences就是消费者队列末尾的序列,也就是消费者消费到哪里了        //实际上就是获得处理的队尾,如果队尾是current的话,说明所有的消费者都执行完成任务在等待新的事件了        long minSequence;        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))        {            // 等待1纳秒            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?        }         this.cachedValue = minSequence;    }     this.nextValue = nextSequence;     return nextSequence;} public void publish(long sequence){    // 更新序列号    cursor.set(sequence);    // 等待策略的唤醒    waitStrategy.signalAllWhenBlocking();}

要解释的都在注释里了,gatingSequences 是消费者队列末尾的序列,对应着就是下图中的 ApplicationConsumer 的 Sequence

多生产者

看完单生产者版,接下来看多生产者的实现。因为是多生产者,需要考虑并发的情况。

如果有A、B两个消费者都来申请 2 个元素

cursor 申请成功的序列,HPS 消费者最大可用序列,gatingSequence 表示能申请的最大序列号。红色表示待发布,绿色表示已发布。HPS 是我自己编的缩写,表示 getHighestPublishedSequence 方法的返回值

如图所示,只要申请成功,就移动 cursor 的位置。RingBuffer 并没有记录发布情况(图中的红绿颜色),这个发布情况由 MultiProducerSequenceravailableBuffer 来维护。

下面看代码

public final class MultiProducerSequencer extends AbstractSequencer{    // 缓存的消费者中最小序号值,相当于SingleProducerSequencerFields的cachedValue    private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);    // 标记元素是否可用    private final int[] availableBuffer;     public long next(int n)    {        if (n < 1)        {            throw new IllegalArgumentException("n must be > 0");        }         long current;        long next;         do        {            current = cursor.get();            next = current + n;             //减掉RingBuffer的总的buffer值,用于判断是否出现‘覆盖’            long wrapPoint = next - bufferSize;            //从后面代码分析可得:cachedValue就是缓存的消费者中最小序号值,他不是当前最新的‘消费者中最小序号值’,而是上次程序进入到下面的if判定代码段时,被赋值的当时的‘消费者中最小序号值’            //这样做的好处在于:在判定是否出现覆盖的时候,不用每次都调用getMininumSequence计算‘消费者中的最小序号值’,从而节约开销。只要确保当生产者的节奏大于了缓存的cachedGateingSequence一个bufferSize时,从新获取一下 getMinimumSequence()即可。            long cachedGatingSequence = gatingSequenceCache.get();             //(wrapPoint > cachedGatingSequence) : 当生产者已经超过上一次缓存的‘消费者中最小序号值’(cachedGatingSequence)一个‘Ring’大小(bufferSize),需要重新获取cachedGatingSequence,避免当生产者一直在生产,但是消费者不再消费的情况下,出现‘覆盖’            //(cachedGatingSequence > nextValue) : https://github.com/LMAX-Exchange/disruptor/issues/76            // 这里判断就是生产者生产的填满BingBUffer,需要等待消费者消费            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)            {                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);                 if (wrapPoint > gatingSequence)                {                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?                    continue;                }                 gatingSequenceCache.set(gatingSequence);            }            // 使用cas保证只有一个生产者能拿到next            else if (cursor.compareAndSet(current, next))            {                break;            }        }        while (true);         return next;    }......}

MultiProducerSequencerSingleProducerSequencer的 next()方法逻辑大致一样,只是多了CAS的步骤来保证并发的正确性。接着看发布方法

public void publish(final long sequence){    // 记录发布情况    setAvailable(sequence);    // 等待策略的唤醒    waitStrategy.signalAllWhenBlocking();} private void setAvailable(final long sequence){    // calculateIndex(sequence):获取序号    // calculateAvailabilityFlag(sequence):RingBuffer的圈数    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));} private void setAvailableBufferValue(int index, int flag){    long bufferAddress = (index * SCALE) + BASE;    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);    // 上面相当于 availableBuffer[index] = flag 的高性能版}

记录发布情况,其实相当于 availableBuffer[sequence] = 圈数,前面说了,availableBuffer是用来标记元素是否可用的,如果消费者的圈数 ≠ availableBuffer中的圈数,则表示元素不可用

public boolean isAvailable(long sequence){    int index = calculateIndex(sequence);    // 计算圈数    int flag = calculateAvailabilityFlag(sequence);    long bufferAddress = (index * SCALE) + BASE;    // UNSAFE.getIntVolatile(availableBuffer, bufferAddress):相当于availableBuffer[sequence] 的高性能版    return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;} private int calculateAvailabilityFlag(final long sequence){    // 相当于 sequence % bufferSize ,但是位操作更快    return (int) (sequence >>> indexShift);}

isAvailable() 方法判断元素是否可用,此方法的调用堆栈看完消费者就清楚了。

消费者

本小节介绍两个方面,一是 Disruptor 的消费者如何实现依赖关系的,二是消费者如何拉取消息并消费

消费者的依赖关系实现

我们看回这张图,每个消费者前都有一个 SequenceBarrier ,这就是消费者之间能实现依赖的关键。每个消费者都有一个 Sequence,表示自身消费的进度,如图中,ApplicationConsumer 的 SequenceBarrier 就持有 ReplicaionConsumer 和 JournalConsumer 的 Sequence,这样就能控制 ApplicationConsumer 的消费进度不超过其依赖的消费者。

下面看源码,这是 disruptor 配置消费者的代码。

EventHandler journalConsumer = xxx;EventHandler replicaionConsumer = xxx;EventHandler applicationConsumer = xxx; disruptor.handleEventsWith(journalConsumer, replicaionConsumer)        .then(applicationConsumer); // 下面两行等同于上面这行// disruptor.handleEventsWith(journalConsumer, replicaionConsumer);// disruptor.after(journalConsumer, replicaionConsumer).then(applicationConsumer);

先看ReplicaionConsumer 和 JournalConsumer 的配置 disruptor.handleEventsWith(journalConsumer, replicaionConsumer)

/** 代码都在Disruptor类 **/ public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){    // 没有依赖的消费者就创建新的Sequence    return createEventProcessors(new Sequence[0], handlers);} /** * 创建消费者 * @param barrierSequences 当前消费者组的屏障序列数组,如果当前消费者组是第一组,则取一个空的序列数组;否则,barrierSequences就是上一组消费者组的序列数组 * @param eventHandlers 事件消费逻辑的EventHandler数组 */EventHandlerGroup<T> createEventProcessors(    final Sequence[] barrierSequences,    final EventHandler<? super T>[] eventHandlers){    checkNotStarted();     // 对应此事件处理器组的序列组    final Sequence[] processorSequences = new Sequence[eventHandlers.length];    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);     for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++)    {        final EventHandler<? super T> eventHandler = eventHandlers[i];         // 创建消费者,注意这里传入了SequenceBarrier        final BatchEventProcessor<T> batchEventProcessor =            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);         if (exceptionHandler != null)        {            batchEventProcessor.setExceptionHandler(exceptionHandler);        }         consumerRepository.add(batchEventProcessor, eventHandler, barrier);        processorSequences[i] = batchEventProcessor.getSequence();    }     // 每次添加完事件处理器后,更新门控序列,以便后续调用链的添加    // 所谓门控,就是RingBuffer要知道在消费链末尾的那组消费者(也是最慢的)的进度,避免消息未消费就被写入覆盖    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);     return new EventHandlerGroup<>(this, consumerRepository, processorSequences);}

createEventProcessors() 方法主要做了3件事,创建消费者、保存eventHandler和消费者的映射关系、更新 gatingSequences

  • EventProcessor 是消费者
  • SequenceBarrier 是消费者屏障,保证了消费者的依赖关系
  • consumerRepository 保存了eventHandler和消费者的映射关系

gatingSequences 我们在前面说过,生产者通过 gatingSequences 知道消费者的进度,防止生产过快导致消息被覆盖,更新操作在 updateGatingSequencesForNextInChain() 方法中

// 为消费链下一组消费者,更新门控序列// barrierSequences是上一组事件处理器组的序列(如果本次是第一次,则为空数组),本组不能超过上组序列值// processorSequences是本次要设置的事件处理器组的序列private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){    if (processorSequences.length > 0)    {        // 将本组序列添加到Sequencer中的gatingSequences中        ringBuffer.addGatingSequences(processorSequences);        // 将上组消费者的序列从gatingSequences中移除        for (final Sequence barrierSequence : barrierSequences)        {            ringBuffer.removeGatingSequence(barrierSequence);        }        // 取消标记上一组消费者为消费链末端        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);    }}

让我们把视线再回到消费者的设置方法

disruptor.handleEventsWith(journalConsumer, replicaionConsumer)        .then(applicationConsumer);

journalConsumer 和 replicaionConsumer 已经设置了,接下来是 applicationConsumer

/** 代码在EventHandlerGroup类 **/ public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers){    return handleEventsWith(handlers);} public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){    return disruptor.createEventProcessors(sequences, handlers);}

可以看到,设置 applicationConsumer 最终调用的也是 createEventProcessors() 方法,区别就在于 createEventProcessors() 方法的第一个参数,这里的 sequences 就是 journalConsumer 和 replicaionConsumer 这两个消费者的 Sequence

消费者的消费逻辑

消费者的主要消费逻辑在 EventProcessor#run()方法中,下面以BatchEventProcessor举例

// BatchEventProcessor#run()// BatchEventProcessor#processEvents()private void processEvents(){    T event = null;    long nextSequence = sequence.get() + 1L;     while (true)    {        try        {            // 获取最大可用序列            final long availableSequence = sequenceBarrier.waitFor(nextSequence);            ...             // 执行消费逻辑            while (nextSequence <= availableSequence)            {                // dataProvider就是RingBuffer                event = dataProvider.get(nextSequence);                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);                nextSequence++;            }             sequence.set(availableSequence);        }        catch ()        {            // 异常处理        }    }}

方法简洁明了,在死循环中通过 sequenceBarrier 获取最大可用序列,然后从 RingBuffer 中获取 Event 并调用 EventHandler 进行消费。重点在 sequenceBarrier.waitFor(nextSequence); 中

public long waitFor(final long sequence)        throws AlertException, InterruptedException, TimeoutException{    checkAlert();    // 获取可用的序列,这里返回的是Sequencer#next方法设置成功的可用下标,不是Sequencer#publish    // cursorSequence:生产者的最大可用序列    // dependentSequence:依赖的消费者的最大可用序列    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);     if (availableSequence < sequence)    {        return availableSequence;    }    // 获取最大的已发布成功的序号(对于发布是否成功的校验在此方法中)    return sequencer.getHighestPublishedSequence(sequence, availableSequence);}

熟悉的 getHighestPublishedSequence() 方法,忘了就回去看看生产者小节。waitStrategy.waitFor() 对应着图片中的 waitFor() 。

消费者的启动

前面讲了消费者的处理逻辑,但是 BatchEventProcessor#run() 是如何被调用的呢,关键在于disruptor.start();

// Disruptor#start()public RingBuffer<T> start(){    checkOnlyStartedOnce();    for (final ConsumerInfo consumerInfo : consumerRepository)    {        consumerInfo.start(executor);    }     return ringBuffer;} class EventProcessorInfo<T> implements ConsumerInfo{    public void start(final Executor executor)    {        // eventprocessor就是消费者        executor.execute(eventprocessor);    }}

还记得 consumerRepository吗,没有就往上翻翻设置消费者那里的 disruptor.handleEventsWith() 方法。

所以启动过程就是
disruptor#start() → ConsumerInfo#start() → Executor#execute() → EventProcessor#run()

课后作业:Disruptor 的消费者使用了多少线程?

总结

本文讲了 Disruptor 大体逻辑和源码,当然其高性能的秘诀不止文中描述的那些。还有不同的等待策略,Sequence 中使用Unsafe而不是JDK中的 Atomic 原子类等等。

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码