杂谈-FastAPI中的异步后台任务之Celery篇
前言
前些时间有人问起关于如何在FastAPI中进行异步任务的处理。对此也刚好有一定的使用经验,借此机会也谈谈一下。
异步任务实现方式
在FastAPI中的异步任务的方式有几种: 在FastAPI中,有以下几种方式可以处理异步任务: 首先我们所熟知的异步协程或线程等方式实现异步任务是最常见,如下我们最场景的就是基于协程方式的异步API接口。如我们经常使用async/await关键字:可以在路由处理函数中使用async/await关键字来处理异步任务。例如:
from fastapi import FastAPI
app = FastAPI()@app.get("/async_task")
async def async_task():# 异步任务的代码,并等待执行完成
    await some_async_function()
    return {"message": "OK"}但是有点我们是需要注意,上面的只是说我们实现了异步任务,但是由于我们在接口中进行await,所以尽管的任务是异步,但是还是需要等待,我们本节主要是学习相关异步化后台的任务,也就是说我们某些任务放置到后台或其他地方去执行,不影响当前主线程的运行。
BackgroundTasks异步化的后台任务实现方式
在前面示例中,对于的任务执行了await等待,就会进行任务挂起并等待。假如some_async_function()的任务需要耗时比较久的话,且不在意它相关的处理结果,或者结果可以通过另一种方式进行通知的话,我们可以把some_async_function()的任务进行后台话,可以理解为放到另一个线程去执行,而不而不需要await等待,如之前的代码,我们可以使用FasAPI框架所提供的BackgroundTasks的BackgroundTasks类来处理后台任务,如下示例代码:
from fastapi import FastAPI, BackgroundTasks
app = FastAPI()def some_async_function():# 后台任务的代码
    pass@app.get("/background_task")
async def background_task(background_tasks: BackgroundTasks):
    background_tasks.add_task(some_async_function)
    return {"message": "后台任务添加成功"}当然上面这种方式也不合适长期运行耗时的任务,所以引出我们本小结主要需要了解的Celery库,—芹菜. 使用Celery我们可以基于异步任务队列的方式来处理异步任务。
Celery库—芹菜.异步化的后台任务实现方式
下面我一步一步的介绍如何基于Celery实现异步任务以及延迟任务。
首先我们从官网介绍总结一下关于Celery,它是一个基于Python的分布式任务队列框架,用于实现异步任务的调度和执行。它的主要作用是可以帮助我们将耗时的任务从主线程中分离出来,不因任务的耗时而空等,以此来提高我们系统的并发性和响应速度。
Celery是用来处理异步任务,所以我们可以使用它来处理以下类似的一些业务场景,例如发送电子邮件、生成报表、处理大量数据等。通过将这些任务放入任务队列中,可以让主线程继续处理其他请求,而不需要等待任务完成。当然Celery还提供定时任务的调度功能,可以让我们按照设定的时间间隔或者时间点执行任务。
Celery简单使用步骤
通常我们使用Celery需要进行以下相关以下大致的几个步骤:
- 实例化Celery对象来创建一个Celery应用。
- 基于已实例化的任务的实例对象来定于该实例所包含的任务:通常我们是使用装饰器将函数注册为Celery任务,并设置任务的参数和返回值。(加入我们的是在相关的Fastapi框架进行使用,通常我们需要做就是实例化对象并声明任务,并进行任务的发布,通常是通过调用Celery应用的apply_async()方法来提交任务到任务队列中。)
3.当我们的任务发布之后,任务会进入一个消息队列里面进行等待,等待消费者去消费,所以接下里我们需要启动Celery Worker:启动Celery Worker的消费者对象来处理任务队列中的任务。
例如:
1 定义任务和发布任务
主要我们这里使用消息代理的中间件是redis,所以是先启动我们的redis,
 接着定义相关任务,如下示例代码:
 接着定义相关任务,如下示例代码:
