百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程字典 > 正文

Python进程池multiprocessing.Pool的用法

toyiye 2024-08-29 00:38 7 浏览 0 评论

一、multiprocessing模块

multiprocessing模块提供了一个Process类来代表一个进程对象,multiprocessing模块像线程一样管理进程,这个是multiprocessing的核心,它与threading很相似,对多核CPU的利用率会比threading好的多

看一下Process类的构造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:
group:进程所属组(基本不用)
target:表示调用对象
args:表示调用对象的位置参数元组
name:别名
kwargs:表示调用对象的字典

示例:

import multiprocessing


def do(n):             # 参数n由args=(1,)传入
    name = multiprocessing.current_process().name        # 获取当前进程的名字
    print(name, 'starting')
    print("worker ", n)
    return


if __name__ == '__main__':
    numList = []
    for i in range(5):
        p = multiprocessing.Process(target=do, args=(i,))      # (i,)中加入","表示元祖
        numList.append(p)
        print(numList)
        p.start()                 # 用start()方法启动进程,执行do()方法
        p.join()                  # 等待子进程结束以后再继续往下运行,通常用于进程间的同步
        print("Process end.")

运行结果:

[<Process(Process-1, initial)>]
Process-1 starting
worker  0
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, initial)>]
Process-2 starting
worker  1
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, initial)>]
Process-3 starting
worker  2
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, initial)>]
Process-4 starting
worker  3
Process end.
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, initial)>]
Process-5 starting
worker  4
Process end.

通过打印numList可以看出当前进程结束后,再开始下一个进程

注意:
在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的
if __name__ == ‘__main__’ :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要

二、Pool类

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求
下面介绍一下multiprocessing 模块下的Pool类下的几个方法:

1.apply()

函数原型:apply(func[, args=()[, kwds={}]])

该函数用于传递不定参数,同python中的apply函数一致,主进程会被阻塞直到函数执行结束(不建议使用,并且3.x以后不再出现)

2.apply_async

函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

与apply用法一致,但它是非阻塞的且支持结果返回后进行回调

3.map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到结果返回
注意:虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程

4.map_async()

函数原型:map_async(func, iterable[, chunksize[, callback]])
与map用法一致,但是它是非阻塞的

5.close()

关闭进程池(pool),使其不再接受新的任务

6.terminal()

结束工作进程,不再处理未处理的任务

7.join()

主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用

示例1--使用map()函数

import time
from multiprocessing import Pool


def run(fn):
    # fn: 函数参数是数据列表的一个元素
    time.sleep(1)
    print(fn * fn)


if __name__ == "__main__":
    testFL = [1, 2, 3, 4, 5, 6]
    print('shunxu:')  # 顺序执行(也就是串行执行,单进程)
    s = time.time()
    for fn in testFL:
        run(fn)
    t1 = time.time()
    print("顺序执行时间:", int(t1 - s))

    print('concurrent:')  # 创建多个进程,并行执行
    pool = Pool(3)  # 创建拥有3个进程数量的进程池
    # testFL:要处理的数据列表,run:处理testFL列表中数据的函数
    pool.map(run, testFL)
    pool.close()  # 关闭进程池,不再接受新的进程
    pool.join()  # 主进程阻塞等待子进程的退出
    t2 = time.time()
    print("并行执行时间:", int(t2 - t1))

运行结果:

1、map函数中testFL为可迭代对象--列表

2、当创建3个进程时,会一次打印出3个结果“1,4,9”,当当创建2个进程时,会一次打印出2个结果“1,4”,以此类推,当创建多余6个进程时,会一次打印出所有结果

3、如果使用Pool(),不传入参数,可以创建一个动态控制大小的进程池

从结果可以看出,并发执行的时间明显比顺序执行要快很多,但是进程是要耗资源的,所以平时工作中,进程数也不能开太大。 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),让其不再接受新的Process了

示例2--使用map()_async函数

print('concurrent:')  # 创建多个进程,并行执行
    pool = Pool(3)  # 创建拥有3个进程数量的进程池
    # testFL:要处理的数据列表,run:处理testFL列表中数据的函数
    pool.map_async(run, testFL)
    pool.close()  # 关闭进程池,不再接受新的进程
    pool.join()  # 主进程阻塞等待子进程的退出
    t2 = time.time()
    print("并行执行时间:", int(t2 - t1))

运行结果:

