运行单个async异步任务的示例代码如下:
from __future__ import annotations
import asyncio
import datetime
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(1) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
if __name__ == '__main__':
# 使用 asyncio.run 运行单个协程
# 此时传入的 my_async_func() 是一个 Coroutine 对象
print(asyncio.run(my_async_func()))
运行后可以得到如下的打印结果:
{
'uuid': 'ee99da4393714a73baaddc4e3bb31f89',
'time': '2024-11-23T12:21:48.380767',
'coroutine_id': 5021212608
}
上面的示例代码通用 asyncio.run
库函数来运行一个异步任务函数,通用await
关键字获取异步函数运行后的结果。
from __future__ import annotations
import asyncio
import datetime
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(1) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# 在 my_async_main 中运行单个协程
ret = await my_async_func()
print(ret)
print('end')
if __name__ == '__main__':
# 使用 asyncio.run 运行一个协程入口函数 my_async_main()
# 在 my_async_main 函数中运行单个协程:await my_async_func()
print(asyncio.run(my_async_main()))
上面的代码接口可以概括为:
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func() -> Dict[str, str]:
await asyncio.sleep(2) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# 使用 asyncio.gather 调度多个协程运行
ret = await asyncio.gather(my_async_func(), my_async_func(), my_async_func())
print(json.dumps(ret, ensure_ascii = False, indent = 4))
print('end')
if __name__ == '__main__':
# 使用 asyncio.run 运行一个协程入口函数 my_async_main()
print(asyncio.run(my_async_main()))
运行效果为:
start
[
{
"uuid": "4630638d03594f209387a49e7e5926eb",
"time": "2024-11-23T13:45:43.743456",
"coroutine_id": 4860356992
},
{
"uuid": "04ba280c906a421d994f2871344fe224",
"time": "2024-11-23T13:45:43.743578",
"coroutine_id": 4860352960
},
{
"uuid": "a73788bfc4d2410091fa97dfa997b90d",
"time": "2024-11-23T13:45:43.743625",
"coroutine_id": 4860355072
}
]
end
None
值得注意的是,asyncio.gather 中也可以传入一个或多个 Task 对象:
async def my_async_main():
print('start')
# 使用 create_task 将 Coroutine 对象转换为 Task 对象
# 使用 asyncio.gather 调度多个Task对象运行
ret = await asyncio.gather(
asyncio.create_task(my_async_func()),
asyncio.create_task(my_async_func()),
asyncio.create_task(my_async_func())
)
print(json.dumps(ret, ensure_ascii = False, indent = 4))
print('end')
from __future__ import annotations
import asyncio
import datetime
import uuid
from asyncio import FIRST_COMPLETED
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
# 使用 create_task 将 Coroutine 对象转换为 Task 对象
# 将多个 Task 对象作为一个列表传给 asyncio.wait
# 使用 asyncio.wait 调度多个Task对象运行
# 根据设置的 return_when 参数的不同,反回的 done 和 pending 集合的对象个数也不同
task_list = [
asyncio.create_task(my_async_func(1)),
asyncio.create_task(my_async_func(2)),
asyncio.create_task(my_async_func(3))
]
done, pending = await asyncio.wait(task_list, return_when = FIRST_COMPLETED)
for d in done:
print(type(d))
print('DONE:', d)
for p in pending:
print(type(p))
print('PENDING:', p)
print(await p)
print('end')
if __name__ == '__main__':
# 使用 asyncio.run 运行一个协程入口函数 my_async_main()
print(asyncio.run(my_async_main()))
运行效果为:
start
<class '_asyncio.Task'>
DONE: <Task finished name='Task-2' coro=<my_async_func() done, defined at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:14> result={'coroutine_id': 5095254400, 'time': '2024-11-23T13:47:59.710775', 'uuid': '8f91bc728b5e...ac7938c5ba36e'}>
<class '_asyncio.Task'>
PENDING: <Task pending name='Task-4' coro=<my_async_func() running at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:15> wait_for=<Future pending cb=[Task.task_wakeup()]>>
{'uuid': '94e74a0052964923b079f7469fd7976e', 'time': '2024-11-23T13:48:01.710907', 'coroutine_id': 5095252480}
<class '_asyncio.Task'>
PENDING: <Task finished name='Task-3' coro=<my_async_func() done, defined at /Users/cyx/Documents/BoardStudy/Python3/AsyncIO/demo_test/multi_3.py:14> result={'coroutine_id': 5095250368, 'time': '2024-11-23T13:48:00.710883', 'uuid': '68cbf4c14538...a521f0b9513cd'}>
{'uuid': '68cbf4c145384796bf2a521f0b9513cd', 'time': '2024-11-23T13:48:00.710883', 'coroutine_id': 5095250368}
end
None
需要注意的是,不能忘 asyncio.wait 中传入 Coroutine 对象:
async def my_async_main():
print('start')
# 如果使用 Coroutine 对象列表传给 asyncio.wait,则会报错:
# TypeError: Passing coroutines is forbidden, use tasks explicitly.
task_list = [
my_async_func(1),
my_async_func(2),
my_async_func(3)
]
done, pending = await asyncio.wait(task_list, return_when = FIRST_COMPLETED)
for d in done:
print('DONE:', d)
for p in pending:
print('PENDING:', p)
print('end')
否则会有报错:
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
try:
# 使用 wait_for 调度单个 Coroutine 对象,设置超时时间
ret = await asyncio.wait_for(my_async_func(1), timeout = 10)
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
if __name__ == '__main__':
# 使用 asyncio.run 运行一个协程入口函数 my_async_main()
asyncio.run(my_async_main())
运行的效果为:
start
{
"uuid": "5f587726d866427783936937972267dc",
"time": "2024-11-23T13:51:39.985166",
"coroutine_id": 5370404800
}
end
asyncio.wait_for 中也可以传入 Task 对象:
async def my_async_main():
print('start')
try:
# 使用 wait_for 调度单个 Task 对象,设置超时时间
ret = await asyncio.wait_for(asyncio.create_task(my_async_func(1)), timeout = 10)
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
from __future__ import annotations
import asyncio
import datetime
import json
import uuid
from typing import Dict
async def my_async_func(delta: int) -> Dict[str, str]:
await asyncio.sleep(delta) # 默认使用 asyncio.sleep 作为耗时任务
ret = {
'uuid': uuid.uuid4().hex,
'time': datetime.datetime.now().isoformat(),
'coroutine_id': id(asyncio.current_task()),
}
return ret
async def my_async_main():
print('start')
try:
# 使用 asyncio.as_completed 调度 Coroutine 对象列表
# asyncio.as_completed 会生成一个迭代器,
# 迭代器每次返回一个 Coroutine 对象
# 每次需要 await Coroutine 对象获取其执行的结果
# asyncio.as_completed 允许你并发地执行多个异步任务,并在每个任务完成时立即处理它的结果
# 这与等待所有任务全部完成后再一次性处理结果(如使用 asyncio.gather)相比
# 可以更快地响应每个任务的完成
coroutines = [my_async_func(1), my_async_func(2), my_async_func(3)]
coroutines_iter = asyncio.as_completed(coroutines, timeout = 4)
for i in coroutines_iter:
print(type(i))
ret = await i # 迭代器返回的是 Coroutine 对象
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
if __name__ == '__main__':
# 使用 asyncio.run 运行一个协程入口函数 my_async_main()
asyncio.run(my_async_main())
运行效果为:
start
<class 'coroutine'>
{
"uuid": "d0fdd0d1caba44148a0a531d58811baa",
"time": "2024-11-23T13:53:32.706889",
"coroutine_id": 6047213952
}
<class 'coroutine'>
{
"uuid": "30389fbaa6ae40d7932d5f344d6e0389",
"time": "2024-11-23T13:53:33.708025",
"coroutine_id": 6047209920
}
<class 'coroutine'>
{
"uuid": "856fc8020d874f2a98fadcd26a252762",
"time": "2024-11-23T13:53:34.707807",
"coroutine_id": 6047212032
}
end
asyncio.as_completed 中同样也可以传入 Task 对象:
async def my_async_main():
print('start')
try:
# 使用 asyncio.as_completed 调度 Task 对象列表
# asyncio.as_completed 会生成一个迭代器,
# 迭代器每次返回一个 Task 对象
# 每次需要 await Task 对象获取其执行的结果
tasks = [
asyncio.create_task(my_async_func(1)),
asyncio.create_task(my_async_func(2)),
asyncio.create_task(my_async_func(3))]
coroutines_iter = asyncio.as_completed(tasks, timeout = 4)
for i in coroutines_iter:
print(type(i))
ret = await i # 迭代器返回的是 Task 对象
print(json.dumps(ret, indent = 4, ensure_ascii = False))
print('end')
except asyncio.TimeoutError as e:
print('Error:', e)
from __future__ import annotations
import asyncio
async def my_coroutine():
print('my_coroutine begin')
await asyncio.sleep(2)
print('my_coroutine end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
task = loop.create_task(my_coroutine())
await task
print('my_async_func end')
if __name__ == '__main__':
asyncio.run(my_async_func())
运行的效果为:
my_async_func begin
my_coroutine begin
my_coroutine end
my_async_func end
Future 对象作为一个占位符存在,用于存储异步操作的结果。
当一个异步操作被触发时,其结果可能还不可用,Future 对象则在这里发挥作用,它将来会被操作的结果填充
from __future__ import annotations
import asyncio
async def my_coroutine():
print('my_coroutine begin')
await asyncio.sleep(2)
print('my_coroutine end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.set_result('this is a future object')
ret = await future
print(ret)
print('my_async_func end')
if __name__ == '__main__':
asyncio.run(my_async_func())
运行效果:
my_async_func begin
this is a future object
my_async_func end
from __future__ import annotations
import asyncio
from asyncio import Future
def on_my_future_done(obj: Future):
print('my_future done begin')
print(obj.result())
print('my_future done end')
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.set_result('this is a future object')
future.add_done_callback(on_my_future_done) # 同步接口
ret = await future
print(ret)
print('my_async_func end ------')
if __name__ == '__main__':
asyncio.run(my_async_func())
执行效果为:
my_async_func begin
this is a future object
my_async_func end ------
my_future done begin
this is a future object
my_future done end
from __future__ import annotations
import asyncio
import time
from asyncio import Future
def on_my_future_done(obj: Future):
print('my_future done begin')
print(obj.result())
print('my_future done end')
async def another_async_func(obj: Future):
print('another_async_func begin')
obj.set_result('this is another future coroutine object')
print('another_async_func end')
return f'another async future coroutine finished at {time.time()}'
async def my_async_func():
print('my_async_func begin')
loop = asyncio.get_running_loop()
future = loop.create_future()
future.add_done_callback(on_my_future_done) # 同步接口
task = asyncio.create_task(another_async_func(future))
print(await task)
print(await future)
print('my_async_func end ------')
if __name__ == '__main__':
asyncio.run(my_async_func())
运行效果:
my_async_func begin
another_async_func begin
another_async_func end
my_future done begin
this is another future coroutine object
my_future done end
another async future coroutine finished at 1732341693.668807
this is another future coroutine object
my_async_func end ------
def sync_func(delta: int) -> str:
begin_time = time.time()
print('sync_func begin@', begin_time)
time.sleep(delta) # 模拟同步耗时程序
finished_time = time.time()
print('sync_func finished@', finished_time)
return f'sync_func begin@{begin_time} finished@{finished_time}'
async def async_func(delta: int) -> str:
begin_time = time.time()
print('async_func begin@', time.time())
await asyncio.sleep(delta) # 模拟异步耗时程序
finished_time = time.time()
print('async_func finished@', finished_time)
return f'async_func begin@{begin_time} finished@{finished_time}'
async def main():
# 先执行 to_thread, 再执行 async_func, 顺序执行
print('async main begin...')
sync_ret = await asyncio.to_thread(sync_func, 3) # 阻塞main 但是会让出 cpu 供其他事件占用
async_ret = await async_func(3)
print(sync_ret, async_ret)
print('async main end...')
if __name__ == '__main__':
asyncio.run(main())
运行效果:
async def main2():
# 并发执行 sync_func 和 async_func
print(f'async main begin @{time.time()}')
tasks = [
asyncio.create_task(asyncio.to_thread(sync_func, 3)),
asyncio.create_task(asyncio.to_thread(sync_func, 3)),
asyncio.create_task(async_func(3)),
asyncio.create_task(async_func(3))
]
_ = await asyncio.wait(tasks, return_when = asyncio.ALL_COMPLETED)
print(f'async main end @{time.time()}')
if __name__ == '__main__':
asyncio.run(main2())
运行效果:
在 main
函数中,按顺序执行了两个任务:一个同步函数 sync_func
和一个异步函数 async_func
。这里的关键是使用 await asyncio.to_thread(sync_func, 3)
来在不同的线程中运行同步函数,这样可以避免阻塞事件循环。由于使用了 await
,async_func
只有在 sync_func
完成后才开始执行。这意味着这两个函数是顺序执行的,即使它们被放在了不同的线程或协程中。
在 main2
函数中,使用了并发执行的方式来处理四个任务:两个 sync_func
和两个 async_func
。这里,所有任务都被封装成了 asyncio.Task
并被同时启动。使用 asyncio.wait
来等待所有任务完成。这种方式允许所有任务几乎同时开始,并且它们各自独立地运行,无需等待其他任务完成。这显著提高了程序的并发性能。
这两种方法展示了如何根据不同的需求选择合适的任务调度策略。
from __future__ import annotations
import asyncio
import random
import time
import uuid
from asyncio import ALL_COMPLETED, StreamReader, StreamWriter
async def handle_echo(reader: StreamReader, writer: StreamWriter):
addr = writer.get_extra_info('peername')
print(f"{addr} connected")
try:
while True:
data = await reader.read(1024) # 读取客户端发送的数据, 每次读取 100 字节
message = data.decode('utf-8') # 这里假设客户端发送的都是 utf-8 字符串
print(f'服务端收到请求:{message}')
writer.write(f'{message}@{time.time()}'.encode('utf-8')) # 将接收到的数据返回给客户端
await writer.drain() # 确保数据被发送
except Exception as e:
print(f"Error with {addr}: {e}")
finally:
print(f"{addr} disconnected")
writer.close() # 关闭连接
async def tcp_server_main():
"""
client_connected_cb: 客户端连接建立时的回调函数,通常接收两个参数:reader 和 writer。
host: 字符串或字符串列表,表示服务器监听的 IP 地址。可以是 IPv4 或 IPv6 地址。
port: 整数,表示服务器监听的端口号。
loop: 可选参数,指定事件循环。如果不指定,默认使用 asyncio.get_event_loop()。
limit: 可选参数,设置缓冲区的大小限制,默认为 256 KB。
family: 可选参数,设置地址族,如 socket.AF_INET。
flags: 可选参数,设置地址解析的配置。
sock: 可选参数,可以直接传入一个 socket 对象而不是让 asyncio.start_server 创建一个。
backlog: 可选参数,设置操作系统可以挂起的最大连接数。
ssl: 可选参数,如果提供了 SSL 上下文,服务器将处理 SSL 加密的连接。
reuse_address: 可选参数,告诉操作系统是否应该允许重用一个地址。
reuse_port: 可选参数,允许多个程序绑定到同一端口。
"""
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 18082) # 在本地端口启动服务器
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever() # 持续服务,直到被外部取消
async def tcp_client_main():
client_id = uuid.uuid4()
random_sleep = random.randint(1, 10)
print(f"Client {client_id} starting... random sleep: {random_sleep}s")
# 为每个客户端引入了一个随机的延迟,让服务器完全启动并准备好接受连接
# 错开启动时间:这个随机延迟错开了每个客户端尝试连接服务器的时间点,减少了所有客户端同时冲击服务器的可能性,从而降低了连接失败的风险
# 给服务器准备时间:这个延迟为服务器提供了额外的时间来完成启动过程并开始监听端口,从而在客户端尝试连接时,服务器已经准备好接受新的连接
# 减少资源竞争:在多客户端同时启动的情况下,服务器可能需要处理多个并发的连接请求,这可能会导致资源竞争和性能瓶颈。通过引入延迟,客户端的连接请求被分散到不同的时间点,从而减轻了服务器的负载压力
await asyncio.sleep(random_sleep)
reader, writer = await asyncio.open_connection(
'127.0.0.1', 18082) # 连接到服务器
print(f"Client {client_id} connected")
try:
while True:
writer.write(f'{client_id}, 间隔周期:{random_sleep}s, Hello World!'.encode('utf-8')) # 发送数据
await writer.drain() # 确保数据被发送
data = await reader.read(1024) # 读取服务器的响应
print(f'客户端收到回复: {data.decode()}')
await asyncio.sleep(random_sleep)
except Exception as e:
print(f"Error: {e}")
finally:
print('server exit')
# 这个方法用于关闭 StreamWriter 对象的底层传输(例如 TCP 连接)
# 调用 close() 后,传输会停止发送数据,并尽可能地完成剩余数据的发送
# 然而,这个方法本身是非阻塞的,即它会立即返回,而不等待连接完全关闭
writer.close() # 关闭连接
# 这个方法是异步的,它等待直到底层的传输真正关闭
# 这意味着在这个 await 表达式之后,你可以确信连接已经被完全关闭,资源被释放
# 如果仅仅调用 writer.close() 而不等待 wait_closed()
# 那么可能会在连接实际上还没有完全关闭的情况下继续执行后续代码
# 这可能导致资源泄露或其他一些难以预料的问题
# writer.close() 和 await writer.wait_closed() 一起使用是为了确保连接的正确和完全关闭
# 这种做法遵循了良好的异步编程实践,即确保所有资源都被适当地管理和释放
await writer.wait_closed()
async def my_tcp_demo(client_num: int = 1):
server_task = asyncio.create_task(tcp_server_main())
# await asyncio.sleep(1)
client_task_list = [asyncio.create_task(tcp_client_main()) for _ in range(client_num)]
done, pending = await asyncio.wait([server_task, *client_task_list],
return_when = ALL_COMPLETED)
for task in pending:
task.cancel()
for task in done:
try:
await task
except asyncio.CancelledError:
pass
if __name__ == '__main__':
asyncio.run(my_tcp_demo(3))
运行效果:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。