Python——生产者消费者问题、死锁、同步和异步

多线程产生的问题

​ 并发编程由于需要很多线程、进程之间的协作,所以很容易出现问题,下面主要介绍生产者与消费者问题、死锁问题、同步和异步问题

生产者与消费者问题

​ 当我们进行一个任务,需要两个线程不断地获取和操作数据时,可能会产生一个问题,如果数据获取很快,而操作很慢,那么获取数据的线程就必须等待操作数据的线程处理完毕,反之如果数据获取的很慢而操作的很快,那么操作数据的线程就必须等待获取数据的线程,这种问题可以看作是一种生产消费能力的不平衡,称之为生产者与消费者问题

解决方法

​ 解决这种生产消费能力不平衡的方法就是,在在生产者和消费者中间设立一个缓冲机制,生产者将生产的数据放入缓冲池中,消费者从缓冲池中取出数据进行处理,当缓冲池中的数据量小于一定的值时,生产者就会向缓冲池中添加数据,而当缓冲池中数据大于一个值时,消费者就会从缓冲池中取出数据,使生产者和消费者达到一种动态的平衡

​ 实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import threading
import time
from queue import Queue


class Producer(threading.Thread):
def run(self):
global Queue
count = 0
while True:
if queue.qsize() < 1000:
for i in range(100):
count += 1
msg = 'Pro ' + str(count)
queue.put(msg)
time.sleep(0.5)


class Consumer(threading.Thread):
def run(self):
global queue
while True:
if queue.qsize() > 100:
for i in range(3):
msg = self.name + 'Spend ' + queue.get()
print(msg)
time.sleep(1)


if __name__ == '__main__':
queue = Queue()

for i in range(500):
queue.put('Start Pro ' + str(i))
for i in range(2):
p = Producer()
p.start()
for i in range(5):
c = Consumer()
c.start()

​ 运行结果(截取):

1
2
3
4
5
6
7
8
9
10
11
12
Thread-7Spend Start Pro 494
Thread-3Spend Start Pro 495
Thread-5Spend Start Pro 496
Thread-5Spend Start Pro 498
Thread-5Spend Start Pro 499
Thread-3Spend Start Pro 497
Thread-3Spend Pro 1
Thread-4Spend Pro 2
Thread-4Spend Pro 3
Thread-4Spend Pro 4
Thread-6Spend Pro 5
Thread-6Spend Pro 6

死锁问题

​ 在多线程中,线程可以通过互斥锁来保证对同一资源的唯一占有,但当程序变得复杂后,可能会出现线程 A 对资源 A 上了锁,而线程 A 后边需要用到资源 B,使用完毕后才会对资源 A解锁,而线程 B 对资源 B 上了锁,它后边选要用到资源 A,用过后才会给 B 解锁,如果线程 A 和线程 B 同时运行,就可能会造成一种情况:线程 A 在等待线程 B 解锁,线程 B 也在等待线程 A 解锁,这就是死锁问题

死锁模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import threading
import time


class MyThraed1(threading.Thread):
def run(self):
if mutexA.acquire():
print(self.name + '----do 1----up----')
time.sleep(1)
if mutexB.acquire():
print(self.name + '----do 1----down----')
mutexB.release()
mutexA.release()


class MyThread2(threading.Thread):
def run(self):
if mutexB.acquire():
print(self.name + '----d0 2----up----')
time.sleep(1)
if mutexA.acquire():
print('----do 2----down----')
mutexA.release()
mutexB.release()


mutexA = threading.Lock()
mutexB = threading.Lock()

if __name__ == '__main__':
t1 = MyThraed1()
t2 = MyThread2()

t1.start()
t2.start()

​ 以上代码运行后,毫无疑问会产生死锁问题,结果如下

1
2
Thread-1----do 1----up----
Thread-2----d0 2----up----

​ 程序由于三个线程都在等待对方释放资源而卡住了

解决方法

​ 死锁问题应该尽量在设计程序时避免,或添加等待超时时间,从而检测程序是否产生了死锁,另一种就是通过银行家算法也可以避免死锁问题

银行家算法

​ 银行家算法的思想就是,假设银行有 10 元,这个时候有三个人提出贷款,A 要贷款 9 元,B 要贷款 3 元,C 要贷款 8 元,这时,银行肯定不够将所有人都满足,银行家算法就诞生了

​ 这时银行为了留住所有客户并且保证自己的钱不会不足,便分批贷款给客户,先借给 A 2 元、B 2 元、C 4 元,银行还剩 2 元,此时 B 直需要再借 1 元就满足了他自己的需求,银行便借给他 1 元,自己剩 1 元,当 B 用完,将 3 元还给银行后,银行再将这 4 元借给 C,C 也就满足了,等 C 还款后,再将 8 元中的 7 元借给 A,这样便动态的满足了三个客户的需求

​ 银行家算法在程序中实际上也是模拟了银行贷款的过程,操作系统会动态的向各个线程分配资源,在分配前,系统会判断分配后会不会导致系统进入不安全状态,不会就分配资源给线程,会则令线程等待

同步和异步

​ 同步和异步是并发编程下的两种重要的状态

同步

​ 同步是指当程序 A 调用程序 B 时,程序 A 停下不动,等待程序 B 完成后再继续运行

​ 举个例子就是,假设 A 喊 B 出去吃饭,B 说等我写完代码再去,A 就一直在原地等着 B,这就是同步

​ 归根结底,同步实现的就是一种顺序的运行

使用互斥锁实现线程同步

​ 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from threading import Thread
from threading import Lock
from time import sleep


class Task1(Thread):
def run(self):
while True:
if lock1.acquire():
print('-----Task 1-----')
sleep(0.5)
lock2.release()


class Task2(Thread):
def run(self):
while True:
if lock2.acquire():
print('-----Task 2-----')
sleep(0.5)
lock3.release()


class Task3(Thread):
def run(self):
while True:
if lock3.acquire():
print('-----Task 3-----')
sleep(0.5)
lock1.release()


lock1 = Lock()

lock2 = Lock()
lock2.acquire()

lock3 = Lock()
lock3.acquire()

t1 = Task1()
t2 = Task2()
t3 = Task3()

t1.start()
t2.start()
t3.start()

​ 运行结果

1
2
3
4
5
6
-----Task 1-----
-----Task 2-----
-----Task 3-----
-----Task 1-----
-----Task 2-----
-----Task 3-----

异步

​ 反之,异步是指当程序 A 调用程序 B 后,A 不会等到 B 执行完再运行,而是继续向下运行自己的程序

​ 举个例子就是,A 还是叫 B 去吃饭,B 依然说敲完代码再去,这时 A 没有等 B,而是去做自己的事情,等 B 敲完代码,两个再一起去吃饭

使用进程池实现进程异步

​ 代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from multiprocessing import Pool
import time
import os


def test():
print('-----Process in Pool----- pid = %d, ppid = %d' % (os.getpid(), os.getppid()))
for i in range(3):
print('----%d----' % i)
time.sleep(1)
return 'hahah'


def test2(args):
print('---callback func---- pid = %d' % os.getpid())
print('---callback func---- args = %s' % args)


pool = Pool(3)

pool.apply_async(func=test, callback=test2)

time.sleep(5)

print('----mainProcess pid = %d----' % os.getpid())

​ 运行结果:

1
2
3
4
5
6
7
-----Process in Pool----- pid = 6027, ppid = 6025
----0----
----1----
----2----
---callback func---- pid = 6025
---callback func---- args = hahah
----mainProcess pid = 6025----
-------------本文结束感谢您的阅读-------------