0%

持久化 OpenAI API 流式响应的工程实践

(由DeepSeek R1辅助编写)

问题背景

挑战分析

在实现基于 OpenAI 流式 API 的对话系统时,面临两个核心挑战:

  1. 长时操作风险:openai接口生成长文本需要 10-30 秒,客户端网络波动可能导致连接中断
  2. 数据一致性要求:用户发送的消息与 AI 的完整响应必须保证原子性持久化

传统同步处理模式存在致命缺陷,在客户端连接中断时会导致消息丢失:

1
2
3
4
5
6
7
8
9
10
sequenceDiagram
participant Client
participant Server
participant OpenAI

Client->>Server: POST /chat
Server->>OpenAI: 流式请求
OpenAI-->>Server: 数据流
Server-->>Client: 流式响应
Note over Client,Server: 若此时客户端断开连接<br/>未持久化的数据将丢失

架构设计

使用异步 Worker 执行实际请求并通过消息队列通信

方案描述

该方案通过异步任务和消息队列实现了 OpenAI API 流式响应的持久化和可靠传输。具体步骤如下:

  1. 客户端请求:客户端发送 POST 请求到 Web 服务器,创建对话任务。
  2. 异步任务:Web 服务器将任务委派给 异步 Worker,立即返回任务 ID 给客户端。
  3. 流式处理:Worker 向 OpenAI 发起流式请求,实时处理响应数据。
  4. 数据持久化:每个响应片段同时写入数据库和 Redis 消息队列,确保数据安全。
  5. 消息推送:Web 服务器通过 SSE 将消息队列中的数据推送给客户端,实现流式响应。
  6. 断连恢复:客户端可根据任务 ID 重新连接,获取未接收的历史消息。

该方案解决了长时操作风险和数据一致性问题,确保在客户端断连情况下数据不丢失,并支持断线重连和消息恢复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sequenceDiagram
participant Client
participant WebServer
participant MessageQueue
participant Worker
participant OpenAI
participant Database

Client->>WebServer: POST /chat
WebServer->>Worker: 创建异步任务
Worker-->>WebServer: 返回任务ID
WebServer-->>Client: 返回任务ID (202 Accepted)

Client->>WebServer: GET /stream/<task_id>
WebServer->>MessageQueue: 订阅任务通道

Worker->>OpenAI: 发起流式请求
loop 流式响应处理
OpenAI-->>Worker: 数据片段
Worker->>Database: 实时持久化
Worker->>MessageQueue: 发布片段
MessageQueue-->>WebServer: 推送消息
WebServer-->>Client: SSE 推送
end
OpenAI-->>Worker: 消息结束
WebServer->>Client: 消息结束

关键技术决策

  1. 双写一致性:消息队列推送与数据库写入保持原子性
  2. 断连恢复机制:客户端根据任务 ID 重新连接后可获取历史消息
  3. 背压控制:通过 Redis 的 Stream 数据类型实现流量控制

Python 实现示例

技术栈

  • Web 框架:Flask
  • 异步任务:Celery
  • 消息队列:Redis Streams
  • ORM:SQLAlchemy

数据库模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Conversation(Base):
__tablename__ = 'conversations'
id = Column(String(36), primary_key=True)
user_id = Column(Integer)
content = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)

class ResponseChunk(Base):
__tablename__ = 'response_chunks'
id = Column(Integer, primary_key=True)
task_id = Column(String(36))
content = Column(Text)
sequence = Column(Integer)
timestamp = Column(DateTime, default=datetime.utcnow)

Celery 任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@app.task(bind=True)
def process_streaming_response(self, prompt, user_id):
conv_id = str(uuid4())
save_initial_conversation(conv_id, user_id, prompt) # 持久化用户输入

response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)

redis = get_redis()
sequence = 0
full_response = []

for chunk in response:
content = chunk.choices[0].delta.get('content', '')
if content:
full_response.append(content)
sequence += 1

# 原子化双写
with db.session.begin_nested():
chunk = ResponseChunk(
task_id=self.request.id,
content=content,
sequence=sequence
)
db.session.add(chunk)

redis.xadd(
f"task:{self.request.id}",
{"content": content},
maxlen=100
)

Conversation.query.filter_by(id=conv_id).update(
{"content": ''.join(full_response)}
)
db.session.commit()

流式端点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@app.route('/stream/<task_id>')
def stream_response(task_id):
def generate():
redis = get_redis()
last_id = '0'

# 读取历史未确认消息
while True:
messages = redis.xrange(f"task:{task_id}", min=last_id, count=10)
if not messages:
break

for msg_id, data in messages:
last_id = msg_id
yield f"data: {data['content']}\n\n"

# 监听新消息
while True:
try:
messages = redis.xread(
{f"task:{task_id}": last_id},
count=1, block=5000
)
if messages:
_, msg_data = messages[0]
for msg_id, data in msg_data:
last_id = msg_id
yield f"data: {data['content']}\n\n"
except Timeout:
continue

return Response(generate(), mimetype='text/event-stream')

优化方向

生产级增强

  1. 消息压缩:对高频小数据包进行 Buffer 聚合
  2. 重试策略:实现指数退避的 OpenAI API 重试机制
  3. 监控埋点
    1
    2
    3
    4
    5
    6
    7
    @app.task.after_connect
    def setup_monitoring(sender, **kwargs):
    from prometheus_client import Counter
    sender.TASK_STREAM_CHUNKS = Counter(
    'task_stream_chunks',
    'Number of processed stream chunks'
    )

客户端示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class ChatStream {
constructor(taskId) {
this.taskId = taskId;
this.es = null;
this.reconnectDelay = 1000;
}

connect() {
this.es = new EventSource(`/stream/${this.taskId}`);

this.es.onmessage = (e) => {
this.reconnectDelay = 1000;
this.onData(e.data);
};

this.es.onerror = () => {
this.es.close();
setTimeout(() => this.connect(), this.reconnectDelay);
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000);
};
}

onData(data) {
const el = document.getElementById("response");
el.innerHTML += data;
}
}

总结模式

该方案实现了三个关键能力:

  1. 异步解耦:通过 Celery Worker 将耗时操作移出请求生命周期
  2. 数据安全:双重持久化机制(数据库+消息队列)确保数据不丢失
  3. 弹性通信:基于 Redis Stream 的消息通道支持断线自动恢复

实际部署时需要特别注意:

  • 设置合理的 Redis Stream TTL
  • 监控 Celery 任务积压情况
  • 对 XRead 使用适当的阻塞超时时间(建议 5-10 秒)
扫码加入技术交流群🖱️
QR code