本文共 18821 字,大约阅读时间需要 62 分钟。
本节书摘来自华章计算机《Python爬虫开发与项目实战》一书中的第1章,第1.4节,作者:范传辉著,更多章节内容可以访问云栖社区“华章计算机”公众号查看
在爬虫开发中,进程和线程的概念是非常重要的。提高爬虫的工作效率,打造分布式爬虫,都离不开进程和线程的身影。本节将从多进程、多线程、协程和分布式进程等四个方面,帮助大家回顾Python语言中进程和线程中的常用操作,以便在接下来的爬虫开发中灵活运用进程和线程。
1.4.1 多进程 Python实现多进程的方式主要有两种,一种方法是使用os模块中的fork方法,另一种方法是使用multiprocessing模块。这两种方法的区别在于前者仅适用于Unix/Linux操作系统,对Windows不支持,后者则是跨平台的实现方式。由于现在很多爬虫程序都是运行在Unix/Linux操作系统上,所以本节对两种方式都进行讲解。 1.?使用os模块中的fork方式实现多进程 Python的os模块封装了常见的系统调用,其中就有fork方法。fork方法来自于Unix/Linux操作系统中提供的一个fork系统调用,这个方法非常特殊。普通的方法都是调用一次,返回一次,而fork方法是调用一次,返回两次,原因在于操作系统将当前进程(父进程)复制出一份进程(子进程),这两个进程几乎完全相同,于是fork方法分别在父进程和子进程中返回。子进程中永远返回0,父进程中返回的是子进程的ID。下面举个例子,对Python使用fork方法创建进程进行讲解。其中os模块中的getpid方法用于获取当前进程的ID,getppid方法用于获取父进程的ID。代码如下:import os if __name__ == '__main__': print 'current Process (%s) start ...'%(os.getpid()) pid = os.fork() if pid < 0: print 'error in fork' elif pid == 0: print 'I am child process(%s) and my parent process is (%s)',(os.getpid(),? ????os.getppid()) else: print 'I(%s) created a chlid process (%s).',(os.getpid(),pid)
运行结果如下:
current Process (3052) start ... I(3052) created a chlid process (3053). I am child process(3053) and my parent process is (3052)
2.?使用multiprocessing模块创建多进程
multiprocessing模块提供了一个Process类来描述一个进程对象。创建子进程时,只需要传入一个执行函数和函数的参数,即可完成一个Process实例的创建,用start()方法启动进程,用join()方法实现进程间的同步。下面通过一个例子来演示创建多进程的流程,代码如下:import os from multiprocessing import Process # 子进程要执行的代码 def run_proc(name): print 'Child process %s (%s) Running...' % (name, os.getpid()) if __name__ == '__main__': print 'Parent process %s.' % os.getpid() for i in range(5): p = Process(target=run_proc, args=(str(i),)) print 'Process will start.' p.start() p.join() print 'Process end.'
运行结果如下:
Parent process 2392. Process will start. Process will start. Process will start. Process will start. Process will start. Child process 2 (10748) Running... Child process 0 (5324) Running... Child process 1 (3196) Running... Child process 3 (4680) Running... Child process 4 (10696) Running... Process end.
以上介绍了创建进程的两种方法,但是要启动大量的子进程,使用进程池批量创建子进程的方式更加常见,因为当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态生成多个进程,如果是上百个、上千个目标,手动去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。
3.?multiprocessing模块提供了一个Pool类来代表进程池对象 Pool可以提供指定数量的进程供用户调用,默认大小是CPU的核数。当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来处理它。下面通过一个例子来演示进程池的工作流程,代码如下:from multiprocessing import Pool import os, time, random def run_task(name): print 'Task %s (pid = %s) is running...' % (name, os.getpid()) time.sleep(random.random() * 3) print 'Task %s end.' % name if __name__=='__main__': print 'Current process %s.' % os.getpid() p = Pool(processes=3) for i in range(5): p.apply_async(run_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
运行结果如下:
Current process 9176. Waiting for all subprocesses done... Task 0 (pid = 11012) is running... Task 1 (pid = 12464) is running... Task 2 (pid = 11260) is running... Task 2 end. Task 3 (pid = 11260) is running... Task 0 end. Task 4 (pid = 11012) is running... Task 1 end. Task 3 end. Task 4 end. All subprocesses done.
上述程序先创建了容量为3的进程池,依次向进程池中添加了5个任务。从运行结果中可以看到虽然添加了5个任务,但是一开始只运行了3个,而且每次最多运行3个进程。当一个任务结束了,新的任务依次添加进来,任务执行使用的进程依然是原来的进程,这一点通过进程的pid就可以看出来。
Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
4.?进程间通信
假如创建了大量的进程,那进程间通信是必不可少的。Python提供了多种进程间通信的方式,例如Queue、Pipe、Value+Array等。本节主要讲解Queue和Pipe这两种方式。Queue和Pipe的区别在于Pipe常用来在两个进程间通信,Queue用来在多个进程间实现通信。 首先讲解一下Queue通信方式。Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。有两个方法:Put和Get可以进行Queue操作:运行结果如下:
Process(9968) is writing... Process(9512) is writing... Put url_1 to queue... Put url_4 to queue... Process(1124) is reading... Get url_1 from queue. Get url_4 from queue. Put url_5 to queue... Get url_5 from queue. Put url_2 to queue... Get url_2 from queue. Put url_6 to queue... Get url_6 from queue. Put url_3 to queue... Get url_3 from queue.
最后介绍一下Pipe的通信机制,Pipe常用来在两个进程间进行通信,两个进程分别位于管道的两端。
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。若duplex为False,conn1只负责接收消息,conn2只负责发送消息。send和recv方法分别是发送和接收消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 下面通过一个例子进行说明:创建两个进程,一个子进程通过Pipe发送数据,一个子进程通过Pipe接收数据。程序示例如下:import multiprocessing import random import time,os def proc_send(pipe,urls): for url in urls: print "Process(%s) send: %s" %(os.getpid(),url) pipe.send(url) time.sleep(random.random()) def proc_recv(pipe): while True: print "Process(%s) rev:%s" %(os.getpid(),pipe.recv()) time.sleep(random.random()) if __name__ == "__main__": pipe = multiprocessing.Pipe() p1 = multiprocessing.Process(target=proc_send, args=(pipe[0],['url_'+str(i) ? ???? for i in range(10) ])) p2 = multiprocessing.Process(target=proc_recv, args=(pipe[1],)) p1.start() p2.start() p1.join() p2.join()
运行结果如下:
Process(10448) send: url_0 Process(5832) rev:url_0 Process(10448) send: url_1 Process(5832) rev:url_1 Process(10448) send: url_2 Process(5832) rev:url_2 Process(10448) send: url_3 Process(10448) send: url_4 Process(5832) rev:url_3 Process(10448) send: url_5 Process(10448) send: url_6 Process(5832) rev:url_4 Process(5832) rev:url_5 Process(10448) send: url_7 Process(10448) send: url_8 Process(5832) rev:url_6 Process(5832) rev:url_7 Process(10448) send: url_9 Process(5832) rev:url_8 Process(5832) rev:url_9
以上多进程程序运行结果的打印顺序在不同的系统和硬件条件下略有不同。
1.4.2 多线程
多线程类似于同时执行多个不同程序,多线程运行有如下优点:1.?用threading模块创建多线程
threading模块一般通过两种方式创建多线程:第一种方式是把一个函数传入并创建Thread实例,然后调用start方法开始执行;第二种方式是直接从threading.Thread继承并创建线程类,然后重写__init__方法和run方法。 首先介绍第一种方法,通过一个简单例子演示创建多线程的流程,程序如下:运行结果如下:
MainThread is running... Current Thread_1 is running... Thread_1 ---->>> url_1 Current Thread_2 is running... Thread_2 ---->>> url_4 Thread_1 ---->>> url_2 Thread_2 ---->>> url_5 Thread_2 ---->>> url_6 Thread_1 ---->>> url_3 Thread_1 ended. Thread_2 ended. MainThread ended.
第二种方式从threading.Thread继承创建线程类,下面将方法一的程序进行重写,程序如下:
import random import threading import time class myThread(threading.Thread): def __init__(self,name,urls): threading.Thread.__init__(self,name=name) self.urls = urls def run(self): print 'Current %s is running...' % threading.current_thread().name for url in self.urls: print '%s ---->>> %s' % (threading.current_thread().name,url) time.sleep(random.random()) print '%s ended.' % threading.current_thread().name print '%s is running...' % threading.current_thread().name t1 = myThread(name='Thread_1',urls=['url_1','url_2','url_3']) t2 = myThread(name='Thread_2',urls=['url_4','url_5','url_6']) t1.start() t2.start() t1.join() t2.join() print '%s ended.' % threading.current_thread().name
运行结果如下:
MainThread is running... Current Thread_1 is running... Thread_1 ---->>> url_1 Current Thread_2 is running... Thread_2 ---->>> url_4 Thread_2 ---->>> url_5 Thread_1 ---->>> url_2 Thread_1 ---->>> url_3 Thread_2 ---->>> url_6 Thread_2 ended. Thread_1 ended.
2.?线程同步
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。使用Thread对象的Lock和RLock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。 对于Lock对象而言,如果一个线程连续两次进行acquire操作,那么由于第一次acquire之后没有release,第二次acquire将挂起线程。这会导致Lock对象永远不会release,使得线程死锁。RLock对象允许一个线程多次对其进行acquire操作,因为在其内部通过一个counter变量维护着线程acquire的次数。而且每一次的acquire操作必须有一个release操作与之对应,在所有的release操作完成之后,别的线程才能申请该RLock对象。下面通过一个简单的例子演示线程同步的过程:import threading mylock = threading.RLock() num=0 class myThread(threading.Thread): def __init__(self, name): threading.Thread.__init__(self,name=name) def run(self): global num while True: mylock.acquire() print '%s locked, Number: %d'%(threading.current_thread().name, num) if num>=4: mylock.release() print '%s released, Number: %d'%(threading.current_thread().name, num) break num+=1 print '%s released, Number: %d'%(threading.current_thread().name, num) mylock.release() if __name__== '__main__': thread1 = myThread('Thread_1') thread2 = myThread('Thread_2') thread1.start() thread2.start()
运行结果如下:
Thread_1 locked, Number: 0 Thread_1 released, Number: 1 Thread_1 locked, Number: 1 Thread_1 released, Number: 2 Thread_2 locked, Number: 2 Thread_2 released, Number: 3 Thread_1 locked, Number: 3 Thread_1 released, Number: 4 Thread_2 locked, Number: 4 Thread_2 released, Number: 4 Thread_1 locked, Number: 4 Thread_1 released, Number: 4
3.?全局解释器锁(GIL)
在Python的原始解释器CPython中存在着GIL(Global Interpreter Lock,全局解释器锁),因此在解释执行Python代码时,会产生互斥锁来限制线程对共享资源的访问,直到解释器遇到I/O操作或者操作次数达到一定数目时才会释放GIL。由于全局解释器锁的存在,在进行多线程操作的时候,不能调用多个CPU内核,只能利用一个内核,所以在进行CPU密集型操作的时候,不推荐使用多线程,更加倾向于多进程。那么多线程适合什么样的应用场景呢?对于IO密集型操作,多线程可以明显提高效率,例如Python爬虫的开发,绝大多数时间爬虫是在等待socket返回数据,网络IO的操作延时比CPU大得多。1.4.3 协程 协程(coroutine),又称微线程,纤程,是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此协程能保留上一次调用时的状态,每次过程重入时,就相当于进入上一次调用的状态。在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其他协程共享全局数据和其他资源。 协程需要用户自己来编写调度逻辑,对于CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。那么在Python中是如何实现协程的呢? Python通过yield提供了对协程的基本支持,但是不完全,而使用第三方gevent库是更好的选择,gevent提供了比较完善的协程支持。gevent是一个基于协程的Python网络函数库,使用greenlet在libev事件循环顶部提供了一个有高级别并发性的API。主要特性有以下几点:由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO,这就是协程一般比多线程效率高的原因。由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,将一些常见的阻塞,如socket、select等地方实现协程跳转,这一过程在启动时通过monkey patch完成。下面通过一个的例子来演示gevent的使用流程,代码如下:
运行结果如下:
Visit --> https:// github.com/ Visit --> https:// www.python.org/ Visit --> http://www.cnblogs.com/ 45740 bytes received from http://www.cnblogs.com/. 25482 bytes received from https:// github.com/. 47445 bytes received from https:// www.python.org/.
以上程序主要用了gevent中的spawn方法和joinall方法。spawn方法可以看做是用来形成协程,joinall方法就是添加这些协程任务,并且启动运行。从运行结果来看,3个网络操作是并发执行的,而且结束顺序不同,但其实只有一个线程。
gevent中还提供了对池的支持。当拥有动态数量的greenlet需要进行并发管理(限制并发数)时,就可以使用池,这在处理大量的网络和IO操作时是非常需要的。接下来使用gevent中pool对象,对上面的例子进行改写,程序如下:from gevent import monkey monkey.patch_all() import urllib2 from gevent.pool import Pool def run_task(url): print 'Visit --> %s' % url try: response = urllib2.urlopen(url) data = response.read() print '%d bytes received from %s.' % (len(data), url) except Exception,e: print e return 'url:%s --->finish'% url if __name__=='__main__': pool = Pool(2) urls = ['https:// github.com/','https:// www.python.org/','http://www.cnblogs.com/'] results = pool.map(run_task,urls) print results
运行结果如下:
Visit --> https:// github.com/ Visit --> https:// www.python.org/ 25482 bytes received from https:// github.com/. Visit --> http://www.cnblogs.com/ 47445 bytes received from https:// www.python.org/. 45687 bytes received from http://www.cnblogs.com/.['url:https:// github.com/ --->finish', 'url:https:// www.python.org/ --->finish', 'url:http://www.cnblogs.com/ --->finish']
通过运行结果可以看出,Pool对象确实对协程的并发数量进行了管理,先访问了前两个网址,当其中一个任务完成时,才会执行第三个。
1.4.4 分布式进程 分布式进程指的是将Process进程分布到多台机器上,充分利用多台机器的性能完成复杂的任务。我们可以将这一点应用到分布式爬虫的开发中。 分布式进程在Python中依然要用到multiprocessing模块。multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。可以写一个服务进程作为调度者,将任务分布到其他多个进程中,依靠网络通信进行管理。举个例子:在做爬虫程序时,常常会遇到这样的场景,我们想抓取某个网站的所有图片,如果使用多进程的话,一般是一个进程负责抓取图片的链接地址,将链接地址存放到Queue中,另外的进程负责从Queue中读取链接地址进行下载和存储到本地。现在把这个过程做成分布式,一台机器上的进程负责抓取链接,其他机器上的进程负责下载存储。那么遇到的主要问题是将Queue暴露到网络中,让其他机器进程都可以访问,分布式进程就是将这一个过程进行了封装,我们可以将这个过程称为本地队列的网络化。整体过程如图1-24所示。要实现上面例子的功能,创建分布式进程需要分为六个步骤:
1)建立队列Queue,用来进行进程间的通信。服务进程创建任务队列task_queue,用来作为传递任务给任务进程的通道;服务进程创建结果队列result_queue,作为任务进程完成任务后回复服务进程的通道。在分布式多进程环境下,必须通过由Queuemanager获得的Queue接口来添加任务。 2)把第一步中建立的队列在网络上注册,暴露给其他进程(主机),注册后获得网络队列,相当于本地队列的映像。 3)建立一个对象(Queuemanager(BaseManager))实例manager,绑定端口和验证口令。 4)启动第三步中建立的实例,即启动管理manager,监管信息通道。 5)通过管理实例的方法获得通过网络访问的Queue对象,即再把网络队列实体化成可以使用的本地队列。 6)创建任务到“本地”队列中,自动上传任务到网络队列中,分配给任务进程进行处理。 接下来通过程序实现上面的例子(Linux版),首先编写的是服务进程(taskManager.py),代码如下:import random,time,Queue from multiprocessing.managers import BaseManager # 第一步:建立task_queue和result_queue,用来存放任务和结果 task_queue=Queue.Queue() result_queue=Queue.Queue() class Queuemanager(BaseManager): pass # 第二步:把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象, # 将Queue对象在网络中暴露 Queuemanager.register('get_task_queue',callable=lambda:task_queue) Queuemanager.register('get_result_queue',callable=lambda:result_queue) # 第三步:绑定端口8001,设置验证口令‘qiye’。这个相当于对象的初始化 manager=Queuemanager(address=('',8001),authkey='qiye') # 第四步:启动管理,监听信息通道 manager.start() # 第五步:通过管理实例的方法获得通过网络访问的Queue对象 task=manager.get_task_queue() result=manager.get_result_queue() # 第六步:添加任务 for url in ["ImageUrl_"+i for i in range(10)]: print 'put task %s ...' %url task.put(url) # 获取返回结果 print 'try get result...' for i in range(10): print 'result is %s' %result.get(timeout=10) # 关闭管理 manager.shutdown()
任务进程已经编写完成,接下来编写任务进程(taskWorker.py),创建任务进程的步骤相对较少,需要四个步骤:
1)使用QueueManager注册用于获取Queue的方法名称,任务进程只能通过名称来在网络上获取Queue。 2)连接服务器,端口和验证口令注意保持与服务进程中完全一致。 3)从网络上获取Queue,进行本地化。 4)从task队列获取任务,并把结果写入result队列。 程序taskWorker.py代码(win/linux版)如下:# coding:utf-8 import time from multiprocessing.managers import BaseManager # 创建类似的QueueManager: class QueueManager(BaseManager): pass # 第一步:使用QueueManager注册用于获取Queue的方法名称 QueueManager.register('get_task_queue') QueueManager.register('get_result_queue') # 第二步:连接到服务器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证口令注意保持与服务进程完全一致: m = QueueManager(address=(server_addr, 8001), authkey='qiye') # 从网络连接: m.connect() # 第三步:获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 第四步:从task队列获取任务,并把结果写入result队列: while(not task.empty()): image_url = task.get(True,timeout=5) print('run task download %s...' % image_url) time.sleep(1) result.put('%s--->success'%image_url) # 处理结束: print('worker exit.')
最后开始运行程序,先启动服务进程taskManager.py,运行结果如下:
put task ImageUrl_0 ... put task ImageUrl_1 ... put task ImageUrl_2 ... put task ImageUrl_3 ... put task ImageUrl_4 ... put task ImageUrl_5 ... put task ImageUrl_6 ... put task ImageUrl_7 ... put task ImageUrl_8 ... put task ImageUrl_9 ... try get result...
接着再启动任务进程taskWorker.py,运行结果如下:
Connect to server 127.0.0.1... run task download ImageUrl_0... run task download ImageUrl_1... run task download ImageUrl_2... run task download ImageUrl_3... run task download ImageUrl_4... run task download ImageUrl_5... run task download ImageUrl_6... run task download ImageUrl_7... run task download ImageUrl_8... run task download ImageUrl_9... worker exit.
当任务进程运行结束后,服务进程运行结果如下:
result is ImageUrl_0--->success result is ImageUrl_1--->success result is ImageUrl_2--->success result is ImageUrl_3--->success result is ImageUrl_4--->success result is ImageUrl_5--->success result is ImageUrl_6--->success result is ImageUrl_7--->success result is ImageUrl_8--->success result is ImageUrl_9--->success
其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,实现大规模的分布式爬虫。
由于平台的特性,创建服务进程的代码在Linux和Windows上有一些不同,创建工作进程的代码是一致的。
taskManager.py程序在Windows版下的代码如下:
# coding:utf-8 # taskManager.py for windows import Queue from multiprocessing.managers import BaseManager from multiprocessing import freeze_support # 任务个数 task_number = 10 # 定义收发队列 task_queue = Queue.Queue(task_number); result_queue = Queue.Queue(task_number); def get_task(): return task_queue def get_result(): return result_queue # 创建类似的QueueManager: class QueueManager(BaseManager): pass def win_run(): # Windows下绑定调用接口不能使用lambda,所以只能先定义函数再绑定 QueueManager.register('get_task_queue',callable = get_task) QueueManager.register('get_result_queue',callable = get_result) # 绑定端口并设置验证口令,Windows下需要填写IP地址,Linux下不填默认为本地 manager = QueueManager(address = ('127.0.0.1',8001),authkey = 'qiye') # 启动 manager.start() try: # 通过网络获取任务队列和结果队列 task = manager.get_task_queue() result = manager.get_result_queue() # 添加任务 for url in ["ImageUrl_"+str(i) for i in range(10)]: print 'put task %s ...' %url task.put(url) print 'try get result...' for i in range(10): print 'result is %s' %result.get(timeout=10) except: print('Manager error') finally: # 一定要关闭,否则会报管道未关闭的错误 manager.shutdown() if __name__ == '__main__': # Windows下多进程可能会有问题,添加这句可以缓解 freeze_support() win_run()