Chain a celery task#39;s results into a distributed group(将芹菜任务的结果链接到分布式组中)
本文介绍了将芹菜任务的结果链接到分布式组中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
与this other question中一样,我希望从一个芹菜任务返回的列表中创建一个芹菜组。第一个任务将返回一个列表,第二个任务将该列表分解为列表中每一项的并发任务。计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,它下载页面,对其进行处理,然后将其上传到S3。最后,一旦所有的子页面都完成了,网站在我们的数据库中被标记为完成。类似于:
chain(
get_links_from_website.si('https://www.google.com'),
dmap.s( # <-- Distributed map
download_sub_page.s() |
process_sub_page.s() |
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)
到目前为止,我看到的解决方案似乎在这方面做得很好,但当第二个任务是链时,由于clone
不执行深度复制(有关详细信息,请参阅the comments on this answer):
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
它还有一个问题,如果迭代器的长度为10,000个项目,则它将创建一个包含10,000个项目的组。正如您可以想象的那样,这正在扩大我们的内存使用量。
因此,我正在寻找一种dmap
实现这一点的方法:
- 不会通过创建可怕的组来炸毁RAM(也许有一种方法可以分块遍历可迭代对象?)
- 在芹菜连锁店上工作,深度复制没有问题。
推荐答案
芹菜画布提供chunks将任务拆分成块。遗憾的是,这不适用于Chain、Group等基元类型。
您可以使用芹菜信号来防止DMAP/克隆出现问题。
ch = chain(
download_sub_page.s(),
process_sub_page.s(),
upload_sub_page.s(),
)
@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
result = kwargs['result']
header = [ch(i) for i in result]
callback = mark_website_done.si()
chord(header)(callback)
创建一个处理页面的链,并使用Chord将最后一个任务挂接到它。只要get_links_from_website
成功运行,就会执行此函数。
根据Chain花费的时间,您还可以将get_links_from_website
的结果保存在某个地方。然后迭代它们中的一批以将链排队,并且使用最后一批,您可以将回调挂钩到最后一个任务。
这篇关于将芹菜任务的结果链接到分布式组中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
沃梦达教程
本文标题为:将芹菜任务的结果链接到分布式组中
基础教程推荐
猜你喜欢
- 使用 Google App Engine (Python) 将文件上传到 Google Cloud Storage 2022-01-01
- Python 的 List 是如何实现的? 2022-01-01
- 症状类型错误:无法确定关系的真值 2022-01-01
- 合并具有多索引的两个数据帧 2022-01-01
- 使 Python 脚本在 Windows 上运行而不指定“.py";延期 2022-01-01
- 将 YAML 文件转换为 python dict 2022-01-01
- 如何在 Python 中检测文件是否为二进制(非文本)文 2022-01-01
- 使用Python匹配Stata加权xtil命令的确定方法? 2022-01-01
- 如何在Python中绘制多元函数? 2022-01-01
- 哪些 Python 包提供独立的事件系统? 2022-01-01