侧边栏壁纸
  • 累计撰写 62 篇文章
  • 累计创建 15 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

OpenClaw 核心原理深度解析:Agent运行时与记忆系统内部实现

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

OpenClaw 核心原理深度解析:Agent运行时与记忆系统内部实现

前言

在 AI Agent 框架百花齐放的 2026 年,OpenClaw 凭借其独特的五层架构设计——Gateway 网关层、Agent 运行时层、Skill 技能层、Memory 记忆层、Plugin 插件层——在开源社区中脱颖而出。其中,Agent 运行时Memory 记忆系统是整个架构的脊梁:前者决定了 Agent 如何思考、决策和执行,后者决定了 Agent 能否跨越会话边界积累和运用知识。

2026.6.6 版本对这两大核心模块进行了关键更新:ContextEngine 上下文引擎重构、混合检索机制优化、子 Agent 调度能力增强。这些改进不仅提升了系统性能,更从根本上改变了 Agent 的行为模式。

本文将从执行流程、数据结构、核心算法三个维度,深入拆解 Agent 运行时与记忆系统的内部实现逻辑。无论你是想理解 OpenClaw 的设计哲学,还是希望基于 OpenClaw 进行二次开发,这篇文章都将为你提供系统性的技术参考。


第一部分 Agent智能体运行时原理与内部实现

1.1 Agent运行时的架构定位与核心职责

1.1.1 在「Gateway - Agent - Skill」三层链路中的承上启下作用

OpenClaw 的请求处理遵循清晰的三层链路:

用户请求 → Gateway网关层 → Agent运行时层 → Skill技能层
                ↑               ↑               ↑
           协议适配&路由    思考决策&调度     工具执行&返回

Gateway 层负责协议适配(HTTP/WebSocket/Webhook)、身份认证、请求路由和限流熔断,将外部请求标准化后转发给 Agent 层。

Agent 运行时层是整个系统的"大脑",接收 Gateway 转发的标准化请求后,负责思考决策、任务拆解、工具调度和状态维护。它向上对接 Gateway 的请求上下文,向下调度 Skill 层的具体工具执行。

Skill 层是"手脚",提供 52+ 内置技能(以 .md 文件形式定义),每个 Skill 封装了特定领域能力(如代码执行、Web 搜索、文件操作等),由 Agent 运行时按需加载和调用。

Agent 运行时在这三层中的核心价值在于:它不是简单的请求转发器,而是一个具备自主决策能力的智能调度中心。它需要理解用户意图、规划执行路径、选择合适工具、处理异常情况,并在整个过程中维护一致的上下文状态。

1.1.2 核心职责边界

Agent 运行时的职责可以精确划分为四个维度:

职责 说明 关键机制
任务调度 接收请求、拆解子任务、编排执行顺序 任务图(DAG)构建、依赖分析
思考决策 调用 LLM 生成决策、解析工具调用指令 ReAct 循环、Prompt 动态组装
执行管控 调度工具执行、监控进度、处理异常 同步/异步调度、熔断重试
状态维护 管理会话上下文、维护执行状态 状态机、ContextEngine

这四个维度形成了一个闭环:任务调度触发思考决策,思考决策驱动执行管控,执行管控更新状态维护,状态维护反过来影响下一轮任务调度。

1.1.3 关键依赖模块

Agent 运行时的核心依赖有三个:

  • ContextEngine 上下文引擎:控制每次 LLM 调用的上下文组装——哪些消息纳入、旧历史如何摘要压缩、跨子 Agent 上下文如何传递。2026.6.6 版本对其进行了重构,引入了可插拔引擎机制。
  • NodeManager 节点管理器:管理本地和远程执行节点的注册、健康检查和负载均衡,决定工具调用路由到哪个节点执行。
  • SkillLoader 技能加载器:负责 Skill 的发现、加载、缓存和生命周期管理,支持懒加载和热更新。

1.2 Agent完整生命周期管理

Agent 实例从创建到销毁,经历五个核心状态:

创建(Instantiate) → 就绪(Ready) → 运行(Running) ⇄ 挂起(Suspended) → 销毁(Destroyed)

1.2.1 实例创建:Soul配置加载与运行时初始化

每个 Agent 实例由一个 Soul 配置文件(SOUL.md)定义,它描述了 Agent 的身份、行为规则和约束。创建流程如下:

# 伪代码:Agent 实例创建流程
class AgentFactory:
    async def create(self, soul_config: SoulConfig) -> Agent:
        # 1. 解析 Soul 配置
        soul = SoulParser.parse(soul_config)

        # 2. 初始化运行时组件
        context_engine = ContextEngineFactory.create(soul.context_policy)
        node_manager = NodeManager(soul.node_config)
        skill_loader = SkillLoader(soul.skill_paths)

        # 3. 构建 Agent 实例
        agent = Agent(
            soul=soul,
            context_engine=context_engine,
            node_manager=node_manager,
            skill_loader=skill_loader,
            state=AgentState.INSTANTIATED
        )

        # 4. 预加载必要资源
        await agent.warmup()

        return agent

关键步骤包括:Soul 配置解析(提取系统提示词、工具白名单、约束规则)、ContextEngine 初始化(根据配置选择上下文管理策略)、SkillLoader 预扫描(发现可用技能但不加载实现)。

1.2.2 就绪状态:资源预加载与会话绑定

Agent 进入就绪状态后,会完成两项关键准备:

  1. 资源预加载:加载 Soul 中标记为 preload 的技能、预热 LLM 连接池、初始化记忆检索索引
  2. 会话绑定:将 Agent 实例与特定用户会话关联,建立会话级的上下文隔离边界
async def warmup(self):
    """资源预加载"""
    # 预加载标记为 preload 的技能
    for skill_name in self.soul.preload_skills:
        await self.skill_loader.load(skill_name)

    # 预热 LLM Provider 连接
    await self.llm_provider.warmup()

    # 初始化记忆检索索引
    await self.memory_engine.ensure_index()

1.2.3 运行状态:进入主循环与任务接收处理

Agent 进入运行状态后,核心是启动 AgentLoop 主循环,持续监听并处理来自 Gateway 的请求:

async def run(self, initial_input: str) -> LoopResult:
    """Agent 主循环入口"""
    state = await self.state_manager.initialize(initial_input)
    iteration = 0

    while iteration < self.max_iterations:
        iteration += 1

        # 感知阶段:接收当前状态
        perception = await self._perceive(state)

        # 决策阶段:调用 LLM 生成决策
        decision = await self._decide(perception)

        # 执行阶段:调度工具执行
        result = await self._act(decision)

        # 反思阶段:更新状态,判断是否继续
        state = await self._reflect(result)

        if state.should_stop:
            break

    return LoopResult(state=state, iteration=iteration)

1.2.4 挂起/恢复:上下文持久化与续跑

在长时间运行的任务中,Agent 可能需要挂起(等待外部事件、用户确认、异步操作完成)。挂起时的关键操作是上下文持久化

async def suspend(self, reason: SuspendReason) -> Snapshot:
    """挂起 Agent,保存完整上下文快照"""
    snapshot = Snapshot(
        agent_id=self.id,
        state=self.state,
        context=self.context_engine.export(),
        memory_cursor=self.memory_engine.get_cursor(),
        pending_tasks=self.task_queue.dump(),
        reason=reason,
        timestamp=datetime.now()
    )
    await self.snapshot_store.save(snapshot)
    self.state = AgentState.SUSPENDED
    return snapshot

async def resume(self, snapshot_id: str) -> None:
    """从快照恢复 Agent"""
    snapshot = await self.snapshot_store.load(snapshot_id)
    self.state = snapshot.state
    await self.context_engine.restore(snapshot.context)
    await self.memory_engine.seek(snapshot.memory_cursor)
    self.task_queue.load(snapshot.pending_tasks)
    self.state = AgentState.RUNNING

1.2.5 销毁流程:资源回收、记忆归档与会话清理

