基于Celery的背景任务

Celery 是一个 Python 的任务队列,包含线程/进程池。 它以前有一个Flask集成,但是在Celery的内部版本与第3版的一些重组后变得不必要了。本指南填写了如何正确使用Ce​​lery with Flask的空白,但假定您通常已经阅读Celery官方文档中的Celery第一步指南。

安装Celery

Celery在Python包索引(PyPI)上,因此可以使用标准的Python工具(如pipeasy_install)安装:

$ pip install celery

配置Celery

你需要的第一个东西是一个 Celery 实例,称为 Celery 应用。 它与Flask中的Flask对象具有相同的目的,仅用于Celery。 因为这个实例用 于你在 Celery 中做任何事——诸如创建任务和管理职程(Worker)——的入口点, 它必须可以在其它模块中导入。

例如,你可以把它放置到 tasks 模块中。 虽然您可以使用Celery而无需使用Flask进行任何重新配置​​,但通过对任务进行子类化并添加对Flask应用程序上下文的支持并将其与Flask配置挂钩,它变得更好一些。

这是所有必要的正确整合Celery与Flask:

from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

该函数创建一个新的 Celery 对象,并用应用配置来配置中间人(Broker), 用 Flask 配置更新其余的 Celery 配置,之后在应用上下文中创建一个封装任务 执行的任务子类。

最小示例

有了我们上面的内容,这是使用Celery with Flask的最小示例:

from flask import Flask

flask_app = Flask(__name__)
flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)


@celery.task()
def add_together(a, b):
    return a + b

这项任务可以在后台调用:

>>> result = add_together.delay(23, 42)
>>> result.wait()
65

运行Celery Worker

现在如果你行动迅速,已经执行过了上述的代码,你会失望地得知 .wait() 永远不会实际地返回。 这是因为你也需要运行 Celery。 你可以通过作为工人运行芹菜来做到这一点:

$ celery -A your_application.celery worker

your_application 字符串需要指向创建 celery 对象的应用所在包或模块。