python线程安全队列的实现代码是什么
Admin 2022-08-19 群英技术资讯 269 次浏览
一个完整的线程池由下面几部分组成,线程安全队列、任务对象、线程处理对象、线程池对象。其中一个线程安全的队列是实现线程池和任务队列的基础,本节我们通过threading包中的互斥量threading.Lock()和条件变量threading.Condition()来实现一个简单的、读取安全的线程队列。
包括put、pop、get等方法,为保证线程安全,读写操作时要添加互斥锁;并且pop操作可以设置等待时间以阻塞当前获取元素的线程,当新元素写入队列时通过条件变量通知解除等待操作。
class ThreadSafeQueue(object): def __init__(self, max_size=0): self.queue = [] self.max_size = max_size # max_size为0表示无限大 self.lock = threading.Lock() # 互斥量 self.condition = threading.Condition() # 条件变量 def size(self): """ 获取当前队列的大小 :return: 队列长度 """ # 加锁 self.lock.acquire() size = len(self.queue) self.lock.release() return size def put(self, item): """ 将单个元素放入队列 :param item: :return: """ # 队列已满 max_size为0表示无限大 if self.max_size != 0 and self.size() >= self.max_size: return ThreadSafeException() # 加锁 self.lock.acquire() self.queue.append(item) self.lock.release() self.condition.acquire() # 通知等待读取的线程 self.condition.notify() self.condition.release() return item def batch_put(self, item_list): """ 批量添加元素 :param item_list: :return: """ if not isinstance(item_list, list): item_list = list(item_list) res = [self.put(item) for item in item_list] return res def pop(self, block=False, timeout=0): """ 从队列头部取出元素 :param block: 是否阻塞线程 :param timeout: 等待时间 :return: """ if self.size() == 0: if block: self.condition.acquire() self.condition.wait(timeout) self.condition.release() else: return None # 加锁 self.lock.acquire() item = None if len(self.queue): item = self.queue.pop() self.lock.release() return item def get(self, index): """ 获取指定位置的元素 :param index: :return: """ if self.size() == 0 or index >= self.size(): return None # 加锁 self.lock.acquire() item = self.queue[index] self.lock.release() return item class ThreadSafeException(Exception): pass
def thread_queue_test_1(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(): while True: print('current time before pop is %d' % time.time()) item = thread_queue.pop(block=True, timeout=3) # item = thread_queue.get(2) if item is not None: print('get value from queue is %s' % item) else: print(item) print('current time after pop is %d' % time.time()) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer) t1.start() t2.start() t1.join() t2.join()
测试结果:
我们可以看到生产者线程每隔2s向队列写入一个元素,消费者线程当无数据时默认阻塞3s。通过执行时间发现消费者线程确实发生了阻塞,当生产者写入数据时结束当前等待操作。
def thread_queue_test_2(): thread_queue = ThreadSafeQueue(10) def producer(): while True: thread_queue.put(random.randint(0, 10)) time.sleep(2) def consumer(name): while True: item = thread_queue.pop(block=True, timeout=1) # item = thread_queue.get(2) if item is not None: print('%s get value from queue is %s' % (name, item)) else: print('%s get value from queue is None' % name) t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer, args=('thread1',)) t3 = threading.Thread(target=consumer, args=('thread2',)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join()
测试结果:
生产者还是每2s生成一个元素写入队列,消费者开启两个线程进行消费,默认阻塞时间为1s,打印结果显示通过加锁确保每次只有一个线程能获取数据,保证了线程读写的安全。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
猜你喜欢
这篇文章主要介绍了python计算阶乘的两个函数用法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
pip 是一个现代的、通用的安装和管理 Python 包的工具,这意味着它是一个工具,允许你安装和管理不属于标准库的其他库和依赖。
我们首先有一个成绩表单,但是学生的成绩是按照学号进行排序的,现在,我们希望清晰明了的知道每一个学生的名次,并且需要将学生按照成绩的高低重新进行排序。也就是说,我们将学生从按照学号排序转变为按照成绩从高到低进行排序。
这篇文章会基于Python对微信好友进行数据分析,这里选择的维度主要有:性别、头像、签名、位置,主要采用图表和词云两种形式来呈现结果,其中,对文本类信息会采用词频分析和情感分析两种方法,感兴趣的小伙伴可以了解一下
冒泡排序(Bubble Sort)是一种简单的排序算法。它重复地遍历要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。遍历数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完成。这个算法的名字由来是因为越小的元素会经由交换慢慢“浮”到数列的顶端。
成为群英会员,开启智能安全云计算之旅
立即注册Copyright © QY Network Company Ltd. All Rights Reserved. 2003-2020 群英 版权所有
增值电信经营许可证 : B1.B2-20140078 粤ICP备09006778号 域名注册商资质 粤 D3.1-20240008