OpenClaw 核心原理深度解析:Agent运行时与记忆系统内部实现
前言
在 AI Agent 框架百花齐放的 2026 年,OpenClaw 凭借其独特的五层架构设计——Gateway 网关层、Agent 运行时层、Skill 技能层、Memory 记忆层、Plugin 插件层——在开源社区中脱颖而出。其中,Agent 运行时与Memory 记忆系统是整个架构的脊梁:前者决定了 Agent 如何思考、决策和执行,后者决定了 Agent 能否跨越会话边界积累和运用知识。
2026.6.6 版本对这两大核心模块进行了关键更新:ContextEngine 上下文引擎重构、混合检索机制优化、子 Agent 调度能力增强。这些改进不仅提升了系统性能,更从根本上改变了 Agent 的行为模式。
本文将从执行流程、数据结构、核心算法三个维度,深入拆解 Agent 运行时与记忆系统的内部实现逻辑。无论你是想理解 OpenClaw 的设计哲学,还是希望基于 OpenClaw 进行二次开发,这篇文章都将为你提供系统性的技术参考。
第一部分 Agent智能体运行时原理与内部实现
1.1 Agent运行时的架构定位与核心职责
1.1.1 在「Gateway - Agent - Skill」三层链路中的承上启下作用
OpenClaw 的请求处理遵循清晰的三层链路:
用户请求 → Gateway网关层 → Agent运行时层 → Skill技能层
↑ ↑ ↑
协议适配&路由 思考决策&调度 工具执行&返回
Gateway 层负责协议适配(HTTP/WebSocket/Webhook)、身份认证、请求路由和限流熔断,将外部请求标准化后转发给 Agent 层。
Agent 运行时层是整个系统的"大脑",接收 Gateway 转发的标准化请求后,负责思考决策、任务拆解、工具调度和状态维护。它向上对接 Gateway 的请求上下文,向下调度 Skill 层的具体工具执行。
Skill 层是"手脚",提供 52+ 内置技能(以 .md 文件形式定义),每个 Skill 封装了特定领域能力(如代码执行、Web 搜索、文件操作等),由 Agent 运行时按需加载和调用。
Agent 运行时在这三层中的核心价值在于:它不是简单的请求转发器,而是一个具备自主决策能力的智能调度中心。它需要理解用户意图、规划执行路径、选择合适工具、处理异常情况,并在整个过程中维护一致的上下文状态。
1.1.2 核心职责边界
Agent 运行时的职责可以精确划分为四个维度:
| 职责 | 说明 | 关键机制 |
|---|---|---|
| 任务调度 | 接收请求、拆解子任务、编排执行顺序 | 任务图(DAG)构建、依赖分析 |
| 思考决策 | 调用 LLM 生成决策、解析工具调用指令 | ReAct 循环、Prompt 动态组装 |
| 执行管控 | 调度工具执行、监控进度、处理异常 | 同步/异步调度、熔断重试 |
| 状态维护 | 管理会话上下文、维护执行状态 | 状态机、ContextEngine |
这四个维度形成了一个闭环:任务调度触发思考决策,思考决策驱动执行管控,执行管控更新状态维护,状态维护反过来影响下一轮任务调度。
1.1.3 关键依赖模块
Agent 运行时的核心依赖有三个:
- ContextEngine 上下文引擎:控制每次 LLM 调用的上下文组装——哪些消息纳入、旧历史如何摘要压缩、跨子 Agent 上下文如何传递。2026.6.6 版本对其进行了重构,引入了可插拔引擎机制。
- NodeManager 节点管理器:管理本地和远程执行节点的注册、健康检查和负载均衡,决定工具调用路由到哪个节点执行。
- SkillLoader 技能加载器:负责 Skill 的发现、加载、缓存和生命周期管理,支持懒加载和热更新。
1.2 Agent完整生命周期管理
Agent 实例从创建到销毁,经历五个核心状态:
创建(Instantiate) → 就绪(Ready) → 运行(Running) ⇄ 挂起(Suspended) → 销毁(Destroyed)
1.2.1 实例创建:Soul配置加载与运行时初始化
每个 Agent 实例由一个 Soul 配置文件(SOUL.md)定义,它描述了 Agent 的身份、行为规则和约束。创建流程如下:
# 伪代码:Agent 实例创建流程
class AgentFactory:
async def create(self, soul_config: SoulConfig) -> Agent:
# 1. 解析 Soul 配置
soul = SoulParser.parse(soul_config)
# 2. 初始化运行时组件
context_engine = ContextEngineFactory.create(soul.context_policy)
node_manager = NodeManager(soul.node_config)
skill_loader = SkillLoader(soul.skill_paths)
# 3. 构建 Agent 实例
agent = Agent(
soul=soul,
context_engine=context_engine,
node_manager=node_manager,
skill_loader=skill_loader,
state=AgentState.INSTANTIATED
)
# 4. 预加载必要资源
await agent.warmup()
return agent
关键步骤包括:Soul 配置解析(提取系统提示词、工具白名单、约束规则)、ContextEngine 初始化(根据配置选择上下文管理策略)、SkillLoader 预扫描(发现可用技能但不加载实现)。
1.2.2 就绪状态:资源预加载与会话绑定
Agent 进入就绪状态后,会完成两项关键准备:
- 资源预加载:加载 Soul 中标记为
preload的技能、预热 LLM 连接池、初始化记忆检索索引 - 会话绑定:将 Agent 实例与特定用户会话关联,建立会话级的上下文隔离边界
async def warmup(self):
"""资源预加载"""
# 预加载标记为 preload 的技能
for skill_name in self.soul.preload_skills:
await self.skill_loader.load(skill_name)
# 预热 LLM Provider 连接
await self.llm_provider.warmup()
# 初始化记忆检索索引
await self.memory_engine.ensure_index()
1.2.3 运行状态:进入主循环与任务接收处理
Agent 进入运行状态后,核心是启动 AgentLoop 主循环,持续监听并处理来自 Gateway 的请求:
async def run(self, initial_input: str) -> LoopResult:
"""Agent 主循环入口"""
state = await self.state_manager.initialize(initial_input)
iteration = 0
while iteration < self.max_iterations:
iteration += 1
# 感知阶段:接收当前状态
perception = await self._perceive(state)
# 决策阶段:调用 LLM 生成决策
decision = await self._decide(perception)
# 执行阶段:调度工具执行
result = await self._act(decision)
# 反思阶段:更新状态,判断是否继续
state = await self._reflect(result)
if state.should_stop:
break
return LoopResult(state=state, iteration=iteration)
1.2.4 挂起/恢复:上下文持久化与续跑
在长时间运行的任务中,Agent 可能需要挂起(等待外部事件、用户确认、异步操作完成)。挂起时的关键操作是上下文持久化:
async def suspend(self, reason: SuspendReason) -> Snapshot:
"""挂起 Agent,保存完整上下文快照"""
snapshot = Snapshot(
agent_id=self.id,
state=self.state,
context=self.context_engine.export(),
memory_cursor=self.memory_engine.get_cursor(),
pending_tasks=self.task_queue.dump(),
reason=reason,
timestamp=datetime.now()
)
await self.snapshot_store.save(snapshot)
self.state = AgentState.SUSPENDED
return snapshot
async def resume(self, snapshot_id: str) -> None:
"""从快照恢复 Agent"""
snapshot = await self.snapshot_store.load(snapshot_id)
self.state = snapshot.state
await self.context_engine.restore(snapshot.context)
await self.memory_engine.seek(snapshot.memory_cursor)
self.task_queue.load(snapshot.pending_tasks)
self.state = AgentState.RUNNING
1.2.5 销毁流程:资源回收、记忆归档与会话清理
Agent 销毁时需要完成三项清理工作:
- 资源回收:释放 LLM 连接、关闭文件句柄、清理临时数据
- 记忆归档:将短期记忆中的有价值内容提取并归档到长期记忆
- 会话清理:解除会话绑定、清理会话级缓存、通知 Gateway 释放路由条目
1.2.6 2026.6.6版本优化:Agent池化复用与懒加载机制
2026.6.6 版本引入了两个重要的性能优化:
Agent 池化复用:类似数据库连接池,预创建一批 Agent 实例放入池中,请求到来时直接从池中获取,避免每次请求都走完整的创建流程:
class AgentPool:
def __init__(self, soul_config: SoulConfig, pool_size: int = 10):
self.pool = asyncio.Queue(maxsize=pool_size)
self.soul_config = soul_config
async def acquire(self) -> Agent:
if self.pool.empty():
agent = await AgentFactory.create(self.soul_config)
else:
agent = await self.pool.get()
await agent.reset() # 重置状态但保留预加载资源
return agent
async def release(self, agent: Agent):
await agent.archive_memory() # 归档记忆
await self.pool.put(agent)
懒加载机制:Skill 不再在 Agent 创建时全部加载,而是在首次被调用时才加载实现代码,大幅减少冷启动时间。
1.3 ReAct思考循环:核心执行引擎源码级拆解
ReAct(Reasoning + Acting)循环是 Agent 运行时的心脏。OpenClaw 采用双层循环架构:外层循环处理用户 follow-up,内层循环处理工具调用链。
1.3.1 主循环入口与状态机设计
AgentLoop 控制器的核心数据结构:
@dataclass
class AgentLoop:
soul: Soul # Agent 身份配置
context_engine: ContextEngine # 上下文引擎
llm_provider: LLMProvider # LLM 调用封装
tool_registry: ToolRegistry # 工具注册表
state_manager: StateManager # 状态管理器
max_iterations: int = 25 # 最大迭代次数
stop_conditions: List[StopCondition] = field(default_factory=list)
循环状态机:Agent 在每次迭代中经历五个状态的流转:
idle → thinking → acting → observing → (idle | error)
↑ |
└────────────────────────────────────┘
| 状态 | 触发条件 | 核心动作 |
|---|---|---|
idle |
初始状态 / 上一轮观察完成 | 等待新输入或进入下一轮迭代 |
thinking |
进入新迭代 | 组装 Prompt → 调用 LLM → 获取决策 |
acting |
LLM 返回工具调用指令 | 解析指令 → 路由执行 → 获取结果 |
observing |
工具执行完成 | 结果封装 → 上下文更新 → 判断是否继续 |
error |
异常发生 | 分级重试 / 熔断 / 降级 |
双层循环的完整结构:
# 外层循环:处理用户 follow-up
while True:
# 内层循环:处理工具调用链
while has_more_tool_calls or has_pending_messages:
# [1] 注入待处理的 steering/follow-up 消息
inject_pending_messages(context)
# [2] 流式调用 LLM
response = await stream_assistant_response(context)
# [3] 检查停止原因
if response.stop_reason == "tool_call":
# [4] 解析并执行工具调用
tool_results = await execute_tool_calls(response.tool_calls)
# [5] 将结果追加到上下文
context.append(tool_results)
elif response.stop_reason == "end_turn":
break # 内层循环结束
# 检查是否有 follow-up
if not has_follow_up:
break # 外层循环结束
1.3.2 Think阶段:决策生成的完整链路
Think 阶段是 ReAct 循环中最复杂的环节,涉及动态 Prompt 组装、模型调用和 Token 预算管理。
动态 Prompt 组装:
每次 LLM 调用前,ContextEngine 会按以下优先级拼接上下文:
系统提示词(SOUL.md) + 记忆召回结果 + 工具清单 + 历史交互 + 当前输入
async def assemble_prompt(self, state: AgentState) -> PromptPacket:
"""动态 Prompt 组装"""
# 1. 系统提示词(最高优先级,不可截断)
system_prompt = self.soul.system_prompt
# 2. 记忆召回(按相关性排序)
memories = await self.memory_engine.recall(
query=state.current_input,
top_k=5,
min_relevance=0.7
)
# 3. 工具清单(仅列出当前可用的工具)
available_tools = self.tool_registry.get_available_tools(
permissions=state.permissions
)
# 4. 历史交互(受 Token 预算控制)
history = self.context_engine.get_history(
max_tokens=self.token_budget - reserved_tokens
)
# 5. 当前输入
current_input = state.current_input
return PromptPacket(
system=system_prompt,
memories=memories,
tools=available_tools,
history=history,
input=current_input
)
模型调用封装:
async def call_llm(self, prompt: PromptPacket) -> LLMResponse:
"""模型调用封装,支持流式响应和超时控制"""
try:
response = await asyncio.wait_for(
self.llm_provider.chat(
messages=prompt.to_messages(),
tools=prompt.to_tool_definitions(),
stream=True
),
timeout=self.config.llm_timeout # 默认 120s
)
return response
except asyncio.TimeoutError:
raise LLMTimeoutError(f"LLM 调用超时 ({self.config.llm_timeout}s)")
Token 预算管理:
Token 预算是成本控制和上下文窗口管理的关键机制:
class TokenBudget:
def __init__(self, total: int = 128000):
self.total = total
self.reserved_for_system = 2000 # 系统提示词预留
self.reserved_for_tools = 1000 # 工具定义预留
self.reserved_for_output = 4096 # 输出预留
@property
def available_for_history(self) -> int:
return (self.total
- self.reserved_for_system
- self.reserved_for_tools
- self.reserved_for_output)
def truncate_history(self, messages: List[Message]) -> List[Message]:
"""按 Token 预算截断历史消息"""
budget = self.available_for_history
result = []
for msg in reversed(messages): # 从最新消息开始保留
if msg.token_count <= budget:
result.insert(0, msg)
budget -= msg.token_count
else:
# 对超长消息进行摘要压缩
summary = self.summarize(msg)
if summary.token_count <= budget:
result.insert(0, summary)
budget -= summary.token_count
break
return result
1.3.3 Parse阶段:工具指令的解析与校验
LLM 返回的响应需要经过严格的解析和校验才能进入执行阶段。
结构化输出解析:
def parse_tool_calls(self, response: LLMResponse) -> List[ToolCall]:
"""解析 LLM 响应中的工具调用指令"""
tool_calls = []
for block in response.content_blocks:
if block.type != "tool_use":
continue
try:
# JSON 格式校验与参数提取
params = json.loads(block.input)
tool_call = ToolCall(
id=block.id,
name=block.name,
params=params
)
tool_calls.append(tool_call)
except json.JSONDecodeError as e:
# 格式错误时的多轮修正
corrected = self._attempt_correction(block.input, e)
if corrected:
tool_calls.append(corrected)
return tool_calls
工具合法性校验:
def validate_tool_call(self, call: ToolCall, permissions: Permissions) -> ValidationResult:
"""校验工具调用的合法性"""
# 1. 工具是否存在
tool = self.tool_registry.get(call.name)
if not tool:
return ValidationResult.error(f"未知工具: {call.name}")
# 2. 权限校验
if not permissions.has_permission(tool.required_permission):
return ValidationResult.error(f"权限不足: {tool.required_permission}")
# 3. 参数 Schema 匹配
try:
tool.param_schema.validate(call.params)
except SchemaValidationError as e:
return ValidationResult.error(f"参数校验失败: {e}")
return ValidationResult.ok()
异常解析兜底:当 LLM 输出的工具调用格式不正确时,OpenClaw 会尝试多轮修正:
- 第一轮:尝试 JSON 修复(补全括号、修复引号)
- 第二轮:提取关键参数,用默认值填充缺失字段
- 第三轮:将原始输出回注 LLM,请求重新生成
1.3.4 Act阶段:工具执行的调度与管控
本地/远程节点的路由决策:
async def route_tool_call(self, call: ToolCall) -> Node:
"""工具调用路由决策"""
tool = self.tool_registry.get(call.name)
if tool.execution_mode == "local":
return self.node_manager.get_local_node()
# 远程节点选择:基于负载和亲和性
candidates = self.node_manager.get_remote_nodes(
capability=tool.required_capability
)
# 加权评分:负载(40%) + 延迟(30%) + 亲和性(30%)
scored = []
for node in candidates:
score = (0.4 * (1 - node.load_factor) +
0.3 * (1 - node.latency / MAX_LATENCY) +
0.3 * node.affinity_score(call.session_id))
scored.append((node, score))
return max(scored, key=lambda x: x[1])[0]
同步/异步执行模式:
async def execute_tool(self, call: ToolCall, node: Node) -> ToolResult:
"""工具执行,支持同步和异步模式"""
tool = self.tool_registry.get(call.name)
if tool.execution_mode == "sync":
# 同步执行:等待结果返回
result = await node.execute(call, timeout=tool.timeout)
return result
elif tool.execution_mode == "async":
# 异步执行:返回任务ID,后续轮询结果
task_id = await node.submit(call)
# 注册进度回调
await node.on_progress(task_id, self._handle_progress)
# 等待完成(带超时)
result = await node.wait_for_result(task_id, timeout=tool.timeout)
return result
执行过程监督:支持进度上报和中断响应:
async def _handle_progress(self, task_id: str, progress: Progress):
"""处理工具执行进度"""
# 更新状态中的进度信息
self.state.update_task_progress(task_id, progress)
# 如果用户发送了中断信号
if self.state.has_interrupt_signal():
await self.node_manager.cancel_task(task_id)
raise TaskInterruptedError(f"任务被用户中断: {task_id}")
1.3.5 Observe阶段:结果反馈与上下文更新
Observe 阶段是 ReAct 循环的收尾,也是连接下一轮迭代的桥梁。
执行结果的标准化封装:
def standardize_result(self, raw_result: Any, tool: Tool) -> ToolResult:
"""将工具执行结果标准化封装"""
if isinstance(raw_result, ToolResult):
return raw_result
return ToolResult(
tool_name=tool.name,
status="success" if raw_result else "empty",
output=str(raw_result),
metadata={
"execution_time": raw_result.execution_time if hasattr(raw_result, 'execution_time') else None,
"token_count": count_tokens(str(raw_result)),
}
)
错误结果的语义化转换:将技术性错误信息转换为 LLM 可理解的语义描述:
def semantic_error_transform(self, error: Exception) -> str:
"""将技术错误转换为语义化描述"""
error_map = {
TimeoutError: "工具执行超时,请考虑简化任务或增加超时时间",
PermissionError: "权限不足,当前会话无权执行此操作",
ConnectionError: "远程节点连接失败,请稍后重试或切换到本地执行",
SchemaValidationError: "工具参数格式不正确,请检查参数类型和必填项",
}
for error_type, message in error_map.items():
if isinstance(error, error_type):
return message
return f"工具执行失败: {str(error)}"
短期记忆的同步写入:每次工具执行结果都会同步写入短期记忆:
async def sync_to_short_term_memory(self, result: ToolResult):
"""将执行结果同步写入短期记忆"""
memory_entry = MemoryEntry(
content=f"[{result.tool_name}] {result.output}",
type="tool_result",
importance=self._calculate_importance(result),
timestamp=datetime.now(),
session_id=self.session_id,
metadata=result.metadata
)
await self.memory_engine.write_short_term(memory_entry)
1.4 任务拆解与工具调用编排机制
1.4.1 复杂任务的层级拆解算法
当用户请求涉及多个步骤时,Agent 需要将复杂任务拆解为可执行的子任务图(DAG):
class TaskDecomposer:
async def decompose(self, task: Task) -> TaskGraph:
"""将复杂任务拆解为子任务 DAG"""
# 1. 调用 LLM 生成拆解方案
decomposition = await self.llm_provider.chat(
system="你是一个任务规划专家,将复杂任务拆解为可并行/串行执行的子任务",
input=f"任务: {task.description}\n可用工具: {self.tool_registry.list_names()}"
)
# 2. 解析为任务图
graph = TaskGraph()
for subtask in decomposition.subtasks:
node = TaskNode(
id=subtask.id,
description=subtask.description,
tool=subtask.suggested_tool,
dependencies=subtask.depends_on
)
graph.add_node(node)
# 3. 校验 DAG 合法性(无环、依赖可达)
graph.validate()
return graph
1.4.2 多工具调用的依赖编排与并行调度
class TaskScheduler:
async def execute_graph(self, graph: TaskGraph) -> Dict[str, ToolResult]:
"""执行任务图,支持并行调度"""
results = {}
completed = set()
ready_queue = asyncio.Queue()
# 初始化:将无依赖的节点放入就绪队列
for node in graph.get_root_nodes():
await ready_queue.put(node)
while completed != graph.all_node_ids:
# 取出就绪节点
node = await ready_queue.get()
# 并行执行无相互依赖的节点
batch = [node]
while not ready_queue.empty():
next_node = ready_queue.get_nowait()
if next_node.dependencies.issubset(completed):
batch.append(next_node)
# 并行执行
task_results = await asyncio.gather(*[
self._execute_node(n) for n in batch
])
# 收集结果,解锁后续节点
for n, result in zip(batch, task_results):
results[n.id] = result
completed.add(n.id)
# 检查后续节点是否就绪
for successor in graph.get_successors(n.id):
if successor.dependencies.issubset(completed):
await ready_queue.put(successor)
return results
1.4.3 工具选择评分机制
当多个工具都能完成同一任务时,Agent 通过加权评分选择最优工具:
def score_tool(self, tool: Tool, context: AgentState) -> float:
"""工具选择评分:相关性 × 历史成功率 × 调用成本"""
# 相关性评分(0-1):工具描述与当前任务的语义相似度
relevance = self.embedding_similarity(
tool.description, context.current_task.description
)
# 历史成功率(0-1):该工具在类似任务中的历史表现
success_rate = self.history_tracker.get_success_rate(
tool_name=tool.name,
task_type=context.current_task.type
)
# 调用成本(0-1,归一化):延迟 + Token 消耗
cost = self._normalize_cost(tool.avg_latency, tool.avg_token_cost)
# 加权评分
score = (0.5 * relevance +
0.3 * success_rate +
0.2 * (1 - cost)) # 成本越低越好
return score
1.4.4 子任务的状态追踪与结果汇总
@dataclass
class SubTaskTracker:
task_id: str
status: str = "pending" # pending / running / success / failed
result: Optional[ToolResult] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
def mark_running(self):
self.status = "running"
self.start_time = datetime.now()
def mark_success(self, result: ToolResult):
self.status = "success"
self.result = result
self.end_time = datetime.now()
def mark_failed(self, error: str):
self.status = "failed"
self.result = ToolResult(tool_name="unknown", status="failed", output=error)
self.end_time = datetime.now()
1.5 多Agent协同与子Agent管理
1.5.1 Master-Slave协同模式
OpenClaw 采用 Master-Slave 协同模式处理复杂任务:Master Agent 负责任务规划和调度,Slave Agent 负责具体执行。
用户请求
↓
Master Agent(规划、调度、汇总)
├── Slave Agent 1(执行子任务A)
├── Slave Agent 2(执行子任务B)
└── Slave Agent 3(执行子任务C)
↓
结果合并 → 返回用户
1.5.2 Sub-Agent的创建、任务分配与资源隔离
class SubAgentManager:
async def create_sub_agent(self, task: SubTask, parent: Agent) -> Agent:
"""创建子 Agent,继承父 Agent 的部分上下文"""
# 1. 构建子 Agent 的 Soul(继承父 Agent 的部分规则)
sub_soul = Soul(
system_prompt=f"你是子Agent,负责执行: {task.description}\n"
f"父Agent规则: {parent.soul.core_rules}",
skills=task.required_skills,
permissions=parent.permissions.intersect(task.required_permissions)
)
# 2. 创建子 Agent 实例
sub_agent = await AgentFactory.create(sub_soul)
# 3. 注入任务上下文(仅传递必要信息,实现资源隔离)
await sub_agent.context_engine.inject(task.context_packet)
# 4. 注册回调,监听子 Agent 状态
sub_agent.on_complete(self._handle_sub_agent_complete)
sub_agent.on_error(self._handle_sub_agent_error)
return sub_agent
1.5.3 ACP协议:Agent间通信
Agent Communication Protocol(ACP)定义了 Agent 间通信的标准消息格式:
@dataclass
class ACPMessage:
"""Agent 通信协议消息"""
sender: str # 发送方 Agent ID
receiver: str # 接收方 Agent ID
type: str # 消息类型: task_assign / result_report / status_query / cancel
payload: Dict # 消息负载
timestamp: datetime # 时间戳
correlation_id: str # 关联ID(用于请求-响应匹配)
def to_json(self) -> str:
return json.dumps({
"sender": self.sender,
"receiver": self.receiver,
"type": self.type,
"payload": self.payload,
"timestamp": self.timestamp.isoformat(),
"correlation_id": self.correlation_id
})
交互流程:
Master → Slave: task_assign(分配子任务)
Slave → Master: status_query(请求状态确认)
Master → Slave: status_response(返回确认)
Slave → Master: result_report(汇报执行结果)
Master: result_merge(合并多个 Slave 的结果)
1.5.4 协同任务的冲突消解与结果合并
当多个子 Agent 的结果存在冲突时,Master Agent 通过以下策略消解:
- 优先级策略:按子 Agent 的置信度排序,取置信度最高的结果
- 投票策略:多个子 Agent 对同一问题给出不同答案时,取多数一致的结果
- LLM 裁决:将冲突结果提交 LLM,由 LLM 判断哪个更合理
async def resolve_conflict(self, results: List[SubTaskResult]) -> SubTaskResult:
"""冲突消解"""
if len(results) == 1:
return results[0]
# 检查是否一致
if all(r.output == results[0].output for r in results):
return results[0]
# 优先级策略
if any(r.confidence >= 0.95 for r in results):
return max(results, key=lambda r: r.confidence)
# LLM 裁决
verdict = await self.llm_provider.chat(
system="你是结果裁决专家,从多个候选结果中选择最合理的一个",
input=f"候选结果: {[r.output for r in results]}"
)
return results[int(verdict.choice)]
1.6 容错与自愈机制实现
1.6.1 分级重试策略
OpenClaw 根据错误类型采用差异化的重试策略:
| 错误类型 | 重试次数 | 退避策略 | 示例 |
|---|---|---|---|
| 参数错误 | 1 次 | 无退避 | JSON 格式错误,修正后重试 |
| 执行失败 | 3 次 | 指数退避 | 网络超时、节点不可达 |
| 模型异常 | 2 次 | 固定间隔 | LLM 返回空响应、Token 超限 |
class RetryPolicy:
async def execute_with_retry(self, fn, *args, **kwargs):
last_error = None
for attempt in range(self.max_retries):
try:
return await fn(*args, **kwargs)
except ParameterError as e:
# 参数错误:尝试自动修正后重试一次
if attempt == 0:
corrected = self._auto_correct_params(e)
if corrected:
continue
raise
except ExecutionError as e:
last_error = e
wait_time = self.base_delay * (2 ** attempt) # 指数退避
await asyncio.sleep(wait_time)
except ModelError as e:
last_error = e
if attempt < self.max_retries - 1:
await asyncio.sleep(self.fixed_delay)
else:
# 触发模型降级
raise ModelFallbackTrigger(e)
raise last_error
1.6.2 熔断机制
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "closed" # closed / open / half_open
self.last_failure_time = None
async def call(self, fn, *args, **kwargs):
if self.state == "open":
# 检查是否进入半开状态
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "half_open"
else:
raise CircuitOpenError("熔断器开启,请求被拒绝")
try:
result = await fn(*args, **kwargs)
if self.state == "half_open":
self.state = "closed"
self.failure_count = 0
return result
except Exception as e:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
raise
1.6.3 模型降级回退
当主模型故障时,自动切换到备用模型:
class ModelFallbackChain:
def __init__(self, models: List[ModelConfig]):
self.models = models # 按优先级排序
self.current_index = 0
async def chat(self, **kwargs) -> LLMResponse:
for i in range(self.current_index, len(self.models)):
model = self.models[i]
try:
response = await model.provider.chat(**kwargs)
self.current_index = i # 更新当前可用模型
return response
except ModelError:
continue
raise AllModelsFailedError("所有模型均不可用")
1.6.4 异常状态的自动恢复与人工介入入口
OpenClaw 在 Agent 运行时中设计了自动恢复 + 人工介入的双通道机制:
- 自动恢复通道:对于可预测的异常(超时、网络错误、模型降级),系统自动执行重试/熔断/降级策略
- 人工介入入口:对于不可预测的异常(逻辑错误、无限循环、安全风险),系统通过 Gateway 向用户发送介入请求,用户可以:
- 修改 Agent 的当前上下文
- 手动指定下一步操作
- 终止当前任务
第二部分 Memory记忆系统原理与内部实现
2.1 三层记忆模型架构设计
OpenClaw 的记忆系统采用三层记忆模型,模拟人类大脑的信息处理机制:
工作记忆(Working Memory) ←→ 短期记忆(Short-Term Memory) ←→ 长期记忆(Long-Term Memory)
↑ 实时上下文 ↑ 会话级缓存 ↑ 持久化知识库
容量: ~4K tokens 容量: ~32K tokens 容量: 无限
生命周期: 单次调用 生命周期: 单次会话 生命周期: 永久
存储: 内存 存储: SQLite 内存表 存储: 向量数据库
2.1.1 工作记忆:实时上下文窗口
工作记忆是最顶层的记忆,对应 LLM 的上下文窗口。它存储当前正在处理的对话轮次、工具调用结果和临时推理状态。
@dataclass
class WorkingMemory:
"""工作记忆:实时上下文窗口"""
messages: List[Message] # 当前对话消息列表
active_tool_calls: List[ToolCall] # 正在执行的工具调用
reasoning_state: Dict # 临时推理状态(如中间计算结果)
token_count: int = 0 # 当前 Token 计数
def add_message(self, message: Message):
"""添加消息,自动更新 Token 计数"""
self.messages.append(message)
self.token_count += message.token_count
def is_overflow(self, budget: TokenBudget) -> bool:
"""检查是否超出 Token 预算"""
return self.token_count > budget.available_for_history
工作记忆的核心特征是容量有限但访问速度极快——它是 LLM 直接"看到"的信息,任何需要被模型处理的数据都必须先进入工作记忆。
2.1.2 短期记忆:会话级信息缓存
短期记忆存储当前会话中的交互历史和中间结果,使用 SQLite 内存表实现,支持结构化查询和模糊检索。
class ShortTermMemory:
"""短期记忆:会话级信息缓存"""
def __init__(self, session_id: str, max_entries: int = 500):
self.session_id = session_id
self.max_entries = max_entries
self.conn = sqlite3.connect(":memory:")
self._init_schema()
def _init_schema(self):
"""初始化内存表结构"""
self.conn.execute("""
CREATE TABLE IF NOT EXISTS short_term_memory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
type TEXT NOT NULL, -- user_input / tool_result / reasoning / system
importance REAL DEFAULT 0.5, -- 重要性评分 0-1
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
token_count INTEGER DEFAULT 0,
metadata TEXT -- JSON 格式附加信息
)
""")
self.conn.execute("""
CREATE INDEX IF NOT EXISTS idx_type ON short_term_memory(type)
""")
self.conn.execute("""
CREATE INDEX IF NOT EXISTS idx_importance ON short_term_memory(importance)
""")
async def write(self, entry: MemoryEntry):
"""写入短期记忆"""
self.conn.execute(
"INSERT INTO short_term_memory (content, type, importance, timestamp, token_count, metadata) VALUES (?, ?, ?, ?, ?, ?)",
(entry.content, entry.type, entry.importance, entry.timestamp, entry.token_count, json.dumps(entry.metadata))
)
# 容量控制:超出上限时淘汰低重要性条目
count = self.conn.execute("SELECT COUNT(*) FROM short_term_memory").fetchone()[0]
if count > self.max_entries:
self.conn.execute(
"DELETE FROM short_term_memory WHERE id IN (SELECT id FROM short_term_memory ORDER BY importance ASC LIMIT ?)",
(count - self.max_entries,)
)
async def recall(self, query: str, top_k: int = 5) -> List[MemoryEntry]:
"""从短期记忆中检索"""
# 基于关键词和重要性的混合检索
results = self.conn.execute(
"""SELECT * FROM short_term_memory
WHERE content LIKE ? OR type = 'reasoning'
ORDER BY importance DESC, timestamp DESC
LIMIT ?""",
(f"%{query}%", top_k)
).fetchall()
return [self._row_to_entry(r) for r in results]
2.1.3 长期记忆:持久化知识库
长期记忆是 Agent 跨会话知识积累的核心,使用向量数据库存储,支持语义检索。
class LongTermMemory:
"""长期记忆:持久化知识库"""
def __init__(self, vector_store: VectorStore, embedding_model: EmbeddingModel):
self.vector_store = vector_store
self.embedding_model = embedding_model
async def store(self, entry: MemoryEntry) -> str:
"""存储到长期记忆"""
# 生成向量嵌入
embedding = await self.embedding_model.embed(entry.content)
# 构建元数据
metadata = {
"type": entry.type,
"importance": entry.importance,
"session_id": entry.session_id,
"timestamp": entry.timestamp.isoformat(),
**entry.metadata
}
# 写入向量数据库
doc_id = await self.vector_store.add(
documents=[entry.content],
embeddings=[embedding],
metadatas=[metadata],
ids=[f"mem_{uuid4().hex}"]
)
return doc_id[0]
async def recall(self, query: str, top_k: int = 5, min_relevance: float = 0.7) -> List[MemoryEntry]:
"""语义检索长期记忆"""
query_embedding = await self.embedding_model.embed(query)
results = await self.vector_store.search(
query_embedding=query_embedding,
top_k=top_k,
filter_metadata={"importance": {"$gte": min_relevance}}
)
return [
MemoryEntry(
content=r.document,
type=r.metadata.get("type", "unknown"),
importance=r.metadata.get("importance", 0.5),
timestamp=datetime.fromisoformat(r.metadata["timestamp"]),
session_id=r.metadata.get("session_id"),
metadata={k: v for k, v in r.metadata.items() if k not in ["type", "importance", "timestamp", "session_id"]}
)
for r in results
]
2.1.4 三层记忆的流转规则与淘汰策略
信息在三层记忆间的流转遵循明确的规则:
| 流转方向 | 触发条件 | 核心动作 |
|---|---|---|
| 工作 → 短期 | 每轮 ReAct 迭代结束 | 将当前轮次的消息和结果写入短期记忆 |
| 短期 → 长期 | 会话结束 / 重要性超阈值 | 提取高价值内容归档到长期记忆 |
| 长期 → 工作 | 每次组装 Prompt 时 | 根据当前查询语义检索长期记忆,注入工作记忆 |
| 短期 → 工作 | 每次组装 Prompt 时 | 根据相关性检索短期记忆,注入工作记忆 |
淘汰策略:
- 工作记忆:超出 Token 预算时,对旧消息进行摘要压缩
- 短期记忆:超出条目上限时,按重要性评分淘汰低价值条目
- 长期记忆:永不淘汰,但会通过遗忘机制降低低价值记忆的检索权重
2.2 短期记忆的SQLite内存表实现
2.2.1 表结构与索引设计
短期记忆使用 SQLite 内存表,表结构设计兼顾查询效率和存储紧凑:
CREATE TABLE short_term_memory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
type TEXT NOT NULL,
importance REAL DEFAULT 0.5,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
token_count INTEGER DEFAULT 0,
metadata TEXT,
session_id TEXT NOT NULL
);
-- 类型索引:按消息类型快速过滤
CREATE INDEX idx_type ON short_term_memory(type);
-- 重要性索引:按重要性排序淘汰
CREATE INDEX idx_importance ON short_term_memory(importance);
-- 时间索引:按时间范围查询
CREATE INDEX idx_timestamp ON short_term_memory(timestamp);
-- 会话索引:按会话ID隔离
CREATE INDEX idx_session ON short_term_memory(session_id);
2.2.2 写入流程与容量控制
async def write_with_eviction(self, entry: MemoryEntry):
"""带淘汰策略的写入"""
# 1. 写入新条目
self.conn.execute(
"""INSERT INTO short_term_memory
(content, type, importance, timestamp, token_count, metadata, session_id)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(entry.content, entry.type, entry.importance,
entry.timestamp, entry.token_count,
json.dumps(entry.metadata), entry.session_id)
)
# 2. 检查容量
total_tokens = self.conn.execute(
"SELECT SUM(token_count) FROM short_term_memory WHERE session_id = ?",
(entry.session_id,)
).fetchone()[0]
# 3. 超出容量时执行淘汰
if total_tokens > self.max_tokens_per_session:
# 计算需要释放的 Token 数
tokens_to_free = total_tokens - self.max_tokens_per_session
# 按重要性升序、时间升序选择淘汰候选
candidates = self.conn.execute(
"""SELECT id, token_count FROM short_term_memory
WHERE session_id = ? AND type != 'system'
ORDER BY importance ASC, timestamp ASC""",
(entry.session_id,)
).fetchall()
freed = 0
ids_to_delete = []
for row in candidates:
if freed >= tokens_to_free:
break
ids_to_delete.append(row[0])
freed += row[1]
# 执行淘汰
self.conn.execute(
f"DELETE FROM short_term_memory WHERE id IN ({','.join('?' * len(ids_to_delete))})",
ids_to_delete
)
2.2.3 检索策略:关键词+重要性混合排序
async def hybrid_recall(self, query: str, top_k: int = 5) -> List[MemoryEntry]:
"""混合检索:关键词匹配 + 重要性加权"""
results = self.conn.execute(
"""SELECT *, (
-- 关键词匹配度(0-1)
CASE WHEN content LIKE ? THEN 0.5 ELSE 0 END +
-- 重要性权重(0-1)
importance * 0.5 +
-- 时间衰减因子(越近越高)
MAX(0, 1 - (julianday('now') - julianday(timestamp)) * 24) * 0.2
) AS relevance_score
FROM short_term_memory
WHERE session_id = ?
ORDER BY relevance_score DESC
LIMIT ?""",
(f"%{query}%", self.session_id, top_k)
).fetchall()
return [self._row_to_entry(r) for r in results]
2.3 长期记忆的混合检索机制
2.3.1 向量检索与关键词检索的融合
OpenClaw 的长期记忆检索采用混合检索策略,融合向量语义检索和关键词精确检索的优势:
class HybridRetriever:
"""混合检索器:向量检索 + 关键词检索"""
def __init__(self, vector_store: VectorStore, keyword_index: KeywordIndex):
self.vector_store = vector_store
self.keyword_index = keyword_index
async def search(self, query: str, top_k: int = 5,
alpha: float = 0.7) -> List[SearchResult]:
"""
混合检索
alpha: 向量检索权重(0-1),1-alpha 为关键词检索权重
"""
# 1. 向量语义检索
vector_results = await self.vector_store.search(
query=query, top_k=top_k * 2 # 多召回一些,用于融合排序
)
# 2. 关键词精确检索
keyword_results = await self.keyword_index.search(
query=query, top_k=top_k * 2
)
# 3. 融合排序(Reciprocal Rank Fusion)
merged = self._reciprocal_rank_fusion(
vector_results, keyword_results, alpha=alpha
)
return merged[:top_k]
def _reciprocal_rank_fusion(self, vector_results, keyword_results,
alpha: float = 0.7, k: int = 60) -> List[SearchResult]:
"""RRF 融合排序"""
scores = {}
# 向量检索结果评分
for rank, result in enumerate(vector_results):
doc_id = result.id
scores[doc_id] = scores.get(doc_id, 0) + alpha / (k + rank + 1)
# 关键词检索结果评分
for rank, result in enumerate(keyword_results):
doc_id = result.id
scores[doc_id] = scores.get(doc_id, 0) + (1 - alpha) / (k + rank + 1)
# 按融合分数排序
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
# 构建结果列表
all_results = {r.id: r for r in vector_results + keyword_results}
return [all_results[doc_id] for doc_id in sorted_ids if doc_id in all_results]
2.3.2 2026.6.6版本优化:检索性能提升
2026.6.6 版本对混合检索进行了三项关键优化:
1. 向量索引优化:引入 HNSW 索引替代暴力搜索,检索延迟从 ~200ms 降至 ~15ms:
class OptimizedVectorStore:
"""优化后的向量存储,使用 HNSW 索引"""
def __init__(self, dimension: int = 1536, m: int = 16, ef_construction: int = 200):
self.index = hnswlib.Index(space='cosine', dim=dimension)
self.index.init_index(max_elements=100000, ef_construction=ef_construction, M=m)
self.index.set_ef(50) # 搜索时的 ef 值,越大越精确但越慢
async def search(self, query_embedding: List[float], top_k: int = 5) -> List[SearchResult]:
labels, distances = self.index.knn_query(query_embedding, k=top_k)
return [
SearchResult(id=str(label), score=1 - distance, document=self.doc_store[str(label)])
for label, distance in zip(labels[0], distances[0])
]
2. 查询缓存:对高频查询结果进行缓存,避免重复计算:
class QueryCache:
"""查询结果缓存"""
def __init__(self, max_size: int = 1000, ttl: int = 300):
self.cache = TTLCache(maxsize=max_size, ttl=ttl)
async def get_or_search(self, query: str, search_fn) -> List[SearchResult]:
cache_key = self._hash_query(query)
if cache_key in self.cache:
return self.cache[cache_key]
results = await search_fn(query)
self.cache[cache_key] = results
return results
3. 自适应 alpha 调节:根据查询类型自动调整向量检索和关键词检索的权重:
class AdaptiveAlpha:
"""自适应 alpha 调节"""
def adjust(self, query: str) -> float:
"""根据查询特征调整 alpha"""
# 精确查询(包含引号、特定术语)→ 降低 alpha,增加关键词权重
if '"' in query or any(term in query for term in self.exact_terms):
return 0.4
# 语义查询(自然语言描述)→ 提高 alpha,增加向量权重
if len(query.split()) > 5:
return 0.8
# 默认
return 0.7
2.4 记忆的重要性评估与遗忘机制
2.4.1 重要性评分算法
每条记忆在写入时都会计算一个重要性评分(0-1),该评分决定了记忆的保留优先级和检索权重:
class ImportanceScorer:
"""记忆重要性评分器"""
def score(self, entry: MemoryEntry) -> float:
"""计算综合重要性评分"""
# 1. 内容类型基础分
type_score = self._type_base_score(entry.type)
# 2. 信息熵评分(信息量越大越重要)
entropy_score = self._entropy_score(entry.content)
# 3. 工具执行结果评分(成功结果比失败更重要)
execution_score = self._execution_score(entry)
# 4. 用户反馈评分(用户明确表示有用的信息)
feedback_score = self._feedback_score(entry)
# 加权综合评分
importance = (
0.2 * type_score +
0.3 * entropy_score +
0.3 * execution_score +
0.2 * feedback_score
)
return min(max(importance, 0.0), 1.0) # 裁剪到 [0, 1]
def _type_base_score(self, entry_type: str) -> float:
"""内容类型基础分"""
type_scores = {
"system": 1.0, # 系统消息最重要
"user_input": 0.8, # 用户输入次之
"tool_result": 0.6, # 工具结果
"reasoning": 0.4, # 推理过程
"observation": 0.3, # 观察记录
}
return type_scores.get(entry_type, 0.5)
def _entropy_score(self, content: str) -> float:
"""信息熵评分:内容越独特越重要"""
# 计算字符级信息熵
from collections import Counter
freq = Counter(content)
total = len(content)
entropy = -sum((c / total) * math.log2(c / total) for c in freq.values())
# 归一化到 [0, 1]
return min(entropy / 8.0, 1.0) # 最大熵约 8 bits/char
2.4.2 时间衰减函数
记忆的重要性会随时间衰减,模拟人类遗忘曲线:
class TimeDecay:
"""时间衰减函数,基于艾宾浩斯遗忘曲线"""
def __init__(self, half_life_hours: float = 24.0):
"""
half_life_hours: 半衰期(小时),即重要性减半所需时间
- 短期记忆半衰期: 1小时
- 长期记忆半衰期: 720小时(30天)
"""
self.half_life_hours = half_life_hours
def decay(self, initial_importance: float, hours_elapsed: float) -> float:
"""计算衰减后的重要性"""
# 指数衰减: I(t) = I_0 * 2^(-t / T_half)
decayed = initial_importance * (2 ** (-hours_elapsed / self.half_life_hours))
return max(decayed, 0.01) # 最低保留 1%,不完全遗忘
def boost(self, current_importance: float, access_count: int) -> float:
"""访问增强:每次被检索到时提升重要性"""
# 间隔重复效应:每次访问使重要性恢复到初始值的 (1 - 1/2^n)
boost_factor = 1 - (1 / (2 ** access_count))
return min(current_importance + boost_factor * (1 - current_importance), 1.0)
2.4.3 记忆归档:从短期到长期的迁移
class MemoryArchiver:
"""记忆归档器:将高价值短期记忆迁移到长期记忆"""
def __init__(self, short_term: ShortTermMemory, long_term: LongTermMemory,
archive_threshold: float = 0.7):
self.short_term = short_term
self.long_term = long_term
self.archive_threshold = archive_threshold
async def archive_session(self, session_id: str):
"""会话结束时归档高价值记忆"""
# 1. 查询该会话的所有短期记忆
entries = self.short_term.get_by_session(session_id)
# 2. 筛选高价值条目
for entry in entries:
if entry.importance >= self.archive_threshold:
# 3. 去重检查:是否与已有长期记忆高度相似
similar = await self.long_term.recall(
query=entry.content, top_k=1, min_relevance=0.95
)
if not similar:
# 新知识,归档到长期记忆
await self.long_term.store(entry)
else:
# 已有相似记忆,合并更新
await self._merge_memory(similar[0], entry)
# 4. 清理已归档的短期记忆
self.short_term.clear_session(session_id)
async def _merge_memory(self, existing: MemoryEntry, new: MemoryEntry):
"""合并相似记忆"""
merged = MemoryEntry(
content=f"{existing.content}\n[补充] {new.content}",
type=existing.type,
importance=max(existing.importance, new.importance),
timestamp=new.timestamp, # 使用更新的时间戳
session_id=existing.session_id,
metadata={**existing.metadata, "merged_from": new.session_id}
)
await self.long_term.update(existing.id, merged)
2.5 ContextEngine上下文引擎详解
2.5.1 上下文组装策略
ContextEngine 是连接记忆系统与 Agent 运行时的桥梁,它控制每次 LLM 调用的上下文组装:
class ContextEngine:
"""上下文引擎:控制 LLM 调用的上下文组装"""
async def assemble(self, state: AgentState, budget: TokenBudget) -> AssembledContext:
"""组装 LLM 调用上下文"""
remaining_budget = budget.total
# [优先级1] 系统提示词(不可截断)
system_prompt = state.soul.system_prompt
remaining_budget -= count_tokens(system_prompt)
# [优先级2] 工具定义(不可截断)
tool_definitions = self._format_tool_definitions(state.available_tools)
remaining_budget -= count_tokens(tool_definitions)
# [优先级3] 记忆召回(可截断)
memories = await self._recall_memories(state.current_input, remaining_budget // 3)
memory_text = self._format_memories(memories)
remaining_budget -= count_tokens(memory_text)
# [优先级4] 历史交互(可截断、可摘要)
history = await self._get_managed_history(state, remaining_budget)
# [优先级5] 当前输入(不可截断)
current_input = state.current_input
return AssembledContext(
system_prompt=system_prompt,
tool_definitions=tool_definitions,
memories=memory_text,
history=history,
current_input=current_input
)
2.5.2 历史消息的摘要压缩算法
当历史消息超出 Token 预算时,ContextEngine 会对旧消息进行摘要压缩:
class HistoryCompressor:
"""历史消息摘要压缩器"""
async def compress(self, messages: List[Message],
target_tokens: int) -> List[Message]:
"""将历史消息压缩到目标 Token 数"""
total_tokens = sum(m.token_count for m in messages)
if total_tokens <= target_tokens:
return messages
# 策略1:保留最近的消息,对旧消息分组摘要
recent_count = len(messages) // 3 # 保留最近 1/3
recent = messages[-recent_count:]
old = messages[:-recent_count]
# 对旧消息按轮次分组摘要
summarized = []
group_size = 5 # 每5条消息为一组
for i in range(0, len(old), group_size):
group = old[i:i + group_size]
summary = await self._summarize_group(group)
summarized.append(summary)
result = summarized + recent
# 如果仍然超预算,递归压缩
if sum(m.token_count for m in result) > target_tokens:
return await self.compress(result, target_tokens)
return result
async def _summarize_group(self, messages: List[Message]) -> Message:
"""对一组消息生成摘要"""
combined = "\n".join(f"[{m.role}] {m.content[:200]}" for m in messages)
summary_text = await self.llm_provider.chat(
system="请将以下对话历史压缩为简洁的摘要,保留关键信息和决策点",
input=combined,
max_tokens=100
)
return Message(
role="system",
content=f"[历史摘要] {summary_text}",
token_count=count_tokens(summary_text) + 10
)
2.5.3 2026.6.6版本重构:可插拔引擎机制
2026.6.6 版本对 ContextEngine 进行了重构,引入了可插拔引擎机制,允许用户根据场景选择不同的上下文管理策略:
class ContextEngineFactory:
"""上下文引擎工厂"""
@staticmethod
def create(policy: ContextPolicy) -> ContextEngine:
engines = {
"sliding_window": SlidingWindowEngine, # 滑动窗口:保留最近N条消息
"summary_compression": SummaryEngine, # 摘要压缩:对旧消息摘要
"relevance_ranking": RelevanceEngine, # 相关性排序:按相关性选择消息
"hybrid": HybridContextEngine, # 混合策略:综合以上策略
}
engine_class = engines.get(policy.type, HybridContextEngine)
return engine_class(**policy.params)
class HybridContextEngine(ContextEngine):
"""混合上下文引擎:综合多种策略"""
async def assemble(self, state: AgentState, budget: TokenBudget) -> AssembledContext:
# 1. 始终保留最近3轮对话
recent_messages = state.messages[-6:] # 3轮 = 6条消息(用户+助手)
# 2. 对更早的消息按相关性检索
older_messages = state.messages[:-6]
relevant_older = await self._rank_by_relevance(
older_messages, state.current_input, budget.available_for_history // 2
)
# 3. 不相关的旧消息生成摘要
irrelevant_older = [m for m in older_messages if m not in relevant_older]
summary = await self._summarize(irrelevant_older)
# 4. 组装最终上下文
return AssembledContext(
recent_messages=recent_messages,
relevant_older=relevant_older,
summary=summary
)
第三部分 运行时与记忆的协同闭环
3.1 完整请求处理流程追踪
让我们追踪一个完整请求从进入到响应的全过程,观察运行时与记忆系统的协同:
用户: "帮我分析一下最近一周的销售数据趋势"
Step 1: Gateway 接收与路由
HTTP Request → Gateway → 认证 → 限流检查 → 路由到 Agent 运行时
Step 2: Agent 运行时初始化上下文
# 2.1 从长期记忆召回相关知识
memories = await long_term.recall("销售数据分析", top_k=5)
# 返回: ["Q3销售报告模板", "数据源配置信息", "上次分析结论"]
# 2.2 从短期记忆加载会话历史
session_history = await short_term.get_by_session(session_id)
# 2.3 ContextEngine 组装上下文
context = await context_engine.assemble(
system_prompt=soul.system_prompt,
memories=memories,
history=session_history,
current_input="帮我分析一下最近一周的销售数据趋势"
)
Step 3: ReAct 循环执行
[Think] LLM 分析用户意图 → 需要获取销售数据 + 生成趋势图
[Parse] 工具调用: database_query(sql="SELECT ... FROM sales WHERE date > ...")
[Act] 执行数据库查询 → 返回销售数据
[Observe] 数据写入短期记忆 → 继续循环
[Think] LLM 分析数据 → 需要生成可视化图表
[Parse] 工具调用: chart_generate(data=sales_data, type="line")
[Act] 执行图表生成 → 返回图表URL
[Observe] 结果写入短期记忆 → 继续循环
[Think] LLM 综合分析 → 生成趋势分析报告
[Parse] 无工具调用 → end_turn
Step 4: 记忆归档
# 会话结束后归档
await archiver.archive_session(session_id)
# 高价值记忆(如分析结论、数据源配置)迁移到长期记忆
3.2 记忆驱动的行为优化
记忆系统不仅存储信息,还主动驱动 Agent 的行为优化:
3.2.1 基于历史成功率的工具偏好
class MemoryDrivenToolSelector:
"""基于记忆的工具选择器"""
async def select(self, task: str, candidates: List[Tool]) -> Tool:
# 从长期记忆中检索该任务类型的历史工具使用记录
history = await self.long_term.recall(
query=f"工具使用记录 {task}",
top_k=10
)
# 统计各工具的历史成功率
success_rates = {}
for record in history:
tool_name = record.metadata.get("tool_name")
success = record.metadata.get("success", False)
if tool_name not in success_rates:
success_rates[tool_name] = {"success": 0, "total": 0}
success_rates[tool_name]["total"] += 1
if success:
success_rates[tool_name]["success"] += 1
# 结合历史成功率和当前相关性选择工具
scored = []
for tool in candidates:
rate = success_rates.get(tool.name, {"success": 0, "total": 0})
historical_rate = rate["success"] / max(rate["total"], 1)
relevance = self._compute_relevance(tool, task)
score = 0.6 * relevance + 0.4 * historical_rate
scored.append((tool, score))
return max(scored, key=lambda x: x[1])[0]
3.2.2 基于用户偏见的Prompt微调
class UserPreferenceAdapter:
"""用户偏好适配器"""
async def adapt_prompt(self, base_prompt: str, user_id: str) -> str:
# 从长期记忆检索用户偏好
preferences = await self.long_term.recall(
query=f"用户偏好 {user_id}",
top_k=5
)
# 提取偏好规则
preference_rules = []
for pref in preferences:
if pref.type == "user_preference":
preference_rules.append(pref.content)
if preference_rules:
# 将偏好规则注入系统提示词
preference_block = "\n".join(f"- {rule}" for rule in preference_rules)
adapted = f"{base_prompt}\n\n## 用户偏好\n{preference_block}"
return adapted
return base_prompt
3.3 多Agent间的记忆共享与隔离
3.3.1 共享记忆空间
Master Agent 可以创建共享记忆空间,让多个子 Agent 协作时共享关键信息:
class SharedMemorySpace:
"""共享记忆空间"""
def __init__(self, space_id: str, participants: List[str]):
self.space_id = space_id
self.participants = set(participants)
self.shared_store = {} # 共享键值存储
self.event_bus = EventBus() # 事件总线
async def write(self, agent_id: str, key: str, value: Any):
"""写入共享记忆"""
if agent_id not in self.participants:
raise PermissionError(f"Agent {agent_id} 不在共享空间中")
self.shared_store[key] = {
"value": value,
"written_by": agent_id,
"timestamp": datetime.now()
}
# 通知其他参与者
await self.event_bus.publish(
topic=f"shared_memory:{self.space_id}",
event={"key": key, "action": "write", "by": agent_id}
)
async def read(self, agent_id: str, key: str) -> Any:
"""读取共享记忆"""
if agent_id not in self.participants:
raise PermissionError(f"Agent {agent_id} 不在共享空间中")
return self.shared_store.get(key, {}).get("value")
3.3.2 记忆隔离边界
每个 Agent 实例的记忆默认是隔离的,只有通过共享记忆空间才能跨 Agent 访问:
class MemoryIsolationGuard:
"""记忆隔离守卫"""
def check_access(self, agent_id: str, memory_entry: MemoryEntry) -> bool:
"""检查 Agent 是否有权访问某条记忆"""
# 规则1:Agent 只能访问自己会话的记忆
if memory_entry.session_id in self.agent_sessions[agent_id]:
return True
# 规则2:Agent 可以访问共享记忆空间中的记忆
if memory_entry.metadata.get("shared_space") in self.agent_shared_spaces[agent_id]:
return True
# 规则3:Master Agent 可以访问所有子 Agent 的记忆
if agent_id in self.master_agents:
if memory_entry.session_id in self.get_sub_agent_sessions(agent_id):
return True
return False
第四部分 2026.6.6版本更新与最佳实践
4.1 关键更新总结
| 模块 | 更新内容 | 影响 |
|---|---|---|
| ContextEngine | 可插拔引擎机制 | 支持自定义上下文管理策略 |
| 混合检索 | HNSW索引 + 查询缓存 + 自适应alpha | 检索延迟降低 92% |
| Agent池化 | 实例复用 + 懒加载 | 冷启动时间降低 80% |
| 子Agent调度 | 共享记忆空间 + ACP协议增强 | 多Agent协作更高效 |
| 记忆归档 | 增量合并 + 去重优化 | 长期记忆膨胀率降低 60% |
4.2 生产部署最佳实践
4.2.1 记忆系统配置建议
# production-memory-config.yaml
memory:
short_term:
max_entries: 500
max_tokens_per_session: 32000
eviction_policy: "importance_weighted" # 按重要性加权淘汰
long_term:
vector_store: "milvus" # 生产环境推荐 Milvus
embedding_model: "bge-large-zh" # 中文场景推荐
dimension: 1024
index_type: "HNSW"
hnsw_params:
m: 16
ef_construction: 200
hybrid_retrieval:
alpha: 0.7 # 向量检索权重
enable_cache: true
cache_ttl: 300 # 缓存5分钟
adaptive_alpha: true # 自适应alpha
archiver:
archive_threshold: 0.7 # 重要性 >= 0.7 才归档
dedup_similarity: 0.95 # 相似度 >= 0.95 视为重复
schedule: "session_end" # 会话结束时归档
4.2.2 Agent运行时配置建议
# production-agent-config.yaml
agent:
pool_size: 10 # Agent池大小
max_iterations: 25 # ReAct最大迭代次数
llm_timeout: 120 # LLM调用超时(秒)
context_engine:
type: "hybrid" # 混合上下文引擎
recent_window: 6 # 保留最近6条消息
summary_model: "gpt-4o-mini" # 摘要用轻量模型
retry:
max_retries: 3
base_delay: 1 # 退避基础延迟(秒)
circuit_breaker:
failure_threshold: 5
recovery_timeout: 60
model_fallback:
- provider: "openai"
model: "gpt-4o"
- provider: "anthropic"
model: "claude-3.5-sonnet"
- provider: "openai"
model: "gpt-4o-mini"
4.2.3 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
react_iterations |
单次请求的 ReAct 迭代次数 | > 20 |
memory_recall_latency |
记忆检索延迟 | > 100ms |
context_assembly_time |
上下文组装耗时 | > 500ms |
short_term_eviction_rate |
短期记忆淘汰率 | > 30% |
long_term_growth_rate |
长期记忆增长速率 | > 1MB/hour |
tool_success_rate |
工具调用成功率 | < 90% |
agent_pool_utilization |
Agent池利用率 | > 80% |
4.3 常见问题与调优建议
Q1: Agent 经常达到最大迭代次数怎么办?
调优建议:
- 检查 Soul 配置中的系统提示词是否足够清晰,避免 LLM 反复尝试
- 减少单次请求涉及的工具数量,拆分为多轮对话
- 调整
max_iterations上限(但需注意成本控制) - 启用"提前终止"策略:当连续2轮迭代无新进展时自动终止
Q2: 长期记忆检索结果不相关怎么办?
调优建议:
- 检查 Embedding 模型是否适合你的领域(通用模型 vs 领域微调模型)
- 调整
min_relevance阈值(默认 0.7,可提高到 0.8) - 调整混合检索的
alpha值(增加关键词检索权重) - 检查记忆归档时的重要性评分是否合理
Q3: 上下文窗口经常溢出怎么办?
调优建议:
- 切换到
hybrid上下文引擎,自动管理历史消息 - 降低
recent_window大小 - 启用更激进的历史摘要压缩
- 检查是否有工具返回了过大的结果,考虑截断
总结
OpenClaw 的 Agent 运行时与记忆系统构成了一个精密的协同闭环:
-
Agent 运行时是"大脑"——通过 ReAct 循环实现 Think-Parse-Act-Observe 的自主决策闭环,通过任务图编排实现复杂任务的并行调度,通过 Master-Slave 模式实现多 Agent 协同,通过分级重试和熔断机制保障系统韧性。
-
Memory 记忆系统是"记忆"——通过三层记忆模型实现信息的分层存储和流转,通过混合检索机制实现语义+关键词的精准召回,通过重要性评估和遗忘机制实现记忆的自适应管理,通过 ContextEngine 实现上下文的智能组装。
-
运行时与记忆的协同是"智慧"——记忆驱动工具选择偏好、用户偏好适配、行为模式优化;运行时驱动记忆的写入、归档和检索。两者形成正向飞轮:越用越聪明,越用越精准。
2026.6.6 版本的关键更新——可插拔 ContextEngine、HNSW 混合检索、Agent 池化复用——让这个飞轮转得更快、更稳。理解这些内部实现逻辑,是充分发挥 OpenClaw 能力的基础,也是进行二次开发和创新的前提。
评论区