Python 基础 - Multiprocessing

来源:转载


多进程 Multiprocessing 和多线程 threading 类似,Multiprocessing就是用来弥补 threading 的一些劣势, 比如GIL。



导入线程、进程标准模块
import multiprocessing as mp
import threading as td
定义一个被线程和进程调用的函数
def job(a,d):
print('aaaaa')
创建线程和进程
t1 = td.Thread(target=job,args=(1,2))
p1 = mp.Process(target=job,args=(1,2))
分别启动线程和进程

t1.start()
p1.start()
t1.join()
p1.join()


线程和进程的使用方法相似


添加main函数


import multiprocessing as mp


def job(a,d):
print(‘aaaaa’)


if name==’main‘:
p1 = mp.Process(target=job,args=(1,2))
p1.start()
p1.join()


Queue

Queue允许多个进程放入,多个进程从队列取出对象。Queue使用mutiprocessing.Queue(maxsize)创建,maxsize表示队列中可以存放对象的最大数量。


例子

import multiprocessing as mp
def job(q):
res = 0
for i in range(1000):
res += i+i**2+i**3
q.put(res) # queue
if name == ‘main‘:
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print(res1+res2)


效率对比 threading & multiprocessing
例子

创建多进程 multiprocessing,创建多线程 multithread再创建普通函数
最后运行进行时间比较。


import multiprocessing as mp
import threading as td
import time
def job(q):
res = 0
for i in range(1000000):
res += i + i**2 + i**3
q.put(res) # queue
def multicore():
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:',res1 + res2)
def multithread():
q = mp.Queue() # thread可放入process同样的queue中
t1 = td.Thread(target=job, args=(q,))
t2 = td.Thread(target=job, args=(q,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = q.get()
res2 = q.get()
print('multithread:', res1 + res2)
def normal():
res = 0
for _ in range(2):
for i in range(1000000):
res += i + i**2 + i**3
print('normal:', res)
if __name__ == '__main__':
st = time.time()
normal()
st1 = time.time()
print('normal time:', st1 - st)
multithread()
st2 = time.time()
print('multithread time:', st2 - st1)
multicore()
print('multicore time:', time.time() - st2)

输出
(‘normal:’, 499999666667166666000000L)
(‘normal time:’, 0.4128448963165283)
(‘multithread:’, 499999666667166666000000L)
(‘multithread time:’, 0.7864401340484619)
(‘multicore:’, 499999666667166666000000L)
(‘multicore time:’, 0.2593879699707031)
时间是 多进程 < 普通 < 多线程,由此我们可以清晰地看出哪种方法更有效率。


进程池 Pool

进程池就是将所要运行的东西,放到池子里,Python会自行解决多进程的问题。


首先import multiprocessing和定义job()


import multiprocessing as mp
def job(x):
return x*x

然后我们定义一个Pool


pool = mp.Pool()

向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。 用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果


res = pool.map(job, range(10))

运行一下


import multiprocessing as mp
def job(x):
return x*x
pool = mp.Pool()
def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print res
if __name__ == '__main__':
multicore()

输出


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
自定义核数量

Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量


pool = mp.Pool(processes=3) # 定义CPU核数量为3
apply_async()

Pool除了map()外,还有可以返回结果的方式,那就是apply_async()。apply_async()中只能传递一个值,它只会放入一个核进行运算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值


def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())

输出


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]# map()
4


apply_async()只能输入一组参数。因此我们将apply_async() 放入迭代器中,定义一个新的multi_res


multi_res = [pool.apply_async(job, (i,)) for i in range(10)]

同样在取出值时需要一个一个取出来


合并代码


def multicore():
pool = mp.Pool()
res = pool.map(job, range(10))
print(res)
res = pool.apply_async(job, (2,))
# 用get获得结果
print(res.get())
# 迭代器,i=0时apply一次,i=1时apply一次等等
multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
# 从迭代器中取出
print([res.get() for res in multi_res])

输出:


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # map()
4
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # multi_res


shared memory

只有用共享内存才能让CPU之间有交流。


Value

通过使用Value数据存储在一个共享的内存表中。


value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

d和i参数用来设置数据类型的,d表示一个双精浮点类型,i表示一个带符号的整型


Array

Array类,可以和共享内存交互,来实现在进程之间共享数据。


array = mp.Array('i', [1, 2, 3, 4])

Array只能是一维的,不能是多维的。同样和Value 一样,需要定义数据形式,否则会报错。


各参数代表的数据类型
Type code
C Type
Python Type
Minimum size in bytes
'b'
signed char
int
1
'B'
unsigned char
int
1
'u'
Py_UNICODE
Unicode character
2
'h'
signed short
int
2
'H'
unsigned short
int
2
'i'
signed int
int
2
'I'
unsigned int
int
2
'l'
signed long
int
4
'L'
unsigned long
int
4
'q'
signed long long
int
8
'Q'
unsigned long long
int
8
'f'
float
float
4
'd'
double
float
8
进程锁 Lock

为了解决不同进程抢共享资源的问题,我们可以用加进程锁来解决。


首先需要定义一个进程锁 然后将进程锁的信息传入各个进程中 在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占


import multiprocessing as mp
import time
def job(v, num, l):
l.acquire()
for _ in range(10):
time.sleep(0.1)
v.value += num
print(v.value)
l.release()
def multicore():
l = mp.Lock()
v = mp.Value('i', 0)
p1 = mp.Process(target=job, args=(v, 1, l))
p2 = mp.Process(target=job, args=(v, 3, l))
p1.start()
p2.start()
p1.join()
p2.join()
if __name__ == '__main__':
multicore()

就会有序输出


1
2
3
4
5
6
7
8
9
10
13






分享给朋友:
您可能感兴趣的文章:
随机阅读: