Python 线程间通信
线程共享分配给进程的内存。因此,同一进程中的线程可以相互通信。为了促进线程间通信,线程模块提供了Event对象和Condition对象。
Event对象
Event对象管理内部标志的状态。标志初始为false,并通过set()方法变为true,通过clear()方法重新设置为false。wait()方法会阻塞,直到标志变为true。
Event对象的方法:
is_set()方法
如果内部标志为true,则返回True。
set()方法
将内部标志设置为true。等待它变为true的所有线程将被唤醒。一旦标志为true,调用wait()方法的线程将不会阻塞。
clear()方法
将内部标志重置为false。随后,调用wait()的线程将阻塞,直到再次调用set()将内部标志设置为true。
wait(timeout=None)方法
阻塞,直到内部标志为true。如果进入时内部标志为true,则立即返回。否则,阻塞直到另一个线程调用set()将标志设置为true,或者直到可选的超时发生。
当timeout参数存在且不为None时,它应该是一个浮点数,表示操作的超时时间(秒)。
示例
下面的代码试图模拟交通流量由交通信号(绿灯或红灯)的状态控制。
程序中有两个线程,针对两个不同的函数。signal_state()函数周期性地设置和重置事件,指示从绿灯变为红灯的变化。
traffic_flow()函数等待事件被设置,并在事件保持设置的情况下运行循环。
from threading import *
import time
def signal_state():
while True:
time.sleep(5)
print("Traffic Police Giving GREEN Signal")
event.set()
time.sleep(10)
print("Traffic Police Giving RED Signal")
event.clear()
def traffic_flow():
num=0
while num<10:
print("Waiting for GREEN Signal")
event.wait()
print("GREEN Signal ... Traffic can move")
while event.is_set():
num=num+1
print("Vehicle No:", num," Crossing the Signal")
time.sleep(2)
print("RED Signal ... Traffic has to wait")
event=Event()
t1=Thread(target=signal_state)
t2=Thread(target=traffic_flow)
t1.start()
t2.start()
输出
它将产生以下 输出 −
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1 Crossing the Signal
Vehicle No: 2 Crossing the Signal
Vehicle No: 3 Crossing the Signal
Vehicle No: 4 Crossing the Signal
Vehicle No: 5 Crossing the Signal
Signal is RED
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 6 Crossing the Signal
Vehicle No: 7 Crossing the Signal
Vehicle No: 8 Crossing the Signal
Vehicle No: 9 Crossing the Signal
Vehicle No: 10 Crossing the Signal
条件对象
线程模块中的Condition类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。条件对象与可重入锁相关联。条件对象具有acquire()和release()方法,这两个方法调用了相关联锁的对应方法。
threading.Condition(lock=None)
以下是Condition对象的方法:
acquire(*args)
获取底层锁。这个方法调用底层锁的相应方法;返回值是该方法返回的任何值。
release()
释放底层锁。这个方法调用底层锁的相应方法;没有返回值。
wait(timeout=None)
这个方法释放底层锁,然后阻塞直到在另一个线程中对相同条件变量进行notify()或notify_all()调用唤醒,或者直到可选的timeout发生。一旦唤醒或超时,它重新获取锁并返回。
wait_for(predicate, timeout=None)
这个实用方法可能会重复调用wait(),直到谓词满足或超时发生。返回值是谓词的最后一个返回值,如果方法超时,则返回False。
notify(n=1)
这个方法最多唤醒等待条件变量的n个线程;如果没有线程在等待,它不起作用。
notify_all()
唤醒所有等待该条件的线程。这个方法类似于notify(),但是唤醒所有等待的线程而不是一个。如果调用线程在调用该方法时尚未获取锁,则抛出RuntimeError。
例子
在下面的代码中,线程t2运行taskB()函数,线程t1运行taskA()函数。线程t1获取条件并通知它。这时,线程t2处于等待状态。在条件被释放后,等待的线程继续消耗被通知函数生成的随机数。
from threading import *
import time
import random
numbers=[]
def taskA(c):
while True:
c.acquire()
num=random.randint(1,10)
print("Generated random number:", num)
numbers.append(num)
print("Notification issued")
c.notify()
c.release()
time.sleep(5)
def taskB(c):
while True:
c.acquire()
print("waiting for update")
c.wait()
print("Obtained random number", numbers.pop())
c.release()
time.sleep(5)
c=Condition()
t1=Thread(target=taskB, args=(c,))
t2=Thread(target=taskA, args=(c,))
t1.start()
t2.start()
当您执行此代码时,将产生以下 输出 –
waiting for update
Generated random number: 4
Notification issued
Obtained random number 4
waiting for update
Generated random number: 6
Notification issued
Obtained random number 6
waiting for update
Generated random number: 10
Notification issued
Obtained random number 10
waiting for update