百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

性能远超chan的无锁队列

toyiye 2024-05-25 20:11 26 浏览 0 评论

作者:leonzgshao,腾讯CSIG后台开发工程师

| 导语结合Java语言的高可用无锁队列框架Disruptor实现的高性能无锁队列,可实现远高于chan的高性能数据传递,解决高并发环境下chan写入数据慢的问题。

1. chan的困境

1.1. chan高并发挺慢

按照我之前的认知,我以为既然go提供了这么一种通信方式,那么它的性能自然一定是有保证的。事实证明,靠想、靠自以为是不靠谱的,chan并没有想象中的那么快,尤其是并发的时候。先做一个最简单的测试,并发向chan中放入数据,看下放入的时间分布:

var (
		t1_10us     = uint64(0) // 1-10微秒
		t10_100us   = uint64(0) // 10-100微秒
		t100_1000us = uint64(0) // 100-1000微秒
		t1_10ms     = uint64(0) // 1-10毫秒
		t10_100ms   = uint64(0) // 10-100毫秒
		t100_ms     = uint64(0) // 大于100毫秒
	)

	var (
		length   = 1024 * 1024
		goSize   = 100
		numPerGo = 10000
		counter  = uint64(0)
		slower   = uint64(0)
		wg       sync.WaitGroup
	)
	ch := make(chan uint64, length)
	// 消费端
	go func() {
		var ts time.Time
		var count int32
		for {
			x := <-ch
			atomic.AddInt32(&count, 1)
			if count == 1 {
				ts = time.Now()
			}
			if x%100000 == 0 {
				fmt.Printf("read %d\n", x)
			}
			if count == int32(goSize*numPerGo) {
				tl := time.Since(ts)
				fmt.Printf("read time = %d ms\n", tl.Milliseconds())
			}
		}
	}()
	wg.Add(goSize)
	totalS := time.Now()
	for i := 0; i < goSize; i++ {
		go func() {
			for j := 0; j < numPerGo; j++ {
				x := atomic.AddUint64(&counter, 1)
				ts := time.Now()
				ch <- x
				tl := time.Since(ts)
				ms := tl.Microseconds()
				if ms > 1 {
					atomic.AddUint64(&slower, 1)
					if ms < 10 { // t1_10us
						atomic.AddUint64(&t1_10us, 1)
					} else if ms < 100 {
						atomic.AddUint64(&t10_100us, 1)
					} else if ms < 1000 {
						atomic.AddUint64(&t100_1000us, 1)
					} else if ms < 10000 {
						atomic.AddUint64(&t1_10ms, 1)
					} else if ms < 100000 {
						atomic.AddUint64(&t10_100ms, 1)
					} else {
						atomic.AddUint64(&t100_ms, 1)
					}
				}
			}
			wg.Done()
		}()
	}
	wg.Wait()
	totalL := time.Since(totalS)
	fmt.Printf("write total time = [%d ms]\n", totalL.Milliseconds())
	time.Sleep(time.Second * 3)
	fmt.Printf("slow ratio = %.2f \n", float64(slower)*100.0/float64(counter))
	fmt.Printf("quick ratio = %.2f \n", float64(goSize*numPerGo-int(slower))*100.0/float64(goSize*numPerGo))
	fmt.Printf("[<1us][%d] \n", counter-slower)
	fmt.Printf("[1-10us][%d] \n", t1_10us)
	fmt.Printf("[10-100us][%d] \n", t10_100us)
	fmt.Printf("[100-1000us][%d] \n", t100_1000us)
	fmt.Printf("[1-10ms][%d] \n", t1_10ms)
	fmt.Printf("[10-100ms][%d] \n", t10_100ms)
	fmt.Printf("[>100ms][%d] \n", t100_ms)

上述例子中,启动了100个协程,每个协程循环向chan中放入10000个对象,上面的代码在我的mac中执行结果如下:

write total time = [184 ms]
read time = 196 ms
slow ratio = 14.72 
quick ratio = 85.28 
[<1us][852773] 
[1-10us][101126] 
[10-100us][45671] 
[100-1000us][395] 
[1-10ms][19] 
[10-100ms][16] 
[>100ms][0] 

然而我们仔细的分析一下,可以看到如下两个点:

1)对象在放入chan中时,还是比较耗时的,尤其是会存在不小比例的耗时比较高的,例如上面的16个10-100ms间的操作,这种波动或者说抖动会高并发时会严重影响我们的性能,我们期望这种抖动尽可能降低;

2)上述测试的chan长度是1024x1024,实际上比放入对象的大小还大,也就是说即使chan不满,也会导致一定的慢耗时

1.2. chan结构

在排查原因前,我们先看下chan的结构,众所周知,chan的结构如下runtime.hchan

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

1.3. chan为什么这么慢

其实已经有很多文章介绍chan的原理了,我就不详细描述了,从影响性能的角度来看,简单来说有两点:

  • 如果recv协程空闲,则send协程会优先给recv协程,以提高效率;
  • recv和send协议都使用了hchan.lock对象,该对象是一个锁,也就是说它们之间会有竞争;

对于前者,给我们提供了一个思路,那就是由多个recv协程读取,这样可以提高放入的性能,但是具体设置多少呢?实际上业务并不好判断,因此并不是一种好的选择。

对于后者,我们先来说下这个lock,这个lock不是sync.Mutexsync.Mutex是go给我们开发者提供一个锁,这个锁的实现其实比较复杂,也有很多文章来说明,我这里就不详细说明了,但是这个锁有一个好处,它其实是一个轻量级的锁,这个轻量主要说的是这个锁是由go自身提供的,当出现锁切换时,它并不需要调用操作系统的lock来让渡出cpu资源,而仅仅是通过gopark()的方式,将目前正在执行的g放回到p中,等待其他m来调度它,它的切换相对来说没有那么耗费资源。

而hchan中的lock是一个runtime.mutex。这个锁是一个互斥锁,在linux系统中它的实现是futex,在没有竞争的情况下,会退化成为一个自旋操作,速度非常快,但是当竞争比较大时,它就会在内核中休眠。注意,此处是在内核中休眠,而与runtime.Mutex是不同的,这也是为什么chan会这么慢的原因。

2. 无锁队列

2.1. Disruptor

熟悉java语言的小伙伴应该知道有一个比较出名的高性能无锁队列框架:Disruptor,github地址:https://github.com/LMAX-Exchange/disruptor 当然go语言也有一个对应的库go-disruptor:https://github.com/smarty-prototypes/go-disruptor

但是在实际测试中发现go-disruptor其实性能比较一般,没有Java中的优化那么多,并且go-disruptor并发写入需要使用锁,非常不优化,性能比较低。因此,就萌生了写一个go版本Disruptor想法。

2.2. Lockfree

要做到无锁实现队列模型,那么能依赖的其实就只有atomic(原子操作)。本无锁队列参考了Disruptor的实现,根据go语言的特点,加入了自己的一些思考,代码已经开源,地址:https://github.com/bruceshao/lockfree 欢迎来拍。

lockfree的核心优化点包括如下几个方面,其中很多也是我们在开发高性能程序需要关注的,可以参考:

1)绝对的无锁实现:

lockfree内部几乎所有的操作都是通过原子变量(atomic)来操作,仅仅有一处使用了chan,作为队列长时间为空时,消费g阻塞使用,该chan只有在队列为空的情况下触发,所以不会影响性能。

2)单一的消费g:

消费g即队列的消费者,将消费g设置为单一g,即整个无锁队列只有一个g用于消费,这样就屏蔽掉了读操作竞争带来的性能损耗。

3)写不等待原则:

本身无锁队列的设计初衷就是写入要快,因此对于写入的操作是不会等待的,当无法写入时会持续通过自旋加任务调度的方式处理,一方面尽量加快写入效率,另一方面则是防止占用太多CPU资源。其核心处理代码:

  // 获取下一个可写入序号  
  seq := q.seqer.next()
	pos := int(seq & q.mask)
	for {
		if q.abuf.disabled(pos) {
			q.rbuf.write(pos, v)
			q.abuf.enable(pos)
			// 如果接收方阻塞则释放
			q.abuf.release()
			break
		}
		// 写操作持续等待,该等待仅会调用runtime.Gosched()进行当前g的调度让出
		loop, _ = wait(loop, WriteWaitMax)
	}

