python线程安全队列的实现代码是什么
Admin 2022-08-19 群英技术资讯 419 次浏览
一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。
包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
class ThreadSafeQueue(object): def __init__(self, max_size=0): self.queue = [] self.max_size = max_size # max_size为0表示无限大 self.lock = threading.Lock() # 互斥量 self.condition = threading.Condition() # 条件变量 def size(self): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self.lock.acquire() size = len(self.queue) self.lock.release() return size def put(self, item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self.max_size != 0 and self.size() >= self.max_size: return ThreadSafeException() # 加锁 self.lock.acquire() self.queue.append(item) self.lock.release() self.condition.acquire() # 通知等待读取的线程 self.condition.notify() self.condition.release() return item def batch_put(self, item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance(item_list, list): item_list = list(item_list) res = [self.put(item) for item in item_list] return res def pop(self, block=False, timeout=0): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self.size() == 0: if block: self.condition.acquire() self.condition.wait(timeout) self.condition.release() else: return None # 加锁 self.lock.acquire() item = None if len(self.queue): item = self.queue.pop() self.lock.release() return item def get(self, index): """ 获取指定位置的元素 :param index: :return: """ if self.size() == 0 or index >= self.size(): return None # 加锁 self.lock.acquire() item = self.queue[index] self.lock.release() return item class ThreadSafeException(Exception): pass
def thread_queue_test_1(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(): while True: print('current time before pop is %d' % time.time()) item = thread_queue.pop(block=True, timeout=3) # item = thread_queue.get(2) if item is not None: print('get value from queue is %s' % item) else: print(item) print('current time after pop is %d' % time.time()) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。
def thread_queue_test_2(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(name): while True: item = thread_queue.pop(block=True, timeout=1) # item = thread_queue.get(2) if item is not None: print('%s get value from queue is %s' % (name, item)) else: print('%s get value from queue is None' % name) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer, args=('thread1',)) t3 = threading.Thread(target=consumer, args=('thread2',)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
这篇文章主要介绍了python中Flask Web 表单的使用方法介绍,表单的操作是Web程序开发中最核心的模块之一,绝大多数的动态交互功能都是通过表单的形式实现的。更多介绍需要的小伙伴可以参考下面文章内容
cookielib是一个自动处理cookies的模块,如果我们在使用爬虫等技术的时候需要保存cookie,那么cookielib会让你事半功倍!他最常见的搭档模
这篇文章主要介绍了Python爬虫网页元素定位术,文章通过Beautiful Soup模块展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
散点密度图是在散点图的基础上,计算了每个散点周围分布了多少其他的点,并通过颜色表现出来。本文主要介绍了Python绘制散点密度图的三种方式,需要的可以参考下
有的时候我们写的东西不想让别人看到,会设置密码来加密。在Python中,我们写的字符字符不想被别人看到,保护自己的隐私,会选择加密。本文介绍Python字符串加密的五种方法:url编码、base64、ascii、md5 Unicode转中文。具体代码如下:
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008