Agent 销毁时需要完成三项清理工作:

  1. 资源回收:释放 LLM 连接、关闭文件句柄、清理临时数据
  2. 记忆归档:将短期记忆中的有价值内容提取并归档到长期记忆
  3. 会话清理:解除会话绑定、清理会话级缓存、通知 Gateway 释放路由条目

1.2.6 2026.6.6版本优化:Agent池化复用与懒加载机制

2026.6.6 版本引入了两个重要的性能优化:

Agent 池化复用:类似数据库连接池,预创建一批 Agent 实例放入池中,请求到来时直接从池中获取,避免每次请求都走完整的创建流程:

class AgentPool:
    def __init__(self, soul_config: SoulConfig, pool_size: int = 10):
        self.pool = asyncio.Queue(maxsize=pool_size)
        self.soul_config = soul_config

    async def acquire(self) -> Agent:
        if self.pool.empty():
            agent = await AgentFactory.create(self.soul_config)
        else:
            agent = await self.pool.get()
            await agent.reset()  # 重置状态但保留预加载资源
        return agent

    async def release(self, agent: Agent):
        await agent.archive_memory()  # 归档记忆
        await self.pool.put(agent)

懒加载机制:Skill 不再在 Agent 创建时全部加载,而是在首次被调用时才加载实现代码,大幅减少冷启动时间。


1.3 ReAct思考循环:核心执行引擎源码级拆解

ReAct(Reasoning + Acting)循环是 Agent 运行时的心脏。OpenClaw 采用双层循环架构:外层循环处理用户 follow-up,内层循环处理工具调用链。

1.3.1 主循环入口与状态机设计

AgentLoop 控制器的核心数据结构

@dataclass
class AgentLoop:
    soul: Soul                    # Agent 身份配置
    context_engine: ContextEngine # 上下文引擎
    llm_provider: LLMProvider     # LLM 调用封装
    tool_registry: ToolRegistry   # 工具注册表
    state_manager: StateManager   # 状态管理器
    max_iterations: int = 25      # 最大迭代次数
    stop_conditions: List[StopCondition] = field(default_factory=list)

循环状态机:Agent 在每次迭代中经历五个状态的流转:

idle → thinking → acting → observing → (idle | error)
  ↑                                    |
  └────────────────────────────────────┘
状态 触发条件 核心动作
idle 初始状态 / 上一轮观察完成 等待新输入或进入下一轮迭代
thinking 进入新迭代 组装 Prompt → 调用 LLM → 获取决策
acting LLM 返回工具调用指令 解析指令 → 路由执行 → 获取结果
observing 工具执行完成 结果封装 → 上下文更新 → 判断是否继续
error 异常发生 分级重试 / 熔断 / 降级

双层循环的完整结构

# 外层循环:处理用户 follow-up
while True:
    # 内层循环:处理工具调用链
    while has_more_tool_calls or has_pending_messages:
        # [1] 注入待处理的 steering/follow-up 消息
        inject_pending_messages(context)

        # [2] 流式调用 LLM
        response = await stream_assistant_response(context)

        # [3] 检查停止原因
        if response.stop_reason == "tool_call":
            # [4] 解析并执行工具调用
            tool_results = await execute_tool_calls(response.tool_calls)
            # [5] 将结果追加到上下文
            context.append(tool_results)
        elif response.stop_reason == "end_turn":
            break  # 内层循环结束

    # 检查是否有 follow-up
    if not has_follow_up:
        break  # 外层循环结束

1.3.2 Think阶段:决策生成的完整链路

Think 阶段是 ReAct 循环中最复杂的环节,涉及动态 Prompt 组装、模型调用和 Token 预算管理。

动态 Prompt 组装

每次 LLM 调用前,ContextEngine 会按以下优先级拼接上下文:

系统提示词(SOUL.md) + 记忆召回结果 + 工具清单 + 历史交互 + 当前输入
async def assemble_prompt(self, state: AgentState) -> PromptPacket:
    """动态 Prompt 组装"""
    # 1. 系统提示词(最高优先级,不可截断)
    system_prompt = self.soul.system_prompt

    # 2. 记忆召回(按相关性排序)
    memories = await self.memory_engine.recall(
        query=state.current_input,
        top_k=5,
        min_relevance=0.7
    )

    # 3. 工具清单(仅列出当前可用的工具)
    available_tools = self.tool_registry.get_available_tools(
        permissions=state.permissions
    )

    # 4. 历史交互(受 Token 预算控制)
    history = self.context_engine.get_history(
        max_tokens=self.token_budget - reserved_tokens
    )

    # 5. 当前输入
    current_input = state.current_input

    return PromptPacket(
        system=system_prompt,
        memories=memories,
        tools=available_tools,
        history=history,
        input=current_input
    )

模型调用封装

async def call_llm(self, prompt: PromptPacket) -> LLMResponse:
    """模型调用封装,支持流式响应和超时控制"""
    try:
        response = await asyncio.wait_for(
            self.llm_provider.chat(
                messages=prompt.to_messages(),
                tools=prompt.to_tool_definitions(),
                stream=True
            ),
            timeout=self.config.llm_timeout  # 默认 120s
        )
        return response
    except asyncio.TimeoutError:
        raise LLMTimeoutError(f"LLM 调用超时 ({self.config.llm_timeout}s)")

Token 预算管理

Token 预算是成本控制和上下文窗口管理的关键机制:

class TokenBudget:
    def __init__(self, total: int = 128000):
        self.total = total
        self.reserved_for_system = 2000    # 系统提示词预留
        self.reserved_for_tools = 1000     # 工具定义预留
        self.reserved_for_output = 4096    # 输出预留

    @property
    def available_for_history(self) -> int:
        return (self.total
                - self.reserved_for_system
                - self.reserved_for_tools
                - self.reserved_for_output)

    def truncate_history(self, messages: List[Message]) -> List[Message]:
        """按 Token 预算截断历史消息"""
        budget = self.available_for_history
        result = []
        for msg in reversed(messages):  # 从最新消息开始保留
            if msg.token_count <= budget:
                result.insert(0, msg)
                budget -= msg.token_count
            else:
                # 对超长消息进行摘要压缩
                summary = self.summarize(msg)
                if summary.token_count <= budget:
                    result.insert(0, summary)
                    budget -= summary.token_count
                break
        return result

1.3.3 Parse阶段:工具指令的解析与校验

LLM 返回的响应需要经过严格的解析和校验才能进入执行阶段。

结构化输出解析

def parse_tool_calls(self, response: LLMResponse) -> List[ToolCall]:
    """解析 LLM 响应中的工具调用指令"""
    tool_calls = []

    for block in response.content_blocks:
        if block.type != "tool_use":
            continue

        try:
            # JSON 格式校验与参数提取
            params = json.loads(block.input)
            tool_call = ToolCall(
                id=block.id,
                name=block.name,
                params=params
            )
            tool_calls.append(tool_call)
        except json.JSONDecodeError as e:
            # 格式错误时的多轮修正
            corrected = self._attempt_correction(block.input, e)
            if corrected:
                tool_calls.append(corrected)

    return tool_calls

工具合法性校验

def validate_tool_call(self, call: ToolCall, permissions: Permissions) -> ValidationResult:
    """校验工具调用的合法性"""
    # 1. 工具是否存在
    tool = self.tool_registry.get(call.name)
    if not tool:
        return ValidationResult.error(f"未知工具: {call.name}")

    # 2. 权限校验
    if not permissions.has_permission(tool.required_permission):
        return ValidationResult.error(f"权限不足: {tool.required_permission}")

    # 3. 参数 Schema 匹配
    try:
        tool.param_schema.validate(call.params)
    except SchemaValidationError as e:
        return ValidationResult.error(f"参数校验失败: {e}")

    return ValidationResult.ok()

异常解析兜底:当 LLM 输出的工具调用格式不正确时,OpenClaw 会尝试多轮修正:

  1. 第一轮:尝试 JSON 修复(补全括号、修复引号)
  2. 第二轮:提取关键参数,用默认值填充缺失字段
  3. 第三轮:将原始输出回注 LLM,请求重新生成

