如何在姜戈测试芹菜周期任务?

How to test celery periodic_task in Django?(如何在姜戈测试芹菜周期任务?)

本文介绍了如何在姜戈测试芹菜周期任务?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的定期任务:

from celery.decorators import periodic_task
from celery.task.schedules import crontab
from .models import Subscription

@periodic_task(run_every=crontab(minute=0, hour=0))
def deactivate_subscriptions():
    for subscription in Subscription.objects.filter(is_expired=True):
        print(subscription)
        subscription.is_active = False
        subscription.can_activate = False
        subscription.save()

并且我想用测试来覆盖它。

我找到有关如何测试简单任务的信息,如@SHARED_TASK,但我在任何地方都找不到测试示例@periodic_task

推荐答案

使用函数apply()。Documentation for apply().

这篇关于如何在姜戈测试芹菜周期任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:如何在姜戈测试芹菜周期任务?

基础教程推荐