从结果可以看出,map_async()和map()用时相同。目前还没有看出两者的区别,后面知道后再完善

示例3--使用apply()函数

    print('concurrent:')  # 创建多个进程,并行执行
    pool = Pool(3)  # 创建拥有3个进程数量的进程池
    # testFL:要处理的数据列表,run:处理testFL列表中数据的函数
    for fn in testFL:
        pool.apply(run, (fn,))
    pool.close()  # 关闭进程池,不再接受新的进程
    pool.join()  # 主进程阻塞等待子进程的退出
    t2 = time.time()
    print("并行执行时间:", int(t2 - t1))

运行结果:

可见,使用apply()方法,并行执行和顺序执行用时相同,经过试验,进程数目增大也不会减少并行执行的时间

原因:以阻塞的形式产生进程任务,生成1个任务进程并等它执行完出池,第2个进程才会进池,主进程一直阻塞等待,每次只执行1个进程任务

示例4--使用apply_async()函数

print('concurrent:')  # 创建多个进程,并行执行
    pool = Pool(3)  # 创建拥有3个进程数量的进程池
    # testFL:要处理的数据列表,run:处理testFL列表中数据的函数
    for fn in testFL:
        pool.apply_async(run, (fn,))
    pool.close()  # 关闭进程池,不再接受新的进程
    pool.join()  # 主进程阻塞等待子进程的退出
    t2 = time.time()
    print("并行执行时间:", int(t2 - t1))

运行结果:

可见,使用apply_async()方法,并行执行时间与使用map()、map_async()方法相同

注意:

map_async()和map()方法,第2个参数可以是列表也可以是元祖,如下图:

而使用apply()和apply_async()方法时,第2个参数只能传入元祖,传入列表进程不会被执行,如下图:

三、apply_async()方法callback参数的用法

示例:

from multiprocessing import Pool
import time


def fun_01(i):
    time.sleep(2)
    print('start_time:', time.ctime())
    return i + 100


def fun_02(arg):
    print('end_time:', arg, time.ctime())


if __name__ == '__main__':
    pool = Pool(3)
    for i in range(4):
        pool.apply_async(func=fun_01, args=(i,), callback=fun_02)  # fun_02的入参为fun_01的返回值
        # pool.apply_async(func=fun_01, args=(i,))
    pool.close()
    pool.join()
    print('done')

运行结果:

start_time: Thu Nov 14 16:31:41 2019
end_time: 100 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 101 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:41 2019
end_time: 102 Thu Nov 14 16:31:41 2019
start_time: Thu Nov 14 16:31:43 2019
end_time: 103 Thu Nov 14 16:31:43 2019
done

map_async()方法callback参数的用法与apply_async()相同

四、使用进程池并关注结果

import multiprocessing
import time


def func(msg):
    print('hello :', msg, time.ctime())
    time.sleep(2)
    print('end', time.ctime())
    return 'done' + msg


if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    result = []
    for i in range(3):
        msg = 'hello %s' % i
        result.append(pool.apply_async(func=func, args=(msg,)))

    pool.close()
    pool.join()

    for res in result:
        print('***:', res.get())             # get()函数得出每个返回结果的值

    print('All end--')

运行结果:

五、多进程执行多个函数

使用apply_async()或者apply()方法,可以实现多进程执行多个方法

示例:

import multiprocessing
import time
import os