1.3.4 Act阶段:工具执行的调度与管控

本地/远程节点的路由决策

async def route_tool_call(self, call: ToolCall) -> Node:
    """工具调用路由决策"""
    tool = self.tool_registry.get(call.name)

    if tool.execution_mode == "local":
        return self.node_manager.get_local_node()

    # 远程节点选择:基于负载和亲和性
    candidates = self.node_manager.get_remote_nodes(
        capability=tool.required_capability
    )

    # 加权评分:负载(40%) + 延迟(30%) + 亲和性(30%)
    scored = []
    for node in candidates:
        score = (0.4 * (1 - node.load_factor) +
                 0.3 * (1 - node.latency / MAX_LATENCY) +
                 0.3 * node.affinity_score(call.session_id))
        scored.append((node, score))

    return max(scored, key=lambda x: x[1])[0]

同步/异步执行模式

async def execute_tool(self, call: ToolCall, node: Node) -> ToolResult:
    """工具执行,支持同步和异步模式"""
    tool = self.tool_registry.get(call.name)

    if tool.execution_mode == "sync":
        # 同步执行:等待结果返回
        result = await node.execute(call, timeout=tool.timeout)
        return result

    elif tool.execution_mode == "async":
        # 异步执行:返回任务ID,后续轮询结果
        task_id = await node.submit(call)

        # 注册进度回调
        await node.on_progress(task_id, self._handle_progress)

        # 等待完成(带超时)
        result = await node.wait_for_result(task_id, timeout=tool.timeout)
        return result

执行过程监督:支持进度上报和中断响应:

async def _handle_progress(self, task_id: str, progress: Progress):
    """处理工具执行进度"""
    # 更新状态中的进度信息
    self.state.update_task_progress(task_id, progress)

    # 如果用户发送了中断信号
    if self.state.has_interrupt_signal():
        await self.node_manager.cancel_task(task_id)
        raise TaskInterruptedError(f"任务被用户中断: {task_id}")

1.3.5 Observe阶段:结果反馈与上下文更新

Observe 阶段是 ReAct 循环的收尾,也是连接下一轮迭代的桥梁。

执行结果的标准化封装

def standardize_result(self, raw_result: Any, tool: Tool) -> ToolResult:
    """将工具执行结果标准化封装"""
    if isinstance(raw_result, ToolResult):
        return raw_result

    return ToolResult(
        tool_name=tool.name,
        status="success" if raw_result else "empty",
        output=str(raw_result),
        metadata={
            "execution_time": raw_result.execution_time if hasattr(raw_result, 'execution_time') else None,
            "token_count": count_tokens(str(raw_result)),
        }
    )

错误结果的语义化转换:将技术性错误信息转换为 LLM 可理解的语义描述:

def semantic_error_transform(self, error: Exception) -> str:
    """将技术错误转换为语义化描述"""
    error_map = {
        TimeoutError: "工具执行超时,请考虑简化任务或增加超时时间",
        PermissionError: "权限不足,当前会话无权执行此操作",
        ConnectionError: "远程节点连接失败,请稍后重试或切换到本地执行",
        SchemaValidationError: "工具参数格式不正确,请检查参数类型和必填项",
    }

    for error_type, message in error_map.items():
        if isinstance(error, error_type):
            return message

    return f"工具执行失败: {str(error)}"

短期记忆的同步写入:每次工具执行结果都会同步写入短期记忆:

async def sync_to_short_term_memory(self, result: ToolResult):
    """将执行结果同步写入短期记忆"""
    memory_entry = MemoryEntry(
        content=f"[{result.tool_name}] {result.output}",
        type="tool_result",
        importance=self._calculate_importance(result),
        timestamp=datetime.now(),
        session_id=self.session_id,
        metadata=result.metadata
    )
    await self.memory_engine.write_short_term(memory_entry)

1.4 任务拆解与工具调用编排机制

1.4.1 复杂任务的层级拆解算法

当用户请求涉及多个步骤时,Agent 需要将复杂任务拆解为可执行的子任务图(DAG):

class TaskDecomposer:
    async def decompose(self, task: Task) -> TaskGraph:
        """将复杂任务拆解为子任务 DAG"""
        # 1. 调用 LLM 生成拆解方案
        decomposition = await self.llm_provider.chat(
            system="你是一个任务规划专家,将复杂任务拆解为可并行/串行执行的子任务",
            input=f"任务: {task.description}\n可用工具: {self.tool_registry.list_names()}"
        )

        # 2. 解析为任务图
        graph = TaskGraph()
        for subtask in decomposition.subtasks:
            node = TaskNode(
                id=subtask.id,
                description=subtask.description,
                tool=subtask.suggested_tool,
                dependencies=subtask.depends_on
            )
            graph.add_node(node)

        # 3. 校验 DAG 合法性(无环、依赖可达)
        graph.validate()

        return graph

1.4.2 多工具调用的依赖编排与并行调度

class TaskScheduler:
    async def execute_graph(self, graph: TaskGraph) -> Dict[str, ToolResult]:
        """执行任务图,支持并行调度"""
        results = {}
        completed = set()
        ready_queue = asyncio.Queue()

        # 初始化:将无依赖的节点放入就绪队列
        for node in graph.get_root_nodes():
            await ready_queue.put(node)

        while completed != graph.all_node_ids:
            # 取出就绪节点
            node = await ready_queue.get()

            # 并行执行无相互依赖的节点
            batch = [node]
            while not ready_queue.empty():
                next_node = ready_queue.get_nowait()
                if next_node.dependencies.issubset(completed):
                    batch.append(next_node)

            # 并行执行
            task_results = await asyncio.gather(*[
                self._execute_node(n) for n in batch
            ])

            # 收集结果,解锁后续节点
            for n, result in zip(batch, task_results):
                results[n.id] = result
                completed.add(n.id)

                # 检查后续节点是否就绪
                for successor in graph.get_successors(n.id):
                    if successor.dependencies.issubset(completed):
                        await ready_queue.put(successor)

        return results

1.4.3 工具选择评分机制

当多个工具都能完成同一任务时,Agent 通过加权评分选择最优工具:

def score_tool(self, tool: Tool, context: AgentState) -> float:
    """工具选择评分:相关性 × 历史成功率 × 调用成本"""
    # 相关性评分(0-1):工具描述与当前任务的语义相似度
    relevance = self.embedding_similarity(
        tool.description, context.current_task.description
    )

    # 历史成功率(0-1):该工具在类似任务中的历史表现
    success_rate = self.history_tracker.get_success_rate(
        tool_name=tool.name,
        task_type=context.current_task.type
    )

    # 调用成本(0-1,归一化):延迟 + Token 消耗
    cost = self._normalize_cost(tool.avg_latency, tool.avg_token_cost)

    # 加权评分
    score = (0.5 * relevance +
             0.3 * success_rate +
             0.2 * (1 - cost))  # 成本越低越好

    return score

1.4.4 子任务的状态追踪与结果汇总

@dataclass
class SubTaskTracker:
    task_id: str
    status: str = "pending"  # pending / running / success / failed
    result: Optional[ToolResult] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def mark_running(self):
        self.status = "running"
        self.start_time = datetime.now()

    def mark_success(self, result: ToolResult):
        self.status = "success"
        self.result = result
        self.end_time = datetime.now()

    def mark_failed(self, error: str):
        self.status = "failed"
        self.result = ToolResult(tool_name="unknown", status="failed", output=error)
        self.end_time = datetime.now()

1.5 多Agent协同与子Agent管理

1.5.1 Master-Slave协同模式

OpenClaw 采用 Master-Slave 协同模式处理复杂任务:Master Agent 负责任务规划和调度,Slave Agent 负责具体执行。

用户请求
    ↓
Master Agent(规划、调度、汇总)
    ├── Slave Agent 1(执行子任务A)
    ├── Slave Agent 2(执行子任务B)
    └── Slave Agent 3(执行子任务C)
    ↓
结果合并 → 返回用户

1.5.2 Sub-Agent的创建、任务分配与资源隔离

