前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >如何连接不断生成和使用数据的 asyncio.coroutines?

如何连接不断生成和使用数据的 asyncio.coroutines?

原创
作者头像
华科云商小徐
发布2024-12-10 09:19:11
发布2024-12-10 09:19:11
11400
代码可运行
举报
文章被收录于专栏:小徐学爬虫小徐学爬虫
运行总次数:0
代码可运行

在使用 asyncio 时,连接不断生成和使用数据的多个协程是常见需求。下面就是我在实际操作中遇到的问题以及解决方法可以供大家参考,有问题的也可以指正。

1、问题背景

使用 Python 3.4 的 asyncio 模块时,可能会遇到需要连接不断生成和使用数据的问题。例如,您可能想要创建一个程序来定期报告从 subprocess 接收到的数字的总和,同时不会阻止对 subprocess 的读取。您希望报告函数不会阻止读取子进程的迭代,并且报告者的迭代次数与在足够长的时间内 expect_exact() 的迭代次数大致相同。

2、解决方案

有两种主要的方法可以使用 asyncio 模块解决这个问题:条件和队列。

条件

asyncio.Condition() 提供了一种通知条件的方法。当您不必关心丢失一些事件时,可以使用它。

示例代码:

代码语言:javascript
代码运行次数:0
复制
import asyncio
import pexpect
​
class Source:
​
    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0
​
        # For consumers
        self.ready = asyncio.Condition()
​
    def start(self):
        self.flag.set()
​
    def stop(self):
        self.flag.clear()
​
    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()
​
        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
​
        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all() # Or just notify() depending on situation
​
        p.terminate()
​
    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum
​
​
@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here
​
# usage
async def main():
    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)
​
    # Again, not sure what goes here...
    asyncio.ensure_future(reporter(source))
​
    await source.run()
​
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

队列

asyncio.Queue() 允许您将数据放入队列(LIFO 或 FIFO)并让其他内容从中读取。当您绝对想响应每个事件时,即使您的使用者落在后面(在时间上),也可以使用它。请注意,如果您限制队列的大小,当您的消费者足够慢时,您的生产者最终会阻塞。

示例代码:

代码语言:javascript
代码运行次数:0
复制
import asyncio
import pexpect
​
class Source:
​
    def __init__(self):
        self.flag = asyncio.Event()
        # NOTE: self.sum removed!
​
        # For consumers
        self.output = asyncio.Queue()
​
    def start(self):
        self.flag.set()
​
    def stop(self):
        self.flag.clear()
​
    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()
​
        sum = 0
​
        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")
​
        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)
​
        p.terminate()
​
    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())
​
@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("New sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here
​
# usage
async def main():
    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)
​
    # Again, not sure what goes here...
    asyncio.ensure_future(reporter(source))
​
    await source.run()
​
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

关键点总结

  1. asyncio.Queue 是在生产者和消费者间传递数据的首选方法。
  2. asyncio.Event 适合需要通知机制的场景。
  3. 使用 生成器协程async for)结合队列实现实时动态数据流。
  4. 确保结束信号的设计正确,否则协程可能进入死循环。

通过以上方法,可以实现流式、动态、高效的数据生成与消费。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档