侧边栏壁纸
  • 累计撰写 56 篇文章
  • 累计创建 5 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

MaxKB_v2_深度学习教程

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

MaxKB v2 全方位学习教程:从功能使用到源码深度剖析

本教程基于 MaxKB v2.0.0 源码和官方文档(https://maxkb.cn/docs/v2/)编写,旨在帮助开发者全面理解 MaxKB 的功能使用、架构设计和源码实现细节。


第一部分:认识 MaxKB

1.1 MaxKB 是什么

MaxKB(Max Knowledge Brain)是由飞致云(FIT2CLOUD)开源的企业级智能体平台,采用 GPLv3 许可证。它的核心定位是帮助企业以最低门槛落地 AI 应用——你不需要从零开始训练模型,而是通过"接入大模型 + 构建知识库 + 编排工作流"三步走策略,快速搭建出智能客服、办公助手、知识问答等场景的 AI 应用。

MaxKB 的设计理念可以概括为八个字:开箱即用,伴随成长。新手可以通过简易模式 5 分钟搭建一个 RAG 问答应用,而高级用户则可以通过工作流引擎编排包含 30+ 种节点类型的复杂业务流程。

1.2 核心能力概览

MaxKB 的核心能力可以分为四大支柱:

RAG 检索增强生成:这是 MaxKB 的基础能力。你可以上传 Word、PDF、Markdown、TXT 等格式的文档(或自动爬取在线文档),系统会自动完成文本拆分、向量化和索引构建。当用户提问时,系统先从知识库中检索相关段落,再将其作为上下文传给大模型,从而有效减少幻觉,提升回答质量。

工作流引擎:这是 MaxKB 的高级能力。内置了一个自研的 DAG(有向无环图)执行引擎,支持 AI 对话、知识库检索、条件分支、循环、工具调用、表单交互、图像生成/理解等 30+ 种节点类型,可以通过可视化界面拖拽编排。

模型中立:MaxKB 不绑定任何一家大模型供应商。目前已内置 27 个供应商适配器,涵盖国际厂商(OpenAI、Anthropic、Google Gemini、AWS Bedrock)、国内厂商(通义千问、文心一言、火山引擎、腾讯混元、智谱、Kimi、MiniMax、讯飞等)以及开源自部署方案(Ollama、vLLM、Xinference)。支持 9 种模型类型:LLM、Embedding、STT(语音转文字)、TTS(文字转语音)、Image(图像理解)、TTI(图像生成)、Reranker(重排序)、TTV(文生视频)、ITV(图生视频)。

无缝嵌入:MaxKB 提供 JS 嵌入脚本和 OpenAI 兼容 API,可以零编码嵌入到现有业务系统中。同时还支持 MCP(Model Context Protocol)协议,既可作为 MCP Server 暴露智能体,也可在工作流中调用外部 MCP 工具。

1.3 技术栈总览

层次 技术选型
前端框架 Vue 3 + TypeScript + Vite
UI 组件库 Element Plus
工作流编辑器 LogicFlow(节点拖拽与连线)
后端框架 Python 3.11+ / Django 5.2 + Django REST Framework 3.17
AI 框架 LangChain 1.2 + LangGraph 1.1 + deepagents 0.5
向量数据库 PostgreSQL + pgvector 扩展
关系数据库 PostgreSQL(深度使用全文检索、数组字段、Large Object 等特性)
缓存/消息队列 Redis(缓存 + Celery Broker)
异步任务 Celery 5.5 + django-celery-beat + celery-once(防重复执行)
定时调度 APScheduler(通过 django-apscheduler 集成)
部署 Docker + Gunicorn(多进程模式)
API 文档 drf-spectacular(自动生成 OpenAPI 3 / Swagger 文档)

1.4 快速部署

最简单的方式是使用 Docker 一行命令启动:

# Linux
docker run -d --name=maxkb --restart=always -p 8080:8080 \
  -v ~/.maxkb:/opt/maxkb registry.fit2cloud.com/maxkb/maxkb

# Windows (PowerShell)
docker run -d --name=maxkb --restart=always -p 8080:8080 `
  -v C:/maxkb:/opt/maxkb registry.fit2cloud.com/maxkb/maxkb

启动后访问 http://localhost:8080,默认管理员账号为 admin,密码为 MaxKB@123..

对于离线环境,可以使用官方提供的离线安装包;如果你已经在用 1Panel 服务器管理面板,也可以直接从应用商店一键安装。


第二部分:功能使用指南

2.1 五步上手流程

一个典型的 MaxKB 使用流程如下:

第一步:对接模型。进入"模型管理"页面,选择一个供应商(如通义千问),填写 API Key 等凭证信息,完成模型接入。建议至少接入一个 LLM(大语言模型)和一个 Embedding(向量模型)。

第二步:创建知识库。进入"知识库"页面,创建一个知识库,然后上传文档或输入网页 URL。系统会自动完成文档解析、分段和向量化。

第三步:创建智能体。进入"智能体"页面,选择"简易模式"或"工作流模式"。简易模式只需选择模型、关联知识库、配置提示词即可;工作流模式则需要通过可视化编辑器编排流程。

第四步:测试对话。在智能体详情页直接进行对话测试,观察回答效果,根据效果调整知识库内容或提示词。

第五步:发布集成。通过 JS 嵌入脚本将对话窗口嵌入到现有网站,或通过 OpenAI 兼容 API 从后端调用。

2.2 模型管理详解

2.2.1 支持的模型类型

MaxKB 支持 9 种模型类型,每种类型在系统中扮演不同角色:

模型类型 用途 典型供应商
LLM(大语言模型) 核心对话生成 OpenAI GPT、通义千问、DeepSeek
Embedding(向量模型) 文本向量化 text-embedding-ada-002、bge-large
Reranker(重排序) 检索结果精排 Cohere Rerank、bge-reranker
STT(语音转文字) 语音输入 Whisper
TTS(文字转语音) 语音输出 OpenAI TTS
Image(图像理解) 多模态理解 GPT-4o、Claude 3
TTI(图像生成) 文生图 DALL-E、通义万相
TTV(文生视频) 文字生成视频 各厂商视频模型
ITV(图生视频) 图片生成视频 各厂商视频模型

2.2.2 对接国内模型示例(以通义千问为例)

  1. 登录阿里云百炼平台(https://bailian.console.aliyun.com/),获取 API Key
  2. 在 MaxKB 的"模型管理"页面点击"添加模型"
  3. 供应商选择"通义千问"
  4. 填写 API Key 和 API Base(通常为 https://dashscope.aliyuncs.com/compatible-mode/v1
  5. 选择要添加的模型(如 qwen-max、qwen-plus 等)
  6. 点击"验证"确认连接成功,然后保存

2.3 知识库管理详解

2.3.1 知识库类型

MaxKB 支持多种知识库数据来源:

  • 通用知识库(BASE):手动上传文档,支持 PDF、Word(docx/doc)、Markdown、TXT、HTML、Excel、CSV 等格式
  • Web 站点知识库(WEB):输入 URL 后自动爬取网页内容并构建知识库
  • 飞书知识库(LARK):对接飞书文档
  • 语雀知识库(YUQUE):对接语雀文档
  • 工作流知识库(WORKFLOW):通过工作流自动生成知识库内容

2.3.2 文档分段策略

文档分段是 RAG 质量的关键。MaxKB 提供两种分段方式:

智能分段:系统自动根据文档结构(标题、段落、表格等)进行拆分,适合大多数场景。

高级分段:允许自定义分段规则,包括:

  • 分段标识符(如 \n\n--- 等)
  • 最大分段长度
  • 重叠长度(相邻段落的重叠字符数,保证上下文连续性)

2.3.3 检索模式

MaxKB 支持三种检索模式,这是其 RAG 能力的一大亮点:

  • 向量检索(Embedding Search):通过余弦距离计算语义相似度,适合语义匹配场景
  • 关键词检索(Keywords Search):基于 PostgreSQL 全文检索引擎,支持中文分词(jieba),适合精确关键词匹配
  • 混合检索(Blend Search):同时执行向量检索和关键词检索,然后通过融合排序算法合并结果,兼顾语义理解和精确匹配

此外,还支持 Reranker 重排序,在初步检索结果上使用重排序模型进行精排,进一步提升检索质量。

2.4 智能体创建详解

2.4.1 简易智能体

简易智能体的配置非常直观,核心要素包括:

模型选择:选择已接入的 LLM 模型,设置温度(Temperature)、最大 Token 数等参数。

提示词模板:定义系统提示词,支持变量引用。例如:

你是一个专业的客服助手。请根据以下已知信息回答用户的问题。
已知信息:{data}
用户问题:{question}

知识库关联:选择要关联的知识库,配置检索参数(检索模式、Top K、相似度阈值等)。

对话轮次:设置保留的历史对话轮数,用于多轮对话上下文。

长期记忆:可选开启,系统会使用 LLM 自动从对话中提取用户偏好和关键信息,存储在长期记忆中用于后续对话。

2.4.2 工作流智能体(高级)

工作流智能体通过可视化 DAG 编排复杂的 AI 处理流程。以下是全部节点类型的分类说明:

流程控制类

  • start-node:流程起始节点,定义全局输入变量
  • condition-node:条件分支,根据表达式选择不同的执行路径
  • loop-node / loop-start-node / loop-break-node / loop-continue-node:循环节点组,支持重复执行子流程

AI 能力类

  • ai-chat-node:调用 LLM 生成回答
  • image-generate-node:文字生成图像
  • image-understand-node:图像理解分析
  • video-understand-node:视频理解分析
  • text-to-video-node / image-to-video-node:视频生成
  • speech-to-text-node / text-to-speech-node:语音转写与合成

知识检索类

  • search-knowledge-node:知识库向量检索
  • search-document-node:文档搜索
  • reranker-node:结果重排序
  • knowledge-write-node:向知识库写入数据

工具调用类

  • tool-node / tool-lib-node:调用自定义工具
  • mcp-node:调用 MCP 协议工具
  • application-node:嵌套调用其他智能体

数据处理类

  • question-node:问题优化/重写
  • intent-node:意图识别
  • parameter-extraction-node:结构化参数提取
  • variable-aggregation-node / variable-assign-node / variable-splitting-node:变量操作
  • document-extract-node / document-split-node:文档抽取与拆分
  • data-source-local-node / data-source-web-node:数据源接入

交互类

  • direct-reply-node:直接回复(不经过 LLM)
  • form-node:表单交互节点,暂停流程等待用户输入

2.5 工具系统

MaxKB 支持 6 种工具类型:

  • 自定义工具(CUSTOM):用 Python 代码编写自定义逻辑
  • 工作流工具(WORKFLOW):将工作流封装为可复用工具
  • MCP 工具(MCP):通过 MCP 协议调用外部工具
  • 技能工具(SKILL):预置的技能包
  • 数据源工具(DATA_SOURCE):连接外部数据源
  • 内置工具(INTERNAL):系统自带的工具

2.6 触发器系统

触发器允许你定时或基于事件自动执行智能体对话或工具调用:

定时触发器:支持日/周/月/间隔/Cron 表达式五种调度模式。例如,每天上午 9 点自动运行一个数据分析智能体。

事件触发器(Webhook):通过 HTTP Webhook 接收外部事件,触发智能体响应。例如,当有新的客服工单时自动触发智能体处理。

2.7 集成方式

MaxKB 提供三种主要的集成方式:

JS 嵌入:在你的网页中添加一段 JS 脚本即可嵌入对话窗口,无需编写前端代码。

OpenAI 兼容 API:MaxKB 提供 /v1/chat/completions 兼容接口,任何支持 OpenAI API 的客户端都可以直接对接。

MCP 协议:将 MaxKB 智能体作为 MCP Server 暴露,其他 MCP Client 可以直接调用。


第三部分:源码架构深度剖析

3.1 项目入口与启动流程

MaxKB 的统一入口是 main.py,支持四种运行模式:

# main.py 的核心命令行参数
parser.add_argument('action', choices=("start", "dev", "upgrade_db", "collect_static"))
parser.add_argument("services", choices=("all", "web", "task", "celery", "local_model"))

生产模式start):先执行 collect_static(收集前端静态文件)和 perform_db_migrate(数据库迁移),然后通过 Gunicorn 启动 Web 服务,同时启动 Celery Worker 处理异步任务。

开发模式dev):使用 Django 内置的 runserver 启动开发服务器,支持热重载。

启动时的一个巧妙设计是数据库迁移的重试机制——PostgreSQL 在崩溃恢复期间虽然 TCP 端口已开放但会拒绝查询,因此 perform_db_migrate() 函数包含了最多 10 次、间隔 5 秒的重试逻辑,确保在容器启动场景下能可靠完成迁移。

3.2 目录结构与模块职责

D:\data\coder\MaxKB\
├── main.py                    # 统一入口
├── pyproject.toml             # 依赖管理(使用 uv 包管理器)
├── apps/                      # Django 应用根目录
│   ├── maxkb/                 # 核心配置
│   │   ├── settings/          # 双模式配置(web/model)
│   │   │   ├── base/
│   │   │   │   ├── web.py     # Web 服务 Django 配置
│   │   │   │   └── model.py   # 本地模型服务配置
│   │   │   ├── auth/          # 认证处理器配置
│   │   │   ├── lib.py         # Celery 配置
│   │   │   └── logging.py     # 日志配置
│   │   ├── urls/
│   │   │   └── web.py         # 全局路由(admin API + chat API)
│   │   ├── conf.py            # 配置管理器(YAML + 环境变量)
│   │   └── const.py           # 常量定义
│   ├── application/           # ★ 核心:智能体/应用模块
│   ├── knowledge/             # ★ 核心:知识库模块
│   ├── models_provider/        # ★ 核心:模型供应商适配
│   ├── chat/                  # ★ 核心:对话处理模块
│   ├── tools/                 # 工具管理模块
│   ├── trigger/               # 触发器模块
│   ├── users/                 # 用户管理模块
│   ├── system_manage/         # 系统管理(权限、资源映射)
│   ├── common/                # ★ 基础设施层(30+ 子模块)
│   ├── ops/                   # 运维(Celery 配置)
│   ├── oss/                   # 对象存储模块
│   ├── folders/               # 文件夹管理
│   ├── homepage/              # 首页统计
│   ├── local_model/           # 本地模型服务
│   └── locales/               # 国际化翻译
└── ui/                        # Vue 3 前端项目

3.3 双模式配置架构

MaxKB 的 Settings 采用了一个精巧的双模式设计,通过环境变量 SERVER_NAME 动态切换配置:

SERVER_NAME=web         →  使用 settings/base/web.py + settings/auth/web.py
SERVER_NAME=local_model →  使用 settings/base/model.py + settings/auth/model.py

这样设计的原因是 Web 服务和本地模型服务的安全需求不同:Web 服务需要完整的用户认证体系,而本地模型服务只需要简单的 API Key 认证,两者使用不同的认证处理器链。

3.4 URL 路由架构

全局路由分为两个入口前缀:

# Admin API(管理后台)
urlpatterns += [path(f'{admin_path}/api/', include([
    path('application/', include('application.urls')),
    path('knowledge/', include('knowledge.urls')),
    path('models/', include('models_provider.urls')),
    # ... 其他管理模块
]))]

# Chat API(对话接口)
urlpatterns += [path(f'{chat_path}/api/', include([
    path('application/', include('chat.urls')),
]))]

404 处理做了 SPA 前端路由兼容——当请求的不是 API 路径时,自动返回 index.html,让 Vue Router 接管前端路由。

3.5 模块依赖关系图

                    ┌──────────────────┐
                    │     common       │  (基础设施层)
                    │  认证/缓存/工具   │
                    └────────┬─────────┘
                             │
            ┌────────────────┼────────────────┐
            │                │                │
    ┌───────┴──────┐ ┌──────┴─────┐ ┌───────┴────────┐
    │    users     │ │  models    │ │ system_manage  │
    │  (用户认证)   │ │ _provider  │ │ (权限/资源映射)  │
    └───────┬──────┘ │ (模型适配)  │ └───────┬────────┘
            │        └──────┬─────┘         │
            │               │               │
    ┌───────┴──────┐ ┌──────┴─────┐ ┌───────┴────────┐
    │  knowledge   │ │application │ │    tools       │
    │  (知识库)     │ │ (智能体)    │ │   (工具)       │
    └───────┬──────┘ └──────┬─────┘ └───────┬────────┘
            │               │               │
            │        ┌──────┴─────┐         │
            └───────>│    chat    │<────────┘
                     │  (对话)    │
                     └──────┬─────┘
                            │
                     ┌──────┴─────┐
                     │  trigger   │
                     │ (触发器)    │
                     └────────────┘

核心依赖链:chat 是最终入口,依赖 application(工作流/流水线执行)、knowledge(知识库检索)、models_provider(LLM 调用)、tools(工具调用)。trigger 在最上层,通过触发 applicationtools 实现自动化任务。


第四部分:核心模块源码深度剖析

4.1 Application 模块——智能体的心脏

路径apps/application/

Application 模块是整个平台最核心的业务模块,承担了智能体定义、工作流引擎和对话流水线三大职责。

4.1.1 数据模型设计

Application 模型apps/application/models/application.py)的设计体现了"一个模型承载两种模式"的思想:

class Application(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid7)
    name = models.CharField(max_length=128)
    type = models.CharField(max_length=256)  # SIMPLE 或 WORK_FLOW
    model = models.ForeignKey(Model, on_delete=models.SET_NULL)  # 关联 LLM
    knowledge_setting = models.JSONField()     # 知识库检索配置
    model_setting = models.JSONField()         # 提示词模板、对话轮次等
    work_flow = models.JSONField()             # 工作流定义(DAG JSON)
    # ... 还有 40+ 字段:prologue, dialogue_number, problem_optimization,
    #     mcp_enable, mcp_tool_ids, tool_enable, application_enable,
    #     skill_tool_ids, long_term_enable, tts_model, stt_model 等

关键字段 type 决定了应用的行为模式。当 type=SIMPLE 时,系统使用 knowledge_settingmodel_setting 构建 RAG 流水线;当 type=WORK_FLOW 时,系统解析 work_flow 字段的 JSON 定义来构建 DAG 工作流。

work_flow 字段的 JSON 结构示例:

{
  "nodes": [
    {"id": "node_1", "type": "start-node", "properties": {...}},
    {"id": "node_2", "type": "ai-chat-node", "properties": {...}},
    {"id": "node_3", "type": "search-knowledge-node", "properties": {...}}
  ],
  "edges": [
    {"sourceNodeId": "node_1", "targetNodeId": "node_2"},
    {"sourceNodeId": "node_1", "targetNodeId": "node_3"},
    {"sourceNodeId": "node_3", "targetNodeId": "node_2"}
  ]
}

ApplicationVersion 模型实现了版本快照机制——每次"发布"应用时,系统会创建一条 ApplicationVersion 记录,完整拷贝当前的应用配置。这样即使后续修改了应用配置,已发布的版本也不会受影响。

ApplicationFolder 使用 django-mptt 库实现了 MPTT(Modified Preorder Tree Traversal)树形文件夹结构,支持无限层级的文件夹嵌套,查询效率优于传统的递归方式。

4.1.2 工作流引擎——自研 DAG 执行器

工作流引擎是 MaxKB 最精华的部分,位于 apps/application/flow/workflow_manage.py

核心类 WorkflowManage 的设计思路:

class WorkflowManage:
    def __init__(self, flow, params, ...):
        self.flow = flow           # Workflow 对象(包含 nodes 和 edges)
        self.params = params       # 运行参数(question, chat_id 等)
        self.context = {}          # 全局上下文
        self.node_chunk_manage = NodeChunkManage(self)  # 流式 chunk 管理

    def run(self):
        """入口方法"""
        start_node = self.get_start_node()      # 找到 start-node
        self.run_chain_async(start_node)         # 异步执行

    def run_chain_async(self, node):
        """提交节点到线程池异步执行"""
        future = executor.submit(self._run_node, node)
        # 200 个工作线程的 ThreadPoolExecutor

    def _run_node(self, node):
        """执行单个节点"""
        result = node.run()                      # 调用节点的 run 方法
        self.write_context(result)               # 将结果写入上下文
        next_nodes = self.get_next_node_list(node, result)  # 推导下游节点
        for next_node in next_nodes:
            self.run_chain_async(next_node)      # 递归执行下游节点

执行模型的关键特性

  1. 并行执行:当一个节点有多个下游节点时,它们会被并行提交到线程池执行。例如条件分支的两个分支、或并行检索多个知识库。

  2. 条件分支路由condition-node 执行后会产生一个 branch_id,在推导下游节点时,只有 sourceAnchorId 匹配该 branch_id 的边才会被选中,从而实现分支路由。

  3. 流式响应run_stream() 方法通过 Python generator 实现 SSE(Server-Sent Events)流式输出。NodeChunkManage 负责管理多个节点的 chunk 输出顺序——它维护一个 chunk 队列,按照节点完成的顺序依次输出每个节点的流式内容。

  4. 表单中断form-node 是一个特殊节点,它可以中断整个工作流的执行,将当前状态保存到 Redis 缓存中,等待用户提交表单后再恢复执行。这是通过 is_interrupt() 函数检测的:

def is_interrupt(node, step_variable, global_variable):
    return node.type == 'form-node' and not node.context.get('is_submit', False)
  1. 异常处理:支持节点级异常捕获。当节点执行失败时,如果开启了 enableException,流程会走异常分支(error edge)而不是直接终止。

4.1.3 工作流节点体系

所有节点都继承自 INode 基类(apps/application/flow/i_step_node.py),核心接口:

class INode(ABC):
    @abstractmethod
    def execute(self, **kwargs) -> NodeResult:
        """节点的核心执行逻辑,返回 NodeResult"""
        pass

    def run(self) -> NodeResult:
        """模板方法:记录开始时间 → 调用 execute → 写入上下文 → 返回结果"""
        self.context['start_time'] = time.time()
        result = self.execute(**self.get_node_params())
        return result

NodeResult 封装了两层变量:

  • step_variable:节点级变量,写入当前节点的 context
  • workflow_variable:全局变量,写入工作流的 context

ai-chat-node 为例,它的 execute 方法会:

  1. 从上下文获取前面节点传入的变量(如检索到的知识段落)
  2. 组装 PromptTemplate(使用 Jinja2 模板引擎)
  3. 调用 LLM 模型生成回答(支持流式)
  4. 返回 NodeResult(step_variable={'answer': answer}, workflow_variable=None)

4.1.4 简易模式对话流水线

对于简易模式的智能体,MaxKB 使用了一套责任链模式的对话流水线(apps/application/chat_pipeline/):

ResetProblemStep → SearchDatasetStep → GenerateHumanMessageStep → ChatStep

ResetProblemStep(问题优化):可选步骤。使用 LLM 对用户的问题进行重写/优化,使其更适合检索。例如将"上次那个问题"重写为"关于XX产品的退换货政策"。

SearchDatasetStep(知识库检索):对优化后的问题执行向量检索/关键词检索/混合检索,获取 Top K 个最相关的文档段落。

GenerateHumanMessageStep(组装 Prompt):将检索到的知识段落和用户问题组装成完整的 Prompt,使用 Jinja2 模板引擎渲染。

ChatStep(调用 LLM):将组装好的 Prompt 发送给 LLM,获取流式回答。

这条流水线通过 PipelineManage.builder() 构建,实际代码中使用统一的 append_step() 方法:

pipeline_manage_builder = PipelineManage.builder()
# 如果开启了问题优化,才添加 ResetProblemStep
if chat_info.application.problem_optimization:
    pipeline_manage_builder.append_step(BaseResetProblemStep)
# 依次添加检索、Prompt 组装、对话步骤
pipeline = (pipeline_manage_builder
    .append_step(BaseSearchDatasetStep)
    .append_step(BaseGenerateHumanMessageStep)
    .append_step(BaseChatStep)
    .add_base_to_response(base_to_response)
    .build())

注意这里传入的是(如 BaseChatStep)而非实例,Builder 内部负责实例化。ResetProblemStep 是可选的,只有在应用配置中开启了"问题优化"功能时才会添加。

4.2 Knowledge 模块——知识库的大脑

路径apps/knowledge/

4.2.1 数据模型设计

Knowledge 模块的数据模型设计非常精巧,体现了对 RAG 全流程的深度思考:

Knowledge (知识库)
  ├── Document (文档)
  │     ├── Paragraph (段落) ← 文档被拆分后的每个段落
  │     │     └── Embedding (向量) ← 每个段落对应的向量表示
  │     └── Problem (问题) ← FAQ 关联问题
  │           └── ProblemParagraphMapping ← 问题与段落的映射
  ├── Tag (标签) / DocumentTag ← 文档标签
  ├── Termbase (术语表) ← 优化分词的自定义词典
  └── KnowledgeFolder ← MPTT 树形文件夹

Document 模型使用了一套精巧的状态字符串编码来管理四种并行任务的进度:

class TaskType:
    EMBEDDING = 1          # 向量化任务
    GENERATE_PROBLEM = 2   # 问题生成任务
    SYNC = 3               # 同步任务
    TOKENIZE = 4           # 分词索引任务

# Status 类使用字符串编码,每个字符位代表一种任务类型的状态
# 状态值:0=未开始, 1=进行中, 2=成功, 3=失败, 4=撤销, 5=已撤销, n=忽略
class Status:
    # 通过字符串位置索引来管理多个任务状态
    # 例如 status_str[TaskType.EMBEDDING] 可获取向量化任务的状态

这样设计的好处是四种任务可以并行执行(通过 Celery 异步任务),互不干扰,同时通过一个字符串状态字段就能追踪所有任务的状态。

Embedding 模型深度利用了 PostgreSQL 的特性:

class Embedding(models.Model):
    id = models.CharField(max_length=128, primary_key=True)  # 运行时通过 uuid.uuid7() 赋值
    knowledge = models.ForeignKey(Knowledge, on_delete=models.CASCADE)  # 外键关联
    document = models.ForeignKey(Document, on_delete=models.CASCADE)
    paragraph = models.ForeignKey(Paragraph, on_delete=models.CASCADE)
    embedding = VectorField(verbose_name="向量")              # pgvector 向量字段
    search_vector = SearchVectorField(verbose_name="分词", default="")  # PostgreSQL 全文检索向量
    source_type = models.CharField()                          # 来源类型
    source_id = models.CharField(max_length=128)              # 来源 ID

注意这里的一个设计要点:id 使用的是 CharField 而非 UUIDField,这是因为 UUID7 在运行时以字符串形式生成并赋值,使用 CharField 可以更灵活地处理主键格式。同时 knowledgedocumentparagraph 使用 Django 的 ForeignKey 建立了标准的外键关联关系,可以利用 ORM 的 select_related 做关联查询优化。

4.2.2 向量检索实现

向量检索的核心逻辑在 apps/knowledge/vector/pg_vector.pyPGVector 类封装了三种搜索模式:

向量检索(EmbeddingSearch)

def search(self, query_text, query_embedding, knowledge_id_list, ...):
    # 1. 对查询文本进行向量化
    text_embedding = embedding.embed_query(query_text)
    # 2. 通过加载 .sql 文件执行原生 SQL 查询(而非 Django ORM)
    #    这样可以充分利用 pgvector 的 HNSW 索引和自定义排序逻辑
    exec_sql, exec_params = generate_sql_by_query_dict(
        {"embedding_query": query_set},
        select_string=get_file_content(
            os.path.join(PROJECT_DIR, "apps", "knowledge", "sql", "embedding_search.sql")
        ),
        with_table_name=True,
    )
    # 3. 按知识库逐个查询(利用每个知识库的 HNSW 索引)
    embedding_model = select_list(exec_sql, exec_params)

MaxKB 选择使用原生 SQL 文件(.sql)而非 Django ORM 来执行向量检索,这是因为 pgvector 的距离计算、索引提示和结果排序逻辑用原生 SQL 表达更为高效和可控。

关键词检索(KeywordsSearch):利用 PostgreSQL 的 tsvectortsquery,结合 jieba 中文分词和自定义术语表进行全文检索。to_ts_vector()to_query() 函数封装了分词逻辑:

# 自定义分词:先加载术语表中的自定义词,再用 jieba 分词
terms = list(QuerySet(Termbase).filter(knowledge_id=knowledge_id)
             .values_list("content", flat=True))
search_vector = SearchVector(Value(to_ts_vector(text, user_words=terms)))

混合检索(BlendSearch):同时执行向量检索和关键词检索,然后通过 RRF(Reciprocal Rank Fusion)算法或加权融合算法合并结果。

4.2.3 异步向量化任务

向量化是计算密集型任务,MaxKB 通过 Celery 异步执行:

# apps/knowledge/task/embedding.py

@app.task(base=QueueOnce, once={'graceful': True})
def embedding_by_paragraph(paragraph_id, knowledge_id, ...):
    """单段落向量化"""
    embedding_model = get_embedding_model(...)
    pg_vector.save(text, embedding_model)

@app.task(base=QueueOnce, once={'graceful': True})
def embedding_by_document(document_id, knowledge_id, ...):
    """文档级向量化:遍历所有段落"""
    paragraphs = QuerySet(Paragraph).filter(document_id=document_id)
    for paragraph in paragraphs:
        embedding_by_paragraph.delay(paragraph.id, ...)

这里有一个重要的设计决策:使用了 celery_once.QueueOnce 作为任务基类,通过 Redis 锁防止同一个文档或段落的重复向量化任务被提交。这在用户频繁点击"重新向量化"按钮时尤为重要。

4.3 Models Provider 模块——模型适配的抽象层

路径apps/models_provider/

4.3.1 三层抽象架构

Models Provider 模块采用了经典的三层抽象设计,使得添加新供应商变得非常简单:

第一层:IModelProvider(供应商接口)

class IModelProvider(ABC):
    @abstractmethod
    def get_model_info_manage(self):
        """返回 ModelInfoManage 对象,包含该供应商支持的所有模型信息"""
        pass

    def get_model(self, model_type, model_name, model_credential, **kwargs):
        """根据模型类型和名称,实例化并返回模型对象"""
        model_info = self.get_model_info_manage().get_model_info(model_type, model_name)
        return model_info.model_class.new_instance(model_type, model_name, model_credential, **kwargs)

    def is_valid_credential(self, model_type, model_name, model_credential, ...):
        """验证凭证有效性(通常是发送一个简单请求测试连通性)"""
        model_info = self.get_model_info_manage().get_model_info(model_type, model_name)
        return model_info.model_credential.is_valid(...)

第二层:MaxKBBaseModel(模型实例基类)

class MaxKBBaseModel(ABC):
    @staticmethod
    @abstractmethod
    def new_instance(model_type, model_name, model_credential, **model_kwargs):
        """工厂方法:根据凭证创建具体的模型实例"""
        pass

第三层:BaseModelCredential(凭证管理基类)

class BaseModelCredential(ABC):
    @abstractmethod
    def is_valid(self, model_type, model_name, model_credential, ...):
        """验证凭证是否有效"""
        pass

    def encryption(self, model_credential):
        """加密敏感凭证(API Key 等)"""
        pass

4.3.2 供应商实现模式

每个供应商在 impl/ 目录下遵循统一的目录结构:

impl/{provider_name}/
  ├── {provider}_model_provider.py   # 供应商类(注册支持的模型列表)
  ├── credential/                    # 各模型类型的凭证校验类
  │   ├── llm.py
  │   ├── embedding.py
  │   └── ...
  ├── model/                         # 各模型类型的实际调用类
  │   ├── llm.py                     # 封装 LangChain 的 ChatModel
  │   ├── embedding.py
  │   └── ...
  └── icon/                          # 供应商标图(SVG/PNG)

以 OpenAI 供应商为例,model/llm.py 内部实际是封装了 LangChain 的对话模型。MaxKB 定义了一个中间基类 BaseChatOpenAI,然后具体供应商继承它:

class OpenAIChatModel(MaxKBBaseModel, BaseChatOpenAI):
    @staticmethod
    def new_instance(model_type, model_name, model_credential, **kwargs):
        return OpenAIChatModel(
            model=model_name,
            openai_api_key=model_credential.get('api_key'),
            openai_api_base=model_credential.get('api_base'),
            streaming=kwargs.get('streaming', False),
            # ...
        )

这种设计的好处是:新增一个供应商只需要实现"供应商类 + 凭证校验 + 模型封装"三个文件,而所有供应商的公共逻辑(模型查找、凭证加密、参数校验)都由基类处理。

4.3.3 凭证安全

模型的 API Key 等敏感信息在存储前会被加密。BaseModelCredential 提供了 encryption() 方法,使用 common.utils.common.encryption() 函数(基于 cryptography 库)进行加密。数据库中存储的是加密后的凭证,使用时再解密。

4.4 Chat 模块——对话的入口

路径apps/chat/

Chat 模块是对话请求的处理入口,它的核心逻辑在 apps/chat/serializers/chat.py 中:

class ChatSerializers(serializers.Serializer):
    def chat(self, instance: dict, base_to_response: BaseToResponse = SystemToResponse()):
        """对话主入口:获取 ChatInfo 后根据应用类型分发"""
        chat_info = self.get_chat_info()  # 从缓存/数据库获取对话上下文
        application = chat_info.application

        if application.type == ApplicationTypeChoices.SIMPLE:
            return self.chat_simple(chat_info, instance, base_to_response)
        else:
            return self.chat_work_flow(chat_info, instance, base_to_response)

    def chat_simple(self, chat_info, instance, base_to_response):
        """简易模式:构建对话流水线执行"""
        # 使用 PipelineManage.builder() 构建流水线
        # 根据是否开启问题优化决定是否添加 ResetProblemStep
        pipeline = PipelineManage.builder()...build()
        return pipeline.run(...)

    def chat_work_flow(self, chat_info, instance, base_to_response):
        """工作流模式:创建 WorkflowManage 实例执行"""
        work_flow = Workflow.new_instance(application.work_flow)
        workflow = WorkflowManage(flow=work_flow, params={...})
        if stream:
            return workflow.run_stream()
        else:
            return workflow.run_block()

Chat 模块还实现了 OpenAI 兼容 APIOpenAIChatSerializer),使得 MaxKB 智能体可以作为 OpenAI API 的替代被调用:

POST /api/application/{api_key}/chat
Content-Type: application/json

{
    "model": "maxkb",
    "messages": [{"role": "user", "content": "你好"}],
    "stream": true
}

此外,Chat 模块还包含 MCP Server 实现apps/chat/mcp/tools.py),将 MaxKB 应用暴露为标准 MCP 工具,支持 initialize() / list_tools() / call_tool() 协议。

4.5 Common 模块——基础设施层

路径apps/common/

Common 模块是整个系统的地基,包含 30+ 个子模块,为上层业务提供通用能力:

认证框架(auth/:采用策略模式,通过配置动态加载认证处理器。支持三种认证方式:UserToken(用户 Token)、ChatAnonymousUserToken(匿名对话)、ApplicationKey(API Key)。

缓存层(cache/:封装了 Redis 操作,提供统一的缓存接口。对话上下文(ChatInfo)就缓存在 Redis 中,避免每次都查数据库。

文档分块(chunk/:实现了多种文档分段策略,支持按段落、按固定长度、按自定义分隔符等方式拆分文档。

分布式锁(lock/:基于 Redis 实现的分布式锁,用于防止并发任务冲突。

全局异常处理(exception/handle_exception 函数统一处理所有 API 异常,确保返回格式一致的错误响应。

统一响应封装(result/Result 类封装了统一的 API 响应格式:{code, message, data}

4.6 Trigger 模块——自动化调度

路径apps/trigger/

触发器模块的设计简洁而实用。核心模型 Trigger 支持两种类型:

class Trigger(models.Model):
    name = models.CharField(max_length=128)
    trigger_type = models.CharField()     # SCHEDULED 或 EVENT
    trigger_setting = models.JSONField()  # 调度配置(cron 表达式、间隔等)
    is_active = models.BooleanField()
    user = models.ForeignKey(User)        # 创建者
    desc = models.TextField()             # 描述
    meta = models.JSONField()             # 元数据

class TriggerTask(models.Model):
    trigger = models.ForeignKey(Trigger)
    source_type = models.CharField()      # APPLICATION 或 TOOL
    source_id = models.UUIDField()        # 关联的智能体或工具 ID
    parameter = models.JSONField()        # 输入参数
    is_active = models.BooleanField()

class TaskRecord(models.Model):
    """触发器任务执行记录"""
    trigger = models.ForeignKey(Trigger)
    trigger_task = models.ForeignKey(TriggerTask)
    state = models.CharField()            # PENDING/STARTED/SUCCESS/FAILURE 等
    run_time = models.FloatField()        # 执行耗时

定时触发器的实现基于 APScheduler:

class ScheduledTrigger:
    def deploy(self, trigger):
        """将触发器部署到调度器"""
        config = trigger.trigger_setting
        if config['type'] == 'cron':
            scheduler.add_job(
                func=ScheduledTrigger.execute,
                trigger='cron',
                kwargs={"trigger": trigger, "trigger_task": task},
                **config['cron_params']
            )

为了防止定时任务的并发执行(比如一个任务还没跑完,下一次调度又触发了),系统使用了 Redis 分布式锁:

def execute(trigger, trigger_task):
    rlock = RedisLock()
    lock_key = f'{trigger.id}:{trigger_task.source_id}'
    if rlock.try_lock(lock_key, 30 * 30):  # 锁超时 30 分钟
        try:
            # 根据 source_type 决定执行智能体对话还是工具调用
            if trigger_task.source_type == 'APPLICATION':
                # 调用智能体
                ...
            elif trigger_task.source_type == 'TOOL':
                # 调用工具
                ...
        finally:
            rlock.un_lock(lock_key)

第五部分:设计模式与架构决策分析

5.1 数据库设计亮点

UUID7 主键:全局使用 uuid_utils.compat.uuid7 生成主键。UUID7 是时间有序的 UUID,相比传统 UUID4,对数据库 B-tree 索引更加友好(新插入的记录不会导致索引页分裂),同时又保持了全局唯一性。

JSONField 的广泛使用work_flowmodel_settingknowledge_settingmeta 等灵活配置都使用 JSONField 存储。这种"关系型 + 半结构化"的混合设计,既保证了核心数据的查询效率,又给予了业务配置的灵活性。

PostgreSQL Large ObjectFile 模型使用 PostgreSQL 的 Large Object 机制存储大文件(文档、图片等),配合 zip 压缩,避免了文件系统管理的复杂性。

pgvector 向量存储:直接在 PostgreSQL 中存储向量数据,利用 HNSW 索引实现高效的近似最近邻搜索。这避免了引入独立的向量数据库(如 Milvus、Pinecone),降低了部署复杂性。

5.2 异步任务设计

MaxKB 的 Celery 配置有几个值得学习的设计:

HMAC 签名序列化:Celery 消息使用自定义的 hmac_signed_serializer 进行序列化,每个消息都带有 HMAC 签名,防止消息在 Redis 传输过程中被篡改。

防重复执行:使用 celery_once.QueueOnce 基类,基于 Redis 锁确保同一参数的任务不会被重复提交。例如,同一文档的向量化任务在队列中只会存在一份。

软超时机制:配置了 3600 秒(1 小时)的软超时,超时后 Celery 会抛出 SoftTimeLimitExceeded 异常,任务可以捕获该异常进行清理工作。

5.3 工作流引擎的设计哲学

MaxKB 的工作流引擎没有使用现有的工作流引擎(如 Airflow、Temporal),而是自研了一个轻量级的 DAG 执行器。这个决策的背后有几个考量:

流式支持:传统工作流引擎主要面向批处理场景,不支持流式输出。而 MaxKB 的 AI 对话场景需要从 LLM 获取流式 token 并实时推送给前端,这要求工作流引擎与 SSE 机制深度集成。

轻量高效:自研引擎基于线程池执行,没有引入额外的调度器和状态机,启动快、资源占用低。

表单中断:AI 工作流经常需要人工介入(如确认、补充信息),form-node 的中断-恢复机制在传统工作流引擎中实现起来比较复杂。

5.4 多租户设计

MaxKB 使用 workspace_id 字段实现工作空间级别的多租户隔离。每个数据模型(知识库、智能体、模型、工具等)都带有 workspace_id 字段,查询时自动过滤。WorkspaceUserPermission 模型管理用户与工作空间之间的权限关系。


第六部分:前端架构简析

6.1 技术栈与构建

前端位于 ui/ 目录,使用 Vue 3 + TypeScript + Vite 构建,UI 组件库为 Element Plus。构建分为两个入口:

  • build:构建管理后台(admin),包含智能体管理、知识库管理、模型管理等完整功能
  • build-chat:构建对话前台(chat),仅包含对话界面,体积更小

6.2 工作流编辑器

工作流可视化编辑器基于 LogicFlow 实现,这是一个国产的开源流程编辑框架。MaxKB 在其基础上自定义了 30+ 种节点类型和相应的图标、配置面板。前端 workflow/ 目录包含自定义节点的组件定义和插件。

6.3 状态管理

使用 Pinia 作为状态管理方案(Vue 3 官方推荐),管理全局状态如当前用户、当前工作空间、主题设置等。

6.4 国际化

通过 vue-i18n 支持多语言,后端的 apps/locales/ 目录包含 Django 的翻译文件(.po/.mo),实现了前后端统一的国际化方案。


第七部分:开发实践指南

7.1 本地开发环境搭建

后端环境

# 1. 安装依赖(使用 uv 包管理器)
pip install uv
cd D:\data\coder\MaxKB
uv sync

# 2. 启动 PostgreSQL(需要 pgvector 扩展)
docker run -d --name=maxkb-pg -p 5432:5432 \
  -e POSTGRES_DB=maxkb -e POSTGRES_USER=maxkb -e POSTGRES_PASSWORD=maxkb \
  pgvector/pgvector:pg16

# 3. 启动 Redis
docker run -d --name=maxkb-redis -p 6379:6379 redis:7

# 4. 配置数据库连接
# 编辑 apps/maxkb/config.yaml 或设置环境变量

# 5. 启动 Web 服务(开发模式)
python main.py dev web

# 6. 启动 Celery Worker(另开终端)
python main.py dev celery

前端环境

cd D:\data\coder\MaxKB\ui
npm install
npm run dev        # 启动开发服务器

7.2 新增一个模型供应商的步骤

假设你要新增一个名为 “MyAI” 的模型供应商:

  1. apps/models_provider/impl/ 下创建 my_ai/ 目录
  2. 创建 my_ai_model_provider.py,继承 IModelProvider,注册支持的模型列表
  3. credential/ 下创建各模型类型的凭证校验类(继承 BaseModelCredential
  4. model/ 下创建模型封装类(继承 MaxKBBaseModel 和对应的 LangChain 模型类)
  5. icon/ 下放供应商标图
  6. 在供应商注册处添加新供应商的引用

7.3 新增一个工作流节点的步骤

假设你要新增一个 “sentiment-analysis-node”(情感分析节点):

  1. apps/application/flow/step_node/ 下创建 sentiment_analysis_node/ 目录
  2. 创建节点类,继承 INode,实现 execute() 方法
  3. apps/application/flow/step_node/__init__.py 中注册节点类型
  4. 在前端 ui/src/workflow/ 下创建对应的自定义节点组件

7.4 API 文档访问

MaxKB 使用 drf-spectacular 自动生成 OpenAPI 3 文档。启动服务后访问:

http://localhost:8080/admin/api/schema/swagger-ui/

可以查看所有 API 接口的详细参数说明,并支持在线调试。


第八部分:核心数据流全链路追踪

8.1 一次完整的 RAG 对话流程

让我们追踪一次用户提问 “MaxKB 支持哪些模型?” 的完整处理过程:

1. 用户请求到达
   POST /chat/api/application/{api_key}/chat
   Body: {"message": "MaxKB 支持哪些模型?", "stream": true}

2. Chat 模块接收请求
   ChatSerializers.chat()
   → 判断应用类型为 SIMPLE
   → 调用 chat_simple()

3. 对话流水线启动
   PipelineManage.run()

4. 问题优化(可选)
   ResetProblemStep.execute()
   → 使用 LLM 重写问题(如果开启了此功能)
   → 输出优化后的问题:"MaxKB 智能体平台支持的模型供应商和模型类型有哪些?"

5. 知识库检索
   SearchDatasetStep.execute()
   → 调用 PGVector.search()
   → 文本向量化:embedding.embed_query("MaxKB 智能体平台支持的模型...")
   → 在 pgvector 中执行余弦相似度搜索
   → 同时执行全文检索(如果配置了混合检索)
   → 融合排序,返回 Top K 个段落

6. Prompt 组装
   GenerateHumanMessageStep.execute()
   → 加载提示词模板
   → 填充变量:{data} = 检索到的段落, {question} = 用户问题
   → 渲染 Jinja2 模板

7. LLM 调用
   ChatStep.execute()
   → 获取 LLM 模型实例(通过 models_provider 模块)
   → 调用 model.stream(messages) 获取流式响应
   → 逐 token 通过 SSE 推送给前端

8. 后处理
   WorkFlowPostHandler.handler()
   → 保存 ChatRecord(问题、回答、Token 消耗、运行详情)
   → 更新 Redis 对话缓存
   → 异步触发长期记忆提取(Celery 任务)

9. SSE 响应完成
   → 前端收到 [DONE] 信号
   → 渲染完整的回答

8.2 一次工作流对话的执行流程

对于工作流模式的智能体,执行路径有所不同:

第九部分:学习路径建议

9.1 功能学习者路径

如果你主要是想用好 MaxKB 的功能:

  1. 先用 Docker 一键部署跑起来
  2. 按照"五步上手流程"完成模型接入、知识库创建、智能体创建
  3. 重点学习知识库的分段策略和检索模式调优(这直接决定 RAG 效果)
  4. 尝试工作流编排,从简单的"检索→生成→回复"流程开始,逐步添加条件分支、循环等复杂节点
  5. 学习触发器配置,实现自动化任务

9.2 源码学习者路径

如果你想深入理解 MaxKB 的源码:

  1. 先读入口:从 main.py 开始理解启动流程,然后看 apps/maxkb/settings/ 理解配置体系
  2. 读模型定义:按 Application → Knowledge → Document → Paragraph → Embedding 的顺序阅读数据模型,理解数据关系
  3. 读对话流程:从 ChatSerializers.chat() 出发,分别追踪简易模式和工作流模式的执行路径
  4. 读工作流引擎:这是最复杂的部分,建议先理解 WorkflowManage 的整体调度逻辑,再逐个看节点实现
  5. 读模型适配层:选择一个供应商(如 OpenAI),从 IModelProviderMaxKBBaseModel 到具体的 ChatOpenAI,理解三层抽象
  6. 读基础设施:Common 模块中的认证、缓存、异常处理等,理解系统的横切关注点

9.3 二次开发者路径

如果你想在 MaxKB 基础上做二次开发:

  1. 完成源码学习路径的前 3 步
  2. 实践"新增一个工作流节点"(最简单)
  3. 实践"新增一个模型供应商"(中等难度)
  4. 学习 API 文档(/admin/api/schema/swagger-ui/),理解接口规范
  5. 阅读前端 ui/src/workflow/ 目录,理解工作流编辑器的扩展方式
  6. 阅读 apps/common/ 模块,了解可用的基础设施组件

附录 A:关键配置项说明

MaxKB 的配置通过 apps/maxkb/conf.pyConfigManager 管理,支持 YAML 文件和环境变量两种方式:

配置项 说明 默认值
DB_HOST PostgreSQL 地址 127.0.0.1
DB_PORT PostgreSQL 端口 5432
DB_USER 数据库用户 root
DB_PASSWORD 数据库密码 Password123@postgres
DB_NAME 数据库名 从 YAML 配置文件加载
REDIS_HOST Redis 地址 127.0.0.1
REDIS_PORT Redis 端口 6379
REDIS_PASSWORD Redis 密码 Password123@redis
MAXKB_CORE_WORKER Gunicorn Worker 数量 系统 CPU 核数
LOCAL_MODEL_HOST 本地模型服务地址 127.0.0.1
LOCAL_MODEL_PORT 本地模型服务端口 11636
EMBEDDING_MODEL_NAME 默认 Embedding 模型 各供应商默认模型

附录 B:Celery 任务清单

任务名 功能 所在文件
embedding_by_paragraph 单段落向量化 knowledge/task/embedding.py
embedding_by_document 文档级向量化 knowledge/task/embedding.py
embedding_by_knowledge 知识库级全量向量化 knowledge/task/embedding.py
tokenize_by_document 分词索引构建 knowledge/task/embedding.py
extract_long_term_memory 长期记忆提取 application/long_term_memory/__init__.py
deploy_scheduled_trigger 定时触发器部署 trigger/handler/impl/trigger/scheduled_trigger.py

附录 C:参考资料

0

评论区