目录

fastapi-celery-flower-rabbitmq-redis-可运行demo

fastapi celery flower rabbitmq redis 可运行demo

资料

过程

开发环境

windows, 已经安装好的 rabbitmq

目录结构

/project_root │ ├── /app │ ├── init.py │ ├── main.py │ ├── tasks.py │ ├── celery_config.py │ └── worker.py │ ├── /logs(非必须) ├── /venv(非必须) ├── requirements.txt(非必须) └── README.md(非必须)

依赖的三方库

amqp==5.3.1 annotated-types==0.7.0 anyio==4.8.0 billiard==4.2.1 celery==5.4.0 click==8.1.8 click-didyoumean==0.3.1 click-plugins==1.1.1 click-repl==0.3.0 colorama==0.4.6 dnspython==2.7.0 eventlet==0.39.1 fastapi==0.115.11 flower==2.0.1 greenlet==3.1.1 h11==0.14.0 humanize==4.12.1 idna==3.10 kombu==5.4.2 pip==25.0.1 prometheus_client==0.21.1 prompt_toolkit==3.0.50 pydantic==2.10.6 pydantic_core==2.27.2 python-dateutil==2.9.0.post0 pytz==2025.1 six==1.17.0 sniffio==1.3.1 starlette==0.46.1 tornado==6.4.2 typing_extensions==4.12.2 tzdata==2025.1 uvicorn==0.34.0 vine==5.1.0 wcwidth==0.2.13

文件内容

celery_config.py from celery import Celery

使用RabbitMQ作为消息中间件

celery_app = Celery(’tasks’, broker=“amqp://《username》:《password》@《hostname》:《port》//”)

配置Celery

celery_app.conf.update( imports=[“app.tasks”], result_backend=‘rpc://’, # 使用RPC作为结果后端 task_serializer=‘json’, # 任务数据使用json格式 )

main.py from fastapi import FastAPI from .tasks import add app = FastAPI() @app.get("/") def read_root(): return {“message”: “Hello World”} @app.post("/add_task/") def create_task(a: int, b: int): task = add.delay(a, b) # 调用Celery任务 return {“task_id”: task.id} tasks.py from celery import Celery

引入 Celery 配置

from .celery_config import celery_app

定义一个简单的任务

@celery_app.task def add(a, b): return a + b

worker.py from .celery_config import celery_app if name == “main”: celery_app.start()

运行

fastapi uvicorn main:app –host 0.0.0.0 –port 9000 celery celery -A app.worker worker -l info -P solo -c 2 flower celery -A app.worker flower

测试

打开网址

  • 你的 rabbitmq 面板
  • 点点就可以了

踩坑

无法运行 celery 命令

在写 demo 的 时候 为了便捷运行 , 笔者 将 fastapi,celery,flower 的运行 命令写在了三个 脚本文件里面 celery.cmd , runfastapi.bat , flower.cmd 运行时只有 runfastapi.bat 正确运行,其他两个脚本在控制台 无限重复运行,手动在控制台输入命令即可。【应该是与文件名有关 -_-!】

运行 celery 后电脑卡顿

与celery的命令参数有关 添加 -c 2 就好了。

无法运行 worker.py

不能直接运行 worker.py 文件,这个需要配合 celery 来运行滴

创建了许多 随机队列

查看这些随机队列,会自动删除,周期是 24h Features| | x-expires:| 86400000 —|— auto-delete:| true