客至汲泉烹茶, 抚琴听者知音

【python教程】map、多进程与进度条

今天讲讲我在实习中学到的一点 python 知识,核心内容是多进程,也即我们常说的并行计算。

map

首先提个问题,给出一个列表,对列表中的每个元素都平方,代码怎么写?

最简单直观的方法自然就是 for 循环。

alist = [1,2,3,4,5,6,7,8]

def power_value(num):
    return num**2

result_list = []
for num in alist:
    result_list.append(power_value(num))

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]
注:当然你也可以不定义函数直接循环,这里主要是为了方便开展下文。

学过列表推导式的同学可以写出更加简洁的方案:

result_list = [power_value(x) for x in alist]

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]

但其实,除了上面两种方法外,python 还内置了一个函数 map,专门解决这种不断往同一个函数传不同参数的问题。它的语法也很简单:

map(function, iterable, ...)

第一个参数为函数名,第二个参数和之后的参数均为序列(列表、元组、range()等等)。第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。有了这个函数,上面的代码就可以改写为:

result_list = list(map(power_value,alist))

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]

这里为什么要加一个 list() 呢?因为 map 返回的是一个迭代器,一般情况下只能循环遍历读取,所以加个 list 把它转换为列表。

上面的函数只有一个参数,那假如它有两个参数呢?我们分为两种情况,一是这两个参数是成对出现的,比如说,两个列表相加,就是列表对应的元素相加。这种情况我们也可以直接应用 map。

alist = [1,2,3,4,5,6,7,8]
blist = [2,3,4,5,6,7,8,9]

def add_value(a,b):
    return a + b

result_list = list(map(add_value,alist,blist))

print(result_list)

>>> [3, 5, 7, 9, 11, 13, 15, 17]

第二种情况,这两个参数并不对应,常见情况是需要固定一个参数,比如上面的加法函数,我需要固定 b=5,这时候,可以借助 python 提供的另外一个函数 partial,这是 python 自带包 functools 里的函数,它的作用就是固定某些参数,从而构造出一个新函数。其语法为:

partial(func, param, ……)

这里有个容易忽略的点,如果不指定参数位置的话,那么默认从第一个参数开始固定,所以使用它的时候最好指定参数。但是被指定的参数必须在函数的最后部分,比如 func01(a,b,c),我们可以固定 b 和 c,可以固定 c,但是绝对不能只固定 b!否则会报错。

from functools import partial

alist = [1,2,3,4,5,6,7,8]

def add_value(a,b):
    return (a + b) * a

func = partial(add_value,b = 5)
result_list = list(map(func,alist))

print(result_list)

>>> [6, 14, 24, 36, 50, 66, 84, 104]

多进程

常被和进程一起提起的是线程,不过我也没搞懂这两的具体原理。总之明白两个区别就好:

  1. 进程之前数据不共享,线程之间数据共享
  2. 进程之间不会相互影响,一个进程挂掉另外一个进程照常工作。而一个线程挂掉则所有线程都会挂掉。

我在工作中常用到的是多进程,所以主要讲讲它。

python 自带了一个多进程库 multiprocessing ,利用上面学到的 map 知识,我们可以很容易实现并行运算,有了它,上面的代码可以改写如下:

import multiprocessing
from functools import partial

alist = [1,2,3,4,5,6,7,8]

def add_value(a,b):
    return (a + b) * a

func = partial(add_value,b = 5)

if __name__ == '__main__':
    num_processes = multiprocessing.cpu_count()-2 # 使用核心数
    pool = multiprocessing.Pool(processes=num_processes) # 实例化进程池
    func = partial(add_value,b = 5)
    print(pool.map(func,alist))

这样就实现了一个最简单的多进程,可以发现,和 map 相比就多了一个实例化进程池的过程。需要注意的是,这里开启多进程,肯定是要比单进程速度慢的,因为系统在进程间分配任务也是需要时间的,所以我们也不能无脑开多进程,大型任务才可能需要。

另外,pool.map 返回的是一个列表,如果想像 map 那样返回迭代器的话,可以使用 pool.imap,这两个函数返回都是有序的,如果有特殊需求想返回无须结果,可以使用 imap_unordered

最重要的一点,多进程必须在 if __name__ == '__main__' 下写!否则会报错,当然了,你在函数里使用多进程,然后在 if __name__ == '__main__' 下调用这个函数也是允许的。

这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来, import 的时候 name 不是 main,就不会递归运行了。

@cholerae

进度条

在执行大型任务的时候,我一般是输出一个中间结果就 print 一条提示语句,这样可以让我知道程序仍然在正常运行,而且也能大致知道处理到哪了。不过,如果只是上面两点需求的话,其实可以使用进度条,这样更加方便直观。最常见的进度条库是 tqdm,pip 安装好后就可以使用了,语法也很简单。

from tqdm import tqdm,trange 
for i in trange(100):
    time.sleep(0.1)

for i in tqdm(range(100)):
    time.sleep(0.1)

不过问题在于,多进程下如何使用进度条?自然,可以为每个进程生成一个进度条,但是我想看总的任务进度,而不是展示 5、6 根进度,怎么做?针对这个需求,我借鉴 Lei Mao,重写了 run_imap_mp 函数,用法和 map 基本一样,其他参数注释里也有说明,这里就不细说了。

def run_imap_mp(func, argument_list, num_processes='', is_tqdm=True):
    '''
    多进程与进度条结合

    param:
    ------
    func:function
        函数
    argument_list:list
        参数列表
    num_processes:int
        进程数,不填默认为总核心-3
    is_tqdm:bool
        是否展示进度条,默认展示
    ''' 
    result_list_tqdm = []
    try:
        import multiprocessing
        if num_processes == '':
            num_processes = multiprocessing.cpu_count()-3
        pool = multiprocessing.Pool(processes=num_processes)
        if is_tqdm:
            from tqdm import tqdm
            for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
                result_list_tqdm.append(result)
        else:
            for result in pool.imap(func=func, iterable=argument_list):
                result_list_tqdm.append(result)
        pool.close()
    except:
        result_list_tqdm = list(map(func,argument_list))   
    return result_list_tqdm

添加新评论