asyncio模块实现单线程-多任务的异步协程

2023-05-10,,

本篇介绍基于asyncio模块,实现单线程-多任务的异步协程

基本概念

协程函数

协程函数: 定义形式为 async def 的函数;

aysnc

Python3.5+版本新增了aysncawait关键字,这两个语法糖让我们非常方便地定义和使用协程。

如果一个函数的定义被async修饰后,则该函数就是一个特殊的函数(协程函数)

1
2
3
4
5
6
7
# 使用 async 关键字修饰函数后,调用该函数,但不会执行函数,而是返回一个coroutine协程对象
async def get_request(url):
print("正在请求: ", url)
sleep(1)
print('请求结束:', url) get_request('www.b.com')

运行分析:

直接调用这个函数的话并不会被执行,也会出现一条警告 RuntimeWarning: coroutine 'get_request' was never awaited

对于它的解释 官方文档 里提到,当协程程序被调用而不是被等待时(即执行 get_request('www.b.com') 而不是 await get_request('www.b.com') )或者协程没有通过 asyncio.create_task() 被排入计划日程(创建任务对象),asyncio 将会发出一条 RuntimeWarning

当然 asyncio.create_task( get_request) 是py3.7中的,在之前的版本中是用到的 asyncio.ensure_future( get_request )

await

在协程中如果要调用另一个协程就使用await要注意await关键字要在async定义的函数中使用,而反过来async函数可以不出现await
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多 asyncio API 都被设计为接受可等待对象。
可等待 对象有三种主要类型: 协程, 任务Future.
通过 ensure_futurecreate_task 函数打包协程对象即可得到任务。
Future 是一种特殊的 低层级 可等待对象,表示一个异步操作的 最终结果
不用回调方法编写异步代码后,为了获取异步调用的结果,引入一个 Future 未来对象。Future 封装了与 loop 的交互行为,add_done_callback 方法向 epoll 注册回调函数,当 result 属性得到返回值后,会运行之前注册的回调函数,向上传递给 coroutine。
通常情况下 没有必要 在应用层级的代码中创建 Future 对象

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def producer():
for i in range(1, 6):
print(f'生产:{i}')
await consumer(i) async def consumer(i):
print(f'消费:{i}') asyncio.run(producer())
# asyncio.run() 是py3.7更新出来的,在py3.7中,使用这个可以简单直接的运行 asyncio 程序。

asyncio.run() 函数用来运行最高层级的入口点 “main()” 函数,更多解释详见 官方文档

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

协程对象

协程对象*:调用 *协程函数 所返回的对象。

特殊函数被调用后,函数内部的实现语句不会被立即执行,然后该函数调用会返回一个协程对象。

结论:协程对象 == 特殊的函数调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def get_request(url):
print("正在请求: ", url)
sleep(1)
print('请求结束:', url) # 函数调用:返回的就是一个协程对象
c = get_request('www.b.com')
print(c)
# <coroutine object get_request at 0x000002A6DFA026D0> # 创建3个协程对象
urls = [
'1.com', '2.com', '3.com'
]
coroutine_list = []
for url in urls:
c = get_request(url)
coroutine_list.append(c)
print(coroutine_list)
# [<coroutine object get_request at 0x0000022FE5313F10>, <coroutine object get_request at 0x0000022FE52426D0>, <coroutine object get_request at 0x0000022FE5313EB8>]

任务对象

任务对象其实就是对协程对象的进一步封装
任务 被用来设置日程以便 并发 执行协程。

结论:任务对象 == 高级的协程对象 == 特殊的函数调用

特性:可以绑定回调(爬虫中回调函数常用来做数据解析)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from time import sleep # 协程函数的定义
async def get_request(url):
print("正在请求: ", url)
sleep(1)
print('请求结束:', url) # 函数调用:返回的就是一个协程对象
c = get_request('www.b.com') # 创建一个任务对象:基于协程对象创建的
task = asyncio.ensure_future(c) # ensure_future 是py3.7之前的 # 创建3个任务对象
urls = [
'1.com', '2.com', '3.com'
]
task_list = [] # 存放多个任务对象的列表
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
task_list.append(task)

绑定回调

回调函数什么时候被执行?

任务对象执行结束后执行

task.add_done_callback(func)

func必须要有一个参数,该参数表示的是该回调函数对应的任务对象
回调函数的参数.result() : 任务对象对应的特殊函数执行结束的返回值。

事件循环对象

作用:将其内部注册的任务对象进行异步执行。
事件循环是异步编程的底层基石
在py3.6中我们需要手动创建事件循环对象。
在py3.7中,有了高层级的 asyncio 函数,例如 asyncio.run(),就很少有必要使用 低层级函数 来手动创建和关闭事件循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
import time # 函数的定义
# 使用 async 关键字修饰函数后,调用该函数,但不会执行函数,而是返回一个coroutine协程对象
async def get_request(url):
print("正在请求: ", url)
# asyncio.sleep(1) # 阻塞1s没有成功
await asyncio.sleep(1) # 加上await关键字即可,这里的 await 表示等待
print('请求结束:', url) # 创建3个协程对象
urls = [
'1.com', '2.com', '3.com'
]
start = time.time() # 任务列表:存储多个任务对象 # py3.6
tasks = []
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
tasks.append(task)
# 获取当前事件循环,如果当前os线程没有设置并且 set_event_loop() 还没有被调用,asyncio创建一个新的事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 直接列表会报错,需要修饰以下,这里的 wait 表示挂起 print('总耗时:', time.time() - start) # py3.7
# 异步实现
# async def main():
# tasks = []
# for url in urls:
# c = get_request(url)
# task = asyncio.create_task (c)
# tasks.append(task)
# await asyncio.gather(*tasks)
# print('总耗时:', time.time() - start)
#
# asyncio.run(main()) # 当然这样的写法仍是同步
# async def main():
# for url in urls:
# c = get_request(url)
# task = asyncio.create_task(c)
# await task
# print('总耗时:', time.time() - start)
#
# asyncio.run(main())

与py3.6相比,都是先做一个任务列表,然后py3.6需要手动创建事件循环对象get_event_loop 并使用 run_until_complete 来达到异步执行,而在py3.7中,gather会并发的执行传入的可等待对象并在run的调用下完成异步执行。所以在新版py3.7中,我们无需手动创建和关闭事件循环了。
py3.7用 create_task 代替 ensure_future。

编码流程

定义协程函数

创建协程对象

封装任务对象

绑定回调函数

创建事件循环对象

将任务对象注册到事件循环对象中,并且开启事件循环。

按照流程完整的py3.6代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time # 定义协程函数
async def get_request(url):
print("正在请求: ", url)
# asyncio.sleep(1) # 阻塞1s没有成功
await asyncio.sleep(1) # 加上await关键字即可,这里的 await 表示等待
print('请求结束:', url)
return '我去回调啦' def parse(task): # task 表示与回调函数绑定的任务对象 / 给回调函数传入任务对象
print('i am task callback() !!!', task.result()) urls = [
'1.com', '2.com', '3.com'
]
start = time.time() # 任务列表:存储多个任务对象
tasks = []
for url in urls:
# 创建协程对象
c = get_request(url)
# 封装任务对象
task = asyncio.ensure_future(c)
# 绑定回调
task.add_done_callback(parse)
tasks.append(task)
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 将任务对象注册到事件循环对象中,并且开启事件循环
loop.run_until_complete(asyncio.wait(tasks)) # 直接列表会报错,需要修饰以下,这里的 wait 表示挂起 print('总耗时:', time.time() - start)

note:在特殊函数内部的实现语句中不可以出现不支持异步的模块对应的代码,否则就会终止多任务异步协程的异步效果。

在py3.7中,则为

定义协程函数

定义 asyncio 程序的主入口

创建协程对象
封装任务对象
绑定回调函数

asyncio.run(main())

按照流程完整的py3.7代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time # 定义协程函数
async def get_request(url):
print("正在请求: ", url)
await asyncio.sleep(1)
print('请求结束:', url)
return '我去回调啦' def parse(task): # task 表示与回调函数绑定的任务对象 / 给回调函数传入任务对象
print('i am task callback() !!!', task.result()) urls = [
'1.com', '2.com', '3.com'
]
start = time.time() # 定义 asyncio 程序的入口点
async def main():
tasks = []
for url in urls:
# 创建协程对象
c = get_request(url)
# 封装任务对象
task = asyncio.create_task(c)
# 绑定回调函数
task.add_done_callback(parse)
tasks.append(task)
await asyncio.gather(*tasks)
print('总耗时:', time.time() - start) asyncio.run(main())

异步的本质

按照注册顺序执行,遇到阻塞就会挂起,执行下一个任务。

当上一个任务的阻塞结束后,就会继续执行该任务。

真正的挂起是由 asyncio.wait(tasks) 做到的

图片来自: 谈谈Python协程技术的演进

图片来自 理解 Python asyncio

底层还没有理解,先把大佬的图粘过来慢慢研究

asyncio模块实现单线程-多任务的异步协程的相关教程结束。

《asyncio模块实现单线程-多任务的异步协程.doc》

下载本文的Word格式文档,以方便收藏与打印。