第三章:流式协议 -- AI 是如何"逐字吐出"回答的
对应源文件:
packages/ai/src/types.ts(AssistantMessageEvent)、packages/ai/src/utils/event-stream.ts
为什么需要流式传输
当你用 ChatGPT 聊天时,你会看到文字一个一个跳出来,而不是等很久然后一整段话突然出现。这就是流式传输(Streaming)。
为什么不一次性返回完整结果?两个原因:
-
用户体验:LLM 生成一段话可能需要 5-30 秒。如果等它全部生成完再显示,用户会以为程序卡死了。流式传输让用户立刻看到输出开始了。
-
即时响应:在 Agent 场景中,你可能需要在 AI 还在"说"的过程中就做出反应。比如 AI 正在生成一个工具调用的参数,你可以提前显示"正在写入 xxx.ts..."的提示。
事件流协议(Event Stream Protocol)
pi-ai 定义了一套统一的事件类型,无论背后是哪个 provider,你收到的事件序列格式都一样。
事件类型一览
| 事件 | 何时触发 | 携带的关键信息 |
|---|---|---|
start | 流开始 | partial:初始的空 AssistantMessage 结构 |
text_start | 一段文本开始 | contentIndex:在 content 数组中的位置 |
text_delta | 收到一小块文字 | delta:新增的文字片段 |
text_end | 一段文本结束 | content:完整的文本内容 |
thinking_start | 思考开始 | |
thinking_delta | 收到一小块思考内容 | delta:新增的思考片段 |
thinking_end | 思考结束 | |
toolcall_start | 工具调用开始 | |
toolcall_delta | 工具调用参数逐步到达 | delta:JSON 片段 |
toolcall_end | 工具调用完成 | toolCall:完整的工具调用信息 |
done | 成功结束 | reason + message:最终完整消息 |
error | 出错或被取消 | reason + error:包含部分内容的消息 |
一次典型的事件序列
假设 AI 先思考,然后输出一段文字:
1. start → 流开始了
2. thinking_start → AI 开始思考
3. thinking_delta → "让我"
4. thinking_delta → "分析"
5. thinking_delta → "一下这个问题..."
6. thinking_end → 思考完成
7. text_start → 文本输出开始
8. text_delta → "这是一个"
9. text_delta → "快速排序"
10. text_delta → "的实现..."
11. text_end → 文本完成
12. done → 全部结束,reason: "stop"
如果 AI 调用了工具:
1. start → 流开始
2. text_start → 文本开始
3. text_delta → "我来读取这个文件"
4. text_end → 文本完成
5. toolcall_start → 工具调用开始
6. toolcall_delta → '{"path":'
7. toolcall_delta → '"src/main'
8. toolcall_delta → '.ts"}'
9. toolcall_end → 工具调用完成,toolCall: { name: "read", arguments: { path: "src/main.ts" } }
10. done → 结束,reason: "toolUse"(注意:不是 "stop",因为需要等工具结果)
错误场景
1. start → 流开始
2. text_start → 文本开始
3. text_delta → "这是一个"
4. text_delta → "关于..."
5. error → 出错了!reason: "error", error 中包含已收到的部分内容
注意:error 事件不会丢失已收到的数据。error 中的 AssistantMessage 包含了所有已经收到的内容块,你可以选择保留它们。
EventStream -- 异步事件队列
如何把上面的事件传递给调用者?pi-ai 使用了一个叫 EventStream 的类。
什么是异步迭代器
JavaScript 有一个叫 for await...of 的语法,可以逐个处理异步到达的数据:
for await (const event of stream) {
console.log(event.type); // 每收到一个事件就执行一次
}
// 所有事件都处理完后,循环自然结束
EventStream 就是实现了这个接口的类。它内部有一个队列:
- 生产者侧(provider 适配器):调用
stream.push(event)往队列里放事件 - 消费者侧(你的代码):通过
for await...of从队列里取事件
Provider 适配器 你的代码
│ │
│ push(text_delta) │
│───────────────► [队列] ───► │ for await (event of stream)
│ push(text_delta) │ → 处理 text_delta
│───────────────► [队列] ───► │ → 处理 text_delta
│ push(done) │
│───────────────► [队列] ───► │ → 处理 done
│ │ 循环结束
result() -- 获取最终结果
除了逐事件处理,你也可以直接拿到最终的完整 AssistantMessage:
const stream = stream(model, context);
// 方式 1:逐事件处理
for await (const event of stream) { ... }
// 方式 2:直接拿最终结果(内部消费完所有事件)
const finalMessage = await stream.result();
result() 返回一个 Promise,在 done 或 error 事件到达时 resolve。
什么是 AssistantMessageEventStream
它就是一个预配置好的 EventStream:
class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
// 当事件是 done 或 error 时,认为流结束了
// 从 done 事件中提取最终的 AssistantMessage 作为 result
}
stream() vs complete() -- 四个入口函数
packages/ai/src/stream.ts 导出了四个函数:
| 函数 | 流式? | 选项级别 | 用途 |
|---|---|---|---|
stream() | 是 | Provider 特定选项 | 需要逐事件处理 + 精细控制 |
complete() | 否 | Provider 特定选项 | 只要最终结果 |
streamSimple() | 是 | 统一简化选项 | 需要逐事件处理 + 简单控制 |
completeSimple() | 否 | 统一简化选项 | 只要最终结果 + 简单控制 |
stream / streamSimple 的区别:
stream()接受每个 provider 自己的特定选项。比如 Anthropic 有thinkingBudgetTokens,OpenAI 有reasoningEffortstreamSimple()接受统一的SimpleStreamOptions,用reasoning: "medium"一个参数控制所有 provider 的思考行为
complete() 只是 stream() 的封装:启动流,然后等待 result():
// complete() 的实现就是这么简单
function complete(model, context, options) {
const s = stream(model, context, options);
return s.result(); // 等待流结束,返回最终消息
}
在 Agent 场景中,通常使用 streamSimple(),因为:
- Agent 需要流式显示 AI 的输出(用户体验)
- Agent 不需要精细控制每个 provider 的特有参数(用统一选项更方便)
关键设计决策:错误不抛异常
传统的网络请求:
try {
const result = await fetch(url);
} catch (error) {
// 错误通过异常传播
}
pi-ai 的流式请求:
for await (const event of stream) {
if (event.type === "error") {
// 错误通过事件传播
// event.error 包含了已收到的部分内容
// 不会丢失任何数据
}
}
为什么这样设计?
- 流式场景下,你可能已经处理了一半的事件(比如已经向用户显示了部分文字),如果突然抛异常,之前的处理逻辑无法优雅地收尾
- 错误消息本身也是一个
AssistantMessage(只是stopReason是"error"或"aborted"),它包含了所有已收到的部分数据,你可以把它保存到对话历史中
小结
stream(model, context) → AssistantMessageEventStream → for await (event of ...)
│
┌───────────────────────────┤
│ │
push() [async iterator]
(provider 写入) (你的代码消费)
│ │
start, text_delta, switch(event.type)
toolcall_end, done 处理每个事件
下一章我们看 API 注册表和 provider 路由机制 -- 即 stream() 是如何找到正确的 provider 适配器的。