Django整合Celery库

celery基础

常用的启动worker的方式:
celery -A proj worker -Q hipri,celery -l info
说明: The --app argument specifies the Celery app instance to use, it must be in the form of module.path:attribute
-A proj的proj是celery实例
那么celery 是怎么找到proj这个celery实例呢?
With --app=proj:
1.an attribute named proj.app, or
2.an attribute named proj.celery, or
3.any attribute in the module proj where the value is a Celery application, or
If none of these are found it’ll try a submodule named proj.celery:
4.an attribute named proj.celery.app, or
5.an attribute named proj.celery.celery, or
6.Any attribute in the module proj.celery where the value is a Celery application.

proj 是 Celery实例对象的第一个参数
e.g.

app = Celery('proj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0', include=['proj.tasks'])  

celery worker - Start worker instance./启动worker实例 -l info 日志记录级别,完整的有:DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL.
-Q 指定的任务队列,其实,如果你不显示的指定队列,默认的就是celery。
默认情况下woker数量是本机CPU的2倍,其实再多就没说明意义,worker太多,可能资源都浪费在进程调度上了。

signature概念。

celery的signature有点类似于闭包的概念,将一个signature赋值给一个变量,以便后面使用,比如在group或者chain里使用。
e.g.

chain(add.s(4,4) | mul.s(8))().get()  

celery 与 Django的整合。

celery的配置可以在django里的settings.py里定义,比如:

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_TASK_SERIALIZER = 'json'

启动的方式与单独的启动celery worker是一样的。

celery scheduler

这个类似于crontab,可以定时执行任务
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#id8

pip install django-celery-beat

INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)

python manage.py migrate

启动
celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

PHP做客户端(库:celery-php)

https://github.com/gjedeer/celery-php

$c = new \Celery\Celery(
    '127.0.0.1', /* Server */
    '', /* Login */
    '', /* Password */
    0, /* vhost */
    'celery', /* exchange */
    'celery', /* binding */
    6379, /* port */
    'redis' /* connector */
);

$result = $c->PostTask('polls.tasks.test_send_sms', []);

while (!$result->isReady())    {
    sleep(1);
    echo '...';
}

if ($result->isSuccess()) {
    echo $result->getResult();
} else {
    echo "ERROR";
    echo $result->getTraceback();
}

Leave a Comment

Your email address will not be published. Required fields are marked *

PHP 8.1.1 - 17.848 ms, 0 Q