在使用 asyncio
时,连接不断生成和使用数据的多个协程是常见需求。下面就是我在实际操作中遇到的问题以及解决方法可以供大家参考,有问题的也可以指正。
1、问题背景
使用 Python 3.4 的 asyncio 模块时,可能会遇到需要连接不断生成和使用数据的问题。例如,您可能想要创建一个程序来定期报告从 subprocess 接收到的数字的总和,同时不会阻止对 subprocess 的读取。您希望报告函数不会阻止读取子进程的迭代,并且报告者的迭代次数与在足够长的时间内 expect_exact() 的迭代次数大致相同。
2、解决方案
有两种主要的方法可以使用 asyncio 模块解决这个问题:条件和队列。
条件
asyncio.Condition() 提供了一种通知条件的方法。当您不必关心丢失一些事件时,可以使用它。
示例代码:
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
self.sum = 0
# For consumers
self.ready = asyncio.Condition()
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
self.sum += int(p.before)
with (yield from self.ready):
self.ready.notify_all() # Or just notify() depending on situation
p.terminate()
@asyncio.coroutine
def read(self):
with (yield from self.ready):
yield from self.ready.wait()
return self.sum
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source.read()
print("New sum is: {:d}".format(new_sum))
# Other potentially blocking stuff in here
# usage
async def main():
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.ensure_future(reporter(source))
await source.run()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
队列
asyncio.Queue() 允许您将数据放入队列(LIFO 或 FIFO)并让其他内容从中读取。当您绝对想响应每个事件时,即使您的使用者落在后面(在时间上),也可以使用它。请注意,如果您限制队列的大小,当您的消费者足够慢时,您的生产者最终会阻塞。
示例代码:
import asyncio
import pexpect
class Source:
def __init__(self):
self.flag = asyncio.Event()
# NOTE: self.sum removed!
# For consumers
self.output = asyncio.Queue()
def start(self):
self.flag.set()
def stop(self):
self.flag.clear()
@asyncio.coroutine
def run(self):
yield from self.flag.wait()
sum = 0
p = pexpect.spawn(
"python -c "
"'import random, time\n"
"while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
while self.flag.is_set():
yield from p.expect_exact('\n', async=True)
sum += int(p.before)
yield from self.output.put(sum)
p.terminate()
@asyncio.coroutine
def read(self):
return (yield from self.output.get())
@asyncio.coroutine
def reporter(source):
while True:
# Something like:
new_sum = yield from source.read()
print("New sum is: {:d}".format(new_sum))
# Other potentially blocking stuff here
# usage
async def main():
source = Source()
loop.call_later(1, source.start)
loop.call_later(11, source.stop)
# Again, not sure what goes here...
asyncio.ensure_future(reporter(source))
await source.run()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
关键点总结
asyncio.Queue
是在生产者和消费者间传递数据的首选方法。asyncio.Event
适合需要通知机制的场景。async for
)结合队列实现实时动态数据流。通过以上方法,可以实现流式、动态、高效的数据生成与消费。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。