celery路由配置后,启动服务之后执行当前配置路由任务
最简单的路由方式是使用 task_create_missing_queues 设置 (默认是开启的)。
这个设置开启后, 一个在task_queues中还未定义的命名队列会被自动创建。这让简单的路由任务变得很容易。 假如你有两台服务器x 和 y,来处理常规(regular)任务,一个服务器z只处理feed相关的任务。你可以使用这样的配置:
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
这个路由启用( enabled)后import_feed任务会被路由给“feeds”队列, 并且所有其他任务会被路由到默认队列(由于历史原因这个队列叫“celery”)。
除此以外,你可以使用全局模式匹配,或者正则表达式,匹配所有在feed.tasks 命名空间中的任务:
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
如果匹配模式的顺序很重要,你应该在条目列表中指定路由器:
task_routes = ([
(‘feed.tasks.*’, {‘queue’: ‘feeds’}),
(‘web.tasks.*’, {‘queue’: ‘web’}),
(re.compile(r’(video|image).tasks..*’), {‘queue’: ‘media’}),
],)
注意
task_routes设置既可以是一个字典,也可是路由器对象的列表,所以在这种情况下我们需要指定设置为一个包含列表的元组(tuple)。
配置好路由器后,你可以开启服务器z来只处理feeds队列:
user@z:/$ celery -A proj worker -Q feeds
你可以指定很多队列,所以你也可以让这个服务器处理默认队列:
user@z:/$ celery -A proj worker -Q feeds,celery
你可以通过下面的配置改变默认队列的名字:
app.conf.task_default_queue = 'default'
这样处理是对用户隐藏了AMQP协议的复杂性,而只提供基本的需求。然而你可能仍然对于队列的声明感兴趣。 使用下面的设置,一个名为“video” 的队列会被创建:
{'exchange': 'video',
'exchange_type': 'direct',
'routing_key': 'video'}
像Redis 或 SQS这样的非AMQP的后端,不支持交易所(exchange),所以他们需要exchange同队列有相同的名字。使用这个设计可以确保对他们也有效。
假如你有两台服务器x 和 y,来处理常规(regular)任务,一个服务器z只处理feed相关的任务,你可以使用这样的配置:
from kombu import Queue
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
Queue('default', routing_key='task.#'),
Queue('feed_tasks', routing_key='feed.#'),
)
task_default_exchange = 'tasks'
task_default_exchange_type = 'topic'
task_default_routing_key = 'task.default'
task_queues 是Queue 实例的列表.。如果你没有给一个键设置exchange或者exchange类型,这些会从task_default_exchange和task_default_exchange_type设置中取。 为了路由任务到feed_tasks 队列,你可以在task_routes 设置中添加一个容器:
task_routes = {
‘feeds.tasks.import_feed’: {
‘queue’: ‘feed_tasks’,
‘routing_key’: ‘feed.import’,
},
}
你也可以给Task.apply_async()或者send_task()使用routing_key参数覆盖这个配置:
>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
... queue='feed_tasks',
... routing_key='feed.import')
为了让服务器z从feed队列消费,你可以使用celery worker -Q 选项启动它:
user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h
服务器 x 和 y 必须被配置成从默认队列消费:
user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h
如果你想,甚至可以让你的处理feed的工人(worker)也处理正规(regular)任务:
user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h
如果你有另一个队列但想要绑定在另一个交易所,你可以指定一个自定义交易所和交易类型:
from kombu import Exchange, Queue
app.conf.task_queues = (
Queue('feed_tasks', routing_key='feed.#'),
Queue('regular_tasks', routing_key='task.#'),
Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
routing_key='image.compress'),
)
在Celery中可用的队列通过task_queues设置被定义。 这有一个队列配置的例子,它配置了三个队列;一个给video,一个给images还有一个给别的东西的默认队列:
default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('videos', media_exchange, routing_key='media.video'),
Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
这里面, task_default_queue 会被用来路由那些没有明确路由的任务。 默认的交易所,交易类型和路由键会作为任务的默认路由值,并且作为task_queues中登记注册(entries)的默认值。
也支持多重绑定到单一队列。下面是两个路由键都绑定到相同队列的例子:
from kombu import Exchange, Queue, binding
media_exchange = Exchange('media', type='direct')
CELERY_QUEUES = (
Queue('media', [
binding(media_exchange, routing_key='media.video'),
binding(media_exchange, routing_key='media.image'),
]),
)
一个任务的终点由下面来决定(按次序): 在task_routes中定义的Routers。 The Routers defined in task_routes. 给Task.apply_async()的路由参数。 The routing arguments to Task.apply_async(). 定义在Task自身中的路由相关的属性。 Routing related attributes defined on the Task itself. 一般最好不要硬编码这些设置,而是通过使用Routers把那个作为配置选项。这是最灵活的途径,但明确合理的默认值仍然可以被设置为任务属性。
路由器是一个为任务决定路由选项的的函数。A router is a function that decides the routing options for a task.
定义一个新路由器时,你所要做的就是定义一个函数,带有签名 (name, args, kwargs, options, task=None, **kw):
def route_task(name, args, kwargs, options, task=None, **kw):
if name == 'myapp.tasks.compress_video':
return {'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
如果你返回了队列的键,它会用task_queues中的那个队列已定义的设置进行扩展:
{'queue': 'video', 'routing_key': 'video.compress'}
变成 –>
{'queue': 'video',
'exchange': 'video',
'exchange_type': 'topic',
'routing_key': 'video.compress'}
通过把他们添加到task_routes设置中来配置路由器类:
task_routes = (route_task,)
Router 函数也可以通过名字添加:
task_routes = ('myapp.routers.route_task',)
对于简单的任务名For simple task name -> 路由映射就像上面的路由器例子,你可以简单的把一个字典放到task_routes中,获取同样的表现:
task_routes = {
'myapp.tasks.compress_video': {
'queue': 'video',
'routing_key': 'video.compress',
},
}
路由器们(routers)会被按次序探查,在第一个返回真值的路由器处停止,选择它作为任务的最终路由。
你也可以在一个序列中定义多个路由器:
task_routes = [
route_task,
{
‘myapp.tasks.compress_video’: {
‘queue’: ‘video’,
‘routing_key’: ‘video.compress’,
},
]
路由器会被按次序访问,第一个返回值的会被选中。
Celery 也支持广播路由。这有一个 broadcast_tasks交易所的例子,它会传递任务的拷贝给所有连接到它的工人(workers)。:
from kombu.common import Broadcast
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
'tasks.reload_cache': {
'queue': 'broadcast_tasks',
'exchange': 'broadcast_tasks'
}
}
现在tasks.reload_cache任务会被发送给从这个队列中消费的每一个工人。 这个广播路由的另一个例子,这次有一个celery定时器:
from kombu.common import Broadcast
from celery.schedules import crontab
app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.beat_schedule = {
'test-task': {
'task': 'tasks.reload_cache',
'schedule': crontab(minute=0, hour='*/3'),
'options': {'exchange': 'broadcast_tasks'}
},
}
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文系转载,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。