第四章:Agent 类 -- 有状态的高层封装

对应源文件:packages/agent/src/agent.ts

为什么需要 Agent 类

agent-loop.ts 提供的是纯函数 API:你传入上下文和配置,它运行循环。但在实际应用中,你需要:

  • 管理消息历史(messages 数组随对话增长)
  • 在 AI 工作时插入新消息(steering / follow-up)
  • 取消正在进行的操作(abort)
  • 监听事件更新 UI(subscribe)
  • 等待 AI 空闲(waitForIdle)

Agent 类把这些需求封装起来,提供了一个简洁的 API。

Agent 的核心 API

创建

const agent = new Agent({
  initialState: {
    systemPrompt: "你是一个编码助手...",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
    thinkingLevel: "medium",
    tools: [readTool, writeTool, editTool],
    messages: [],  // 或从 session 恢复
  },
  convertToLlm: (messages) => messages.filter(m =>
    m.role === "user" || m.role === "assistant" || m.role === "toolResult"
  ),
  beforeToolCall: async ({ toolCall }) => {
    // 权限控制
  },
});

发送消息

// 发送文本
await agent.prompt("帮我读取 main.ts");

// 发送带图片的消息
await agent.prompt("这张图片里有什么?", [imageContent]);

// 发送自定义消息
await agent.prompt({ role: "custom", content: "...", timestamp: Date.now() });

prompt() 是异步的:它启动循环,等循环完全结束后 resolve。

监听事件

const unsubscribe = agent.subscribe((event, signal) => {
  switch (event.type) {
    case "message_update":
      // 更新 UI 上的流式文本
      updateUI(event.message);
      break;
    case "tool_execution_start":
      // 显示"正在执行 read..."
      showToolProgress(event.toolName);
      break;
    case "agent_end":
      // AI 完成了
      hideLoading();
      break;
  }
});

// 取消订阅
unsubscribe();

事件监听器接收两个参数:事件和当前的 AbortSignal。监听器可以是异步的,Agent 会等所有监听器完成后才继续。

中途插入消息

// 在 AI 当前回合的工具执行后注入
agent.steer({ role: "user", content: "用 TypeScript 重写", timestamp: Date.now() });

// 在 AI 完全停下后再处理
agent.followUp({ role: "user", content: "再加一个测试", timestamp: Date.now() });

取消和等待

// 取消当前操作
agent.abort();

// 等待 AI 空闲
await agent.waitForIdle();

// 检查状态
if (agent.state.isStreaming) {
  console.log("AI 正在思考...");
}

内部状态管理

Agent 内部维护一个 MutableAgentState

MutableAgentState
├── systemPrompt     // 系统提示
├── model            // 当前模型
├── thinkingLevel    // 思考级别
├── tools            // 工具列表(getter/setter 会拷贝数组)
├── messages         // 消息历史(getter/setter 会拷贝数组)
├── isStreaming       // 是否正在运行
├── streamingMessage  // 当前正在生成的消息
├── pendingToolCalls  // 正在执行的工具 ID
└── errorMessage      // 最近的错误

toolsmessages 使用 getter/setter,赋值时会拷贝数组。这防止外部代码意外修改 Agent 内部状态。


processEvents -- 事件与状态同步

Agent 类收到底层循环发出的每个事件后:

  1. 更新内部状态:根据事件类型修改 _state
  2. 通知所有监听器:按订阅顺序调用,等待所有异步监听器完成

具体的状态更新规则:

事件状态更新
message_start设置 streamingMessage
message_update更新 streamingMessage
message_end清除 streamingMessage,把消息加入 messages
tool_execution_start添加到 pendingToolCalls
tool_execution_endpendingToolCalls 移除
turn_end如果有错误,设置 errorMessage
agent_end清除 streamingMessage

PendingMessageQueue -- 消息队列

steering 和 follow-up 各有一个队列,支持两种排空模式:

模式行为
"all"一次性取出所有消息
"one-at-a-time"一次只取一条(默认)

one-at-a-time 是默认模式。为什么?因为用户可能快速打了 3 条消息,但 AI 应该一条一条处理,而不是把 3 条合并成一个巨大的输入。


Proxy -- 浏览器端代理

proxy.ts 提供了一个 streamProxy() 函数,用于浏览器端通过服务器代理调用 LLM

为什么需要?浏览器不能直接调用 LLM API(需要 API key,而 API key 不能暴露在前端)。所以 web-ui 通过一个代理服务器中转:

浏览器                      代理服务器                     LLM Provider
  │                            │                              │
  │ POST /api/stream           │                              │
  │ (model, context)  ───────► │ 添加 API key                 │
  │                            │ 转发请求  ─────────────────► │
  │                            │                              │
  │ ◄──── SSE 事件流 ───────── │ ◄──── SSE 事件流 ──────────  │
  │                            │                              │

代理服务器做了一个优化:去掉了事件中的 partial 字段。每个 delta 事件都携带完整的 partial AssistantMessage,在频繁更新时这会产生大量冗余数据。代理服务器去掉它,客户端在本地重建 partial message。


端到端示例

import { Agent } from "@mariozechner/agent";
import { getModel, streamSimple } from "@mariozechner/pi-ai";

// 1. 创建 Agent
const agent = new Agent({
  initialState: {
    systemPrompt: "你是一个编码助手",
    model: getModel("anthropic", "claude-sonnet-4-20250514"),
    thinkingLevel: "medium",
    tools: [readTool],
  },
});

// 2. 监听事件(用于 UI 更新)
agent.subscribe((event) => {
  if (event.type === "message_update") {
    // 流式显示 AI 的回复
    render(event.message);
  }
});

// 3. 发送消息
await agent.prompt("帮我读取 main.ts");
// prompt() 返回时,整个循环已经完成
// agent.state.messages 包含了完整的对话历史

// 4. 继续对话
await agent.prompt("把第 10 行改成 console.log");

小结

Agent 类
├── 状态管理
│   ├── messages(对话历史,自动增长)
│   ├── tools(可用工具列表)
│   └── model / thinkingLevel / systemPrompt
│
├── 消息 API
│   ├── prompt()      → 发送新消息,启动循环
│   ├── continue()    → 从当前状态继续
│   ├── steer()       → 中途插入消息
│   └── followUp()    → 完成后追加消息
│
├── 控制 API
│   ├── abort()       → 取消当前操作
│   ├── waitForIdle() → 等待空闲
│   └── reset()       → 重置状态
│
├── 事件 API
│   └── subscribe()   → 监听生命周期事件
│
└── 内部
    ├── runAgentLoop()   → 底层循环
    ├── processEvents()  → 事件 → 状态同步
    └── PendingMessageQueue → steering/followUp 队列

到这里,第二阶段(agent 包)就完成了。你现在理解了 AI Agent 的核心运行机制。下一阶段我们会看 coding-agent 如何在这个基础上构建完整的编码助手产品。