百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Java&Go高性能队列之Disruptor性能测试

toyiye 2024-05-25 20:11 18 浏览 0 评论

之前写过Java&Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。

Disruptor以高性能出名,下面我来测试一下三种场景下性能表现。

有一些基本的设定和用词规范,大家可以翻看Java&Go高性能队列之LinkedBlockingQueue性能测试。

结论

总体来说,com.lmax.disruptor.dsl.Disruptor消费性能是非常厉害的,几乎是测不到顶。但是在生产方面,性能会随着Event的增加会下降很多 还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来几点比较通用的参考:

  • 从Disruptor消费者能力超强,即使在超高消费者数量(1000),依然保持非常高性能
  • 保证无消息积压前提下,com.lmax.disruptor.AbstractSequencer#bufferSize大小对性能影响不大
  • 在单生产者场景下,Disruptor生产速率与java.util.concurrent.LinkedBlockingQueue一样具有性能不稳定的问题
  • Disruptor性能瓶颈在于生产者,消息对象大小对性能影响较大,多生产者对总体性能影响不大,队列积压对性能影响也不大
  • 如果降低Event体积会极大提升性能,以后尽量使用java.lang.String,这点已经在日志回放系统印证了

简介

这里再多唠叨两句。

?

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单。

?

测试结果

这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。这里我采用的是com.lmax.disruptor.dsl.ProducerType#MULTI消费模式,注册消费者用的是com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool方法。

数据说明

这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。

小对象:

def get = new HttpGet()

中对象:

def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

大对象:

def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)

生产者

测试过程中超大com.lmax.disruptor.AbstractSequencer#bufferSize会导致com.lmax.disruptor.dsl.Disruptor耗时非常长,自测1024 * 1024 再高就感觉很吃力了,所以没测试超过1百万的消息队列长度。由于并没有设定com.lmax.disruptor.AbstractSequencer#bufferSize的测试场景,所以本次测试总是用这个设置。

测试结果规律倒是挺明显的:

  1. 消息总量越大,QPS越大
  2. 生产者线程数对QPS影响不大
  3. 消息体尽可能小

消费者

对于Disruptor框架来讲,单独的消费者用例比较难构建,我用了一个取巧的办法,会对性能测试结果有一些影响,这里可以通过后来分享测试用例的时候会详细说说。不过对于Disruptor逆天的消费能力,这点误差可以忽略。

测试结论也挺明显的,基本与java.util.concurrent.LinkedBlockingQueue一致。

  1. 数据上看长度越长越好
  2. 消费者线程越少越好
  3. 消息体尽可能小

PS:关于Disruptor消费能力,我测试了一个1百万大对象消息,1000线程的消费者用例,QPS=3412/ms,这个跟我后面基于Disruptor设计的新性能测试模型有关系,表明消费者线程数即使增加到1000,Disruptor依然保持了非常高的性能。

生产者 & 消费者

这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。

次轮整个测试过程都是几乎崩溃的,因为同样的用例执行起来误差太大了,最大的能有接近2倍的差距。以下结论仅供参考:

  1. 消息队列积累消息越少,速率越快
  2. 消费速率随时间推移越来越快
  3. 消息体尽可能小

其中当线程数超过10的时候,出现了非常明显的性能下滑,这个可以通过上面两组数据得出原因,Disruptor消费太快了,是生产者的数倍之多。最后测试出来的结果其实就是生产者的速率。当线程数比较少的时候,Disruptor总是有消息堆积的,所以生产者速率不会成为瓶颈,这个也跟用例设计有关系。

基准测试

请翻阅上期的测试文章内容Java&Go高性能队列之LinkedBlockingQueue性能测试。

测试用例

测试用例使用Groovy语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。本期我又额外使用了自定义统计时间的关键字time以及利用闭包实现自定义等待方法,其他内容均与上期文章相同。

Disruptor有个先天的优势,就是必需得设置ringBufferSize,相当于提前分配内存了。这点是我之前没想到的,当我回去复测LinkedBlockingQueue的时候发现并没有明显的性能差异,对于测试结果影响可忽略。

PS:这里用到了一些sleep(),会导致一些误差,这个以我能力暂无法避免,经过测试对结论影响不大,对数据影响有限。

生产者


import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

