Python 异步编程

简介

Python 异步编程是一种高效的编程方式,它利用了单线程异步、协程、事件循环等机制,以最小的资源消耗实现多任务的同时避免了并发编程带来的复杂性。在异步编程中,任务可以在同一个线程内并发执行,而不需要线程之间的切换,从而大大提高了程序的性能。

Python 中常用的异步编程库包括 asyncioaiohttpaiofilesasyncpgaiomysql 等。asyncio 是 Python 内置的异步编程库,它提供了事件循环、协程、任务等基础组件,可以方便地实现异步编程。aiohttp 是一个基于 asyncio 的 HTTP 客户端和服务器库,可以方便地实现异步网络编程。aiofiles 支持异步读写文件操作,asyncpg 和 aiomysql 用于异步访问 PostgreSQL 和 MySQL 数据库。

场景

  1. Web 应用程序的并发处理:异步编程可以让 Web 服务器同时处理多个请求,在此过程中会涉及多个 I/O 操作,如读取数据库、处理文件上传、发送电子邮件等。
  2. 网络编程:异步编程可用于编写客户端和服务器端应用程序,例如聊天应用程序。
  3. 数据分析和科学计算:当处理大量数据、模型训练和推理时,使用异步编程可以大大提高计算效率。
  4. 消息传递和事件处理:异步编程可以用于消息队列、事件传递和订阅-发布模式等应用程序,可以同时处理多个消息或事件。

关键词解释

异步: 程序在执行过程中并不会阻塞等待结果,而是继续执行下一步操作,直到结果就绪后再回来处理。这种机制能够提升程序的效率和并发性,因为在等待某些操作的时候,程序可以同时执行其他任务。异步常被用于网络通信、文件IO等延迟较高的操作。

异步IO: 一种与语言无关的编程模型,它从单线程、单进程的角度出发,采用协作多任务的方式完成异步IO操作的处理。

async/await: 两个用于定义协程的新 Python 关键字。

asyncio: Python 标准库,为运行和管理协程提供了基础和 API。

API 解释

  1. async:async 是 Python 中的关键字,用于声明异步函数。异步函数返回一个协程对象,协程对象可以并发执行。
1
2
async def coroutine_func():
...
  1. await:await 关键字用于等待异步函数的执行结果。当遇到 await 关键字时,Python 会暂停该异步函数的执行,等待另一个协程的执行完成或者等待异步操作的完成。
1
result = await func()
  1. asynico:asyncio 是 Python 用于异步编程的框架,提供了一组 API 用于异步 I/O 操作的管理。
1
import asyncio
  1. gather:gather() 函数是 asyncio 中的一个 API,它可以并发执行多个协程,并返回全部的协程结果。协程的执行顺序和参数的顺序相同。
1
results = await asyncio.gather(*[coroutine_func1(), coroutine_func2()])
  1. run:run() 函数是 asyncio 模块中的一个快捷函数,用于运行异步函数。
1
2
3
async def main():
...
asyncio.run(main())
  1. async with:async with 语法可以用来管理异步上下文。async with 语法实现了 aenter() 和 aexit() 异步方法的对象。
1
2
async with lock:
...
  1. async for:async for 语法可以用来作为异步生成器的循环。async for 语法实现了 aiter() 和 anext() 异步方法的对象。
1
2
async for item in async_iterator():
...
  1. asyncio.StreamReader:异步操作的流读写器类。用于从异步传输流中读取数据。
1
reader = asyncio.StreamReader()
  1. asyncio.StreamWriter:异步操作的流读写器类。用于向异步传输流中写入数据。
1
writer = asyncio.StreamWriter()
  1. **asyncio.create_task()**:创建并返回一个 Task 对象,Task 对象可以用于管理协程并发执行的状态。
1
task = asyncio.create_task(coroutine_func())
  1. **asyncio.sleep()**:暂停当前协程的执行,等待一定时间。
1
await asyncio.sleep(1)
  1. **asyncio.wait_for()**:等待一个异步操作的结果,并限制其最大等待时间。
1
result = await asyncio.wait_for(coroutine_func(), timeout=1)
  1. **asyncio.wait()**:并发执行多个协程,等待所有协程完成。
