第四章:Agent 类 -- 有状态的高层封装
对应源文件:
packages/agent/src/agent.ts
为什么需要 Agent 类
agent-loop.ts 提供的是纯函数 API:你传入上下文和配置,它运行循环。但在实际应用中,你需要:
- 管理消息历史(messages 数组随对话增长)
- 在 AI 工作时插入新消息(steering / follow-up)
- 取消正在进行的操作(abort)
- 监听事件更新 UI(subscribe)
- 等待 AI 空闲(waitForIdle)
Agent 类把这些需求封装起来,提供了一个简洁的 API。
Agent 的核心 API
创建
const agent = new Agent({
initialState: {
systemPrompt: "你是一个编码助手...",
model: getModel("anthropic", "claude-sonnet-4-20250514"),
thinkingLevel: "medium",
tools: [readTool, writeTool, editTool],
messages: [], // 或从 session 恢复
},
convertToLlm: (messages) => messages.filter(m =>
m.role === "user" || m.role === "assistant" || m.role === "toolResult"
),
beforeToolCall: async ({ toolCall }) => {
// 权限控制
},
});
发送消息
// 发送文本
await agent.prompt("帮我读取 main.ts");
// 发送带图片的消息
await agent.prompt("这张图片里有什么?", [imageContent]);
// 发送自定义消息
await agent.prompt({ role: "custom", content: "...", timestamp: Date.now() });
prompt() 是异步的:它启动循环,等循环完全结束后 resolve。
监听事件
const unsubscribe = agent.subscribe((event, signal) => {
switch (event.type) {
case "message_update":
// 更新 UI 上的流式文本
updateUI(event.message);
break;
case "tool_execution_start":
// 显示"正在执行 read..."
showToolProgress(event.toolName);
break;
case "agent_end":
// AI 完成了
hideLoading();
break;
}
});
// 取消订阅
unsubscribe();
事件监听器接收两个参数:事件和当前的 AbortSignal。监听器可以是异步的,Agent 会等所有监听器完成后才继续。
中途插入消息
// 在 AI 当前回合的工具执行后注入
agent.steer({ role: "user", content: "用 TypeScript 重写", timestamp: Date.now() });
// 在 AI 完全停下后再处理
agent.followUp({ role: "user", content: "再加一个测试", timestamp: Date.now() });
取消和等待
// 取消当前操作
agent.abort();
// 等待 AI 空闲
await agent.waitForIdle();
// 检查状态
if (agent.state.isStreaming) {
console.log("AI 正在思考...");
}
内部状态管理
Agent 内部维护一个 MutableAgentState:
MutableAgentState
├── systemPrompt // 系统提示
├── model // 当前模型
├── thinkingLevel // 思考级别
├── tools // 工具列表(getter/setter 会拷贝数组)
├── messages // 消息历史(getter/setter 会拷贝数组)
├── isStreaming // 是否正在运行
├── streamingMessage // 当前正在生成的消息
├── pendingToolCalls // 正在执行的工具 ID
└── errorMessage // 最近的错误
tools 和 messages 使用 getter/setter,赋值时会拷贝数组。这防止外部代码意外修改 Agent 内部状态。
processEvents -- 事件与状态同步
Agent 类收到底层循环发出的每个事件后:
- 更新内部状态:根据事件类型修改
_state - 通知所有监听器:按订阅顺序调用,等待所有异步监听器完成
具体的状态更新规则:
| 事件 | 状态更新 |
|---|---|
message_start | 设置 streamingMessage |
message_update | 更新 streamingMessage |
message_end | 清除 streamingMessage,把消息加入 messages |
tool_execution_start | 添加到 pendingToolCalls |
tool_execution_end | 从 pendingToolCalls 移除 |
turn_end | 如果有错误,设置 errorMessage |
agent_end | 清除 streamingMessage |
PendingMessageQueue -- 消息队列
steering 和 follow-up 各有一个队列,支持两种排空模式:
| 模式 | 行为 |
|---|---|
"all" | 一次性取出所有消息 |
"one-at-a-time" | 一次只取一条(默认) |
one-at-a-time 是默认模式。为什么?因为用户可能快速打了 3 条消息,但 AI 应该一条一条处理,而不是把 3 条合并成一个巨大的输入。
Proxy -- 浏览器端代理
proxy.ts 提供了一个 streamProxy() 函数,用于浏览器端通过服务器代理调用 LLM。
为什么需要?浏览器不能直接调用 LLM API(需要 API key,而 API key 不能暴露在前端)。所以 web-ui 通过一个代理服务器中转:
浏览器 代理服务器 LLM Provider
│ │ │
│ POST /api/stream │ │
│ (model, context) ───────► │ 添加 API key │
│ │ 转发请求 ─────────────────► │
│ │ │
│ ◄──── SSE 事件流 ───────── │ ◄──── SSE 事件流 ────────── │
│ │ │
代理服务器做了一个优化:去掉了事件中的 partial 字段。每个 delta 事件都携带完整的 partial AssistantMessage,在频繁更新时这会产生大量冗余数据。代理服务器去掉它,客户端在本地重建 partial message。
端到端示例
import { Agent } from "@mariozechner/agent";
import { getModel, streamSimple } from "@mariozechner/pi-ai";
// 1. 创建 Agent
const agent = new Agent({
initialState: {
systemPrompt: "你是一个编码助手",
model: getModel("anthropic", "claude-sonnet-4-20250514"),
thinkingLevel: "medium",
tools: [readTool],
},
});
// 2. 监听事件(用于 UI 更新)
agent.subscribe((event) => {
if (event.type === "message_update") {
// 流式显示 AI 的回复
render(event.message);
}
});
// 3. 发送消息
await agent.prompt("帮我读取 main.ts");
// prompt() 返回时,整个循环已经完成
// agent.state.messages 包含了完整的对话历史
// 4. 继续对话
await agent.prompt("把第 10 行改成 console.log");
小结
Agent 类
├── 状态管理
│ ├── messages(对话历史,自动增长)
│ ├── tools(可用工具列表)
│ └── model / thinkingLevel / systemPrompt
│
├── 消息 API
│ ├── prompt() → 发送新消息,启动循环
│ ├── continue() → 从当前状态继续
│ ├── steer() → 中途插入消息
│ └── followUp() → 完成后追加消息
│
├── 控制 API
│ ├── abort() → 取消当前操作
│ ├── waitForIdle() → 等待空闲
│ └── reset() → 重置状态
│
├── 事件 API
│ └── subscribe() → 监听生命周期事件
│
└── 内部
├── runAgentLoop() → 底层循环
├── processEvents() → 事件 → 状态同步
└── PendingMessageQueue → steering/followUp 队列
到这里,第二阶段(agent 包)就完成了。你现在理解了 AI Agent 的核心运行机制。下一阶段我们会看 coding-agent 如何在这个基础上构建完整的编码助手产品。