Python之多进程
创始人
2024-03-05 00:54:19
0

python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

1. Process

创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典。name为别名。group实质上不使用。
方法:is_alive() 、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

is_alive():判断该进程是否还活着

join([timeout]):主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

run():进程p调用start()时,自动调用run()

属性:authkey、daemon(要通过start()设置)、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

例1.1:创建函数并将其作为单个进程

import multiprocessing
import timedef worker(interval):n = 5while n > 0:print("The time is {0}".format(time.ctime()))  #输出时间的格式time.sleep(interval)n -= 1if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.start()print "p.pid:", p.pidprint "p.name:", p.nameprint "p.is_alive:", p.is_alive()

结果

1

2

3

4

5

6

7

8

p.pid: 8736

p.name: Process-1

p.is_alive: True

The time is Tue Apr 21 20:55:12 2015

The time is Tue Apr 21 20:55:15 2015

The time is Tue Apr 21 20:55:18 2015

The time is Tue Apr 21 20:55:21 2015

The time is Tue Apr 21 20:55:24 2015

例1.2:创建函数并将其作为多个进程

import multiprocessing
import timedef worker_1(interval):print "worker_1"time.sleep(interval)print "end worker_1"def worker_2(interval):print "worker_2"time.sleep(interval)print "end worker_2"def worker_3(interval):print "worker_3"time.sleep(interval)print "end worker_3"if __name__ == "__main__":p1 = Process(target=worker_1, args=(6,))p2 = Process(target=worker_2, args=(4,))p3 = Process(target=worker_3, args=(2,))p1.start() p2.start() p3.start()print("The number of CPU is:" + str(cpu_count()))for p in active_children():print("child p.name:=%s" % p.name + "\tp.id=%s" % str(p.pid))print(p1.pid)print("END-----")

结果

1

2

3

4

5

6

7

8

9

10

11

The number of CPU is:4
child p.name:=Process-2 p.id=3864
child p.name:=Process-3 p.id=3256
child p.name:=Process-1 p.id=7336
7336
END-----
worker_1
worker_2
worker_3
end worker_3
end worker_2
end worker_1

例1.3:将进程定义为类

import multiprocessing
import timeclass ClockProcess(multiprocessing.Process):def __init__(self, interval):multiprocessing.Process.__init__(self)self.interval = intervaldef run(self):n = 5while n > 0:print("the time is {0}".format(time.ctime()))time.sleep(self.interval)n -= 1if __name__ == '__main__':p = ClockProcess(3)p.start()   

:进程p调用start()时,自动调用run()

结果

1

2

3

4

5

the time is Tue Apr 21 20:31:30 2015

the time is Tue Apr 21 20:31:33 2015

the time is Tue Apr 21 20:31:36 2015

the time is Tue Apr 21 20:31:39 2015

the time is Tue Apr 21 20:31:42 2015

例1.4:daemon程序对比结果

#1.4-1 不加daemon属性

import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.start()print "end!"

结果

1

2

3

end!

work start:Tue Apr 21 21:29:10 2015

work end:Tue Apr 21 21:29:13 2015

#1.4-2 加上daemon属性

import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.daemon = Truep.start()print "end!"

结果

1

end!

:因子进程设置了daemon属性,主进程结束,它们就随着结束了。

  在多线程模型中,默认情况下(sub-Thread.daemon=False)主线程会等待子线程退出后再退出,而如果sub- Thread.setDaemon(True)时,主线程不会等待子线程,直接退出,而此时子线程会随着主线程的对出而退出,避免这种情况,主线程中需要 对子线程进行join,等待子线程执行完毕后再退出。对应的,在多进程模型中,Process类也有daemon属性,而它表示的含义与 Thread.daemon类似,当设置sub-Process.daemon=True时,主进程中需要对子进程进行等待,否则子进程会随着主进程的退 出而退出

#1.4-3 设置daemon执行完结束的方法

import multiprocessing
import timedef worker(interval):print("work start:{0}".format(time.ctime()));time.sleep(interval)print("work end:{0}".format(time.ctime()));if __name__ == "__main__":p = multiprocessing.Process(target = worker, args = (3,))p.daemon = Truep.start()p.join()print "end!"

结果

1

2

3

work start:Tue Apr 21 22:16:32 2015

work end:Tue Apr 21 22:16:35 2015

end!

2. Lock

当多个进程需要访问共享资源的时候,Lock可以用来避免访问的冲突。

import multiprocessing
import sysdef worker_with(lock, f):with lock:fs = open(f, 'a+')n = 10while n > 1:fs.write("Lockd acquired via with\n")n -= 1fs.close()def worker_no_with(lock, f):lock.acquire()try:fs = open(f, 'a+')n = 10while n > 1:fs.write("Lock acquired directly\n")n -= 1fs.close()finally:lock.release()if __name__ == "__main__":lock = multiprocessing.Lock()f = "file.txt"w = multiprocessing.Process(target = worker_with, args=(lock, f))nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))w.start()nw.start()print "end"

结果(输出文件)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lockd acquired via with

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

Lock acquired directly

3. Semaphore