1
done, pending = await asyncio.wait(coroutines, timeout=1, return_when=asyncio.FIRST_EXCEPTION)
  1. **asyncio.ensure_future()**:创建并返回一个 Future 对象,Future 对象可以用于管理异步操作的状态。
1
future = asyncio.ensure_future(coroutine_func())
  1. asyncio.Queue:异步操作的队列类,可以用于管理协程之间的通信。
1
queue = asyncio.Queue()
  1. loop:event loop 是异步编程中最主要的对象。event loop 是用于管理协程并发执行和异步 I/O 操作的。
1
loop = asyncio.get_event_loop()

不同版本的介绍

Python3.7 之前的版本

在 Python3.7 之前的版本中,需要使用以下步骤运行异步程序:

  1. 获取事件循环对象:通过 asyncio.get_event_loop() 函数获取当前线程的事件循环对象,如果线程中不存在事件循环对象,则创建一个新的事件循环对象并返回它。
  2. 创建任务对象:通过 asyncio.ensure_future() 或者 loop.create_task() 函数创建协程对象的任务对象,并将任务对象添加到事件循环中。
  3. 运行异步程序:调用事件循环对象的 loop.run_until_complete() 方法来运行异步程序,直到协程对象运行完成后返回结果。
  4. 关闭事件循环对象:在异步程序运行结束后,需要调用事件循环对象的 loop.close() 方法来关闭事件循环对象。

例如,要运行两个协程对象 coro1()coro2(),可以使用以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def coro1():
# do something...
return result1

async def coro2():
# do something...
return result2

loop = asyncio.get_event_loop()
task1 = asyncio.ensure_future(coro1())
task2 = asyncio.ensure_future(coro2())
results = loop.run_until_complete(asyncio.gather(task1, task2))
loop.close()

print(results)

这种方式虽然稍微有点繁琐,但是它仍然是有效的,并且在 Python3.7 之前是常用的异步编程方式。

这个版本不再多说,因为在 3.7 之后引入了更方便更强大的 API。

Python3.7 之后的版本

在 Python 3.7 引入的 asyncio.run() 函数极大地简化了 asyncio 的使用,简化了异步编程的过程,允许我们更轻松地编写异步代码。使用 asyncio.run() 函数,可以摆脱手动获取和关闭事件循环对象的过程,同时也提供了一些可选的参数来方便我们控制事件循环的运行。

例如,我们可以使用以下代码运行两个协程对象 coro1()coro2()

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def coro1():
# do something...
return result1

async def coro2():
# do something...
return result2

results = asyncio.run(asyncio.gather(coro1(), coro2()))

print(results)

这样就非常简单和直观了,无需手动获取和关闭事件循环对象。除此之外,asyncio.run() 函数还支持一些可选的参数,如 debugshieldtimeout,这些参数能够让我们更加灵活、方便地控制异步程序的运行。

总之,asyncio.run() 函数让 asyncio 的使用更加简单和直观,是值得推荐的异步编程方式。

强大的asyncio.gather

asyncio.gather() 函数的确提供了一种更加方便的方式来同时运行多个协程,特别是当需要在程序中同时执行多个并发任务的时候,使用 asyncio.gather() 函数会更加方便和高效。因为该函数可以让我们方便地指定一组协程,然后等待所有协程完成后再统一收集结果。

例如,我们可以使用以下代码运行多个协程:

1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def coro1():
# do something...
return result1

async def coro2():
# do something...
return result2

results = asyncio.run(asyncio.gather(coro1(), coro2()))
print(results)

asyncio.gather 函数的文档中,它会返回一个协程对象,该协程对象的结果是一个列表,列表中包含了每个调用的协程对象的结果,同时这个列表的顺序与协程对象的调用顺序一致。因此在示例代码中,传入 asyncio.gather 函数的第一个参数是 func1(),第二个参数是 func2(),因此在 results 的结果中,results[0] 对应着 func1() 的返回值。

需要注意的是,当多个协程对象并发执行的时候,获取返回值的时候需要保证执行顺序一致,否则可能会出现混淆的情况。可以通过调节协程对象执行顺序或者使用协程对象返回值的标识符进行区分,从而避免混淆。

总之,对于需要同时运行多个协程的情况,使用 asyncio.gather() 函数能够提供一种高效灵活的方案,能够方便地并发执行多个协程并收集结果。

结合 for 循环

使用 asyncio.gather() 函数和 for 循环结合起来可以方便地处理一系列相关的异步任务。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def worker(id):
print(f"Worker {id} start")
await asyncio.sleep(1)
print(f"Worker {id} done")
return f"Result {id}"

async def main():
tasks = []
for i in range(3):
task = asyncio.create_task(worker(i))
tasks.append(task)

results = await asyncio.gather(*tasks)
print("Results:", results)

asyncio.run(main())

这个例子中,我们定义了一个 worker() 协程,它是一个模拟异步任务的函数,会等待 1 秒钟然后返回一个结果。在 main() 函数中,我们使用 for 循环创建了 3 个协程任务对象,并将它们加入到任务列表 tasks 中。然后,我们使用 asyncio.gather() 函数将这些协程一起运行,并等待它们全部完成。最后,我们将所有协程的结果打印出来。

这个例子展示了如何使用 asyncio.gather() 和 for 循环一起处理一批相关的协程任务,简化了异步任务的调度和管理。

案例

最基础的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio

async def func1():
print('hello')
# 休眠三秒后输出world
await asyncio.sleep(3)
print('world')
return 'a'

async def func2():
print('函数 2')
return 'b'

async def main():
results = await asyncio.gather(func1(),func2())
print(results)

asyncio.run(main())

这段代码使用了asyncio库来进行异步编程。在程序运行时,创建了三个async函数:func1,func2和main。

func1函数中,首先输出了一个字符串”hello”,之后使用await asyncio.sleep(3)来实现3秒的延时,最后输出另一个字符串”world”。这个函数返回了一个字符串’a’。

func2函数中,只是简单地输出了一个字符串”函数2”,并返回了一个字符串’b’。

main函数中,使用了asyncio.gather()函数来并发执行func1和func2函数,并等待所有函数执行完毕。最后,输出了两个函数的返回值。

最后,使用asyncio.run()函数来启动整个异步程序。整个程序运行时,先输出”hello”,然后等待3秒,输出”world”,最后输出”函数2”和[‘a’, ‘b’]。

爬虫案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time
import aiohttp

async def async_http():
# 声明一个支持异步的上下文管理器
async with aiohttp.ClientSession() as session:
res = await session.get('http://httpbin.org/delay/2')
print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')

tasks = [async_http() for _ in range(5)]
start = time.time()
# Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替后的启动操作
asyncio.run(asyncio.wait(tasks))
print(f'aiohttp异步耗时:{time.time() - start}')

这段代码是利用aiohttp库实现了异步请求的效果,使用了Python的asyncio框架,其中async_http()是异步请求的函数,通过aiohttp库创建了一个异步的ClientSession对象,使用session.get()方法向指定的url发起异步请求,并返回响应结果res。最后,通过asyncio.run()来启动事件循环,等待所有的异步请求都结束,最后计算异步请求的耗时。

请求类型

除了get请求,aiohttp还支持其它请求类型,如POST、PUT、DELETE等,和requests使用方式类似。

1
2
3
4
5
6
session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

响应的几个方法

对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体JSON结果,实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
# @Author : 钢铁知识库
import aiohttp
import asyncio

async def main():
data = {'name': '知识库', 'age': 23}
async with aiohttp.ClientSession() as session:
async with session.post('https://www.httpbin.org/post', data=data) as response:
print('status:', response.status) # 状态码
print('headers:', response.headers) # 响应头
print('body:', await response.text()) # 响应体
print('bytes:', await response.read()) # 响应体二进制内容
print('json:', await response.json()) # 响应体json数据

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

超时设置

我们可以借助ClientTimeout对象设置超时,例如要设置1秒的超时时间,可以这么实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#!/usr/bin/env python
# @Author : 钢铁知识库
import aiohttp
import asyncio

