我有一个应用程序,它已经无限运行,异步事件循环永远运行,而且我还需要每10秒运行一个特定的函数。
def do_something():
pass
a = asyncio.get_event_loop()
a.run_forever()
我想每10秒调用一次函数do_something。如何在不用while循环替换异步事件循环的情况下实现这一点?
编辑:我可以用下面的代码来实现这一点
def do_something():
pass
while True:
time.sleep(10)
do_something()
但是我不想使用while循环在我的应用程序中无限运行,相反,我想使用异步run_forever()。那么如何使用asyncio每10秒调用一次相同的函数呢?有没有像这样的调度器不会阻碍我正在进行的工作?
发布于 2021-11-12 08:48:14
asyncio
没有提供内置的调度器,但是构建自己的调度器非常容易。只需将while
循环与asyncio.sleep
结合起来,就可以每隔几秒钟运行一次代码。
async def every(__seconds: float, func, *args, **kwargs):
while True:
func(*args, **kwargs)
await asyncio.sleep(__seconds)
a = asyncio.get_event_loop()
a.create_task(every(1, print, "Hello World"))
...
a.run_forever()
请注意,如果func
本身是一个协程或长时间运行子例程,则设计必须稍有不同。在前一种情况下使用await func(...)
,在后一种情况下使用asyncio
's thread capabilities。
发布于 2021-11-12 08:21:45
您可以使用以下命令来实现它
async def do_something():
while True:
await asyncio.wait(10)
...rest of code...
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(do_something())
loop.run_forever()
https://stackoverflow.com/questions/69939800
复制相似问题