百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Python线程同步实现方式详解(python 线程同步)

toyiye 2024-08-31 02:55 4 浏览 0 评论

一个线程启动后,其会自行运行。但如果希望它们能同步运行,应该怎么做呢?

举个简单的例子,有两个线程 A 和 B,A 负责从网络上读取数据,保持到变量 X 中,B 负责处理变量X中的数据,这时线程 B 就需要和 A 同步。也就是说 B 需要等 A 给其一个信号,其才可以开始去做自己的事情。同样,B 完成了任务后也需要通知 A,告诉 A 变量 X 中间的数据已经处理完了,可以将新的数据放入 X 了。

图 1 表示了这个过程:

实现线程同步的方式有很多种,下面分别进行介绍。

1、thread.Lock线程锁

借助锁,可以得到一些排他的资源。例如,为某个资源 A 加上锁 L,如果要使用该资源,则必须得到锁 L,这个锁可以保证在任意时候,只有一个线程可以得到它。其他线程如果想得到已经被别的线程得到的锁,只能等待锁的拥有者主动释放锁。

这个锁类提供了 acquire() 和 release() 两个接口函数。release() 表示某线程已经完成了任务,其他线程可以开始自己的工作了;acquire() 表示某线程计划做某个工作,请在可以开始时通知它。

所以做某件工作可以用下面三步构成:

  • acquire():等条件成熟了告诉某线程。
  • do_the work():条件成熟了,开始工作。
  • release():工作完成,告诉其他线程可以开始工作了。


现在回到前面的例子上,我们需要两个锁。一个用来写变量 X,用 write_lock 来表示;一个用来读变量 X,用 read_lock 来表示。写变量 X 的过程如下:

write_lock.acquire()
X=var
read_lock.release()


读变量 X 的过程如下:

read_lock.acquire()
var=X
write_lock.release()


下面是完整的代码。

import sys, time                        # 引入时间库
if sys.version_info.major == 2:         # Python 2
import thread
else:                                   # Python 3
import _thread as thread               # 创建两个锁,一个用来读,一个用来写
read_lock = thread.allocate_lock()
write_lock = thread.allocate_lock()
X = 0                                   # 变量X,用来保存两个线程之间交换的数据
def write_thread_entry():               # 写线程的入口函数
    global X, read_lock,  write_lock
    for i in range(2, 10, 1):
        write_lock.acquire()
        X = i
        read_lock.release()
def read_thread_entry():                # 读线程的入口函数
    global X, read_lock, write_lock
    while True:
        read_lock.acquire()
        print("Processing X = %d" % X)
        write_lock.release()
def start_threads():                    # 启动线程
    read_lock.acquire()                 # read_lock处于被占用状态
    t1 = thread.start_new_thread(write_thread_entry, tuple())
    t2 = thread.start_new_thread(read_thread_entry, tuple())
    time.sleep(5)
if __name__=='__main__':                # 如果是运行该脚本而不是import该文件
    start_threads()

运行结果如下:

$ python lockDemo1.py # 运行脚本
Processing X = 2 # 程序第20行的输出
Processing X = 3 # 当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9

2、threading.Lock线程锁

threading 包也包含一个类 Lock,其提供的函数也是 acquire() 和 release(),这两个函数和 thread.Lock 类的函数一样。但是其并没有提供 allocate_lock() 接口函数来创建 Lock,需要我们用构造函数自己创建。

下面的代码使用几乎一样的方法实现了前面的例子:

import sys, time
import threading                       # 引入线程库
read_lock = threading.Lock()
write_lock = threading.Lock()
X = 0                                  # 变量X,读写线程交互信息的载体
def write_thread_entry():
    global X, read_lock,  write_lock
    for i in range(2, 10, 1):
        write_lock.acquire()
        X = i
        read_lock.release()
def read_thread_entry():
    global X, read_lock, write_lock
    while True:
        read_lock.acquire()
        print("Processing X = %d" % X)
        write_lock.release()
def start_threads():
    read_lock.acquire()         # read_lock处于被占用状态
    t1 = threading.Thread(target=write_thread_entry)
    t1.setDaemon(True)
    t1.start()
    t2 = threading.Thread(target=read_thread_entry)
    t2.setDaemon(True)
    t2.start()
    time.sleep(5)
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python lockDemo1.py
Processing X = 2 # 程序第16行的输出
Processing X = 3 # 显示当前X的值
Processing X = 4
Processing X = 5
Processing X = 6
Processing X = 7
Processing X = 8
Processing X = 9