4)Pointer替代切片:

available切片用于标记ringbuffer中元素的可用状态。尽管其是一个[]uint8结构,但实际上当高并发对其进行赋值更新时,由于每次操作在其内部都会进行越界判断(通过汇编代码获得该信息),导致其寻址性能并不高。因此通过对切片结构中的Data进行unsafe.Pointer操作,提高了其可用状态调整的性能。

// enable 设置pos位置为可读状态,读线程可读取
func (a *available) enable(pos int) {
	*(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 1
}

// disable 设置pos位置为可写状态,写入线程可写入值
func (a *available) disable(pos int) {
	*(*uint8)(unsafe.Pointer(uintptr(a.buf) + uintptr(pos))) = 0
}

5)一次性内存分配:

使用环状结构Ringbuffer实现对象的传递,RingBuffer中存储对象为包含传递对象结构的结构体,可以进行一次性内存分配,提高处理的性能。

6)缓存行填充:

在计算机硬件上,为了提高效率,cpu存在多级高速缓存(通常是三级)。指令和数据会被事先加载到多级缓存中,这样cpu就不用每次与内存进行交互,从而提高效率。然而实际上不会只加载需要的数据,而是会加载需要数据的上文部分数据,因为根据程序的局部性原理,这些数据后面大概率会用到,这样就避免了再次加载,提高了效率。但是如果这样一次性加载的数据如果被多个cpu核心操作,就会涉及到一个竞争,因此每次加载和更新的数据是有冲突的(从应用程序上来看是没有冲突的),这就形成了所谓的伪共享。解决这个问题的办法就是缓存行填充,操作系统一般一次性加载的缓存行大小是64B,因此可以在其前和后各加入部分字段来解决。数据结构如下:

// cursor 游标,一直持续增长的一个uint64序列
// 该序列用于wg(Write Goroutine)获取对应写入到buffer中元素的位置操作
// 通过使用atomic操作避免锁,提高性能
// 通过使用padding填充的方式,填充前面和后面各使用7个uint64(缓存行填充),避免伪共享问题
type cursor struct {
	p1, p2, p3, p4, p5, p6, p7       uint64
	v                                uint64
	p9, p10, p11, p12, p13, p14, p15 uint64
}

7)与运算加速:

RingBuffer的容量必须设置为2的n次方,这样就可以通过与运算来代替取余运算,从而提高整体的性能。

8)泛型加速:

Go1.18版本后引入了泛型,泛型与interface有很明显的区别,从性能上来看,泛型是在编译阶段确定类型,这样可有效降低在运行时进行类型转换的耗时(经过测试,这部分还是比较耗时的)。

2.3. 核心概念

Lockfree的整体结构关系如下所示:

ringBuffer

具体对象的存放区域,通过数组(定长切片)实现环状数据结构,其中的数据对象是具体的结构体而非指针,这样可以一次性进行内存申请,避免频繁内存申请带来的系统开销。数据结构如下:

type e[T any] struct {
	val T
}

// ringBuffer 具体对象的存放区域,通过数组(定长切片)实现环状数据结构
// 其中e为具体对象,非指针,这样可以一次性进行内存申请
type ringBuffer[T any] struct {
	buf      []e[T]
	sequer   *sequencer
	capacity uint64
}

available

切片实现的map,通过index(或pos)标识每个位置为0或1,当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC。 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降,因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能。之所以使用uint8数组而不是使用的bitmap,主要是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决)。数据结构如下:

// available 切片实现的map,通过index(或pos)标识每个位置为0或1
// 当长时间无法读取时会通过blockC进行阻塞,写线程完成时可释放该blockC
// 其内部buf实际是[]uint8,但由于[]uint8切片在寻址时会进行游标是否越界的判断,造成性能下降,
// 因此通过使用unsafe.Pointer直接对对应的值进行操作,从而避免越界判断,提升性能
// 之所以使用uint8是考虑到写并发的行为,防止bit操作导致数据异常(或靠锁解决)
type available struct {
	buf    unsafe.Pointer
	blockC chan struct{}
	block  uint32
}

sequencer

序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护,读取状态由自身一个uint64变量维护。它的核心方法是next(),用于获取下个可以写入的游标。数据结构如下:

// sequencer 序号产生器,维护读和写两个状态,写状态具体由内部游标(cursor)维护。
// 读取状态由自身维护,变量read即可
type sequencer struct {
	wc       *cursor
	ws       waitStrategy
	rc       uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor
	capacity uint64
}

Producer

生产者,核心方法是Write,通过调用Write方法可以将对象写入到队列中。支持多个g并发操作,保证加入时处理的效率。数据结构如下:

// Producer 生产者
// 核心方法是Write,通过调用Write方法可以将对象写入到队列中
type Producer[T any] struct {
	seqer  *sequencer
	rbuf   *ringBuffer[T]
	abuf   *available
	mask   uint64
	status int32
}

consumer

消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁,对于实际的并发操作由该g进行分配。数据结构如下:

// consumer 消费者,这个消费者只会有一个g操作,这样处理的好处是可以不涉及并发操作,其内部不会涉及到任何锁
// 对于实际的并发操作由该g进行分配
type consumer[T any] struct {
	rbuf     *ringBuffer[T]
	abuf     *available
	seqer    *sequencer
	hdl      EventHandler[T]
	parallel bool
	mask     uint64 // 用于使用&代替%(取余)运算提高性能
	status   int32  // 运行状态
}

waitStrategy

等待策略,该策略用于获取写入可用的sequence时进行的等待。默认提供了两个实现,SchedWaitStrategy和SleepWaitStrategy,前者使用runtime.Gosched(),后者使用time.Sleep()实现。 推荐使用SchedWaitStrategy,也可以自己实现。

EventHandler

事件处理器接口,整个项目中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理,它使用泛型,通过编译阶段确定事件类型,提高性能。

// EventHandler 事件处理器接口
// 整个无锁队列中唯一需要用户实现的接口,该接口描述消费端收到消息时该如何处理
// 使用泛型,通过编译阶段确定事件类型,提高性能
type EventHandler[T any] interface {
	// OnEvent 用户侧实现,事件处理方法
	OnEvent(t T)
}

3. 性能对比

3.1. 写入耗时提升

整体上来看,Disruptor在写入和读取上的性能大概都在channel的7倍以上,数据写入的越多,性能提升越明显。 下面是buffer=1024x1024时,写入数据的耗时对比(可以看到写入时间有明显提升):

3.2. 性能对比

仍然以buffer大小为1024 x 1024为例,将写入时间进行分段,形成了如下的表,其中快速率描述的是写入耗时在微秒内的占比:

数据
(g*循环)

队列

快速率

<1us

1-10us

10-100us

100-1000us

1-10ms

10-100ms

>100ms

50*10000

chan

85.24%

426198

48630

24835

327

6

4

4

50*10000

lockfree

98.06%

490307

8340

1255

94

4

0

0

100*10000

chan

84.39%

843858

104287

51598

217

20

20

0

100*10000

lockfree

98.00%

980004

17513

2343

131

9

0

0

1000*10000

chan

10.07%

1007273

117192

50303

8822466

2714

39

13

1000*10000

lockfree

64.06%

6405519

23298

47347

3519377

3083

1376

0

5000*10000

chan

1.98%

990905

119376

48902

530

48835376

4889

22

5000*10000

lockfree

80.97%

40485785

30654

19052

466781

8987742

9986

0

10000*10000

chan

1.12%

1117019

76828

33322

1504

98746320

24960

47

10000*10000

lockfree

88.33%

88333884

46109

43460

630901

9701375

1244271

0

从上图中可以明显看出,lockfree比chan的性能会高很多:

  • lockfree的快速率明显超过chan,并且随着写入数据的增加,其没有明显下降,而chan下降非常明显;
  • lockfree基本没有非常大的耗时(大于100ms),而chan会存在,这种情况会导致比较强烈的抖动;

最后,git地址:https://github.com/bruceshao/lockfree 欢迎小伙伴来拍、沟通和试用。

相关推荐

为何越来越多的编程语言使用JSON(为什么编程)

JSON是JavascriptObjectNotation的缩写,意思是Javascript对象表示法,是一种易于人类阅读和对编程友好的文本数据传递方法,是JavaScript语言规范定义的一个子...

何时在数据库中使用 JSON(数据库用json格式存储)

在本文中,您将了解何时应考虑将JSON数据类型添加到表中以及何时应避免使用它们。每天?分享?最新?软件?开发?,Devops,敏捷?,测试?以及?项目?管理?最新?,最热门?的?文章?,每天?花?...

MySQL 从零开始:05 数据类型(mysql数据类型有哪些,并举例)

前面的讲解中已经接触到了表的创建,表的创建是对字段的声明,比如:上述语句声明了字段的名称、类型、所占空间、默认值和是否可以为空等信息。其中的int、varchar、char和decimal都...

JSON对象花样进阶(json格式对象)

一、引言在现代Web开发中,JSON(JavaScriptObjectNotation)已经成为数据交换的标准格式。无论是从前端向后端发送数据,还是从后端接收数据,JSON都是不可或缺的一部分。...

深入理解 JSON 和 Form-data(json和formdata提交区别)

在讨论现代网络开发与API设计的语境下,理解客户端和服务器间如何有效且可靠地交换数据变得尤为关键。这里,特别值得关注的是两种主流数据格式:...

JSON 语法(json 语法 priority)

JSON语法是JavaScript语法的子集。JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组JS...

JSON语法详解(json的语法规则)

JSON语法规则JSON语法是JavaScript对象表示法语法的子集。数据在名称/值对中数据由逗号分隔大括号保存对象中括号保存数组注意:json的key是字符串,且必须是双引号,不能是单引号...

MySQL JSON数据类型操作(mysql的json)

概述mysql自5.7.8版本开始,就支持了json结构的数据存储和查询,这表明了mysql也在不断的学习和增加nosql数据库的有点。但mysql毕竟是关系型数据库,在处理json这种非结构化的数据...

JSON的数据模式(json数据格式示例)

像XML模式一样,JSON数据格式也有Schema,这是一个基于JSON格式的规范。JSON模式也以JSON格式编写。它用于验证JSON数据。JSON模式示例以下代码显示了基本的JSON模式。{"...

前端学习——JSON格式详解(后端json格式)

JSON(JavaScriptObjectNotation)是一种轻量级的数据交换格式。易于人阅读和编写。同时也易于机器解析和生成。它基于JavaScriptProgrammingLa...

什么是 JSON:详解 JSON 及其优势(什么叫json)

现在程序员还有谁不知道JSON吗?无论对于前端还是后端,JSON都是一种常见的数据格式。那么JSON到底是什么呢?JSON的定义...

PostgreSQL JSON 类型:处理结构化数据

PostgreSQL提供JSON类型,以存储结构化数据。JSON是一种开放的数据格式,可用于存储各种类型的值。什么是JSON类型?JSON类型表示JSON(JavaScriptO...

JavaScript:JSON、三种包装类(javascript 包)

JOSN:我们希望可以将一个对象在不同的语言中进行传递,以达到通信的目的,最佳方式就是将一个对象转换为字符串的形式JSON(JavaScriptObjectNotation)-JS的对象表示法...

Python数据分析 只要1分钟 教你玩转JSON 全程干货

Json简介:Json,全名JavaScriptObjectNotation,JSON(JavaScriptObjectNotation(记号、标记))是一种轻量级的数据交换格式。它基于J...

比较一下JSON与XML两种数据格式?(json和xml哪个好)

JSON(JavaScriptObjectNotation)和XML(eXtensibleMarkupLanguage)是在日常开发中比较常用的两种数据格式,它们主要的作用就是用来进行数据的传...

取消回复欢迎 发表评论:

请填写验证码