进程、线程和协程

在介绍进程、线程和协程时,先介绍多任务、并发与并行

多任务

  • 什么是多任务
  • 操作系统能同时运行多个任务(代码),就是多任务
  • 多任务的作用
  • 更充分利用 CPU
  • 程序运行效率提升
  • 提升用户体验
  • 并发与并行
  • 并发
  • 当任务数小于或者等于 CPU 核数时,每一个任务都有对应的 CPU 来处理执行,即任务真的是一起执行的
  • 并行
  • 当任务数大于 CPU 核数时,一个 CPU 需负责多个任务,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上在某个时间点,还是只有一个任务执行,只是切换任务的速度相当快,看上去一起执行而已
  • 知乎上对并发与并行比较通俗和好理解的解释
  • 你吃饭吃到一半,电话来了,你一直到吃完了以后才去接,这就说明你不支持并发也不支持并行。
  • 你吃饭吃到一半,电话来了,你停了下来接了电话,接完后继续吃饭,这说明你支持并发。
  • 你吃饭吃到一半,电话来了,你一边打电话一边吃饭,这说明你支持并行。
  • 并发的关键是你有处理多个任务的能力,不一定要同时。
  • 并行的关键是你有同时处理多个任务的能力。
  • 所以我认为它们最关键的点就是:是否是『同时』。
  • 进程
  • 一个程序运行起来后,代码+用到的资源称之为进程,进程是操作系统分配资源的单位,多进程能够实现多任务
  • 进程的状态
  • 工作中,任务数往往大于 CPU 的核数,即一定有一些任务正在执行,而另外一些任务在等待 CPU 进行执行,因此导致了有了不同的状态
  • 就绪态:运行的条件都已经满足,正在等待 CPU 执行
  • 执行态:CPU 正在执行其功能
  • 等待态:等待某些条件满足,例如一个程序 sleep 了, 此时就处于等待态,等待消息回复,等待同步锁等都是出于等待态
  • 进程的创建(multiprocessing)
  • multiprocessing 是跨平台版本的多进程模块,提供了一个 Processing 类来创建进程对象
from multiprocessing import Process
import time

def run_proc():
"""子进程要执行的代码"""
while True:
print("----2----")
time.sleep(1)

if __name__=='__main__':
p = Process(target=run_proc)
p.start()
while True:
print("----1----")
time.sleep(1)

Process 语法结构

Process([group [, target [, name [, args [, kwargs]]]]])
  • target:如果传递了函数的引用,可以认为这个子进程就执行这里的代码
  • args:给 target 指定的函数传递的参数,以元组的方式传递
  • kwargs:给 target 指定的函数传递命名参数
  • name:给进程设定一个名字,可以不设定
  • group:指定进程组,大多数情况下用不到
  • Process创建的实例对象的常用方法
  • start():启动子进程实例(创建子进程)
  • is_alive():判断进程子进程是否还在活着
  • join([timeout]):是否等待子进程执行结束,或等待多少秒
  • terminate():不管任务是否完成,立即终止子进程

注意:进程间不共享全局变量

进程间通信(Queue)

可以使用 multiprocessing 模块的 Queue 实现多进程之间的数据传递,Queue 是一个消息的队列

from multiprocessing import Queue
q=Queue(3) #初始化一个Queue对象,最多可接收三条put消息
q.put("消息1")
q.put("消息2")
print(q.full()) #False
q.put("消息3")
print(q.full()) #True

#因为消息列队已满下面的try都会抛出异常,第一个try会等待2秒后再抛出异常,第二个Try会立刻抛出异常
try:
q.put("消息4",True,2)
except:
print("消息列队已满,现有消息数量:%s"%q.qsize())

try:
q.put_nowait("消息4")
except:
print("消息列队已满,现有消息数量:%s"%q.qsize())

#推荐的方式,先判断消息列队是否已满,再写入
if not q.full():
q.put_nowait("消息4")

#读取消息时,先判断消息列队是否为空,再读取
if not q.empty():
for i in range(q.qsize()):
print(q.get_nowait())

初始化 Queue() 对象时(例如:q=Queue()), 若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头)

进程池(Pool)

当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。

初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务,请看下面的实例:

from multiprocessing import Pool
import os, time, random

def worker(msg):
t_start = time.time()
print("%s开始执行,进程号为%d" % (msg,os.getpid()))
# random.random()随机生成0~1之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行完毕,耗时%0.2f" % (t_stop-t_start))

po = Pool(3) # 定义一个进程池,最大进程数3
for i in range(0,10):
# Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程去调用目标

po.apply_async(worker,(i,))

print("----start----")
po.close() # 关闭进程池,关闭后po不再接收新的请求
po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
print("-----end-----")

进程池中的 Queue

如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue(),否则会得到一条如下的错误信息:

RuntimeError: Queue objects should only be shared between processes through inheritance.

线程

  • 一个程序运行起来至少有一个进程,一个进程至少有一个线程
  • 处理器 CPU 分配给线程,即 CPU 真正运行的是线程中的代码
  • 分配 CPU 给线程时,是通过时间片轮训方式进行的
  • 进程是操作系统分配程序执行资源的单位,而线程是进程的一个实体,
  • 是 CPU 调度和分配的单位。
  • python 实现线程有两种方式
  • 创建函数并且传入Thread 对象中
import threading
import time