3、threading.RLock可重入锁

前面介绍的 threading.Lock 有一个问题,就是对某个人 threading.Lock 对象在同一个线程内重复调用两次 acquire() 会发生锁死现象。

下面的代码演示了这个情况:

import sys, time
import threading
lock_obj1 = threading.Lock()            # 创建锁对象
def thread_entry():                     # 子线程入口函数
    global lock_obj1                    # 使用全局变量lock_obj1
    print("Child Thread: thread_entry() Is Running")
    lock_obj1.acquire()                         # 第一次调用acquire(),成功
    print("Child Thread: acquire(1) Finished")
    lock_obj1.acquire()                         # 第二次调用acquire(),阻塞
            # 我们看不到下面的两句输出
    print("Child Thread: acquire(2) Finished")
    print("Child Thread: Quit")
def start_threads():                            # 主线程
    global lock_obj1
    t1 = threading.Thread(target=thread_entry)
    t1.setDaemon(True)
    t1.start()
    time.sleep(5)
print("Main Thread: Quit")
if __name__=='__main__':            # 作为脚本执行
    start_threads()

运行结果如下:

$ python lockDemo3.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Main Thread: Quit

可以看到子线程在第二次调用 acquire() 时卡住了。threading.RLock() 类就是为了解决这个问题的,其用法和 Threading.Lock() 基本相同,区别是如果某个线程调用了自己已经调用过的 RLock,不会被阻塞。

下面将前面代码的第三行:

lock_obj1 = threading.Lock()

修改为

lock_obj1 = threading.RLock()

其他都保持不变,再次运行后结果如下:

$ python lockDemo4.py
Child Thread: thread_entry() Is Running
Child Thread: acquire(1) Finished
Child Thread: acquire(2) Finished
Child Thread: Quit
Main Thread: Quit

这里还有一个特别的地方,就是 release() 被调用的次数要求和 acquire() 被调用的次数相同,否则其他的非所有者线程还是会被卡在 acquire() 上。

某个线程成功执行 acquire() 后,该线程就是该 threading.RLock 对象的所有者。threading.RLock 对象内部有一个成员变量 _RLock__count,其类型为整数。如 _RLock__count=0,表示其没有被任意线程所有,因而任意线程都可以成功执行 acquire() 来获得该 threading.RLock对象;如果 _RLock__count>0,则只有上次成功执行 acquire()的所有者线程才可以成功执行 acquire(),其他线程都会被阻塞。成功执行 acquire() 后,_RLock__count 的值会加一;而成功执行 release() 后,_RLo ck__count 的值会减一。

>>> import threading                   # 引入库
>>> lock_obj1 = threading.RLock()      # 创建RLock对象
>>> lock_obj1._RLock__count            # 查看当前的值
                                # =0表示还没有所有者
>>> lock_obj1.acquire()                # 获得该RLock对象
True
>>> lock_obj1._RLock__count            # 查看值
                                       # 已经被acquire()一次了
>>> lock_obj1.acquire()                # 继续获得RLock,同一个线程
1
>>> lock_obj1._RLock__count
2
>>> lock_obj1.release()                # 释放一次,值应该减一
>>> lock_obj1._RLock__count
1
>>> lock_obj1.release()
>>> lock_obj1._RLock__count            # =0表示没有所有者了
0

4、threading.Condition条件变量

条件变量提供两个接口,一个是 wait(),表示等待有线程调用 notify();另一个是 nodify(),表示激活处于等待的线程。

下面是一个简单的例子,其每隔 3 秒就将工作线程唤醒一次。注意在调用 wait() 和 notify() 之前要调用 acquire(),在调用 wait() 和 notify 之后要调用 release()。

import sys, time
import threading                                # 引入线程库
def thread_entry(id, condition_obj):            # 线程入口函数
    print("Worker Thread %d: thread_entry() Is Running" % id)
    for round in range(3):                      # 循环3次
        condition_obj.acquire()                 # 等待
        condition_obj.wait()
        print("Worker Thread %d: is Doing Work" % id)
        condition_obj.release()
        time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)       # 线程结束
def start_threads():                            # 创建线程
    condition_obj = threading.Condition()       # 创建condition
    t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
    t1.start()                                  # 启动线程
    time.sleep(0.1)                             # 休眠0.1秒
    t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
    t2.start()
    for round in range(3):
        time.sleep(2)
        condition_obj.acquire()
        condition_obj.notify_all()               # 通知子线程开始工作
        condition_obj.release()
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python conditionDemo1.py
Worker Thread 1: thread_entry() Is Running
Worker Thread 2: thread_entry() Is Running
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1: is Doing Work
Worker Thread 2: is Doing Work
Worker Thread 1 : Quit
Worker Thread 2 : Quit


如果觉得使用 acquire() 和 release() 比较麻烦,也可以用 with 语句,如

with condition_obj:
condition_obj.notify_all()

等效于:

condition_obj.acquire()
condition_obj.notify_all()
condition_obj.release()

现在代码可以写成:

import sys, time
import threading                                        # 引入线程库
def thread_entry(id, condition_obj):
    print("Worker Thread %d: thread_entry() Is Running" % id)
    for round in range(3):                              # 循环3次
        with condition_obj:
        condition_obj.wait()
        print("Worker Thread %d: is Doing Work" % id)
time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)
def start_threads():
condition_obj = threading.Condition()
    t1 = threading.Thread(target=thread_entry, args=(1, condition_obj))
    t1.start()
time.sleep(0.1)
    t2 = threading.Thread(target=thread_entry, args=(2, condition_obj))
    t2.start()
    for round in range(3):                              # 循环3次
time.sleep(2)
        with condition_obj:                             # 进入竞争区
        condition_obj.notify_all()                      # 通知子线程
if __name__=='__main__':
start_threads()

5、threading.Semaphore信号量

Semaphore 和 Lock 的作用相似,其不同之处是 Lock 只能被一个线程获得,其他的线程都只能等待,而 Semaphore 可以被 N 个线程同时获得,N 也可以等于 1。

下面是其用法演示:

import sys, time
import threading                        # 引入线程库
def thread_entry(id, Semaphore_obj):    # 线程入口函数
    print("Worker Thread %d: thread_entry() Is Running" % id)
    time.sleep(1.8)
    for round in range(3):              # 循环3次
        Semaphore_obj.acquire()
        print("Worker Thread %d: is Doing Work" % id)
        time.sleep(0.1*id)
    print("Worker Thread %d : Quit" % id)
def start_threads():
    Semaphore_obj = threading.Semaphore(3)      # 创建3个元素的sem
    t1 = threading.Thread(target=thread_entry, args=(1, Semaphore_obj))
    t1.start()
    t2 = threading.Thread(target=thread_entry, args=(2, Semaphore_obj))
    t2.start()
    t3 = threading.Thread(target=thread_entry, args=(3, Semaphore_obj))
    t3.start()
    t4 = threading.Thread(target=thread_entry, args=(4, Semaphore_obj))
    t4.start()
    for round in range(9):
        time.sleep(2)
        print("Release() is Called")      # 释放sem
        Semaphore_obj.release()
    print("Main Thread Quit")
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python semaphoreDemo1.py # 运行脚本
Worker Thread 1: thread_entry() Is Running # 子线程启动
Worker Thread 2: thread_entry() Is Running
Worker Thread 3: thread_entry() Is Running
Worker Thread 4: thread_entry() Is Running
Worker Thread 2: is Doing Work # 仅有3个线程可以并行工作
Worker Thread 1: is Doing Work
Worker Thread 3: is Doing Work
Release() is Called # 释放sem,这时有一个子线程可以工作了
Worker Thread 4: is Doing Work # 被第9行释放的sem唤醒
Release() is Called # 再次释放一个sem
Worker Thread 1: is Doing Work
Release() is Called
Worker Thread 2: is Doing Work
Release() is Called
Worker Thread 3: is Doing Work
Release() is Called
Worker Thread 4: is Doing Work
Release() is Called
Worker Thread 1: is Doing Work
Worker Thread 1 : Quit
Release() is Called
Worker Thread 2: is Doing Work
Worker Thread 2 : Quit
Release() is Called
Worker Thread 3: is Doing Work
Worker Thread 3 : Quit
Release() is Called
vMain Thread Quit
Worker Thread 4: is Doing Work
Worker Thread 4 : Quit

