手把手写一个Celery,带你轻松搞定Python异步任务
异步任务,这个听起来有点神秘的家伙,常常藏在一些复杂应用的背后,默默帮我们处理那些“慢到拖沓”的事情。什么发邮件、生成报告、清理数据啊,没它还真有点麻烦。今天就来聊聊Celery,一个专门用来管理异步任务的库,简单高效又强大。
Celery的本事呢,就是把那些耗时的操作交给后台处理,不让程序被某个“龟速”任务卡住。就像开车遇到堵车,不如给程序装个“超车道”,把不急的事甩到后面,继续专注眼前的路。Celery的出现,可以让Python的“堵车”问题迎刃而解。
1. 先有个总账本:Celery的结构
Celery的运行结构其实挺简单:它的核心结构就三样东西——任务(Task)、任务队列(Broker)和结果储存(Backend)。
-
任务:顾名思义,就是我们要让Celery去做的事儿。比如发送邮件、图片处理啥的。写好任务后,我们再把它塞进任务队列中。 -
任务队列:队列的意思就是像排队一样,所有任务按顺序处理,不会乱套。Celery推荐用Redis或RabbitMQ来当队列。 -
结果储存:任务执行完了,有些情况下还需要查一下结果对吧?比如查查邮件有没有发出去,报告有没有生成完。Celery支持多种储存方式,比如Redis、数据库等。
简单总结:任务是操作,队列是安排任务的位置,结果储存是给任务结果找个家。接下来看看代码怎么写。
2. 手把手写一个Celery任务
用Celery写任务,最简单的流程如下:先导入celery,然后定义任务函数,最后启动worker让它开始干活。
创建一个Celery实例
首先需要创建一个Celery实例,用它来组织任务。
from celery import Celery
# 创建Celery实例,并定义任务队列使用redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
温馨提示:Redis、RabbitMQ都可以当broker,但记得要先安装和配置好这些消息队列,不然Celery就不知道去哪儿排队了。
定义任务
任务写好后,Celery就负责“通知”这些任务去完成。
@app.task
def send_email(email_address):
print(f"Sending email to {email_address}")
# 模拟邮件发送时间
time.sleep(5)
return f"Email sent to {email_address}"
@app.task
这个装饰器是Celery的标签,标记了send_email
这个函数是一个任务。这里用到了Python的time.sleep(5)
模拟耗时任务。
执行任务
任务写好后,可以用.delay()
把它丢到队列里,后面就是Celery的事儿啦:
send_email.delay("test@example.com")
.delay()
就是把任务放进队列的指令。只要一调用delay()
,Celery就会在后台处理这个任务,而主程序可以继续跑下去,不用干等着。
3. 设置结果储存,记录任务结果
有时我们想知道任务是否完成、结果是什么。这个时候,我们就要设置一个“结果储存”的地方。
# 修改Celery配置,指定backend为Redis
app.conf.update(
result_backend='redis://localhost:6379/0'
)
这样一来,我们的任务会自动把结果存在Redis中。要查询结果,只需:
result = send_email.delay("test@example.com")
print(result.get(timeout=10))
.get()
会在任务完成后返回结果,参数timeout=10
的意思是最多等10秒钟。注意,这会让程序停下来等待结果,异步任务的初衷就是不等结果,但有时为了监测任务状态可以这样用。
4. 定时任务:定时处理那些必须定期执行的操作
如果你有些任务是每天、每周、每月定期执行的,比如数据库清理、自动发送报告等,Celery帮你轻松搞定。Celery配合Celery Beat就能实现定时任务。
配置定时任务
首先要安装celery[redis]
和celery[beat]
,以便让Celery支持定时功能。接着,通过Celery的配置来定义一个定时任务:
from celery.schedules import crontab
app.conf.beat_schedule = {
'send-report-every-morning': {
'task': 'tasks.send_report',
'schedule': crontab(hour=7, minute=30),
},
}
这里定义了一个任务send_report
,每天早上7点半执行。crontab
就是定时器,它可以像Linux中的cron
命令一样控制任务的执行时间。
温馨提示:Celery Beat
会持续运行,来检查有没有要执行的定时任务。所以在服务器上部署时别忘了启动beat
。
5. 多任务并行处理
如果任务特别多,Celery支持多任务并行来加速处理。我们可以通过Worker来实现。
启动多个Worker
可以在命令行启动多个worker来实现并行处理:
celery -A tasks worker --loglevel=info --concurrency=4
参数--concurrency=4
表示同时启动4个worker来处理任务。根据你的机器性能,增加concurrency
的值可以让任务处理得更快。
自动扩展任务量
假如任务量猛增,我们可以用Celery Flower来监控、控制任务的执行情况。只要安装flower
,然后运行以下命令就可以启动一个web管理界面:
celery -A tasks flower
访问localhost:5555,就能看到Celery的任务、worker运行状态等详细信息啦!
6. 异常处理与重试机制
在生产环境中,某些任务可能会失败(比如网络波动导致请求失败),我们可以设置任务自动重试。
添加重试机制
Celery的任务装饰器支持autoretry_for
和max_retries
等参数,用来定义自动重试。
from celery import Celery
from celery.exceptions import Retry
@app.task(autoretry_for=(Exception,), max_retries=3, retry_backoff=True)
def fetch_data():
# 模拟网络请求
if random.choice([True, False]):
raise Exception("Network error")
return "Data fetched successfully"
这里设置了任务在失败时自动重试3次,retry_backoff=True
会逐次增加等待时间。这样即使任务失败了,也能有重试的机会。
小贴士:重试机制适合短时失败的任务,不适用于本身耗时的任务,不然可能会浪费性能资源。
学习完了Celery,可以看到它不光帮我们解决异步任务,还可以定时、并行、重试、监控,让Python程序变得更强大、更灵活。
来源: