Python中实现异步I/O的方法及过程是什么

Admin 2022-08-15 群英技术资讯 469 次浏览

关于“Python中实现异步I/O的方法及过程是什么”的知识有一些人不是很理解,对此小编给大家总结了相关内容,具有一定的参考借鉴价值,而且易于学习与理解,希望能对大家有所帮助,有这个方面学习需要的朋友就继续往下看吧。


它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing,  常用于非阻塞的socket的编程中; 简单介绍一下这个模块,更多内容查看 python文档:https://docs.python.org/3/library/selectors.html

 

1. 模块定义了一个 BaseSelector的抽象基类, 以及它的子类,包括:SelectSelector, PollSelector, EpollSelector, DevpollSelector, KqueueSelector.    

另外还有一个DefaultSelector类,它其实是以上其中一个子类的别名而已,它自动选择为当前环境中最有效的Selector,所以平时用 DefaultSelector类就可以了,其它用不着。

 

2. 模块定义了两个常量,用于描述 event Mask

EVENT_READ :      表示可读的; 它的值其实是1;

EVENT_WRITE:      表示可写的; 它的值其实是2;

 

3. 模块定义了一个 SelectorKey类, 一般用这个类的实例 来描述一个已经注册的文件对象的状态, 这个类的几个属性常用到:

fileobj:   表示已经注册的文件对象;

fd:          表示文件对象的描述符,是一个整数,它是文件对象的 fileno()方法的返回值;

events:    表示注册一个文件对象时,我们等待的events, 即上面的event Mask, 是可读呢还是可写呢!!

data:       表示注册一个文件对象是邦定的data;

 

 

4. 最后说说抽象基类中的方法;

register(fileobj, events, data=None) 

作用:注册一个文件对象。

参数: fileobj——即可以是fd 也可以是一个拥有fileno()方法的对象;

events——上面的event Mask 常量; data

返回值: 一个SelectorKey类的实例;

unregister(fileobj) 

作用: 注销一个已经注册过的文件对象;

返回值:一个SelectorKey类的实例;

modify(fileobj, events, data=None)

作用:用于修改一个注册过的文件对象,比如从监听可读变为监听可写;它其实就是register() 后再跟unregister(), 但是使用 modify( ) 更高效;

返回值:一个SelectorKey类的实例;

select(timeout=None)

作用: 用于选择满足我们监听的event的文件对象;

返回值: 是一个(key, events)的元组, 其中key是一个SelectorKey类的实例, 而events 就是 event Mask(EVENT_READ或EVENT_WRITE,或者二者的组合)

close() 

作用:关闭 selector。 最后一定要记得调用它, 要确保所有的资源被释放;

get_key(fileobj)  

作用: 返回注册文件对象的 key;

返回值 :一个SelectorKey类的实例;

 服务端

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import selectors

sel=selectors.DefaultSelector()

def accept(server_fileobj,mask):
    coon,addr = server_fileobj.accept()
    print(coon,addr,mask)
    sel.register(coon,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data = conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(b'hello')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj = socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False)#设置socket的接口为非阻塞

#相当于往select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
sel.register(server_fileobj,selectors.EVENT_READ,accept)



while True:
    #检测所有的fileobj,是否有完成wait data的
    events = sel.select()
    # SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
    for sel_obj,mask in events:
        callable = sel_obj.data
        callable(sel_obj.fileobj,mask)

 客户端

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

 

模拟请求

 1 #1. epoll并不代表一定比select好
 2 # 在并发高的情况下,连接活跃度不是很高, epoll比select
 3 # 并发性不高,同时连接很活跃, select比epoll好
 4 
 5 #通过非阻塞io实现http请求
 6 # select + 回调 + 事件循环
 7 #  并发性高
 8 # 使用单线程
 9 
10 import socket
11 from urllib.parse import urlparse
12 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
13 
14 
15 selector = DefaultSelector()
16 #使用select完成http请求
17 urls = []
18 stop = False
19 
20 
21 class Fetcher:
22     def connected(self, key):
23         selector.unregister(key.fd)
24         self.client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode("utf8"))
25         selector.register(self.client.fileno(), EVENT_READ, self.readable)
26 
27     def readable(self, key):
28         d = self.client.recv(1024)
29         if d:
30             self.data += d
31         else:
32             selector.unregister(key.fd)
33             data = self.data.decode("utf8")
34             html_data = data.split("\r\n\r\n")[1]
35             print(html_data)
36             self.client.close()
37             urls.remove(self.spider_url)
38             if not urls:
39                 global stop
40                 stop = True
41 
42     def get_url(self, url):
43         self.spider_url = url
44         url = urlparse(url)
45         self.host = url.netloc
46         self.path = url.path
47         self.data = b""
48         if self.path == "":
49             self.path = "/"
50 
51         # 建立socket连接
52         self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
53         self.client.setblocking(False)
54 
55         try:
56             self.client.connect((self.host, 80))  # 阻塞不会消耗cpu
57         except BlockingIOError as e:
58             pass
59 
60         #注册
61         selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
62 
63 
64 #心脏, 心脏在不停的跳动,跳动就会知道调用什么代码,执行什么方法
65 #不停的向操作系统询问哪些socket已经准备好了,然后执行回调方法
66 def loop():
67     #事件循环,不停的请求socket的状态并调用对应的回调函数
68     #1. select本身是不支持register模式
69     #2. socket状态变化以后的回调是由程序员完成的
70     while not stop:
71         ready = selector.select()
72         for key, mask in ready:
73             call_back = key.data
74             call_back(key)
75     #回调+事件循环+select(poll\epoll)
76 
77 if __name__ == "__main__":
78     fetcher = Fetcher()
79     import time
80     start_time = time.time()
81     for url in range(20):
82         url = "http://shop.projectsedu.com/goods/{}/".format(url)
83         urls.append(url)
84         fetcher = Fetcher()
85         fetcher.get_url(url)
86     loop()
87     print(time.time()-start_time)

 


感谢各位的阅读,以上就是“Python中实现异步I/O的方法及过程是什么”的内容了,经过本文的学习后,相信大家对Python中实现异步I/O的方法及过程是什么都有更深刻的体会了吧。这里是群英网络,小编将为大家推送更多相关知识点的文章,欢迎关注! 群英智防CDN,智能加速解决方案

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。

猜你喜欢

成为群英会员,开启智能安全云计算之旅

立即注册
专业资深工程师驻守
7X24小时快速响应
一站式无忧技术支持
免费备案服务
免费拨打  400-678-4567
免费拨打  400-678-4567 免费拨打 400-678-4567 或 0668-2555555
在线客服
微信公众号
返回顶部
返回顶部 返回顶部
在线客服
在线客服