将芹菜任务的结果链接到分布式组中

Chain a celery task#39;s results into a distributed group(将芹菜任务的结果链接到分布式组中)

本文介绍了将芹菜任务的结果链接到分布式组中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

与this other question中一样,我希望从一个芹菜任务返回的列表中创建一个芹菜组。第一个任务将返回一个列表,第二个任务将该列表分解为列表中每一项的并发任务。

计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是一个链,它下载页面,对其进行处理,然后将其上传到S3。最后,一旦所有的子页面都完成了,网站在我们的数据库中被标记为完成。类似于:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)

到目前为止,我看到的解决方案似乎在这方面做得很好,但当第二个任务是链时,由于clone不执行深度复制(有关详细信息,请参阅the comments on this answer):

@task
def dmap(it, callback):
    # Map a callback over an iterator and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg,]) for arg in it)()

它还有一个问题,如果迭代器的长度为10,000个项目,则它将创建一个包含10,000个项目的组。正如您可以想象的那样,这正在扩大我们的内存使用量。

因此,我正在寻找一种dmap实现这一点的方法:

  • 不会通过创建可怕的组来炸毁RAM(也许有一种方法可以分块遍历可迭代对象?)
  • 在芹菜连锁店上工作,深度复制没有问题。

推荐答案

芹菜画布提供chunks将任务拆分成块。遗憾的是,这不适用于Chain、Group等基元类型。

您可以使用芹菜信号来防止DMAP/克隆出现问题。

ch = chain(
    download_sub_page.s(),
    process_sub_page.s(),
    upload_sub_page.s(),
)

@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
    result = kwargs['result']    
    header = [ch(i) for i in result]
    callback = mark_website_done.si()
    chord(header)(callback)
创建一个处理页面的链,并使用Chord将最后一个任务挂接到它。只要get_links_from_website成功运行,就会执行此函数。

根据Chain花费的时间,您还可以将get_links_from_website的结果保存在某个地方。然后迭代它们中的一批以将链排队,并且使用最后一批,您可以将回调挂钩到最后一个任务。

这篇关于将芹菜任务的结果链接到分布式组中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:将芹菜任务的结果链接到分布式组中

基础教程推荐