使用uvicorn.run函数运行了一个应用程序。它指定了应用程序的主机和端口,并且设置了reload参数为True。
uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)app = FastAPI()
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
# 定义任务
@celery_app.task
def some_celery_task():
# Celery任务的代码
    pass
@app.get("/celery_task")
async def celery_task():
# 开始发布任务
    some_celery_task.delay()
    return {"message": "Celery 任务发布成功"}
if __name__ == "__main__":
# 使用os.path.basename函数获取了当前文件的名称,并将.py文件扩展名替换为空字符串
# import os
# app_modeel_name = os.path.basename(__file__).replace(".py", "")
    from pathlib import Path
# 使用Path函数获取了当前文件的名称,并将.py文件扩展名替换为空字符串
# app_modeel_name = Path(__file__).name.replace(".py", "")
    import uvicorn
    import inspect
# 根据文件路径返回模块名
# print("app_modeel_name:",inspect.getmodulename(Path(__file__).name))
# 使用uvicorn.run函数运行了一个应用程序。它指定了应用程序的主机和端口,并且设置了reload参数为True。
    uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='127.0.0.1', port=31110, reload=True,workers=1)启动应用并发布任务:

在FastAPI应用程序,当调用 some_celery_task.delay() 来发布任务时,Celery将把任务放入Redis队列中。
2 启动Celery worker进程,进行任务消费
在命令行中,切换到您的项目目录。启动Celery worker进程。在命令行中运行以下命令:
celery -A main.celery_app worker --loglevel=info启动后得到如下图所示的结果:
 Celery worker进程将从Redis队列中获取任务,并执行任务所定义的代码。当任务被执行时,您将在Celery worker进程的日志中看到相关的日志消息。但是我们运行时候遇到了问题如下:
 Celery worker进程将从Redis队列中获取任务,并执行任务所定义的代码。当任务被执行时,您将在Celery worker进程的日志中看到相关的日志消息。但是我们运行时候遇到了问题如下:
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)The full contents of the message headers:
{'lang': 'py', 'task': 'main_celely.some_celery_task', 'id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'e134d62c-0d8c-46b3-86a8-981411f41eac', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen15648@xiaozhong', 'ignore_result': False, 'stamped_headers': None, 'stamps': {}}The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "D:code_loaclmm_ring_v2venvLibsite-packagesceleryworkerconsumerconsumer.py", line 642, in on_task_received
    strategy = strategies[type_]
               ~~~~~~~~~~^^^^^^^出现上述原因问题在于我们的:
@celery_app.task
def some_celery_task():
# Celery任务的代码
    pass没有进行相关实例绑定,我们可以修改为:
@celery_app.task(bind=True)
def some_celery_task(self):
# Celery任务的代码
    import time
    time.sleep(10)
    pass然后每次发布任务的时候,重新观察消费者的任务输出信息如下:
 如上的输出,表示我们的任务已经被正常进行消费了!
 如上的输出,表示我们的任务已经被正常进行消费了!
Celery 详解之相关参数项
在上面实例我们已简单完成相关异步任务的处理,通常我们一般需要使用提各种参数来配置和控制其行为。下面介绍一下Celery一些常用的配置项:
PS:配置选项的名称和具体含义可能会因Celery的版本而有所不同。
- broker:
- 类型:字符串
- 默认值:'amqp://guest:guest@localhost//'
- 描述:指定消息代理(broker)的URL地址,用于任务消息的传输。Celery支持多种消息代理,如RabbitMQ、Redis等。
 
- backend:
- 类型:字符串
- 默认值:None
- 描述:指定结果存储后端的URL地址。当任务执行完成后,结果将被存储在指定的后端中,以供后续查询和获取。
 
- include:
- 类型:列表
- 默认值:[]
- 描述:指定要包含的任务模块列表。这些任务模块中的任务函数将被Celery自动发现和注册。
 
