Celery dynamic queue creation and routing(Celery 动态队列创建和路由)
问题描述
我正在尝试调用一个任务并为该任务创建一个队列,如果它不存在,则立即将调用的任务插入该队列.我有以下代码:
I'm trying to call a task and create a queue for that task if it doesn't exist then immediately insert to that queue the called task. I have the following code:
@task
def greet(name):
return "Hello %s!" % name
def run():
result = greet.delay(args=['marc'], queue='greet.1',
routing_key='greet.1')
print result.ready()
然后我有一个自定义路由器:
then I have a custom router:
class MyRouter(object):
def route_for_task(self, task, args=None, kwargs=None):
if task == 'tasks.greet':
return {'queue': kwargs['queue'],
'exchange': 'greet',
'exchange_type': 'direct',
'routing_key': kwargs['routing_key']}
return None
这将创建一个名为 greet.1
的交换和一个名为 greet.1
的队列,但该队列为空.交换器应该只称为 greet
,它知道如何将路由键(如 greet.1
)路由到名为 greet.1
的队列.
this creates an exchange called greet.1
and a queue called greet.1
but the queue is empty. The exchange should be just called greet
which knows how to route a routing key like greet.1
to the queue called greet.1
.
有什么想法吗?
推荐答案
当您执行以下操作时:
task.apply_async(queue='foo', routing_key='foobar')
然后 Celery 将从 CELERY_QUEUES 中的 'foo' 队列中获取默认值,或者如果它不存在,则使用 (queue=foo, exchange=foo, routing_key=foo) 自动创建它
Then Celery will take default values from the 'foo' queue in CELERY_QUEUES, or if it does not exist then automatically create it using (queue=foo, exchange=foo, routing_key=foo)
因此,如果 CELERY_QUEUES 中不存在foo",您最终会得到:
So if 'foo' does not exist in CELERY_QUEUES you will end up with:
queues['foo'] = Queue('foo', exchange=Exchange('foo'), routing_key='foo')
然后生产者将声明该队列,但是由于您覆盖了 routing_key,实际使用 routing_key = 'foobar'
The producer will then declare that queue, but since you override the routing_key,
actually send the message using routing_key = 'foobar'
这可能看起来很奇怪,但这种行为实际上对主题交换很有用,发布到不同主题的地方.
This may seem strange but the behavior is actually useful for topic exchanges, where you publish to different topics.
虽然很难做你想做的事,你可以自己创建队列并声明它,但这不适用于自动消息发布重试.如果 apply_async 的 queue 参数可以支持会更好一个自定义的 kombu.Queue
将被声明并用作目的地.也许你可以在 http://github.com/celery/celery/issues
It's harder to do what you want though, you can create the queue yourself
and declare it, but that won't work well with automatic message publish retries.
It would be better if the queue argument to apply_async could support
a custom kombu.Queue
instead that will be both declared and used as the destination.
Maybe you could open an issue for that at http://github.com/celery/celery/issues
这篇关于Celery 动态队列创建和路由的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:Celery 动态队列创建和路由
基础教程推荐
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 合并具有多索引的两个数据帧 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01