Mesos调度器Marathon有一个异步HTTP API。例如,当用户通过向/v2/apps
发布JSON来部署应用程序时,会返回一个部署id。然后,可以使用该id在/v2/deployments
中轮询部署状态,或者通过订阅/v2/events
并查找deployment_success
事件来进行轮询。
我想创建一个带有协程的异步Python客户端。例如,一旦deployment_success
事件到达,client.deploy_app(...)
就应该返回,而不是阻塞。
我如何使用asyncio实现这些方法?如何创建事件监听器?它感觉事件循环就是为此而创建的,但我不知道如何注册事件。
发布于 2017-12-16 17:37:50
可以使用aiohttp
模块创建/v2/apps
所需的异步post http请求:
import asyncio
import aiohttp
async def post(url, json):
async with aiohttp.ClientSession() as session:
async with session.post(url, json=json) as resp:
return await resp.json()
async def main():
res = await post('http://httpbin.org/post', {'test': 'object'})
print(res['json']) # {'test': 'object'}
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
如果您想使用/v2/events
来跟踪部署成功,您应该请求流(参见doc接口)。这可以通过It的asynchronous iteration在aiohttp
中实现:您只需以异步方式逐行读取内容,等待所需的事件,例如:
import asyncio
import aiohttp
async def stream(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
async for line in resp.content:
yield line
async def main():
async for line in stream('http://httpbin.org/stream/10'):
print(line) # check if line contains event you need
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
如果你想使用/v2/deployments
,你应该使用asyncio.sleep
定期请求,等待一些延迟。在这种情况下,你的函数不会阻塞:
import asyncio
import aiohttp
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
async def main():
while True:
# Make request to see if deplayment finished:
res = await get('http://httpbin.org/get')
print(res['origin']) # break if ok here
# Async wait before next try:
await asyncio.sleep(3)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
loop.close()
https://stackoverflow.com/questions/47835371
复制相似问题