What is a Thread?
What is a Process?
進程與線程的區別?
- 線程是執行的指令集,進程是資源的集合
- 線程的啟動速度要比進程的啟動速度要快
- 兩個線程的執行速度是一樣的
- 進程與線程的運行速度是沒有可比性的
- 線程共享創建它的進程的內存空間,進程的內存是獨立的。
- 兩個線程共享的數據都是同一份數據,兩個子進程的數據不是共享的,而且數據是獨立的;
- 同一個進程的線程之間可以直接交流,同一個主進程的多個子進程之間是不可以進行交流,如果兩個進程之間需要通信,就必須要通過一箇中間代理來實現;
- 一個新的線程很容易被創建,一個新的進程創建需要對父進程進行一次克隆
- 一個線程可以控制和操作同一個進程裡的其他線程,線程與線程之間沒有隸屬關係,但是進程只能操作子進程
- 改變主線程,有可能會影響到其他線程的行為,但是對於父進程的修改是不會影響子進程;
一個多併發的小腳本
- import threading
- import time
- def Princ(String):
- print('task', String)
- time.sleep(5)
- # target=目標函數, args=傳入的參數
- t1 = threading.Thread(target=Princ, args=('t1',))
- t1.start()
- t2 = threading.Thread(target=Princ, args=('t1',))
- t2.start()
- t3 = threading.Thread(target=Princ, args=('t1',))
- t3.start()
參考文檔
進程與線程的一個簡單解釋
http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
Linux進程與線程的區別
https://my.oschina.net/cnyinlinux/blog/422207
多線程
多線程在Python內實則就是一個假象,為什麼這麼說呢,因為CPU的處理速度是很快的,所以我們看起來以一個線程在執行多個任務,每個任務的執行速度是非常之快的,利用上下文切換來快速的切換任務,以至於我們根本感覺不到。
但是頻繁的使用上下文切換也是要耗費一定的資源,因為單線程在每次切換任務的時候需要保存當前任務的上下文。
什麼時候用到多線程?
首先IO操作是不佔用CPU的,只有計算的時候才會佔用CPU(譬如1+1=2),Python中的多線程不適合CPU密集型的任務,適合IO密集型的任務(sockt server)。
啟動多個線程
主進程在啟動之後會啟動一個主線程,下面的腳本中讓主線程啟動了多個子線程,然而啟動的子線程是獨立的,所以主線程不會等待子線程執行完畢,而是主線程繼續往下執行,並行執行。
- for i in range(50):
- t = threading.Thread(target=Princ, args=('t-%s' % (i),))
- t.start()
join()
join()方法可以讓程序等待每一個線程之後完成之後再往下執行,又成為串行執行。
- import threading
- import time
- def Princ(String):
- print('task', String)
- time.sleep(1)
- for i in range(50):
- t = threading.Thread(target=Princ, args=('t-%s' % (i),))
- t.start()
- # 當前線程執行完畢之後在執行後面的線程
- t.join()
讓主線程阻塞,子現在並行執行
- import threading
- import time
- def Princ(String):
- print('task', String)
- time.sleep(2)
- # 執行子線程的時間
- start_time = time.time()
- # 存放線程的實例
- t_objs = []
- for i in range(50):
- t = threading.Thread(target=Princ, args=('t-%s' % (i),))
- t.start()
- # 為了不讓後面的子線程阻塞,把當前的子線程放入到一個列表中
- t_objs.append(t)
- # 循環所有子線程實例,等待所有子線程執行完畢
- for t in t_objs:
- t.join()
- # 當前時間減去開始時間就等於執行的過程中需要的時間
- print(time.time() - start_time)
查看主線程與子線程
- import threading
- class MyThreading(threading.Thread):
- def __init__(self):
- super(MyThreading, self).__init__()
- def run(self):
- print('我是子線程: ', threading.current_thread())
- t = MyThreading()
- t.start()
- print('我是主線程: ', threading.current_thread())
輸出如下:
- C:\Python\Python35\python.exe E:/MyCodeProjects/進程與線程/s3.py
- 我是子線程:
- 我是主線程: <_mainthread started=""/>
- Process finished with exit code 0
查看當前進程的活動線程個數
- import threading
- class MyThreading(threading.Thread):
- def __init__(self):
- super(MyThreading, self).__init__()
- def run(self):
- print('www.anshengme.com')
- t = MyThreading()
- t.start()
- print('線程個數: ', threading.active_count())
輸出如下:
- C:\Python\Python35\python.exe E:/MyCodeProjects/進程與線程/s3.py
- www.anshengme.com
- # 一個主線程和一個子線程
- 線程個數: 2
- Process finished with exit code 0
Event
Event是線程間通信最間的機制之一:一個線程發送一個event信號,其他的線程則等待這個信號。用於主線程控制其他線程的執行。 Events 管理一個flag,這個flag可以使用set
()設置成True或者使用clear()重置為False,wait()則用於阻塞,在flag為True之前。flag默認為False。
選項描述Event.wait([timeout])堵塞線程,直到Event對象內部標識位被設為True或超時(如果提供了參數timeout)Event.set()將標識位設為TureEvent.clear()將標識伴設為FalseEvent.isSet()判斷標識位是否為Ture
- #!/use/bin/env python
- # _*_ coding: utf-8- _*_
- import threading
- def runthreading(event):
- print("Start...")
- event.wait()
- print("End...")
- event_obj = threading.Event()
- for n in range(10):
- t = threading.Thread(target=runthreading, args=(event_obj,))
- t.start()
- event_obj.clear()
- inp = input("True/False?>> ")
- if inp == "True":
- event_obj.set()
- `
守護進程(守護線程)
一個主進程可以啟動多個守護進程,但是主進程必須要一直運行,如果主進程掛掉了,那麼守護進程也會隨之掛掉
程序會等待主線程(進程)執行完畢,但是不會等待守護進程(線程)
- import threading
- import time
- def Princ(String):
- print('task', String)
- time.sleep(2)
- for i in range(50):
- t = threading.Thread(target=Princ, args=('t-%s' % (i),))
- t.setDaemon(True) # 把當前線程設置為守護線程,要在start之前設置
- t.start()
場景預設: 比如現在有一個FTP服務,每一個用戶連接上去的時候都會創建一個守護線程,現在已經有300個用戶連接上去了,就是說已經創建了300個守護線程,但是突然之間FTP服務宕掉了,這個時候就不會等待守護線程執行完畢再退出,而是直接退出,如果是普通的線程,那麼就會登臺線程執行完畢再退出。
- #!/use/bin/env python
- # _*_ coding:utf-8 _*_
- from multiprocessing import Process
- import time
- def runprocess(arg):
- print(arg)
- time.sleep(2)
- p = Process(target=runprocess, args=(11,))
- p.daemon=True
- p.start()
- print("end")
線程之間的數據交互與鎖(互斥鎖)
python2.x需要加鎖,但是在python3.x上面就不需要了
- # _*_ coding:utf-8 _*_
- import threading
- def Princ():
- # 獲取鎖
- lock.acquire()
- # 在函數內可以直接修改全局變量
- global number
- number += 1
- # 為了避免讓程序出現串行,不能加sleep
- # time.sleep(1)
- # 釋放鎖
- lock.release()
- # 鎖
- lock = threading.Lock()
- # 主線程的number
- number = 0
- t_objs = []
- for i in range(100):
- t = threading.Thread(target=Princ)
- t.start()
- t_objs.append(t)
- for t in t_objs:
- t.join()
- print('Number:', number)
遞歸鎖(Lock/RLock)
- import threading
- def run1():
- print("grab the first part data")
- lock.acquire()
- global num
- num += 1
- lock.release()
- return num
- def run2():
- print("grab the second part data")
- lock.acquire()
- global num2
- num2 += 1
- lock.release()
- return num2
- def run3():
- lock.acquire()
- res = run1()
- print('--------between run1 and run2-----')
- res2 = run2()
- lock.release()
- print(res, res2)
- t_objs = []
- if __name__ == '__main__':
- num, num2 = 0, 0
- lock = threading.RLock() # RLock()類似創建了一個字典,每次退出的時候找到字典的值進行退出
- # lock = threading.Lock() # Lock()會阻塞在這兒
- for i in range(10):
- t = threading.Thread(target=run3)
- t.start()
- t_objs.append(t)
- for t in t_objs:
- t.join()
- print(num, num2)
信號量(Semaphore)
互斥鎖同時只允許一個線程更改數據,而Semaphore是同時允許一定數量的線程更改數據
- import threading
- import time
- def run(n):
- semaphore.acquire() # 獲取信號,信號可以有多把鎖
- time.sleep(1) # 等待一秒鐘
- print("run the thread: %s\n" % n)
- semaphore.release() # 釋放信號
- t_objs = []
- if __name__ == '__main__':
- semaphore = threading.BoundedSemaphore(5) # 聲明一個信號量,最多允許5個線程同時運行
- for i in range(20): # 運行20個線程
- t = threading.Thread(target=run, args=(i,)) # 創建線程
- t.start() # 啟動線程
- t_objs.append(t)
- for t in t_objs:
- t.join()
- print('>>>>>>>>>>>>>')
以上代碼中,類似與創建了一個隊列,最多放5個任務,每執行完成一個任務就會往後面增加一個任務。
多進程
多進程的資源是獨立的,不可以互相訪問。
啟動一個進程
- from multiprocessing import Process
- import time
- def f(name):
- time.sleep(2)
- print('hello', name)
- if __name__ == '__main__':
- # 創建一個進程
- p = Process(target=f, args=('bob',))
- # 啟動
- p.start()
- # 等待進程執行完畢
- p.join(
在進程內啟動一個線程
- from multiprocessing import Process
- import threading
- def Thread(String):
- print(String)
- def Proces(String):
- print('hello', String)
- t = threading.Thread(target=Thread, args=('Thread %s' % (String),)) # 創建一個線程
- t.start() # 啟動它
- if __name__ == '__main__':
- p = Process(target=Proces, args=('World',)) # 創建一個進程
- p.start() # 啟動
- p.join() # 等待進程執行完畢
啟動一個多進程
- from multiprocessing import Process
- import time
- def f(name):
- time.sleep(2)
- print('hello', name)
- if __name__ == '__main__':
- for n in range(10): # 創建一個進程
- p = Process(target=f, args=('bob %s' % (n),))
- # 啟動
- p.start()
- # 等待進程執行完畢
獲取啟動進程的PID
- # _*_ coding:utf-8 _*_
- from multiprocessing import Process
- import os
- def info(String):
- print(String)
- print('module name:', __name__)
- print('父進程的PID:', os.getppid())
- print('子進程的PID:', os.getpid())
- print("\n")
- def ChildProcess():
- info('\033[31;1mChildProcess\033[0m')
- if __name__ == '__main__':
- info('\033[32;1mTheParentProcess\033[0m')
- p = Process(target=ChildProcess)
- p.start()
輸出結果
- C:\Python\Python35\python.exe E:/MyCodeProjects/多進程/s1.py
- TheParentProcess
- module name: __main__
- # Pycharm的PID
- 父進程的PID: 6888
- # 啟動的腳本PID
- 子進程的PID: 4660
- ChildProcess
- module name: __mp_main__
- # 腳本的PID
- 父進程的PID: 4660
- # 父進程啟動的子進程PID
- 子進程的PID: 8452
- Process finished with exit code 0
進程間通信
默認情況下進程與進程之間是不可以互相通信的,若要實現互相通信則需要一箇中間件,另個進程之間通過中間件來實現通信,下面是進程間通信的幾種方式。
進程Queue
- # _*_ coding:utf-8 _*_
- from multiprocessing import Process, Queue
- def ChildProcess(Q):
- Q.put(['Hello', None, 'World']) # 在Queue裡面上傳一個列表
- if __name__ == '__main__':
- q = Queue() # 創建一個Queue
- p = Process(target=ChildProcess, args=(q,)) # 創建一個子進程,並把Queue傳給子進程,相當於克隆了一份Queue
- p.start() # 啟動子進程
- print(q.get()) # 獲取q中的數據
- p.join()
管道(Pipes)
- # _*_ coding:utf-8 _*_
- from multiprocessing import Process, Pipe
- def ChildProcess(conn):
- conn.send(['Hello', None, 'World']) # 寫一段數據
- conn.close() # 關閉
- if __name__ == '__main__':
- parent_conn, child_conn = Pipe() # 生成一個管道實例,parent_conn, child_conn管道的兩頭
- p = Process(target=ChildProcess, args=(child_conn,))
- p.start()
- print(parent_conn.recv()) # 收取消息
- p.join()
數據共享(Managers)
- # _*_ coding:utf-8 _*_
- # _*_ coding:utf-8 _*_
- from multiprocessing import Process, Manager
- import os
- def ChildProcess(Dict, List):
- Dict['k1'] = 'v1'
- Dict['k2'] = 'v2'
- List.append(os.getpid()) # 獲取子進程的PID
- print(List) # 輸出列表中的內容
- if __name__ == '__main__':
- manager = Manager() # 生成Manager對象
- Dict = manager.dict() # 生成一個可以在多個進程之間傳遞共享的字典
- List = manager.list() # 生成一個字典
- ProcessList = [] # 創建一個空列表,存放進程的對象,等待子進程執行用於
- for i in range(10): # 生成是個子進程
- p = Process(target=ChildProcess, args=(Dict, List)) # 創建一個子進程
- p.start() # 啟動
- ProcessList.append(p) # 把子進程添加到p_list列表中
- for res in ProcessList: # 循環所有的子進程
- res.join() # 等待執行完畢
- print('\n')
- print(Dict)
- print(List)
輸出結果
- C:\Python\Python35\python.exe E:/MyCodeProjects/多進程/s4.py
- [5112]
- [5112, 3448]
- [5112, 3448, 4584]
- [5112, 3448, 4584, 2128]
- [5112, 3448, 4584, 2128, 11124]
- [5112, 3448, 4584, 2128, 11124, 10628]
- [5112, 3448, 4584, 2128, 11124, 10628, 5512]
- [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460]
- [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484]
- [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]
- {'k1': 'v1', 'k2': 'v2'}
- [5112, 3448, 4584, 2128, 11124, 10628, 5512, 10460, 10484, 6804]
- Process finished with exit code 0
鎖(Lock)
- from multiprocessing import Process, Lock
- def ChildProcess(l, i):
- l.acquire() # 獲取鎖
- print('hello world', i)
- l.release() # 釋放鎖
- if __name__ == '__main__':
- lock = Lock() # 生成Lock對象
- for num in range(10):
- Process(target=ChildProcess, args=(lock, num)).start() # 創建並啟動一個子進程
進程池
同一時間啟動多少個進程
- #!/use/bin/env python
- # _*_ coding: utf-8 _*_
- from multiprocessing import Pool
- import time
- def myFun(i):
- time.sleep(2)
- return i+100
- def end_call(arg):
- print("end_call>>", arg)
- p = Pool(5) # 允許進程池內同時放入5個進程
- for i in range(10):
- p.apply_async(func=myFun, args=(i,),callback=end_call) # # 平行執行,callback是主進程來調用
- # p.apply(func=Foo) # 串行執行
- print("end")
- p.close()
- p.join() # 進程池中進程執行完畢後再關閉,如果註釋,那麼程序直接關閉。
線程池
簡單實現
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import threading
- import queue
- import time
- class MyThread:
- def __init__(self,max_num=10):
- self.queue = queue.Queue()
- for n in range(max_num):
- self.queue.put(threading.Thread)
- def get_thread(self):
- return self.queue.get()
- def put_thread(self):
- self.queue.put(threading.Thread)
- pool = MyThread(5)
- def RunThread(arg,pool):
- print(arg)
- time.sleep(2)
- pool.put_thread()
- for n in range(30):
- thread = pool.get_thread()
- t = thread(target=RunThread, args=(n,pool,))
- t.start()
複雜版本
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import queue
- import threading
- import contextlib
- import time
- StopEvent = object()
- class ThreadPool(object):
- def __init__(self, max_num, max_task_num = None):
- if max_task_num:
- self.q = queue.Queue(max_task_num)
- else:
- self.q = queue.Queue()
- self.max_num = max_num
- self.cancel = False
- self.terminal = False
- self.generate_list = []
- self.free_list = []
- def run(self, func, args, callback=None):
- """
- 線程池執行一個任務
- :param func: 任務函數
- :param args: 任務函數所需參數
- :param callback: 任務執行失敗或成功後執行的回調函數,回調函數有兩個參數1、任務函數執行狀態;2、任務函數返回值(默認為None,即:不執行回調函數)
- :return: 如果線程池已經終止,則返回True否則None
- """
- if self.cancel:
- return
- if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
- self.generate_thread()
- w = (func, args, callback,)
- self.q.put(w)
- def generate_thread(self):
- """
- 創建一個線程
- """
- t = threading.Thread(target=self.call)
- t.start()
- def call(self):
- """
- 循環去獲取任務函數並執行任務函數
- """
- current_thread = threading.currentThread()
- self.generate_list.append(current_thread)
- event = self.q.get()
- while event != StopEvent:
- func, arguments, callback = event
- try:
- result = func(*arguments)
- success = True
- except Exception as e:
- success = False
- result = None
- if callback is not None:
- try:
- callback(success, result)
- except Exception as e:
- pass
- with self.worker_state(self.free_list, current_thread):
- if self.terminal:
- event = StopEvent
- else:
- event = self.q.get()
- else:
- self.generate_list.remove(current_thread)
- def close(self):
- """
- 執行完所有的任務後,所有線程停止
- """
- self.cancel = True
- full_size = len(self.generate_list)
- while full_size:
- self.q.put(StopEvent)
- full_size -= 1
- def terminate(self):
- """
- 無論是否還有任務,終止線程
- """
- self.terminal = True
- while self.generate_list:
- self.q.put(StopEvent)
- self.q.queue.clear()
- @contextlib.contextmanager
- def worker_state(self, state_list, worker_thread):
- """
- 用於記錄線程中正在等待的線程數
- """
- state_list.append(worker_thread)
- try:
- yield
- finally:
- state_list.remove(worker_thread)
- # How to use
- pool = ThreadPool(5)
- def callback(status, result):
- # status, execute action status
- # result, execute action return value
- pass
- def action(i):
- print(i)
- for i in range(30):
- ret = pool.run(action, (i,), callback)
- time.sleep(5)
- print(len(pool.generate_list), len(pool.free_list))
- print(len(pool.generate_list), len(pool.free_list))
- pool.close()
- pool.terminate()
什麼是IO密集型和CPU密集型?
IO密集型(I/O bound)
頻繁網絡傳輸、讀取硬盤及其他IO設備稱之為IO密集型,最簡單的就是硬盤存取數據,IO操作並不會涉及到CPU。
計算密集型(CPU bound)
程序大部分在做計算、邏輯判斷、循環導致cpu佔用率很高的情況,稱之為計算密集型,比如說python程序中執行了一段代碼1+1,這就是在計算1+1的值
What is the association?
協程的優缺點:
優點
- 無需線程上下文切換的開銷
- 無需原子操作鎖定及同步的開銷(更改一個變量)
- 方便切換控制流,簡化編程模型
- 高併發+高擴展性+低成本:一個CPU支持上萬的協程都不是問題。所以很適合用於高併發處理。
缺點:
- 無法利用多核資源:協程的本質是個單線程,它不能多核,協程需要和進程配合才能運行在多CPU上,當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是CPU密集型應用。
- 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程序
實現協程實例
yield
- def consumer(name):
- print("--->starting eating baozi...")
- while True:
- new_baozi = yield # 直接返回
- print("[%s] is eating baozi %s" % (name, new_baozi))
- def producer():
- r = con.__next__()
- r = con2.__next__()
- n = 0
- while n < 5:
- n += 1
- con.send(n) # 喚醒生成器的同時傳入一個參數
- con2.send(n)
- print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
- if __name__ == '__main__':
- con = consumer("c1")
- con2 = consumer("c2")
- p = producer()
Greenlet
安裝greenlet
- pip3 install greenlet
- # -*- coding:utf-8 -*-
- from greenlet import greenlet
- def func1():
- print(12)
- gr2.switch()
- print(34)
- gr2.switch()
- def func2():
- print(56)
- gr1.switch()
- print(78)
- # 創建兩個攜程
- gr1 = greenlet(func1)
- gr2 = greenlet(func2)
- gr1.switch() # 手動切換
Gevent
Gevent可以實現併發同步或異步編程,在gevent中用到的主要模式是Greenlet, 它是以C擴展模塊形式接入Python的輕量級協程,Greenlet全部運行在主程序操作系統進程的內部,但它們被協作式地調度。
安裝Gevent
- pip3 install gevent
- import gevent
- def foo():
- print('Running in foo')
- gevent.sleep(2)
- print('Explicit context switch to foo again')
- def bar():
- print('Explicit context to bar')
- gevent.sleep(3)
- print('Implicit context switch back to bar')
- # 自動切換
- gevent.joinall([
- gevent.spawn(foo), # 啟動一個協程
- gevent.spawn(bar),
- ])
頁面抓取
- from urllib import request
- from gevent import monkey
- import gevent
- import time
- monkey.patch_all() # 當前程序中只要設置到IO操作的都做上標記
- def wget(url):
- print('GET: %s' % url)
- resp = request.urlopen(url)
- data = resp.read()
- print('%d bytes received from %s.' % (len(data), url))
- urls = [
- 'https://www.python.org/',
- 'https://www.python.org/',
- 'https://github.com/',
- 'https://yw666.blog.51cto.com/',
- ]
- # 串行抓取
- start_time = time.time()
- for n in urls:
- wget(n)
- print("串行抓取使用時間:", time.time() - start_time)
- # 並行抓取
- ctrip_time = time.time()
- gevent.joinall([
- gevent.spawn(wget, 'https://www.python.org/'),
- gevent.spawn(wget, 'https://www.python.org/'),
- gevent.spawn(wget, 'https://github.com/'),
- gevent.spawn(wget, 'https://yw666.blog.51cto.com/'),
- ])
- print("並行抓取使用時間:", time.time() - ctrip_time)
輸出
- C:\Python\Python35\python.exe E:/MyCodeProjects/協程/s4.py
- GET: https://www.python.org/
- 47424 bytes received from https://www.python.org/.
- GET: https://www.python.org/
- 47424 bytes received from https://www.python.org/.
- GET: https://github.com/
- 25735 bytes received from https://github.com/.
- GET: https://blog.ansheng.me/
- 82693 bytes received from https://yw666.blog.51cto.com/.
- 串行抓取使用時間: 15.143015384674072
- GET: https://www.python.org/
- GET: https://www.python.org/
- GET: https://github.com/
- GET: https://blog.ansheng.me/
- 25736 bytes received from https://github.com/.
- 47424 bytes received from https://www.python.org/.
- 82693 bytes received from https://yw666.blog.51cto.com/.
- 47424 bytes received from https://www.python.org/.
- 並行抓取使用時間: 3.781306266784668
- Process finished with exit code 0
閱讀更多 繁華落盡and曲終人散 的文章