前述
多线程与多处理器(多核)运算,是我们需要掌握的一项技能,因为他们有很多好处,但是本身又很复杂,容易出错。因此有必要将它作为一项专业技能加以研究运用。
多线程与多核运算的好处主要体现在以下几个方面:
1. 提高计算效率
多线程和多核运算能够将一个大任务拆分成多个小任务,并在多个处理器或核心上同时执行。这种并行处理方式可以显著提高计算效率,尤其适用于需要处理大量数据的任务,如图像处理或数据分析。例如,通过多线程处理,可以在短时间内完成更多的计算任务,从而提升整体工作效率。
2. 缩短计算时间
通过多线程和多核运算的并行处理能力,可以同时执行多个任务,从而大幅缩短计算时间。这对于需要快速响应的应用场景尤为重要,如实时数据处理或机器学习模型的训练。通过并行计算,可以在更短的时间内完成复杂的计算任务,满足对时间敏感的需求。
3. 提高资源利用率
多线程和多核运算能够更充分地利用计算机的硬件资源,包括CPU、内存和磁盘等。通过合理分配任务到不同的处理器或核心上,可以避免资源的浪费,并提高计算机的性能和效率。这种优化的资源利用方式有助于更好地处理大规模数据,提升系统的整体性能。
4. 增加可扩展性
多线程和多核运算的设计使得系统可以轻松地扩展到更多的处理器或核心上,以处理更大规模的数据。这种可扩展性为处理复杂问题和应对未来数据增长提供了灵活性。随着技术的发展和硬件的升级,多线程和多核运算将能够应对更大的计算挑战。
5. 提升系统响应能力和用户体验
多线程技术可以实现异步操作,即在执行某个任务的同时,不阻塞主线程的运行。这在进行网络请求、文件读写等耗时操作时特别有用,可以显著提高用户界面的流畅性和响应能力。对于需要与用户进行交互的应用来说,这一点至关重要,因为它直接影响到用户体验。
背景
操作系统怎样提升并行处理能力
操作系统利用线程调度和多处理器技术来提升并行处理能力的主要方法包括以下几个方面:
- 并行任务分配: 操作系统通过线程调度算法将任务分配给多个处理器或处理器核心,并利用多个线程并行执行不同的任务。这样可以充分利用处理器资源,提高系统的整体处理能力。
- 多核并行处理: 操作系统能够利用多核处理器中的多个核心来并行执行任务。通过合理的线程调度和任务分配,可以实现多个线程在不同核心上同时执行,从而提高系统的并行处理能力。
- 并发执行: 操作系统可以利用线程调度机制实现多个线程在同一时间片内并发执行。通过在不同线程之间进行快速切换,操作系统可以实现并行执行多个任务,从而提高系统的并发处理能力。
- 负载均衡: 操作系统可以通过线程调度算法对任务进行负载均衡,将任务合理地分配给各个处理器核心,以确保每个处理器核心的负载均衡,最大限度地发挥处理器的性能。
- 调度优化: 操作系统通过优化线程调度算法和策略,以最大程度地提高系统的并行处理能力。例如,采用抢占式调度算法、实时调度算法等,根据不同的应用场景和需求进行调度优化。
具体策略如下:
线程调度策略
- 基于优先级的调度:
- 操作系统为线程分配不同的优先级,高优先级的线程会优先获得处理器时间。这确保了重要的任务能够优先被执行,从而提高系统的响应速度和吞吐量。
时间片轮转调度:
- 在这种策略下,每个线程被分配一个固定的时间片来运行。当一个线程的时间片用完时,它会被移至就绪队列的末尾,等待下一个轮转周期。这种方式确保了所有线程都能公平地获得处理器时间。
多级队列调度:
- 操作系统维护多个就绪队列,每个队列具有不同的优先级。线程可以根据其特性和需求被分配到不同的队列中。这种策略结合了优先级调度和时间片轮转调度的优点,提供了更灵活的线程管理。
多处理器技术
- 对称多处理(SMP):
- 在SMP系统中,所有处理器都是平等的,可以访问共享的内存和I/O资源。操作系统通过合理的线程调度策略,将线程分配到不同的处理器上并行执行,从而显著提高整体计算能力。
- 多线程技术:
- 现代处理器普遍支持多线程技术,如Intel的超线程技术(Hyper-Threading)。这种技术允许一个处理器核心同时执行多个线程,从而提高了处理器的利用率和整体性能。
- 负载均衡:
- 操作系统通过监控各个处理器的负载情况,动态地分配和调整线程到不同的处理器上执行。这确保了每个处理器都能得到充分的利用,避免了某些处理器过载而其他处理器空闲的情况。
- 亲和性调度:
- 操作系统允许程序员或系统管理员指定线程与处理器的亲和性,即线程更倾向于在某个特定的处理器上执行。这可以减少线程在不同处理器之间的迁移开销,提高缓存利用率和整体性能。
所以,操作系统通过合理的线程调度策略和多处理器技术,能够显著提高系统的并行处理能力。这些策略和技术相互补充,确保了线程能够高效、公平地访问处理器资源,从而提升了整个系统的性能和响应速度。
多线程调度
在不同的处理器结构下,多线程调度的策略会根据操作系统的不同而有所差异。下面我将分别针对Windows和Linux系统来详细介绍多线程调度的策略。
Windows系统下的多线程调度策略
- 线程优先级:
- Windows系统采用了32个线程优先级,这些优先级被分为三类:实时优先级(16-31),可变优先级(1-15),以及零页线程(优先级0)。
- 实时优先级的线程在运行时不会改变其优先级,而可变优先级的线程可以在一定范围内升高或降低其优先级。
- 时间配额:
- Windows中的时间配额不是一个具体的时间长度值,而是一个称为配额单位的整数。
- 当一个线程用完了自己的时间配额,并且没有其他相同优先级的线程时,Windows会重新为该线程分配一个新的时间配额,以确保其能够继续运行。
- 调度策略:
- Windows系统会根据线程的优先级和时间配额来动态调度线程的执行。高优先级的线程会优先获得CPU时间,而在相同优先级的线程之间,系统会根据时间配额来分配处理器时间。
Linux系统下的多线程调度策略
- 调度策略与优先级:
- Linux系统主要提供了三种调度策略:SCHED_OTHER(默认),SCHED_FIFO,和SCHED_RR。
- SCHED_OTHER是基于时间片轮转的策略,允许多个线程共享CPU时间。这种策略适用于普通的后台任务。
- SCHED_FIFO是先进先出的策略,优先级高的线程会一直运行,直到它主动放弃CPU或者完成其任务。这种策略适用于需要实时响应的任务。
- SCHED_RR也是基于时间片轮转的策略,但与SCHED_OTHER不同的是,当优先级高的线程运行一段时间后被抢占时,它会回到队列的尾部等待下一次调度。这种策略提供了更好的公平性。
- 调度器与调度类:
- Linux内核中的调度器负责根据线程的调度策略和优先级来分配CPU时间。调度器会根据当前系统的负载情况和线程的状态来动态调整调度决策。
- Linux还支持多种调度类,如CFS(完全公平调度器)和实时调度类等。这些调度类为不同类型的线程提供了不同的调度策略和优化手段。
Windows和Linux系统在多线程调度策略上有所不同。Windows系统更注重线程的优先级和时间配额的分配方式来进行调度决策;而Linux系统则提供了多种灵活的调度策略和调度类来满足不同类型线程的需求。这些差异使得两个系统在处理多线程任务时各有优势并适用于不同的应用场景。
多核架构
多 CPU 架构可以分为几种类型,主要有对称多处理器(SMP)、非对称多处理器(ASMP)、多核处理器、众核处理器等。
- 对称多处理器(SMP):
- 在 SMP 架构中,所有的 CPU 共享同一总线或连接到一个中央交换机。
- 每个 CPU 都可以访问相同的内存地址空间,因此可以并发地执行不同的程序和任务。
- SMP 架构通常用于桌面计算机、服务器和超级计算机等领域。
- 非对称多处理器(ASMP):
- 在 ASMP 架构中,每个 CPU 有自己的本地内存,而不是共享同一内存。
- CPU 之间的通信通常通过网络或专用通信通道进行,而不是共享总线。
- ASMP 架构通常用于嵌入式系统和分布式系统等领域。
- 多核处理器:
- 多核处理器是指在同一芯片上集成多个 CPU 核心。
- 这些核心共享同一片片上缓存(L2 和 L3 缓存),但有自己的独立缓存(L1 缓存)。
- 多核处理器可用于桌面计算机、笔记本电脑、服务器和移动设备等。
- 众核处理器:
- 众核处理器是指在同一芯片上集成大量的 CPU 核心,通常超过几十个甚至上百个。
- 这些核心可能共享一些资源,如内存接口、片片上缓存等。
- 众核处理器通常用于高性能计算、数据中心、人工智能和深度学习等领域。
在多 CPU 架构中,链接情况取决于具体的架构和硬件设计。在 SMP 架构中,所有 CPU 共享相同的内存空间,因此它们可以相互通信和协调执行任务。而在 ASMP 架构中,CPU 之间的通信通常需要通过网络或专用通信通道进行,因此链接情况更为复杂。在多核和众核处理器中,核心之间的链接通常通过芯片内部的互联网络进行,以实现高效的通信和数据传输。
互连线
互连线是处理器与内存以及处理器与处理器之间进行通信的媒介。有两种基本的互连结构:SMP(symmetric multiprocessing,对称多处理)和NUMA(nonuniform memory access,非一致内存访问)
互连线示意图,来自《多处理器编程的艺术》
在SMP系统结构中,处理器和内存之间采用总线互连结构,类似于微型以太网上的广播媒介。处理器和主存都有用来负责发送和监听总线上广播的信息的总线控制单元(监听有时称为探听)。如今SMP系统结构非常普遍,因为它们最容易构建,但是对于数量较多的处理器来说,这种系统结构不具有扩展性,因为总线最终将变为过载。
在NUMA系统结构中,一系列节点通过点对点网络相互连接,就像一个小型的局域网。每个节点包含一个或多个处理器和一个本地存储器。一个节点的本地存储对于其他节点是可访问的,所有节点的本地存储一起形成一个可以被所有处理器共享的全局存储器。NUMA的名字反映了一个事实,即处理器访问自己节点存储器的速度要比访问其他节点存储器的速度快。网络要比总线复杂,需要更加复杂的协议,但是对于数量较多的处理器来说网络比总线的可扩展性更好。
可以在SMP和NUMA系统结构之间设计一种折中方案:设计一种混合系统结构,同一集群中的处理器通过总线通信,而不同集群中的处理器则通过网络通信。从程序员的角度看,底层平台无论是基于总线、网络还是混合结构似乎并不重要。然而,理解互连线是由处理器所共享的有限资源是很重要的。如果一个处理器使用较多的互连线带宽,那么其他的处理器就会被延迟。
高速缓存
在现代系统结构中一次主存访问可能会花费数百个时钟周期,因此,存在这样一种危险,即处理器将会花费许多时间等待主存响应请求。解决这一问题的方法就是引入一个或多个高速缓存:一种与处理器非常接近因此速度比主存要快的小容量存储器。这些高速缓存逻辑上位于处理器和主存之间:当处理器试图从给定的主存地址读取一个值时,首先查看该值是否已经在高速缓存中,如果在,则不需要进行较慢的主存访问。如果找到目标地址的值,则称处理器在高速缓存中命中,否则称为缺失。同样,如果处理器试图写的地址在高速缓存中,那么它就不需要执行较慢的主存访问。在高速缓存中符合请求的比例称为高速缓存的命中率。
高速缓存是非常有效的,因为大多数程序都表现出较高的局部性:如果处理器读或写一个内存地址(或者内存单元),那么它很快将读或写同一个地址。况且,如果处理器读或写一个内存单元,那么它很可能会立刻读或写该单元附近的单元。为了利用第二个结论,高速缓存通常在一个比字更大的粒度上进行操作:高速缓存维护一组邻近的字,称为缓存行(或缓存块)。
实际上,大多数处理器都具有二级高速缓存,称为L?Cache和L2 Cache。L1 Cache通常和处理器在同一个芯片中,对它的访问通常需要一到两个时钟周期。L2 Cache则可放置在芯片中也可以不放置在芯片中,对它的访问需要数十个时钟周期。两者都比要花费数百个时钟周期的内存快得多。当然,对于不同的平台,访问次数会随之而变化,许多多处理器都具有更为精细的高速缓存结构。
NUMA系统结构的最初提议中并不包含高速缓存,因为当初认为有本地内存就已足够了。然而后来的商用NUMA系统结构却包含有高速缓存。术语缓存一致的NUMA(cc-NUMA)有时用来指带有高速缓存的NUMA系统结构。为了避免歧义,今后除非明确指出,我们所说的NUMA都是缓存一致的。
由于高速缓存的生产价格高,因此其大小要比内存小得多:在同一时刻只有一部分内存单元被放置在高速缓存中。因此,我们希望在高速缓存中保存那些最常使用的单元。这意味着当内存单元要被装入到高速缓存中而缓存已满时,有必要收回一个缓存块,如果该缓存块没有被修改则直接丢弃,如果已被修改则写回主存。替换策略则决定将替换掉哪一个缓存块,以便为新的内存单元腾出空间。如果替换策略是自由地替换任何缓存块,则称该高速缓存是全相联的。另一方面,如果只可以替换唯一的缓存块,则称该缓存是直接映射的。如果我们折中这种差别,允许使用一组大小为k的块的集合中的任何一个块来替换一个给定的块,则称这样的缓存为k级组相联的。
一致性
当一个处理器读或写被另一处理器装入高速缓存的主存地址时,将发生共事(或称内存争用)现象。如果两个处理器都只读数据而不修改,那么数据可以装入到两个处理器的高速缓存中。然而,如果一个处理器要更新共享的缓存块,那么另一个处理器的副本必须作废以确保它不会读到过期的值。通常称这个问题为缓存一致性。如MESI协议。该协议已经用在Pentium和PowerPC处理器中。下面是缓存块的状态。
·modified(修改):缓存中的块已被修改,它最终必须写回主存。其他的处理器不能再缓存这个块。
·exclusive(互斥):缓存块还未被修改,且其他的处理器不能将这个块装入缓存。
·shared(共享):缓存块未被修改,且其他处理器可以缓存这个块。
·invalid(无效):块中不包含任何有意义的数据。
下面用一个简短的例子来说明MESI协议,
MESI,来自《多处理器编程的艺术》
在a中,处理器A从地址a读数据,将数据存入它的缓存并置为exculsive状态。在b中,当处理器B试图从相同的地址读数据时,A检测到地址冲突,以相关数据做出响应。此时,a同时被处理器A和B以shared状态装入缓存。在c中,如果B要对共享地址a进行写操作,则将其状态改变为modified,并广播此信息以提醒A(以及其他任何可能已将该数据装入缓存的处理器)将它的缓存块状态设置为invalid。在d中,如果A随后从a读数据,它会广播它的请求,B则通过将修改过的数据发送到A和主存,并置两个副本的状态为shared来做出响应
处理器A从地址a读数据,将数据存入它的高速缓存并置为exculsive状态。当处理器B试图从同一个地址读数据时,A检测到地址冲突,并以相关数据做出响应。此时,a同时被A和B以shared状态装入缓存。如果B要对地址a进行写操作,则将其状态改变为modified,并广播此信息以提醒A(以及其他任何可能已将该数据装入缓存的处理器)将它的缓存块状态设置为invalid。如果A随后要从a读数据,它会广播它的请求,B则通过将修改过的数据发送到A和主存,并置两个副本的状态为shared来做出响应。
当处理器访问逻辑上不同的数据时,由于它们要访问的内存单元对应于同一个缓存块而导致发生冲突的现象称为错误共享。这种情形反映了一种难于处理的权衡问题:较大的缓存块对局部性有利,但却增加了错误共享的可能性。出现错误共享的可能性可以通过确保独立线程并发访问的数据对象距离内存足够远来降低。例如,让多个线程共享一个字节数组则可以导致错误共享,但是若让它们共享双精度整型数组则出现错误共享的危险性就变得很小了。
自旋
如果处理器不断地测试内存中的某个字,等待另一个处理器改变它,则称该处理器正在自旋。自旋依赖于体系结构,能对整个系统的性能产生显著的影响。
对于无高速缓存的SMP系统结构来说,自旋是一种非常糟糕的想法。每当处理器读内存时,都会消耗总线带宽却没有做任何有用的工作。由于总线是广播媒介,这些直接对内存的请求可能会阻止其他处理器的推进。
对于无高速缓存的NUMA系统结构,如果地址位于处理器的本地存储器中,那么自旋是可以接受的。尽管无高速缓存的多处理器系统结构很少见,我们仍然要研究当考虑具有自旋的同步协议时,是否允许每个处理器在它自己的本地存储器上自旋。
对于具有高速缓存的SMP或NUMA系统结构,自旋仅消耗非常少的资源。处理器第一次读地址时,会产生一个高速缓存缺失,将该地址的内容加载到缓存块中。此后,只要数据没有改变,处理器只需从它自己的高速缓存中重读数据,不需占用互连带宽,这种过程称为本地自旋。当高速缓存状态发生改变时,处理器产生一个高速缓存缺失,观察到数据已发生改变,并停止自旋。
可以考虑下面代码的性能对比
public class TASLock implements Lock {
...
public void lock() {
while( state.getAndSet(true) ) {} // spin
}
...
}
public class TTASLock implements Lock {
...
while( true ) {
while (state.get()) {}; // spin
if( !state.getAndSet( true ) )
return;
}
...
}
在1989年的经典实验中,Anderson在当时的一些多处理器上测试了执行一个简单程序所需的时间。他测量了n个线程对一个较小的临界区执行一百万次所花费的时间。
lock对于多线程的数量的影响对比,来自《多处理器编程的艺术》
多核与多线程体系结构
在多核体系结构中,多个处理器被放置在同一个芯片中。芯片上的每个处理器通常都有自己的L1高速缓存,但它们共享一个公共的L2高速缓存。处理器之间可以通过共享L2高速缓存进行高效的通信,从而避免了进入内存并调用那些令人讨厌的一致性协议。
多核cache图,《多处理器编程的艺术》
在多线程体系结构中,一个处理器可以一次执行两个或更多个线程。许多现代处理器能够不按次序来执行指令(并行性),或以并行的方式执行(如保持定长和浮点单元同时繁忙),甚至可以在分支或数据计算之前预测地执行指令。为了保持硬件单元繁忙,多线程的处理器能将多个流的指令混合执行。
现代处理器系统结构将多核系统结构与多线程系统结构相结合,多个独立地支持多线程的核可以放置到同一个芯片中。在一些多核芯片上,上下文切换所花费的代价非常低,并可以在很细的粒度上进行,特别对于那些每条指令都要切换的上下文更是如此。因此,多线程方式避免了较大的内存访问时延:当一个线程访问内存时,处理器会让另一个线程执行。
松弛的内存一致性
当处理器要将一个值写入内存时,该值被保存在高速缓存中并被标记为脏值,以表明该值最终必须要写回主存。对于大多数现代处理器来说,当写请求发生时并没有直接作用到主存中,而是将它们收集到一个称为写缓冲区的硬件队列中,在以后的某个时刻再一起作用到主存上。写缓冲具有两个优点。首先,它能更加高效地发布一批请求,称为批处理。其次,如果一个线程对一个地址多次写,早先的请求会被抛弃,节省了内存访问代价,这种现象称为写吸收。写缓冲区的应用会产生一个重要的结果:对主存发出的读写访问顺序并不一定与主存中实际发生的顺序一样。例如,如果两个处理器各自都先写自己的标志,然后再读对方的标志位,那么其中一个将会看到对方最新写的标志值。若采用写缓冲方式,则该结论不再成立,因为有可能两个处理器都在写,每个写都在它自己的写缓冲区中,但这两个缓冲区有可能在每个处理器都读了对方在内存中的标志位后才被写入。这样两者都没有读到对方的标志。在编译中则可能出现更为严重的问题。通常,编译器适于在单处理器系统结构上进行性能优化。这种优化往往要求重排单个线程对内存的读写次序。这种重排序对于单线程程序是不可见的,但对多线程程序来说,由于线程可以观察到写发生的顺序,则会产生我们并不希望的结果。例如,如果一个线程将数据装入缓冲区后设置一个指示器以标记缓冲区是否为满,那么并发线程可能在看到新数据之前看到了指示器设置,从而导致它们读到的数据为旧值。
所有的系统结构都提供强制写操作按照它们产生的次序来执行的能力,但这种方式的代价很高。内存路障指令(有时称为内存栅栏)将刷新写缓冲区,以确保在路障之前产生的所有写操作对于产生路障的处理器是可见的。内存路障往往是通过像getAndSet()这样的原子读-改-写操作或者标准的并发库来透明地插入。因此,只有当处理器对临界区外的共享变量执行读/写指令时,才需要显式地使用内存路障。一方面,内存路障的代价较高(100个时钟周期或者更多),因此只有在必要时才能使用。另一方面,由于同步问题很难追踪,所以应该宽松地使用内存路障,而不是依靠复杂的特定平台来保障对内存指令重排序的限制。
硬件同步指令
现代的典型系统结构通常支持两种通用同步原语中的一种。AMD、Intel和Sun的系统结构支持比较和交换(compare-and-swap,CAS)指令。该指令具有三个参数:内存地址a、期望值e和更新值v,返回一个布尔值。它原子地执行下列步骤:
- 如果内存地址a中包含有期望值e,
- 将更新值v写入该地址并返回true,
- 否则,保持该内存值不变,并返回false。
在Intel和AMD的系统结构中,CAS被称为CMPXCHG,而在SPARCTM中被称为CAS。CAS指令有一个缺陷。下面是最常使用CAS的情形。一个应用从给定的内存地址读值a,并且为该地址计算出一个新值c。仅当该地址的值a在被应用读后一直未改变,才能将新值c存入。有人可能认为用期望值a和更新值c调用CAS能实现这个目标。然而有一个问题:一个线程有可能用另一个值b覆写了a,随后又将a写入到那个地址中。CAS指令将用c替换掉a,但是这也许并不是应用所预期的结果(例如,如果地址中存放的是指针,而新值a可能是一个回收对象的地址)。CAS调用将用v替换e,但是应用并没有完成它所预期的工作。这种问题称为ABA问题。
另一个硬件同步原语是一对指令:加载/链接和存储/条件(load-linked和store-conditional,LL/SC)。LL指令从地址a读数据。随后的SC指令尝试将一个新值存入该地址。若线程对a产生LL指令以来,地址a的内容没有变化则该SC指令成功。若在这段期间a的内容发生了变化,则该SC指令失败。有一些系统结构支持LL和SC指令:Alpha AXP(1d1_1/st1_c)、IBM PowerPC(1warx/stwcx),MIPS(11/sc)和ARM(1drex/strex)。LL/SC指令并不受ABA问题所影响,但在实践中,往往对一个线程在LL与对应的SC之间所能做的工作加以限制。上下文切换是另一种LL指令(或另一种加载/存储指令),该指令有可能导致SC指令失败。
CAS与LL/SC
CAS与LL/SC(Load-Linked/Store-Conditional)的原理如下所述:
CAS (Compare-And-Swap) 原理
- 基本操作:CAS包含三个基本操作:读取内存值、比较内存值和预期值、以及在内存值和预期值相同时写入新值。
- 原子性:这三个操作是原子的,即在执行过程中不会被其他线程中断,从而保证了数据的一致性和正确性。
- 无锁实现:CAS主要用于实现无锁的数据结构,避免了传统锁机制可能带来的死锁和线程阻塞问题。
- 硬件支持:CAS的实现依赖于硬件提供的原子操作指令,确保操作的原子性。
- 并发性能:CAS通过减少线程间的竞争和上下文切换,提高了并发性能。
- 示例:
// 使用c++ atomic
#include <atomic>
std::atomic<int> counter(0); // 原子整数
void increment() {
int expected = counter.load(); // 获取当前值
while (!counter.compare_exchange_weak(expected, expected + 1)) {
// CAS操作失败,重试
expected = counter.load();
}
}
int main() {
// 创建多个线程进行自增操作
// ...
return 0;
}
// 汇编语言
section .data
lock_value dd 0 ; 锁的初始值为 0
section .text
global cas_lock ; 全局函数 cas_lock
cas_lock:
mov eax, 1 ; 设置 eax 为 1,表示要获取锁
xchg eax, [lock_value] ; 用 eax 的值与锁的当前值交换,并将新值存储在锁中
test eax, eax ; 检查 eax 是否为 0,如果为 0,说明获取锁成功
jz lock_acquired ; 跳转到 lock_acquired 标签
ret ; 如果锁未获取成功,返回
lock_acquired:
ret ; 锁获取成功,返回
LL/SC (Load-Linked/Store-Conditional) 原理
- Load-Linked (LL):此操作从内存中读取一个值,并标记该内存地址,以便后续的Store-Conditional操作可以检查该地址的值是否在此期间被其他处理器或线程修改过。
- Store-Conditional (SC):此操作尝试向之前通过Load-Linked标记的内存地址写入一个新值。但是,只有在该内存地址的值从上次LL操作后未被修改过的情况下,写入操作才会成功。如果内存值已被修改,则SC操作会失败。
- 原子性保证:LL和SC操作通常被设计为原子的,以确保在多处理器或多线程环境中数据的一致性和正确性。
- 锁实现基础:LL/SC机制是许多锁算法和read-modify-write原子操作的基础,它允许线程或处理器在并发环境中安全地访问和修改共享数据。
- 硬件支持:与CAS类似,LL/SC的实现也依赖于底层硬件的支持,以确保操作的原子性和内存一致性。
- 汇编语言示例:
# Spin_Lock(lockkey)
# lockkey 是共享资源锁,这是一个可以被多核共享的内存地址。
# 值为0表示锁处于空闲状态,为1表示锁已经被某个核所获取。
# 其余核若想获取它只能等待。
Spin_Lock:
ll t0, lockkey # 从内存中读取 lockkey
bnez t0, Spin_Lock # 如果锁不可用,跳转到 Spin_Lock 标签重新尝试获取
li t0, 1 # 给 t0 寄存器赋值为1
sc t0, lockkey # 将 t0 寄存器的值保存入 lockkey 中
beqz t0, Spin_Lock # 判断 t0 寄存器的值是否为0,如果为0表示操作失败,返回 Spin_Lock 标签重新开始
sync # 内存操作同步指令,保证之前的内存操作完成
# 锁已成功获取,继续执行后续代码
# ...
# 使用CAS实现自增操作
# value 是一个共享的整数变量
# 假设 value 的初始值为0
Increment:
ll t0, value # 从内存中读取 value
addi t1, t0, 1 # 将 t0 寄存器的值加1
sc t1, value # 将 t1 寄存器的值保存入 value 中
beqz t1, Increment # 判断 t1 寄存器的值是否为0,如果为0表示操作失败,返回 Increment 标签重新开始
# 自增操作成功,继续执行后续代码
# ...
CAS和LL/SC都是用于解决并发环境下数据竞争问题的机制,它们通过硬件支持的原子操作来确保数据的一致性和正确性,从而提高了多线程或多处理器环境中的并发性能。但是,执行一条CAS或LL/SC指令往往要花费比执行加载或存储指令多得多的时钟周期:它包含内存路障、防止乱序执行以及各种编译器优化。准确的代价取决于许多因素,不仅包含从一种系统结构到另一种系统结构的变化,而且还包含在同一种系统结构中从一种应用到另一种应用的变化。这足以说明CAS或LL/SC要比简单的加载/存储慢得多。
自旋锁的实现原理
自旋锁是一种用于多线程同步的机制,其实现原理可以概括为以下几个关键点:
- 锁状态检查:当一个线程尝试获取自旋锁时,它首先会检查锁的状态。如果锁处于空闲状态(未被其他线程持有),则该线程可以直接获取到锁,并将锁的状态设置为占用;如果锁已经被其他线程持有,则进入自旋等待状态。
- 自旋等待:如果锁已经被其他线程占用,当前线程会进入一个循环(即“自旋”),在这个循环中不断地检查锁的状态。一旦锁被释放,当前线程会立即获取到锁,并将锁的状态设置为占用。
- 锁的释放:当线程完成对共享资源的访问后,它会释放自旋锁,将锁的状态重新设置为空闲,以便其他线程可以获取到锁。
需要注意的是,自旋锁虽然避免了线程的睡眠和唤醒开销,但如果锁被持有时间过长或者竞争激烈,会导致CPU资源的浪费。因此,在实际应用中需要权衡使用自旋锁与其他同步机制的优缺点。
总的来说,CAS(和LL/SC)算法和自旋锁都是多线程编程中用于实现线程同步的重要机制,它们在不同的应用场景中各有优势。
核心内容
多线程和多处理器编程是利用计算机系统中的多个核心或处理器并行执行任务的编程技术。其核心内容主要包括以下几个方面:
- 并行性:多线程和多处理器编程的核心是利用计算机系统中的并行处理能力,将任务分解为多个子任务,然后在多个线程或处理器上同时执行这些子任务,以提高系统的整体性能和吞吐量。
- 同步与互斥:在多线程和多处理器编程中,不同的线程或处理器可能同时访问共享资源,因此需要采取同步和互斥机制来保证数据的一致性和正确性。常见的同步和互斥技术包括锁、信号量、条件变量等。
- 线程调度:在多线程编程中,操作系统负责对线程进行调度和管理,以确保多个线程能够合理地共享处理器时间片,并且能够按照一定的优先级和调度策略执行。合理的线程调度策略可以提高系统的响应速度和吞吐量。
- 并发性与竞争条件:多线程和多处理器编程中常常会遇到竞争条件(Race Condition)和死锁(Deadlock)等并发性问题,开发人员需要通过合适的设计和编码技术来避免或解决这些问题,以确保程序的正确性和可靠性。
- 数据共享与通信:在多线程和多处理器编程中,不同的线程或处理器之间需要共享数据或进行通信,开发人员需要选择合适的数据共享和通信方式,包括共享内存、消息传递等,以实现线程之间的协作和交互。
- 并行算法和数据结构:多线程和多处理器编程需要设计并实现并行算法和数据结构,以适应并行执行的需求。并行算法和数据结构需要考虑并行性、数据共享、同步等方面的问题,以确保在并行环境下的正确性和高效性。
综上所述,多线程和多处理器编程的核心内容和思想包括并行性、同步与互斥、线程调度、并发性与竞争条件、数据共享与通信,以及并行算法和数据结构等方面。开发人员需要结合具体的应用场景和需求,选择合适的编程模型和技术,以实现高效、可靠的并行程序。
多线程与多处理器编程的核心内容和思想可以归纳为以下几点:
多线程编程:
- 线程的概念:
- 线程是程序执行的最小单位,它允许程序同时执行多个任务。
- 线程是程序执行的路径,代表了一个独立的执行序列。
- 多线程的优势:
- 提高程序的性能和响应能力。通过将任务分解为多个线程,可以加快程序的运行速度。
- 实现同时执行多个任务,提高系统的吞吐量。
- 多线程的挑战:
- 对同一份资源操作时,会存在资源抢夺的问题,需要加入并发控制。
- 线程会带来额外的开销,如CPU调度时间和并发控制开销。
- 多线程的实现:
- 在多线程编程中,线程的运行由调度器安排调度,调度器与操作系统紧密相关,线程的先后顺序不能人为干预。
- 多线程编程需要合理地划分任务,并分配给不同的线程执行。
多处理器编程:
在单处理器上编写程序时,通常不用考虑系统底层系统结构的细节。然而不幸的是,对多处理器的编程目前还不能做到这点,对机器底层系统结构的理解在多核编程中仍起着至关重要的作用。
- 多核处理器的概念:
- 多核处理器是指在一个物理芯片上集成了两个或更多的处理核心,这些处理核心可以同时执行不同的指令,实现并行计算。
- 多处理器编程的优势:
- 通过合理的任务划分和调度,可以充分发挥多核处理器的计算能力,提高系统的整体性能。
- 并行计算能够显著提高计算性能,加快任务的执行速度。
- 多处理器编程的挑战:
- 需要将计算任务划分为多个子任务,并将这些子任务分配给不同的处理核心进行并行处理。
- 需要考虑数据共享和同步的问题,以确保数据的一致性和正确性。
- 多处理器编程的实现:
- 存在两种常见的并行计算模式:数据并行和任务并行。数据并行是将数据集合划分为多个子数据集合进行处理;任务并行则是将任务划分为多个子任务进行处理。
- 多处理器编程需要充分考虑并行计算和数据共享的原理,以实现高效的任务执行。
综上所述,多线程与多处理器编程的核心内容和思想是通过合理的任务划分和调度,利用多个执行单元(线程或处理核心)并行处理任务,从而提高程序的执行效率和系统的整体性能。同时,也需要解决资源抢夺、数据同步和一致性等挑战。
互斥锁
任何互斥协议都会产生这样的问题:如果不能获得锁,应该怎么做?对此有两种选择。一种方案是让其继续进行尝试,这种锁称为自旋锁,对锁的反复测试过程称为旋转或忙等待。另一种方案就是挂起自己,请求操作系统调度器在处理器上调度另外一个线程,这种方式称为阻塞。由于从一个线程切换到另一个线程的代价比较大,所以只有在允许锁延迟较长的情形下,阻塞才有意义。许多操作系统将这两种策略综合起来使用,先旋转一个小的时间段然后再阻塞。旋转和阻塞都是重要的技术。
自旋锁
我们使用了C++的std::atomic<bool>类型来实现自旋锁。lock()函数中,我们使用了exchange()函数来尝试将标志位从false交换为true,如果当前标志位为true(表示锁已经被其他线程持有),则会进入自旋等待,直到锁被释放。unlock()函数中,我们将标志位重新设置为false,释放锁。
#include <iostream>
#include <atomic>
#include <thread>
class SpinLock {
public:
SpinLock() : flag(false) {}
void lock() {
while (flag.exchange(true, std::memory_order_acquire)) {
// 自旋等待锁释放
}
}
void unlock() {
flag.store(false, std::memory_order_release);
}
private:
std::atomic<bool> flag;
};
// 示例代码
int main() {
SpinLock spinLock;
auto criticalSection = [&spinLock]() {
spinLock.lock();
// 临界区代码
std::cout << "Thread ID: " << std::this_thread::get_id() << " is in critical section." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
spinLock.unlock();
};
std::thread t1(criticalSection);
std::thread t2(criticalSection);
t1.join();
t2.join();
return 0;
}
自旋锁是一种在获取锁时不会立即阻塞线程,而是通过循环自旋的方式等待锁释放的锁机制。虽然自旋锁在某些情况下可以提供较低的延迟,但也存在一些缺点:
- 性能开销高: 自旋锁会持续占用CPU资源进行自旋,导致性能开销较大,特别是在竞争激烈的情况下,可能会导致大量的CPU资源浪费。
- 优先级反转: 当一个低优先级的线程持有自旋锁时,高优先级的线程无法进入临界区,可能会导致优先级反转问题,降低系统的响应性能。
- 活锁: 自旋锁在等待锁时不会阻塞线程,因此如果线程一直在自旋等待锁释放,可能会导致活锁问题,使得所有线程都在无效地自旋等待。
为了缓解自旋锁的这些问题,可以采用一些优化策略,例如:
- 自适应自旋: 在等待锁时,可以通过动态调整自旋等待的时间或次数来适应当前系统负载情况,减少不必要的自旋时间。
- 回退策略: 当自旋等待时间达到一定阈值时,可以采取回退策略,将线程转为阻塞状态,以避免浪费过多的CPU资源。
- 锁粒度优化: 尽量减小锁的粒度,使得锁的持有时间尽可能短,降低竞争和自旋等待的概率。
下面是使用C++ CAS算法实现的简单自旋锁示例代码,其中包含了回退策略:
#include <atomic>
#include <thread>
#include <chrono>
class SpinLock {
public:
SpinLock() : flag(false) {}
void lock() {
bool expected = false;
while (!flag.compare_exchange_weak(expected, true, std::memory_order_acquire)) {
// 自旋等待超过一定次数后,采取回退策略
if (retry_count++ > max_retry_count) {
std::this_thread::sleep_for(std::chrono::microseconds(10)); // 短暂休眠
retry_count = 0; // 重置重试计数
}
expected = false; // 重置expected值
}
}
void unlock() {
flag.store(false, std::memory_order_release);
}
private:
std::atomic<bool> flag;
int retry_count = 0; // 重试计数
static constexpr int max_retry_count = 1000; // 最大重试次数
};
在这个示例中,SpinLock类使用了CAS算法来实现自旋锁。在lock()函数中,通过循环不断尝试用compare_exchange_weak()函数尝试获取锁,如果获取失败则继续自旋等待。当自旋等待次数达到一定阈值(max_retry_count)时,采取回退策略,线程短暂休眠一段时间后再重新尝试获取锁。
阻塞锁
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx; // 定义互斥锁
// 线程函数
void print_block(int n, char c) {
// 使用锁来保护临界区代码
mtx.lock();
for (int i = 0; i < n; ++i) {
std::cout << c;
}
std::cout << std::endl;
mtx.unlock();
}
int main() {
std::thread t1(print_block, 10, '*');
std::thread t2(print_block, 10, '#');
t1.join();
t2.join();
return 0;
}
我们使用了C++标准库中的std::mutex来实现阻塞锁(互斥锁)。在线程函数print_block()中,我们首先调用lock()函数来获取锁,进入临界区执行代码,然后在退出临界区前调用unlock()函数来释放锁。这样可以确保在同一时间只有一个线程可以进入临界区执行代码,避免了多线程并发访问造成的数据竞争问题。
锁的作用
锁是实现同步的一种重要机制,用于控制对共享资源的访问,以避免竞态条件和数据不一致。粗粒度锁和细粒度锁是两种主要的锁策略,它们在并行计算中有着不同的应用和影响。
粗粒度锁(Coarse-Grained Locking)
- 定义:粗粒度锁是指锁的范围较大,通常锁定整个数据结构或数据集。
- 用途:当需要保护大量数据或复杂的数据结构时,使用粗粒度锁可以简化锁的管理。它减少了锁的数量和复杂性,但可能降低了并发性。
- 影响:由于锁定范围大,可能导致多个线程在等待同一个锁,从而降低了系统的并发性能。然而,它的管理相对简单,开销也较小。
细粒度锁(Fine-Grained Locking)
- 定义:细粒度锁是指锁的范围非常小,通常是锁定某个特定数据结构的单个元素或一小部分数据。
- 用途:当需要高并发访问不同部分的数据时,使用细粒度锁可以提高系统的并发性能。它允许多个线程同时访问不同的数据片段。
- 影响:细粒度锁提高了并发性,但也需要更复杂的管理来确保正确的同步。此外,由于锁的数量增多,可能增加了一定的开销和管理复杂性。
其他类型的锁
除了粗粒度锁和细粒度锁之外,还有一些其他类型的锁常用于并行编程中:
- 自旋锁(Spinlock):当线程尝试获取已被持有的锁时,它会持续“自旋”等待锁释放,而不是进入睡眠状态。这减少了线程切换的开销,但在高争用情况下可能导致CPU资源浪费。
- 读写锁(Read-Write Lock):这种锁允许多个线程同时读取共享资源,但只允许一个线程写入。这提高了读操作的并发性,同时确保了写操作的原子性。
- 互斥锁(Mutex):最基本的锁类型,用于保护临界区,确保一次只有一个线程可以访问特定资源。
- 条件变量(Condition Variable):与互斥锁结合使用,允许线程在特定条件下等待或唤醒其他线程。
- 信号量(Semaphore):用于控制对多个共享资源的访问,可以视为一种更通用的同步机制。
- 读写信号量(Read-Write Semaphore):类似于读写锁,但使用信号量实现,允许多读单写或单读单写的并发访问模式。
这些锁各有优缺点,适用于不同的并行编程场景和需求。在选择锁的类型和粒度时,需要根据具体的应用场景、性能要求和资源争用情况来进行权衡。
并行数据结构
并行数据结构是一种特殊的数据结构,它们是一种专门设计用于在多线程环境中安全、高效地访问和修改数据的结构,以便更有效地利用多核处理器或多处理器系统中的并行性。与普通数据结构相比,并行数据结构的主要优点在于其能够显著提高线程的吞吐量,即单位时间内处理的数据量。当然并行数据结构的复杂性,也呈现出来了,也既并行数据结构需要考虑线程间的竞争和协调,以避免数据不一致和竞争条件。
优点:
- 提高吞吐量:通过并行处理,可以同时处理多个数据元素,从而加快数据处理速度。
- 更好的硬件利用率:在具有多个处理器核心的系统中,并行数据结构可以充分利用所有可用的处理资源,避免资源浪费。
- 减少线程等待时间:通过并行处理,可以减少线程之间的等待时间,因为多个线程可以同时进行操作。
- 线程安全: 并行数据结构通过锁机制、无锁算法(如CAS操作)、分段锁等技术手段来确保线程安全,允许多个线程并发访问而不会出现数据竞争和不一致问题。
- 高并发性能: 通过减少锁的粒度或完全避免使用锁,并行数据结构能够显著提升多线程环境下的并发性能和吞吐量。
- 可扩展性: 并行数据结构通常具有良好的扩展性,能够适应不同数量的并发线程。
提升线程吞吐量的方法:
- 数据划分:将数据划分为多个部分,每个部分可以由一个单独的线程处理。这允许不同的线程并行工作,从而提高吞吐量。
- 无锁数据结构:使用无锁(lock-free)或等待自由(wait-free)的数据结构可以减少线程之间的同步开销,从而提高性能。这些数据结构通过算法保证操作的原子性,而不需要使用传统的锁机制。
- 细粒度并行性:通过将任务划分为更小的子任务,可以增加并行处理的机会。这允许更多的线程同时工作,从而提高吞吐量。
- 优化内存访问模式:通过合理安排数据的存储和访问方式,可以减少内存访问冲突和缓存失效,从而提高线程的工作效率。
- 使用高效的并发算法:针对并行处理设计高效的算法,可以进一步提高吞吐量。这些算法需要充分考虑并行性、数据局部性和内存访问模式等因素。
关于锁,提升线程吞吐量的优化方法:
- 细粒度锁(Fine-Grained Locking):
- 使用细粒度锁代替粗粒度锁,减少锁的争用,提高并发度。例如,Java 的 ConcurrentHashMap 使用分段锁(Segmented Locking)来提升并发性能。
- 无锁算法(Lock-Free Algorithms):
- 使用无锁算法(如CAS)来避免锁的使用,减少锁竞争带来的性能开销。无锁队列(Lock-Free Queue)和无锁栈(Lock-Free Stack)是常见的例子。
- 乐观并发控制(Optimistic Concurrency Control):
- 允许多个线程乐观地进行并发操作,只有在提交时才检查冲突并重试。Java 的 StampedLock 提供了乐观读锁的实现。
- 分段锁(Segmented Locking):
- 将数据结构划分为多个段,每个段有独立的锁,从而减少锁的竞争。ConcurrentHashMap 是分段锁的一个典型应用。
- 读写锁(Read-Write Lock):
- 使用读写锁允许多个读线程并发访问,而写操作独占锁。ReentrantReadWriteLock 是 Java 中的实现例子。
需要注意的是,虽然并行数据结构具有许多优点,但它们也带来了额外的复杂性和挑战,如同步问题、数据一致性问题以及并行开销等。因此,在设计并行数据结构时需要仔细权衡各种因素。
此外,根据参考文章3中提到的数据并行优化技巧,如通信融合(Fuse Allreduce),可以通过减少通信延迟和数据传输时间来进一步提高数据并行的效率,从而提升线程的吞吐量。这些优化技巧通常需要在具体的并行计算框架或系统中实现。
无锁队列
在实现无锁队列时,我们可以利用 CAS(Compare-And-Swap)来确保并发安全。ABA 问题是 CAS 操作中的一个经典问题,即在进行比较和交换之间,一个变量的值被改动了,但最后又改回了原值,使得 CAS 操作看起来没有变化。这可以通过引入版本号或使用双重指针(比如 tagged pointer)来避免。
以下是一个使用 C++ 实现的无锁队列示例,它使用 CAS 操作并尝试解决 ABA 问题:
#include <atomic>
#include <memory>
#include <iostream>
template<typename T>
class LockFreeQueue {
private:
struct Node {
T data;
std::atomic<Node*> next;
Node(T data) : data(data), next(nullptr) {}
};
struct TaggedPtr {
std::uintptr_t ptr;
unsigned long tag;
TaggedPtr() : ptr(0), tag(0) {}
TaggedPtr(Node* p, unsigned long t) : ptr(reinterpret_cast<std::uintptr_t>(p)), tag(t) {}
Node* getPtr() const { return reinterpret_cast<Node*>(ptr); }
bool operator==(const TaggedPtr& other) const { return ptr == other.ptr && tag == other.tag; }
};
std::atomic<TaggedPtr> head;
std::atomic<TaggedPtr> tail;
public:
LockFreeQueue() {
Node* dummy = new Node(T{});
head.store(TaggedPtr(dummy, 0));
tail.store(TaggedPtr(dummy, 0));
}
~LockFreeQueue() {
while (Node* node = head.load().getPtr()) {
head.store(TaggedPtr(node->next.load(), head.load().tag + 1));
delete node;
}
}
void enqueue(T data) {
Node* newNode = new Node(data);
TaggedPtr tailOld;
while (true) {
tailOld = tail.load();
Node* tailNode = tailOld.getPtr();
Node* nextNode = tailNode->next.load();
if (tailOld == tail.load()) {
if (nextNode == nullptr) {
if (tailNode->next.compare_exchange_weak(nextNode, newNode)) {
tail.compare_exchange_weak(tailOld, TaggedPtr(newNode, tailOld.tag + 1));
return;
}
} else {
tail.compare_exchange_weak(tailOld, TaggedPtr(nextNode, tailOld.tag + 1));
}
}
}
}
bool dequeue(T& result) {
TaggedPtr headOld;
while (true) {
headOld = head.load();
TaggedPtr tailOld = tail.load();
Node* headNode = headOld.getPtr();
Node* tailNode = tailOld.getPtr();
Node* nextNode = headNode->next.load();
if (headOld == head.load()) {
if (headNode == tailNode) {
if (nextNode == nullptr) {
return false;
}
tail.compare_exchange_weak(tailOld, TaggedPtr(nextNode, tailOld.tag + 1));
} else {
result = nextNode->data;
if (head.compare_exchange_weak(headOld, TaggedPtr(nextNode, headOld.tag + 1))) {
delete headNode;
return true;
}
}
}
}
}
};
int main() {
LockFreeQueue<int> queue;
queue.enqueue(1);
queue.enqueue(2);
int result;
if (queue.dequeue(result)) {
std::cout << "Dequeued: " << result << std::endl;
}
if (queue.dequeue(result)) {
std::cout << "Dequeued: " << result << std::endl;
}
return 0;
}
解释:
- Node 结构体:每个节点包含数据和指向下一个节点的指针。
- TaggedPtr 结构体:用来存储指针和版本号,解决 ABA 问题。
- head 和 tail:头部和尾部指针使用 TaggedPtr 结构。
- enqueue 方法:添加新元素,使用 CAS 确保并发安全。
- dequeue 方法:删除元素,使用 CAS 确保并发安全,并防止 ABA 问题。
- main 函数:测试队列的基本操作。
这个无锁队列利用 CAS 操作和版本号(tag)来避免 ABA 问题,确保了在高并发环境中的正确性。
并行优先级队列
这里我们将使用C++和Compare-And-Swap(CAS)来实现一个并行优先级队列。为了避免ABA问题,我们可以使用带有版本号的指针。这个例子将使用一个带有版本号的指针来解决ABA问题。
template<typename T>
class LockFreePriorityQueue {
public:
LockFreePriorityQueue() {
head = new Node<T>(T(), std::numeric_limits<int>::min());
tail = new Node<T>(T(), std::numeric_limits<int>::max());
head->next.setPointer(tail, 0);
}
~LockFreePriorityQueue() {
Node<T>* node = head;
while (node != nullptr) {
Node<T>* next = node->next.getPointer();
delete node;
node = next;
}
}
void enqueue(T value, int priority) {
Node<T>* newNode = new Node<T>(value, priority);
while (true) {
Node<T>* pred = head;
Node<T>* curr = head->next.getPointer();
while (curr->priority < priority) {
pred = curr;
curr = curr->next.getPointer();
}
uint32_t currVersion = curr->next.getVersion();
uint32_t predVersion = pred->next.getVersion();
newNode->next.setPointer(curr, currVersion);
if (pred->next.compareAndSet(curr, newNode, predVersion, predVersion + 1)) {
return;
}
}
}
bool dequeue(T& result) {
while (true) {
Node<T>* pred = head;
Node<T>* curr = head->next.getPointer();
Node<T>* succ = curr->next.getPointer();
if (curr == tail) {
return false; // Queue is empty
}
uint32_t currVersion = curr->next.getVersion();
uint32_t predVersion = pred->next.getVersion();
if (pred->next.compareAndSet(curr, succ, predVersion, predVersion + 1)) {
result = curr->value;
delete curr;
return true;
}
}
}
private:
Node<T>* head;
Node<T>* tail;
};
int main() {
LockFreePriorityQueue<int> pq;
pq.enqueue(5, 1);
pq.enqueue(10, 2);
pq.enqueue(1, 0);
int value;
while (pq.dequeue(value)) {
std::cout << "Dequeued: " << value << std::endl;
}
return 0;
}
并行二叉搜索树(Concurrent Binary Search Tree, BST)
并行二叉搜索树是一种可以在多线程环境下安全操作的二叉树数据结构。为了实现并行操作,通常会使用细粒度锁或无锁算法来确保线程安全。
下面先展示一个线程安全的二叉搜索树的示例。我们使用锁来包含Node节点,以确保多个线程可以并发插入、查找和删除节点。为了简化代码,我们只实现插入和查找操作。
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>
template<typename T>
class ConcurrentBST {
private:
struct Node {
T value;
Node* left;
Node* right;
mutable std::shared_mutex mtx;
Node(T val) : value(val), left(nullptr), right(nullptr) {}
};
Node* root;
mutable std::shared_mutex root_mtx;
void insert(Node*& node, T value) {
if (!node) {
node = new Node(value);
} else if (value < node->value) {
std::unique_lock lock(node->mtx);
insert(node->left, value);
} else if (value > node->value) {
std::unique_lock lock(node->mtx);
insert(node->right, value);
}
}
bool search(Node* node, T value) const {
if (!node) {
return false;
}
std::shared_lock lock(node->mtx);
if (value == node->value) {
return true;
} else if (value < node->value) {
return search(node->left, value);
} else {
return search(node->right, value);
}
}
public:
ConcurrentBST() : root(nullptr) {}
void insert(T value) {
std::unique_lock lock(root_mtx);
insert(root, value);
}
bool search(T value) const {
std::shared_lock lock(root_mtx);
return search(root, value);
}
};
int main() {
ConcurrentBST<int> tree;
auto inserter = [&tree](int start, int end) {
for (int i = start; i < end; ++i) {
tree.insert(i);
}
};
auto searcher = [&tree](int value) {
if (tree.search(value)) {
std::cout << "Found " << value << std::endl;
} else {
std::cout << "Not Found " << value << std::endl;
}
};
std::thread t1(inserter, 0, 100);
std::thread t2(inserter, 100, 200);
t1.join();
t2.join();
std::thread t3(searcher, 50);
std::thread t4(searcher, 150);
std::thread t5(searcher, 250);
t3.join();
t4.join();
t5.join();
return 0;
}
为了提升上面示例代码的性能,我们需要更细粒度的锁机制,确保在操作一个节点时不锁住整个子树。这意味着我们应该避免持有锁的时间过长,并在可能的情况下尽量减少锁的范围。
我们可以通过:递归时,锁分离的方式来实现这一点,即在插入和搜索操作中,先锁住当前节点,然后递归进入子节点时解锁当前节点。这种方法称为"递归分离锁"(Hand-Over-Hand Locking),使得锁的持有时间尽量短,减少线程之间的等待时间。
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>
template<typename T>
class ConcurrentBST {
private:
struct Node {
T value;
Node* left;
Node* right;
mutable std::shared_mutex mtx;
Node(T val) : value(val), left(nullptr), right(nullptr) {}
};
Node* root;
mutable std::shared_mutex root_mtx;
void insert(Node*& node, T value, std::unique_lock<std::shared_mutex>& parent_lock) {
if (!node) {
node = new Node(value);
} else {
std::unique_lock<std::shared_mutex> node_lock(node->mtx);
parent_lock.unlock();
if (value < node->value) {
insert(node->left, value, node_lock);
} else if (value > node->value) {
insert(node->right, value, node_lock);
}
}
}
bool search(Node* node, T value, std::shared_lock<std::shared_mutex>& parent_lock) const {
if (!node) {
return false;
}
std::shared_lock<std::shared_mutex> node_lock(node->mtx);
parent_lock.unlock();
if (value == node->value) {
return true;
} else if (value < node->value) {
return search(node->left, value, node_lock);
} else {
return search(node->right, value, node_lock);
}
}
public:
ConcurrentBST() : root(nullptr) {}
void insert(T value) {
std::unique_lock<std::shared_mutex> lock(root_mtx);
insert(root, value, lock);
}
bool search(T value) const {
std::shared_lock<std::shared_mutex> lock(root_mtx);
return search(root, value, lock);
}
};
int main() {
ConcurrentBST<int> tree;
auto inserter = [&tree](int start, int end) {
for (int i = start; i < end; ++i) {
tree.insert(i);
}
};
auto searcher = [&tree](int value) {
if (tree.search(value)) {
std::cout << "Found " << value << std::endl;
} else {
std::cout << "Not Found " << value << std::endl;
}
};
std::thread t1(inserter, 0, 100);
std::thread t2(inserter, 100, 200);
t1.join();
t2.join();
std::thread t3(searcher, 50);
std::thread t4(searcher, 150);
std::thread t5(searcher, 250);
t3.join();
t4.join();
t5.join();
return 0;
}
代码说明
- 递归分离锁:
- 在插入和搜索操作中,首先锁定当前节点,然后在递归进入子节点之前解锁当前节点。
- 使用 std::unique_lock 和 std::shared_lock 管理锁的生命周期,并确保在进入子节点之前解锁父节点的锁。
- 改进的插入和搜索函数:
- 插入和搜索函数接受一个父节点的锁作为参数,递归调用时传递新的锁。
- 在锁定当前节点后,立即解锁父节点的锁,以便其他线程可以访问父节点及其子树。
通过递归分离锁技术,我们减少了锁的持有时间,避免了锁住整个子树的情况,提高了并行操作的效率。这种方法在多线程环境下可以显著提升性能,同时保持线程安全性。
但是,我们还有进一步优化的空间,比如,使用CAS(Compare-And-Swap)算法来实现并行二叉树。如果使用CAS算法实现并行二叉树,则需要一些额外的工作,因为CAS操作主要用于原子地更新单个变量,而二叉树节点的插入和搜索操作涉及多个步骤。我们可以借助一种叫做 Lock-Free Data Structure 的设计来实现这一目标。以下是一个示例代码,展示如何使用CAS来实现并行二叉树的插入操作。
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
#include <cassert>
template<typename T>
class LockFreeBST {
private:
struct Node {
T value;
std::atomic<Node*> left;
std::atomic<Node*> right;
Node(T val) : value(val), left(nullptr), right(nullptr) {}
};
std::atomic<Node*> root;
bool insertHelper(Node* parent, Node* node, T value) {
if (value < node->value) {
Node* left = node->left.load();
if (left == nullptr) {
Node* newNode = new Node(value);
if (node->left.compare_exchange_strong(left, newNode)) {
return true;
} else {
delete newNode;
return insertHelper(node, node->left.load(), value);
}
} else {
return insertHelper(node, left, value);
}
} else if (value > node->value) {
Node* right = node->right.load();
if (right == nullptr) {
Node* newNode = new Node(value);
if (node->right.compare_exchange_strong(right, newNode)) {
return true;
} else {
delete newNode;
return insertHelper(node, node->right.load(), value);
}
} else {
return insertHelper(node, right, value);
}
}
return false; // Value already exists in the tree.
}
public:
LockFreeBST() : root(nullptr) {}
bool insert(T value) {
Node* newNode = new Node(value);
if (root.load() == nullptr) {
if (root.compare_exchange_strong(nullptr, newNode)) {
return true;
} else {
delete newNode;
}
}
Node* r = root.load();
return insertHelper(nullptr, r, value);
}
bool search(T value) {
Node* current = root.load();
while (current != nullptr) {
if (value == current->value) {
return true;
} else if (value < current->value) {
current = current->left.load();
} else {
current = current->right.load();
}
}
return false;
}
};
int main() {
LockFreeBST<int> tree;
auto inserter = [&tree](int start, int end) {
for (int i = start; i < end; ++i) {
tree.insert(i);
}
};
auto searcher = [&tree](int value) {
if (tree.search(value)) {
std::cout << "Found " << value << std::endl;
} else {
std::cout << "Not Found " << value << std::endl;
}
};
std::thread t1(inserter, 0, 100);
std::thread t2(inserter, 100, 200);
t1.join();
t2.join();
std::thread t3(searcher, 50);
std::thread t4(searcher, 150);
std::thread t5(searcher, 250);
t3.join();
t4.join();
t5.join();
return 0;
}
代码解释
- Node 结构:每个节点包含一个值和两个原子指针,分别指向左右子节点。
- root:根节点是一个原子指针,允许并发访问和更新。
- insertHelper:这个辅助函数递归地尝试插入新节点。如果插入位置已经被其他线程占用,则重复尝试。
- insert:尝试将新节点插入树中。如果根节点为空,尝试用CAS操作将其设置为新节点。如果根节点已经存在,调用 insertHelper 进行递归插入。
- search:递归地搜索值,直到找到或到达树的末端。
预防措施
- CAS重试机制:在插入操作中使用CAS,如果失败则重复尝试,确保线程安全。
- 避免死锁和活锁:通过细粒度的CAS操作和递归重试机制,避免死锁和活锁情况。
- 资源管理:确保在CAS操作失败时,及时释放新分配的节点,防止内存泄漏。
通过这种方式,我们可以实现一个线程安全的并行二叉树,同时避免传统锁机制带来的性能瓶颈。
我们再次扩展一下,使用 Compare-And-Swap (CAS) 来实现并行二叉搜索树 (Binary Search Tree, BST) ,并且要避免 ABA 问题。
c++实现代码如下:
#include <atomic>
#include <iostream>
#include <thread>
#include <vector>
#include <cstdint>
namespace BST {
// 辅助类,避免 ABA 问题
template <typename T>
class AtomicMarkedPointer {
public:
AtomicMarkedPointer(T* ptr = nullptr, bool mark = false)
: combined(encode(ptr, mark)) {}
// 拷贝构造函数
AtomicMarkedPointer(const AtomicMarkedPointer& other)
: combined(other.combined.load()) {}
// 拷贝赋值操作符
AtomicMarkedPointer& operator=(const AtomicMarkedPointer& other) {
if (this == &other) return *this;
combined.store(other.combined.load());
return *this;
}
// 移动构造函数
AtomicMarkedPointer(AtomicMarkedPointer&& other) noexcept
: combined(other.combined.load()) {
other.combined.store(encode(nullptr, false));
}
// 移动赋值操作符
AtomicMarkedPointer& operator=(AtomicMarkedPointer&& other) noexcept {
if (this == &other) return *this;
combined.store(other.combined.load());
other.combined.store(encode(nullptr, false));
return *this;
}
bool compare_exchange_strong(AtomicMarkedPointer& expected, AtomicMarkedPointer desired) {
return combined.compare_exchange_strong(expected.combined, desired.combined);
}
T* get_pointer() const {
return decode_pointer(combined.load());
}
bool get_mark() const {
return decode_mark(combined.load());
}
void set(T* ptr, bool mark) {
combined.store(encode(ptr, mark));
}
void set(AtomicMarkedPointer ptr) {
combined.store(ptr.combined.load());
}
private:
std::atomic<uintptr_t> combined;
static uintptr_t encode(T* ptr, bool mark) {
return reinterpret_cast<uintptr_t>(ptr) | static_cast<uintptr_t>(mark);
}
static T* decode_pointer(uintptr_t encoded) {
return reinterpret_cast<T*>(encoded & ~(uintptr_t(1)));
}
static bool decode_mark(uintptr_t encoded) {
return encoded & uintptr_t(1);
}
};
// 节点结构,包含版本号
struct Node {
int key;
std::atomic<AtomicMarkedPointer<Node>> left;
std::atomic<AtomicMarkedPointer<Node>> right;
std::atomic<uint32_t> version; // 版本号
Node(int k) : key(k), left(nullptr), right(nullptr), version(0) {}
// 拷贝构造函数
Node(const Node& other)
: key(other.key), left(other.left.load()), right(other.right.load()), version(other.version.load()) {}
// 拷贝赋值操作符
Node& operator=(const Node& other) {
if (this == &other) return *this;
key = other.key;
left.store(other.left.load());
right.store(other.right.load());
version.store(other.version.load());
return *this;
}
// 移动构造函数
Node(Node&& other) noexcept
: key(other.key), left(other.left.load()), right(other.right.load()), version(other.version.load()) {
other.left.store(nullptr);
other.right.store(nullptr);
other.version.store(0);
}
// 移动赋值操作符
Node& operator=(Node&& other) noexcept {
if (this == &other) return *this;
key = other.key;
left.store(other.left.load());
right.store(other.right.load());
version.store(other.version.load());
other.left.store(nullptr);
other.right.store(nullptr);
other.version.store(0);
return *this;
}
};
// 二叉搜索树
class ConcurrentBST {
public:
ConcurrentBST() : root(nullptr) {}
void insert(int key) {
while (true) {
Node* parent = nullptr;
AtomicMarkedPointer<Node> current = root.load();
if (current.get_pointer() == nullptr) {
Node* new_node = new Node(key);
if (root.compare_exchange_strong(current, AtomicMarkedPointer<Node>(new_node, false))) {
return;
}
else {
delete new_node;
continue;
}
}
while (current.get_pointer() != nullptr) {
parent = current.get_pointer();
if (key < parent->key) {
current = parent->left.load();
}
else if (key > parent->key) {
current = parent->right.load();
}
else {
return;
}
}
Node* new_node = new Node(key);
uint32_t version = parent->version.load();
if (key < parent->key) {
if (parent->left.compare_exchange_strong(current, AtomicMarkedPointer<Node>(new_node, false))) {
parent->version.fetch_add(1); // 增加版本号
return;
}
}
else {
if (parent->right.compare_exchange_strong(current, AtomicMarkedPointer<Node>(new_node, false))) {
parent->version.fetch_add(1); // 增加版本号
return;
}
}
delete new_node;
}
}
void print_in_order() {
print_in_order(root.load().get_pointer());
std::cout << std::endl;
}
private:
std::atomic<AtomicMarkedPointer<Node>> root;
void print_in_order(Node* node) {
if (node != nullptr) {
print_in_order(node->left.load().get_pointer());
std::cout << node->key << " ";
print_in_order(node->right.load().get_pointer());
}
}
};
}
int mainCBST() {
BST::ConcurrentBST tree;
std::vector<std::thread> threads;
for (int i = 0; i < 10; i++) {
threads.push_back(std::thread([&tree, i]() {
tree.insert(i);
}));
}
for (auto& t : threads) {
t.join();
}
tree.print_in_order();
return 0;
}
代码说明
- Node 结构:每个节点包含一个键值和两个原子指针(left 和 right),分别指向左子节点和右子节点。
- AtomicMarkedPointer 类:这是一个模板类,用于将指针和标记(防止 ABA 问题)结合在一起。
- ConcurrentBST 类:这是并行二叉搜索树的主体,包含插入和打印函数。
- insert 方法:插入一个新的键值。如果树是空的,则将新节点设置为根节点。否则,找到正确的插入位置并插入新节点。
- print_in_order 方法:递归地打印树的内容。
处理 ABA 问题
在上面的代码中,AtomicMarkedPointer 使用标记位(1 bit)来处理 ABA 问题。标记位和实际指针结合在一起进行比较和交换操作,以确保在多线程环境中避免 ABA 问题。
多线程测试
在 main 函数中,我们创建了 10 个线程,每个线程插入一个键值到树中。最终,我们打印出树的内容。
这个示例展示了如何使用 CAS 和标记位来实现一个并行二叉搜索树,并避免 ABA 问题。实际应用中,您可能需要根据具体需求进行优化和调整。
补充说明,在 AtomicMarkedPointer 类中,该例使用了指针地址的最低位来存储标记位(mark)。这样做的确会修改指针的数值,但不会影响原始的指针语义。原始的指针数值并不关心其最低位的标记位,因此在使用 AtomicMarkedPointer 时,对于原始指针的操作不受影响。
具体来说,我们在 AtomicMarkedPointer 中使用了 reinterpret_cast 将指针转换为整数(uintptr_t 类型),然后将标记位存储在整数的最低位。这样做可以确保标记位的存储不会影响原始指针的数值,因为整数类型的最低位通常用来存储标记位或其他辅助信息。
在使用 AtomicMarkedPointer 类时,我们需要确保正确地处理指针和标记位,以确保在原始指针和标记位之间正确地进行转换和比较。在 AtomicMarkedPointer 类中,我们提供了 get_pointer() 和 get_mark() 方法来分别获取指针和标记位,以及 set() 方法来设置新的指针和标记位。
总之,尽管在 AtomicMarkedPointer 中修改了指针的数值,但我们可以通过适当的转换和处理来确保原始指针的语义不受影响。
并行hash map
使用 Compare-And-Swap (CAS) 实现并行哈希映射是一个高级并且复杂的任务,因为它涉及线程安全、原子操作以及处理ABA问题等。以下是一个使用 C++ 实现并行哈希映射的示例代码。为了解决 ABA 问题,我们可以引入版本号。
首先,让我们定义一些必要的结构:
- Node: 存储哈希映射的键值对。
- Bucket: 包含一个指向 Node 的指针和一个版本号,用于解决 ABA 问题。
- ConcurrentHashMap: 并行哈希映射的主要结构。
#include <atomic>
#include <vector>
#include <memory>
#include <iostream>
template <typename K, typename V>
class ConcurrentHashMap {
private:
struct Node {
K key;
V value;
std::shared_ptr<Node> next;
Node(K k, V v) : key(k), value(v), next(nullptr) {}
};
struct Bucket {
std::atomic<std::shared_ptr<Node>> head;
std::atomic<size_t> version;
Bucket() : head(nullptr), version(0) {}
};
std::vector<Bucket> buckets;
size_t capacity;
size_t hash(K key) {
return std::hash<K>{}(key) % capacity;
}
public:
ConcurrentHashMap(size_t capacity) : capacity(capacity), buckets(capacity) {}
bool insert(K key, V value) {
size_t index = hash(key);
Bucket& bucket = buckets[index];
size_t old_version = bucket.version.load();
std::shared_ptr<Node> new_node = std::make_shared<Node>(key, value);
while (true) {
std::shared_ptr<Node> old_head = bucket.head.load();
new_node->next = old_head;
if (bucket.head.compare_exchange_weak(old_head, new_node)) {
bucket.version.fetch_add(1);
return true;
}
size_t new_version = bucket.version.load();
if (new_version != old_version) {
old_version = new_version;
}
}
}
bool find(K key, V& value) {
size_t index = hash(key);
Bucket& bucket = buckets[index];
std::shared_ptr<Node> current = bucket.head.load();
while (current) {
if (current->key == key) {
value = current->value;
return true;
}
current = current->next;
}
return false;
}
bool remove(K key) {
size_t index = hash(key);
Bucket& bucket = buckets[index];
size_t old_version = bucket.version.load();
while (true) {
std::shared_ptr<Node> current = bucket.head.load();
std::shared_ptr<Node> prev = nullptr;
while (current && current->key != key) {
prev = current;
current = current->next;
}
if (!current) {
return false;
}
std::shared_ptr<Node> next = current->next;
if (!prev) {
if (bucket.head.compare_exchange_weak(current, next)) {
bucket.version.fetch_add(1);
return true;
}
} else {
if (prev->next.compare_exchange_weak(current, next)) {
bucket.version.fetch_add(1);
return true;
}
}
size_t new_version = bucket.version.load();
if (new_version != old_version) {
old_version = new_version;
}
}
}
};
int main() {
ConcurrentHashMap<int, std::string> map(10);
map.insert(1, "value1");
map.insert(2, "value2");
std::string value;
if (map.find(1, value)) {
std::cout << "Found: " << value << std::endl;
} else {
std::cout << "Not found." << std::endl;
}
if (map.remove(1)) {
std::cout << "Removed." << std::endl;
} else {
std::cout << "Not removed." << std::endl;
}
if (map.find(1, value)) {
std::cout << "Found: " << value << std::endl;
} else {
std::cout << "Not found." << std::endl;
}
return 0;
}
这个示例代码实现了一个基本的并行哈希映射,并使用了版本号来避免 ABA 问题。主要的思路是在每个桶(bucket)中包含一个版本号,并在每次修改操作后增加版本号,以检测并避免 ABA 问题。
主要注意点:
- Node 结构:存储键值对,并使用 std::shared_ptr 来管理链表中的节点。
- Bucket 结构:包含一个指向 Node 的原子指针和一个版本号。
- 插入操作:在插入新节点时,使用 compare_exchange_weak 原子操作来确保并发安全。
- 查找操作:遍历链表查找键值对。
- 删除操作:使用 compare_exchange_weak 原子操作来确保并发安全。
- 版本号:在每次插入和删除操作后,增加版本号,以检测并避免 ABA 问题。
这种设计虽然提供了基本的并行哈希映射功能,但在实际应用中,可能需要更多的优化和改进来满足性能和功能需求。
并行跳表
要实现并行的SkipList,并使用CAS算法避免ABA问题,我们需要设计一种并行算法来操作SkipList的节点,同时利用CAS确保线程安全并避免ABA问题。通常,这需要使用一种带有标记和版本号的指针,以便检测和避免ABA问题。
下面是一个简化的并行SkipList实现示例,使用CAS和标记指针来避免ABA问题:
#include <iostream>
#include <atomic>
#include <vector>
#include <limits>
#include <random>
class Node {
public:
int key;
int level;
std::vector<std::atomic<Node*>> forward;
Node(int key, int level) : key(key), level(level), forward(level + 1) {
for (int i = 0; i <= level; ++i) {
forward[i] = nullptr;
}
}
};
class SkipList {
public:
SkipList(int maxLevel) : maxLevel(maxLevel), level(0) {
header = new Node(std::numeric_limits<int>::min(), maxLevel);
tail = new Node(std::numeric_limits<int>::max(), maxLevel);
for (int i = 0; i <= maxLevel; ++i) {
header->forward[i] = tail;
}
}
~SkipList() {
Node* node = header;
while (node) {
Node* next = node->forward[0];
delete node;
node = next;
}
}
bool insert(int key) {
std::vector<Node*> update(maxLevel + 1);
Node* x = header;
for (int i = level; i >= 0; --i) {
while (x->forward[i] && x->forward[i].load()->key < key) {
x = x->forward[i];
}
update[i] = x;
}
x = x->forward[0].load();
if (x && x->key == key) {
return false;
} else {
int newLevel = randomLevel();
if (newLevel > level) {
for (int i = level + 1; i <= newLevel; ++i) {
update[i] = header;
}
level = newLevel;
}
Node* newNode = new Node(key, newLevel);
for (int i = 0; i <= newLevel; ++i) {
Node* oldForward;
do {
oldForward = update[i]->forward[i].load();
newNode->forward[i] = oldForward;
} while (!update[i]->forward[i].compare_exchange_weak(oldForward, newNode));
}
return true;
}
}
bool erase(int key) {
std::vector<Node*> update(maxLevel + 1);
Node* x = header;
for (int i = level; i >= 0; --i) {
while (x->forward[i] && x->forward[i].load()->key < key) {
x = x->forward[i];
}
update[i] = x;
}
x = x->forward[0].load();
if (!x || x->key != key) {
return false;
} else {
for (int i = 0; i <= level; ++i) {
Node* oldForward;
do {
oldForward = x->forward[i].load();
if (update[i]->forward[i].load() != x) {
break;
}
} while (!update[i]->forward[i].compare_exchange_weak(x, oldForward));
}
delete x;
while (level > 0 && header->forward[level].load() == tail) {
--level;
}
return true;
}
}
bool contains(int key) {
Node* x = header;
for (int i = level; i >= 0; --i) {
while (x->forward[i] && x->forward[i].load()->key < key) {
x = x->forward[i];
}
}
x = x->forward[0].load();
return x && x->key == key;
}
private:
int maxLevel;
Node* header;
Node* tail;
std::atomic<int> level;
int randomLevel() {
int lvl = 0;
static std::default_random_engine e;
static std::uniform_int_distribution<int> dist(0, 1);
while (dist(e) && lvl < maxLevel) {
++lvl;
}
return lvl;
}
};
int main() {
SkipList skiplist(4);
skiplist.insert(3);
skiplist.insert(6);
skiplist.insert(7);
skiplist.insert(9);
skiplist.insert(12);
skiplist.insert(19);
skiplist.insert(17);
skiplist.insert(26);
skiplist.insert(21);
skiplist.insert(25);
std::cout << "Contains 19: " << skiplist.contains(19) << std::endl;
std::cout << "Contains 15: " << skiplist.contains(15) << std::endl;
skiplist.erase(19);
std::cout << "Contains 19 after deletion: " << skiplist.contains(19) << std::endl;
return 0;
}
解释
- Node 类:表示跳表中的一个节点,包含一个 key 和一个 forward 指针数组。
- SkipList 类:实现了并行的 SkipList,包含插入、删除和查找操作。
- insert 方法:使用 CAS 操作插入新节点,并通过 randomLevel 方法决定新节点的层数。
- erase 方法:使用 CAS 操作删除节点,并更新前驱节点的 forward 指针。
- contains 方法:检查跳表中是否包含某个关键字。
ABA 问题的处理
为了避免 ABA 问题,示例代码使用了 compare_exchange_weak 和 compare_exchange_strong 进行 CAS 操作。通过不断重试和标记指针的方法,可以有效避免 ABA 问题。
线程模型
线程模型是指多线程编程中线程之间的组织结构和调度方式,主要描述的是线程之间如何协作完成任务。不同的线程模型会影响程序的并发性、性能以及可维护性等方面。
常见的几种线程模型包括:
单线程模型(Single-Threaded Model):
在单线程模型中,程序只有一个线程来执行任务。所有的任务按顺序逐个执行,没有并发性。这种模型适用于简单的、单任务的应用程序,例如某些命令行工具或者简单的图形用户界面应用程序。
多线程模型(Multi-Threaded Model):
多线程模型允许程序同时运行多个线程,每个线程可以执行独立的任务。不同的线程之间可以并发执行,提高了程序的响应速度和并发处理能力。例如,一个网络服务器可以使用多线程模型同时处理多个客户端请求。
固定线程池模型(Fixed Thread Pool Model):
在固定线程池模型中,创建了一个固定数量的线程池,线程池中的线程数量是预先确定的。任务会被分配给线程池中的空闲线程执行。当任务完成后,线程会返回线程池以供下次使用。这种模型适用于需要限制线程数量的场景,例如服务器端的并发处理。
工作窃取模型(Work Stealing Model):
工作窃取模型是一种用于并行计算的线程调度算法。在这种模型中,每个线程都有一个本地任务队列,线程首先尝试执行自己队列中的任务。当本地队列为空时,线程会从其他线程的队列中窃取任务来执行,以充分利用系统资源。这种模型常用于并行计算框架中,例如Fork/Join框架。
下面编码实现Work Stealing Model,以c++代码为例:
#include <iostream>
#include <thread>
#include <vector>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
class WorkStealingThreadPool {
public:
WorkStealingThreadPool(size_t num_threads) : threads_(num_threads), done_(false) {
for (size_t i = 0; i < num_threads; ++i) {
threads_[i] = std::thread([this, i]() { worker_thread(i); });
}
}
~WorkStealingThreadPool() {
done_ = true;
for (auto& thread : threads_) {
thread.join();
}
}
void submit(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push_front(task);
}
cv_.notify_one();
}
private:
void worker_thread(size_t index) {
while (!done_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return done_ || !tasks_.empty(); });
if (!tasks_.empty()) {
task = tasks_.back();
tasks_.pop_back();
}
}
if (task) {
task();
} else {
// Try to steal work from other threads
for (size_t i = 1; i < threads_.size(); ++i) {
size_t steal_index = (index + i) % threads_.size();
std::unique_lock<std::mutex> lock(mutex_);
if (!tasks_[steal_index].empty()) {
task = tasks_[steal_index].front();
tasks_[steal_index].pop_front();
break;
}
}
if (task) {
task();
}
}
}
}
std::vector<std::thread> threads_;
std::vector<std::deque<std::function<void()>>> tasks_;
std::mutex mutex_;
std::condition_variable cv_;
std::atomic<bool> done_;
};
int main() {
WorkStealingThreadPool pool(std::thread::hardware_concurrency());
// Submit tasks to the thread pool
for (int i = 0; i < 1000; ++i) {
pool.submit([i]() {
std::cout << "Task " << i << " executed by thread " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});
}
std::this_thread::sleep_for(std::chrono::seconds(60)); // Wait for tasks to complete
return 0;
}
消息传递模型(Message Passing Model):
在消息传递模型中,不同的线程之间通过消息进行通信和同步。线程通过发送和接收消息来实现数据共享和协作。这种模型常用于分布式系统和并行计算领域,例如Actor模型。
下面以c++为例说明该模型:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
class MessageQueue {
public:
void push(std::function<void()> msg) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(msg);
cv_.notify_one();
}
std::function<void()> pop() {
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !queue_.empty(); });
auto msg = queue_.front();
queue_.pop();
return msg;
}
private:
std::mutex mutex_;
std::condition_variable cv_;
std::queue<std::function<void()>> queue_;
};
class Worker {
public:
Worker(MessageQueue& queue) : queue_(queue) {}
void start() {
thread_ = std::thread([this]() {
while (true) {
auto msg = queue_.pop();
if (!msg) break;
msg();
}
});
}
void join() {
if (thread_.joinable()) {
thread_.join();
}
}
private:
MessageQueue& queue_;
std::thread thread_;
};
int main() {
MessageQueue queue;
Worker worker1(queue);
Worker worker2(queue);
worker1.start();
worker2.start();
queue.push([]() { std::cout << "Message 1 processed by thread " << std::this_thread::get_id() << std::endl; });
queue.push([]() { std::cout << "Message 2 processed by thread " << std::this_thread::get_id() << std::endl; });
queue.push([]() { std::cout << "Message 3 processed by thread " << std::this_thread::get_id() << std::endl; });
worker1.join();
worker2.join();
return 0;
}
Actor模型C++实现
再使用c++实现一个简单的Actor模型,该模型与早期的Active Object模型很相似:
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
class Actor {
public:
Actor() : running_(true) {
thread_ = std::thread([this]() {
while (running_) {
std::function<void()> message;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return !messages_.empty() || !running_; });
if (!running_) break;
message = std::move(messages_.front());
messages_.pop();
}
message();
}
});
}
~Actor() {
running_ = false;
cv_.notify_one();
thread_.join();
}
template<typename F, typename... Args>
void send(F&& f, Args&&... args) {
{
std::lock_guard<std::mutex> lock(mutex_);
messages_.emplace([f, args...](){ f(args...); });
}
cv_.notify_one();
}
private:
std::thread thread_;
std::queue<std::function<void()>> messages_;
std::mutex mutex_;
std::condition_variable cv_;
bool running_;
};
int main() {
Actor actor;
actor.send([]() { std::cout << "Message 1 processed by thread " << std::this_thread::get_id() << std::endl; });
actor.send([]() { std::cout << "Message 2 processed by thread " << std::this_thread::get_id() << std::endl; });
actor.send([]() { std::cout << "Message 3 processed by thread " << std::this_thread::get_id() << std::endl; });
// This delay is to ensure all messages are processed before exiting
std::this_thread::sleep_for(std::chrono::milliseconds(100));
return 0;
}
固定线程模式
c++示例:
#include <iostream>
#include <thread>
#include <vector>
void task() {
std::cout << "Task executed by thread " << std::this_thread::get_id() << std::endl;
}
int main() {
// 创建固定数量的线程池
std::vector<std::thread> threadPool;
int numThreads = 4;
for (int i = 0; i < numThreads; ++i) {
threadPool.emplace_back(task);
}
// 等待所有线程执行完毕
for (auto& t : threadPool) {
t.join();
}
return 0;
}
java示例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolModel {
public static void main(String[] args) {
// 创建固定数量的线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交任务到线程池执行
for (int i = 0; i < 4; ++i) {
executor.submit(() -> {
System.out.println("Task executed by thread " + Thread.currentThread().getId());
});
}
// 关闭线程池
executor.shutdown();
}
}
haxe示例:
class FixedThreadPoolModel {
static function main() {
// 创建固定数量的线程池
var executor = new haxe.concurrent.Executor(4);
// 提交任务到线程池执行
for (i in 0...4) {
executor.execute(function() {
trace("Task executed by thread " + Thread.current().getId());
});
}
}
}
上面的模型侧重于大类的分组。下面再介绍几种
其他多线程模型:
Reactor模式(也称为事件处理模式):
- 适用于I/O密集型场景,如网络服务器。
- 有一个或多个并发输入源,有一个Service Handler,和多个Request Handlers。
- 当有输入事件到来时,由分发器(如epoll, kqueue, IOCP等)通知Service Handler,然后由Service Handler根据事件的类型动态地将其分派给某个具体的Request Handler。
- 著名的Netty框架就是基于Reactor模式的。
- c++例子代码:
#include <iostream>
#include <vector>
#include <sys/epoll.h>
#include <fcntl.h>
#include <unistd.h>
class EventHandler {
public:
virtual void handleEvent(uint32_t events) = 0;
virtual int getFd() const = 0;
virtual ~EventHandler() {}
};
class Reactor {
public:
Reactor() {
epollFd_ = epoll_create1(0);
if (epollFd_ == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
}
~Reactor() {
close(epollFd_);
}
void addHandler(EventHandler* handler) {
int fd = handler->getFd();
struct epoll_event event;
event.data.ptr = handler;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event)) {
perror("epoll_ctl: add");
exit(EXIT_FAILURE);
}
handlers_.push_back(handler);
}
void removeHandler(EventHandler* handler) {
int fd = handler->getFd();
if (epoll_ctl(epollFd_, EPOLL_CTL_DEL, fd, nullptr)) {
perror("epoll_ctl: del");
exit(EXIT_FAILURE);
}
handlers_.erase(std::remove(handlers_.begin(), handlers_.end(), handler), handlers_.end());
}
void run() {
const int maxEvents = 10;
struct epoll_event events[maxEvents];
while (true) {
int eventCount = epoll_wait(epollFd_, events, maxEvents, -1);
for (int i = 0; i < eventCount; i++) {
EventHandler* handler = static_cast<EventHandler*>(events[i].data.ptr);
handler->handleEvent(events[i].events);
}
}
}
private:
int epollFd_;
std::vector<EventHandler*> handlers_;
};
// 示例EventHandler实现,仅用于演示目的。
class EchoServer : public EventHandler {
public:
EchoServer(int port) {
// 创建一个socket,绑定到指定端口,并开始监听等。
}
void handleEvent(uint32_t events) override {
// 处理连接请求,接收数据,并回显给客户端。
}
int getFd() const override {
// 返回socket的文件描述符。
return 0; // 示例值,应替换为实际的文件描述符。
}
};
int main() {
Reactor reactor;
EchoServer server(8080); // 假设我们在8080端口上运行echo服务器。
reactor.addHandler(&server); // 将服务器添加到Reactor中。
reactor.run(); // 开始事件循环。
return 0;
}
上述代码是一个简化的示例,用于说明Reactor模式的基本结构和原理。在实际应用中,你需要根据具体需求来实现EventHandler的子类,并处理网络编程中的各种细节(如创建socket、绑定、监听、接受连接、读写数据等)。同时,错误处理和资源管理也是非常重要的部分,应确保所有资源在使用后都得到正确释放。
另外,这个示例使用了epoll,它是Linux特有的。如果你在Windows上编程,可以考虑使用IOCP(I/O Completion Ports)来实现类似的功能。
Producer-Consumer模式:
- 也称为生产者-消费者模式。
- 适用于解耦数据的产生和消费,使得两者可以并发执行,提高吞吐量。
- 生产者将数据放入缓冲区,消费者从缓冲区取出数据进行处理。
- Java中的BlockingQueue就是一个典型的Producer-Consumer模式的实现。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::queue<int> data_queue;
std::mutex mtx;
std::condition_variable cond_var;
bool stop = false;
void producer(int id) {
for (int i = 0; i < 5; ++i) {
std::unique_lock<std::mutex> lock(mtx);
data_queue.push(id * 100 + i);
std::cout << "Producer " << id << " produced " << id * 100 + i << std::endl;
cond_var.notify_one();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cond_var.wait(lock, [] { return !data_queue.empty() || stop; });
if (stop && data_queue.empty()) {
break;
}
int data = data_queue.front();
data_queue.pop();
lock.unlock();
std::cout << "Consumer " << id << " consumed " << data << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
int main() {
std::thread prod1(producer, 1);
std::thread prod2(producer, 2);
std::thread cons1(consumer, 1);
std::thread cons2(consumer, 2);
prod1.join();
prod2.join();
stop = true;
cond_var.notify_all();
cons1.join();
cons2.join();
return 0;
}
Master-Worker模式(也称为Leader-Follower模式):
- 有一个Master线程和多个Worker线程。
- Master负责接收和分配任务,Worker负责执行任务。
- 当Worker完成任务后,会通知Master,然后Master再分配新的任务。
- 这种模式可以充分利用多核CPU,提高系统的吞吐量。
- c++示例:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <functional>
std::queue<std::function<void()>> taskQueue;
std::mutex queueMutex;
std::condition_variable condVar;
bool stopWorkers = false;
void workerFunction() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queueMutex);
condVar.wait(lock, [] { return stopWorkers || !taskQueue.empty(); });
if (stopWorkers && taskQueue.empty()) {
break;
}
task = std::move(taskQueue.front());
taskQueue.pop();
}
task();
}
}
void masterFunction(int numWorkers) {
std::vector<std::thread> workers;
for (int i = 0; i < numWorkers; ++i) {
workers.emplace_back(workerFunction);
}
// 示例任务:打印数字
for (int i = 0; i < 10; ++i) {
int taskId = i;
taskQueue.push([taskId]() {
std::cout << "Task " << taskId << " is running by " << std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟耗时任务
std::cout << "Task " << taskId << " completed by " << std::this_thread::get_id() << std::endl;
});
}
// 等待所有任务完成
while (!taskQueue.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 等待一段时间,让worker线程有机会处理任务
}
// 停止所有worker线程
{
std::unique_lock<std::mutex> lock(queueMutex);
stopWorkers = true;
}
condVar.notify_all(); // 通知所有worker线程停止工作
// 等待所有worker线程结束
for (auto& worker : workers) {
worker.join();
}
}
int main() {
int numWorkers = 4; // 设置worker线程数量
masterFunction(numWorkers); // 启动Master-Worker模式处理任务
return 0;
}
这个示例程序创建了一个Master线程和多个Worker线程。Master线程负责生成任务并将其添加到任务队列中,而Worker线程则从任务队列中获取任务并执行。当所有任务都完成后,Master线程会通知Worker线程停止工作,并等待它们结束。这个示例程序使用C++11的线程库和标准库中的其他并发原语(如互斥锁和条件变量)来实现线程同步。
Future模式:
- 当我们需要调用一个异步的API并等待其结果时,可以使用Future模式。
- Future表示一个异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,并可以使用get()方法来获取计算的结果。
- Java中的java.util.concurrent.Future接口和ExecutorService就是基于这种模式的。
- c++示例:
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
// 模拟一个耗时的操作
int longRunningOperation() {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时操作
return 42; // 返回结果
}
int main() {
// 创建一个异步任务,该任务将在一个单独的线程中运行,并返回一个std::future对象
std::future<int> resultFuture = std::async(longRunningOperation);
// 在这里,你可以继续执行其他任务,而不必等待longRunningOperation完成
std::cout << "Doing other work..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟其他工作
// 现在,我们需要longRunningOperation的结果。如果结果尚未可用,get()方法将阻塞
int result = resultFuture.get(); // 等待异步操作完成并获取结果;当然可以使用waitxxx函数避免一直阻塞在这里。
std::cout << "The result is: " << result << std::endl; // 输出结果
return 0;
}
这个示例使用了C++标准库中的std::future和std::async来实现Future模式。std::async函数用于异步启动一个任务,并返回一个std::future对象,该对象将在未来某个时间点包含任务的结果。std::future::get方法用于获取结果,如果结果尚未可用,它将阻塞调用线程,直到结果准备好为止。
注意:这个示例使用了C++11及以上版本的特性,因此请确保你的编译器支持这些特性。如果你使用的是较旧的编译器或标准库,你可能需要使用其他方法或库来实现Future模式。例如,你可以使用Boost库中的boost::future或boost::promise来实现类似的功能。
Actor模型:
- Actor模型是一种并发计算的模型,它将计算实体封装为一个“Actor”,每个Actor都有自己的状态和行为,它们之间通过发送和接收消息来通信。
- Actor模型强调“消息传递”的并发方式,可以避免共享状态导致的并发问题。
- Erlang语言、Akka框架和Scala的Actor库都是基于Actor模型的。
- java使用actor模型,如使用Maven,则需要在pom.xml中添加以下依赖:
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.13</artifactId>
<version>2.6.14</version>
</dependency>
</dependencies>
然后,你可以创建一个简单的Actor:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.ActorSystem;
import java.util.HashMap;
import java.util.Map;
// 定义一个消息类型
class Greeting {
private final String message;
Greeting(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
// 定义一个Actor
class Greeter extends AbstractActor {
Map<String, Integer> greetings = new HashMap<>();
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Greeting.class, this::greet)
.build();
}
private void greet(Greeting greeting) {
greetings.put(greeting.getMessage(), greetings.getOrDefault(greeting.getMessage(), 0) + 1);
System.out.println("Greeting received: " + greeting.getMessage());
}
}
public class Main {
public static void main(String[] args) {
// 创建一个Actor系统
ActorSystem system = ActorSystem.create("greeter-system");
// 创建一个Actor的Props对象,这是创建Actor的配方或蓝图
Props props = Props.create(Greeter.class, () -> new Greeter());
// 使用system的actorOf方法创建一个Actor实例
ActorRef greeter = system.actorOf(props, "greeter");
// 向Actor发送消息
greeter.tell(new Greeting("Hello, Akka!"), ActorRef.noSender());
}
}
这个示例中,我们定义了一个Greeting消息类型和一个GreeterActor。GreeterActor接收Greeting消息,并在控制台上打印出消息内容。在main方法中,我们创建了一个Actor系统,并使用该系统创建了一个GreeterActor实例,然后向该Actor发送了一个Greeting消息。
CSP(Communicating Sequential Processes)模型:
- CSP是一种描述并发系统行为的数学模型,它强调通过通道(channel)进行通信的并发过程(process)。
- 在CSP模型中,一个过程可以与多个其他过程进行通信,但只能通过通道发送和接收消息。这有助于避免共享状态,并减少并发问题。
- Go语言的并发模型就是基于CSP的,通过goroutine和channel实现。
- c++示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <optional>
// 通道类,用于进程间通信
template <typename T>
class Channel {
public:
void send(const T& value) {
std::unique_lock<std::mutex> lock(mutex_);
data_queue_.push(value);
cond_var_.notify_one(); // 通知接收方有新数据
}
std::optional<T> receive() {
std::unique_lock<std::mutex> lock(mutex_);
cond_var_.wait(lock, [this]() { return !data_queue_.empty(); }); // 等待数据
T value = data_queue_.front();
data_queue_.pop();
return value;
}
private:
std::queue<T> data_queue_;
std::mutex mutex_;
std::condition_variable cond_var_;
};
// 发送方进程
void sender_process(Channel<int>& channel) {
for (int i = 0; i < 5; ++i) {
channel.send(i);
std::cout << "Sender sent: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟耗时操作
}
}
// 接收方进程
void receiver_process(Channel<int>& channel) {
while (true) {
auto value = channel.receive(); // 阻塞等待数据
if (value) {
std::cout << "Receiver received: " << *value << std::endl;
} else {
break; // 如果通道关闭或出错,则退出循环(本例中未实现通道关闭逻辑)
}
}
}
int main() {
Channel<int> channel; // 创建一个整数通道
std::thread sender(sender_process, std::ref(channel)); // 启动发送方进程
std::thread receiver(receiver_process, std::ref(channel)); // 启动接收方进程
sender.join(); // 等待发送方完成
receiver.join(); // 等待接收方完成(在实际CSP模型中,接收方可能会一直运行,直到显式终止)
return 0;
}
这个示例中,Channel 类提供了一个简单的通道实现,允许发送方发送数据,并允许接收方接收数据。发送和接收操作都是线程安全的,并且接收操作在没有数据时会被阻塞。sender_process 和 receiver_process 分别模拟了CSP中的发送方和接收方进程。请注意,这个示例是为了展示CSP的基本概念而简化的,并没有处理通道关闭或错误处理等更复杂的情况。在实际应用中,你可能需要扩展这个模型以处理更多的边界情况和错误处理逻辑。
多线程模型示例
可以使用UML图来解释线程模型的主要算法
Master-worker
摘自google搜索的活动图
具体控制逻辑参考下图
master/slave模型的主要控制活动展示
Work Stealing Model变体-自己改写的
多个场景线程会有争夺分配权的活动,解耦多线程之间的“窃取”活动
降低线程间交互的同步问题,可以采用下面的算法,
可以将需要交互的操作,放入共享队列,待每帧结束后,在控制线程中,顺序完成共享队列中的任务,如下图:
需要跨线程访问的functor或在控制线程中执行,此时workthread已经暂停运行
或者参考actor模式,每个线程中,放置一个无锁队列,将相互调用的functor放入其中,类似Actor模式。如图示意:
跨线程调用会将functor投递到目标线程的thread-safety-queue中,functor在对方线程被调用
总结,上图NodePort的线程模型,每个线程基本上是独立完成自己的业务的;相互之间的交互通信,都是通过彼此的安全队列来传递。这样的模型可能会存在‘饥饿’现象,也既某些线程比较忙碌,而某些线程比较空闲,如下图所示,以我当初优化地图线程的测试数据为例:
某些地图线程有饥饿现象
这时,我们可以通过Work Stealing Model或它的变体,优化它,做到·各个线程的负载平衡,效果如下图:
地图线程的负载是均衡的
PPL、TBB和thread_pool
PPL(Parallel Patterns Library)和TBB(Threading Building Blocks)是两种常用的C++并行编程库,旨在简化多线程编程,提升程序的性能。
PPL (Parallel Patterns Library)
PPL是微软提供的C++并行编程库,主要在Windows平台上使用。PPL提供了一组高级并行算法和数据结构,简化了多线程编程。
用法与用途
- 任务并行:PPL 提供了一种机制,使得多个任务可以在Windows线程池上并行执行。这有助于开发者将程序拆分为多个独立的任务,并利用多核处理器并行执行这些任务,从而提高程序的执行效率。
- 并行算法:PPL 提供了一系列并行处理数据集合的泛型算法,这些算法类似于标准模板库(STL)所提供的算法,但能够在数据集合并发执行工作。例如,parallel_for 算法支持在数据集合上进行并行迭代操作。
- 并行容器和对象:PPL 还提供了一些线程安全的并行容器类型,如并行向量、并行哈希表等。这些容器允许开发者在并行环境中安全地存储和访问数据。
原理
PPL 构建在并发运行时的计划和资源管理组件上,通过提供并行作用于数据的泛型安全算法和容器,提高了应用程序代码与基础线程机制之间的抽象级别。它利用Windows线程池来实现任务的并行执行,并自动管理线程的创建、销毁和调度,从而简化了并行编程的复杂性。
PPL 示例代码
以下是一个使用PPL实现并行循环的示例代码:
#include <iostream>
#include <vector>
#include <ppl.h>
int main() {
const int size = 1000;
std::vector<int> vec(size);
// Fill the vector using parallel_for
concurrency::parallel_for(0, size, [&vec](int i) {
vec[i] = i * i;
});
// Print the first 10 elements
for (int i = 0; i < 10; ++i) {
std::cout << vec[i] << " ";
}
std::cout << std::endl;
return 0;
}
在这个示例中,concurrency::parallel_for 用于并行地填充向量 vec,从而提升性能。
TBB (Threading Building Blocks)
TBB是英特尔提供的C++并行编程库,支持多平台,包括Windows、Linux和macOS。TBB提供了一组通用的并行算法、容器和底层线程管理机制。
用法与用途
- 并行算法:TBB 提供了一系列高效的并行算法,如parallel_for、parallel_reduce、parallel_sort等。这些算法可以帮助开发者在并行环境中高效地处理数据。例如,parallel_for 可以在一个范围内并行执行for循环。
- 任务调度:TBB 的任务调度器可以自动将任务分配给可用的线程,无需开发者手动管理线程的创建和销毁。这大大简化了并行编程的复杂性。
- 并行容器:与PPL类似,TBB 也提供了一些线程安全的并行容器,以便在并行环境中安全地存储和访问数据。
- 内存分配器:TBB 还提供了一个高效的内存分配器,用于在多线程环境中分配和释放内存。
原理
TBB 是一个开源的C++并行计算库,旨在提升数据并行计算的能力。它使用了一些优化技术,如任务窃取和任务合并,以提高并行任务的执行效率。此外,TBB 还提供了高级抽象和易于使用的接口,使得并行编程变得简单和直观。它是一个跨平台的库,可以在多种操作系统和硬件架构上使用。
TBB 示例代码
以下是一个使用TBB实现并行循环的示例代码:
#include <iostream>
#include <vector>
#include <tbb/tbb.h>
int main() {
const int size = 1000;
std::vector<int> vec(size);
// Fill the vector using parallel_for
tbb::parallel_for(0, size, [&vec](int i) {
vec[i] = i * i;
});
// Print the first 10 elements
for (int i = 0; i < 10; ++i) {
std::cout << vec[i] << " ";
}
std::cout << std::endl;
return 0;
}
在这个示例中,tbb::parallel_for 用于并行地填充向量 vec,同样提升了性能。
PPL 和 TBB 对比
- 平台支持:
- PPL主要支持Windows平台。
- TBB支持多平台,包括Windows、Linux和macOS。
- 易用性:
- PPL集成在Visual Studio中,适合使用微软开发工具链的开发者。
- TBB提供了更丰富的并行算法和数据结构,适合跨平台开发和高性能需求的应用。
- 功能特性:
- PPL提供了基本的并行循环和任务调度。
- TBB提供了更高级的特性,例如并行排序、并行管道和并行分治算法。
安装和配置
PPL 安装和配置
PPL是Visual Studio的一部分,使用Visual Studio开发环境时,PPL自动包含在内。只需包含头文件 #include <ppl.h> 并链接必要的库即可。
TBB 安装和配置
TBB可以从英特尔的官方网站或通过包管理器(如vcpkg或Homebrew)安装。
使用vcpkg安装TBB:
vcpkg install tbb
CMake示例配置:
cmake_minimum_required(VERSION 3.10)
project(TBBExample)
set(CMAKE_CXX_STANDARD 11)
find_package(TBB REQUIRED)
add_executable(TBBExample main.cpp)
target_link_libraries(TBBExample TBB::tbb)
Boost::thread_pool
boost::thread_pool 是 Boost 库中一个用于管理和调度线程的组件。尽管 Boost 没有一个直接命名为 boost::thread_pool 的组件,但可以使用 boost::asio::thread_pool 或 boost::asio::io_context 来实现线程池功能。
Boost.Thread 基础
首先,需要安装和配置 Boost 库。可以通过包管理器如 vcpkg 安装:
vcpkg install boost-asio
使用boost::asio::thread_pool
Boost Asio 提供了 boost::asio::thread_pool,它是一个简单易用的线程池实现。
线程池的原理
- 任务队列:线程池维护一个任务队列,任务是要执行的工作单元。
- 工作线程:线程池内部有多个工作线程,它们会从任务队列中提取任务并执行。
- 同步和互斥:为了保证线程安全,任务队列通常使用互斥锁或其他同步机制来保护。
代码示例
以下是一个使用 boost::asio::thread_pool 的示例代码:
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <thread>
void print_number(int number) {
std::cout << "Number: " << number << " Thread ID: " << std::this_thread::get_id() << std::endl;
}
int main() {
// 创建一个包含4个线程的线程池
boost::asio::thread_pool pool(4);
// 向线程池提交10个任务
for (int i = 0; i < 10; ++i) {
boost::asio::post(pool, [i]() {
print_number(i);
});
}
// 等待所有任务完成
pool.join();
return 0;
}
更高级的用法
boost::asio::io_context 也可以用来实现更高级的线程池功能,提供细粒度的控制和更多的功能。
代码示例
以下示例展示了如何使用 boost::asio::io_context 来实现线程池:
#include <boost/asio.hpp>
#include <iostream>
#include <vector>
#include <thread>
void print_number(int number) {
std::cout << "Number: " << number << " Thread ID: " << std::this_thread::get_id() << std::endl;
}
int main() {
// 创建 io_context 和工作守护者
boost::asio::io_context io_context;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard(io_context.get_executor());
// 创建线程池
std::vector<std::thread> threads;
for (int i = 0; i < 4; ++i) {
threads.emplace_back([&io_context]() {
io_context.run();
});
}
// 向 io_context 提交任务
for (int i = 0; i < 10; ++i) {
boost::asio::post(io_context, [i]() {
print_number(i);
});
}
// 释放工作守护者并等待线程完成
work_guard.reset();
for (auto& thread : threads) {
thread.join();
}
return 0;
}
boost::basic_thread_pool
boost::basic_thread_pool 是 Boost.Thread 库的一部分,用于管理和调度线程。代码示例如下:
class TaskCounted;
class TaskBase
{
public:
virtual void run_once() = 0;
};
class TaskCounted : public TaskBase
{
public:
TaskCounted(TaskBase * pTask);
~TaskCounted();
void run_once();
static void reset_counter();
static bool empty();
static void wait_all();
static TaskCounted * malloc(TaskBase * pTask);
static void free(TaskCounted *);
private:
TaskBase * _pTask;
static std::atomic_int _counter;
static std::mutex _counter_mtx;
static std::condition_variable _counter_cv;
static ObjectPool<TaskCounted> _task_alloc;
};
std::atomic_int TaskCounted::_counter(0);
std::mutex TaskCounted::_counter_mtx;
std::condition_variable TaskCounted::_counter_cv;
ObjectPool<TaskCounted> TaskCounted::_task_alloc;
TaskCounted::TaskCounted(TaskBase * pTask)
:_pTask(pTask)
{
std::unique_lock <std::mutex> lck(_counter_mtx);
++_counter;
}
TaskCounted::~TaskCounted()
{
std::unique_lock <std::mutex> lck(_counter_mtx);
--_counter;
_counter_cv.notify_all();
}
void TaskCounted::reset_counter()
{
_counter.store(0);
}
bool TaskCounted::empty()
{
return _counter.load() < 1;
}
void TaskCounted::wait_all()
{
std::unique_lock <std::mutex> lck(_counter_mtx);
while (!empty())
_counter_cv.wait(lck);
}
void TaskCounted::run_once() {
if (!_pTask)
return;
_pTask->run_once();
}
TaskCounted * TaskCounted::malloc(TaskBase * pTask)
{
return _task_alloc.construct(pTask);
}
void TaskCounted::free(TaskCounted * p)
{
_task_alloc.destroy(p);
}
#pragma once
// thread_pool_executor.h
#include <boost/thread/thread_pool.hpp>
#include <functional>
#include "task_base.h"
class TPTaskExecutor
{
public:
TPTaskExecutor(int nThreadNum=0);
void submit(TaskBase * pBegin);
void parallel_do(TaskBase * pBegin, int count);
template<typename _Iterator, typename F>
void parallel_do(_Iterator begin, _Iterator end, F f) {
for (auto it = begin; it != end; ++it)
{
TaskBase * pBegin = f(it);
TaskCounted * p = TaskCounted::malloc(pBegin);
_thp.submit([p]() { p->run_once(); TaskCounted::free(p); });
}
}
template<typename K>
void parallel_do(std::map<K, TaskBase*> & mp)
{
parallel_do(mp.begin(), mp.end(), [](decltype(mp.begin()) it)->TaskBase* { return it->second; });
}
void parallel_do(std::vector<TaskBase*> &vec)
{
parallel_do(std::begin(vec), std::end(vec), [](decltype(std::begin(vec)) it)->TaskBase* { return *it; });
}
void parallel_do(std::list<TaskBase*> &vec)
{
parallel_do(std::begin(vec), std::end(vec), [](decltype(std::begin(vec)) it)->TaskBase* { return *it; });
}
void wait_all();
private:
void wait();
private:
boost::executors::basic_thread_pool _thp;
};
#include "stdafx.h"
#include "thread_pool_executor.h"
TPTaskExecutor::TPTaskExecutor(int nThreadNum)
:_thp(nThreadNum<1?(boost::thread::hardware_concurrency()+1):nThreadNum)
{
;
}
void TPTaskExecutor::wait()
{
//while (_thp.try_executing_one())
// ;
TaskCounted::wait_all();
}
void TPTaskExecutor::submit(TaskBase * pBegin)
{
TaskCounted * p = TaskCounted::malloc(pBegin);
_thp.submit([p]() { p->run_once(); TaskCounted::free(p); });
}
void TPTaskExecutor::parallel_do(TaskBase * pBegin, int count)
{
TaskBase * pEnd = pBegin + count;
for (;pBegin != pEnd;++pBegin)
{
TaskCounted * p = TaskCounted::malloc(pBegin);
_thp.submit([p]() { p->run_once(); TaskCounted::free(p); });
}
}
void TPTaskExecutor::wait_all()
{
wait();
}
线程池剖析
线程池提供了一种简单而高效的方式来管理多个线程。它内部维护一个任务队列和一组工作线程,这些工作线程从任务队列中提取任务并执行。当然,这个过程必须是线程安全的,要确保任务的并行执行。
原理
- 任务队列:线程池内部维护一个任务队列,用于存放待执行的任务。这些任务可以是函数、函数对象或者Lambda表达式。
- 线程集合:线程池管理着一组工作线程。这些线程会循环从任务队列中取出任务并执行。
- 任务调度:线程池内部有一个调度器,它负责从任务队列中取出任务,并将其分配给空闲的工作线程。调度策略可以是FIFO(先进先出)、LIFO(后进先出)或优先级调度等。
- 线程管理:线程池负责创建、销毁和复用线程。当有新任务到来时,如果线程池中有空闲线程,则复用该线程执行任务;如果没有空闲线程且线程池未达到最大线程数,则创建新线程;如果线程池已满,则新任务将在任务队列中等待。
- 并发控制:线程池需要确保对共享资源(如任务队列)的访问是线程安全的,通常通过使用互斥锁、条件变量等同步机制来实现。
一个简单的线程池
我们自己实现一个简单的基于C++11标准库线程的线程池,以下是代码示例:
#include <vector>
#include <queue>
#include <future>
#include <functional>
#include <stdexcept>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <iostream>
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
std::atomic<bool> done;
};
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false), done(false)
{
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back(
[this] {
for(;;) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty()) {
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
}
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace(task { (*task)(); });
}
condition.notify_one();
return res;
}
// ... (其他函数的实现,例如析构函数)
int main() {
ThreadPool pool(4);
auto result = pool.enqueue( { return x + y; }, 10, 20);
std::cout << "Result: " << result.get() << std::endl;
return 0;
}
总结
多线程与多核运算通过提高计算效率、缩短计算时间、提高资源利用率、增加可扩展性以及提升系统响应能力和用户体验等方面的好处,为处理大规模数据和复杂计算任务提供了强大的支持。
多线程与多核编程,是我们需要掌握的一项技能,其核心思想和具体的编程语言无关。然后当我们运用他们的时候,由于它的复杂性,实现起来比较困难,并且极容易出错,所以,我们可以多练习,以期能正确的运用这一编程技能。
多线程编程注意事项:
- 线程安全:
- 多线程编程中,多个线程可能会同时访问共享资源,因此需要确保线程安全,避免数据竞争、死锁和活锁等问题。
- 可以使用同步机制(如互斥锁、读写锁、条件变量等)来确保共享资源的安全访问。
- 线程调度:
- 合理地调度线程,确保每个线程都有足够的时间执行任务,避免线程饥饿或CPU资源的过度使用。
- 了解并合理使用线程的优先级,以便在需要时能够让重要的线程优先执行。
- 线程间通信:
- 多线程之间需要进行有效的通信,以便协调任务和共享数据。
- 使用合适的同步机制,如信号量、消息队列等,来实现线程间的安全通信。
- 异常处理:
- 在多线程编程中,异常处理是一个重要环节。需要确保每个线程都有适当的异常处理机制,以便在发生错误时能够及时处理,防止程序崩溃。
- 资源管理:
- 需要管理线程相关的各种资源,如内存、文件句柄、网络连接等。
- 确保在适当的时候释放资源,避免资源泄漏、内存溢出和句柄耗尽等问题。
多核编程注意事项:
- 任务划分与并行性:
- 将计算任务划分为多个子任务,这些子任务应该尽量相互独立,减少核间通信的开销。
- 确保任务划分均匀,避免某些核心负载过重,而其他核心闲置。
- 多核锁竞争:
- 在多核环境中,锁竞争可能导致核心等待,从而降低效率。因此,应尽量减少锁的使用,或者采用更高效的并发数据结构来避免锁竞争。
- 资源充分利用:
- 充分利用每个核心的计算能力,避免资源浪费。可以通过合理的任务调度和负载均衡策略来实现。
多线程与多核编程的精髓:
- 并发与并行:多线程与多核编程的精髓在于利用并发和并行的优势来提高程序的执行效率和响应速度。通过合理地划分任务和调度线程,可以充分利用多核处理器的计算能力,实现更高的吞吐量和更低的延迟。
- 共享与同步:在多线程与多核环境中,共享资源和数据是不可避免的。因此,如何高效地共享资源并确保数据的一致性是多线程与多核编程的重要精髓。这需要通过合适的同步机制和并发数据结构来实现。
- 错误处理与鲁棒性:在多线程与多核编程中,错误处理和鲁棒性设计同样重要。由于多个线程或核心可能同时访问和修改共享资源,因此需要设计健壮的错误处理机制和容错策略来确保程序的稳定运行。