Python——多进程、进程池、进程间通信(还有一个复制文件的小程序)

Python 中的进程

什么是进程

​ 当一段代码被运行,或者一个应用程序被运行,就会创建一个进程,以下内容来自百度百科:

“进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体”

并行与并发

​ 每台计算机,cpu 是计算机的运算核心,多核计算机的意思就是,这台计算机上可以同时运行数量等于其核数的进程

​ 并发执行,即 cpu 核数 < 进程数,这个时候这些进程需要操作系统中的调度算法(时间片轮转算法、优先级调度算法等)来协调

​ 并行执行,即 cpu 核数 > 进程数,这个时候所有的进程都能得到执行,就是并行

Python 中实现多进程的三种方法

​ 在 Python 中,想要实现多进程有三种方式:使用os模块中的fork()函数、使用multiProcessing模块的Process类、使用multiProcessing模块中的Pool(进程池),下面分别介绍这三种方法及他们之间的比较

使用 os.fork() 实现多进程

​ 首先,fork()函数只有在 Linux 或者是 类 Linux 系统中才能使用

​ 当程序运行到这个函数时,会新创建一个进程,新的进程会从这个函数的返回值处开始执行,fork()函数很特殊,在父进程里,fork()的返回值为> 0,并且这个返回值就是主进程的pid,而在子进程中fork()的返回值是== 0

​ 在有fork()函数的程序中,若主程序在子程序结束之前运行完,那么系统不会等待子进程结束,而是会直接结束主进程,但这并不代表子进程也会被结束,子进程此时已经是一个独立的进程,它会自己运行到结束为止

​ 示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os
import time

ret = os.fork()

if ret == 0:
while True:
print('Dad:%d : %d' % (ret, os.getpid()))
time.sleep(1)

else:
while True:
print('Son:%d : %d : %d' % (ret, os.getpid(), os.getppid()))
time.sleep(1)

​ 运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081
Son:6081 : 6079 : 5917
Dad:0 : 6081

os模块的其他常用方法:

1
2
3
4
# 获取当前进程的 pid(进程号)
os.getpid():
# 获取当前进程父进程的 pid
os.getppid():

使用 Process 实现多进程

multiProcessing模块是一个在 Windows 中也可以使用的模块,它包含Process,使用 Process 创建的进程,主进程会等待所有的子进程结束后才结束

​ 并且,Process可以被继承,这样也使得Process可以被更灵活的运用,以及封装功能性代码

​ 使用Process创建进程的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing import Process
import time


def test():
while True:
print('----test----')
time.sleep(1)


p = Process(target=test)

p.start()

while True:
print('----main----')
time.sleep(1)

​ 运行结果:

1
2
3
4
5
6
7
8
9
----main----
----test----
----main----
----test----
----main----
----test----
----main----
----test----
----main----

​ 创建一个类继承Process来实现多进程的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from multiprocessing import Process
import time
import os


class Process_Class(Process):

def __init__(self, interval):
Process.__init__(self)
self.interval = interval