可以看到最开始有 3 个线程并行工作,到后来每调用一次 release() 就会有一个进程开始工作。Semephore 内部保存一个属性 _Semaphore__value 该值在初始化时设置,在上面的代码中设置为 3。每次调用 acquire() 时判断该值是否大于 0,如果大于 0,则将该值减一并立即返回;如果等于 0,则一直等待直到该值大于 0。而在 release() 时,其将 _Semaphore__value 值加一,这样原来阻塞在 acquire() 上的线程可能就会开始执行了。

下面介绍信号量的一些属性和方法。

1) _Semaphore__value属性

该属性值表示还有多少个线程可以得到该 Semaphore。其在 Semaphore 初始化时被初始化。该属性对 Python 2 有效,在 Python 3 中则被 _value 替代。下面是 Python 2 中的情况:

>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._Semaphore__value
3
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
2
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
3
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
4
>>> sem_obj.release()
>>> sem_obj._Semaphore__value
5

下面是 Python 3 中的情况:

>>> import threading
>>> sem_obj = threading.Semaphore(3)
>>> sem_obj._value
3
>>> sem_obj.acquire()
True
>>> sem_obj._value
2
>>> sem_obj.release()
>>> sem_obj._value
3
>>> sem_obj.release()
>>> sem_obj._value
4
>>> sem_obj.release()
>>> sem_obj._value
5

需要注意的是,该属性值可以大于初始值。如最开始设定的初始值为 3,但其通过调用 release() 可以达到 4 或 5。这在很多系统中是不会出现的现象。

2) acquire()得到信号量

该函数在 Python 2 中只有一个参数 blocking,如果为 1 表示一直等待;为 0 表示立刻返回。如果得到了信号量,返回值是 True,否则是 False。

>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._Semaphore__value
1
>>> sem_obj.acquire()
True
>>> sem_obj._Semaphore__value
0
>>> sem_obj.acquire(0)     # 不等待,直接返回
False

在 Python 3 中,多了一个超时参数,表示最多等待的时间,单位为秒。

>>> import threading
>>> sem_obj = threading.Semaphore(1)
>>> sem_obj._value
1
>>> sem_obj.acquire()
True
>>> sem_obj._value
0
>>> sem_obj.acquire(1, 3)     # 等待最多3秒
False

3) release()释放信号量

该函数没有任何参数,而且返回值也是 None。

>>> import threading                   # 引入threading库
>>> sem_obj = threading.Semaphore(1)   # 创建Semaphore对象
>>> sem_obj.acquire()
True
>>> ret = sem_obj.release()             # 返回值是None
>>> ret is None
True

6、threading.Event事件

Event 可以看作是某个开关状态,可以通过 set() 来闭合开关,也可以通过 clear() 来断开开关,还可以使用 wait() 来等待开关的闭合。

import sys, time
import threading                                        # 引入线程库
def thread_entry(id, evt):                              # 线程入口函数
    print("Child Thread %d Wait for event" % id)
    evt.wait()                                          # 等待可以执行
    print("Child Thread %d Quit" % id)  # 子线程退出
def start_threads():
    event_obj1 = threading.Event()      # 创建事件
    thread1 = threading.Thread(target=thread_entry, args=(1, event_obj1))
    thread1.start()                     # 启动子线程1
    thread2 = threading.Thread(target=thread_entry, args=(2, event_obj1))
    thread2.start()                     # 启动子线程2       
    time.sleep(0.8)
    print("Active Thread Number = %d" % threading.active_count())
    time.sleep(1.8)
    event_obj1.set()                    # 允许子线程运行
    print("Main Thread Quit")
if __name__=='__main__':
    start_threads()

运行结果如下:

$ python eventDemo1.py
Child Thread 1 Wait for event
Child Thread 2 Wait for event
Active Thread Number = 3
Main Thread Quit
Child Thread 1 Quit
Child Thread 2 Quit


下面介绍 Event 对象的接口函数。

1) is_set():得到 Event 实例对象的状态

新创建的 Event 对象处于非 set 状态。注意:set 状态表示闭合开关。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False

2) wait(timeout):等待Event实例对象变成set状态

该接口函数有超时参数,表示最多等待多少秒。不提供该参数表示一直等待直到 Event 实例对象变成 set 状态。如果超时,其返回值为 False,否则返回值为 True。

