百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

disruptor笔记之三环形队列的基础操作(不用Disruptor类)

toyiye 2024-05-25 20:11 29 浏览 0 评论

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

本篇概览

  • 本文是《disruptor笔记》系列的第三篇,主要任务是编码实现消息生产和消费,与《disruptor笔记之一:快速入门》不同的是,本次开发不使用Disruptor类,和Ring Buffer(环形队列)相关的操作都是自己写代码实现;
  • 这种脱离Disruptor类操作Ring Buffer的做法,不适合用在生产环境,但在学习Disruptor的过程中,这是种高效的学习手段,经过本篇实战后,在今后使用Disruptor时,您在开发、调试、优化等各种场景下都能更加得心应手;
  • 简单的消息生产消费已不能满足咱们的学习热情,今天的实战要挑战以下三个场景:
  1. 100个事件,单个消费者消费;
  2. 100个事件,三个消费者,每个都独自消费这个100个事件;
  3. 100个事件,三个消费者共同消费这个100个事件;

前文回顾

为了完成本篇的实战,前文《disruptor笔记之二:Disruptor类分析》已做了充分的研究分析,建议观看,这里简单回顾以下Disruptor类的几个核心功能,这也是咱们编码时要实现的:

  1. 创建环形队列(RingBuffer对象)
  2. 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
  3. 创建BatchEventProcessor,负责消费事件
  4. 绑定BatchEventProcessor对象的异常处理类
  5. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
  6. 启动独立线程,用来执行消费事件的业务逻辑
  • 理论分析已经完成,接下来开始编码;

源码下载

  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

  • 这个git项目中有多个文件夹,本次实战的源码在disruptor-tutorials文件夹下,如下图红框所示:

  • disruptor-tutorials是个父工程,里面有多个module,本篇实战的module是low-level-operate,如下图红框所示:

开发

  • 进入编码阶段,今天的任务是挑战以下三个场景:
  1. 100个事件,单个消费者消费;
  2. 100个事件,三个消费者,每个都独自消费这个100个事件;
  3. 100个事件,三个消费者共同消费这个100个事件;
  • 咱们先把工程建好,然后编写公共代码,例如事件定义、事件工厂等,最后才是每个场景的开发;
  • 在父工程disruptor-tutorials新增名为low-level-operate的module,其build.gradle如下:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    testImplementation('org.springframework.boot:spring-boot-starter-test')
}
  • 然后是springboot启动类:
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LowLevelOperateApplication {
	public static void main(String[] args) {
		SpringApplication.run(LowLevelOperateApplication.class, args);
	}
}
  • 事件类,这是事件的定义:
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@ToString
@NoArgsConstructor
public class StringEvent {
    private String value;
}
  • 事件工厂,定义如何在内存中创建事件对象:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

public class StringEventFactory implements EventFactory<StringEvent> {
    @Override
    public StringEvent newInstance() {
        return new StringEvent();
    }
}
  • 事件生产类,定义如何将业务逻辑的事件转为disruptor事件发布到环形队列,用于消费:
package com.bolingcavalry.service;

import com.lmax.disruptor.RingBuffer;

public class StringEventProducer {

    // 存储数据的环形队列
    private final RingBuffer<StringEvent> ringBuffer;