def run(self):
print('son(%s) start, dad is (%s)' % (os.getpid(), os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print('(%s) is over, Is %0.2f seconds' % (os.getpid(), t_stop - t_start))


if __name__ == '__main__':
t_start = time.time()
print('now Process is (%s)' % os.getpid())
p1 = Process_Class(2)
p1.start()
p1.join()
t_stop = time.time()
print('(%s) is over, spend %0.2f seconds' % (os.getpid(), t_stop - t_start))

​ 运行结果:

1
2
3
4
now Process is (6155)
son(6157) start, dad is (6155)
(6157) is over, Is 2.00 seconds
(6155) is over, spend 2.01 seconds

os.Process的其他常用方法及属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 常用方法:
# 判断进程实例是否还在执行
Process.is_alive()
# 是否等待进程实例执行结束,或等待多少秒
Process.join([timeout])
# 启动进程实例(创建子进程)
Process.start()
# 如果没有给定 target 参数,这个对象调用 start() 方法时,就将执行对象中的 run() 方法
Process.run()
# 不管任务是否完成,立即终止
Process.terminate()
# 属性:
# 当前进程实例别名,默认为 Process-N,N 为从 1 开始递增的整数
name
# 当前进程实例的 PID 值
pid

使用 Pool(进程池)实现多进程

​ 当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态生成多个进程,但如果时上百甚至上千个目标,手动的去创建进程的工作量巨大,此时可以用到multiprocessing模块提供的Pool方法

​ 初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程来执行请求;如果进程池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中进程结束,才会请求池中的空闲进程来执行

​ 使用Pool实现多进程的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from multiprocessing import Pool
import os
import time


def worker(num):
for i in range(5):
print('===pid=%d===num=%d' % (os.getpid(), num))
time.sleep()


pool = Pool(3)

for i in range(10):
print('---%d---' % i)
pool.apply_async(worker, (i,))

pool.close()
pool.join()

​ 运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
---0---
---1---
---2---
---3---
---4---
---5---
---6---
---7---
---8---
---9---
===pid=6223===num=0
===pid=6224===num=1
===pid=6225===num=2
===pid=6225===num=3
===pid=6224===num=4
===pid=6224===num=5
===pid=6225===num=6
===pid=6225===num=9
===pid=6223===num=7
===pid=6224===num=8

Pool的其他常用方法:

1
2
3
4
5
6
7
8
9
# 非堵塞式添加进程,第一个参数为需要调用的目标,第二个参数为传递给目标参数的元组
# 如果添加的任务数量超过了进程池中进程的个数的话,不会导致添加不进去,它会等待进程池中的进程完成一个任务后,自动的用刚刚那个进程,完成当前的新任务
Pool.apply_async(func, (item, item, ...))
# 堵塞式添加进程,每个请求添加就开始执行,结束后再添加下一个请求
Pool.apply(func, (item, item, ...))
# 关闭进程池,关闭后进程池不会再接收新的请求
Pool.close()
# 主进程在向进程池创建或添加任务后,默认不会等待进程池中的任务执行完,join 函数就是等待 Pool 中所有的子进程执行完成,必须放在 close() 函数后执行
Pool.join()

三种方法的比较:

fork():最底层的方法,其余两种方法创建进程实际上最底层也是通过fork()来创建的,fork()不能直接跨平台使用,只支持 Linux 或类 Linux 平台
Process:父进程和子进程都用来执行,并且父进程会等待所有子进程结束后再结束
PoolPool方法中,父进程一般只用于等待,真正的任务都在子进程中执行

进程间通信(使用 Queue)

​ 进程之间是没有任何关联的,可以使用队列来进行进程间的通信,这里使用的是multiProcessing模块的QueueManage(用于线程池)

​ 队列是一种先进先出(FIFO)的数据结构,下面是通过不断向同一个队列中输入数据和读取数据实现进程间的通信

使用 Queue(使用 Process 实现的多进程)

​ 直接放代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from multiprocessing import Process
from multiprocessing import Queue
import time
import random


def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())


def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue' % value)
time.sleep(random.random())
else:
break


if __name__ == '__main__':

q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))

pw.start()
pw.join()

pr.start()
pr.join()

print(' ')
print()

​ 运行结果:

1
2
3
4
5
6
Put A to queue...
Put B to queue...
Put C to queue...
Get A from queue
Get B from queue
Get C from queue

使用 Manage(使用 Pool 实现的多进程)

​ 直接放代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from multiprocessing import Manager
from multiprocessing import Pool
import os


def reader(q):
print('reader start (%s), Dad is (%s)' % (os.getpid(), os.getppid()))
for i in range(q.qsize()):
print('rader get from queue : %s' % q.get(True))


def writer(q):
print('writer start (%s), Dad is (%s)' % (os.getpid(), os.getppid()))
for i in 'LiMingHui love GH':
q.put(i)


if __name__ == '__main__':
print('(%s) start' % os.getpid())
q = Manager().Queue()
po = Pool()

po.apply(writer, (q,))
po.apply(reader, (q,))

po.close()
po.join()

print('(%s) is end' % os.getpid())

​ 运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
(6306) start
writer start (6313), Dad is (6306)
reader start (6313), Dad is (6306)
rader get from queue : L
rader get from queue : i
rader get from queue : M
rader get from queue : i
rader get from queue : n
rader get from queue : g
rader get from queue : H
rader get from queue : u
rader get from queue : i
rader get from queue :
rader get from queue : l
rader get from queue : o
rader get from queue : v
rader get from queue : e
rader get from queue :
rader get from queue : G
rader get from queue : H
(6306) is end

一个使用多进程实现的复制文件夹的小程序

​ 只需要输入文件夹名字,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
from multiprocessing import Pool
from multiprocessing import Manager


def copyFile(name, oldFolderName, newFolderName, queue):
fr = open(oldFolderName + '/' + name)
fw = open(newFolderName + '/' + name, 'w')

content = fr.read()
fw.write(content)

fr.close()
fw.close()

queue.put(name)


def main():
oldFolderName = input('Floder-name is:')
newFolderName = oldFolderName + '_scopy'
os.mkdir(newFolderName)

fileNames = os.listdir(oldFolderName)

pool = Pool(5)
queue = Manager().Queue()

for name in fileNames:
pool.apply_async(copyFile, args=(name, oldFolderName, newFolderName, queue))

num = 0
allNum = len(fileNames)

while num < allNum:
queue.get()
num += 1
copyRate = num / allNum
print('\rcopy : %.2f %%' % (copyRate * 100), end="")

print('copy end...')


if __name__ == '__main__':
main()
-------------本文结束感谢您的阅读-------------