********進程同步控制********
****鎖--multiprocessing.Lock****
通過學習,我麼實現了程序的異步,讓多個任務可以同時在幾個進程中處理,他們之間的運行沒有順序,一旦
開啟不受我們控制。儘管併發編程讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題
當多個進程使用同一份數據資源的時候,就會引發數據安全或順序混亂問題。
**多進程搶佔資源
import os
import time
import random
from multiprocessing import Process
def work(n):
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s:%s is done' % (n, os.getpid()))
if __name__ == '__main__':
for i in range(3):
p = Process(target=work, args=(i,))
p.start()
**使用鎖維護執行順序
由併發變成了串行,犧牲了運行效率,但避免了競爭
import time
import random
import os
from multiprocessing import Process,Lock
def work(n, l):
l.acquire()
print('%s: %s is running' % (n, os.getpid()))
time.sleep(random.random())
print('%s:%s is done' % (n, os.getpid()))
l.release()
if __name__ == '__main__':
l = Lock()
for i in range(3):
p = Process(target=work, args=(i,l))
p.start()
上面這種情況雖然使用加鎖的形式實現了順序的執行,但是程序又重新變成串行了,這樣確實會浪費了時間,卻保證了數據的安全。
接下來,我們以模擬搶票為例,來看看數據安全的重要性。
from multiprocessing import Lock, Process
import time
def show(i): # 產看餘票信息
with open('餘票') as f:
temp = f.read()
print('第%s個人查到還有餘票%s' % (i, temp))
def buy(i, l): # 買票
l.acquire()
with open('餘票') as f:
temp = int(f.read())
time.sleep(0.1)
if temp > 0:
temp -= 1
print('\033[31m 第%s個人買到了票\033[0m' % i)
else:
print('\033[32m第%s個人沒有買到票\033[0m' % i)
time.sleep(0.1)
with open('餘票', 'w') as f:
f.write(str(temp))
l.release()
if __name__ == '__main__':
l = Lock()
for i in range(10): # 多用戶查看票數
p = Process(target=show, args=(i + 1,))
p.start()
for i in range(10): # 模擬併發多個用戶搶票
p = Process(target=buy, args=(i + 1, l))
p.start()
#加鎖可以保證多個進程修改同一塊數據時,同一時間只能有一個任務可以進行修改,
#即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。
雖然可以用文件共享數據實現進程間通信,但問題是:
1.效率低(共享數據基於文件,而文件是硬盤上的數據)
2.需要自己加鎖處理
#因此我們最好找尋一種解決方案能夠兼顧:
1、效率高(多個進程共享一塊內存的數據)
2、幫我們處理好鎖問題。這就是mutiprocessing模塊為我們提供的基於消息的IPC通信機制:隊列和管道。
隊列和管道都是將數據存放於內存中
隊列又是基於(管道+鎖)實現的,可以讓我們從複雜的鎖問題中解脫出來,
我們應該儘量避免使用共享數據,儘可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,
而且在進程數目增多時,往往可以獲得更好的可獲展性。
****信號量****
介紹:
互斥鎖同時只允許一個線程更改數據,而信號量Semaphore是同時允許一定數量的線程更改數據 。
假設商場裡有4個迷你唱吧,所以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實現:
信號量同步基於內部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時,
acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信號量概念P()和V()的Python實現。信號量同步機制適用於訪問
像服務器這樣的有限資源。
信號量與進程池的概念很像,但是要區分開,信號量涉及到加鎖的概念
from multiprocessing import Process, Semaphore
import time, random
def work(sem, user):
sem.acquire()
print('%s 佔到一個房間' % user)
time.sleep(random.random()) # 模擬在房間中待的時間
sem.release()
if __name__ == '__main__':
sem = Semaphore(4)
p_ls = []
for i in range(10):
p = Process(target=work, args=(sem, 'user %s' % i,))
p.start()
p_ls.append(p)
[i.join() for i in p_ls]
print('>>>>>>>>>>>>>>>>>>>')
****事件****
python線程的事件用於主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:is_set() 方法的值,默認為False
那麼當程序執行 event.wait 方法時就會阻塞,
如果is_set()值為True,那麼event.wait 方法執行時便不再阻塞。
clear()方法:將is_set()的值設置為False
set()方法:將is_set()的值設置為True
紅綠燈實例:
from multiprocessing import Process, Event
import time
import random
# 亮燈和車行駛是異步的
# 亮燈的進程先開啟
# is_set()剛開始默認為False
# 先開始l綠燈亮,這個時候car的進程開啟,is_set()為True,可以過車
def light(e):
while 1:
if e.is_set():
e.clear()
print('\033[31m紅燈亮了\033[0m')
else:
e.set()
print('\033[32m綠燈亮了\033[0m')
time.sleep(2)
def cars(i, e):
if not e.is_set():
print('第%s輛車在等待' % i)
e.wait()
print('第%s輛車過去了' % i)
if __name__ == '__main__':
e = Event()
tr = Process(target=light, args=(e,)) # 創建一個進程控制紅綠燈
tr.start()
for i in range(50): # 創建50個進程代表50輛車
car = Process(target=cars, args=(i + 1, e))
car.start()
time.sleep(random.random())
閱讀更多 程序猿Monster 的文章