def Lee():
    print('\nRun task Lee--%s******ppid:%s' % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(5)
    end = time.time()
    print('Task Lee,runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Marlon():
    print("\nRun task Marlon-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(10)
    end = time.time()
    print('Task Marlon runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Allen():
    print("\nRun task Allen-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(15)
    end = time.time()
    print('Task Allen runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


def Frank():
    print("\nRun task Frank-%s******ppid:%s" % (os.getpid(), os.getppid()), '~~~~', time.ctime())
    start = time.time()
    time.sleep(20)
    end = time.time()
    print('Task Frank runs %0.2f seconds.' % (end - start), '~~~~', time.ctime())


if __name__ == '__main__':
    func_list = [Lee, Marlon, Allen, Frank]
    print('parent process id %s' % os.getpid())

    pool = multiprocessing.Pool(4)
    for func in func_list:
        pool.apply_async(func)

    print('Waiting for all subprocesses done...')
    pool.close()
    pool.join()
    print('All subprocesses done.')

运行结果:

parent process id 84172
Waiting for all subprocesses done...

Run task Lee--84868******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Marlon-84252******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Allen-85344******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019

Run task Frank-85116******ppid:84172 ~~~~ Thu Nov 14 17:44:14 2019
Task Lee,runs 5.00 seconds. ~~~~ Thu Nov 14 17:44:19 2019
Task Marlon runs 10.00 seconds. ~~~~ Thu Nov 14 17:44:24 2019
Task Allen runs 15.00 seconds. ~~~~ Thu Nov 14 17:44:29 2019
Task Frank runs 20.00 seconds. ~~~~ Thu Nov 14 17:44:34 2019
All subprocesses done.

六、其他

1、获取当前计算机的CPU数量

相关推荐

# Python 3 # Python 3字典Dictionary(1)

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值(key=>value)对用冒号(:)分割,每个对之间用逗号(,)分割,整个字典包括在花括号({})中,格式如...

Python第八课:数据类型中的字典及其函数与方法

Python3字典字典是另一种可变容器模型,且可存储任意类型对象。字典的每个键值...

Python中字典详解(python 中字典)

字典是Python中使用键进行索引的重要数据结构。它们是无序的项序列(键值对),这意味着顺序不被保留。键是不可变的。与列表一样,字典的值可以保存异构数据,即整数、浮点、字符串、NaN、布尔值、列表、数...

Python3.9又更新了:dict内置新功能,正式版十月见面

机器之心报道参与:一鸣、JaminPython3.8的热乎劲还没过去,Python就又双叒叕要更新了。近日,3.9版本的第四个alpha版已经开源。从文档中,我们可以看到官方透露的对dic...

Python3 基本数据类型详解(python三种基本数据类型)

文章来源:加米谷大数据Python中的变量不需要声明。每个变量在使用前都必须赋值,变量赋值以后该变量才会被创建。在Python中,变量就是变量,它没有类型,我们所说的"类型"是变...

一文掌握Python的字典(python字典用法大全)

字典是Python中最强大、最灵活的内置数据结构之一。它们允许存储键值对,从而实现高效的数据检索、操作和组织。本文深入探讨了字典,涵盖了它们的创建、操作和高级用法,以帮助中级Python开发...

超级完整|Python字典详解(python字典的方法或操作)

一、字典概述01字典的格式Python字典是一种可变容器模型,且可存储任意类型对象,如字符串、数字、元组等其他容器模型。字典的每个键值key=>value对用冒号:分割,每个对之间用逗号,...

Python3.9版本新特性:字典合并操作的详细解读

处于测试阶段的Python3.9版本中有一个新特性:我们在使用Python字典时,将能够编写出更可读、更紧凑的代码啦!Python版本你现在使用哪种版本的Python?3.7分?3.5分?还是2.7...

python 自学,字典3(一些例子)(python字典有哪些基本操作)

例子11;如何批量复制字典里的内容2;如何批量修改字典的内容3;如何批量修改字典里某些指定的内容...

Python3.9中的字典合并和更新,几乎影响了所有Python程序员

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

Python3大字典:《Python3自学速查手册.pdf》限时下载中

最近有人会想了,2022了,想学Python晚不晚,学习python有前途吗?IT行业行业薪资高,发展前景好,是很多求职群里严重的香饽饽,而要进入这个高薪行业,也不是那么轻而易举的,拿信工专业的大学生...

python学习——字典(python字典基本操作)

字典Python的字典数据类型是基于hash散列算法实现的,采用键值对(key:value)的形式,根据key的值计算value的地址,具有非常快的查取和插入速度。但它是无序的,包含的元素个数不限,值...

324页清华教授撰写【Python 3 菜鸟查询手册】火了,小白入门字典

如何入门学习python...

Python3.9中的字典合并和更新,了解一下

全文共2837字,预计学习时长9分钟Python3.9正在积极开发,并计划于今年10月发布。2月26日,开发团队发布了alpha4版本。该版本引入了新的合并(|)和更新(|=)运算符,这个新特性几乎...

python3基础之字典(python中字典的基本操作)

字典和列表一样,也是python内置的一种数据结构。字典的结构如下图:列表用中括号[]把元素包起来,而字典是用大括号{}把元素包起来,只不过字典的每一个元素都包含键和值两部分。键和值是一一对应的...

取消回复欢迎 发表评论:

请填写验证码