巴黎人网址导航

Python 并行与并发详解

巴黎人登录

在Python中有很多关于并发/并行的实现(库),所以我将做一个总结以供将来参考。事实上,在查看文档时,很难说Python的最佳方式是查看官方文档。除了一些例子,它几乎是完美的,所以本文只列出常用的方法和属性。

基本概念

平行

Python学习交流组:1004391443

并行性是同时执行的意义,因此单个线程永远不会达到并行状态,使用多个线程和多个进程。但是由于众所周知的GIL,Python的多线程不允许两个线程“同时运行”,因此实际上不可能达到并行状态。就像两个人同时吃两个包子一样,最后两个包子同时被吃掉了。

并发

并发性作为一个整体,多个任务同时执行,但一次只执行一个任务。就像一个人同时吃两个馒头,一次吃一口,最后两个馒头同时吃。

多进程

过程:程序是指令,数据及其组织形式的描述。流程是程序的实体。子进程:子进程是指由另一进程(对应于父进程)创建的进程。多进程:实现并行的手段,多个进程的数量取决于CPU核心的数量。

多线程

线程:轻量级进程。线程是进程中的实体(进程至少有一个线程),它是系统独立调度和分派的基本单元(即操作系统可以执行操作调度的最小单元)。多线程:实现并发的手段,多线程的数量是根据实际需要确定的,但也有一个限制。

在等待处理器时,线程可以逻辑运行;运行状态意味着线程在运行时占用处理器;阻塞状态意味着线程正在等待事件(例如信号量)并且在逻辑上是不可执行的。每个程序至少有一个线程。如果程序只有一个线程,那就是程序本身。

实施方法

多进程

Unix/Linux操作系统提供了fork()系统调用,这是非常特殊的。普通函数,调用一次,返回一次,但fork()被调用一次,两次,因为操作系统自动复制当前进程(父进程)(子进程),然后在父进程和子进程分别返回。子进程始终返回0,父进程返回子进程的ID。原因是父进程可以分叉很多子进程,因此父进程必须记住每个子进程的ID,而子进程只需要调用getpid()来获取父进程的ID。

在Python中,os模块封装了常见的系统调用,包括fork,这使得在Python程序中创建子进程变?萌菀祝?

导入os

打印('Process(%s)start .'%os.getpid())

#仅适用于Unix/Linux/Mac:

Pid=os.fork()

如果pid==0:

打印('我是子进程(%s),我的父进程是%s。'%(os.getpid(),os.getppid()))

否则为:

打印('我(%s)刚刚创建了一个子进程(%s)。'%(os.getpid(),pid))

结果:

过程(876)开始.

我(876)刚创建了一个子进程(877)。

我是孩子的过程(877),我的父母是876。

使用fork,进程可以在收到新任务时复制子进程以处理新任务。常见的Apache服务器是父进程侦听端口。每当有新的http请求时,它就会分叉子进程。处理新的http请求。

有时,子进程不是代码本身,而是外部进程。 Python标准库中的子进程库可以派生子进程来运行外部程序,以及管道进程的输入和输出。

子进程中定义了多个函数,根据需要以不同的方式创建子进程(call,check_call,check_output,Popen等)。此外,子进程提供了用于接管标准流和管道以使用进程之间的文本通信的工具。

以下示例演示如何在Python代码中运行命令nslookup,其效果与直接从命令行运行相同:

导入子流程

打印('$ nslookup')

r=subprocess.call(['nslookup',''])

打印('退出代码:',r)

结果

$ nslookup

服务器: 192.168.19.4

地址: 192.168.19.4#53

非权威性答案:

Canonical name=python.map.fastly.net。

名称: python.map.fastly.net

地址: 199.27.79.223

退出代码: 0

如果你想编写一个多进程服务程序,Unix/Linux肯定是正确的选择。由于Windows没有fork调用,因此上述代码在Windows上不起作用。但Python是跨平台的,自然应该提供跨平台的多进程支持。然后出现了多处理。