async def main():
# 设置 1 秒的超时
timeout = aiohttp.ClientTimeout(total=1)
data = {'name': '知识库', 'age': 23}
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
print('status:', response.status) # 状态码

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
'''
Traceback (most recent call last):
####中间省略####
raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''

这里设置了超时1秒请求延时2秒,发现抛出异常asyncio.TimeoutError,如果正常则响应200。

并发限制

aiohttp可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助asyncio 的Semaphore来控制并发量,实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 钢铁知识库
import asyncio
from datetime import datetime
import aiohttp

# 声明大并发量
semaphore = asyncio.Semaphore(2)

async def get_api():
async with semaphore:
print(f'scrapting...{datetime.now()}')
async with session.get('https://www.baidu.com') as response:
await asyncio.sleep(2)
# print(f'当前时间:{datetime.now()}, {response.status}')

async def main():
global session
session = aiohttp.ClientSession()
tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
await asyncio.gather(*tasks)
await session.close()

if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())

在main方法里,我们声明了1000个task,如果没有通过Semaphore进行并发限制,那这1000放到gather方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的task数量就会被控制,这样就能给aiohttp限制速度了。

实战

这部分我会分为需求和方案两个模块,假设有异步编程需求,该给出怎么样的解决方案。

需求1

需求描述

当 Web 应用程序需要处理多个请求时,通常会发生大量的 I/O 操作,例如读取数据库或文件、发送邮件等操作。这些操作通常会占用大量的时间,如果在每个请求中都同步执行它们,将会严重影响响应时间和服务器的性能,如何解决呢?

解决方案

asyncio.start_server 是 asyncio 标准库提供的一个函数,是用于创建一个异步 TCP 服务器的工具方法。该函数的原型如下:

1
2
asyncio.start_server(client_connected_cb, host=None, port=None, *,
loop=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, server_hostname=None)

参数说明如下:

  • client_connected_cb: 当有新的客户端连接时,会调用此回调函数。
  • host: 监听的 IP 地址,默认为 None,表示监听所有可用的地址。
  • port: 监听的端口号。
  • loop: 事件循环,用于异步处理客户端请求。
  • family: 指定要使用的地址族,默认为 socket.AF_UNSPEC,表示同时兼容 IPv4 和 IPv6。
  • flags: 指定 getaddrinfo() 函数的标志,默认为 socket.AI_PASSIVE,表示从所有网络接口获取信息。
  • sock: 指定要使用的套接字对象,如果指定了该参数,则不能同时指定 host 和 port。
  • backlog: 最大等待连接的数量。
  • ssl, server_hostname: 如果要使用 SSL 安全传输协议,则可以指定 SSLContext 对象和服务器的主机名。

使用 asyncio.start_server,我们可以方便地创建一个 TCP 服务器,用于处理客户端的请求。在上面的示例代码中,我们使用 start_server 创建了一个异步 Web 服务器,监听本地的 8080 端口,当有客户端连接时,会调用 handle_request 函数进行异步处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio

async def handle_request(reader, writer):
"""异步处理请求"""
data = await reader.read(1024)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")

# 模拟较长时间的 I/O 操作
await asyncio.sleep(1)

response = f"Hello, {message}!"
writer.write(response.encode())
await writer.drain()
writer.close()

async def main():
"""启动服务器"""
server = await asyncio.start_server(
handle_request, 'localhost', 8080)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()

asyncio.run(main())

该示例代码实现了一个简单的异步 Web 服务器。在 handle_request 函数中,我们先通过 await reader.read(1024) 从客户端读取数据,然后使用 await asyncio.sleep(1) 模拟了一个长时间的 I/O 操作(1 秒钟),最后通过 writer 写入响应数据,并运行 await writer.drain() 将数据异步地发送到客户端。在 main 函数中,我们通过 await asyncio.start_server 启动了服务器,并调用 await server.serve_forever() 不断监听客户端的连接,从而实现了异步处理多个请求的功能。

当我们向该服务器发送多个请求时,可以看到服务器会同时接收多个请求并异步地处理它们,而不会阻塞其他请求:

1
2
3
4
5
6
7
> telnet localhost 8080
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello, World!
Hello, World!Hello, World!
Connection closed by foreign host.

总之,使用 asyncio 实现异步编程可以轻松地实现高效的并发处理,提高服务器的性能和响应速度。

需求 2

需求描述

假设公司的数据中心需要下载非常大的数据集,但下载速度受到带宽和网络等因素的限制,下载一个数据集可能需要数小时甚至数天的时间。而数据中心需要同时下载多个数据集,因此需要一个可以同时下载多个文件的多任务下载工具。

解决方案

使用异步 IO 技术可以达到高效的下载效果,因为在下载一个数据集的过程中,可以同时启动并发的下载任务,遇到 IO 阻塞时,自动切换到其他任务,从而实现了不阻塞其他任务的目的。

实现思路如下:

  1. 创建一个任务队列,用于存放需要下载的文件列表。
  2. 创建一个下载任务的协程,从队列中获取要下载的文件,使用 aiohttp 库中的 aiohttp.ClientSession 类发送异步的 GET 请求下载文件,并保存到本地的文件中。
  3. 创建多个下载任务的协程,使用 asyncio.gather 函数并发执行多个下载任务。
  4. 为了避免下载时 IO 阻塞,使用异步 IO 技术,将每个下载操作封装为一个协程,使用异步 IO 库 asyncio 进行并发控制。

下面是该场景的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import asyncio
import aiohttp

# 下载任务协程
async def download_file(url: str, file_name: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
with open(file_name, 'wb') as f:
while True:
chunk = await response.content.read(1024)
if not chunk:
break
f.write(chunk)

# 多任务下载函数
async def multi_download(urls: list, dir_path: str):
tasks = []
for url in urls:
file_name = url.split('/')[-1]
file_path = dir_path + file_name
task = asyncio.create_task(download_file(url, file_path))
tasks.append(task)
await asyncio.gather(*tasks)

# 测试代码
urls = [
'https://www.example.com/file1.zip',
'https://www.example.com/file2.zip',
'https://www.example.com/file3.zip',
'https://www.example.com/file4.zip',
]

dir_path = '/tmp/downloads/'

asyncio.run(multi_download(urls, dir_path))

在上面的示例中,我们使用 async/await 语法定义了两个协程函数:download_file 和 multi_download,其中 download_file 用于下载单个文件,multi_download 用于同时下载多个文件。在 multi_download 中我们使用 asyncio.gather 函数并发执行多个下载任务。在下载任务中,我们使用 aiohttp 库来发送 HTTP 请求,并使用异步 IO 技术来避免 IO 阻塞。

需求 3

需求描述

当前需要开发一款爬虫系统,这个系统需要能够实时获取多个网站的数据,并且需要进行数据清洗和处理,以便进行数据挖掘和分析。由于数据量比较大,因此系统需要具备高并发和高性能的特点,能够快速响应市场变化,并且能够及时地进行数据清洗和处理。

可以使用异步编程框架,如Asyncio和Tornado等,来实现该爬虫需求。通过异步框架,需要编写高效的异步爬虫程序,同时还可以利用该框架提供的协程和异步IO等功能,提升系统性能,并更好地应对高并发和高性能的要求。

解决方案

1.选择异步编程框架:选择一个适合您项目的异步编程框架,这里我们以Asyncio作为框架示例。Asyncio是一个基于协程的异步框架,可以很好地应对高并发的需求。

2.使用异步HTTP库:从多个网站获取数据需要使用异步HTTP库,而对于Asyncio来说,aiohttp 库是一个不错的选择,它提供了异步 HTTP 客户端和服务器的支持。

3.数据清洗和处理:获取到数据后进行数据清洗和处理,以便进行下一步的分析。使用Python数据处理库如Pandas、NumPy、SciPy来处理数据。

4.部署到生产环境:将开发的爬虫系统部署到生产环境中,可以使用Docker或Kubernetes等容器化技术进行部署。

5.性能优化:优化系统性能和响应速度,可以使用多线程、多进程、Redis等技术来进行优化。

6.监控和日志:监控和记录系统的运行状况和日志,可以使用钉钉、ELK等工具来进行监控和日志管理。