class DisProduce extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 50_0000

    static int size = 10

    static int threadNum = 10

    static int piece = total / size

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    public static void main(String[] args) {
        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def latch = new CountDownLatch(threadNum)
        def ss = Time.getTimeStamp()
        def funtester = {
            fun {
                (total / threadNum).times {
                    if (index.getAndIncrement() % piece == 0) {
                        def l = Time.getTimeStamp() - ss
                        output("${formatLong(index.get())}添加总消耗${formatLong(l)}")
                        ss = Time.getTimeStamp()
                    }
                    //                    def get = new HttpGet()

//                    def get = new HttpGet(url)
//                    get.addHeader("token", token)
//                    get.addHeader(HttpClientConstant.USER_AGENT)
//                    get.addHeader(HttpClientConstant.CONNECTION)

                    def get = new HttpGet(url + token)
                    get.addHeader("token", token)
                    get.addHeader("token1", token)
                    get.addHeader("token5", token)
                    get.addHeader("token4", token)
                    get.addHeader("token3", token)
                    get.addHeader("token2", token)
                    get.addHeader(HttpClientConstant.USER_AGENT)
                    get.addHeader(HttpClientConstant.CONNECTION)
                    ringBuffer.publishEvent((event, sequence) -> event.setRequest(get))
                }
                latch.countDown()
            }
        }
        //        fun {
        //            while (true) {
        //                sleep(1.0)
        //                output(disruptor.getRingBuffer().getBufferSize())
        //            }
        //        }
        def start = Time.getTimeStamp()
        threadNum.times {funtester()}
        latch.await()
        def end = Time.getTimeStamp()
        outRGB("每毫秒速率${total / (end - start)}")


        disruptor.shutdown();


    }

    /**
     * 消费者
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {

        }

        public void onEvent(FunEvent event) {

        }

    }


    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() {
            return request
        }

        void setRequest(HttpRequestBase request) {
            this.request = request
        };

    }

消费者


import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.event.EventThread
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

class DisConsumer extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 50_0000

    static int threadNum = 10

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    static def key = true

    public static void main(String[] args) {

        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 1024,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList())
        disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def ss = Time.getTimeStamp()
        time {
            total.times {
//                def get = new HttpGet()

//                def get = new HttpGet(url)
//                get.addHeader("token", token)
//                get.addHeader(HttpClientConstant.USER_AGENT)
//                get.addHeader(HttpClientConstant.CONNECTION)

                def get = new HttpGet(url + token)
                get.addHeader("token", token)
                get.addHeader("token1", token)
                get.addHeader("token5", token)
                get.addHeader("token4", token)
                get.addHeader("token3", token)
                get.addHeader("token2", token)
                get.addHeader(HttpClientConstant.USER_AGENT)
                get.addHeader(HttpClientConstant.CONNECTION)

                ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
            }
        }
        output("数据$total 构建完成!")
        def start = Time.getTimeStamp()
        key = false
        waitFor {!disruptor.hasBacklog()} , 0.01
        def end = Time.getTimeStamp()
        output(end - start)
        outRGB("每毫秒速率${total / (end - start)}")


        disruptor.shutdown();


    }

    /**
     * 消费者
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
            if (key) sleep(0.05)
        }

        public void onEvent(FunEvent event) {
            if (key) sleep(0.05)
        }

    }


    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() {
            return request
        }

        void setRequest(HttpRequestBase request) {
            this.request = request
        };

    }

生产者 & 消费者


import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase

import java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectors

class DisBoth extends SourceCode {

    static AtomicInteger index = new AtomicInteger(1)

    static int total = 100_0000

    static int threadNum = 5

    static int buffer = 20_0000

    static def url = "http://localhost:12345/funtester"

    static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"

    static def key = true

    public static void main(String[] args) {

        Disruptor<FunEvent> disruptor = new Disruptor<FunEvent>(
                FunEvent::new,
                1024 * 256,
                ThreadPoolUtil.getFactory(),
                ProducerType.MULTI,
                new YieldingWaitStrategy()
        );
        def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList())
        disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
        disruptor.start();
        RingBuffer<FunEvent> ringBuffer = disruptor.getRingBuffer();
        def produces = {
            fun {
                while (true) {
                    if (index.getAndIncrement() > total) break
                    //                    def get = new HttpGet()

//                    def get = new HttpGet(url)
//                    get.addHeader("token", token)
//                    get.addHeader(HttpClientConstant.USER_AGENT)
//                    get.addHeader(HttpClientConstant.CONNECTION)

                                    def get = new HttpGet(url + token)
                                    get.addHeader("token", token)
                                    get.addHeader("token1", token)
                                    get.addHeader("token5", token)
                                    get.addHeader("token4", token)
                                    get.addHeader("token3", token)
                                    get.addHeader("token2", token)
                                    get.addHeader(HttpClientConstant.USER_AGENT)
                                    get.addHeader(HttpClientConstant.CONNECTION)
                    ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
                }
            }
        }
        time {
            buffer.times {
                //                                def get = new HttpGet()

//                def get = new HttpGet(url)
                //                get.addHeader("token", token)
                //                get.addHeader(HttpClientConstant.USER_AGENT)
                //                get.addHeader(HttpClientConstant.CONNECTION)

                                def get = new HttpGet(url + token)
                                get.addHeader("token", token)
                                get.addHeader("token1", token)
                                get.addHeader("token5", token)
                                get.addHeader("token4", token)
                                get.addHeader("token3", token)
                                get.addHeader("token2", token)
                                get.addHeader(HttpClientConstant.USER_AGENT)
                                get.addHeader(HttpClientConstant.CONNECTION)
                ringBuffer.publishEvent((event, sequence) -> event.setRequest(get));
            }
        }
        output("数据$buffer 构建完成!")
        def start = Time.getTimeStamp()
        key = false
        threadNum.times {produces()}
        waitFor {!disruptor.hasBacklog()} , 0.01
        def end = Time.getTimeStamp()
        def l = end - start
        output(l)
        outRGB("每毫秒速率${(total + buffer) / l}")


        disruptor.shutdown();


    }

    /**
     * 消费者
     */
    private static class FunEventHandler implements EventHandler<FunEvent>, WorkHandler<FunEvent> {

        public void onEvent(FunEvent event, long sequence, boolean endOfBatch) {
            if (key) sleep(0.05)
        }

        public void onEvent(FunEvent event) {
            if (key) sleep(0.05)
        }


    }


    private static class FunEvent {

        HttpRequestBase request

        HttpRequestBase getRequest() {
            return request
        }

        void setRequest(HttpRequestBase request) {
            this.request = request
        };

    }

「Have Fun ~ Tester !」

  • FunTester2021年总结
  • FunTester原创大赏
  • Groovy语言学习笔记大赏【FunTester】
  • Golang语言HTTP客户端实践
  • 反射访问和修改private变量
  • 如何突破职业瓶颈
  • 如何选择API测试工具
  • 如何从测试自动化中实现价值
  • Groovy中的list
  • 重放浏览器请求多链路性能测试实践
  • 性能测试如何减少本机误差
  • 莫要寻找可能不存在的答案
  • 分段随机实践—模拟线上流量
  • 千万级日志回放引擎设计稿

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码