这篇专栏补全原视频笔记没有出现的进程与线程
(资料图)
专栏有错误的地方欢迎指出
WPS笔记及其他内容百度网盘链接:
https://pan.baidu.com/s/1fTwjyoM81_OOAccPNhGE9Q?pwd=h3fg
参考:
https://www.bilibili.com/video/BV1qt4y1T7M2/?spm_id_from=333.999.0.0
https://docs.python.org/zh-cn/3.8/library/multiprocessing.html
https://docs.python.org/zh-cn/3.8/library/threading.html
https://www.cnblogs.com/shuaiyao666/p/16126113.html
https://www.cnblogs.com/shuaiyao666/p/16126624.html
https://blog.csdn.net/a883774913/article/details/125375440
https://www.shuzhiduo.com/A/l1dyVEjGze/
部分内容来源网络
(1)、程序(Program)
使用编程语言所编写的指令的集合,用于实现一定的功能
(2)、多任务的方式
1.在一个应用程序内使用多个进程
2.在一个进程内使用多个线程
(3)、进程(Process)
1.概念
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。启动后的程序在系统中就有一个进程,系统会为这个进程分配内存空间
进程是资源分配的最小单位
# 可以在Windows任务管理器中查看进程
2.创建进程的方式
2.1.os.fork()函数
只适用于Unix、Linux、Mac操作系统
2.2.multiprocessing模块中的Process类
2.3.自定义Process类的子类
2.4.Pool进程池
补充:os模块的getpid()和getppid()函数
分别返回PID(当前进程ID)和PPID(父进程ID)
# PID(Process ID), PPID(Parent Process ID)
3.使用multiprocessing模块创建进程
下面展示一个示例,具体内容后文说明
示例:
运行结果: # 其中一次运行结果
主进程开始运行
子进程运行,PID是:5460,PPID是:6000
子进程运行,PID是:7236,PPID是:6000
子进程运行,PID是:1904,PPID是:6000
子进程运行,PID是:7284,PPID是:6000
子进程运行,PID是:8896,PPID是:6000
主进程结束运行,运行时间是3.252000093460083s
补充:进程的三种基本状态
1.就绪状态(Ready):只剩下CPU需要执行外,其他所有资源都已分配完毕
2.执行状态(Running):CPU开始执行该进程
3.阻塞状态(Blocked):由于等待某个事件发生而无法执行
(4)、Process类
Process是multiprocessing模块中的一个类,通过创建一个Process对象然后调用对象的start()方法来生成进程
1.创建Process类的对象
应始终使用关键字参数调用构造函数(或构造方法/构造器),构造函数如下:
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
group——应始终是None,兼容作用
target——由run()方法调用的可调用对象。默认为None,意味什么都不调用
name——进程名称
args, kwargs——调用target时传入的位置参数和关键字参数
daemon——进程的守护标志
# args可以传入列表或元组
2.Process类的常用方法和属性
2.1.name属性
进程的名称。该名称是一个字符串,仅用于识别目的,没有语义。可以为多个进程指定相同的名称。初始名称由构造函数设定(默认为'Process-N1:N2:...:Nk',其中每个Nk是其父进程的第N个子进程)
2.2.pid属性
进程的PID。在生成进程前的值为None
2.3.daemon属性
进程的守护标志,是一个布尔值,必须在start()被调用之前设置
# 初始值是在构造函数中设置的值。在构造函数中设置的值如果是None(默认值),则从创建的进程继承(主进程默认为False)
# 在start()被调用之前可以使用动态绑定属性设置此属性
不允许在守护程序中创建子进程。因为当守护进程由于父进程而退出中断时,其子进程会变成孤儿进程
补充:守护进程(Daemon)
可以给子进程守护进程的标志,该进程会随着父进程代码执行完毕而强制终止(为父进程守护)
# 注意,父进程代码执行完毕并不代表父进程结束
补充:孤儿进程(Orphan Process)
指的是在其父进程执行完成或被终止后仍继续运行的一类进程。也就是父进程不存在了,子进程还在运行
示例:
运行结果:
pro1正在运行 # 3秒后出现
pro2正在运行 # 3秒后出现
主进程代码执行完毕 # 5秒后出现,此时主进程的守护进程pro1被终止
pro2正在运行 # 6秒后出现
pro2正在运行 # 9秒后出现
...
2.4.is_alive()
如果进程对象处于活动状态则返回True,否则返回False
2.5.join([timeout])
阻塞调用的进程,直到调用join()方法对应的进程对象终止。如果指定了timeout,则最多会阻塞timeout秒。此方法在进程终止或方法超时时返回None
一个进程可以被join多次。进程无法join自己,因为这会导致死锁。不能在启动进程之前join进程
2.6.start()
启动进程活动。此方法每个进程对象最多只能调用一次。它会将对象的run()方法安排在一个单独的进程中调用
2.7.run()
表示进程活动。可以在子类中重写此方法。标准run()方法会调用在构造函数中传递给对象的target参数,并分别从args和kwargs参数中获取位置和关键字参数
# 标准run()方法指在Process类中的run()方法而不是子类中的run()方法
2.8.terminate(), kill()
强制终止进程。不会执行退出处理程序和finally子句等。进程的子进程不会被终止
# 在Windows上terminate()与kill()相同
# 注意,如果在start()后终止进程,或在终止进程后立刻调用is_alive(),因为启动进程和终止进程需要时间,所以分别会出现不会启动进程和调用is_alive()返回值是True
示例:
运行结果:
子进程正在运行 # 3秒后出现
子进程的子进程正在运行 # 3秒后出现
子进程被终止 # 5秒后出现,此时子进程被终止,但子进程的子进程还在运行
子进程的子进程正在运行 # 6秒后出现
子进程的子进程正在运行 # 9秒后出现
子进程的子进程正在运行 # 12秒后出现
主进程代码执行完毕 # 15秒后出现,主进程终止,同时子进程的子进程终止
2.9.exitcode属性
子进程的退出代码。如果进程尚未终止,则值为None
# 一般0代表一切正常
2.10.close()
关闭Process对象,释放与之关联的所有资源。如果底层进程仍在运行,则会引发ValueError异常。一旦close()成功返回,Process对象的大多其他方法和属性将引发ValueError
# 这里的关闭是指关闭进程对象而不是进程,类似文件对象的释放。此方法不能在进程运行时调用
3.自定义Process子类创建进程
示例:
运行结果: # 其中一次运行结果
主进程开始执行
子进程的名称:进程--1,PID:10244,PPID:1568
子进程的名称:进程--0,PID:8328,PPID:1568
子进程的名称:进程--2,PID:10172,PPID:1568
子进程的名称:进程--3,PID:10356,PPID:1568
子进程的名称:进程--4,PID:5044,PPID:1568
主进程执行完毕
# 多个进程同时执行的顺序是无序的
(5)、Pool进程池
Pool进程池是multiprocessing模块中的一个类
1.并发和并行
1.1.并发
两个或多个事件同一时间间隔发生,多个进程被交替轮换着执行
1.2.并行
两个或多个事件在同一时刻发生,多条命令在同一时刻在多个处理器上同时执行
示例: # '0'表示不执行,'1'表示执行,长度表示时间
并发:
事件A:1111000000001111000000001111...
事件B:0000111100000000111100000000...
事件C:0000000011110000000011110000...
并行:
事件A:1111111111111111111111111111...
事件B:11111111111111111111111...
事件C:1111111111111111111111111...
2.为什么要有进程池
在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,如果对应着开启成千上万个进程,一是创建进程和销毁进程需要消耗时间,二是即使开启了成千上万的进程,操作系统也不能让它们同时执行,这样反而会影响程序的效率
3.进程池的概念
进程池是资源进程、管理进程组成的技术的应用
定义一个池子,在里面放入固定数量的进程,在有需求时就拿池中的一个进程来处理任务。处理完毕后,进程并不关闭,而是放回进程池继续等待任务
如果池中的进程数量不够,任务就要等到池中有空闲进程时才继续执行。也就是池中的进程数量是固定的,同一时间最多有固定数量的进程在运行
进程池技术的应用至少由以下两部分组成:
3.1.资源进程(工作进程)
预先创建好的空闲进程
3.2.管理进程
管理进程负责创建资源进程,把工作交给空间资源进程处理,回收已经处理完工作的资源进程
4.优点
这样即不会增加操作系统的调度难道,还节省了开关进程的时间,在一定程度上能够实现并发效果
5.创建Pool类(进程池类)的对象
构造函数如下:
Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
processes——要使用的工作进程数目。如果值为None(默认),则使用os.cpu_count()返回的值
补充:os.cpu_count()
返回系统的CPU数量。不确定返回None
initializer, initargs——如果initializer不为None,则每个工作进程将会在启动时调用initializer(*initargs)
maxtasksperchild——一个工作进程在它退出之前或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。此参数的默认值为None,意味着工作进程的寿命与进程池一致
6.Pool类的方法
# 注意,进程池对象的方法只有创建它的进程能够调用
# 进程池类遵守上下文管理协议,__enter__()返回进程池对象,__exit__()会调用terminate()方法
6.1.apply([func[, args[, kwds]]])
使用阻塞方式在进程池中的一个工作进程中调用func并传入args位置参数和kwds关键字参数(同步调用)
6.2.apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()方法的一个变种。使用非阻塞方式在进程池中的一个工作进程中调用func并传入args位置参数和kwds关键字参数(异步调用)。返回一个AsyncResult对象
# AsyncResult类见本章(5)
callback, error_callback——参数必须是一个接受单个参数的可调用对象。当func执行成功时,callback会被用于处理执行后的返回结果,否则,会将抛出的异常对象作为参数传递给error_callback执行
回调函数callback/error_callback应立刻执行完成,否则会阻塞负责处理结果的线程
# 进程池里的回调函数在父进程里执行,原因是任务是父进程发起的,所以结果也应该交给父进程
补充:同步调用和异步调用
同步调用是一种阻塞式调用,一段代码调用另一段代码时,必须等另一段代码执行结束并返回结果后代码才能继续执行下去
异步调用是一种非阻塞式调用,一段异步代码还未执行完时,可以继续执行下一段代码逻辑,当代码执行完后通过回调函数返回继续执行相应的逻辑,而不耽误其他代码的执行
6.3.close()
关闭进程池,阻止后续任务提交到进程池,即不再接收新的任务。当所有任务执行完成后,工作进程会退出
6.4.terminate()
不必等待未完成的任务,立刻停止工作进程。当进程池对象被垃圾回收时会立即调用此方法
6.5.join()
等待工作进程结束(阻塞调用的进程)。此方法必须在调用close()或者terminate()后调用
补充:垃圾回收(GC)
当电脑上的一个动态内存不再需要时,就应该予以释放,以让出内存,这种内存资源管理称为垃圾回收(GC,全称Garbage Collection)
# 主进程不会等待进程池中的任务都执行完毕后再退出,如果主进程代码执行完毕,进程池的工作进程就会被停止(工作进程是主进程的守护进程)
示例1: # 并发
运行结果: # 其中一次运行结果
主进程开始执行
子进程的PID是:4596,任务名称:0,时间:0.177s
子进程的PID是:9140,任务名称:1,时间:1.177s
子进程的PID是:8060,任务名称:2,时间:2.177s
子进程的PID是:4596,任务名称:3,时间:3.178s
子进程的PID是:9140,任务名称:4,时间:4.178s
子进程的PID是:8060,任务名称:5,时间:5.179s
子进程的PID是:4596,任务名称:6,时间:6.179s
子进程的PID是:9140,任务名称:7,时间:7.179s
子进程的PID是:8060,任务名称:8,时间:8.179s
子进程的PID是:4596,任务名称:9,时间:9.179s
主进程执行完毕
# 并发:十个任务同一时间间隔创建,三个子进程交替轮换执行任务
示例2: # 并行
继上个示例
# 非阻塞方式运行,一个任务未完成时继续创建下一个任务
p.apply(func=task, args=(i, start))改为p.apply_async(func=task, args=(i, start))
# 因为任务以非阻塞方式运行,如果不p.join()阻塞主进程,主进程代码执行完毕后就会停止进程池的工作进程
将# p.join()的'# '删去
运行结果: # 其中一次运行结果
主进程开始执行
子进程的PID是:1260,任务名称:0,时间:0.17s
子进程的PID是:3956,任务名称:1,时间:0.175s
子进程的PID是:8876,任务名称:2,时间:0.224s
子进程的PID是:1260,任务名称:3,时间:1.17s
子进程的PID是:3956,任务名称:4,时间:1.175s
子进程的PID是:8876,任务名称:5,时间:1.224s
子进程的PID是:1260,任务名称:6,时间:2.17s
子进程的PID是:3956,任务名称:7,时间:2.175s
子进程的PID是:8876,任务名称:8,时间:2.224s
子进程的PID是:1260,任务名称:9,时间:3.17s
主进程执行完毕
# 并行:十个任务同一时刻创建,每秒三个任务在三个子进程上同时执行
示例3:
运行结果: # 其中一次运行结果
主进程开始执行
初始化函数被调用,传入值:0
任务执行,子进程PID:7184,任务名称:0
初始化函数被调用,传入值:0
任务执行,子进程PID:6668,任务名称:1
初始化函数被调用,传入值:0
任务执行,子进程PID:8460,任务名称:2
任务0执行完毕
任务执行,子进程PID:7184,任务名称:3
回调函数被调用,传入值:0
任务1执行完毕
回调函数被调用,传入值:0
任务执行,子进程PID:6668,任务名称:4
任务2执行完毕
回调函数被调用,传入值:0
任务执行,子进程PID:8460,任务名称:5
任务3执行完毕
回调函数被调用,传入值:0
任务4执行完毕
回调函数被调用,传入值:0
任务5执行完毕
回调函数被调用,传入值:0
初始化函数被调用,传入值:0
任务执行,子进程PID:216,任务名称:6
初始化函数被调用,传入值:0
任务执行,子进程PID:8568,任务名称:7
初始化函数被调用,传入值:0
任务执行,子进程PID:2516,任务名称:8
主进程执行完毕 # 此时工作进程被停止,任务6~9停止执行
# 初始化函数在工作进程启动时调用,回调函数在任务执行完毕后调用,每个工作进程最多完成两个任务(也就是上面子进程PID最多出现两次)
6.6.map(func, iterable[, chunksize])
内置map()函数的并行版本(但只支持一个iterable参数,支持多个的见starmap())。此方法以阻塞方式运行
# 这里指的并行是可迭代对象中的元素对提供函数做映射时并行
此方法会将可迭代对象分割为许多块,然后提交给进程池。可以将chunksize设置为一个正整数从而(近似)指定每个块的大小
# 对于很长的迭代对象,可能消耗很多内存。可以考虑使用imap()或imap_unordered()并且显式指定chunksize以提升效率
# 注意这里的func不能使用匿名函数
返回值是一个列表,元素是映射的结果
补充:内置函数map() # 由于在原笔记没有出现此函数,在专栏解释一下
根据提供的函数对指定序列做映射
将function应用于iterable的每一个元素,返回函数输出结果的迭代器。如果iterable有多个,则function必须接收相同个数的实参,并被应用于所有iterable中的并行的每一个元素
# 多个iterable时,迭代器的元素数量是最少的iterable的元素数量为基准
map(function, iterable, ...)
function——函数 # 可调用对象
iterable——可迭代对象
示例:
运行结果:
<class 'map'>
['1', '2', '3', '4', '5']
[2, 12, 30, 56, 90]
6.7.map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()方法的一个变种,以非阻塞方式运行,返回一个AsyncResult对象
callback, error_callback——与apply_async()方法相同
6.8.imap(func, iterable[, chunksize])
map()方法的延迟执行版本,大致等效与map(),可能比map()慢很多,但对于很长的迭代对象,给chunksize设置一个很大的值会比默认值1极大地加快执行速度
# 实测如果chunksize值太大反而会减慢速度
返回值是一个迭代器。如果chunksize是1,则这个迭代器的next()方法有一个可选的timeout参数(默认值None):如果无法在timeout秒内执行得到结果,则会抛出multiprocessing.TimeoutError异常
6.9.imap_unordered(func, iterable[, chunksize])
和imap()相同,只不过迭代器返回的结果的顺序是任意的
# 如果进程池只有一个工作进程,返回的结果顺序才能认为是“有序”的
6.10.starmap(func, iterable[, chunksize])
和map()类似,不过iterable中的每一项会被解包再作为函数参数。返回值是一个列表
示例:可迭代对象[(1,2), (3, 4)]会转化为等价于[func(1, 2), func(3, 4)]的调用
6.11.starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
相当于starmap()与map_async()的结合。返回值是一个AsyncResult对象
(6)、AsyncResult类
Pool.apply_async()和Pool.map_async返回对象所属的类
1.AsyncResult类的方法
1.1.get([timeout])
用于获取执行结果。如果timeout不是None并且在timeout秒内仍然没有执行完得到结果,则抛出multiprocessing.TimeoutError异常。如果远程调用发生异常,这个异常会通过此函数重新抛出
# 如果timeout是None(默认),在没执行完得到结果前会阻塞
1.2.wait([timeout])
阻塞,直到返回结果,或者timeout秒后超时
1.3.ready()
返回执行状态,是否已经完成
1.4.successful()
判断是否未引发异常。如果没有执行完会抛出ValueError异常
示例:
运行结果:
主进程开始执行
执行结果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
执行结果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
错误任务执行
极慢任务执行
执行结果:[(0, 0), (1, 1), (2, 2), (3, 3), (4, 4)]
执行出错或超时 # 这里是因为出错
执行出错或超时 # 这里是因为超时
主进程执行完毕
补充:短路运算符
当有多个表达式时,左边的表达式可以确定结果时,就不在继续运算右边的值
短路运算符例如:
x and y:只有x为真值时才会对第二个参数求值
x or y:只有x为假值时才会对第二个参数求值
# 所以上个示例中successful()才不会报错,因为当ready()为False时不会执行它
(7)、进程之间的通信
1.进程之间的关系
每个进程都是独立的,都有自己的内存、空间、地址等,进程之间的数据不共享。在父进程创建子进程时,子进程会完全复制一份父进程的环境,包括变量、函数、类等
示例:
运行结果:
子进程运行前,主进程a的值为: 100
func2中a的值为: 50
func1中a的值为: 150
子进程运行后,主进程a的值为: 100
# 子进程中对a的修改,并未影响到主进程和其他子进程
补充:在Windows系统使用多进程不以主程序的形式运行会报错的原因
因为Python创建子进程的时候相当于会开一块新的内存空间去存储主进程中的代码,并且是通过导入包的形式去复刻主程序中的代码的,如果不以主程序的形式运行,就会执行在if name == 'main'外的代码,就会无限递归地创建子进程,但multiprocessing.Process的源码中对这种行为是不允许的,所以会抛出RuntimeError异常
示例:
运行结果:
if __name__ == '__main__'外的代码被执行 # 主进程执行
if __name__ == '__main__'内的代码被执行 # 主进程执行
if __name__ == '__main__'外的代码被执行 # 子进程执行
2.进程之间的通信
实现进程之间的通信有两种消息机制:
· 管道(Pipe)
· 队列(Queue)
3.管道(Pipe)
3.1.概念
从一个进程连接到另一个进程的数据流。管道可以用于在两个进程间的通信
管道可分为单向管道和双向管道。单向管道只能从一端发送消息,一端接收消息。双向管道两端都可以发送和接收消息
3.2.创建管道
使用multiprocessing模块中的Pipe()函数,函数语法如下:
Pipe([duplex])
返回一个由管道连接的连接对象(Connection对象)。返回值是一个二元组(conn1, conn2),返回的两个连接对象conn1, conn2表示管道的两端
duplex——如果为True(默认值),则管道是双向的,否则管道是单向的,即conn1只能用于接收消息,conn2只能用于发送消息
# 注意如果两个进程(或线程)同时尝试读取或写入管道的同一端,则管道中的数据可能会损坏。如果是不同端不会有损坏的风险
# 如果一个进程在试图读写管道时被终止了,那么管道中的数据很可能是不完整的,因为此时可能无法确定消息的边界
补充:序列化和反序列化
序列化是将数据结构或对象转换成二进制串的过程。反序列化是将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程
3.3.连接对象(Connection对象)的方法
3.3.1.send(obj)
将一个可序列化对象obj发送到连接的另一端,可以用recv()方法读取
过大的对象(接近32MiB+,此值取决于操作系统)有可能引发ValueError异常
3.3.2.recv()
返回一个由另一端使用send()发送的对象。该方法会一直阻塞直到接收到对象。如果对端关闭了连接并且没有东西可接收,将抛出EOFError异常
3.3.3.fileno()
返回由连接对象使用的描述符或者句柄
3.3.4.close()
关闭连接对象。当连接对象被垃圾回收时会自动调用
3.3.5.poll([timeout])
返回连接对象中是否有可以读取的数据,如果有则返回True,否则返回False
timeout——如果未指定,此方法会立刻返回。如果是一个数字,则指定了最大阻塞的秒数。如果是None,则会一直等待,不会超时
3.3.6.send_bytes(buffer[, offset[, size])
从一个字节类对象中取出字节数组并作为一条完整消息发送
buffer, offset——如果给定了offset,在buffer中指定的offset位置读取数据
# offset值为0时对应buffer第1个字节,1对应第2个,以此类推
size——如果给定了size,会从缓冲区buffer中读取size个字节
# 缓冲区长度不能小于offset或offset + size,offset和size必须为非负整数
过大的缓冲区(接近32MiB+,此值取决于操作系统)有可能引发ValueError异常
3.3.7.recv_bytes([maxlength])
与recv()类似,返回的是字节数据
maxlength——如果给定了maxlength并且消息长于maxlength,则抛出OSError异常并且该连接对象将不再可读
3.3.8.recv_bytes_into(buffer[, offset])
与recv()类似,会将一条完整的字节数据读入buffer中并返回消息的字节数
buffer必须是一个可写的字节类对象(例如bytearray)
offset——如果给定了offset,则消息从指定的offset位置写入缓冲区。offset必须是小于缓冲区长度(字节)的非负整数
如果缓冲区太小,则引发BufferTooShort异常,并且完整的消息将会存放在异常实例e的e.args[0]中
示例1:
运行结果:
b'345'
[1, None, '']
True
False
11
bytearray(b'\x00\x00\x00Hello World\x00') # 在第四个字节及以后插入了消息
示例2:
# 这个示例由https://blog.csdn.net/a883774913/article/details/125375440修改而成
运行结果: # 其中一次运行结果
发送消息:hello
发送消息:hey
发送消息:hru?
发送消息:END
收到消息:hello
收到消息:hey
收到消息:hru?
# 有一定的延迟,所以在发送消息后没有立刻收到
4.队列(Queue)
4.1.概念
队列是一种遵循先进先出(First In, First Out)数据操作规则的线性数据结构
队列的头部称为队首,队列尾部称为队尾,将把元素加入队尾的操作称为入队,删除队首元素的操作称为出队。队列可以在多个生产者和消费者之间通信(多个进程之间通信)
补充:单向队列和双向队列
单向队列只能从队尾添加元素,只能从队首删除元素,遵循先进先出原则
双向队列可以从队尾或队首添加或删除元素
# 下面使用的是单向队列
4.2.创建队列
使用multiprocessing模块中的Queue类,构造函数如下:
Queue([maxsize])
返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放入队列中时,一个写入线程会启动并将对象从缓冲区写入管道中
# 锁、信号量、线程,以及前面提到的生产者和消费者在后文说明
maxsize——是一个整数,用于设置队列中可以放入的消息个数的上限(项目数的上限)。如果值小于或等于零,则队列尺寸为无限大
4.3.Queue类的方法
4.3.1.qsize()
返回队列的大致长度(当前包含的消息数量)
4.3.2.empty()
如果队列为空则返回True,否则返回False
4.3.3.full()
如果队列为满则返回True,否则返回False
4.3.4.put(obj[, block[, timeout]])
将obj放入队列
block, timeout——如果blcok为True(默认值)而且timeout为None(默认值),将会阻塞当前进程,直到有空的缓冲槽。如果timeout为正数,将会在阻塞了最多timeout秒后还没有可用的缓冲槽时抛出queue.Full异常。如果block为False,则会忽略timeout参数,仅当有可用缓冲槽时才放入对象,否则抛出queue.Full异常
# 如果队列已经关闭,则抛出ValueError异常
# queue.Full异常以及下面提到的queue.Empty异常都是queue模块中的
4.3.5.put_nowait(obj)
相当于put(obj, False)
4.3.6.get([block[, timeout]])
从队列中取出并返回对象
block, timeout——如果blcok为True(默认值)而且timeout为None(默认值),将会阻塞当前进程,直到队列中出现可用的对象。如果timeout为正数,将会在阻塞了最多timeout秒后还没有可用的对象时抛出queue.Empty异常。如果block为False,则会忽略timeout参数,仅当有可用对象时才取出并返回,否则抛出queue.Empty异常
# 如果队列已经关闭,则抛出ValueError异常
4.3.7.get_nowait()
相当于get(False)
# 由于多线程或多进程的环境,qsize()、empty()、full()返回值是不可靠的
# 当一个对象被放入一个队列时,这个对象首先会被一个后台线程用pickle序列化,并将序列化的数据通过一个底层管道的管道传递到队列中。所以可能需要极小的延迟,empty()才会返回False、close()才不会抛出BrokenPipeError异常等等
# 如果有多个进程同时将对象放入队列,那么在队列的另一端接收到对象可能是无序的。但是由同一个进程放入的多个对象的顺序在另一端输出时总是一样的
# 如果一个子进程已将一些对象放入队列中(并且没有使用cancel_join_thread()方法),那么这个进程在所有缓冲区的对象被刷新进管道之前,是不会终止的。这就意味着如果除非确定所有放入队列中的对象都已经被消费了,否则如果试图等待这个进程,可能会陷入死锁状态
示例1: # 演示使用队列实现进程之间的通信
运行结果:
a入队时的值: 90
a入队时的值: 80
a入队时的值: 70
a入队时的值: 60
a入队时的值: 50
出队时a的值: 90
出队时a的值: 80
出队时a的值: 70
出队时a的值: 60
出队时a的值: 50
示例2: # 演示放入对象的延迟和死锁状态
运行结果1:
True
[进入死锁]
运行结果2:
False
[进入死锁]
# 死锁是因为子进程(工作进程)将消息放入队列时,队列产生具有q.put('X' * 1000000)的feeder线程,feeder线程将消息传入管道。子进程中的主线程在等待所有缓冲区的对象被刷新进管道(也就是下面join_thread()提到的等待后台线程),但是消息量太大,无法放入缓冲区,而主进程在等待子进程运行结束,两个进程永远在互相等待,造成死锁
# 尽量避免在进程间传递大量数据,越少越好
4.3.8.close()
指示当前进程将不会再往队列中放入对象。一旦缓冲区中的数据被写入管道之后,后台的数据会退出。这个方法在队列被垃圾回收时会自动调用
# 注意只是当前进程,并不影响其他进程
4.3.9.join_thread()
等待后台线程。这个方法仅在调用了close()方法之后可用。这会阻塞当前进程,直到后台线程退出,确保所有缓冲区中的数据被写入管道中
默认情况下,如果一个不是队列创建者的进程试图退出,他会尝试等待这个队列的后台线程
4.3.10.cancel_join_thread()
防止join_thread()方法阻塞当前进程。具体来说这防止进程退出时自动等待后台线程退出
此方法仅当不关心底层管道中可能丢失数据,只希望进程能马上结束时使用
4.4.SimpleQueue类的方法
SimpleQueue是multiprocessing模块中的一个类,是一个简化的Queue类的实现,很像带锁的Pipe
# SimpleQueue类不能指定放入消息的上限
# 实测SimpleQueue类的empty()方法返回值可靠,但消息的上限比Queue类的少
4.4.1.empty()
如果队列为空则返回True,否则返回False
4.4.2.put(item)
将item放入队列
4.4.3.get()
从队列中取出并返回对象
示例:
运行结果:
1 1 1 1 1 1 1 1 1 1
4.5.JoinableQueue类的方法
JoinableQueue是multiprocessing模块中的一个类,是Queue的子类,额外添加了以下方法:
4.5.1.task_done(), join()
task_done()指出之前进入队列的任务已经完成。由队列的消费者进程使用
join()阻塞至队列中所有的项目(任务)都被接收和处理完毕
每当一个项目添加到队列的时候,未完成任务的计数会增加。对于每个调用get()获取的任务,执行完毕后调用task_done()告诉队列该任务已处理完成,未完成计数就会减少。当未完成计数降到零的时候,join()阻塞将被解除
# 当然也可以不调用get()获取任务就调用task_done()告诉任务已经完成
如果task_done()被调用的次数多于放入队列中的项目数量,将引发ValueError异常
示例:
运行结果:
主进程执行结束
4.6.生产者和消费者模型
多个进程(线程)之间的通信可以采用生产者和消费者模型
生产者负责生产数据
消费者负责处理数据
它们通过一个容器(中央仓库)进行通信,这个容器可以是队列,相当于一个缓冲区:生产者 --<生产数据放入容器>--> 容器 --<从容器取出数据>--> 消费者
4.6.1.作用
· 平衡生产者与消费者之间的速度差
· 解决生产者和消费者的强耦合问题
# 耦合是程序之间关联性(依赖程度)
# 在并发编程中使用生产者和消费者模式能解决绝大多数并发问题,并通过平衡生产进程(线程)和消费进程(线程)的工作能力来提高程序的整体处理数据的速度
示例: # 使用JoinableQueue实现生产者和消费者模型
运行结果: # 其中一次运行结果
产生第一轮数据
产生第一轮数据
数据[1, 0, 'p1']消费完成
数据[1, 1, 'p1']消费完成
数据[1, 0, 'p2']消费完成
数据[1, 1, 'p2']消费完成
产生第二轮数据
产生第二轮数据
生产者进程退出
生产者进程退出
数据[2, 0, 'p2']消费完成
数据[2, 1, 'p2']消费完成
数据[2, 0, 'p1']消费完成
数据[2, 1, 'p1']消费完成
(8)、multiprocessing模块的部分函数
1.active_children()
返回当前进程存活的子进程的列表
# 列表的元素是Process对象
2.cpu_count()
参见os.cpu_count()
# 可能引发NotImplementedError
3.current_process()
返回与当前进程对应的Process对象
4.parent_process()
返回父进程的Process对象,相当于父进程调用current_process()。如果在主进程调用,则会返回None
5.freeze_support()
为使用了multiprocessing模块的程序,提供冻结代码以产生Windows可执行文件的模块的支持(例如PyInstaller模块)
需要在main模块(主模块)的if __name__ == '__main__'行之后立刻调用此函数
# 如果不调用,产生的可执行文件大概率会出错
(9)、线程(Thread)
1.概念
线程(Thread)是CPU可调度的最小单位,线程被包含在进程中,是进程中实际的运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务
# 一个进程中至少有一个线程在工作
2.创建线程的方式
2.1.threading模块中的Thread类
2.2.自定义Thread类的子类
3.分别使用两种方式创建线程
3.1.使用threading模块中的Thread类创建线程
# 可以类比着创建进程
示例:
运行结果: # 其中一次运行结果
主线程开始执行
线程9068正在执行:0
线程8648正在执行:0
线程8648正在执行:1
线程9068正在执行:1
线程8648正在执行:2
线程9068正在执行:2
主线程结束执行
# 多个线程同时执行的顺序是无序的
3.2.自定义Thread类的子类创建线程
示例:
(10)、Thread类
Thread是threading模块中的一个类,通过创建一个Thread对象然后调用对象的start()方法来生成线程
1.创建Thread类的对象
应始终使用关键字参数调用构造函数,构造函数如下:
Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
# 关于参数请参见本章(4)、1
2.Thread类的常用方法和属性
2.1.name属性
只用于识别的字符串,没有语义。多个线程可以赋予相同的名称。初始名称由构造函数设定(默认为'Thread-N',其中N是第N个子线程)
2.2.ident属性, native_id属性
ident属性是这个线程的“线程标识符”,是个非零整数。它的值没有直接含义,主要用作magic cookie
native_id属性是这个线程的原生线程ID(TID),是个非负整数。它的值可用来在整个系统中唯一地标识这个特定线程
两个属性在线程终结之后可能会被复用
# 类似于进程ID,线程ID的有效期(全系统范围内保证唯一)将从线程被创建开始直到线程被终结
# 实测通常两个属性值相同(没有发现不同的情况)
2.3.daemon属性
线程的守护标志,是一个布尔值,必须在start()被调用之前设置,否则在之后设置会抛出RuntimeError异常
# 初始值是构造函数中设置的值(值为None则从创建的线程继承)
# 主线程不是守护线程,默认为False
# 当没有存活的非守护线程时,整个Python程序才会退出
# 可以在守护线程中创建子线程
2.4.is_alive()
返回线程是否存活。当run()方法刚开始知道run()方法刚结束时此方法返回True
2.5.join(timeout=None)
阻塞调用的线程,直到调用join()方法对应的线程对象终止
# 终止包括正常终止、抛出未处理异常、发生超时
timeout——同Process类的join()方法,值可以是以秒为单位的浮点数或分数
# 因为join()总是返回None,所以判断是否发生超时要在join()后调用is_alive()(返回True则join()超时)
# fractions模块支持分数
# 一个线程可以被join多次。线程join自己或join一个未开始的线程会抛出RuntimeError
2.6.start()
同Process类的start()方法
# 同一个线程对象调用此方法大于一次会抛出RuntimeError异常
2.7.run()
同Process类的run()方法
# 注意Thread类没有terminate(), kill()方法
(11)、threading模块的部分函数和常量
1.enumerate()
以列表形式返回当前所有存活的Thread对象。该列表包含守护线程,current_thread()创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程
2.active_count()
返回当前存活的Thread对象的数量。返回值与enumerate()函数所返回的列表长度一致
3.current_thread()
返回当前对应调用者的控制线程的Thread对象。如果调用者的控制线程不是利用threading模块创建,则会返回一个功能受限的虚拟线程对象
4.get_ident()
返回当前线程的“线程标识符”
5.get_native_id()
返回内核分配给当前线程的原生集成线程ID
6.main_thread()
返回主线程的Thread对象
7.TIMEOUT_MAX常量
阻塞函数中形参timeout允许的最大值,传入值超过此值会抛出OverflowError异常
(12)、线程之间的通信
1.线程之间的关系
一个进程内的所有线程共享数据
示例:
运行结果:
主线程开始执行,a的值为:100
加的线程开始执行
a的值为:130 # 100 + 30 = 130
加的线程执行结束
减的线程开始执行
a的值为:80 # 130 - 50 = 80
减的线程执行结束
主线程结束执行,a的值为:80
(13)、线程同步
1.多线程共享数据所带来的安全性问题
多线程都是在同一个进程中运行的,因此在进程中的全局变量所有线程都可以共享,这就造成了一个问题,因为线程的顺序是无序的,有可能会造成数据错乱
2.锁机制——互斥锁
互斥锁有锁定和非锁定两种状态
当线程访问共享资源时,先将资源的状态变成“锁定”,其它线程不能修改;直到该线程释放资源,将资源的状态变成“非锁定”时,其它线程才能操作共享资源
互斥锁是线程的相互排斥,谁先抢到资源,谁就上锁修改资源,同一时间只能有一个线程可以进行修改,牺牲了速度却保证了数据的安全性
3.使用锁的原则
· 把尽量少的和不耗时的代码放到锁中执行
· 代码执行完成后要记得释放锁
4.锁/互斥锁/同步锁/原始锁(Lock)
在Python中,锁是一个在锁定时不属于特定线程的能用的最低级的同步基元组件
使用threading模块中的Lock类
一旦一个线程获得一个锁,会阻塞随后尝试获得锁的线程,直到它被释放;任何线程都可以释放它
# 使用类的实例化创建锁对象
4.1.Lock类的方法
4.1.1.acquire(blocking=True, timeout=-1)
可以阻塞或非阻塞地获得锁(上锁)
blocking——如果值为True(默认值),阻塞直到锁被释放,然后将锁锁定并返回True;如果值为False,不会阻塞,如果锁已被锁定则返回False,否则将锁锁定并返回True
timeout——值为浮点型,如果值为非负数,只要无法得到锁,将最多阻塞timeout设定的秒数;如果值为-1,将无限等待;如果值为除-1以外的负数,或当blocking值为False时,将此参数的值设置为-1以外的值会抛出ValueError异常
如果成功获得锁,则返回True,否则返回False
4.1.2.release()
释放一个锁。此方法可以在任何线程中调用,不限于获得锁的线程
当锁被锁定时,将它重置为未锁定。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个获得锁并继续运行
当锁未被锁定时,会引发RuntimeError异常
此方法没有返回值
4.1.3.locked()
返回锁是否处于锁定状态
示例:
运行结果: # 其中一次运行结果
5468正在出传第5张票
9580正在出传第4张票
2520正在出传第3张票
5468正在出传第2张票
5468正在出传第1张票
# 先执行到上锁语句的线程上锁,后执行到的被阻塞。当票被出传完释放锁后,先获得锁的线程上锁,其余线程继续被阻塞
补充:multiprocessing模块中的Lock类
类似于threading.Lock,以下是不同之处:
threading中的Lock只能用于同一个进程,multiprocessing中的能用于多进程
# 在多进程使用锁需要在创建进程时传递锁对象
在acquire()方法中,multiprocessing中的第一个参数名是block而不是blocking。第二个参数默认值为None,无限等待。如果第二个参数值为负数,则效果和值为0一样而不是抛出异常。当block为False时,timeout会被忽略而不是抛出异常
在release()方法中,multiprocessing中的此方法可以在任何进程、线程中使用,不限于锁的拥有者。当尝试释放一个没有被持有的锁时抛出的异常是ValueError而不是RuntimeError
multiprocessing中没有locked()方法
补充:
在multiprocessing模块和threading模块提供的所有带有acquire()和release()方法的对象,都可以被用作with语句的上下文管理器。当进入语句块是调用acquire()方法,退出语句块时调用release()方法
示例:
补充:死锁(Deadlock) # 摘编自百度百科
死锁是指两个或两个以上的进程或线程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程或线程称为死锁进程或线程
5.递归锁/可重入锁(RLock)
递归锁是一个可以被同一个线程多次获取的同步基础元件。在内部,它在基元锁的锁定/非锁定状态上附加了“所属线程”和“递归等级”的概念。在锁定状态下属于某些线程;在非锁定状态下不属于任何线程。如果某个线程拿到了递归锁,这个线程可以再次拿到这个锁而不需要等待。但是这个线程拿锁操作和释放锁操作的次数相同递归锁才会处于非锁定状态
使用threading模块中的RLock类
# 使用类的实例化创建递归锁对象
5.1.RLock类的方法
5.1.1.acquire(blocking=True, timeout=-1)
与Lock类中的acquire()方法类似,区别是如果调用此方法的线程已经拥有锁,递归等级加一,不进行阻塞并返回True。否则,如果其他线程拥有该锁,则阻塞至该锁解锁。一旦锁被解锁(不属于任何线程),则抢夺所有权,设置递归等级为一并继续运行。如果多个线程被阻塞,等待锁被解锁,一次只有一个线程抢到锁的所有权并继续运行
5.1.2.release()
释放锁,自减递归等级。如果减到零,则将锁重置为非锁定状态(不被任何线程拥有)。如果自减后递归等级不是零,则锁保持锁定,仍由调用线程拥有
只有当前线程拥有锁才能调用此方法,如果锁被释放后调用此方法或非拥有锁的线程调用此方法,则抛出RuntimeError异常
此方法没有返回值
补充:Lock和RLock实际上是一个工厂函数(功能和类相似但是是一个函数),返回平台支持的具体递归锁类中最有效的版本的实例
补充:multiprocessing模块中的RLock类
类似于threading.RLock,以下是不同之处:
threading中的RLock只能用于同一个进程,multiprocessing中的能用于多进程
在acquire()方法中,multiprocessing中的此方法的区别同multiprocessing.Lock
在release()方法中,multiprocessing中的此方法的区别与multiprocessing.Lock相似,不同之处在于抛出的异常是AssertionError
示例:
运行结果: # 其中一次运行结果
my_thread获得锁
my_thread获得锁
my_thread释放锁
my_thread释放锁
0获得锁
0释放锁
1获得锁
2获得锁 # 这里是因为在1释放锁后打印内容前,2已经获得锁并打印内容
2释放锁
1释放锁
6.信号量(Semaphore)
一个信号量管理一个内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1。计数器的值永远不会小于零。当计数器为0时,调用acquire()将会阻塞直到其他线程调用release()方法
使用threading模块中的Semaphore类,构造函数如下:
Semaphore(value=1)
value——赋予内部计数器初始值,默认值为1。如果value被赋予小于0的值,将会引发ValueError异常
6.1.Semaphore类的方法
方法与RLock类的方法类似,区别是上文所说的内部计数器
# 因为调用release()方法是释放一个信号量,将内部计数器的值增加1,所以不会抛出异常
补充:multiprocessing模块中的Semaphore类,类似于threading.Semaphore,不同之处与multiprocessing.RLock和threading.RLock的不同之处大体相同
示例:
运行结果:
False # 说明没有获得信号量
# 在大多数情况下,信号量用于保护数量有限的资源。如果信号量被释放的次数过多,则表明出现了异常
7.有界信号量
有界信号量通过检查以确保它当前的值不会超过初始值
使用threading模块中的BoundedSemaphore类,构造函数如下:
BoundedSemaphore(value=1)
value——指定内部计数器不会超过的初始值(默认值为1)。如果超过了初始值,将会引发ValueError异常
# value也是赋予内部计数器的初始值
# 方法与Semaphore类相同
补充:multiprocessing模块中的BoundedSemaphore类,不同之处与multiprocessing.RLock和threading.RLock的不同之处大体相同.
示例:
运行结果:
任务被执行
任务被执行
任务被执行
# 两秒之后
任务被执行
任务被执行
8.事件(Event)
线程之间通信的最简单机制之一:一个线程发出事件信号,其他线程等待该信号
一个事件对象管理一个内部标志(内部旗标),调用set()方法可将其设置为True,调用clear()方法可将其设置为False,调用wait()方法将进入阻塞直到标志为True
使用threading模块中的Event类,内部标志初始为False
# 使用类的实例化创建事件对象
8.1.Event类的方法
8.1.1.is_set()
如果内部标志为True时返回True,否则返回False
8.1.2.set()
将内部标志设置为True
8.1.3.clear()
将内部标志设置为False
8.1.4.wait(timeout=None)
阻塞线程直到内部标志为True。如果调用内部标志为True,则立刻返回,返回值为True。否则阻塞线程,直到调用set()方法将标志设置为True(这种情况返回值为True)或者发生可选的超时
timeout——是一个浮点数,代表操作的超时时间。默认值为None,表示不会超时。如果值为负数,则效果和值为0一样。超时后返回值是False
补充:multiprocessing模块中的Event类,类似于threading.Event,区别是能用于多进程
示例: # 模拟红绿灯
# 这个示例由https://www.shuzhiduo.com/A/l1dyVEjGze/修改而成
9.条件变量(Condition)
条件变量用来管理那些已获得了一个锁但是因为需要满足某一条件才能运行的线程。条件变量总是与某种类型的锁对象关联(互斥锁或递归锁)。在线程获得锁的时候调用wait()方法释放锁,然后阻塞直到其他线程通知(也就是满足条件)后等待锁被其他线程释放后获得锁继续执行
使用threading模块中的Condition类,构造函数如下:
Condition(lock=Lone)
lock——如果值为None,则会创建新的RLock对象作为底层锁,否则值必须为Lock对象或RLock对象,它将被用作底层锁
9.1.Condition类的方法
9.1.1.acquire(*args)
获得底层锁。此方法调用底层锁的相应方法,返回值是底层锁相应方法的返回值
9.1.2.release()
释放底层锁。此方法调用底层锁的相应方法,没有返回值
9.1.3.wait(timeout=None)
释放底层锁,然后阻塞,直到被通知或发生超时,重新获得锁并继续执行。如果线程在调用此方法时没有获得锁,将会引发RuntimeError异常
# 当底层锁是RLock时,不会使用它的release()方法释放锁,会使用RLock类的内部接口,所以即使多次递归获取它,也能将它解锁。然后在重新获得锁时使用另一个内部接口来恢复递归等级
timeout——是一个浮点数,代表操作的超时时间。默认值为None,表示不会超时。如果值为负数,则效果和值为0一样。超时后返回值是False,否则是True
9.1.4.wait_for(predicate, timeout=None)
重复调用wait(),直到满足判断式predicate或发生超时。返回值是判断式最后一个返回值或者发生超时返回False
predicate——是一个可调用对象且返回值可解释为一个布尔值
timeout——同wait()
忽略超时功能,调用此方法大致相当于编写:
while not predicate:
cv.wait() # cv是条件变量
9.1.5.notify(n=1)
唤醒最多n个(默认为1个)正在等待这个条件变量的线程。如果线程在调用此方法时没有获得锁,将会引发RuntimeError异常
# 如果没有线程在等待,这是一个空操作
# 注意notify()方法不释放锁,被唤醒的线程直到它可以重新获得锁才继续执行
9.1.6.notify_all()
与notify()类似,区别是会唤醒所有等待这个条件变量线程
补充:multiprocessing模块中的Condition类,类似于threading.Condition,区别是底层锁是multiprocessing模块中的且能用于多进程
示例1:
示例2: # 演示wait_for()
运行结果:
子线程开始运行,获得锁
子线程等待被通知且满足判断式
第一次通知 # 3秒后,wait_5s()函数返回False
第二次通知 # 6秒后,wait_5s()函数返回True
子线程结束运行
10.栅栏(Barrier)
栅栏可以将任务集中到栅栏,当所有线程都准备好时,所有线程被释放并继续执行,而栅栏重置以便下次使用。线程调用wait()方法后将阻塞,直到所有线程(指定数量的线程)都调用了wait()方法,此时所有线程将被同时释放
使用threading模块中的Barrier类,构造函数如下:
Barrier(parties, action=None, timeout=None)
parties——栅栏对象需要的线程数量
action——当所有线程被释放时在其中一个线程中自动调用
timeout——默认的超时时间,作为wait()方法中timeout参数的默认值
10.1.Barrier类的方法和属性
10.1.1.wait(timeout=None)
冲出栅栏。当栅栏中所有线程都已经调用了这个函数时,它们将同时被释放
timeout——是一个浮点数,代表操作的超时时间。默认值为构造函数中指定的timeout,表示不会超时。如果值为负数,则效果和值为0一样。如果发生了超时,栅栏对象将进入损坏状态
如果创建栅栏对象时在构造函数提供了action参数,它将在其中一个线程释放前被调用。如果此调用引发了异常,栅栏对象将进入损坏状态
如果栅栏对象进入损坏状态,但仍有线程等待释放,这些线程将会收到BrokenBarrierError异常
函数的返回值是一个整数,取值返回在0到parties-1,在每个线程中的返回值不同。可用于从所有线程中选择唯一的一个线程执行一些特别的任务
10.1.2.reset()
重置栅栏为默认的初始状态。如果栅栏中仍有线程等待释放,这些线程将会收到BrokenBarrierError异常
# 使用此参数时,如果存在状态未知的其他线程,则可能需要执行外部同步,例如想要一个状态未知的线程进入重置后的栅栏,需要使用同步语句先将栅栏重置,再使这个线程进入栅栏
# 处于损坏状态的栅栏对象重置后将离开损坏状态,可以继续使用,但最好将其废弃并新建一个
10.1.3.abort()
使栅栏处于损坏状态
10.1.4.parties属性
冲出栅栏所需要的线程数量
# 值是构造函数中传入parties参数的值
10.1.5.n_waiting属性
当前时刻正在栅栏中被阻塞的线程数量
10.1.6.broken属性
是一个布尔值,如果栅栏处于损坏状态值为True,否则为False
补充:multiprocessing模块中的Barrier类,类似于threading.Barrier,区别是能用于多进程
示例:
运行结果: # 其中一次运行结果
Thread-1将进入栅栏,当前栅栏中的线程数:0
Thread-4将进入栅栏,当前栅栏中的线程数:1
Thread-2将进入栅栏,当前栅栏中的线程数:2
# Thread-2未进入栅栏前有两个线程,进入栅栏后有3个线程,释放所有线程
Thread-4冲出栅栏
Thread-2冲出栅栏
Thread-1冲出栅栏
wait()返回值为0的线程执行该语句,是Thread-1
Thread-3将进入栅栏,当前栅栏中的线程数:0
Thread-6将进入栅栏,当前栅栏中的线程数:1
Thread-5将进入栅栏,当前栅栏中的线程数:2
Thread-5冲出栅栏
Thread-3冲出栅栏
wait()返回值为0的线程执行该语句,是Thread-3
Thread-6冲出栅栏
(14)、定时器(Timer)
定时器Timer是threading模块中的一个类,它是Thread类的子类。此类表示一个操作应该在等待一定时间之后运行——相当于一个定时器,构造函数如下:
Timer(interval, function, args=None, kwargs=None)
创建一个定时器,在通过start()函数启动后,经过interval秒的间隔时间后,将会用位置参数args和关键字参数kwargs调用function。如果args为None(默认值),则会使用一个空列表。如果kwargs为None(默认值),则会使用一个空字典
1.1.Timer类的新增方法
# Timer是Thread类的子类,拥有Thread类的所有方法
1.1.1.cancel()
停止定时器并取消执行定时器将要执行的操作。仅当定时器处于等待状态时有效
(15)、线程队列/内置模块queue
内置模块queue实现了多生产者、多消费者队列,提供了同步的、线程安全的队列类
1.FIFO(先进先出)队列Queue类
先添加到队列的条目(元素)先被取回
2.LIFO(后进先出)队列LifoQueue类
后被添加到队列的条目先被取回(类似一个堆栈)
3.优先级队列PriorityQueue类
条目将保持排序(使用heapq模块),最小值条目先被取出
# 放进优先级队列的条目经典模式是以下形式的元组:(优先级数字, 数据)
# 如果条目之间无法进行比较会引发TypeError异常
4.Queue类, LifoQueue类, PriorityQueue类的方法
类似于multiprocessing模块中的JoinableQueue类,区别是没有close(), join_thread(), cancel_join_thread()方法且条目放入队列时不需要极小的延迟才让empty()之类的方法返回值可信
# 构造函数同样也有maxsize参数来指定放入条目的上限
# qsize() > 0保证后续调用的get()不被阻塞,qsize() < maxsize也不保证put()不被阻塞。例如在判断后有其他线程调用get()或put()
示例:
运行结果: # 其中一次运行结果
(2, <__main__.A object at 0x0000000002363A00>) # 2最先进入q队列
(3, <__main__.A object at 0x0000000002363C70>) # 3最后进入lq队列
(1, <__main__.A object at 0x0000000002363CD0>) # 1在pq队列中最小
5.SimpleQueue类
无界的FIFO简单队列,缺少任务跟踪等高级功能
# 使用类的实例化创建SimpleQueue类的对象
5.1.SimpleQueue类的方法
5.1.1.qsize()
返回队列的大致大小
# qsize() > 0不保证后续调用的get()不被阻塞
5.1.2.empty()
如果队列为空返回True,否则返回False
# empty()返回False不保证后续调用的get()不被阻塞
5.1.3.put(item, block=True, timeout=None)
将item放入队列。此方法永不阻塞,始终成功(除了潜在的低级错误,例如内存分配失败)
block, timeout——为保持Queue.put()方法的兼容性而提供,其值被忽略
5.1.4.put_nowait(item)
相当于put(item),为保持Queue.put_nowait()兼容性而提供
5.1.5.get(block=True, timeout=None)
相当于Queue.get()
5.1.6.get_nowait()
相当于get(False)
标签: