3.5 并发编程之multiprocessing

之前我们介绍了 Python3 下的 _thread 多线程库的使用。并说明了一点:python中的多线程其实并不是真正的多线程,并不能做到充分利用多核CPU资源。如果想要充分利用,在python中大部分情况需要使用多进程,那么这个包就叫做 multiprocessing。

借助它,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

1. Process

这里我首先定义5个worker函数(其中一个函数有明显错误),以便下文使用:

import time

def worker_0(interval):
    print("错误的工作")
    time.sleep(interval)
    0/0
    print("end工作失误")
def worker_1(interval):
    print("工作1")
    time.sleep(interval)
    print("end工作1")
def worker_2(interval):
    print("工作212")
    time.sleep(interval)
    print("end工作212")
def worker_3(interval):
    print("工作three")
    time.sleep(interval)
    print("end工作three")
def worker_4(interval):
    print("工作four")
    time.sleep(interval)
    print("end工作four")

基本使用

在multiprocessing中,每一个进程都用一个Process类来表示。首先看下它的API:

Process([group [, target [, name [, args [, kwargs]]]]])

  • target表示调用对象,可以传入方法的名字
  • args表示被调用对象的位置参数元组,比如target是函数a,他有两个参数m,n,那么args就传入(m, n)即可
  • kwargs表示调用对象的字典
  • name是别名,相当于给这个进程取一个名字
  • group分组,实际上不使用

Process类的相关方法有:

  • is_alive():判断进程是否存活
  • join([timeout]):子进程结束再执行下一步,让父进程等待子进程结束,可以通过设置 timeout 处理进程阻塞
  • run():如果在创建Process对象的时候不指定target,那么就会默认执行Process的run方法
  • start():启动进程,区分run()
  • terminate():终止进程,关于终止进程没有这么简单,貌似用psutil包会更好

Process类的相关属性:

  • authkey: process设置过程的授权密钥
  • daemon:父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置
  • exitcode:进程在运行时为None、如果为–N,表示被信号N结束
  • name:进程的名字,自定义
  • pid:每个进程有唯一的PID编号。

Process(),start(),join()

from multiprocessing import Process
import time