class SubAgentManager:
    async def create_sub_agent(self, task: SubTask, parent: Agent) -> Agent:
        """创建子 Agent,继承父 Agent 的部分上下文"""
        # 1. 构建子 Agent 的 Soul(继承父 Agent 的部分规则)
        sub_soul = Soul(
            system_prompt=f"你是子Agent,负责执行: {task.description}\n"
                          f"父Agent规则: {parent.soul.core_rules}",
            skills=task.required_skills,
            permissions=parent.permissions.intersect(task.required_permissions)
        )

        # 2. 创建子 Agent 实例
        sub_agent = await AgentFactory.create(sub_soul)

        # 3. 注入任务上下文(仅传递必要信息,实现资源隔离)
        await sub_agent.context_engine.inject(task.context_packet)

        # 4. 注册回调,监听子 Agent 状态
        sub_agent.on_complete(self._handle_sub_agent_complete)
        sub_agent.on_error(self._handle_sub_agent_error)

        return sub_agent

1.5.3 ACP协议:Agent间通信

Agent Communication Protocol(ACP)定义了 Agent 间通信的标准消息格式:

@dataclass
class ACPMessage:
    """Agent 通信协议消息"""
    sender: str           # 发送方 Agent ID
    receiver: str         # 接收方 Agent ID
    type: str             # 消息类型: task_assign / result_report / status_query / cancel
    payload: Dict         # 消息负载
    timestamp: datetime   # 时间戳
    correlation_id: str   # 关联ID(用于请求-响应匹配)

    def to_json(self) -> str:
        return json.dumps({
            "sender": self.sender,
            "receiver": self.receiver,
            "type": self.type,
            "payload": self.payload,
            "timestamp": self.timestamp.isoformat(),
            "correlation_id": self.correlation_id
        })

交互流程

Master → Slave: task_assign(分配子任务)
Slave → Master: status_query(请求状态确认)
Master → Slave: status_response(返回确认)
Slave → Master: result_report(汇报执行结果)
Master: result_merge(合并多个 Slave 的结果)

1.5.4 协同任务的冲突消解与结果合并

当多个子 Agent 的结果存在冲突时,Master Agent 通过以下策略消解:

  1. 优先级策略:按子 Agent 的置信度排序,取置信度最高的结果
  2. 投票策略:多个子 Agent 对同一问题给出不同答案时,取多数一致的结果
  3. LLM 裁决:将冲突结果提交 LLM,由 LLM 判断哪个更合理
async def resolve_conflict(self, results: List[SubTaskResult]) -> SubTaskResult:
    """冲突消解"""
    if len(results) == 1:
        return results[0]

    # 检查是否一致
    if all(r.output == results[0].output for r in results):
        return results[0]

    # 优先级策略
    if any(r.confidence >= 0.95 for r in results):
        return max(results, key=lambda r: r.confidence)

    # LLM 裁决
    verdict = await self.llm_provider.chat(
        system="你是结果裁决专家,从多个候选结果中选择最合理的一个",
        input=f"候选结果: {[r.output for r in results]}"
    )
    return results[int(verdict.choice)]

1.6 容错与自愈机制实现

1.6.1 分级重试策略

OpenClaw 根据错误类型采用差异化的重试策略:

错误类型 重试次数 退避策略 示例
参数错误 1 次 无退避 JSON 格式错误,修正后重试
执行失败 3 次 指数退避 网络超时、节点不可达
模型异常 2 次 固定间隔 LLM 返回空响应、Token 超限
class RetryPolicy:
    async def execute_with_retry(self, fn, *args, **kwargs):
        last_error = None

        for attempt in range(self.max_retries):
            try:
                return await fn(*args, **kwargs)
            except ParameterError as e:
                # 参数错误:尝试自动修正后重试一次
                if attempt == 0:
                    corrected = self._auto_correct_params(e)
                    if corrected:
                        continue
                raise
            except ExecutionError as e:
                last_error = e
                wait_time = self.base_delay * (2 ** attempt)  # 指数退避
                await asyncio.sleep(wait_time)
            except ModelError as e:
                last_error = e
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(self.fixed_delay)
                else:
                    # 触发模型降级
                    raise ModelFallbackTrigger(e)

        raise last_error

1.6.2 熔断机制

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = "closed"  # closed / open / half_open
        self.last_failure_time = None

    async def call(self, fn, *args, **kwargs):
        if self.state == "open":
            # 检查是否进入半开状态
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half_open"
            else:
                raise CircuitOpenError("熔断器开启,请求被拒绝")

        try:
            result = await fn(*args, **kwargs)
            if self.state == "half_open":
                self.state = "closed"
                self.failure_count = 0
            return result
        except Exception as e:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = "open"

            raise

1.6.3 模型降级回退

当主模型故障时,自动切换到备用模型:

class ModelFallbackChain:
    def __init__(self, models: List[ModelConfig]):
        self.models = models  # 按优先级排序
        self.current_index = 0

    async def chat(self, **kwargs) -> LLMResponse:
        for i in range(self.current_index, len(self.models)):
            model = self.models[i]
            try:
                response = await model.provider.chat(**kwargs)
                self.current_index = i  # 更新当前可用模型
                return response
            except ModelError:
                continue

        raise AllModelsFailedError("所有模型均不可用")

1.6.4 异常状态的自动恢复与人工介入入口

OpenClaw 在 Agent 运行时中设计了自动恢复 + 人工介入的双通道机制:

  • 自动恢复通道:对于可预测的异常(超时、网络错误、模型降级),系统自动执行重试/熔断/降级策略
  • 人工介入入口:对于不可预测的异常(逻辑错误、无限循环、安全风险),系统通过 Gateway 向用户发送介入请求,用户可以:
    • 修改 Agent 的当前上下文
    • 手动指定下一步操作
    • 终止当前任务

第二部分 Memory记忆系统原理与内部实现

2.1 三层记忆模型架构设计

OpenClaw 的记忆系统采用三层记忆模型,模拟人类大脑的信息处理机制:

工作记忆(Working Memory) ←→ 短期记忆(Short-Term Memory) ←→ 长期记忆(Long-Term Memory)
     ↑ 实时上下文              ↑ 会话级缓存                    ↑ 持久化知识库
   容量: ~4K tokens          容量: ~32K tokens              容量: 无限
   生命周期: 单次调用          生命周期: 单次会话               生命周期: 永久
   存储: 内存                 存储: SQLite 内存表              存储: 向量数据库

2.1.1 工作记忆:实时上下文窗口

工作记忆是最顶层的记忆,对应 LLM 的上下文窗口。它存储当前正在处理的对话轮次、工具调用结果和临时推理状态。

@dataclass
class WorkingMemory:
    """工作记忆:实时上下文窗口"""
    messages: List[Message]           # 当前对话消息列表
    active_tool_calls: List[ToolCall] # 正在执行的工具调用
    reasoning_state: Dict             # 临时推理状态(如中间计算结果)
    token_count: int = 0              # 当前 Token 计数

    def add_message(self, message: Message):
        """添加消息,自动更新 Token 计数"""
        self.messages.append(message)
        self.token_count += message.token_count

    def is_overflow(self, budget: TokenBudget) -> bool:
        """检查是否超出 Token 预算"""
        return self.token_count > budget.available_for_history

工作记忆的核心特征是容量有限但访问速度极快——它是 LLM 直接"看到"的信息,任何需要被模型处理的数据都必须先进入工作记忆。

2.1.2 短期记忆:会话级信息缓存

短期记忆存储当前会话中的交互历史和中间结果,使用 SQLite 内存表实现,支持结构化查询和模糊检索。

class ShortTermMemory:
    """短期记忆:会话级信息缓存"""

    def __init__(self, session_id: str, max_entries: int = 500):
        self.session_id = session_id
        self.max_entries = max_entries
        self.conn = sqlite3.connect(":memory:")
        self._init_schema()

    def _init_schema(self):
        """初始化内存表结构"""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS short_term_memory (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                content TEXT NOT NULL,
                type TEXT NOT NULL,          -- user_input / tool_result / reasoning / system
                importance REAL DEFAULT 0.5, -- 重要性评分 0-1
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                token_count INTEGER DEFAULT 0,
                metadata TEXT                -- JSON 格式附加信息
            )
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_type ON short_term_memory(type)
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_importance ON short_term_memory(importance)
        """)

    async def write(self, entry: MemoryEntry):
        """写入短期记忆"""
        self.conn.execute(
            "INSERT INTO short_term_memory (content, type, importance, timestamp, token_count, metadata) VALUES (?, ?, ?, ?, ?, ?)",
            (entry.content, entry.type, entry.importance, entry.timestamp, entry.token_count, json.dumps(entry.metadata))
        )
        # 容量控制:超出上限时淘汰低重要性条目
        count = self.conn.execute("SELECT COUNT(*) FROM short_term_memory").fetchone()[0]
        if count > self.max_entries:
            self.conn.execute(
                "DELETE FROM short_term_memory WHERE id IN (SELECT id FROM short_term_memory ORDER BY importance ASC LIMIT ?)",
                (count - self.max_entries,)
            )

    async def recall(self, query: str, top_k: int = 5) -> List[MemoryEntry]:
        """从短期记忆中检索"""
        # 基于关键词和重要性的混合检索
        results = self.conn.execute(
            """SELECT * FROM short_term_memory
               WHERE content LIKE ? OR type = 'reasoning'
               ORDER BY importance DESC, timestamp DESC
               LIMIT ?""",
            (f"%{query}%", top_k)
        ).fetchall()
        return [self._row_to_entry(r) for r in results]

2.1.3 长期记忆:持久化知识库

长期记忆是 Agent 跨会话知识积累的核心,使用向量数据库存储,支持语义检索。

class LongTermMemory:
    """长期记忆:持久化知识库"""

    def __init__(self, vector_store: VectorStore, embedding_model: EmbeddingModel):
        self.vector_store = vector_store
        self.embedding_model = embedding_model

    async def store(self, entry: MemoryEntry) -> str:
        """存储到长期记忆"""
        # 生成向量嵌入
        embedding = await self.embedding_model.embed(entry.content)

        # 构建元数据
        metadata = {
            "type": entry.type,
            "importance": entry.importance,
            "session_id": entry.session_id,
            "timestamp": entry.timestamp.isoformat(),
            **entry.metadata
        }

        # 写入向量数据库
        doc_id = await self.vector_store.add(
            documents=[entry.content],
            embeddings=[embedding],
            metadatas=[metadata],
            ids=[f"mem_{uuid4().hex}"]
        )
        return doc_id[0]

    async def recall(self, query: str, top_k: int = 5, min_relevance: float = 0.7) -> List[MemoryEntry]:
        """语义检索长期记忆"""
        query_embedding = await self.embedding_model.embed(query)

        results = await self.vector_store.search(
            query_embedding=query_embedding,
            top_k=top_k,
            filter_metadata={"importance": {"$gte": min_relevance}}
        )

        return [
            MemoryEntry(
                content=r.document,
                type=r.metadata.get("type", "unknown"),
                importance=r.metadata.get("importance", 0.5),
                timestamp=datetime.fromisoformat(r.metadata["timestamp"]),
                session_id=r.metadata.get("session_id"),
                metadata={k: v for k, v in r.metadata.items() if k not in ["type", "importance", "timestamp", "session_id"]}
            )
            for r in results
        ]

2.1.4 三层记忆的流转规则与淘汰策略

信息在三层记忆间的流转遵循明确的规则:

流转方向 触发条件 核心动作
工作 → 短期 每轮 ReAct 迭代结束 将当前轮次的消息和结果写入短期记忆
短期 → 长期 会话结束 / 重要性超阈值 提取高价值内容归档到长期记忆
长期 → 工作 每次组装 Prompt 时 根据当前查询语义检索长期记忆,注入工作记忆
短期 → 工作 每次组装 Prompt 时 根据相关性检索短期记忆,注入工作记忆

淘汰策略

  • 工作记忆:超出 Token 预算时,对旧消息进行摘要压缩
  • 短期记忆:超出条目上限时,按重要性评分淘汰低价值条目
  • 长期记忆:永不淘汰,但会通过遗忘机制降低低价值记忆的检索权重

2.2 短期记忆的SQLite内存表实现

2.2.1 表结构与索引设计

短期记忆使用 SQLite 内存表,表结构设计兼顾查询效率和存储紧凑:

CREATE TABLE short_term_memory (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    content TEXT NOT NULL,
    type TEXT NOT NULL,
    importance REAL DEFAULT 0.5,
    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
    token_count INTEGER DEFAULT 0,
    metadata TEXT,
    session_id TEXT NOT NULL
);

-- 类型索引:按消息类型快速过滤
CREATE INDEX idx_type ON short_term_memory(type);

-- 重要性索引:按重要性排序淘汰
CREATE INDEX idx_importance ON short_term_memory(importance);

-- 时间索引:按时间范围查询
CREATE INDEX idx_timestamp ON short_term_memory(timestamp);

-- 会话索引:按会话ID隔离
CREATE INDEX idx_session ON short_term_memory(session_id);

2.2.2 写入流程与容量控制

async def write_with_eviction(self, entry: MemoryEntry):
    """带淘汰策略的写入"""
    # 1. 写入新条目
    self.conn.execute(
        """INSERT INTO short_term_memory
           (content, type, importance, timestamp, token_count, metadata, session_id)
           VALUES (?, ?, ?, ?, ?, ?, ?)""",
        (entry.content, entry.type, entry.importance,
         entry.timestamp, entry.token_count,
         json.dumps(entry.metadata), entry.session_id)
    )

    # 2. 检查容量
    total_tokens = self.conn.execute(
        "SELECT SUM(token_count) FROM short_term_memory WHERE session_id = ?",
        (entry.session_id,)
    ).fetchone()[0]

    # 3. 超出容量时执行淘汰
    if total_tokens > self.max_tokens_per_session:
        # 计算需要释放的 Token 数
        tokens_to_free = total_tokens - self.max_tokens_per_session

        # 按重要性升序、时间升序选择淘汰候选
        candidates = self.conn.execute(
            """SELECT id, token_count FROM short_term_memory
               WHERE session_id = ? AND type != 'system'
               ORDER BY importance ASC, timestamp ASC""",
            (entry.session_id,)
        ).fetchall()

        freed = 0
        ids_to_delete = []
        for row in candidates:
            if freed >= tokens_to_free:
                break
            ids_to_delete.append(row[0])
            freed += row[1]

        # 执行淘汰
        self.conn.execute(
            f"DELETE FROM short_term_memory WHERE id IN ({','.join('?' * len(ids_to_delete))})",
            ids_to_delete
        )

2.2.3 检索策略:关键词+重要性混合排序

async def hybrid_recall(self, query: str, top_k: int = 5) -> List[MemoryEntry]:
    """混合检索:关键词匹配 + 重要性加权"""
    results = self.conn.execute(
        """SELECT *, (
               -- 关键词匹配度(0-1)
               CASE WHEN content LIKE ? THEN 0.5 ELSE 0 END +
               -- 重要性权重(0-1)
               importance * 0.5 +
               -- 时间衰减因子(越近越高)
               MAX(0, 1 - (julianday('now') - julianday(timestamp)) * 24) * 0.2
           ) AS relevance_score
           FROM short_term_memory
           WHERE session_id = ?
           ORDER BY relevance_score DESC
           LIMIT ?""",
        (f"%{query}%", self.session_id, top_k)
    ).fetchall()

    return [self._row_to_entry(r) for r in results]

2.3 长期记忆的混合检索机制

2.3.1 向量检索与关键词检索的融合

OpenClaw 的长期记忆检索采用混合检索策略,融合向量语义检索和关键词精确检索的优势:

class HybridRetriever:
    """混合检索器:向量检索 + 关键词检索"""

    def __init__(self, vector_store: VectorStore, keyword_index: KeywordIndex):
        self.vector_store = vector_store
        self.keyword_index = keyword_index

    async def search(self, query: str, top_k: int = 5,
                     alpha: float = 0.7) -> List[SearchResult]:
        """
        混合检索
        alpha: 向量检索权重(0-1),1-alpha 为关键词检索权重
        """
        # 1. 向量语义检索
        vector_results = await self.vector_store.search(
            query=query, top_k=top_k * 2  # 多召回一些,用于融合排序
        )

        # 2. 关键词精确检索
        keyword_results = await self.keyword_index.search(
            query=query, top_k=top_k * 2
        )

        # 3. 融合排序(Reciprocal Rank Fusion)
        merged = self._reciprocal_rank_fusion(
            vector_results, keyword_results, alpha=alpha
        )

        return merged[:top_k]

    def _reciprocal_rank_fusion(self, vector_results, keyword_results,
                                 alpha: float = 0.7, k: int = 60) -> List[SearchResult]:
        """RRF 融合排序"""
        scores = {}

        # 向量检索结果评分
        for rank, result in enumerate(vector_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + alpha / (k + rank + 1)

        # 关键词检索结果评分
        for rank, result in enumerate(keyword_results):
            doc_id = result.id
            scores[doc_id] = scores.get(doc_id, 0) + (1 - alpha) / (k + rank + 1)

        # 按融合分数排序
        sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)

        # 构建结果列表
        all_results = {r.id: r for r in vector_results + keyword_results}
        return [all_results[doc_id] for doc_id in sorted_ids if doc_id in all_results]

2.3.2 2026.6.6版本优化:检索性能提升

2026.6.6 版本对混合检索进行了三项关键优化:

1. 向量索引优化:引入 HNSW 索引替代暴力搜索,检索延迟从 ~200ms 降至 ~15ms:

class OptimizedVectorStore:
    """优化后的向量存储,使用 HNSW 索引"""

    def __init__(self, dimension: int = 1536, m: int = 16, ef_construction: int = 200):
        self.index = hnswlib.Index(space='cosine', dim=dimension)
        self.index.init_index(max_elements=100000, ef_construction=ef_construction, M=m)
        self.index.set_ef(50)  # 搜索时的 ef 值,越大越精确但越慢

    async def search(self, query_embedding: List[float], top_k: int = 5) -> List[SearchResult]:
        labels, distances = self.index.knn_query(query_embedding, k=top_k)
        return [
            SearchResult(id=str(label), score=1 - distance, document=self.doc_store[str(label)])
            for label, distance in zip(labels[0], distances[0])
        ]

2. 查询缓存:对高频查询结果进行缓存,避免重复计算:

class QueryCache:
    """查询结果缓存"""

    def __init__(self, max_size: int = 1000, ttl: int = 300):
        self.cache = TTLCache(maxsize=max_size, ttl=ttl)

    async def get_or_search(self, query: str, search_fn) -> List[SearchResult]:
        cache_key = self._hash_query(query)
        if cache_key in self.cache:
            return self.cache[cache_key]

        results = await search_fn(query)
        self.cache[cache_key] = results
        return results

3. 自适应 alpha 调节:根据查询类型自动调整向量检索和关键词检索的权重:

class AdaptiveAlpha:
    """自适应 alpha 调节"""

    def adjust(self, query: str) -> float:
        """根据查询特征调整 alpha"""
        # 精确查询(包含引号、特定术语)→ 降低 alpha,增加关键词权重
        if '"' in query or any(term in query for term in self.exact_terms):
            return 0.4

        # 语义查询(自然语言描述)→ 提高 alpha,增加向量权重
        if len(query.split()) > 5:
            return 0.8

        # 默认
        return 0.7

2.4 记忆的重要性评估与遗忘机制

2.4.1 重要性评分算法

每条记忆在写入时都会计算一个重要性评分(0-1),该评分决定了记忆的保留优先级和检索权重:

class ImportanceScorer:
    """记忆重要性评分器"""

    def score(self, entry: MemoryEntry) -> float:
        """计算综合重要性评分"""
        # 1. 内容类型基础分
        type_score = self._type_base_score(entry.type)

        # 2. 信息熵评分(信息量越大越重要)
        entropy_score = self._entropy_score(entry.content)

        # 3. 工具执行结果评分(成功结果比失败更重要)
        execution_score = self._execution_score(entry)

        # 4. 用户反馈评分(用户明确表示有用的信息)
        feedback_score = self._feedback_score(entry)

        # 加权综合评分
        importance = (
            0.2 * type_score +
            0.3 * entropy_score +
            0.3 * execution_score +
            0.2 * feedback_score
        )

        return min(max(importance, 0.0), 1.0)  # 裁剪到 [0, 1]

    def _type_base_score(self, entry_type: str) -> float:
        """内容类型基础分"""
        type_scores = {
            "system": 1.0,        # 系统消息最重要
            "user_input": 0.8,    # 用户输入次之
            "tool_result": 0.6,   # 工具结果
            "reasoning": 0.4,     # 推理过程
            "observation": 0.3,   # 观察记录
        }
        return type_scores.get(entry_type, 0.5)

    def _entropy_score(self, content: str) -> float:
        """信息熵评分:内容越独特越重要"""
        # 计算字符级信息熵
        from collections import Counter
        freq = Counter(content)
        total = len(content)
        entropy = -sum((c / total) * math.log2(c / total) for c in freq.values())
        # 归一化到 [0, 1]
        return min(entropy / 8.0, 1.0)  # 最大熵约 8 bits/char

2.4.2 时间衰减函数

记忆的重要性会随时间衰减,模拟人类遗忘曲线:

class TimeDecay:
    """时间衰减函数,基于艾宾浩斯遗忘曲线"""

    def __init__(self, half_life_hours: float = 24.0):
        """
        half_life_hours: 半衰期(小时),即重要性减半所需时间
        - 短期记忆半衰期: 1小时
        - 长期记忆半衰期: 720小时(30天)
        """
        self.half_life_hours = half_life_hours

    def decay(self, initial_importance: float, hours_elapsed: float) -> float:
        """计算衰减后的重要性"""
        # 指数衰减: I(t) = I_0 * 2^(-t / T_half)
        decayed = initial_importance * (2 ** (-hours_elapsed / self.half_life_hours))
        return max(decayed, 0.01)  # 最低保留 1%,不完全遗忘

    def boost(self, current_importance: float, access_count: int) -> float:
        """访问增强:每次被检索到时提升重要性"""
        # 间隔重复效应:每次访问使重要性恢复到初始值的 (1 - 1/2^n)
        boost_factor = 1 - (1 / (2 ** access_count))
        return min(current_importance + boost_factor * (1 - current_importance), 1.0)

2.4.3 记忆归档:从短期到长期的迁移

class MemoryArchiver:
    """记忆归档器:将高价值短期记忆迁移到长期记忆"""

    def __init__(self, short_term: ShortTermMemory, long_term: LongTermMemory,
                 archive_threshold: float = 0.7):
        self.short_term = short_term
        self.long_term = long_term
        self.archive_threshold = archive_threshold

    async def archive_session(self, session_id: str):
        """会话结束时归档高价值记忆"""
        # 1. 查询该会话的所有短期记忆
        entries = self.short_term.get_by_session(session_id)

        # 2. 筛选高价值条目
        for entry in entries:
            if entry.importance >= self.archive_threshold:
                # 3. 去重检查:是否与已有长期记忆高度相似
                similar = await self.long_term.recall(
                    query=entry.content, top_k=1, min_relevance=0.95
                )

                if not similar:
                    # 新知识,归档到长期记忆
                    await self.long_term.store(entry)
                else:
                    # 已有相似记忆,合并更新
                    await self._merge_memory(similar[0], entry)

        # 4. 清理已归档的短期记忆
        self.short_term.clear_session(session_id)

    async def _merge_memory(self, existing: MemoryEntry, new: MemoryEntry):
        """合并相似记忆"""
        merged = MemoryEntry(
            content=f"{existing.content}\n[补充] {new.content}",
            type=existing.type,
            importance=max(existing.importance, new.importance),
            timestamp=new.timestamp,  # 使用更新的时间戳
            session_id=existing.session_id,
            metadata={**existing.metadata, "merged_from": new.session_id}
        )
        await self.long_term.update(existing.id, merged)

2.5 ContextEngine上下文引擎详解

2.5.1 上下文组装策略

ContextEngine 是连接记忆系统与 Agent 运行时的桥梁,它控制每次 LLM 调用的上下文组装:

class ContextEngine:
    """上下文引擎:控制 LLM 调用的上下文组装"""

    async def assemble(self, state: AgentState, budget: TokenBudget) -> AssembledContext:
        """组装 LLM 调用上下文"""
        remaining_budget = budget.total

        # [优先级1] 系统提示词(不可截断)
        system_prompt = state.soul.system_prompt
        remaining_budget -= count_tokens(system_prompt)

        # [优先级2] 工具定义(不可截断)
        tool_definitions = self._format_tool_definitions(state.available_tools)
        remaining_budget -= count_tokens(tool_definitions)

        # [优先级3] 记忆召回(可截断)
        memories = await self._recall_memories(state.current_input, remaining_budget // 3)
        memory_text = self._format_memories(memories)
        remaining_budget -= count_tokens(memory_text)

        # [优先级4] 历史交互(可截断、可摘要)
        history = await self._get_managed_history(state, remaining_budget)

        # [优先级5] 当前输入(不可截断)
        current_input = state.current_input

        return AssembledContext(
            system_prompt=system_prompt,
            tool_definitions=tool_definitions,
            memories=memory_text,
            history=history,
            current_input=current_input
        )

2.5.2 历史消息的摘要压缩算法

当历史消息超出 Token 预算时,ContextEngine 会对旧消息进行摘要压缩:

class HistoryCompressor:
    """历史消息摘要压缩器"""

    async def compress(self, messages: List[Message],
                       target_tokens: int) -> List[Message]:
        """将历史消息压缩到目标 Token 数"""
        total_tokens = sum(m.token_count for m in messages)

        if total_tokens <= target_tokens:
            return messages

        # 策略1:保留最近的消息,对旧消息分组摘要
        recent_count = len(messages) // 3  # 保留最近 1/3
        recent = messages[-recent_count:]
        old = messages[:-recent_count]

        # 对旧消息按轮次分组摘要
        summarized = []
        group_size = 5  # 每5条消息为一组
        for i in range(0, len(old), group_size):
            group = old[i:i + group_size]
            summary = await self._summarize_group(group)
            summarized.append(summary)

        result = summarized + recent

        # 如果仍然超预算,递归压缩
        if sum(m.token_count for m in result) > target_tokens:
            return await self.compress(result, target_tokens)

        return result

    async def _summarize_group(self, messages: List[Message]) -> Message:
        """对一组消息生成摘要"""
        combined = "\n".join(f"[{m.role}] {m.content[:200]}" for m in messages)

        summary_text = await self.llm_provider.chat(
            system="请将以下对话历史压缩为简洁的摘要,保留关键信息和决策点",
            input=combined,
            max_tokens=100
        )

        return Message(
            role="system",
            content=f"[历史摘要] {summary_text}",
            token_count=count_tokens(summary_text) + 10
        )

2.5.3 2026.6.6版本重构:可插拔引擎机制

2026.6.6 版本对 ContextEngine 进行了重构,引入了可插拔引擎机制,允许用户根据场景选择不同的上下文管理策略:

class ContextEngineFactory:
    """上下文引擎工厂"""

    @staticmethod
    def create(policy: ContextPolicy) -> ContextEngine:
        engines = {
            "sliding_window": SlidingWindowEngine,     # 滑动窗口:保留最近N条消息
            "summary_compression": SummaryEngine,       # 摘要压缩:对旧消息摘要
            "relevance_ranking": RelevanceEngine,       # 相关性排序:按相关性选择消息
            "hybrid": HybridContextEngine,              # 混合策略:综合以上策略
        }

        engine_class = engines.get(policy.type, HybridContextEngine)
        return engine_class(**policy.params)


class HybridContextEngine(ContextEngine):
    """混合上下文引擎:综合多种策略"""

    async def assemble(self, state: AgentState, budget: TokenBudget) -> AssembledContext:
        # 1. 始终保留最近3轮对话
        recent_messages = state.messages[-6:]  # 3轮 = 6条消息(用户+助手)

        # 2. 对更早的消息按相关性检索
        older_messages = state.messages[:-6]
        relevant_older = await self._rank_by_relevance(
            older_messages, state.current_input, budget.available_for_history // 2
        )

        # 3. 不相关的旧消息生成摘要
        irrelevant_older = [m for m in older_messages if m not in relevant_older]
        summary = await self._summarize(irrelevant_older)

        # 4. 组装最终上下文
        return AssembledContext(
            recent_messages=recent_messages,
            relevant_older=relevant_older,
            summary=summary
        )

第三部分 运行时与记忆的协同闭环

3.1 完整请求处理流程追踪

让我们追踪一个完整请求从进入到响应的全过程,观察运行时与记忆系统的协同:

用户: "帮我分析一下最近一周的销售数据趋势"

Step 1: Gateway 接收与路由

HTTP Request → Gateway → 认证 → 限流检查 → 路由到 Agent 运行时

Step 2: Agent 运行时初始化上下文

# 2.1 从长期记忆召回相关知识
memories = await long_term.recall("销售数据分析", top_k=5)
# 返回: ["Q3销售报告模板", "数据源配置信息", "上次分析结论"]

# 2.2 从短期记忆加载会话历史
session_history = await short_term.get_by_session(session_id)

# 2.3 ContextEngine 组装上下文
context = await context_engine.assemble(
    system_prompt=soul.system_prompt,
    memories=memories,
    history=session_history,
    current_input="帮我分析一下最近一周的销售数据趋势"
)

Step 3: ReAct 循环执行

[Think] LLM 分析用户意图 → 需要获取销售数据 + 生成趋势图
[Parse] 工具调用: database_query(sql="SELECT ... FROM sales WHERE date > ...")
[Act]   执行数据库查询 → 返回销售数据
[Observe] 数据写入短期记忆 → 继续循环

[Think] LLM 分析数据 → 需要生成可视化图表
[Parse] 工具调用: chart_generate(data=sales_data, type="line")
[Act]   执行图表生成 → 返回图表URL
[Observe] 结果写入短期记忆 → 继续循环

[Think] LLM 综合分析 → 生成趋势分析报告
[Parse] 无工具调用 → end_turn

Step 4: 记忆归档

# 会话结束后归档
await archiver.archive_session(session_id)
# 高价值记忆(如分析结论、数据源配置)迁移到长期记忆

3.2 记忆驱动的行为优化

记忆系统不仅存储信息,还主动驱动 Agent 的行为优化:

3.2.1 基于历史成功率的工具偏好

class MemoryDrivenToolSelector:
    """基于记忆的工具选择器"""

    async def select(self, task: str, candidates: List[Tool]) -> Tool:
        # 从长期记忆中检索该任务类型的历史工具使用记录
        history = await self.long_term.recall(
            query=f"工具使用记录 {task}",
            top_k=10
        )

        # 统计各工具的历史成功率
        success_rates = {}
        for record in history:
            tool_name = record.metadata.get("tool_name")
            success = record.metadata.get("success", False)
            if tool_name not in success_rates:
                success_rates[tool_name] = {"success": 0, "total": 0}
            success_rates[tool_name]["total"] += 1
            if success:
                success_rates[tool_name]["success"] += 1

        # 结合历史成功率和当前相关性选择工具
        scored = []
        for tool in candidates:
            rate = success_rates.get(tool.name, {"success": 0, "total": 0})
            historical_rate = rate["success"] / max(rate["total"], 1)
            relevance = self._compute_relevance(tool, task)
            score = 0.6 * relevance + 0.4 * historical_rate
            scored.append((tool, score))

        return max(scored, key=lambda x: x[1])[0]

3.2.2 基于用户偏见的Prompt微调

class UserPreferenceAdapter:
    """用户偏好适配器"""

    async def adapt_prompt(self, base_prompt: str, user_id: str) -> str:
        # 从长期记忆检索用户偏好
        preferences = await self.long_term.recall(
            query=f"用户偏好 {user_id}",
            top_k=5
        )

        # 提取偏好规则
        preference_rules = []
        for pref in preferences:
            if pref.type == "user_preference":
                preference_rules.append(pref.content)

        if preference_rules:
            # 将偏好规则注入系统提示词
            preference_block = "\n".join(f"- {rule}" for rule in preference_rules)
            adapted = f"{base_prompt}\n\n## 用户偏好\n{preference_block}"
            return adapted

        return base_prompt

3.3 多Agent间的记忆共享与隔离

3.3.1 共享记忆空间

Master Agent 可以创建共享记忆空间,让多个子 Agent 协作时共享关键信息:

class SharedMemorySpace:
    """共享记忆空间"""

    def __init__(self, space_id: str, participants: List[str]):
        self.space_id = space_id
        self.participants = set(participants)
        self.shared_store = {}  # 共享键值存储
        self.event_bus = EventBus()  # 事件总线

    async def write(self, agent_id: str, key: str, value: Any):
        """写入共享记忆"""
        if agent_id not in self.participants:
            raise PermissionError(f"Agent {agent_id} 不在共享空间中")

        self.shared_store[key] = {
            "value": value,
            "written_by": agent_id,
            "timestamp": datetime.now()
        }

        # 通知其他参与者
        await self.event_bus.publish(
            topic=f"shared_memory:{self.space_id}",
            event={"key": key, "action": "write", "by": agent_id}
        )

    async def read(self, agent_id: str, key: str) -> Any:
        """读取共享记忆"""
        if agent_id not in self.participants:
            raise PermissionError(f"Agent {agent_id} 不在共享空间中")
        return self.shared_store.get(key, {}).get("value")

3.3.2 记忆隔离边界

每个 Agent 实例的记忆默认是隔离的,只有通过共享记忆空间才能跨 Agent 访问:

class MemoryIsolationGuard:
    """记忆隔离守卫"""

    def check_access(self, agent_id: str, memory_entry: MemoryEntry) -> bool:
        """检查 Agent 是否有权访问某条记忆"""
        # 规则1:Agent 只能访问自己会话的记忆
        if memory_entry.session_id in self.agent_sessions[agent_id]:
            return True

        # 规则2:Agent 可以访问共享记忆空间中的记忆
        if memory_entry.metadata.get("shared_space") in self.agent_shared_spaces[agent_id]:
            return True

        # 规则3:Master Agent 可以访问所有子 Agent 的记忆
        if agent_id in self.master_agents:
            if memory_entry.session_id in self.get_sub_agent_sessions(agent_id):
                return True

        return False

第四部分 2026.6.6版本更新与最佳实践

4.1 关键更新总结

模块 更新内容 影响
ContextEngine 可插拔引擎机制 支持自定义上下文管理策略
混合检索 HNSW索引 + 查询缓存 + 自适应alpha 检索延迟降低 92%
Agent池化 实例复用 + 懒加载 冷启动时间降低 80%
子Agent调度 共享记忆空间 + ACP协议增强 多Agent协作更高效
记忆归档 增量合并 + 去重优化 长期记忆膨胀率降低 60%

4.2 生产部署最佳实践

4.2.1 记忆系统配置建议

# production-memory-config.yaml
memory:
  short_term:
    max_entries: 500
    max_tokens_per_session: 32000
    eviction_policy: "importance_weighted"  # 按重要性加权淘汰

  long_term:
    vector_store: "milvus"          # 生产环境推荐 Milvus
    embedding_model: "bge-large-zh" # 中文场景推荐
    dimension: 1024
    index_type: "HNSW"
    hnsw_params:
      m: 16
      ef_construction: 200

  hybrid_retrieval:
    alpha: 0.7                      # 向量检索权重
    enable_cache: true
    cache_ttl: 300                  # 缓存5分钟
    adaptive_alpha: true            # 自适应alpha

  archiver:
    archive_threshold: 0.7          # 重要性 >= 0.7 才归档
    dedup_similarity: 0.95          # 相似度 >= 0.95 视为重复
    schedule: "session_end"         # 会话结束时归档

4.2.2 Agent运行时配置建议

# production-agent-config.yaml
agent:
  pool_size: 10                     # Agent池大小
  max_iterations: 25                # ReAct最大迭代次数
  llm_timeout: 120                  # LLM调用超时(秒)

  context_engine:
    type: "hybrid"                  # 混合上下文引擎
    recent_window: 6                # 保留最近6条消息
    summary_model: "gpt-4o-mini"    # 摘要用轻量模型

  retry:
    max_retries: 3
    base_delay: 1                   # 退避基础延迟(秒)
    circuit_breaker:
      failure_threshold: 5
      recovery_timeout: 60

  model_fallback:
    - provider: "openai"
      model: "gpt-4o"
    - provider: "anthropic"
      model: "claude-3.5-sonnet"
    - provider: "openai"
      model: "gpt-4o-mini"

4.2.3 监控指标

指标 说明 告警阈值
react_iterations 单次请求的 ReAct 迭代次数 > 20
memory_recall_latency 记忆检索延迟 > 100ms
context_assembly_time 上下文组装耗时 > 500ms
short_term_eviction_rate 短期记忆淘汰率 > 30%
long_term_growth_rate 长期记忆增长速率 > 1MB/hour
tool_success_rate 工具调用成功率 < 90%
agent_pool_utilization Agent池利用率 > 80%

4.3 常见问题与调优建议

Q1: Agent 经常达到最大迭代次数怎么办?

调优建议:

  1. 检查 Soul 配置中的系统提示词是否足够清晰,避免 LLM 反复尝试
  2. 减少单次请求涉及的工具数量,拆分为多轮对话
  3. 调整 max_iterations 上限(但需注意成本控制)
  4. 启用"提前终止"策略:当连续2轮迭代无新进展时自动终止

Q2: 长期记忆检索结果不相关怎么办?

调优建议:

  1. 检查 Embedding 模型是否适合你的领域(通用模型 vs 领域微调模型)
  2. 调整 min_relevance 阈值(默认 0.7,可提高到 0.8)
  3. 调整混合检索的 alpha 值(增加关键词检索权重)
  4. 检查记忆归档时的重要性评分是否合理

Q3: 上下文窗口经常溢出怎么办?

调优建议:

  1. 切换到 hybrid 上下文引擎,自动管理历史消息
  2. 降低 recent_window 大小
  3. 启用更激进的历史摘要压缩
  4. 检查是否有工具返回了过大的结果,考虑截断

总结

OpenClaw 的 Agent 运行时与记忆系统构成了一个精密的协同闭环:

  1. Agent 运行时是"大脑"——通过 ReAct 循环实现 Think-Parse-Act-Observe 的自主决策闭环,通过任务图编排实现复杂任务的并行调度,通过 Master-Slave 模式实现多 Agent 协同,通过分级重试和熔断机制保障系统韧性。

  2. Memory 记忆系统是"记忆"——通过三层记忆模型实现信息的分层存储和流转,通过混合检索机制实现语义+关键词的精准召回,通过重要性评估和遗忘机制实现记忆的自适应管理,通过 ContextEngine 实现上下文的智能组装。

  3. 运行时与记忆的协同是"智慧"——记忆驱动工具选择偏好、用户偏好适配、行为模式优化;运行时驱动记忆的写入、归档和检索。两者形成正向飞轮:越用越聪明,越用越精准。

2026.6.6 版本的关键更新——可插拔 ContextEngine、HNSW 混合检索、Agent 池化复用——让这个飞轮转得更快、更稳。理解这些内部实现逻辑,是充分发挥 OpenClaw 能力的基础,也是进行二次开发和创新的前提。

0

评论区