Python multiprocessing usage

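Creating child processes with fork
Multithreading in Python is not truly parallel: in CPython the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. To make full use of a multi-core CPU, in most cases you need multiple processes.
Unix/Linux operating systems provide a fork() system call, and it is quite special. An ordinary function is called once and returns once, but fork() is called once and returns twice, because the operating system copies the current process (the parent process) into a new one (the child process) and then returns in both the parent and the child.
In the child process fork() always returns 0, while in the parent it returns the child's process ID. The reason is that a parent can fork many children, so it has to record the ID of each one, whereas a child only needs to call getppid() to get its parent's ID.
Python's os module wraps the common system calls, fork among them, so creating a child process from a Python program is easy: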

import os
print("Process %s start ..." % (os.getpid()))
pid = os.fork()
if pid == 0:
    # fork() returned 0: this branch runs in the child
    print("This is child process and my pid is %d, my parent process is %d" % (os.getpid(), os.getppid()))
else:
    # fork() returned the child's pid: this branch runs in the parent
    print("This is parent process, and its child pid is %d" % (pid))

Here is the output:

[root@server120 tmp]# python thread.py 
Process 3279 start ...
This is parent process, and its child pid is 3280
This is child process and my pid is 3280, my parent process is 3279

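As the output shows, everything after pid = os.fork() runs twice, once in the parent process and once in the child process. The order in which the two run is decided by the operating system scheduler and is not guaranteed. In the child, fork() always returns 0, so the return value can be used to tell the parent and the child apart.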
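The example above lets the parent and the child finish independently. A parent usually also wants to know when its child has exited and with which status. The following is a minimal sketch of that pattern, assuming a Unix-like system; the function name run_child and the exit code 3 are made up for illustration, while os.fork, os._exit, os.waitpid, os.WIFEXITED and os.WEXITSTATUS are standard os module calls.

```python
import os

def run_child(exit_code):
    """Fork a child that exits with exit_code; return (child_pid, observed_code)."""
    pid = os.fork()
    if pid == 0:
        # Child branch: fork() returned 0. Leave right away with os._exit so
        # the child never continues into the parent's remaining code.
        os._exit(exit_code)
    # Parent branch: fork() returned the child's pid. Block until it exits.
    waited_pid, status = os.waitpid(pid, 0)
    # Decode the raw wait status; None means the child did not exit normally.
    code = os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
    return waited_pid, code

if __name__ == "__main__":
    child_pid, code = run_child(3)  # 3 is an arbitrary, illustrative exit code
    print("child %d exited with code %s" % (child_pid, code))
```

Waiting on the child with os.waitpid also keeps it from lingering as a zombie process after it exits.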

The next example changes a variable in both processes after the fork:

import os
print("Process %s start ..." % (os.getpid()))
pid = os.fork()
source = 10
if pid == 0:
    print("This is child process and my pid is %d, my parent process is %d" % (os.getpid(), os.getppid()))
    source = source - 6
    print("child process source value is " + str(source))
else:
    print("This is parent process, and its child pid is %d" % (pid))
    source = source - 1
    print("parent process source value is " + str(source))
# this line runs in both processes, each with its own copy of source
print("source value is " + str(source))

The output is:

[root@server120 tmp]# python thread.py 
Process 3294 start ...
This is parent process, and its child pid is 3295
parent process source value is 9
source value is 9
This is child process and my pid is 3295, my parent process is 3294
child process source value is 4
source value is 4

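Clearly, source starts at 10 and is reduced by 1 to 9 in the parent process, while in the child process it also starts from 10 and ends at 4. After the fork each process works on its own copy of the data, so a change made in one process is not visible in the other.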
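Because the two processes do not share variables, a value computed in the child has to be sent back through an explicit channel. The sketch below is one possible way to do that on a Unix-like system with os.pipe; the names counter and fork_and_modify are hypothetical, and the numbers mirror the example above.

```python
import os

counter = {"value": 10}  # hypothetical module-level state, copied by fork()

def fork_and_modify():
    """Return (value seen by the parent, value reported by the child)."""
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: change its private copy and report the result via the pipe.
        os.close(read_fd)
        counter["value"] -= 6
        os.write(write_fd, str(counter["value"]).encode())
        os._exit(0)
    # Parent: its own copy of counter is untouched by the child's change.
    os.close(write_fd)
    child_value = int(os.read(read_fd, 16).decode())
    os.close(read_fd)
    os.waitpid(pid, 0)  # reap the child
    return counter["value"], child_value

if __name__ == "__main__":
    print(fork_and_modify())
```

The parent still sees 10 while the child reports 4, which matches the behaviour of source in the example above.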

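Creating child processes with multiprocessing
fork is only available on Unix-like systems such as Linux; Windows does not provide it. To write multi-process code that also runs on Windows, use the multiprocessing module.
A Process object from the multiprocessing module represents one process; it can create a child process and run a specified function in it.
Run the following code: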

from multiprocessing import Process
import os

def pro_do(name, func):
    print("This is child process %d from parent process %d, and name is %s which is used for %s" % (os.getpid(), os.getppid(), name, func))

if __name__ == "__main__":
    print("Parent process id %d" % (os.getpid()))
    # target is the function the child process will run (pro_do); args holds
    # its arguments as a tuple, one element per parameter of pro_do
    pro = Process(target=pro_do, args=("test", "dev"))
    print("start child process")
    # start the child process
    pro.start()
    # join() blocks the parent until the child exits; without it the parent
    # continues immediately and may print "Process end" before the child's output
    pro.join()
    print("Process end")

Output:

[root@server120 tmp]# python thread.py 
Parent process id 3308
start child process
This is child process 3309 from parent process 3308, and name is test which is used for dev
Process end
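The same pattern extends to several children: start them all first, then join them, so they run concurrently instead of one after another. The following is a minimal sketch written for Python 3; the names describe and worker and the job list are hypothetical, while Process.start, Process.join and Process.exitcode are part of the multiprocessing API.

```python
from multiprocessing import Process
import os

def describe(name, purpose):
    """Build the line a worker prints; kept separate so it is easy to check."""
    return "worker %s (%s) runs in pid %d" % (name, purpose, os.getpid())

def worker(name, purpose):
    print(describe(name, purpose))

if __name__ == "__main__":
    # Hypothetical job list: one (name, purpose) tuple per child process.
    jobs = [("test", "dev"), ("build", "ci"), ("deploy", "ops")]
    procs = [Process(target=worker, args=job) for job in jobs]
    for p in procs:
        p.start()   # after this loop all children run concurrently
    for p in procs:
        p.join()    # wait for each child in turn
    # exitcode is 0 for a child whose target function returned normally
    print([p.exitcode for p in procs])
```

Calling join() right after each start() would instead run the children one at a time, because the parent would wait for each child before starting the next.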

Pool: process pools
To start a large number of child processes, use a process pool to create them in batches:

from multiprocessing import Pool
import os, time

def pro_do(process_num):
    print("child process id is %d" % (os.getpid()))
    time.sleep(6 - process_num)
    print("this is process %d" % (process_num))

if __name__ == "__main__":
    print("Current process is %d" % (os.getpid()))
    p = Pool()
    for i in range(5):
        p.apply_async(pro_do, (i,))  # submit a task to the pool
    p.close()  # no more tasks can be submitted after close()
    p.join()   # wait for all worker processes to finish
    print("pool process done")

The output is:

Current process is 92212
child process id is 92213
child process id is 92214
this is process 1
child process id is 92214
this is process 0
child process id is 92213
this is process 2
child process id is 92214
this is process 3
this is process 4
pool process done

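Notice that
child process id is 92213
child process id is 92214
are printed first, and the remaining lines appear one after another as each sleep finishes. These two lines are printed immediately because Pool starts as many worker processes as there are CPUs by default. I ran this in a virtual machine with only two cores, so two tasks start right away and each remaining task has to wait until a worker finishes its current task.
You can also set the number of worker processes explicitly by replacing p = Pool() with p = Pool(5), in which case the output becomes: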

[root@vincent tmp]# python xx.py 
Current process is 92259
child process id is 92261
child process id is 92262
child process id is 92263
child process id is 92260
child process id is 92264
this is process 4
this is process 3
this is process 2
this is process 1
this is process 0
pool process done
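The pool example above discards whatever the task function returns. apply_async actually returns an AsyncResult object, and its get() method hands the task's return value back to the parent. The sketch below assumes Python 3 (for the with statement on Pool); the function square, the pool size of 2 and the 30 second timeout are illustrative choices, not taken from the original.

```python
from multiprocessing import Pool
import os

def square(n):
    """Return the worker's pid together with n squared."""
    return os.getpid(), n * n

if __name__ == "__main__":
    with Pool(processes=2) as pool:  # 2 workers is an arbitrary choice
        # Submit all tasks first; each call returns an AsyncResult at once.
        pending = [pool.apply_async(square, (i,)) for i in range(5)]
        # get() blocks until that task has finished (or the timeout expires).
        results = [r.get(timeout=30) for r in pending]
    for pid, value in results:
        print("pid %d computed %d" % (pid, value))
```

Results are collected in submission order here, regardless of which worker finished first, because get() is called on the AsyncResult objects in the order they were created.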

Inter-process communication
1) Queue
Run the following program:

from multiprocessing import Process, Queue
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony", "San"]:
        print("put name %s to queue" % (name))
        q.put(name)
        time.sleep(2)
    print("write data finished")

def read_queue(q):
    print("begin to read data")
    while True:
        name = q.get()
        print("get name %s from queue" % (name))

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_queue, args=(q,))
    pr = Process(target=read_queue, args=(q,))

    pw.start()
    pr.start()
    # join() blocks until the writer exits. Call it only after pr.start():
    # joining right after pw.start() would keep the reader from starting
    # until all the writing was done.
    pw.join()
    # the reader loops forever, so stop it forcibly
    pr.terminate()

The output is:

[root@vincent tmp]# python xx.py 
begin to read data
put name Yi_Zhi_Yu to queue
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished
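The reader above never returns, which is why the parent has to call terminate() on it. A common alternative is to have the writer put a sentinel value on the queue when it is done, so the reader can leave its loop on its own. This is a sketch of that idea, not the original author's code; STOP, reader_main and the names parameter are hypothetical additions.

```python
from multiprocessing import Process, Queue

STOP = None  # hypothetical sentinel meaning "no more data"

def write_queue(q, names):
    for name in names:
        q.put(name)
    q.put(STOP)  # tell the reader that writing is finished

def read_queue(q):
    """Collect names from the queue until the sentinel arrives."""
    received = []
    while True:
        name = q.get()
        if name is STOP:
            break
        received.append(name)
    return received

def reader_main(q):
    print("reader got %s" % read_queue(q))

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_queue, args=(q, ["Yi_Zhi_Yu", "Tony", "San"]))
    pr = Process(target=reader_main, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.join()  # the reader exits by itself, so no terminate() is needed
```

With several readers on one queue, the writer would need to put one sentinel per reader.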

2) Pipe
Run the following program:

#!/usr/bin/env python
#encoding=utf-8

from multiprocessing import Process, Pipe
import os, time, sys

def send_pipe(p):
    names = ["Yi_Zhi_Yu", "Tony", "San"]
    for name in names:
        print("put name %s to Pipe" % (name))
        p.send(name)
        time.sleep(1)

def recv_pipe(p):
    print("Try to read data in pipe")
    while True:
        name = p.recv()
        print("get name %s from pipe" % (name))

if __name__ == "__main__":
    # Pipe() returns two connected ends; here one sends and the other receives
    ps_pipe, pr_pipe = Pipe()
    # one process per end
    ps = Process(target=send_pipe, args=(ps_pipe,))
    pr = Process(target=recv_pipe, args=(pr_pipe,))
    pr.start()
    ps.start()
    ps.join()
    # the receiver loops forever, so stop it once the sender is done
    pr.terminate()

The result is:

[root@vincent tmp]# python xx.py 
put name Yi_Zhi_Yu to Pipe
Try to read data in pipe
get name Yi_Zhi_Yu from pipe
put name Tony to Pipe
get name Tony from pipe
put name San to Pipe
get name San from pipe
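The program above uses the pipe in one direction only. By default Pipe() is duplex, so each end can both send and receive, which makes a simple request and reply exchange possible. The following is a rough sketch under that assumption; the function shout and the use of None as an end marker are invented for illustration.

```python
from multiprocessing import Process, Pipe

def shout(conn):
    """Reply to every received name in upper case until None arrives."""
    while True:
        name = conn.recv()
        if name is None:  # hypothetical end-of-stream marker
            break
        conn.send(name.upper())
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # duplex: both ends can send and recv
    worker = Process(target=shout, args=(child_conn,))
    worker.start()
    for name in ["Yi_Zhi_Yu", "Tony", "San"]:
        parent_conn.send(name)          # request
        reply = parent_conn.recv()      # blocks until the child answers
        print("%s -> %s" % (name, reply))
    parent_conn.send(None)              # ask the child to stop
    worker.join()                       # the child exits by itself
```

Because the child stops when it receives the marker, the parent can join() it instead of calling terminate().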