>>> event_obj = threading.Event()     # 创建Event实例对象event_obj
>>> event_obj.set()                   # 设置为set状态(闭合开关)
>>> ret = event_obj.wait()            # 等待,没有超时参数
>>> ret                               # 返回值为True
True
>>> event_obj.clear()                 # 清除set状态(断开开关)
>>> ret = event_obj.wait(0.1)         # 超时时间为0.1秒
>>> ret                               # 返回值为False,表示超时返回
False

3) set():设置状态

这样所有 wait() 都会满足条件返回,相当于开关闭合。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.set()             # 多次设置也没有问题
>>> event_obj.is_set()
True

4) clear():清除状态

这样所有 wait() 都会被阻塞,相当于开关断开。

>>> event_obj = threading.Event()
>>> event_obj.is_set()
False
>>> event_obj.set()
>>> event_obj.is_set()
True
>>> event_obj.clear()
>>> event_obj.is_set()
False
>>> event_obj.clear()
>>> event_obj.is_set()
False

相关推荐

# Python 3 # Python 3字典Dictionary(1)

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值(key=>value)对用冒号(:)分割,每个对之间用逗号(,)分割,整个字典包括在花括号({})中,格式如...

Python第八课:数据类型中的字典及其函数与方法

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值...

Python中字典详解(python 中字典)

字典是Python中使用键进行索引的重要数据结构。它们是无序的项序列(键值对),这意味着顺序不被保留。键是不可变的。与列表一样,字典的值可以保存异构数据,即整数、浮点、字符串、NaN、布尔值、列表、数...

Python3.9又更新了:dict内置新功能,正式版十月见面

机器之心报道参与:一鸣、JaminPython3.8的热乎劲还没过去,Python就又双叒叕要更新了。近日,3.9版本的第四个alpha版已经开源。从文档中,我们可以看到官方透露的对dic...

Python3 基本数据类型详解(python三种基本数据类型)

文章来源:加米谷大数据Python中的变量不需要声明。每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建。在Python中,变量就是变量,它没有类型,我们所说的"类型"是变...

一文掌握Python的字典(python字典用法大全)

字典是Python中最强大、最灵活的内置数据结构之一。它们允许存储键值对,从而实现高效的数据检索、操作和组织。本文深入探讨了字典,涵盖了它们的创建、操作和高级用法,以帮助中级Python开发...

超级完整|Python字典详解(python字典的方法或操作)

一、字典概述01字典的格式Python字典是一种可变容器模型,且可存储任意类型对象,如字符串、数字、元组等其他容器模型。字典的每个键值key=>value对用冒号:分割,每个对之间用逗号,...

Python3.9版本新特性:字典合并操作的详细解读

处于测试阶段的Python3.9版本中有一个新特性:我们在使用Python字典时,将能够编写出更可读、更紧凑的代码啦!Python版本你现在使用哪种版本的Python?3.7分?3.5分?还是2.7...

python 自学,字典3(一些例子)(python字典有哪些基本操作)

例子11;如何批量复制字典里的内容2;如何批量修改字典的内容3;如何批量修改字典里某些指定的内容...

Python3.9中的字典合并和更新,几乎影响了所有Python程序员

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

Python3大字典:《Python3自学速查手册.pdf》限时下载中

最近有人会想了,2022了,想学Python晚不晚,学习python有前途吗?IT行业行业薪资高,发展前景好,是很多求职群里严重的香饽饽,而要进入这个高薪行业,也不是那么轻而易举的,拿信工专业的大学生...

python学习——字典(python字典基本操作)

字典Python的字典数据类型是基于hash散列算法实现的,采用键值对(key:value)的形式,根据key的值计算value的地址,具有非常快的查取和插入速度。但它是无序的,包含的元素个数不限,值...

324页清华教授撰写【Python 3 菜鸟查询手册】火了,小白入门字典

如何入门学习python...

Python3.9中的字典合并和更新,了解一下

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

python3基础之字典(python中字典的基本操作)

字典和列表一样,也是python内置的一种数据结构。字典的结构如下图:列表用中括号[]把元素包起来,而字典是用大括号{}把元素包起来,只不过字典的每一个元素都包含键和值两部分。键和值是一一对应的...

取消回复欢迎 发表评论:

请填写验证码