Python多进程中内存优化的一些技巧

前言

前段时间写了个爬虫,结构是典型的 Producer-Consumer 模式,要爬取的数据在20万左右,需要长时间运行。在开始阶段爬虫经常自己退出,log 里也没有报错,后来长时间观察才发现是内存占满了。
除了占内存,速度稍微慢点,Python 还是比较多优点,比如节省大量开发时间,还有良好的生态。现在家用 PC 基本配置都上来了,所以上面两个缺点在本地环境基本可以忽略,但在服务器上就要进行一些优化来节省开支了。

进程池(Pool)的内存优化

先上示例代码:

from multiprocessing import Pool
pool = Pool(processes=parent_maxworkers,maxtasksperchild=2)
for url in urls:
    pool.apply_async(consumer,args=(url, ))
pool.join()

进程池很方便,把任务扔进去设好worker数就可以了,不需要单独 fork 出几个进程来。
但是注意上面的apply_async那里,如果注册的任务数量(或者说调用次数)太多,进程池没来得及完成旧任务就又添派新任务,新任务似乎会堆积在内存之中,总任务太多就会塞爆内存。这里加上maxtasksperchild,官方文档的解释如下:

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh >worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker >processes will live as long as the pool.

maxtasksperchild代表了在进程池中,一个进程在完成了多少任务之后才会退出,然后给一个全新的进程所取代,从而达到释放 资源的目的。默认值为None,意思是进程将常驻于进程池中。
由于没有比较好的垃圾回收机制,长时间运行的脚本要自己释放资源,maxtasksperchild把资源问题传递到了新进程,能起到比较好的内存优化作用,开启之后子进程能看到不断变化的 PID。但这种解决方法还是不够完美,时间一长还是会内存泄漏。

使用BoundedSemaphore进行任务调度

(代码来源:Stackoverflow)

from threading import BoundedSemaphore

class TaskManager(object):
    def __init__(self, process_num, queue_size):
        self.pool = Pool(processes=process_num,maxtasksperchild=2)
        self.workers = BoundedSemaphore(queue_size)

    def new_task(self, function,arguments):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        self.pool.apply_async(function, args=arguments, callback=self.task_done)

    def task_done(self,*args,**kwargs):
        """Called once task is done, releases the queue is blocked."""
        self.workers.release()

    def join(self):
        self.pool.close()
        self.pool.join()

if __name__ == '__main__':
    queue_size=120
    pool = TaskManager(process_num=20,queue_size=queue_size)
    for url in urls:
        pool.new_task(function=consumer, arguments=(url, ))
    pool.join()

上面代码实现了一个Pool的封装TaskManager,可以从Pool基本无缝迁移。TaskManagerqueuesize对进程池任务数进行控制,旧任务没完成前不会开展新任务,阻塞了生产者的运行,能比较有效地解决内存泄漏问题。把queue_size调小还可以跑在 256 MB 小型服务器上,而且效率也不会差太多。