开发者

Python中asyncio库实现异步编程的示例

开发者 https://www.devze.com 2025-05-01 09:23 出处:网络 作者: 彬彬侠
目录1. asyncio 库的作用2. 核心概念3. 基本用法3.1 定义和运行协程3.2 事件循环4. 常用功能4.1 并发运行多个协程4.2 超时控制4.3 异步迭代4.4 异步上下文管理器
目录
  • 1. asyncio 库的作用
  • 2. 核心概念
  • 3. 基本用法
    • 3.1 定义和运行协程
    • 3.2 事件循环
  • 4. 常用功能
    • 4.1 并发运行多个协程
    • 4.2 超时控制
    • 4.3 异步迭代
    • 4.4 异步上下文管理器
  • 5. 与外部库集成
    • 6. 高级功能
      • 6.1 任务取消
      • 6.2 同步与异步混合
      • 6.3 自定义事件循环
    • 7. 实际应用场景
      • 8. 注意事项
        • 9. 综合示例

          python 的 asyncio 库是标准库的一部分,引入于 Python 3.4(通过 PEP 3156),用于实现异步编程。它提供了一种基于事件循环(Event Loop)的并发机制,适合处理 I/O 密集型任务(如网络请求、文件操作、数据库查询等)。asyncio 通过协程(coroutines)、任务(tasks)和事件循环,允许程序在等待 I/O 操作时执行其他任务,从而提高效率。

          以下是对 asyncio 库的详细说明和常见用法。

          1. asyncio 库的作用

          • 异步编程:支持非阻塞的并发执行,适合 I/O 密集型任务。
          • 事件循环:管理协程和任务的调度,协调异步操作。
          • 高性能:相比线程或进程,协程的开销更低,适合高并python发场景。
          • 生态支持:与许多库(如 aiohttpaiomysql)集成,支持异步网络请求、数据库操作等。

          2. 核心概念

          • 协程(Coroutine):使用 async def 定义的函数,代表可暂停和恢复的异步操作。
          • 事件循环(Event Loop):asyncio 的核心,负责调度协程和处理 I/O 事件。
          • 任务(Task):协程的封装,用于在事件循环中并发运行。
          • Future:表示尚未完成的操作,协程通常通过 Future 管理结果。
          • Awaitable:可以被 await 的对象,包括协程、任务和 Future。

          3. 基本用法

          以下是 asyncio 的基本用法,展示如何定义和运行协程。

          3.1 定义和运行协程

          import asyncio
          
          # 定义协程
          async def say_hello():
              print("Hello")
              await asyncio.sleep(1)  # 模拟异步 I/O 操作
              print("World")
          
          # 运行协程
          async def main():
              await asyncio.gather(say_hello(), say_hello())  # 并发运行多个协程
          
          # 执行事件循环
          if __name__ == "__main__":
              asyncio.run(main())
          

          输出:

          Hello

          Hello

          World

          World

          说明:

          • async def 定义协程,await 表示暂停点,允许事件循环调度其他任务。
          • asyncio.sleep(1) 模拟异步 I/O(如网络请求),不会阻塞事件循环。
          • asyncio.gather() 并发运行多个协程。
          • asyncio.run() 是运行异步程序的推荐入口,自动创建和关闭事件循环。

          3.2 事件循环

          事件循环是 asyncio 的核心,负责调度协程和处理回调。以下是手动操作事件循环的示例:

          import asyncio
          
          async def task():
              print("Task started")
              await asyncio.sleep(1)
              print("Task finished")
          
          loop = asyncio.get_event_loop()  # 获取事件循环
          try:
              loop.run_until_complete(task())  # 运行协程直到完成
          finally:
              loop.close()  # 关闭事件循环
          

          说明:

          • asyncio.get_event_loop() 获取默认事件循环。
          • loop.run_until_complete() 运行单个协程或 Future。
          • 通常推荐使用 asyncio.run(),因为它更安全且自动管理循环的生命周期。

          4. 常用功能

          asyncio 提供了丰富的 API,以下是常见功能和用法。

          4.1 并发运行多个协程

          使用 asyncio.gather() 或 asyncio.create_task() 实现并发:

          import asyncio
          
          async def task1():
              print("Task 1 started")
              await asyncio.sleep(2)
              print("Task 1 finished")
          
          async def task2():
              print("Task 2 started")
              await asyncio.sleep(1)
              print("Task 2 finished")
          
          async def main():
              # 使用 gather 并发运行
              await asyncio.gather(task1(), task2())
              # 或者使用 create_task
              t1 = asyncio.create_task(task1())
              t2 = asyncio.create_task(task2())
              await t1
              await t2
          
          asyncio.run(main())
          

          输出:

          Task 1 started

          Task 2 started

          Task 2 finished

          Task 1 finished

          说明:

          • asyncio.gather() 等待所有协程完成,返回结果列表。
          • asyncio.create_task() 将协程包装为任务,立即调度运行。

          4.2 超时控制

          使用 asyncio.wait_for() 为协程设置超时:

          import asyncio
          
          async def long_task():
              await asyncio.sleep(5)
              print("Task completed")
          
          async def main():
              try:
                  await asyncio.wait_for(long_task(), timeout=2)  # 2 秒超时
              except asyncio.TimeoutError:
                  print("Task timed out")
          
          asyncio.run(main())
          

          输出:

          Task timed out

          4.3 异步迭代

          asyncio 支持异步迭代器和异步上下文管理器:

          import asyncio
          
          async def async_generator():
              for i in range(3):
                  await asyncio.sleep(1)
                  yield i
          
          async def main():
              async for value in async_generator():
                  print(f"Received: {value}")
          
          asyncio.run(main())
          

          输出:

          Received: 0

          Received: 1

          Received: 2

          4.4 异步上下文管理器

          使用 async with 管理资源:

          import asyncio
          from contextlib import asynccontextmanager
          
          @asynccontextmanager
          async def resource():
              print("Resource acquired")
              try:
                  yield
              finally:
                  print("Resource released")
          
          async def main():
              async with resource():
                  print("Using resource")
                  await asyncio.sleep(1)
          
          asyncio.run(main())
          

          输出:

          Resource acquired

          Using resource

          Resource released

          5. 与外部库集成

          asyncio 常与异步库结合使用,例如:

          aiohttp:异步 HTTP 客户端/服务器。

          import aiohttp
          import asyncio
          
          async def fetch_url(url):
              async with aiohttp.ClientSession() as session:
                  async with session.get(url) as response:
                      return await response.text()
          
          async def main():
              html = await fetch_url("https://example.com")
              print(html[:100])
          
          asyncio.run(main())
          
          • aiomysql:异步 MySQL 数据库操作。

          • aiofiles:异步文件读写。

          6. 高级功能

          6.1 任务取消

          可以取消正在运行的任务:

          import asyncio
          
          async def long_task():
              try:
                  print("Task started")
                  await asyncio.sleep(10)
                  print("Task finished")
              except asyncio.CancelledError:
                  print("Task was cancelled")
                  raise
          
          async def main():
              task = asyncio.create_task(long_task())
              await asyncio.sleep(1)
              tas编程客栈k.cancel()  # 取消任务
              try:
                  await task
              except asyncio.CancelledError:
                  print("Main caught cancellation")
          
          asyncio.run(main())
          

          输出:

          Task started

          Task was cancelled

          Main caught cancellation

          6.2 同步与异步混合

          使用 loop.run_in_executor() 将同步代码(阻塞操作)运行在线程池或进程池中:

          import asyncio
          import time
          
          def blocking_task():
              time.sleep(1)  # 模拟阻塞操作
              return "Done"
          
          async def main():
              loop = asyncio.get_running_loop()
              result = apythonwait loop.run_in_elLhlcmxecutor(None, blocking_task)  # 在默认线程池运行
              print(result)
          
          asyncio.run(main())
          

          输出:

          Done

          6.3 自定义事件循环

          可以自定义事件循环策略,例如使用 uvloop(高性能事件循环):

          import asyncio
          import uvloop
          
          asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
          # 后续代码使用 uvloop 作为事件循环
          

          安装 uvloop:

          pip install uvloop
          

          7. 实际应用场景

          • 网络编程:异步爬虫、Web 服务器(如 FastAPI、Sanic)。
          • 数据库操作:异步查询数据库(如 aiomysql、asyncpg)。
          • 实时应用:聊天 服务器、WebSocket 应用。
          • 并发任务:批量处理网络请求或文件操作。
          • 微服务:异步处理 HTTP 请求或消息队列。

          8. 注意事项

          • 单线程模型:asyncio 在单线程中运行,依赖事件循环调度,无法利用多核 CPU(需要结合 multiprocessing)。
          • 避免阻塞代码:在异步代码中调用同步阻塞函数(如 time.sleeprequests.get)会阻塞事件循环,需使用异步替代或 run_in_executor
          • 协程必须 await:协程对象必须通过 await 或任务调度运行,否则不会执行。
          • 线程安全:事件循环不是线程安全的,避免在多线程中共享同一循环。
          • 调试困难:异步代码可能因调度顺序导致复杂 bug,建议使用日志或调试工具(如 asyncio.run(debug=True))。
          • Python 版本:
            • Python 3.7+ 引入 asyncio.run() 和上下文管理器改进。
            • Python 3.8+ 优化了调试php支持。
            • Python 3.11+ 提供任务组(asyncio.TaskGroup)等新功能。

          9. 综合示例

          以下是一个综合示例,展示异步爬虫的实现:

          import asyncio
          import aiohttp
          
          async def fetch_url(session, url):
              async with session.get(url) as response:
                  return await response.text()
          
          async def main():
              urls = [
                  "https://example.com",
                  "https://python.org",
                  "https://github.com"
              ]
              async with aiohttp.ClientSession() as session:
                  tasks = [fetch_url(session, url) for url in urls]
                  results = await asyncio.gather(*tasks, return_exceptions=True)
                  for url, result in zip(urls, results):
                      if isinstance(result, Exception):
                          print(f"Failed to fetch {url}: {result}")
                      else:
                          print(f"Fetched {url}: {len(result)} bytes")
          
          if __name__ == "__main__":
              asyncio.run(main())
          

          输出示例:

          Fetched https://example.com: 1256 bytes

          Fetched https://python.org: 50342 bytes

          Fetched https://github.com: 123456 bytes

          说明:

          • 使用 aiohttp.ClientSession 管理 HTTP 会话。
          • asyncio.gather 并发请求多个 URL。
          • return_exceptions=True 防止单个失败影响其他任务。

          到此这篇关于Python中asyncio库实现异步编程的示例的文章就介绍到这了,更多相关Python asyncio异步编程内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

          0

          精彩评论

          暂无评论...
          验证码 换一张
          取 消

          关注公众号