首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何为Python异步创建自定义事件

如何为Python异步创建自定义事件
EN

Stack Overflow用户
提问于 2017-12-15 23:29:57
回答 1查看 1.6K关注 0票数 2

Mesos调度器Marathon有一个异步HTTP API。例如,当用户通过向/v2/apps发布JSON来部署应用程序时,会返回一个部署id。然后,可以使用该id在/v2/deployments中轮询部署状态,或者通过订阅/v2/events并查找deployment_success事件来进行轮询。

我想创建一个带有协程的异步Python客户端。例如,一旦deployment_success事件到达,client.deploy_app(...)就应该返回,而不是阻塞。

我如何使用asyncio实现这些方法?如何创建事件监听器?它感觉事件循环就是为此而创建的,但我不知道如何注册事件。

EN

回答 1

Stack Overflow用户

发布于 2017-12-16 17:37:50

可以使用aiohttp模块创建/v2/apps所需的异步post http请求:

代码语言:javascript
运行
复制
import asyncio
import aiohttp


async def post(url, json):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=json) as resp:
            return await resp.json()


async def main():
    res = await post('http://httpbin.org/post', {'test': 'object'})
    print(res['json'])  # {'test': 'object'}


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果您想使用/v2/events来跟踪部署成功,您应该请求流(参见doc接口)。这可以通过It的asynchronous iterationaiohttp中实现:您只需以异步方式逐行读取内容,等待所需的事件,例如:

代码语言:javascript
运行
复制
import asyncio
import aiohttp


async def stream(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async for line in resp.content:
                yield line


async def main():
    async for line in stream('http://httpbin.org/stream/10'):
        print(line)  # check if line contains event you need


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

如果你想使用/v2/deployments,你应该使用asyncio.sleep定期请求,等待一些延迟。在这种情况下,你的函数不会阻塞:

代码语言:javascript
运行
复制
import asyncio
import aiohttp


async def get(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()


async def main():
    while True:       
        # Make request to see if deplayment finished:
        res = await get('http://httpbin.org/get')
        print(res['origin'])  # break if ok here
        # Async wait before next try:
        await asyncio.sleep(3)


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47835371

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档