def download_music():
"""模拟下载歌曲,需要5秒钟下载完成"""
for i in range(5):
time.sleep(1) # 休眠1秒
print("---正在下载歌曲%d---" % i)

def play_music():
"""模拟播放歌曲,需要5秒钟下载完成"""
for i in range(5):
time.sleep(1) # 休眠1秒
print("---正在播放歌曲%d---" % i)

def main():
# 创建线程对象t1
# target: 指向新开启的线程要执行的代码
t1 = threading.Thread(target=download_music)
t2 = threading.Thread(target=play_music)

t1.start() # 启动线程,既然线程开始执行
t2.start()

if __name__ == '__main__':
main()
  1. 继承 Thread 类,创建一个新的 class ,将要执行的代码 写到 run 函数里面
import threading
import time

# 自定义类,继承threading.Thread
class MyThread(threading.Thread):
def run(self):
for i in range(5):
time.sleep(1)
# name属性中保存的是当前线程的名字
msg = "I'm " + self.name + ' @ ' + str(i)
print(msg)

if __name__ == '__main__':
# 通过MyThread创建线程对象
t1 = MyThread()

# 开始执行线程
t1.start()

注意:

  • start() 方法是启动一个子线程,线程名就是我们定义的name
  • run() 方法并不启动一个新线程,只是在主线程中调用了一个普通函数而已。
  • 因此,如果你想启动多线程,就必须使用 start() 方法。
  • python的threading.Thread类有一个run方法,用于定义线程的功能函数,可以在自己的线程类中覆盖该方法。而创建自己的线程实例后,通过Thread类的start方法,可以启动该线程,当该线程获得执行的机会时,就会调用run方法执行线程。
  • 线程何时开启,何时结束
  • 当调用thread.start()时 开启线程,再运行线程的代码
  • 子线程把target指向的函数中的语句执行完毕后,或者线程中的run函数代码执行完毕后,立即结束当前子线程
  • 通过threading.enumerate()可枚举当前运行的所有线程
  • 所有子线程执行完毕后,主线程才结束

多线程的执行顺序是无序的

多线程共享全局变量

  • 在一个进程内的所有线程共享全局变量,很方便在多个线程间共享数据
  • 缺点就是,多线程对全局变量随意遂改可能造成全局变量的混乱(即线程非安全)
  • 多线程开发可能遇到的问题
  • 假设两个线程t1和t2都要对全局变量g_num(默认是0)进行加1运算,t1和t2都各对g_num加10次,g_num的最终的结果应该为20。

但是由于是多线程同时操作,有可能出现下面情况:

  • 在g_num=0时,t1取得g_num=0。此时系统把t1调度为”sleeping”状态,此时g_num并没有加1并赋值给g_num,把t2转换为”running”状态,t2也获得g_num=0
  • 然后t2对得到的值进行加1并赋给g_num,使得g_num=1
  • 然后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给g_num。
  • 这样导致虽然t1和t2都对g_num加1,但结果仍然是g_num=1
  • 如果多个线程同时对同一个全局变量操作,会出现资源竞争问题,从而数据结果会不正确,即会遇到线程安全问题
  • 使用同步机制解决线程安全问题
  • 同步就是协同步调,按预定的先后次序进行运行
  • 当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
  • 线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。
  • 某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

threading模块中定义了Lock类,可以方便的处理锁定:

# 创建锁
mutex = threading.Lock()

# 锁定
mutex.acquire()

# 释放
mutex.release()
  • 如果这个锁之前是没有上锁的,那么acquire不会堵塞
  • 如果在调用acquire对这个锁上锁之前 它已经被 其他线程上了锁,那么此时acquire会堵塞,直到这个锁被解锁为止
  • 上锁解锁过程
  • 当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。
  • 每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。
  • 线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
  • 协程
  • 协程是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源)。
  • 通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定

简单实现协程(yield)

import time

def work1():
while True:
print("----work1---")
yield
time.sleep(0.5)

def work2():
while True:
print("----work2---")
yield
time.sleep(0.5)

def main():
w1 = work1()
w2 = work2()
while True:
next(w1)
next(w2)

if __name__ == "__main__":
main()

通过 gevent 实现协程

from gevent import monkey
import gevent
import random
import time

# 有耗时操作时需要
monkey.patch_all() # 将程序中耗时操作的代码,换为gevent中自己实现的模块

def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())

gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])

进程与线程对比

功能

  • 进程,能够完成多任务,比如运行的QQ再单独开一个进程接收推送的消息
  • 线程,能够完成多任务,比如运行的QQ开多个线程来发送消息、接收文件、视频聊天等多个任务

定义的不同

  • 进程是操作系统进行资源分配和调度的一个基本单位.
  • 线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

区别

  • 一个程序至少有一个进程,一个进程至少有一个线程.
  • 线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
  • 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
  • 线程不能够独立执行,必须依存在进程中
  • 可以将进程理解为工厂中的一条流水线,而其中的线程就是这个流水线上的工人

优缺点

线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。

协程和线程差异

在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU 上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存 Cache 等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作 CPU 的上下文,所以一秒钟切换个上百万次系统都抗的住。

进程、线程和协程的区别

  • 进程是操作系统资源分配的单位
  • 线程是 CPU 调度的单位
  • 进程切换需要的资源最大,效率很低
  • 线程切换需要的资源一般,效率一般(当然在不考虑 GIL 的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据 CPU 核数不一样可能是并行的,但是协程是在一个线程中,所以是并发


分享到:


相關文章: