第三章:流式协议 -- AI 是如何"逐字吐出"回答的

对应源文件:packages/ai/src/types.ts(AssistantMessageEvent)、packages/ai/src/utils/event-stream.ts

为什么需要流式传输

当你用 ChatGPT 聊天时,你会看到文字一个一个跳出来,而不是等很久然后一整段话突然出现。这就是流式传输(Streaming)。

为什么不一次性返回完整结果?两个原因:

  1. 用户体验:LLM 生成一段话可能需要 5-30 秒。如果等它全部生成完再显示,用户会以为程序卡死了。流式传输让用户立刻看到输出开始了。

  2. 即时响应:在 Agent 场景中,你可能需要在 AI 还在"说"的过程中就做出反应。比如 AI 正在生成一个工具调用的参数,你可以提前显示"正在写入 xxx.ts..."的提示。

事件流协议(Event Stream Protocol)

pi-ai 定义了一套统一的事件类型,无论背后是哪个 provider,你收到的事件序列格式都一样。

事件类型一览

事件何时触发携带的关键信息
start流开始partial:初始的空 AssistantMessage 结构
text_start一段文本开始contentIndex:在 content 数组中的位置
text_delta收到一小块文字delta:新增的文字片段
text_end一段文本结束content:完整的文本内容
thinking_start思考开始
thinking_delta收到一小块思考内容delta:新增的思考片段
thinking_end思考结束
toolcall_start工具调用开始
toolcall_delta工具调用参数逐步到达delta:JSON 片段
toolcall_end工具调用完成toolCall:完整的工具调用信息
done成功结束reason + message:最终完整消息
error出错或被取消reason + error:包含部分内容的消息

一次典型的事件序列

假设 AI 先思考,然后输出一段文字:

1. start          → 流开始了
2. thinking_start → AI 开始思考
3. thinking_delta → "让我"
4. thinking_delta → "分析"
5. thinking_delta → "一下这个问题..."
6. thinking_end   → 思考完成
7. text_start     → 文本输出开始
8. text_delta     → "这是一个"
9. text_delta     → "快速排序"
10. text_delta    → "的实现..."
11. text_end      → 文本完成
12. done          → 全部结束,reason: "stop"

如果 AI 调用了工具:

1. start             → 流开始
2. text_start        → 文本开始
3. text_delta        → "我来读取这个文件"
4. text_end          → 文本完成
5. toolcall_start    → 工具调用开始
6. toolcall_delta    → '{"path":'
7. toolcall_delta    → '"src/main'
8. toolcall_delta    → '.ts"}'
9. toolcall_end      → 工具调用完成,toolCall: { name: "read", arguments: { path: "src/main.ts" } }
10. done             → 结束,reason: "toolUse"(注意:不是 "stop",因为需要等工具结果)

错误场景

1. start       → 流开始
2. text_start  → 文本开始
3. text_delta  → "这是一个"
4. text_delta  → "关于..."
5. error       → 出错了!reason: "error", error 中包含已收到的部分内容

注意:error 事件不会丢失已收到的数据。error 中的 AssistantMessage 包含了所有已经收到的内容块,你可以选择保留它们。

EventStream -- 异步事件队列

如何把上面的事件传递给调用者?pi-ai 使用了一个叫 EventStream 的类。

什么是异步迭代器

JavaScript 有一个叫 for await...of 的语法,可以逐个处理异步到达的数据:

for await (const event of stream) {
  console.log(event.type);    // 每收到一个事件就执行一次
}
// 所有事件都处理完后,循环自然结束

EventStream 就是实现了这个接口的类。它内部有一个队列:

  • 生产者侧(provider 适配器):调用 stream.push(event) 往队列里放事件
  • 消费者侧(你的代码):通过 for await...of 从队列里取事件
Provider 适配器                    你的代码
     │                               │
     │ push(text_delta)              │
     │───────────────►  [队列]  ───► │ for await (event of stream)
     │ push(text_delta)              │   → 处理 text_delta
     │───────────────►  [队列]  ───► │   → 处理 text_delta
     │ push(done)                    │
     │───────────────►  [队列]  ───► │   → 处理 done
     │                               │ 循环结束

result() -- 获取最终结果

除了逐事件处理,你也可以直接拿到最终的完整 AssistantMessage

const stream = stream(model, context);

// 方式 1:逐事件处理
for await (const event of stream) { ... }

// 方式 2:直接拿最终结果(内部消费完所有事件)
const finalMessage = await stream.result();

result() 返回一个 Promise,在 doneerror 事件到达时 resolve。

什么是 AssistantMessageEventStream

它就是一个预配置好的 EventStream

class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
  // 当事件是 done 或 error 时,认为流结束了
  // 从 done 事件中提取最终的 AssistantMessage 作为 result
}

stream() vs complete() -- 四个入口函数

packages/ai/src/stream.ts 导出了四个函数:

函数流式?选项级别用途
stream()Provider 特定选项需要逐事件处理 + 精细控制
complete()Provider 特定选项只要最终结果
streamSimple()统一简化选项需要逐事件处理 + 简单控制
completeSimple()统一简化选项只要最终结果 + 简单控制

stream / streamSimple 的区别:

  • stream() 接受每个 provider 自己的特定选项。比如 Anthropic 有 thinkingBudgetTokens,OpenAI 有 reasoningEffort
  • streamSimple() 接受统一的 SimpleStreamOptions,用 reasoning: "medium" 一个参数控制所有 provider 的思考行为

complete() 只是 stream() 的封装:启动流,然后等待 result()

// complete() 的实现就是这么简单
function complete(model, context, options) {
  const s = stream(model, context, options);
  return s.result();  // 等待流结束,返回最终消息
}

在 Agent 场景中,通常使用 streamSimple(),因为:

  1. Agent 需要流式显示 AI 的输出(用户体验)
  2. Agent 不需要精细控制每个 provider 的特有参数(用统一选项更方便)

关键设计决策:错误不抛异常

传统的网络请求:

try {
  const result = await fetch(url);
} catch (error) {
  // 错误通过异常传播
}

pi-ai 的流式请求:

for await (const event of stream) {
  if (event.type === "error") {
    // 错误通过事件传播
    // event.error 包含了已收到的部分内容
    // 不会丢失任何数据
  }
}

为什么这样设计?

  1. 流式场景下,你可能已经处理了一半的事件(比如已经向用户显示了部分文字),如果突然抛异常,之前的处理逻辑无法优雅地收尾
  2. 错误消息本身也是一个 AssistantMessage(只是 stopReason"error""aborted"),它包含了所有已收到的部分数据,你可以把它保存到对话历史中

小结

stream(model, context)  →  AssistantMessageEventStream  →  for await (event of ...)
                                                             │
                                 ┌───────────────────────────┤
                                 │                           │
                              push()                    [async iterator]
                          (provider 写入)               (你的代码消费)
                                 │                           │
                          start, text_delta,           switch(event.type)
                          toolcall_end, done              处理每个事件

下一章我们看 API 注册表和 provider 路由机制 -- 即 stream() 是如何找到正确的 provider 适配器的。