進程、線程和協程

在介紹進程、線程和協程時,先介紹多任務、併發與並行

多任務

  • 什麼是多任務
  • 操作系統能同時運行多個任務(代碼),就是多任務
  • 多任務的作用
  • 更充分利用 CPU
  • 程序運行效率提升
  • 提升用戶體驗
  • 併發與並行
  • 併發
  • 當任務數小於或者等於 CPU 核數時,每一個任務都有對應的 CPU 來處理執行,即任務真的是一起執行的
  • 並行
  • 當任務數大於 CPU 核數時,一個 CPU 需負責多個任務,通過操作系統的各種任務調度算法,實現用多個任務“一起”執行(實際上在某個時間點,還是隻有一個任務執行,只是切換任務的速度相當快,看上去一起執行而已
  • 知乎上對併發與並行比較通俗和好理解的解釋
  • 你吃飯吃到一半,電話來了,你一直到吃完了以後才去接,這就說明你不支持併發也不支持並行。
  • 你吃飯吃到一半,電話來了,你停了下來接了電話,接完後繼續吃飯,這說明你支持併發。
  • 你吃飯吃到一半,電話來了,你一邊打電話一邊吃飯,這說明你支持並行。
  • 併發的關鍵是你有處理多個任務的能力,不一定要同時。
  • 並行的關鍵是你有同時處理多個任務的能力。
  • 所以我認為它們最關鍵的點就是:是否是『同時』。
  • 進程
  • 一個程序運行起來後,代碼+用到的資源稱之為進程,進程是操作系統分配資源的單位,多進程能夠實現多任務
  • 進程的狀態
  • 工作中,任務數往往大於 CPU 的核數,即一定有一些任務正在執行,而另外一些任務在等待 CPU 進行執行,因此導致了有了不同的狀態
  • 就緒態:運行的條件都已經滿足,正在等待 CPU 執行
  • 執行態:CPU 正在執行其功能
  • 等待態:等待某些條件滿足,例如一個程序 sleep 了, 此時就處於等待態,等待消息回覆,等待同步鎖等都是出於等待態
  • 進程的創建(multiprocessing)
  • multiprocessing 是跨平臺版本的多進程模塊,提供了一個 Processing 類來創建進程對象
from multiprocessing import Process
import time

def run_proc():
"""子進程要執行的代碼"""
while True:
print("----2----")
time.sleep(1)

if __name__=='__main__':
p = Process(target=run_proc)
p.start()
while True:
print("----1----")
time.sleep(1)

Process 語法結構

Process([group [, target [, name [, args [, kwargs]]]]])
  • target:如果傳遞了函數的引用,可以認為這個子進程就執行這裡的代碼
  • args:給 target 指定的函數傳遞的參數,以元組的方式傳遞
  • kwargs:給 target 指定的函數傳遞命名參數
  • name:給進程設定一個名字,可以不設定
  • group:指定進程組,大多數情況下用不到
  • Process創建的實例對象的常用方法
  • start():啟動子進程實例(創建子進程)
  • is_alive():判斷進程子進程是否還在活著
  • join([timeout]):是否等待子進程執行結束,或等待多少秒
  • terminate():不管任務是否完成,立即終止子進程

注意:進程間不共享全局變量

進程間通信(Queue)

可以使用 multiprocessing 模塊的 Queue 實現多進程之間的數據傳遞,Queue 是一個消息的隊列

from multiprocessing import Queue
q=Queue(3) #初始化一個Queue對象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True

#因為消息列隊已滿下面的try都會拋出異常,第一個try會等待2秒後再拋出異常,第二個Try會立刻拋出異常
try:
q.put("消息4",True,2)
except:
print("消息列隊已滿,現有消息數量:%s"%q.qsize())

try:
q.put_nowait("消息4")
except:
print("消息列隊已滿,現有消息數量:%s"%q.qsize())

#推薦的方式,先判斷消息列隊是否已滿,再寫入
if not q.full():
q.put_nowait("消息4")

#讀取消息時,先判斷消息列隊是否為空,再讀取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())

初始化 Queue() 對象時(例如:q=Queue()), 若括號中沒有指定最大可接收的消息數量,或數量為負值,那麼就代表可接受的消息數量沒有上限(直到內存的盡頭)

進程池(Pool)

當需要創建的子進程數量不多時,可以直接利用multiprocessing中的Process動態成生多個進程,但如果是上百甚至上千個目標,手動的去創建進程的工作量巨大,此時就可以用到multiprocessing模塊提供的Pool方法。

初始化Pool時,可以指定一個最大進程數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會創建一個新的進程用來執行該請求;但如果池中的進程數已經達到指定的最大值,那麼該請求就會等待,直到池中有進程結束,才會用之前的進程來執行新的任務,請看下面的實例:

from multiprocessing import Pool
import os, time, random

def worker(msg):
t_start = time.time()
print("%s開始執行,進程號為%d" % (msg,os.getpid()))
# random.random()隨機生成0~1之間的浮點數
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))

po = Pool(3) # 定義一個進程池,最大進程數3
for i in range(0,10):
# Pool().apply_async(要調用的目標,(傳遞給目標的參數元祖,))
# 每次循環將會用空閒出來的子進程去調用目標

po.apply_async(worker,(i,))

print("----start----")
po.close() # 關閉進程池,關閉後po不再接收新的請求
po.join() # 等待po中所有子進程執行完成,必須放在close語句之後
print("-----end-----")

進程池中的 Queue

如果要使用Pool創建進程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否則會得到一條如下的錯誤信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

線程

  • 一個程序運行起來至少有一個進程,一個進程至少有一個線程
  • 處理器 CPU 分配給線程,即 CPU 真正運行的是線程中的代碼
  • 分配 CPU 給線程時,是通過時間片輪訓方式進行的
  • 進程是操作系統分配程序執行資源的單位,而線程是進程的一個實體,
  • 是 CPU 調度和分配的單位。
  • python 實現線程有兩種方式
  • 創建函數並且傳入Thread 對象中
import threading
import time

def download_music():
"""模擬下載歌曲,需要5秒鐘下載完成"""
for i in range(5):
time.sleep(1) # 休眠1秒
print("---正在下載歌曲%d---" % i)

def play_music():
"""模擬播放歌曲,需要5秒鐘下載完成"""
for i in range(5):
time.sleep(1) # 休眠1秒
print("---正在播放歌曲%d---" % i)

def main():
# 創建線程對象t1
# target: 指向新開啟的線程要執行的代碼
t1 = threading.Thread(target=download_music)
t2 = threading.Thread(target=play_music)

t1.start() # 啟動線程,既然線程開始執行
t2.start()

if __name__ == '__main__':
main()
  1. 繼承 Thread 類,創建一個新的 class ,將要執行的代碼 寫到 run 函數里面
import threading
import time

# 自定義類,繼承threading.Thread
class MyThread(threading.Thread):
def run(self):
for i in range(5):
time.sleep(1)
# name屬性中保存的是當前線程的名字
msg = "I'm " + self.name + ' @ ' + str(i)
print(msg)

if __name__ == '__main__':
# 通過MyThread創建線程對象
t1 = MyThread()

# 開始執行線程
t1.start()

注意:

  • start() 方法是啟動一個子線程,線程名就是我們定義的name
  • run() 方法並不啟動一個新線程,只是在主線程中調用了一個普通函數而已。
  • 因此,如果你想啟動多線程,就必須使用 start() 方法。
  • python的threading.Thread類有一個run方法,用於定義線程的功能函數,可以在自己的線程類中覆蓋該方法。而創建自己的線程實例後,通過Thread類的start方法,可以啟動該線程,當該線程獲得執行的機會時,就會調用run方法執行線程。
  • 線程何時開啟,何時結束
  • 當調用thread.start()時 開啟線程,再運行線程的代碼
  • 子線程把target指向的函數中的語句執行完畢後,或者線程中的run函數代碼執行完畢後,立即結束當前子線程
  • 通過threading.enumerate()可枚舉當前運行的所有線程
  • 所有子線程執行完畢後,主線程才結束

多線程的執行順序是無序的

多線程共享全局變量

  • 在一個進程內的所有線程共享全局變量,很方便在多個線程間共享數據
  • 缺點就是,多線程對全局變量隨意遂改可能造成全局變量的混亂(即線程非安全)
  • 多線程開發可能遇到的問題
  • 假設兩個線程t1和t2都要對全局變量g_num(默認是0)進行加1運算,t1和t2都各對g_num加10次,g_num的最終的結果應該為20。

