在 Django 单元测试中使用 mock 修补 celery 任务

Using mock to patch a celery task in Django unit tests(在 Django 单元测试中使用 mock 修补 celery 任务)

本文介绍了在 Django 单元测试中使用 mock 修补 celery 任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 python 模拟库来修补在我的 django 应用程序中保存模型时运行的 Celery 任务,以查看它是否被正确调用.

I'm trying to use the python mock library to patch a Celery task that is run when a model is saved in my django app, to see that it's being called correctly.

基本上,任务是在 myapp.tasks 中定义的,并像这样在我的 models.py 文件的顶部导入:

Basically, the task is defined inside myapp.tasks, and is imported at the top of my models.py-file like so:

from .tasks import mytask

...然后使用 mytask.delay(foo, bar) 在模型内部的 save() 上运行.到目前为止一切顺利 - 当我实际运行 Celeryd 等时效果很好.

...and then runs on save() inside the model using mytask.delay(foo, bar). So far so good - works out fine when I'm actually running Celeryd etc.

我想构建一个模拟任务的单元测试,只是为了检查它是否被正确的参数调用,并且实际上并没有尝试运行 Celery 任务.

I want to construct a unit test that mocks the task, just to check that it gets called with the correct arguments, and doesn't actually try to run the Celery task ever.

所以在测试文件中,我在标准 TestCase 中有这样的内容:

So in the test file, I've got something like this inside of a standard TestCase:

from mock import patch # at the top of the file

# ...then later
def test_celery_task(self):
    with patch('myapp.models.mytask.delay') as mock_task:
        # ...create an instance of the model and save it etc
        self.assertTrue(mock_task.called)

...但它永远不会被调用/总是错误的.我尝试了各种化身(修补 myapp.models.mytask ,并检查是否调用了 mock_task.delay .我从模拟文档中收集到导入路径至关重要,谷歌搜索告诉我它应该是在被测模块内部看到的路径(应该是 myapp.models.mytask.delay 而不是 myapp.tasks.mytask.delay,如果我理解正确的话).

...but it never gets called/is always false. I've tried various incarnations (patching myapp.models.mytask instead, and checking if mock_task.delay was called instead. I've gathered from the mock docs that the import path is crucial, and googling tells me that it should be the path as it is seen inside the module under tests (which would be myapp.models.mytask.delay rather than myapp.tasks.mytask.delay, if I understand it correctly).

我哪里错了?修补 Celery 任务是否有一些特定的困难?我可以修补 celery.task(用作 mytask 的装饰器)吗?

Where am I going wrong here? Is there some specific difficulties in patching Celery tasks? Could I patch celery.task (which is used as a decorator to mytask) instead?

推荐答案

您遇到的问题与这是 Celery 任务这一事实无关.你只是碰巧修补了错误的东西.;)

The issue that you are having is unrelated to the fact that this is a Celery task. You just happen to be patching the wrong thing. ;)

具体来说,您需要找出哪个视图或其他文件正在导入mytask"并在那里对其进行修补,因此相关行如下所示:

Specifically, you need to find out which view or other file is importing "mytask" and patch it over there, so the relevant line would look like this:

with patch('myapp.myview.mytask.delay') as mock_task:

这里有更多的味道:

http://www.voidspace.org.uk/python/mock/patch.html#where-to-patch

这篇关于在 Django 单元测试中使用 mock 修补 celery 任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!

本文标题为:在 Django 单元测试中使用 mock 修补 celery 任务

基础教程推荐