celery基础
常用的启动worker的方式:
celery -A proj worker -Q hipri,celery -l info
说明:
The --app argument specifies the Celery app instance to use, it must be in the form of module.path:attribute
-A proj的proj是celery实例
那么celery 是怎么找到proj这个celery实例呢?
With --app=proj:
1.an attribute named proj.app, or
2.an attribute named proj.celery, or
3.any attribute in the module proj where the value is a Celery application, or
If none of these are found it’ll try a submodule named proj.celery:
4.an attribute named proj.celery.app, or
5.an attribute named proj.celery.celery, or
6.Any attribute in the module proj.celery where the value is a Celery application.
proj 是 Celery实例对象的第一个参数
e.g.
app = Celery('proj', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0', include=['proj.tasks'])
celery worker - Start worker instance./启动worker实例
-l info 日志记录级别,完整的有:DEBUG, INFO, WARNING, ERROR, CRITICAL, or FATAL.
-Q 指定的任务队列,其实,如果你不显示的指定队列,默认的就是celery。
默认情况下woker数量是本机CPU的2倍,其实再多就没说明意义,worker太多,可能资源都浪费在进程调度上了。
signature概念。
celery的signature有点类似于闭包的概念,将一个signature赋值给一个变量,以便后面使用,比如在group或者chain里使用。
e.g.
chain(add.s(4,4) | mul.s(8))().get()
celery 与 Django的整合。
celery的配置可以在django里的settings.py里定义,比如:
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
CELERY_TASK_SERIALIZER = 'json'
启动的方式与单独的启动celery worker是一样的。
celery scheduler
这个类似于crontab,可以定时执行任务
http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#id8
pip install django-celery-beat
INSTALLED_APPS = (
...,
'django_celery_beat',
)
python manage.py migrate
启动
celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
PHP做客户端(库:celery-php)
https://github.com/gjedeer/celery-php
$c = new \Celery\Celery(
'127.0.0.1', /* Server */
'', /* Login */
'', /* Password */
0, /* vhost */
'celery', /* exchange */
'celery', /* binding */
6379, /* port */
'redis' /* connector */
);
$result = $c->PostTask('polls.tasks.test_send_sms', []);
while (!$result->isReady()) {
sleep(1);
echo '...';
}
if ($result->isSuccess()) {
echo $result->getResult();
} else {
echo "ERROR";
echo $result->getTraceback();
}