但是由於是多線程同時操作,有可能出現下面情況:

  • 在g_num=0時,t1取得g_num=0。此時系統把t1調度為”sleeping”狀態,此時g_num並沒有加1並賦值給g_num,把t2轉換為”running”狀態,t2也獲得g_num=0
  • 然後t2對得到的值進行加1並賦給g_num,使得g_num=1
  • 然後系統又把t2調度為”sleeping”,把t1轉為”running”。線程t1又把它之前得到的0加1後賦值給g_num。
  • 這樣導致雖然t1和t2都對g_num加1,但結果仍然是g_num=1
  • 如果多個線程同時對同一個全局變量操作,會出現資源競爭問題,從而數據結果會不正確,即會遇到線程安全問題
  • 使用同步機制解決線程安全問題
  • 同步就是協同步調,按預定的先後次序進行運行
  • 當多個線程幾乎同時修改某一個共享數據的時候,需要進行同步控制
  • 線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。
  • 某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成“非鎖定”,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。

threading模塊中定義了Lock類,可以方便的處理鎖定:

# 創建鎖
mutex = threading.Lock()

# 鎖定
mutex.acquire()

# 釋放
mutex.release()
  • 如果這個鎖之前是沒有上鎖的,那麼acquire不會堵塞
  • 如果在調用acquire對這個鎖上鎖之前 它已經被 其他線程上了鎖,那麼此時acquire會堵塞,直到這個鎖被解鎖為止
  • 上鎖解鎖過程
  • 當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入“locked”狀態。
  • 每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為“blocked”狀態,稱為“阻塞”,直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入“unlocked”狀態。
  • 線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
  • 協程
  • 協程是python箇中另外一種實現多任務的方式,只不過比線程更小佔用更小執行單元(理解為需要的資源)。
  • 通俗的理解:在一個線程中的某個函數,可以在任何地方保存當前函數的一些臨時變量等信息,然後切換到另外一個函數中執行,注意不是通過調用函數的方式做到的,並且切換的次數以及什麼時候再切換到原來的函數都由開發者自己確定

簡單實現協程(yield)

import time

def work1():
while True:
print("----work1---")
yield
time.sleep(0.5)

def work2():
while True:
print("----work2---")
yield
time.sleep(0.5)

def main():
w1 = work1()
w2 = work2()
while True:
next(w1)
next(w2)

if __name__ == "__main__":
main()

通過 gevent 實現協程

from gevent import monkey
import gevent
import random
import time

# 有耗時操作時需要
monkey.patch_all() # 將程序中耗時操作的代碼,換為gevent中自己實現的模塊

def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())

gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])

進程與線程對比

功能

  • 進程,能夠完成多任務,比如運行的QQ再單獨開一個進程接收推送的消息
  • 線程,能夠完成多任務,比如運行的QQ開多個線程來發送消息、接收文件、視頻聊天等多個任務

定義的不同

  • 進程是操作系統進行資源分配和調度的一個基本單位.
  • 線程是進程的一個實體,是CPU調度和分派的基本單位,它是比進程更小的能獨立運行的基本單位.線程自己基本上不擁有系統資源,但是它可與同屬一個進程的其他的線程共享進程所擁有的全部資源.

區別

  • 一個程序至少有一個進程,一個進程至少有一個線程.
  • 線程的劃分尺度小於進程(資源比進程少),使得多線程程序的併發性高。
  • 進程在執行過程中擁有獨立的內存單元,而多個線程共享內存,從而極大地提高了程序的運行效率
  • 線程不能夠獨立執行,必須依存在進程中
  • 可以將進程理解為工廠中的一條流水線,而其中的線程就是這個流水線上的工人

優缺點

線程和進程在使用上各有優缺點:線程執行開銷小,但不利於資源的管理和保護;而進程正相反。

協程和線程差異

在實現多任務時, 線程切換從系統層面遠不止保存和恢復 CPU 上下文這麼簡單。 操作系統為了程序運行的高效性每個線程都有自己緩存 Cache 等等數據,操作系統還會幫你做這些數據的恢復操作。 所以線程的切換非常耗性能。但是協程的切換隻是單純的操作 CPU 的上下文,所以一秒鐘切換個上百萬次系統都抗的住。

進程、線程和協程的區別

  • 進程是操作系統資源分配的單位
  • 線程是 CPU 調度的單位
  • 進程切換需要的資源最大,效率很低
  • 線程切換需要的資源一般,效率一般(當然在不考慮 GIL 的情況下)
  • 協程切換任務資源很小,效率高
  • 多進程、多線程根據 CPU 核數不一樣可能是並行的,但是協程是在一個線程中,所以是併發


分享到:


相關文章: