Python asyncio异步编程的终极指南
聊一下有时候我们使用了asyncio,任务也是I/O密集型任务,但是感觉也没快呢?这是因为可能代码中并没有正确使用。
使用aysncio的时候,有什么条件或要求?
在使用 Python 的 asyncio
框架时,对其他模块或函数是要要求的,这是因为 asyncio
是一种基于 事件循环 和 异步编程模型 的工具,其工作方式与传统的同步编程有所不同。
-
1. 使用的模块必须支持异步操作
在工作中我们要选择支持异步操作的模块来跟asyncio来配合使用,支持异步操作的接口,通常以async
函数显示出现,异步操作的例子包括通络通讯,文件读写,数据库操作,HTTP请求等。
例如:
-
• 数据库支持异步操作的模块有 aiomysql
和asyncpg
-
• HTTP请求支持异步操作的是 aiohttp
, 这里要说明下我们平时用的最多的requests模块并不支持异步,它是阻塞的; -
• 文件I/O异步模块是 aiofiles
, Python库的open()不支持异步。
到这虽然我们理解了要去选择异步模块,到这我们还要问一个问题,为什么要求必须是异步模块呢?
为什么需要异步模块?
-
• asyncio
是基于单线程事件循环的,如果你在异步任务中调用了一个阻塞的同步函数,比如标准的文件 I/O 操作或网络请求,那么事件循环会被阻塞,其他协程无法执行,导致整个程序的并发性能大幅下降。 -
• 只有支持异步的模块,才能在执行 I/O 操作时将控制权交回给事件循环,让其他协程继续运行。
代码例子:
# 使用同步模块 requests,阻塞主线程
import requests
async def fetch_data_sync():
response = requests.get('https://example.com') # 这里会阻塞事件循环
return response.text
# 使用异步模块aiohttp,非阻塞
import aiohttp
async def fetch_data_async():
async with aiohttp.ClientSession() as session:
async with session.get('https://example.com') as response:
return await response.text()
从上面的代码例子我们可以看出使用requests的时候会阻塞,比如这里请求需要花费10秒,那代码就会在这里停10秒,而不会继续,这就到这了代码运行并不快,因为这里并没有释放将控制权交给事件循环,让其他协程继续运行。
-
2. 不要在协程中使用阻塞操作
因为阻塞操作会占用事件循环的执行时间,导致其他协程无法运行,从而破坏asyncio
的高并发性能,我们上面的例子已经说明了, 阻塞操作会直接暂停整个事件循环,而不是只暂停当前协程,我们再来个简单的代码例子:
# 错误的例子:使用阻塞操作
import time
async def task():
time.sleep(5) # 阻塞操作
print("Task completed")
# 正确的例子:使用异步操作
import asyncio
async def task():
await asyncio.sleep(5) # 非阻塞操作
print("Task completed")
如何使用多核?
在最后我们说一下如何使用asyncio,我主机核数比较多,我该怎么利用起来,这里我们说2个方式:
-
1. 多进程 + 协程
通过multiprocessing
模块创建多个进程,每个进程运行自己的事件循环,从而利用多个核心,我们直接给代码示例:
import asyncio
from multiprocessing import Process
async def async_task(name):
print(f"{name} 开始")
await asyncio.sleep(1) # 模拟 I/O 操作
print(f"{name} 完成")
def run_event_loop():
asyncio.run(async_task("任务"))
if __name__ == "__main__":
processes = []
for i in range(4): # 启动 4 个进程
p = Process(target=run_event_loop)
processes.append(p)
p.start()
for p in processes:
p.join()
因为每个进程都有独立的 Python 解释器和 GIL,因此可以并行运行在多个核心上。
2. 线程池/进程池 + 协程
可以通过 concurrent.futures
模块的线程池或进程池,将部分任务分发到其他线程或进程中运行,同样我们直接上代码:
import asyncio
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_task, 10)
print(f"计算结果: {result}")
def cpu_bound_task(n):
# 模拟 CPU 密集型任务
total = 0
for i in range(10**8):
total += i * n
return total
asyncio.run(main())
协程负责 I/O 密集型任务,CPU 密集型任务由进程池并行执行,从而实现多核利用。
来源:python运维技术
THE END