    public StringEventProducer(RingBuffer<StringEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(String content) {

        // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置
        long sequence = ringBuffer.next();

        try {
            // sequence位置取出的事件是空事件
            StringEvent stringEvent = ringBuffer.get(sequence);
            // 空事件添加业务信息
            stringEvent.setValue(content);
        } finally {
            // 发布
            ringBuffer.publish(sequence);
        }
    }
}
  • 事件处理类,收到事件后具体的业务处理逻辑:
package com.bolingcavalry.service;

import com.lmax.disruptor.EventHandler;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringEventHandler implements EventHandler<StringEvent> {

    public StringEventHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception {
        log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event);

        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 定义一个接口,外部通过调用接口的方法来生产消息,再放几个常量在里面后面会用到:
package com.bolingcavalry.service;

public interface LowLevelOperateService {
    /**
     * 消费者数量
     */
    int CONSUMER_NUM = 3;

    /**
     * 环形缓冲区大小
     */
    int BUFFER_SIZE = 16;

    /**
     * 发布一个事件
     * @param value
     * @return
     */
    void publish(String value);

    /**
     * 返回已经处理的任务总数
     * @return
     */
    long eventCount();
}
  • 以上就是公共代码了,接下来逐个实现之前规划的三个场景;

100个事件,单个消费者消费

  • 这是最简单的功能了,实现发布消息和单个消费者消费的功能,代码如下,有几处要注意的地方稍后提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("oneConsumer")
@Slf4j
public class OneConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    private ExecutorService executors;

    @PostConstruct
    private void init() {
        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        // 创建环形队列实例
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        // 准备线程池
        executors = Executors.newFixedThreadPool(1);

        //创建SequenceBarrier
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        // 创建事件处理的工作类,里面执行StringEventHandler处理事件
        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                sequenceBarrier,
                new StringEventHandler(eventCountPrinter));

        // 将消费者的sequence传给环形队列
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 在一个独立线程中取事件并消费
        executors.submit(batchEventProcessor);

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代码有以下几处需要注意:
  1. 自己创建环形队列RingBuffer实例
  2. 自己准备线程池,里面的线程用来获取和消费消息
  3. 自己动手创建BatchEventProcessor实例,并把事件处理类传入
  4. 通过ringBuffer创建sequenceBarrier,传给BatchEventProcessor实例使用
  5. 将BatchEventProcessor的sequence传给ringBuffer,确保ringBuffer的生产和消费不会出现混乱
  6. 启动线程池,意味着BatchEventProcessor实例在一个独立线程中不断的从ringBuffer中获取事件并消费;
  • 为了验证上述代码能否正常工作,我这里写了个单元测试类,如下所示,逻辑很简单,调用OneConsumerServiceImpl.publish方法一百次,产生一百个事件,再检查OneConsumerServiceImpl记录的消费事件总数是不是等于一百:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.LowLevelOperateService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertEquals;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class LowLeverOperateServiceImplTest {

    @Autowired
    @Qualifier("oneConsumer")
    LowLevelOperateService oneConsumer;

    private static final int EVENT_COUNT = 100;

    private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException {
        for(int i=0;i<eventCount;i++) {
            log.info("publich {}", i);
            service.publish(String.valueOf(i));
        }

        // 异步消费,因此需要延时等待
        Thread.sleep(10000);

        // 消费的事件总数应该等于发布的事件数
        assertEquals(expectEventCount, service.eventCount());
    }

    @Test
    public void testOneConsumer() throws InterruptedException {
        log.info("start testOneConsumerService");
        testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT);
    }
  • 注意,如果您是直接在IDEA上点击图标来执行单元测试,记得勾选下图红框中选项,否则可能出现编译失败:

  • 执行上述单元测试类,结果如下图所示,消息的生产和消费都符合预期,并且消费逻辑是在独立线程中执行的:

  • 继续挑战下一个场景;

100个事件,三个消费者,每个都独自消费这个100个事件

  • 这个场景在kafka中也有,就是三个消费者的group不同,这样每一条消息,这两个消费者各自消费一次;
  • 因此,100个事件,3个消费者每人都会独立消费这100个事件,一共消费300次;
  • 代码如下,有几处要注意的地方稍后提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("multiConsumer")
@Slf4j
public class MultiConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    /**
     * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息
     * @param executorService
     */
    private void addProcessor(ExecutorService executorService) {
        // 准备一个匿名类,传给disruptor的事件处理类,
        // 这样每次处理事件时,都会将已经处理事件的总数打印出来
        Consumer<?> eventCountPrinter = new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            }
        };

        BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>(
                ringBuffer,
                ringBuffer.newBarrier(),
                new StringEventHandler(eventCountPrinter));

        // 将当前消费者的sequence实例传给ringBuffer
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        // 启动独立线程获取和消费事件
        executorService.submit(batchEventProcessor);
    }

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        // 创建多个消费者,并在独立线程中获取和消费事件
        for (int i=0;i<CONSUMER_NUM;i++) {
            addProcessor(executorService);
        }

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}
  • 上述代码和前面的OneConsumerServiceImpl相比差别不大,主要是创建了多个BatchEventProcessor实例,然后分别在线程池中提交;
  • 验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testLowLevelOperateService的第三个参数是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示预期的被消费消息数为300
 	@Autowired
    @Qualifier("multiConsumer")
    LowLevelOperateService multiConsumer;

    @Test
    public void testMultiConsumer() throws InterruptedException {
        log.info("start testMultiConsumer");
        testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM);
    }
  • 执行单元测试,如下图所示,一共消费了300个事件,并且三个消费者在不同线程:

100个事件,三个消费者共同消费这个100个事件

  • 本篇的最后一个实战是发布100个事件,然后让三个消费者共同消费100个(例如A消费33个,B消费33个,C消费34个);
  • 前面用到的BatchEventProcessor是用来独立消费的,不适合多个消费者共同消费,这种多个消费共同消费的场景需要借助WorkerPool来完成,这个名字还是很形象的:一个池子里面有很多个工作者,把任务放入这个池子,工作者们每人处理一部分,大家合力将任务完成;
  • 传入WorkerPool的消费者需要实现WorkHandler接口,于是新增一个实现类:
package com.bolingcavalry.service;

import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Consumer;

@Slf4j
public class StringWorkHandler implements WorkHandler<StringEvent> {

    public StringWorkHandler(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次
    private Consumer<?> consumer;

    @Override
    public void onEvent(StringEvent event) throws Exception {
        log.info("work handler event : {}", event);

        // 这里延时100ms,模拟消费事件的逻辑的耗时
        Thread.sleep(100);

        // 如果外部传入了consumer,就要执行一次accept方法
        if (null!=consumer) {
            consumer.accept(null);
        }
    }
}
  • 新增服务类,实现共同消费的逻辑,有几处要注意的地方稍后会提到:
package com.bolingcavalry.service.impl;

import com.bolingcavalry.service.*;
import com.lmax.disruptor.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

@Service("workerPoolConsumer")
@Slf4j
public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService {

    private RingBuffer<StringEvent> ringBuffer;

    private StringEventProducer producer;

    /**
     * 统计消息总数
     */
    private final AtomicLong eventCount = new AtomicLong();

    @PostConstruct
    private void init() {
        ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE);

        ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM);

        StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM];

        // 创建多个StringWorkHandler实例,放入一个数组中
        for (int i=0;i < CONSUMER_NUM;i++) {
            handlers[i] = new StringWorkHandler(o -> {
                long count = eventCount.incrementAndGet();
                log.info("receive [{}] event", count);
            });
        }

        // 创建WorkerPool实例,将StringWorkHandler实例的数组传进去,代表共同消费者的数量
        WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers);

        // 这一句很重要,去掉就会出现重复消费同一个事件的问题
        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());

        workerPool.start(executorService);

        // 生产者
        producer = new StringEventProducer(ringBuffer);
    }

    @Override
    public void publish(String value) {
        producer.onData(value);
    }

    @Override
    public long eventCount() {
        return eventCount.get();
    }
}

上述代码中,要注意的有以下两处:

  1. StringWorkHandler数组传入给WorkerPool后,每个StringWorkHandler实例都放入一个新的WorkProcessor实例,WorkProcessor实现了Runnable接口,在执行workerPool.start时,会将WorkProcessor提交到线程池中;
  2. 和前面的独立消费相比,共同消费最大的特点在于只调用了一次ringBuffer.addGatingSequences方法,也就是说三个消费者共用一个sequence实例;
  • 验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testWorkerPoolConsumer的第三个参数是EVENT_COUNT,表示预期的被消费消息数为100
 	@Autowired
    @Qualifier("workerPoolConsumer")
    LowLevelOperateService workerPoolConsumer;
    
    @Test
    public void testWorkerPoolConsumer() throws InterruptedException {
        log.info("start testWorkerPoolConsumer");
        testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT);
    }
  • 执行单元测试如下图所示,三个消费者一共消费100个事件,且三个消费者在不同线程

  • 至此,咱们在不用Disruptor类的前提下完成了三种常见场景的消息生产消费,相信您对Disruptor的底层实现也有了深刻认识,今后不论是使用还是优化Disruptor,一定可以更加得心应手;

你不孤单,欣宸原创一路相伴

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码