Python asyncio异步编程的终极指南

聊一下有时候我们使用了asyncio,任务也是I/O密集型任务,但是感觉也没快呢?这是因为可能代码中并没有正确使用。

使用aysncio的时候,有什么条件或要求?

在使用 Python 的 asyncio 框架时,对其他模块或函数是要要求的,这是因为 asyncio 是一种基于 事件循环 和 异步编程模型 的工具,其工作方式与传统的同步编程有所不同。

  1. 1. 使用的模块必须支持异步操作
    在工作中我们要选择支持异步操作的模块来跟asyncio来配合使用,支持异步操作的接口,通常以async函数显示出现,异步操作的例子包括通络通讯,文件读写,数据库操作,HTTP请求等。

例如:

  • • 数据库支持异步操作的模块有aiomysqlasyncpg
  • • HTTP请求支持异步操作的是aiohttp, 这里要说明下我们平时用的最多的requests模块并不支持异步,它是阻塞的;
  • • 文件I/O异步模块是aiofiles, Python库的open()不支持异步。

到这虽然我们理解了要去选择异步模块,到这我们还要问一个问题,为什么要求必须是异步模块呢?

为什么需要异步模块?

  • • asyncio 是基于单线程事件循环的,如果你在异步任务中调用了一个阻塞的同步函数,比如标准的文件 I/O 操作或网络请求,那么事件循环会被阻塞,其他协程无法执行,导致整个程序的并发性能大幅下降。
  • • 只有支持异步的模块,才能在执行 I/O 操作时将控制权交回给事件循环,让其他协程继续运行。
    代码例子:
# 使用同步模块 requests,阻塞主线程
import requests

async def fetch_data_sync():
    response = requests.get('https://example.com')  # 这里会阻塞事件循环
    return response.text

# 使用异步模块aiohttp,非阻塞
import aiohttp

async def fetch_data_async():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://example.com') as response:
            return await response.text()

从上面的代码例子我们可以看出使用requests的时候会阻塞,比如这里请求需要花费10秒,那代码就会在这里停10秒,而不会继续,这就到这了代码运行并不快,因为这里并没有释放将控制权交给事件循环,让其他协程继续运行。

  1. 2. 不要在协程中使用阻塞操作
    因为阻塞操作会占用事件循环的执行时间,导致其他协程无法运行,从而破坏 asyncio 的高并发性能,我们上面的例子已经说明了, 阻塞操作会直接暂停整个事件循环,而不是只暂停当前协程,我们再来个简单的代码例子:
# 错误的例子:使用阻塞操作
import time

async def task():
    time.sleep(5)  # 阻塞操作
    print("Task completed")

# 正确的例子:使用异步操作
import asyncio

async def task():
    await asyncio.sleep(5)  # 非阻塞操作
    print("Task completed")

如何使用多核?

在最后我们说一下如何使用asyncio,我主机核数比较多,我该怎么利用起来,这里我们说2个方式:

  1. 1. 多进程 + 协程
    通过 multiprocessing 模块创建多个进程,每个进程运行自己的事件循环,从而利用多个核心,我们直接给代码示例:
import asyncio
from multiprocessing import Process

async def async_task(name):
    print(f"{name} 开始")
    await asyncio.sleep(1)  # 模拟 I/O 操作
    print(f"{name} 完成")

def run_event_loop():
    asyncio.run(async_task("任务"))

if __name__ == "__main__":
    processes = []
    for i in range(4):  # 启动 4 个进程
        p = Process(target=run_event_loop)
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

因为每个进程都有独立的 Python 解释器和 GIL,因此可以并行运行在多个核心上。

2. 线程池/进程池 + 协程
可以通过 concurrent.futures 模块的线程池或进程池,将部分任务分发到其他线程或进程中运行,同样我们直接上代码:

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_task, 10)
        print(f"计算结果: {result}")

def cpu_bound_task(n):
    # 模拟 CPU 密集型任务
    total = 0
    for i in range(10**8):
        total += i * n
    return total

asyncio.run(main())

协程负责 I/O 密集型任务,CPU 密集型任务由进程池并行执行,从而实现多核利用。

来源:python运维技术

THE END