Semaphore用来控制对共享资源的访问数量,例如池的最大连接数。

import multiprocessing
import timedef worker(s, i):s.acquire()print(multiprocessing.current_process().name + "acquire");time.sleep(i)print(multiprocessing.current_process().name + "release\n");s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s, i*2))p.start()

结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

Process-1acquire

Process-1release

Process-2acquire

Process-3acquire

Process-2release

Process-5acquire

Process-3release

Process-4acquire

Process-5release

Process-4release

例子2:

import multiprocessing
import timedef worker(s, ):s.acquire()print(multiprocessing.current_process().name + "acquire")time.sleep(1)# print(multiprocessing.current_process().name + "release\n")s.release()if __name__ == "__main__":s = multiprocessing.Semaphore(2)for i in range(5):p = multiprocessing.Process(target = worker, args=(s, ))# time.sleep(0.01)p.start()
#####结果######
Process-4acquire
Process-3acquireProcess-1acquire
Process-2acquireProcess-5acquire

4. Event

Event用来实现进程间同步通信。

import multiprocessing
import timedef wait_for_event(e):print("wait_for_event: starting")e.wait() #一直阻塞的去等待set值print('*****')print("wairt_for_event: e.is_set()->" + str(e.is_set()))def wait_for_event_timeout(e, t):print("wait_for_event_timeout:starting")e.wait(2)  #等2s去取set值print('------')print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))if __name__ == "__main__":e = multiprocessing.Event()# e.set()w1 = multiprocessing.Process(name="block", target=wait_for_event, args=(e,))w2 = multiprocessing.Process(name="non-block", target=wait_for_event_timeout, args=(e, 2))w1.start()w2.start()time.sleep(10)e.set()        # 设置set的值print("main: event is set")

结果

1

2

3

4

5

wait_for_event: starting
wait_for_event_timeout:starting
------
wait_for_event_timeout:e.is_set->False
main: event is set
*****
wairt_for_event: e.is_set()->True

5. Queue

Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。Queue的一段示例代码:

import multiprocessingdef writer_proc(q):      try:         q.put(1, block = False) except:         pass   def reader_proc(q):      try:         print q.get(block = False) except:         passif __name__ == "__main__":q = multiprocessing.Queue()writer = multiprocessing.Process(target=writer_proc, args=(q,))  writer.start()   reader = multiprocessing.Process(target=reader_proc, args=(q,))  reader.start()  #reader.join()   这样会一直阻塞#writer.join()

结果

1

1

6. Pipe

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

import multiprocessing
import timedef proc1(pipe):# while True:for i in range(3):print("send: %s" %(i))pipe.send(i)time.sleep(1)def proc2(pipe):while True:print ("proc2 rev:", pipe.recv())time.sleep(1)def proc3(pipe):while True:print("PROC3 rev:", pipe.recv())time.sleep(1)if __name__ == "__main__":pipe = multiprocessing.Pipe()p1 = multiprocessing.Process(target=proc1, args=(pipe[0],))p2 = multiprocessing.Process(target=proc2, args=(pipe[1],))#p3 = multiprocessing.Process(target=proc3, args=(pipe[1],))p1.start()p2.start()# p3.start()# p1.join()# p2.join()# p3.join()
#######结果########
send: 0
proc2 rev: 0
send: 1
proc2 rev: 1
send: 2
proc2 rev: 2

结果

7. Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

 例子:

import multiprocessing
import timedef func(msg, a):# if a == 1:#     time.sleep(8)#     print(1)print("msg:", msg)print("++++")time.sleep(3)# print("end")if __name__ == "__main__":pool = multiprocessing.Pool(processes=3)for i in range(7):msg = "hello %d" % (i)a = i# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去pool.apply_async(func, (msg, a, ))pool.close()# 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束pool.join()print("Sub-process(es) done.")

例7.1:使用进程池(非阻塞)

#coding: utf-8
import multiprocessing
import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print "Sub-process(es) done."

一次执行结果

1

2

3

4

5

6

7

8

9

10

mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0

msg: hello 1

msg: hello 2

end

msg: hello 3

end

end

end

Sub-process(es) done.

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
  • close()    关闭pool,使其不在接受新的任务。
  • terminate()    结束工作进程,不在处理未完成的任务。
  • join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。

执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。

例7.2:使用进程池(阻塞)

#coding: utf-8
import multiprocessing
import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"if __name__ == "__main__":pool = multiprocessing.Pool(processes = 3)for i in xrange(4):msg = "hello %d" %(i)pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"pool.close()pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束print "Sub-process(es) done."

一次执行的结果

1

2

3

4

5

6

7

8

9

10

msg: hello 0

end

msg: hello 1

end

msg: hello 2

end

msg: hello 3

end

Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~

Sub-process(es) done.

  

例7.3:使用进程池,并关注结果

import multiprocessing
import timedef func(msg):print "msg:", msgtime.sleep(3)print "end"return "done" + msgif __name__ == "__main__":pool = multiprocessing.Pool(processes=4)result = []for i in xrange(3):msg = "hello %d" %(i)result.append(pool.apply_async(func, (msg, )))pool.close()pool.join()for res in result:print ":::", res.get()print "Sub-process(es) done."

一次执行结果

1

2

3

4

5

6

7

8

9

10

msg: hello 0

msg: hello 1

msg: hello 2

end

end

end

::: donehello 0

::: donehello 1

::: donehello 2

Sub-process(es) done.

例7.4:使用多个进程池

import multiprocessing
import os, time, randomdef Lee(i):print('1', i)time.sleep(3)print('-----')# print("\nRun task Lee-%s" %(os.getpid())) #os.getpid()获取当前的进程的ID# start = time.time()# time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数# end = time.time()# print('Task Lee, runs %0.2f seconds.' % (end - start))def Marlon(i):print('2', i)time.sleep(3)print('-----')# print("\nRun task Marlon-%s" % (os.getpid()))# start = time.time()# time.sleep(random.random() * 40)# end = time.time()# print('Task Marlon runs %0.2f seconds.' %(end - start))def Allen(i):print('3', i)time.sleep(3)print('-----')# print("\nRun task Allen-%s" %(os.getpid()))# start = time.time()# time.sleep(random.random() * 30)# end = time.time()# print('Task Allen runs %0.2f seconds.' %(end - start))def Frank(i):print('4', i)time.sleep(3)print('-----')# print("\nRun task Frank-%s" %(os.getpid()))# start = time.time()# time.sleep(random.random() * 20)# end = time.time()# print('Task Frank runs %0.2f seconds.' %(end - start))if __name__ == '__main__':function_list = [Lee, Marlon, Allen, Frank]# print("parent process %s" % (os.getpid()))pool = multiprocessing.Pool(4)for func in function_list:# Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中for i in ['a', 'b', 'c','d', 'e', 'f', 'g']:pool.apply_async(func, args=(i,))print('Waiting for all subprocesses done...')pool.close()# 调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束pool.join()print('All subprocesses done.')

 View Code

#coding: utf-8
import multiprocessing
import os, time, randomdef Lee():print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的IDstart = time.time()time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数end = time.time()print 'Task Lee, runs %0.2f seconds.' %(end - start)def Marlon():print "\nRun task Marlon-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 40)end=time.time()print 'Task Marlon runs %0.2f seconds.' %(end - start)def Allen():print "\nRun task Allen-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 30)end = time.time()print 'Task Allen runs %0.2f seconds.' %(end - start)def Frank():print "\nRun task Frank-%s" %(os.getpid())start = time.time()time.sleep(random.random() * 20)end = time.time()print 'Task Frank runs %0.2f seconds.' %(end - start)if __name__=='__main__':function_list=  [Lee, Marlon, Allen, Frank] print "parent process %s" %(os.getpid())pool=multiprocessing.Pool(4)for func in function_list:pool.apply_async(func)     #Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中print 'Waiting for all subprocesses done...'pool.close()pool.join()    #调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束print 'All subprocesses done.'

一次执行结果

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

parent process 7704

Waiting for all subprocesses done...

Run task Lee-6948

Run task Marlon-2896

Run task Allen-7304

Run task Frank-3052

Task Lee, runs 1.59 seconds.

Task Marlon runs 8.48 seconds.

Task Frank runs 15.68 seconds.

Task Allen runs 18.08 seconds.

All subprocesses done.

相关内容

热门资讯

三部门:不断完善学前教育成本分... 人民财讯12月23日电,国家发展改革委、教育部、财政部发布关于完善幼儿园收费政策的通知,其中提出,各...
李强签署国务院令 公布《行政执... 新华社北京12月23日电 国务院总理李强日前签署国务院令,公布《行政执法监督条例》(以下简称《条例》...
“免申即享” 央行发布一次性信... 经济参考报记者 张莫 12月22日,中国人民银行发布一次性信用修复政策有关安排,支持信用受损但积极还...
从民进80年足迹中读懂中国新型... 参会人员翻阅民进会史工作成果书籍。(《中国新闻》报 尹李梅 摄) “民进历史犹如奔流不息的长河,过...
多所高校改革学业评价制度,弱化... 近日,河南大学正式出台2025级本科生成绩与学分管理新规,其改革目标在于鼓励学术探索,缓解过度竞争。...
东华科技:公司财务总监、总法律... 每经AI快讯,东华科技(SZ 002140,收盘价:11.47元)12月23日晚间发布公告称,202...
融资租赁合同纠纷调解结案 宁新... 12月23日,宁新新材(920719)发布公告,因与远东国际融资租赁有限公司的融资租赁合同纠纷,公司...
湖南白银:金融借款纠纷案终审胜... 12月23日,湖南白银(002716)发布公告,近日公司银行账户部分资金解除冻结。此前,因与曹永德、...
三部门:完善幼儿园收费政策 国家发展改革委 教育部 财政部关于 完善幼儿园收费政策的通知 发改价格〔2025〕1644号 各省、...
多政策护航 交通领域消费场景不... 今天(12月23日),国务院新闻办公室举行新闻发布会,交通运输部副部长李扬表示,正在推进新的交通运输...