if __name__ == '__main__':
    a=time.time()
    p1=Process(target=worker_1,args=(4,))
    p2 = Process(target=worker_2, args=(6,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    b=time.time()
    print('finish',b-a)

这里一共开了两个进程,p1和p2,arg=(4,)中的4是fun1函数的参数,这里要用tulpe类型,如果两个参数或更多就是arg=(参数1,参数2...),之后用start()启动进程,我们设置等待p1和p2进程结束再执行下一步.来看下面的运行结果,2个函数基本在同一时间开始运行,当运行完毕(fun1睡眠4秒,同时fun2睡眠6秒),才执行print语句。

运行结果如下:

工作1
工作212
end工作1
end工作212
finish 6.02905011177063

现在再来看下start()与join()处于不同位置会发生什么:

from multiprocessing import Process
import time

if __name__ == '__main__':
    a=time.time()
    p1=Process(target=worker_1,args=(4,))
    p2 = Process(target=worker_2, args=(6,))
    p1.start()
    p1.join()
    p2.start()
    p2.join()
    b=time.time()
    print('finish',b-a)

运行结果:

工作1
end工作1
工作212
end工作212
finish 10.03864574432373

现在是先运行 worker_1 函数,运行完毕再运行 worker_2 接着再是 print 'finish',即先运行进程p1再运行进程p2,在每个子进程都调用了join()方法,这样父进程(主进程)就会等待子进程执行完毕。感受到 join() 的魅力了吧。

name,daemon,is_alive()

from multiprocessing import Process
import time

if __name__ == '__main__':
    a=time.time()
    p1=Process(target=worker_1,args=(4,))
    p2 = Process(target=worker_2, args=(6,))
    p1.daemon=True
    p2.daemon = True
    p1.start()
    p2.start()
    p1.join()
    print('进程1:',p1,'\n','进程2:',p2)
    print('进程1:',p1.is_alive(),'\n','进程2:',p2.is_alive())
    b=time.time()
    print('finish',b-a)

运行结果:

工作1
工作212
end工作1
进程1: <Process(Process-78, stopped daemon)>
 进程2: <Process(Process-79, started daemon)>
进程1: False
 进程2: True
finish 4.023045301437378
end工作212

可以看到,name是给进程赋予名字, 运行到 print('进程1:',p1.is_alive(),'\n','进程2:',p2.is_alive()) 这句的时候,p1进程已经结束(返回False),p2进程仍然在运行(返回True),但p2没有用join(),所以直接接着执行主进程,由于用了daemon=Ture,父进程终止后自动终止,p2进程没有结束就强行结束整个程序了。

run()

run()在Process没有指定target函数时,默认用run()函数运行程序。实例如下:

from multiprocessing import Process
import time

def fun1():
    print('this is fun1',time.ctime())
    time.sleep(2)
    print('fun1 finish',time.ctime())

if __name__ == '__main__':
    a=time.time()
    p=Process()
    p.run=fun1
    p.start()
    p.join()
    b=time.time()
    print('finish',b-a)

运行结果:

this is fun1 Mon Oct  9 17:30:53 2017
fun1 finish Mon Oct  9 17:30:55 2017
finish 2.0163445472717285

2. Lock

当线程的输出操作同时进行时,可能会出现资源不足,导致出现输出错位的情况。这时候我们需要相关输出'互斥',即在某一时间,只能一个进程输出,其他进程等待。等刚才那个进程输出完毕之后,另一个进程再进行输出。

我们可以通过 Lock 来实现,在一个进程输出时,加锁,其他进程等待。等此进程执行结束后,释放锁,其他进程可以进行输出。下面就用一个实例来感受一下:

from multiprocessing import Process, Lock
import time

class MyProcess(Process):
    def __init__(self, loop, lock):
        Process.__init__(self)
        self.loop = loop
        self.lock = lock

    def run(self):
        for count in range(self.loop):
            time.sleep(0.1)
            self.lock.acquire()
            print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
            self.lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(1, 5):
        p = MyProcess(i, lock)
        p.start()

我们在print方法的前后分别添加了获得锁和释放锁的操作。这样就能保证在同一时间只有一个print操作。所以在访问临界资源时,使用Lock就可以避免进程同时占用资源而导致的一些问题。

3. Queue

Process 之间有时需要通信,可以使用 multiprocessing 的 Queue 实现多进程之间的数据传递,Queue 本身是一个消息列队程序,与Queue.Queue 不同,Queue.Queue 是进程内非阻塞队列,multiprocess.Queue 是跨进程通信队列。多进程前者是各自私有,后者是各子进程共有。

我们以 multiprocess.Queue 为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break

if __name__=='__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()    
    pw.join()
    pr.start()
    pr.join()
    print('所有数据都写入并且读完')

结果如下:

Put A to queue...
Put B to queue...
Put C to queue...
Get A from queue.
Get B from queue.
Get C from queue.
所有数据都写入并且读完

另外,另外队列中常用的方法有:

  • Queue.qsize() 返回队列的大小
  • Queue.empty() 如果队列为空,返回True, 反之False
  • Queue.full() 如果队列满了,返回True,反之False
  • Queue.get([block[, timeout]]) 获取队列,timeout等待时间
  • Queue.get_nowait() 相当Queue.get(False)
  • Queue.put(item) 阻塞式写入队列,timeout等待时间
  • Queue.put_nowait(item) 相当Queue.put(item, False)

4. Pipe

管道,顾名思义,一端发一端收。之前我们介绍过,由于多进程缺乏同步机制,因此不得不采取消息传递机制,在进程间传递复制的数据。在这里,Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。

一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。下面用一个实例来感受一下:

from multiprocessing import Process,Pipe

class Consumer(Process):

    def __init__(self,pipe):
        Process.__init__(self)
        self.pipe = pipe
    def run(self):
        self.pipe.send('Consumer Words')
        print('Consumer Received:',self.pipe.recv())

class Producer(Process):
    def __init__(self,pipe):
        Process.__init__(self)
        self.pipe = pipe
    def run(self):
        print('Producer Received:',self.pipe.recv())
        self.pipe.send('Producer Words')

pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print('Ended!')

在这里声明了一个默认为双向的管道,然后将管道的两端分别传给两个进程。两个进程互相收发。观察一下结果:

Producer Received: Consumer Words
Consumer Received: Producer Words
Ended!

以上就是对管道的简单介绍。

5. Pool

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

在这里需要了解阻塞和非阻塞的概念。阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态。阻塞即要等到回调结果出来,在有结果之前,当前进程会被挂起。

Pool的用法有阻塞和非阻塞两种方式。非阻塞即为添加进程后,不一定非要等到改进程执行完就添加其他进程运行,阻塞则相反。

from multiprocessing import Process, Pool

function_list = [worker_1,worker_2,worker_3,worker_4,worker_0]
a = time.time()
pool = multiprocessing.Pool()
for func in function_list:
    pool.apply_async(func,(1,))
pool.close()
pool.join()
b = time.time()
print('finsh:', b-a)

在这里利用了apply_async方法,即非阻塞,可以发现在这里添加三个进程进去后,立马就开始执行,不用非要等到某个进程结束后再添加新的进程进去。

结果如下:

工作212
工作four
工作three
工作1
end工作four
错误的工作
end工作212
end工作three
end工作1
finsh: 2.141101837158203

如果要使用阻塞,在这里只需要把apply_async改成apply即可。

下面对相关函数进行解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞。
  • apply(func[, args[, kwds]])是阻塞的。
  • close() 关闭pool,使其不在接受新的任务。
  • terminate() 结束工作进程,不在处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出,join方法要在close或terminate后使用。

当然每个进程可以在各自的方法返回一个结果。apply或apply_async方法可以拿到这个结果并进一步进行处理。

如果现在有一堆数据要处理,每一项都需要经过一个方法来处理,那么map非常适合。

比如现在你有一个数组,包含了所有的URL,而现在已经有了一个方法用来抓取每个URL内容并解析,那么可以直接在map的第一个参数传入方法名,第二个参数传入URL数组。现在我们用一个实例来感受一下:

from multiprocessing import Process, Pool
import requests
from requests.exceptions import ConnectionError

def scrape(url):
    try:
        print(requests.get(url))
    except ConnectionError:
        print('Error Occured',url)
    finally:
        print('URL',url,'Scraped')
pool = Pool()
urls = [
    'https://www.baidu.com',
    'http://www.meituan.com/',
    'http://blog.csdn.net/',
    'http://xxxyxxx.net'
]
pool.map(scrape,urls)

在这里初始化一个Pool,指定进程数为3,如果不指定,那么会自动根据CPU内核来分配进程数。然后有一个链接列表,map函数可以遍历每个URL,然后对其分别执行scrape方法。

运行结果如下:

<Response [200]>
URL https://www.baidu.com Scraped
Error Occured http://xxxyxxx.net
URL http://xxxyxxx.net Scraped
<Response [200]>
URL http://blog.csdn.net/ Scraped
<Response [200]>
URL http://www.meituan.com/ Scraped
Out[10]:
[None, None, None, None]

以上便是 multiprocessing 模块的简单介绍。

results matching ""

    No results matching ""