# 第三章：流式协议 -- AI 是如何"逐字吐出"回答的

> 对应源文件：`packages/ai/src/types.ts`（AssistantMessageEvent）、`packages/ai/src/utils/event-stream.ts`

## 为什么需要流式传输

当你用 ChatGPT 聊天时，你会看到文字一个一个跳出来，而不是等很久然后一整段话突然出现。这就是**流式传输**（Streaming）。

为什么不一次性返回完整结果？两个原因：

1. **用户体验**：LLM 生成一段话可能需要 5-30 秒。如果等它全部生成完再显示，用户会以为程序卡死了。流式传输让用户立刻看到输出开始了。

2. **即时响应**：在 Agent 场景中，你可能需要在 AI 还在"说"的过程中就做出反应。比如 AI 正在生成一个工具调用的参数，你可以提前显示"正在写入 xxx.ts..."的提示。

## 事件流协议（Event Stream Protocol）

pi-ai 定义了一套统一的事件类型，无论背后是哪个 provider，你收到的事件序列格式都一样。

### 事件类型一览

| 事件 | 何时触发 | 携带的关键信息 |
|------|----------|----------------|
| `start` | 流开始 | `partial`：初始的空 AssistantMessage 结构 |
| `text_start` | 一段文本开始 | `contentIndex`：在 content 数组中的位置 |
| `text_delta` | 收到一小块文字 | `delta`：新增的文字片段 |
| `text_end` | 一段文本结束 | `content`：完整的文本内容 |
| `thinking_start` | 思考开始 | |
| `thinking_delta` | 收到一小块思考内容 | `delta`：新增的思考片段 |
| `thinking_end` | 思考结束 | |
| `toolcall_start` | 工具调用开始 | |
| `toolcall_delta` | 工具调用参数逐步到达 | `delta`：JSON 片段 |
| `toolcall_end` | 工具调用完成 | `toolCall`：完整的工具调用信息 |
| `done` | 成功结束 | `reason` + `message`：最终完整消息 |
| `error` | 出错或被取消 | `reason` + `error`：包含部分内容的消息 |

### 一次典型的事件序列

假设 AI 先思考，然后输出一段文字：

```
1. start          → 流开始了
2. thinking_start → AI 开始思考
3. thinking_delta → "让我"
4. thinking_delta → "分析"
5. thinking_delta → "一下这个问题..."
6. thinking_end   → 思考完成
7. text_start     → 文本输出开始
8. text_delta     → "这是一个"
9. text_delta     → "快速排序"
10. text_delta    → "的实现..."
11. text_end      → 文本完成
12. done          → 全部结束，reason: "stop"
```

如果 AI 调用了工具：

```
1. start             → 流开始
2. text_start        → 文本开始
3. text_delta        → "我来读取这个文件"
4. text_end          → 文本完成
5. toolcall_start    → 工具调用开始
6. toolcall_delta    → '{"path":'
7. toolcall_delta    → '"src/main'
8. toolcall_delta    → '.ts"}'
9. toolcall_end      → 工具调用完成，toolCall: { name: "read", arguments: { path: "src/main.ts" } }
10. done             → 结束，reason: "toolUse"（注意：不是 "stop"，因为需要等工具结果）
```

### 错误场景

```
1. start       → 流开始
2. text_start  → 文本开始
3. text_delta  → "这是一个"
4. text_delta  → "关于..."
5. error       → 出错了！reason: "error", error 中包含已收到的部分内容
```

注意：**error 事件不会丢失已收到的数据**。error 中的 `AssistantMessage` 包含了所有已经收到的内容块，你可以选择保留它们。

## EventStream -- 异步事件队列

如何把上面的事件传递给调用者？pi-ai 使用了一个叫 `EventStream` 的类。

### 什么是异步迭代器

JavaScript 有一个叫 `for await...of` 的语法，可以逐个处理异步到达的数据：

```javascript
for await (const event of stream) {
  console.log(event.type);    // 每收到一个事件就执行一次
}
// 所有事件都处理完后，循环自然结束
```

`EventStream` 就是实现了这个接口的类。它内部有一个队列：

- **生产者侧**（provider 适配器）：调用 `stream.push(event)` 往队列里放事件
- **消费者侧**（你的代码）：通过 `for await...of` 从队列里取事件

```
Provider 适配器                    你的代码
     │                               │
     │ push(text_delta)              │
     │───────────────►  [队列]  ───► │ for await (event of stream)
     │ push(text_delta)              │   → 处理 text_delta
     │───────────────►  [队列]  ───► │   → 处理 text_delta
     │ push(done)                    │
     │───────────────►  [队列]  ───► │   → 处理 done
     │                               │ 循环结束
```

### result() -- 获取最终结果

除了逐事件处理，你也可以直接拿到最终的完整 `AssistantMessage`：

```javascript
const stream = stream(model, context);

// 方式 1：逐事件处理
for await (const event of stream) { ... }

// 方式 2：直接拿最终结果（内部消费完所有事件）
const finalMessage = await stream.result();
```

`result()` 返回一个 Promise，在 `done` 或 `error` 事件到达时 resolve。

### 什么是 AssistantMessageEventStream

它就是一个预配置好的 `EventStream`：

```javascript
class AssistantMessageEventStream extends EventStream<AssistantMessageEvent, AssistantMessage> {
  // 当事件是 done 或 error 时，认为流结束了
  // 从 done 事件中提取最终的 AssistantMessage 作为 result
}
```

## stream() vs complete() -- 四个入口函数

`packages/ai/src/stream.ts` 导出了四个函数：

| 函数 | 流式？ | 选项级别 | 用途 |
|------|--------|----------|------|
| `stream()` | 是 | Provider 特定选项 | 需要逐事件处理 + 精细控制 |
| `complete()` | 否 | Provider 特定选项 | 只要最终结果 |
| `streamSimple()` | 是 | 统一简化选项 | 需要逐事件处理 + 简单控制 |
| `completeSimple()` | 否 | 统一简化选项 | 只要最终结果 + 简单控制 |

`stream` / `streamSimple` 的区别：

- `stream()` 接受每个 provider 自己的特定选项。比如 Anthropic 有 `thinkingBudgetTokens`，OpenAI 有 `reasoningEffort`
- `streamSimple()` 接受统一的 `SimpleStreamOptions`，用 `reasoning: "medium"` 一个参数控制所有 provider 的思考行为

`complete()` 只是 `stream()` 的封装：启动流，然后等待 `result()`：

```javascript
// complete() 的实现就是这么简单
function complete(model, context, options) {
  const s = stream(model, context, options);
  return s.result();  // 等待流结束，返回最终消息
}
```

在 Agent 场景中，通常使用 `streamSimple()`，因为：
1. Agent 需要流式显示 AI 的输出（用户体验）
2. Agent 不需要精细控制每个 provider 的特有参数（用统一选项更方便）

---

## 关键设计决策：错误不抛异常

传统的网络请求：
```javascript
try {
  const result = await fetch(url);
} catch (error) {
  // 错误通过异常传播
}
```

pi-ai 的流式请求：
```javascript
for await (const event of stream) {
  if (event.type === "error") {
    // 错误通过事件传播
    // event.error 包含了已收到的部分内容
    // 不会丢失任何数据
  }
}
```

为什么这样设计？

1. 流式场景下，你可能已经处理了一半的事件（比如已经向用户显示了部分文字），如果突然抛异常，之前的处理逻辑无法优雅地收尾
2. 错误消息本身也是一个 `AssistantMessage`（只是 `stopReason` 是 `"error"` 或 `"aborted"`），它包含了所有已收到的部分数据，你可以把它保存到对话历史中

---

## 小结

```
stream(model, context)  →  AssistantMessageEventStream  →  for await (event of ...)
                                                             │
                                 ┌───────────────────────────┤
                                 │                           │
                              push()                    [async iterator]
                          (provider 写入)               (你的代码消费)
                                 │                           │
                          start, text_delta,           switch(event.type)
                          toolcall_end, done              处理每个事件
```

下一章我们看 API 注册表和 provider 路由机制 -- 即 `stream()` 是如何找到正确的 provider 适配器的。