多处理模块是跨平台的多进程模块。在Unix/Linux下,多处理模块封装了fork(),因此我们不需要注意fork()的细节。由于Windows没有fork调用,因此多处理需要“模拟”fork效果。父进程中的所有Python对象必须由pickle序列化并传递给子进程。

因此,如果多处理无法在Windows下调用,请首先考虑pickle是否失败。

多处理模块的常见用法:

多处理模块提供Process类来表示进程对象:

为流程创建一个类

过程([group [target [name [args [kwargs]]])))

说明:

此类实例化的对象表示子进程(尚未启动)中的任务,称为p

注意:

Args:指定传递给目标函数的位置参数,它是元组的形式。

参数:

Args=('arg1',)

Kwargs={'name':'hexin','age': 18}

方法:

P.start():启动进程并在子进程中调用p.run()p.run():进程启动时运行的方法,它是调用target指定的函数,如果你继承了要编写的进程类对于多进程,此方法必须在自定义类中实现。 P.terminate():强制终止进程。 p不执行任何清理操作。如果p创建子进程,子进程将变为僵尸进程。这种方法需要特别小心。如果p也保存了一个锁,它将不会被释放,从而导致死锁。 P.is_alive():如果p仍在运行,则返回truep.join([timeout]):主线程等待p终止(即主线程处于相等状态,p处于运行状态) 。超时是可选的超时期限(超过此时间,父线程不再等待子线程继续执行)。请注意,p.join()只能加入由start启动的进程,但不能加入由run启动的进程。

属性:

P.daemon

P.name

P.pid

例如:

注意:在窗口中使用Process()必须放在if __name__=='__ main__':

下面

以下示例演示了如何启动子进程并等待它结束:

从多处理导入过程

导入os

#child进程执行代码

Def run_proc(name):

打印('运行子进程%s(%s).'%(name,os.getpid()))

打印('父进程%s。'%os.getpid())

p=Process(target=run_proc,args=('test',))

打印('子进程将开始。')

P.start()

P.join()

打印('子进程结束。')

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,然后使用start()方法启动它,这使得该进程比fork()更简单。

join()方法可以在继续运行之前等待子进程完成,通常用于进程之间的同步。

当然,您也可以使用继承Process的类来创建子进程:

导入时间

随机导入

从多处理导入过程

类MyProcess(进程):

Def __init __(self,name):

超级().__ INIT __()

Self.name=name

Def run(self):

打印('%s正在运行'%self.name)

Time.sleep(random.randrange(1,5))

打印('%s stop'%self.name)

P1=MyProcess('1')

P2=MyProcess('2')

P3=MyProcess('3')

P4=MyProcess('4')

P1.start()#start将自动调用run

P2.start()

P3.start()

P4.start()

打印('主线程')

结果

1运行

2正在运行

主线

3运行

4运行

1站

4站

2站

3站

池([numprocess [,initializer [,initargs]]])

那么问题来了,开多进程的目的是为了并发,如果有多核,通常有几个核就开几个进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行),但很明显需要并发执行的任务常常远大于核数,这时我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数等。

当进程数目不大时,可以直接利用多处理中的Process类手动创建多个进程,如果数量很大,就需要使用进程池.Pool类可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

参数:

numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值initializer:是每个子进程启动时要执行的可调用对象,默认为Noneinitargs:是要传给initializer的参数组

方法:

XXP.apply(func [args [kwargs]]):对进程池中的所有进程执行func(* args,** kwargs)并返回结果。注意:如果同时执行2个apply,它将按顺序执行,因此该函数是阻塞类型,被阻塞的对象是主进程和每个子进程。此函数不建议在py2.3之后使用apply_async(func [args [kwds [callback [error_callback]]]]:执行func(* args,**进程池中所有进程中的kwargs))然后返回结果,此方法的结果是AsyncResult类的实例(如下所述)。回调是一个可调用的对象,它接收输入参数。当func的结果变为可用时,它会立即传递给回调,这将禁止任何阻塞操作,否则它将接收来自其他异步操作的结果。发生函数执行错误时调用Error_callback。请注意,apply_async未阻止。 P.map(func,iterable [chunksize=None]):Pool类中的map方法与内置map函数基本相同,内置map函数阻止进程直到返回结果。请注意,虽然第二个参数是迭代器,但它只在整个队列准备好后运行子进程。注意:此时func必须接受iterable中每个元素的参数。此功能正在阻塞,被阻止的对象是主要进程。Map_async(func,iterable [chunksize [callback [error_callback]]]):map_async与map和apply_async具有相同的关系,即此函数没有阻塞。 map_async和apply_async都是异步的,因此您需要有一个回调函数,该函数由callback参数指定。当然,error_callback与p.close()相同:关闭进程池以防止进一步操作。 P.terminate():结束子进程,不再处理未处理的任务。 P.join():等待所有子进程完成。注意:此方法只能在close()或teminate()之后调用,这会导致进程池不再接受新任务。

其他方法:

apply_async()和map_async()的返回值是AsyncResul,此实例具有以下方法

Obj.get([timeout]):返回结果,等待结果在必要时到达。 timeout参数是可选的。如果它未在指定时间内到达,则抛出异常。如果子进程中发生异常,则在调用此方法时将再次抛出异常。 (专注于获取)obj.wait([timeout]):等待返回的结果到达,timeout参数是可选的。如果您未在指定时间内等待,请直接执行后续代码。 (关注等等)obj.ready():如果调用完成,则返回Trueobj.successful():如果调用完成而不抛出异常则返回True,如果在结果准备好之前调用该方法则抛出异常(ValueError):未就绪)

例如:

从多处理导入池

导入os,时间,随机

Def long_time_task(name):

打印('运行任务%s(%s).'%(name,os.getpid()))

Start=time.time()

Time.sleep(random.random()* 3)

结束=time.time()

打印('任务%s运行%0.2f秒。'%(名称,(结束 - 开始)))

打印('父进程%s。'%os.getpid())

p=Pool(4)

对于i在范围(5):

P.apply_async(long_time_task,args=(i,))

打印('等待所有子进程完成.')

P.close()

P.join()

打印('完成所有子流程。')

结果

父流程669。

等待所有子流程完成.

运行任务0(671).

运行任务1(672).

运行任务2(673).

运行任务3(674).

任务2运行0.14秒。

运行任务4(673).

任务1运行0.27秒。

任务3运行0.86秒。

任务0运行1.41秒。

任务4运行1.91秒。

所有子流程都已完成。

请注意,输出结果,任务0,1,2,3立即执行,任务4在执行前等待上一个任务完成。这是因为我的计算机上池的默认大小为4,因此最多的Execute 4进程同时进行。这是池的有意设计的限制,并不是操作系统的限制。如果更改为:p=Pool(5),则可以同时运行5个进程(但不是并行,而是并发)。由于池的默认大小是CPU核心数,如果您遗憾地拥有8核CPU,则必须提交至少9个子进程才能看到等待效果(转义)。

另一个例子:

提交任务并在主进程中获取结果(前一个进程是执行任务,结果放在队列中,现在可以直接在主进程中获得结果)

从多处理导入池

导入时间

Def work(n):

打印('开始工作.')

Time.sleep(3)

返回n ** 2

q=Pool()

#Asynchronous apply_async用法:如果使用异步提交的任务,主进程需要使用join,等待进程池中的任务进行处理,然后使用get来收集结果。否则,主进程结束,进程池可能没有时间执行,它将跟随。结束了

Res=q.apply_async(work,args=(2,))

Q.close()

Q.join()#join在关闭后调用

打印(res.get())

#同步apply用法:主进程等待应用程序提交的应用程序完成后再继续执行后续代码

#res=q.apply(work,args=(2,))

#print(res)

结果

开始工作.

4

在Pool对象上调用join()方法将等待所有子进程完成执行。您必须在调用join()之前调用close()。调用close()后,您无法继续添加新进程。