- task_track_started:
- 类型:布尔值
- 默认值:False
- 描述:设置为True时,Celery将跟踪任务的开始状态,并在任务开始时发送任务状态更新。
 
- task_time_limit:
- 类型:整数
- 默认值:None
- 描述:设置任务的最大运行时间(以秒为单位)。如果任务执行时间超过此限制,Celery将会终止任务。
 
- task_soft_time_limit:
- 类型:整数
- 默认值:None
- 描述:设置任务的软时间限制(以秒为单位)。软时间限制是一个警告机制,当任务执行时间接近限制时,Celery会发送警告通知。
 
- task_acks_late:
- 类型:布尔值
- 默认值:False
- 描述:设置为True时,Celery将在任务执行完成后再发送确认消息。这可以确保即使在任务执行期间发生错误,任务也不会丢失。
 
- task_ignore_result:
- 类型:布尔值
- 默认值:False
- 描述:设置为True时,Celery将不会存储任务的执行结果。这可以节省存储资源,适用于不关心任务结果的情况。
 
除了之前提到的常用配置项外,以下是一些其他常见的Celery配置项的说明:
- task_serializer:
- 类型:字符串
- 默认值:'json'
- 描述:指定任务消息的序列化器。Celery支持多种序列化器,如JSON、pickle等。
 
- result_serializer:
- 类型:字符串
- 默认值:与task_serializer相同
- 描述:指定任务结果的序列化器。任务执行完成后,结果将使用指定的序列化器进行序列化。
 
- task_default_queue:
- 类型:字符串
- 默认值:'celery'
- 描述:指定任务的默认队列名称。当任务没有指定队列时,将使用此默认队列。
 
- task_default_exchange:
- 类型:字符串
- 默认值:'celery'
- 描述:指定任务的默认交换机名称。当任务没有指定交换机时,将使用此默认交换机。
 
- task_default_routing_key:
- 类型:字符串
- 默认值:与task_default_queue相同
- 描述:指定任务的默认路由键。当任务没有指定路由键时,将使用此默认路由键。
 
- worker_concurrency:
- 类型:整数
- 默认值:根据当前系统的CPU核心数动态调整
- 描述:指定每个工作进程的并发任务数。可以根据系统的性能和资源情况进行调整。
 
- worker_prefetch_multiplier:
- 类型:整数
- 默认值:4
- 描述:指定工作进程从消息队列中预取任务的数量的乘数。较高的值可以提高任务处理的吞吐量。
 
- beat_schedule:
- 类型:字典
- 默认值:{}
- 描述:指定定时任务的调度配置。您可以定义多个定时任务,并指定它们的执行时间和要执行的任务函数。
 
······其他参数说明,有需要我得话,我在去官方文档查阅接口。接下里我,我们看看和它对于就是所谓得配置文件。配置文件其实和上面所谓得参数是没有区别得,也就是说,我们可以使用其他的方式来定义我们的参数传入,如这些配置项我们还可以可以在Celery的配置文件中进行设置,通常命名为celeryconfig.py或celery.py。如下的示例代码所示:
下面是一个使用配置文件的示例:
- 首先我们创建一个名为celeryconfig.py的配置文件,里面包含的内容如下:
每30秒执行一次
}}
# celeryconfig.py
from datetime import timedelta
from celery.schedules import crontab
broker_url = 'amqp://guest:guest@localhost//'
# 消息代理的URL
result_backend = 'redis://localhost:6379/0'
# 结果存储后端的URL
task_serializer = 'json'
# 任务的序列化器
result_serializer = 'json'
# 任务结果的序列化器
timezone = 'Asia/Shanghai'
# 使用的时区
task_acks_late = True
# 启用延迟确认
task_ignore_result = False
# 不忽略任务结果
task_soft_time_limit = 60
# 任务的软时间限制为60秒
task_time_limit = 120
# 任务的硬时间限制为120秒
worker_prefetch_multiplier = 4
# 任务执行者的预取乘数
worker_concurrency = 8
# 每个任务执行者的并发处理数量
worker_max_tasks_per_child = 100
# 每个任务执行者最大处理任务数
task_default_priority = 0
# 任务的默认优先级
task_routes = {
    'myapp.tasks.email_task': {'queue': 'email_queue'},
# 指定任务的队列
    'myapp.tasks.image_task': {'queue': 'image_queue'}
}
worker_hijack_root_logger = False
# 不重写根日志记录器
worker_disable_rate_limits = False
# 不禁用任务速率限制
beat_schedule = {
    'task1': {
        'task': 'myapp.tasks.task1',
        'schedule': crontab(minute='*/15'),
# 每15分钟执行一次
    },
    'task2': {
        'task': 'myapp.tasks.task2',
        'schedule': timedelta(seconds=30),
# 每30秒执行一次
    }
}在你的Celery应用程序中加载配置文件。
# celery_app.py
from celery import Celeryapp = Celery('myapp')
app.config_from_object('celeryconfig')在上述示例中,我们创建了一个名为celeryconfig.py的配置文件,并在其中设置了各种Celery的配置选项。然后,我们在Celery应用程序的入口文件celery_app.py中加载了该配置文件。
你可以根据自己的需求和应用程序的特定要求,在配置文件中进行相应的配置。注意,Celery的配置文件应该位于与你的Celery应用程序代码相同的目录中,或者可以通过正确的路径进行引用。
确保在启动Celery应用程序时,使用正确的配置文件加载配置。例如,通过以下命令启动Celery Worker:
celery -A celery_app worker --loglevel=infoPS: 通常win系统下可能你需要使用如下的方式启动:
#  D:code_loaclmm_ring_v2> celery -A src.tasks.app worker -n migutasks    --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
#     celery -A tasks worker --loglevel=info  -P eventlet
# 启动celery监控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555获取使用代码的方式启动:
from src.tasks.app import celery_app
if __name__ == '__main__':# python -m celery -A src.tasks.app worker -Q mm_ring_v2  --loglevel=info -P eventlet
    celery_app.worker_main(argv=['-A', 'src.tasks.app','worker','--loglevel=info',  '-Q', 'mm_ring_v2','-P', 'eventlet'])这样,Celery将会根据配置文件中的设置来运行任务调度和执行过程。
Celery延时任务的执行
延迟任务场景我们也通常会遇到,比如超时未支付则自当取消订单等。下面举例说明一下具体的实现过程:
基本任务项目结果:
 1:定义Celery实例对象
 1:定义Celery实例对象
celery -A src.tasks.app flower –address=127.0.0.1 –port=5555
from src.tasks.service import TaskFactory
# 任务工厂,可以创建多个,目前只是创建了一个celery_app对象
factory = TaskFactory()
# 创建第一个 Celery 应用
celery_app = factory.create_celery_app(namespces='mm_sync_tasks', config_name='src.tasks.config.ProductionConfig')
celery_app.conf["worker_redirect_stdouts"] = False
# 禁止重定向工作进程的标准输出和标准错误流
def get_pending_tasks():
    with celery_app.connection() as connection:
        tasks = connection.default_channel.queue_declare(queue='celery', passive=True).message_count
    return tasks
# 移除任务队列中的所有等待执行的任务
def remove_pending_tasks():
    with celery_app.connection() as connection:
        connection.default_channel.queue_purge(queue='celery')
# 重新调度任务
def reschedule_tasks():
    pending_tasks = get_pending_tasks()
    remove_pending_tasks()
# task_prerun:任务开始运行前触发的信号。
# task_postrun:任务运行完成后触发的信号。
# task_success:任务成功完成时触发的信号。
# task_failure:任务失败时触发的信号。
# task_retry:任务重试时触发的信号。
# task_revoked:任务被撤销时触发的信号。
# task_rejected:任务被拒绝时触发的信号。
# worker_ready:Worker准备就绪时触发的信号。
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
    if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
        print("全局异常错误获取!!", sender, task_id)
        pass
