第六章:端到端流程 -- 一次完整的 LLM 调用
全景图
把前五章的内容串起来,我们用一个具体场景走一遍完整流程。
场景
用户输入:"帮我读取 src/main.ts 的内容"
准备阶段
import { getModel, streamSimple } from "@mariozechner/pi-ai";
// 1. 获取模型
const model = getModel("anthropic", "claude-sonnet-4-20250514");
// → 从模型注册表中查到 Model 对象,包含 api:"anthropic-messages", baseUrl, cost 等
// 2. 构建上下文
const context = {
systemPrompt: "你是一个编码助手,可以使用 read 工具读取文件...",
messages: [
{ role: "user", content: "帮我读取 src/main.ts 的内容", timestamp: Date.now() }
],
tools: [{
name: "read",
description: "读取文件内容",
parameters: Type.Object({
path: Type.String({ description: "文件路径" })
})
}]
};
// 3. 发起流式请求
const s = streamSimple(model, context, { reasoning: "medium" });
执行阶段
streamSimple(model, context, { reasoning: "medium" })
│
▼
stream.ts:
│ resolveApiProvider("anthropic-messages")
│ → 在 API 注册表中找到懒代理
│
▼
register-builtins.ts (懒代理):
│ 创建 outer EventStream
│ import("./anthropic.js") ← 首次使用时加载
│ 调用 streamSimpleAnthropic(model, context, { reasoning: "medium" })
│
▼
anthropic.ts:
│
│ ① reasoning 映射
│ "medium" → { thinkingEnabled: true, thinkingBudgetTokens: 8192 }
│
│ ② 消息转换 (transformMessages)
│ - 检查模型支持图片 → 无需降级
│ - 无历史 assistant 消息 → 无需处理 thinking 块
│
│ ③ 构建 Anthropic API 请求体
│ POST https://api.anthropic.com/v1/messages
│ {
│ "model": "claude-sonnet-4-20250514",
│ "system": [{ "type": "text", "text": "你是一个编码助手..." }],
│ "messages": [{ "role": "user", "content": "帮我读取 src/main.ts..." }],
│ "tools": [{ "name": "read", "input_schema": { ... } }],
│ "max_tokens": 40192, // 32000 (base) + 8192 (thinking budget)
│ "thinking": { "type": "enabled", "budget_tokens": 8192 },
│ "stream": true
│ }
│
│ ④ 发送 HTTP 请求,开始接收 SSE 响应
▼
响应解析阶段
Anthropic 服务器开始返回 SSE 事件流:
SSE 响应 pi-ai 事件
────────────────── ──────────────────
message_start → start { partial: AssistantMessage }
(包含 model, usage 初始值)
content_block_start → thinking_start
(type: "thinking")
content_block_delta → thinking_delta { delta: "用户想" }
(delta.thinking: "用户想")
content_block_delta → thinking_delta { delta: "读取文件..." }
(delta.thinking: "读取文件...")
content_block_stop → thinking_end { content: "用户想读取文件..." }
content_block_start → toolcall_start
(type: "tool_use", name: "read")
content_block_delta → toolcall_delta { delta: '{"path":' }
(delta: '{"path":')
content_block_delta → toolcall_delta { delta: '"src/main.ts"}' }
(delta: '"src/main.ts"}')
content_block_stop → toolcall_end { toolCall: { name:"read", arguments:{path:"src/main.ts"} } }
message_delta → (更新 stopReason 和 usage)
(stop_reason: "tool_use",
usage: { input: 1200, output: 300 })
message_stop → done { reason: "toolUse", message: 完整的 AssistantMessage }
消费阶段
你的代码通过 for await 接收这些事件:
for await (const event of s) {
switch (event.type) {
case "thinking_delta":
// 显示思考过程(可选)
break;
case "toolcall_end":
// AI 想调用 read 工具!
console.log(`工具调用: ${event.toolCall.name}(${JSON.stringify(event.toolCall.arguments)})`);
// → "工具调用: read({"path":"src/main.ts"})"
break;
case "done":
console.log(`结束原因: ${event.reason}`);
// → "结束原因: toolUse"
break;
}
}
// 获取完整的 AssistantMessage
const assistantMsg = await s.result();
// assistantMsg.content = [
// { type: "thinking", thinking: "用户想读取文件..." },
// { type: "toolCall", id: "tc_1", name: "read", arguments: { path: "src/main.ts" } }
// ]
// assistantMsg.stopReason = "toolUse"
// assistantMsg.usage = { input: 1200, output: 300, ... }
工具执行阶段(由上层 Agent 完成)
// 把 assistant 消息加入上下文
context.messages.push(assistantMsg);
// 执行工具
const fileContent = fs.readFileSync("src/main.ts", "utf-8");
// 把工具结果加入上下文
context.messages.push({
role: "toolResult",
toolCallId: "tc_1",
toolName: "read",
content: [{ type: "text", text: fileContent }],
isError: false,
timestamp: Date.now()
});
// 继续调用 LLM(让它根据文件内容回答用户)
const s2 = streamSimple(model, context, { reasoning: "medium" });
// 这次 LLM 看到了文件内容,会生成一个 text 回复
// stopReason 将是 "stop"(不再需要调用工具)
关键要点回顾
| 层次 | 职责 | 关键概念 |
|---|---|---|
| 类型系统 | 定义统一词汇 | Model, Context, Message, Tool, AssistantMessageEvent |
| 入口函数 | 路由到 provider | stream(), complete(), streamSimple(), completeSimple() |
| API 注册表 | 协议 → 适配器映射 | registerApiProvider(), getApiProvider() |
| 懒加载 | 按需加载 provider | createLazyStream(), forwardStream() |
| 消息转换 | 跨 provider 兼容 | transformMessages(), 图片降级, thinking 块, ID 标准化 |
| Provider 适配器 | 双向翻译 | 统一类型 ↔ 原生 API 格式 |
| 事件流 | 异步数据传递 | EventStream, push(), for await...of, result() |
第一阶段结束
到这里,你应该对 packages/ai 包有了完整的理解:
- 它解决的问题(多 provider 统一)
- 核心类型系统(对话的构建块)
- 流式协议(AI 如何"逐字吐出"回答)
- 路由和注册(请求如何找到正确的 provider)
- Provider 适配器(双向翻译的实现)
- 端到端流程(从 stream() 到收到事件)
下一步:进入第二阶段 packages/agent,学习 Agent 运行时如何在 pi-ai 之上构建完整的 tool-call loop(工具调用循环) 和 状态管理。pi-ai 只管"发请求、收响应",而 agent 包负责"循环调用、执行工具、管理上下文"。
推荐的源码阅读顺序
如果你想在理解原理后深入代码,建议按这个顺序读源码:
- types.ts -- 所有类型定义(452 行,不长)
- utils/event-stream.ts -- EventStream 实现(88 行,很短)
- stream.ts -- 四个入口函数(60 行,极短)
- api-registry.ts -- 注册表(99 行,很短)
- models.ts -- 模型查询(90 行,很短)
- providers/register-builtins.ts -- 懒加载注册(434 行,重复模式多)
- providers/transform-messages.ts -- 消息转换(221 行)
- providers/faux.ts -- 假 provider(理解事件协议的完整契约)
- 选读一个真实适配器:providers/anthropic.ts
前 5 个文件加起来不到 800 行,是理解整个包架构的核心。