Run multiple Celery tasks using a topic exchange(使用主题交换运行多个 Celery 任务)
问题描述
我正在用 Celery 替换一些本地代码,但很难复制当前的行为.我想要的行为如下:
I'm replacing some homegrown code with Celery, but having a hard time replicating the current behaviour. My desired behaviour is as follows:
- 创建新用户时,应使用
user.created
路由键将消息发布到tasks
交换. - 此消息应触发两个 Celery 任务,即
send_user_activate_email
和check_spam
.
- When creating a new user, a message should be published to the
tasks
exchange with theuser.created
routing key. - Two Celery tasks should be trigged by this message, namely
send_user_activate_email
andcheck_spam
.
我尝试通过使用 ignore_result=True
参数定义 user_created
任务以及 send_user_activate_email
和 check_spam 的任务来实现这一点
.
I tried implementing this by defining a user_created
task with a ignore_result=True
argument, plus a task for send_user_activate_email
and check_spam
.
在我的配置中,我添加了以下路由和队列定义.当消息传递到 user_created
队列时,它不会传递到其他两个队列.
In my configuration, I added the following routes and queues definitions. While the message is delivered to the user_created
queue, it is not delivered to the other two queues.
理想情况下,邮件仅传递到 send_user_activate_email
和 check_spam
队列.使用 vanilla RabbitMQ 时,消息会发布到交换器,队列可以绑定到交换器,但 Celery 似乎直接将消息传递到队列.
Ideally, the message is only delivery to the send_user_activate_email
and check_spam
queues. When using vanilla RabbitMQ, messages are published to an exchange, to which queues can bind, but Celery seems to deliver a message to a queue directly.
如何在 Celery 中实现上述行为?
How would I implement the behaviour outlined above in Celery?
CELERY_QUEUES = {
'user_created': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'send_user_activate_email': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
'check_spam': {'binding_key':'user.created', 'exchange': 'tasks', 'exchange_type': 'topic'},
}
CELERY_ROUTES = {
'user_created': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'send_user_activate_email': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
'check_spam': {
'queue': 'user_created',
'routing_key': 'user.created',
'exchange': 'tasks',
'exchange_type': 'topic',
},
}
推荐答案
听起来您希望单个消息触发/被两个队列使用,但这不是 Celery 的工作方式.一个 Exchange 会将一个任务发布到符合条件的队列中,但是一旦它被使用,其他队列就会忽略该消息.每个要触发的任务都需要一条消息.
It sounds like you are expecting a single message to trigger/be consumed by two queues but this is not how Celery works. An Exchange will post a task to eligible queues, but once it is consumed, the other Queues ignore the message. You need a message per Task you want to trigger.
Celery 新用户经常会感到困惑,因为在这个系统中队列"有两种用途;Queue() 和文档引用的 Kombu 队列,以及直接保存消息并由工作人员使用的 AMQP 队列.当我们发布到队列时,我们会想到 AMQP 队列,这是不正确的.(感谢下面链接的答案).
There is often confusion with new Celery users because there are two uses of "Queue" in this system; Kombu Queues which the Queue() and documentation refer to, and the AMQP Queues, which hold messages directly and are consumed by workers. When we publish to queues, we think of the AMQP ones, which is incorrect. (thanks to answer linked below).
回到你的问题,如果我理解正确的话,当 user_created 被消费时,你希望它产生另外两个任务;send_user_activate_email 和 check_spam.此外,它们不应相互依赖;它们可以在不同的机器上并行运行,并且不需要知道彼此的状态.
Back to your issue, if I am understanding correctly, when user_created is consumed, you want it to spawn two more tasks; send_user_activate_email and check_spam. Furthermore, these should not be dependent on each other; they can run in parallel on separate machines and do not need to know the status of one another.
在这种情况下,您希望 user_created 应用异步"这两个新任务并返回.这可以直接完成,或者您可以使用包含 check_spam 和 send_user_activate_email 的 Celery组"来实现此目的.该小组提供了一些很好的速记并为您的任务提供了一些结构,所以我个人会向您推荐这个方向.
In this case, you want user_created to "apply_async" these two new Tasks and return. This could be done directly, or you can use a Celery "Group" containing check_spam and send_user_activate_email to achieve this. The group gives some nice shorthand and lends some structure to your tasks, so personally I'd nudge you that direction.
#pseudocode
group(check_spam.s(... checkspam kwargs ...), send_user_activate_email.s(... active email kwargs ...)).apply_async()
此设置将创建四个消息;一个用于您要执行的每个任务加上一个用于 Group(),它本身就会有一个结果.
This setup would create four messages; one for each Task you want to execute plus one for the Group(), which itself will have a result.
在您的情况下,我不确定 Exchange 或 ignore_result 是否必要,但我需要查看任务代码并更多地了解系统才能做出判断.
In your case, I am not sure the Exchange or ignore_result is necessary, but I'd need to see the Task code and understand the system more to make that judgement.
http://docs.celeryproject.org/en/latest/userguide/canvas.html#groupshttp://celery.阅读thedocs.org/en/v2.2.6/userguide/routing.html#exchanges-queues-and-routing-keys为什么 CELERY_ROUTES 都有一个 "排队"还有一个routing_key"?
(如果我离开了,我会删除/删除答案...)
(if I am way off I'll delete/remove the answer...)
这篇关于使用主题交换运行多个 Celery 任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本文标题为:使用主题交换运行多个 Celery 任务
基础教程推荐
- 合并具有多索引的两个数据帧 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 哪些 Python 包提供独立的事件系统? 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01