手把手写一个Celery,带你轻松搞定Python异步任务

异步任务,这个听起来有点神秘的家伙,常常藏在一些复杂应用的背后,默默帮我们处理那些“慢到拖沓”的事情。什么发邮件、生成报告、清理数据啊,没它还真有点麻烦。今天就来聊聊Celery,一个专门用来管理异步任务的库,简单高效又强大。

Celery的本事呢,就是把那些耗时的操作交给后台处理,不让程序被某个“龟速”任务卡住。就像开车遇到堵车,不如给程序装个“超车道”,把不急的事甩到后面,继续专注眼前的路。Celery的出现,可以让Python的“堵车”问题迎刃而解。

1. 先有个总账本:Celery的结构

Celery的运行结构其实挺简单:它的核心结构就三样东西——任务(Task)任务队列(Broker)结果储存(Backend)

  • 任务:顾名思义,就是我们要让Celery去做的事儿。比如发送邮件、图片处理啥的。写好任务后,我们再把它塞进任务队列中。
  • 任务队列:队列的意思就是像排队一样,所有任务按顺序处理,不会乱套。Celery推荐用RedisRabbitMQ来当队列。
  • 结果储存:任务执行完了,有些情况下还需要查一下结果对吧?比如查查邮件有没有发出去,报告有没有生成完。Celery支持多种储存方式,比如Redis、数据库等。

简单总结:任务是操作,队列是安排任务的位置,结果储存是给任务结果找个家。接下来看看代码怎么写。

2. 手把手写一个Celery任务

用Celery写任务,最简单的流程如下:先导入celery,然后定义任务函数,最后启动worker让它开始干活。

创建一个Celery实例

首先需要创建一个Celery实例,用它来组织任务。

from celery import Celery

# 创建Celery实例,并定义任务队列使用redis
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

温馨提示RedisRabbitMQ都可以当broker,但记得要先安装和配置好这些消息队列,不然Celery就不知道去哪儿排队了。

定义任务

任务写好后,Celery就负责“通知”这些任务去完成。

@app.task
def send_email(email_address):
    print(f"Sending email to {email_address}")
    # 模拟邮件发送时间
    time.sleep(5)
    return f"Email sent to {email_address}"

@app.task这个装饰器是Celery的标签,标记了send_email这个函数是一个任务。这里用到了Python的time.sleep(5)模拟耗时任务。

执行任务

任务写好后,可以用.delay()把它丢到队列里,后面就是Celery的事儿啦:

send_email.delay("test@example.com")

.delay()就是把任务放进队列的指令。只要一调用delay(),Celery就会在后台处理这个任务,而主程序可以继续跑下去,不用干等着。

3. 设置结果储存,记录任务结果

有时我们想知道任务是否完成、结果是什么。这个时候,我们就要设置一个“结果储存”的地方。

# 修改Celery配置,指定backend为Redis
app.conf.update(
    result_backend='redis://localhost:6379/0'
)

这样一来,我们的任务会自动把结果存在Redis中。要查询结果,只需:

result = send_email.delay("test@example.com")
print(result.get(timeout=10))

.get()会在任务完成后返回结果,参数timeout=10的意思是最多等10秒钟。注意,这会让程序停下来等待结果,异步任务的初衷就是不等结果,但有时为了监测任务状态可以这样用。

4. 定时任务:定时处理那些必须定期执行的操作

如果你有些任务是每天、每周、每月定期执行的,比如数据库清理、自动发送报告等,Celery帮你轻松搞定。Celery配合Celery Beat就能实现定时任务。

配置定时任务

首先要安装celery[redis]celery[beat],以便让Celery支持定时功能。接着,通过Celery的配置来定义一个定时任务:

from celery.schedules import crontab

app.conf.beat_schedule = {
    'send-report-every-morning': {
        'task': 'tasks.send_report',
        'schedule': crontab(hour=7, minute=30),
    },
}

这里定义了一个任务send_report,每天早上7点半执行。crontab就是定时器,它可以像Linux中的cron命令一样控制任务的执行时间。

温馨提示Celery Beat会持续运行,来检查有没有要执行的定时任务。所以在服务器上部署时别忘了启动beat

5. 多任务并行处理

如果任务特别多,Celery支持多任务并行来加速处理。我们可以通过Worker来实现。

启动多个Worker

可以在命令行启动多个worker来实现并行处理:

celery -A tasks worker --loglevel=info --concurrency=4

参数--concurrency=4表示同时启动4个worker来处理任务。根据你的机器性能,增加concurrency的值可以让任务处理得更快。

自动扩展任务量

假如任务量猛增,我们可以用Celery Flower来监控、控制任务的执行情况。只要安装flower,然后运行以下命令就可以启动一个web管理界面:

celery -A tasks flower

访问localhost:5555,就能看到Celery的任务、worker运行状态等详细信息啦!

6. 异常处理与重试机制

在生产环境中,某些任务可能会失败(比如网络波动导致请求失败),我们可以设置任务自动重试。

添加重试机制

Celery的任务装饰器支持autoretry_formax_retries等参数,用来定义自动重试。

from celery import Celery
from celery.exceptions import Retry

@app.task(autoretry_for=(Exception,), max_retries=3, retry_backoff=True)
def fetch_data():
    # 模拟网络请求
    if random.choice([True, False]):
        raise Exception("Network error")
    return "Data fetched successfully"

这里设置了任务在失败时自动重试3次,retry_backoff=True会逐次增加等待时间。这样即使任务失败了,也能有重试的机会。

小贴士重试机制适合短时失败的任务,不适用于本身耗时的任务,不然可能会浪费性能资源。


学习完了Celery,可以看到它不光帮我们解决异步任务,还可以定时、并行、重试、监控,让Python程序变得更强大、更灵活。

来源:沐子 沐子曰

THE END