celery分布式异步任务队列

celery是一个基于分布消息传递的异步任务队列.它一定需要建立在一个分布的消息传递机制上,这个消息传递机制就是celery文档里常说的broker。

celery隐藏了rabbitmq接口的实现细节,既充当了publisher(client)又充当了consumer (worker)的角色。

'''思考一下,如果我们用rabbitmq自己实现任务队列,有一天我们不想用rabbit了怎么办?我们换个思维,如果没有celery,让你自己设计一个异步任务队列你怎么做。首先,要有一个发起任务的client,选定一定保存任务信息的媒介,由一个worker去一直监听这个信息媒介,这个worker最好是多进程的,另外可以兼容尽可能多得信息媒介。好吧,这个不就是celery所做的事儿么,celery兼容多个broker,既是任务发起者又是执行者,另外支持多进程…还有好多通用功能考虑。
'''

假设项目的目录结构是:

1
2
task--
--celeryapp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 启动消费者
celery -A task.celeryapp worker -l info -c 5

# 消费者异常退出后 ,正常情况下重启后会继续消息已经积压的任务, 看情况要将过期任务清除

# 启动web监控
celery -A task.celeryapp flower -l info --basic_auth=user1:111111
flower -A task.celeryapp --port=8091


# 启动Beat进程,定时任务, 定时将任务发送到broker
celery beat -A task.celeryapp -l info

# 同时启动 消费者和定时任务
celery -B -A task.celeryapp worker -l info

# 也可以在当前目录的上一层来启动 需指定 package.celeryapp


# 后台启动 定时任务和消费者
celery multi restart w1 -B -A task.celeryapp -l info


# celery status -A celery_task
celery -A task.celeryapp inspect stats

# celery.send_task()这个方法解决了producer和consumer的网路拓扑传递数据问题。
celery=Celery()
celery.config_from_object('task.celeryconfig')
celery.send_task('tq.tasks.test', ("hello world",))

# 一般调用方法
from task.tasks import test
print test.delay('param from invoke ')



# 停止正在执行的任务
# 使用方法名调用 可以避免依赖方法的实现代码
...
celery.config_from_object('task.celeryconfig')
from celery.task.control import revoke
revoke('aac00b9c-f701-4a21-bed4-53a8b865a39a', terminate=True)



from celery.result import AsyncResult
res=AsyncResult(t.task_id)
res.state
<!---->

res.revoke(terminate=True)



# 配置信息
json.dumps(celeryapp.control.inspect().conf(), indent=2)

# 已注册的任务列表(得到的列表可以在web页面上做处理, 文件手动触发)
celeryapp.control.inspect().registered_tasks()

# 任务执行情况
celeryapp.control.inspect().active()

# 任务执行情况
celeryapp.control.inspect().stats()



1
2
3
在使用flower时遇到  'stats' inspect method failed , 最终通安装指定版本的kombu 解决 4.5.0
pipenv install kombu==4.5.0