Event 简介
Event 事件 是线程间通信的最简单方法之一,主要用于线程同步。
处理机制
定义一个全局内置标志Flag,如果Flag为False,执行到 event.wait 时程序就会阻塞,如果Flag为True,event.wait 便不会阻塞
【注意如果处于阻塞状态,不管在哪使得Flag为true,wait都会继续执行】
接口
set() 将标志设置为True,并通知所有处于阻塞状态的线程恢复运行
clear() 将标志设置为False
isSet() 获取内置标志的状态,返回 True 或者 False
wait(timeout) 如果标志为False,将使得线程阻塞,如果为True,继续运行,默认为False
示例代码
示例代码--等通知
import threading import timeevent = threading.Event()def chihuoguo(name):# 等待事件,进入等待阻塞状态print '%s 已经启动' % threading.currentThread().getName()print '小伙伴 %s 已经进入就餐状态!'%nametime.sleep(1)event.wait()# 收到事件后进入运行状态print '%s 收到通知了.' % threading.currentThread().getName()print '小伙伴 %s 开始吃咯!'%namethreads = [] thread1 = threading.Thread(target=chihuoguo, args=("a", )) thread2 = threading.Thread(target=chihuoguo, args=("b", )) threads.append(thread1) threads.append(thread2)for thread in threads:thread.start()time.sleep(0.1) # 发送事件通知 print '主线程通知小伙伴开吃咯!' event.set()
示例代码--互相通知
import threading import timedef producer():print u'等人来买包子....'event.wait()#event.clear()print event.isSet()print u'chef:sb is coming for baozi...'print u'chef:making a baozi for sb...'time.sleep(5)print u'chef:你的包子好了...'event.set()def consumer():print u'chenchao:去买包子....'event.set()time.sleep(2)print 'chenchao:waiting for baozi to be ready...'print event.wait()print u'chenchao:哎呀真好吃....'event = threading.Event()p = threading.Thread(target=producer,args=()) c = threading.Thread(target=consumer,args=()) p.start() c.start()
输出
等人来买包子.... chenchao:去买包子.... True chef:sb is coming for baozi... chef:making a baozi for sb... chenchao:waiting for baozi to be ready... True chenchao:哎呀真好吃.... chef:你的包子好了...
上面实现了一个生产者-消费者模式,显然有错误,包子还没做好就吃上了。
稍微细心的缕下思路就会发现,消费者中的wait并没有阻塞线程,因为Flag此时为True
解决方法:
1. 用另一个 event2 来阻塞线程
2. 在生产者获得set时及时把Flag设置为False 【取消生产者中 event.clear() 的注释即可】
注意点1
import threading event = threading.Event() print(1) print(event.wait()) # 打印也会使线程阻塞 print(2)
注意点2
import time import threadingdef myfunc():while 1:time.sleep(1)print(1)event = threading.Event()ts = [] if len(ts) > 2:event.wait() # 此时阻塞,已经开启的线程将继续运行for i in range(12):t = threading.Thread(target=myfunc)t.start()ts.append(t)
实战案例
多线程验证代理ip的有效性
问题:计算机并不能无休止的增加线程,每台计算机都有自己的上限
### 计算机能够执行的最大线程数def myfunc():time.sleep(2)count = 0 while 1:count += 1print(count)t = threading.Thread(target=myfunc)t.start()
超过上限,就会报错
thread.error: can't start new thread
思路:设置最大线程数,当启动的线程超过最大限制时,阻塞,不再生成新线程,并且持续跟踪线程数,一旦减小或者小于某个阈值,就取消阻塞,继续生成线程
class MyTestProxy(object):def __init__(self):self.sFile = 'ip.txt'self.dFile = 'alive.txt'self.url = 'https://www.qiushibaike.com/text/'self.threadmax = 500 # 最大线程数self.threadmin = 400 # 最低线程数self.timeout = 3self.regex = re.compile('qiushibaike.com')self.aliveList = []self.event = threading.Event()self.event2 = threading.Event()self.lock = threading.Lock()self.run()def run(self):with open(self.sFile, 'rb') as fp:lines = fp.readlines()self.ts = 0 # 初始化线程数while lines:if self.ts > self.threadmax:self.event.clear()self.event.wait() # 超过设定线程就阻塞 line = lines.pop()t = threading.Thread(target=self.linkWithProxy, args=(line, ))t.start()self.lock.acquire()self.ts += 1 # 启动一个就加1,ts 被其他线程一直在更新,所以加锁 self.lock.release()self.event2.wait() # 处理完毕后统一存储with open(self.dFile, 'w') as fp:for i in range(len(self.aliveList)):fp.write(self.aliveList[i])def act(self):# 执行完一个线程就减1,因为同时执行,要加锁 self.lock.acquire()self.ts -= 1self.lock.release()print(self.ts)if self.ts < self.threadmin:self.event.set() # 小于最低线程取消阻塞if self.ts == 0:self.event2.set()def linkWithProxy(self, line):# 爬虫server = line.strip()# print(server)protocol = line.split(':')[0]opener = urllib2.build_opener(urllib2.ProxyHandler({protocol:server}))urllib2.install_opener(opener)try:response = urllib2.urlopen(self.url, timeout=self.timeout)except:returnelse:try:str = response.read()if self.regex.search(str):# print(str)print('%s connect success'%server)print(response.geturl())self.aliveList.append(line)except:returnfinally:self.act()if __name__ == '__main__':time.clock()tp = MyTestProxy()print(time.clock())
效率还是不错的
参考资料:
http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html