@task_retry.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
    if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
        print("全局异常错误获取!!", sender, task_id)
        pass
@task_failure.connect
def handle_task_failure(sender: Task, task_id: str, exception: Exception, traceback, einfo, **kwargs):
#  # 处理任务失败的逻辑
    if isinstance(exception, MaxRetriesExceededError):
# 处理 MaxRetriesExceededError 异常
        print("全局异常错误获取!!", sender, task_id)
        pass
@task_success.connect
def handle_task_success(sender: Task, result, **kwargs):
# 处理任务成功完成的逻辑
    print("handle_task_success!", sender, result)
    pass
if __name__ == '__main__':
    pass
#  D:code_loaclmm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet
# celery -A src.tasks.app worker --loglevel=info -P eventlet
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559
#     celery -A tasks worker --loglevel=info  -P eventlet
# 启动celery监控和管理Flower:
# celery -A src.tasks.app flower --address=127.0.0.1 --port=5555定义实例配置信息:
class ProductionConfig(Config):
DEBUG = False
@Modify Time      @Author    @Version    @Desciption
------------      -------    --------    -----------
2023/6/16 11:25   小钟同学      1.0         None
'''
from kombu import Queue, Exchange
class Config:
# broker = 'redis://127.0.0.1:6379/1'  # 任务储存
# backend = 'redis://127.0.0.1:6379/2'  # 结果存储,执行完之后结果放在这
    BROKER_URL = 'redis://127.0.0.1:6379/1'
    CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_ENABLE_UTC = True
# 全局的任务过期时间
# app.conf.update(result_expires=60)
# 定义队列名称
    CELERY_QUEUES = (
        Queue('mm_ring_v2'),
    )
# CELERY_QUEUES = (
#     Queue('default', exchange=Exchange('default'), routing_key='default'),
#     Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
#     Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
# )
# 指定任务走什么对了和routing_key
# CELERY_ROUTES = {
#     'src.tasks.task.migu_order_sync_status': {'queue': 'app_task1', 'routing_key': 'app_task1'},
#     'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
# }
# 指定需要加载的任务
# 指定要导入的任务模块或任务文件列表
# celery_app.conf.update(
#     include=[
#         'src.tasks.task.migu_order_sync_status',
#     ]
# )
# 配置需要执行的任务所在的目录
    CELERY_INCLUDE = [
        'src.tasks.task.migu_order_sync_status',
    ]
class DevelopmentConfig(Config):
    DEBUG = True
class ProductionConfig(Config):
    DEBUG = False2:定义任务:
import datetime
import signal
from celery import current_app
from celery.result import AsyncResultfrom pydantic import BaseModeldef exponential_backoff(retries):
    return 2 ** retries# 定义延迟执行的任务
# expires=3600 将任务结果的存储有效期设置为 1 小时。
@celery_app.task(bind=True, max_retries=5, default_retry_delay=1, retry_backoff=exponential_backoff(2), expires=3660)
def action_migu_order_sync_status(self, orderid, mobile):# self.request.retries 表示当前重试的次数
    ······需要说明一点是,@celery_app.task:这是一个装饰器,用于将一个普通的Python函数注册为Celery任务。celery_app 是你的 Celery 应用实例,task 是 Celery 提供的装饰器函数。且这个的任务相关参数项说明如下
- bind=True:这个参数用于指定任务函数的第一个参数(通常被命名为self)将会绑定到任务实例上,使得在任务函数内部能够通过self访问任务实例的属性和方法。
- max_retries=5:指定任务在发生错误时最多重试的次数。如果任务执行失败,Celery 将会自动重试该任务,最多重试次数由该参数指定。
- default_retry_delay=1:指定任务在发生错误后进行重试之前的默认延迟时间,单位为秒。也就是说,在每次重试之间等待的秒数。
- retry_backoff=exponential_backoff(2):这个参数用于指定重试延迟时间的增长规律。exponential_backoff 是一个用于指数级增长延迟时间的函数,其中的 2 表示指数底数。也就是说,重试的延迟时间会按照指数级增长。
- expires=3660:指定任务的过期时间,单位为秒。如果任务在指定的时间内没有被执行,将会被标记为过期并丢弃。
3:在API接口发布任务
@router.get("/put/code", summary="订单提交")
def callback(*, forms: PutCodeForm = Depends()):
    .....
# 发布任务
    result = action_migu_order_sync_status.apply_async(args=(Orders.orderid, Orders.mobile), countdown=60, retry=5,eta=eta, expires=expires,queue='mm_ring_v2')    if result == -1:
        return Fail(message='提交失败')    return Success(message='提交成功')各个个参数项说明如下:
- args=(Orders.orderid, Orders.mobile):这个参数用于指定要传递给 Celery 任务的位置参数,即任务函数在执行时所需的参数值。在这个例子中,任务函数需要两个参数,分别对应- Orders.orderid和- Orders.mobile。
- countdown=60:指定任务在被放入队列之后需要延迟多少秒才开始执行。在这个例子中,任务会在被加入队列后延迟 60 秒后开始执行。
- retry=5:指定任务在发生错误时最多重试的次数。与之前提到的- max_retries类似,但这里是针对单次任务调用的重试次数。
- eta=eta:这个参数用于指定任务的预计执行时间。通常用于将任务调度到未来的某个时间点执行,而不是立即执行。
- expires=expires:同样的意义,指定任务的过期时间,单位为秒。如果任务在指定的时间内没有被执行,将会被标记为过期并丢弃。
- queue='mm_ring_v2':指定任务被发送到的队列名称。Celery 支持将任务发送到不同的队列中,以便进行任务的分类和分配。
综合起来,这些参数的设置使得对该 Celery 任务的调用具有了一定的灵活性。你可以控制任务的延迟执行、重试次数、预计执行时间以及过期时间,并且可以指定任务发送到哪个队列中。
4:启动消费者
if __name__ == '__main__':
    pass
#  D:code_loaclmm_ring_v2> celery -A src.tasks.app worker -n migutasks --loglevel=info -P eventlet# celery -A src.tasks.app worker --loglevel=info -P eventlet# celery -A src.tasks.app flower --address=127.0.0.1 --port=5559#     celery -A tasks worker --loglevel=info  -P eventlet5:启动celery监控和管理Flower

本文章转载微信公众号@程序员小钟同学
热门API
- 1. AI文本生成
- 2. AI图片生成_文生图
- 3. AI图片生成_图生图
- 4. AI图像编辑
- 5. AI视频生成_文生视频
- 6. AI视频生成_图生视频
- 7. AI语音合成_文生语音
- 8. AI文本生成(中国)
最新文章
- 如何使用API密钥实现API认证 | 作者:Jaz Allibhai
- 9个最佳Text2Sql开源项目:自然语言到SQL的高效转换工具
- 深入解析API网关策略:认证、授权、安全、流量处理与可观测性
- GraphQL API手册:如何构建、测试、使用和记录
- 自助式入职培训服务API:如何让企业管理更上一层楼?
- Python如何调用Jenkins API自动化发布
- 模型压缩四剑客:量化、剪枝、蒸馏、二值化
- 火山引擎如何接入API:从入门到实践的技术指南
- 为什么每个使用 API 的大型企业都需要一个 API 市场来增强其合作伙伴生态系统
- 构建更优质的API:2025年顶级API开发工具推荐 – Strapi
- 外部函数与内存API – Java 22 – 未记录
- FAPI 2.0 深度解析:下一代金融级 API 安全标准与实践指南