
为什么需要流水线
一个 app.py 调 OpenAI 能跑 demo,但生产会遇到:
- 上传 100MB PDF 阻塞事件循环;
- 并发突刺打满 API 配额;
- 失败无重试、无 dead letter。
流水线 = 阶段解耦 + 异步 + 可观测。

RAG 四阶段
- Ingest:解析、清洗、切块(chunk size 与 overlap 要可配置);
- Embed:批处理写入向量库;
- Retrieve:top-k + 可选 rerank;
- Generate:拼 prompt → LLM → 引用溯源。
每阶段独立扩缩容:embedding 可 GPU Worker,retrieve 可 CPU 密集。
异步任务模型

- Broker:Redis / RabbitMQ;
- Worker:Celery / ARQ / Dramatiq;
- 结果后端:Redis / PostgreSQL(job 状态)。
API 只负责 校验 + 入队 + 返回 jobId,长活交给 Worker。
模型网关(One API / 自建)
统一 OPENAI_BASE_URL 的好处:
- 多模型路由与 fallback;
- 统一审计与限流;
- 密钥不散落在各 Worker。
本博客后端已采用此模式(见 .env.example OPENAI_BASE_URL)。
批处理与缓存
- 相同文档 re-ingest 前 hash 去重;
- 热门 query 结果短缓存(注意 TTL 与权限);
- Embedding 批大小按供应商限制调优。
小结
Python AI 架构画流水线:ingress 轻、worker 重、网关统一、队列缓冲。这样图片生成(如本文封面)也可走同一套「提交 → 轮询 → 存 CDN」模式。
作者结合博客
image-pipeline脚本与 Agent 模块实践。