MaxKB v2 全方位学习教程:从功能使用到源码深度剖析
本教程基于 MaxKB v2.0.0 源码和官方文档(https://maxkb.cn/docs/v2/)编写,旨在帮助开发者全面理解 MaxKB 的功能使用、架构设计和源码实现细节。
第一部分:认识 MaxKB
1.1 MaxKB 是什么
MaxKB(Max Knowledge Brain)是由飞致云(FIT2CLOUD)开源的企业级智能体平台,采用 GPLv3 许可证。它的核心定位是帮助企业以最低门槛落地 AI 应用——你不需要从零开始训练模型,而是通过"接入大模型 + 构建知识库 + 编排工作流"三步走策略,快速搭建出智能客服、办公助手、知识问答等场景的 AI 应用。
MaxKB 的设计理念可以概括为八个字:开箱即用,伴随成长。新手可以通过简易模式 5 分钟搭建一个 RAG 问答应用,而高级用户则可以通过工作流引擎编排包含 30+ 种节点类型的复杂业务流程。
1.2 核心能力概览
MaxKB 的核心能力可以分为四大支柱:
RAG 检索增强生成:这是 MaxKB 的基础能力。你可以上传 Word、PDF、Markdown、TXT 等格式的文档(或自动爬取在线文档),系统会自动完成文本拆分、向量化和索引构建。当用户提问时,系统先从知识库中检索相关段落,再将其作为上下文传给大模型,从而有效减少幻觉,提升回答质量。
工作流引擎:这是 MaxKB 的高级能力。内置了一个自研的 DAG(有向无环图)执行引擎,支持 AI 对话、知识库检索、条件分支、循环、工具调用、表单交互、图像生成/理解等 30+ 种节点类型,可以通过可视化界面拖拽编排。
模型中立:MaxKB 不绑定任何一家大模型供应商。目前已内置 27 个供应商适配器,涵盖国际厂商(OpenAI、Anthropic、Google Gemini、AWS Bedrock)、国内厂商(通义千问、文心一言、火山引擎、腾讯混元、智谱、Kimi、MiniMax、讯飞等)以及开源自部署方案(Ollama、vLLM、Xinference)。支持 9 种模型类型:LLM、Embedding、STT(语音转文字)、TTS(文字转语音)、Image(图像理解)、TTI(图像生成)、Reranker(重排序)、TTV(文生视频)、ITV(图生视频)。
无缝嵌入:MaxKB 提供 JS 嵌入脚本和 OpenAI 兼容 API,可以零编码嵌入到现有业务系统中。同时还支持 MCP(Model Context Protocol)协议,既可作为 MCP Server 暴露智能体,也可在工作流中调用外部 MCP 工具。
1.3 技术栈总览
| 层次 | 技术选型 |
|---|---|
| 前端框架 | Vue 3 + TypeScript + Vite |
| UI 组件库 | Element Plus |
| 工作流编辑器 | LogicFlow(节点拖拽与连线) |
| 后端框架 | Python 3.11+ / Django 5.2 + Django REST Framework 3.17 |
| AI 框架 | LangChain 1.2 + LangGraph 1.1 + deepagents 0.5 |
| 向量数据库 | PostgreSQL + pgvector 扩展 |
| 关系数据库 | PostgreSQL(深度使用全文检索、数组字段、Large Object 等特性) |
| 缓存/消息队列 | Redis(缓存 + Celery Broker) |
| 异步任务 | Celery 5.5 + django-celery-beat + celery-once(防重复执行) |
| 定时调度 | APScheduler(通过 django-apscheduler 集成) |
| 部署 | Docker + Gunicorn(多进程模式) |
| API 文档 | drf-spectacular(自动生成 OpenAPI 3 / Swagger 文档) |
1.4 快速部署
最简单的方式是使用 Docker 一行命令启动:
# Linux
docker run -d --name=maxkb --restart=always -p 8080:8080 \
-v ~/.maxkb:/opt/maxkb registry.fit2cloud.com/maxkb/maxkb
# Windows (PowerShell)
docker run -d --name=maxkb --restart=always -p 8080:8080 `
-v C:/maxkb:/opt/maxkb registry.fit2cloud.com/maxkb/maxkb
启动后访问 http://localhost:8080,默认管理员账号为 admin,密码为 MaxKB@123..。
对于离线环境,可以使用官方提供的离线安装包;如果你已经在用 1Panel 服务器管理面板,也可以直接从应用商店一键安装。
第二部分:功能使用指南
2.1 五步上手流程
一个典型的 MaxKB 使用流程如下:
第一步:对接模型。进入"模型管理"页面,选择一个供应商(如通义千问),填写 API Key 等凭证信息,完成模型接入。建议至少接入一个 LLM(大语言模型)和一个 Embedding(向量模型)。
第二步:创建知识库。进入"知识库"页面,创建一个知识库,然后上传文档或输入网页 URL。系统会自动完成文档解析、分段和向量化。
第三步:创建智能体。进入"智能体"页面,选择"简易模式"或"工作流模式"。简易模式只需选择模型、关联知识库、配置提示词即可;工作流模式则需要通过可视化编辑器编排流程。
第四步:测试对话。在智能体详情页直接进行对话测试,观察回答效果,根据效果调整知识库内容或提示词。
第五步:发布集成。通过 JS 嵌入脚本将对话窗口嵌入到现有网站,或通过 OpenAI 兼容 API 从后端调用。
2.2 模型管理详解
2.2.1 支持的模型类型
MaxKB 支持 9 种模型类型,每种类型在系统中扮演不同角色:
| 模型类型 | 用途 | 典型供应商 |
|---|---|---|
| LLM(大语言模型) | 核心对话生成 | OpenAI GPT、通义千问、DeepSeek |
| Embedding(向量模型) | 文本向量化 | text-embedding-ada-002、bge-large |
| Reranker(重排序) | 检索结果精排 | Cohere Rerank、bge-reranker |
| STT(语音转文字) | 语音输入 | Whisper |
| TTS(文字转语音) | 语音输出 | OpenAI TTS |
| Image(图像理解) | 多模态理解 | GPT-4o、Claude 3 |
| TTI(图像生成) | 文生图 | DALL-E、通义万相 |
| TTV(文生视频) | 文字生成视频 | 各厂商视频模型 |
| ITV(图生视频) | 图片生成视频 | 各厂商视频模型 |
2.2.2 对接国内模型示例(以通义千问为例)
- 登录阿里云百炼平台(https://bailian.console.aliyun.com/),获取 API Key
- 在 MaxKB 的"模型管理"页面点击"添加模型"
- 供应商选择"通义千问"
- 填写 API Key 和 API Base(通常为
https://dashscope.aliyuncs.com/compatible-mode/v1) - 选择要添加的模型(如 qwen-max、qwen-plus 等)
- 点击"验证"确认连接成功,然后保存
2.3 知识库管理详解
2.3.1 知识库类型
MaxKB 支持多种知识库数据来源:
- 通用知识库(BASE):手动上传文档,支持 PDF、Word(docx/doc)、Markdown、TXT、HTML、Excel、CSV 等格式
- Web 站点知识库(WEB):输入 URL 后自动爬取网页内容并构建知识库
- 飞书知识库(LARK):对接飞书文档
- 语雀知识库(YUQUE):对接语雀文档
- 工作流知识库(WORKFLOW):通过工作流自动生成知识库内容
2.3.2 文档分段策略
文档分段是 RAG 质量的关键。MaxKB 提供两种分段方式:
智能分段:系统自动根据文档结构(标题、段落、表格等)进行拆分,适合大多数场景。
高级分段:允许自定义分段规则,包括:
- 分段标识符(如
\n\n、---等) - 最大分段长度
- 重叠长度(相邻段落的重叠字符数,保证上下文连续性)
2.3.3 检索模式
MaxKB 支持三种检索模式,这是其 RAG 能力的一大亮点:
- 向量检索(Embedding Search):通过余弦距离计算语义相似度,适合语义匹配场景
- 关键词检索(Keywords Search):基于 PostgreSQL 全文检索引擎,支持中文分词(jieba),适合精确关键词匹配
- 混合检索(Blend Search):同时执行向量检索和关键词检索,然后通过融合排序算法合并结果,兼顾语义理解和精确匹配
此外,还支持 Reranker 重排序,在初步检索结果上使用重排序模型进行精排,进一步提升检索质量。
2.4 智能体创建详解
2.4.1 简易智能体
简易智能体的配置非常直观,核心要素包括:
模型选择:选择已接入的 LLM 模型,设置温度(Temperature)、最大 Token 数等参数。
提示词模板:定义系统提示词,支持变量引用。例如:
你是一个专业的客服助手。请根据以下已知信息回答用户的问题。
已知信息:{data}
用户问题:{question}
知识库关联:选择要关联的知识库,配置检索参数(检索模式、Top K、相似度阈值等)。
对话轮次:设置保留的历史对话轮数,用于多轮对话上下文。
长期记忆:可选开启,系统会使用 LLM 自动从对话中提取用户偏好和关键信息,存储在长期记忆中用于后续对话。
2.4.2 工作流智能体(高级)
工作流智能体通过可视化 DAG 编排复杂的 AI 处理流程。以下是全部节点类型的分类说明:
流程控制类:
start-node:流程起始节点,定义全局输入变量condition-node:条件分支,根据表达式选择不同的执行路径loop-node/loop-start-node/loop-break-node/loop-continue-node:循环节点组,支持重复执行子流程
AI 能力类:
ai-chat-node:调用 LLM 生成回答image-generate-node:文字生成图像image-understand-node:图像理解分析video-understand-node:视频理解分析text-to-video-node/image-to-video-node:视频生成speech-to-text-node/text-to-speech-node:语音转写与合成
知识检索类:
search-knowledge-node:知识库向量检索search-document-node:文档搜索reranker-node:结果重排序knowledge-write-node:向知识库写入数据
工具调用类:
tool-node/tool-lib-node:调用自定义工具mcp-node:调用 MCP 协议工具application-node:嵌套调用其他智能体
数据处理类:
question-node:问题优化/重写intent-node:意图识别parameter-extraction-node:结构化参数提取variable-aggregation-node/variable-assign-node/variable-splitting-node:变量操作document-extract-node/document-split-node:文档抽取与拆分data-source-local-node/data-source-web-node:数据源接入
交互类:
direct-reply-node:直接回复(不经过 LLM)form-node:表单交互节点,暂停流程等待用户输入
2.5 工具系统
MaxKB 支持 6 种工具类型:
- 自定义工具(CUSTOM):用 Python 代码编写自定义逻辑
- 工作流工具(WORKFLOW):将工作流封装为可复用工具
- MCP 工具(MCP):通过 MCP 协议调用外部工具
- 技能工具(SKILL):预置的技能包
- 数据源工具(DATA_SOURCE):连接外部数据源
- 内置工具(INTERNAL):系统自带的工具
2.6 触发器系统
触发器允许你定时或基于事件自动执行智能体对话或工具调用:
定时触发器:支持日/周/月/间隔/Cron 表达式五种调度模式。例如,每天上午 9 点自动运行一个数据分析智能体。
事件触发器(Webhook):通过 HTTP Webhook 接收外部事件,触发智能体响应。例如,当有新的客服工单时自动触发智能体处理。
2.7 集成方式
MaxKB 提供三种主要的集成方式:
JS 嵌入:在你的网页中添加一段 JS 脚本即可嵌入对话窗口,无需编写前端代码。
OpenAI 兼容 API:MaxKB 提供 /v1/chat/completions 兼容接口,任何支持 OpenAI API 的客户端都可以直接对接。
MCP 协议:将 MaxKB 智能体作为 MCP Server 暴露,其他 MCP Client 可以直接调用。
第三部分:源码架构深度剖析
3.1 项目入口与启动流程
MaxKB 的统一入口是 main.py,支持四种运行模式:
# main.py 的核心命令行参数
parser.add_argument('action', choices=("start", "dev", "upgrade_db", "collect_static"))
parser.add_argument("services", choices=("all", "web", "task", "celery", "local_model"))
生产模式(start):先执行 collect_static(收集前端静态文件)和 perform_db_migrate(数据库迁移),然后通过 Gunicorn 启动 Web 服务,同时启动 Celery Worker 处理异步任务。
开发模式(dev):使用 Django 内置的 runserver 启动开发服务器,支持热重载。
启动时的一个巧妙设计是数据库迁移的重试机制——PostgreSQL 在崩溃恢复期间虽然 TCP 端口已开放但会拒绝查询,因此 perform_db_migrate() 函数包含了最多 10 次、间隔 5 秒的重试逻辑,确保在容器启动场景下能可靠完成迁移。
3.2 目录结构与模块职责
D:\data\coder\MaxKB\
├── main.py # 统一入口
├── pyproject.toml # 依赖管理(使用 uv 包管理器)
├── apps/ # Django 应用根目录
│ ├── maxkb/ # 核心配置
│ │ ├── settings/ # 双模式配置(web/model)
│ │ │ ├── base/
│ │ │ │ ├── web.py # Web 服务 Django 配置
│ │ │ │ └── model.py # 本地模型服务配置
│ │ │ ├── auth/ # 认证处理器配置
│ │ │ ├── lib.py # Celery 配置
│ │ │ └── logging.py # 日志配置
│ │ ├── urls/
│ │ │ └── web.py # 全局路由(admin API + chat API)
│ │ ├── conf.py # 配置管理器(YAML + 环境变量)
│ │ └── const.py # 常量定义
│ ├── application/ # ★ 核心:智能体/应用模块
│ ├── knowledge/ # ★ 核心:知识库模块
│ ├── models_provider/ # ★ 核心:模型供应商适配
│ ├── chat/ # ★ 核心:对话处理模块
│ ├── tools/ # 工具管理模块
│ ├── trigger/ # 触发器模块
│ ├── users/ # 用户管理模块
│ ├── system_manage/ # 系统管理(权限、资源映射)
│ ├── common/ # ★ 基础设施层(30+ 子模块)
│ ├── ops/ # 运维(Celery 配置)
│ ├── oss/ # 对象存储模块
│ ├── folders/ # 文件夹管理
│ ├── homepage/ # 首页统计
│ ├── local_model/ # 本地模型服务
│ └── locales/ # 国际化翻译
└── ui/ # Vue 3 前端项目
3.3 双模式配置架构
MaxKB 的 Settings 采用了一个精巧的双模式设计,通过环境变量 SERVER_NAME 动态切换配置:
SERVER_NAME=web → 使用 settings/base/web.py + settings/auth/web.py
SERVER_NAME=local_model → 使用 settings/base/model.py + settings/auth/model.py
这样设计的原因是 Web 服务和本地模型服务的安全需求不同:Web 服务需要完整的用户认证体系,而本地模型服务只需要简单的 API Key 认证,两者使用不同的认证处理器链。
3.4 URL 路由架构
全局路由分为两个入口前缀:
# Admin API(管理后台)
urlpatterns += [path(f'{admin_path}/api/', include([
path('application/', include('application.urls')),
path('knowledge/', include('knowledge.urls')),
path('models/', include('models_provider.urls')),
# ... 其他管理模块
]))]
# Chat API(对话接口)
urlpatterns += [path(f'{chat_path}/api/', include([
path('application/', include('chat.urls')),
]))]
404 处理做了 SPA 前端路由兼容——当请求的不是 API 路径时,自动返回 index.html,让 Vue Router 接管前端路由。
3.5 模块依赖关系图
┌──────────────────┐
│ common │ (基础设施层)
│ 认证/缓存/工具 │
└────────┬─────────┘
│
┌────────────────┼────────────────┐
│ │ │
┌───────┴──────┐ ┌──────┴─────┐ ┌───────┴────────┐
│ users │ │ models │ │ system_manage │
│ (用户认证) │ │ _provider │ │ (权限/资源映射) │
└───────┬──────┘ │ (模型适配) │ └───────┬────────┘
│ └──────┬─────┘ │
│ │ │
┌───────┴──────┐ ┌──────┴─────┐ ┌───────┴────────┐
│ knowledge │ │application │ │ tools │
│ (知识库) │ │ (智能体) │ │ (工具) │
└───────┬──────┘ └──────┬─────┘ └───────┬────────┘
│ │ │
│ ┌──────┴─────┐ │
└───────>│ chat │<────────┘
│ (对话) │
└──────┬─────┘
│
┌──────┴─────┐
│ trigger │
│ (触发器) │
└────────────┘
核心依赖链:chat 是最终入口,依赖 application(工作流/流水线执行)、knowledge(知识库检索)、models_provider(LLM 调用)、tools(工具调用)。trigger 在最上层,通过触发 application 或 tools 实现自动化任务。
第四部分:核心模块源码深度剖析
4.1 Application 模块——智能体的心脏
路径:apps/application/
Application 模块是整个平台最核心的业务模块,承担了智能体定义、工作流引擎和对话流水线三大职责。
4.1.1 数据模型设计
Application 模型(apps/application/models/application.py)的设计体现了"一个模型承载两种模式"的思想:
class Application(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid7)
name = models.CharField(max_length=128)
type = models.CharField(max_length=256) # SIMPLE 或 WORK_FLOW
model = models.ForeignKey(Model, on_delete=models.SET_NULL) # 关联 LLM
knowledge_setting = models.JSONField() # 知识库检索配置
model_setting = models.JSONField() # 提示词模板、对话轮次等
work_flow = models.JSONField() # 工作流定义(DAG JSON)
# ... 还有 40+ 字段:prologue, dialogue_number, problem_optimization,
# mcp_enable, mcp_tool_ids, tool_enable, application_enable,
# skill_tool_ids, long_term_enable, tts_model, stt_model 等
关键字段 type 决定了应用的行为模式。当 type=SIMPLE 时,系统使用 knowledge_setting 和 model_setting 构建 RAG 流水线;当 type=WORK_FLOW 时,系统解析 work_flow 字段的 JSON 定义来构建 DAG 工作流。
work_flow 字段的 JSON 结构示例:
{
"nodes": [
{"id": "node_1", "type": "start-node", "properties": {...}},
{"id": "node_2", "type": "ai-chat-node", "properties": {...}},
{"id": "node_3", "type": "search-knowledge-node", "properties": {...}}
],
"edges": [
{"sourceNodeId": "node_1", "targetNodeId": "node_2"},
{"sourceNodeId": "node_1", "targetNodeId": "node_3"},
{"sourceNodeId": "node_3", "targetNodeId": "node_2"}
]
}
ApplicationVersion 模型实现了版本快照机制——每次"发布"应用时,系统会创建一条 ApplicationVersion 记录,完整拷贝当前的应用配置。这样即使后续修改了应用配置,已发布的版本也不会受影响。
ApplicationFolder 使用 django-mptt 库实现了 MPTT(Modified Preorder Tree Traversal)树形文件夹结构,支持无限层级的文件夹嵌套,查询效率优于传统的递归方式。
4.1.2 工作流引擎——自研 DAG 执行器
工作流引擎是 MaxKB 最精华的部分,位于 apps/application/flow/workflow_manage.py。
核心类 WorkflowManage 的设计思路:
class WorkflowManage:
def __init__(self, flow, params, ...):
self.flow = flow # Workflow 对象(包含 nodes 和 edges)
self.params = params # 运行参数(question, chat_id 等)
self.context = {} # 全局上下文
self.node_chunk_manage = NodeChunkManage(self) # 流式 chunk 管理
def run(self):
"""入口方法"""
start_node = self.get_start_node() # 找到 start-node
self.run_chain_async(start_node) # 异步执行
def run_chain_async(self, node):
"""提交节点到线程池异步执行"""
future = executor.submit(self._run_node, node)
# 200 个工作线程的 ThreadPoolExecutor
def _run_node(self, node):
"""执行单个节点"""
result = node.run() # 调用节点的 run 方法
self.write_context(result) # 将结果写入上下文
next_nodes = self.get_next_node_list(node, result) # 推导下游节点
for next_node in next_nodes:
self.run_chain_async(next_node) # 递归执行下游节点
执行模型的关键特性:
-
并行执行:当一个节点有多个下游节点时,它们会被并行提交到线程池执行。例如条件分支的两个分支、或并行检索多个知识库。
-
条件分支路由:
condition-node执行后会产生一个branch_id,在推导下游节点时,只有sourceAnchorId匹配该branch_id的边才会被选中,从而实现分支路由。 -
流式响应:
run_stream()方法通过 Python generator 实现 SSE(Server-Sent Events)流式输出。NodeChunkManage负责管理多个节点的 chunk 输出顺序——它维护一个 chunk 队列,按照节点完成的顺序依次输出每个节点的流式内容。 -
表单中断:
form-node是一个特殊节点,它可以中断整个工作流的执行,将当前状态保存到 Redis 缓存中,等待用户提交表单后再恢复执行。这是通过is_interrupt()函数检测的:
def is_interrupt(node, step_variable, global_variable):
return node.type == 'form-node' and not node.context.get('is_submit', False)
- 异常处理:支持节点级异常捕获。当节点执行失败时,如果开启了
enableException,流程会走异常分支(error edge)而不是直接终止。
4.1.3 工作流节点体系
所有节点都继承自 INode 基类(apps/application/flow/i_step_node.py),核心接口:
class INode(ABC):
@abstractmethod
def execute(self, **kwargs) -> NodeResult:
"""节点的核心执行逻辑,返回 NodeResult"""
pass
def run(self) -> NodeResult:
"""模板方法:记录开始时间 → 调用 execute → 写入上下文 → 返回结果"""
self.context['start_time'] = time.time()
result = self.execute(**self.get_node_params())
return result
NodeResult 封装了两层变量:
step_variable:节点级变量,写入当前节点的 contextworkflow_variable:全局变量,写入工作流的 context
以 ai-chat-node 为例,它的 execute 方法会:
- 从上下文获取前面节点传入的变量(如检索到的知识段落)
- 组装 PromptTemplate(使用 Jinja2 模板引擎)
- 调用 LLM 模型生成回答(支持流式)
- 返回
NodeResult(step_variable={'answer': answer}, workflow_variable=None)
4.1.4 简易模式对话流水线
对于简易模式的智能体,MaxKB 使用了一套责任链模式的对话流水线(apps/application/chat_pipeline/):
ResetProblemStep → SearchDatasetStep → GenerateHumanMessageStep → ChatStep
ResetProblemStep(问题优化):可选步骤。使用 LLM 对用户的问题进行重写/优化,使其更适合检索。例如将"上次那个问题"重写为"关于XX产品的退换货政策"。
SearchDatasetStep(知识库检索):对优化后的问题执行向量检索/关键词检索/混合检索,获取 Top K 个最相关的文档段落。
GenerateHumanMessageStep(组装 Prompt):将检索到的知识段落和用户问题组装成完整的 Prompt,使用 Jinja2 模板引擎渲染。
ChatStep(调用 LLM):将组装好的 Prompt 发送给 LLM,获取流式回答。
这条流水线通过 PipelineManage.builder() 构建,实际代码中使用统一的 append_step() 方法:
pipeline_manage_builder = PipelineManage.builder()
# 如果开启了问题优化,才添加 ResetProblemStep
if chat_info.application.problem_optimization:
pipeline_manage_builder.append_step(BaseResetProblemStep)
# 依次添加检索、Prompt 组装、对话步骤
pipeline = (pipeline_manage_builder
.append_step(BaseSearchDatasetStep)
.append_step(BaseGenerateHumanMessageStep)
.append_step(BaseChatStep)
.add_base_to_response(base_to_response)
.build())
注意这里传入的是类(如 BaseChatStep)而非实例,Builder 内部负责实例化。ResetProblemStep 是可选的,只有在应用配置中开启了"问题优化"功能时才会添加。
4.2 Knowledge 模块——知识库的大脑
路径:apps/knowledge/
4.2.1 数据模型设计
Knowledge 模块的数据模型设计非常精巧,体现了对 RAG 全流程的深度思考:
Knowledge (知识库)
├── Document (文档)
│ ├── Paragraph (段落) ← 文档被拆分后的每个段落
│ │ └── Embedding (向量) ← 每个段落对应的向量表示
│ └── Problem (问题) ← FAQ 关联问题
│ └── ProblemParagraphMapping ← 问题与段落的映射
├── Tag (标签) / DocumentTag ← 文档标签
├── Termbase (术语表) ← 优化分词的自定义词典
└── KnowledgeFolder ← MPTT 树形文件夹
Document 模型使用了一套精巧的状态字符串编码来管理四种并行任务的进度:
class TaskType:
EMBEDDING = 1 # 向量化任务
GENERATE_PROBLEM = 2 # 问题生成任务
SYNC = 3 # 同步任务
TOKENIZE = 4 # 分词索引任务
# Status 类使用字符串编码,每个字符位代表一种任务类型的状态
# 状态值:0=未开始, 1=进行中, 2=成功, 3=失败, 4=撤销, 5=已撤销, n=忽略
class Status:
# 通过字符串位置索引来管理多个任务状态
# 例如 status_str[TaskType.EMBEDDING] 可获取向量化任务的状态
这样设计的好处是四种任务可以并行执行(通过 Celery 异步任务),互不干扰,同时通过一个字符串状态字段就能追踪所有任务的状态。
Embedding 模型深度利用了 PostgreSQL 的特性:
class Embedding(models.Model):
id = models.CharField(max_length=128, primary_key=True) # 运行时通过 uuid.uuid7() 赋值
knowledge = models.ForeignKey(Knowledge, on_delete=models.CASCADE) # 外键关联
document = models.ForeignKey(Document, on_delete=models.CASCADE)
paragraph = models.ForeignKey(Paragraph, on_delete=models.CASCADE)
embedding = VectorField(verbose_name="向量") # pgvector 向量字段
search_vector = SearchVectorField(verbose_name="分词", default="") # PostgreSQL 全文检索向量
source_type = models.CharField() # 来源类型
source_id = models.CharField(max_length=128) # 来源 ID
注意这里的一个设计要点:id 使用的是 CharField 而非 UUIDField,这是因为 UUID7 在运行时以字符串形式生成并赋值,使用 CharField 可以更灵活地处理主键格式。同时 knowledge、document、paragraph 使用 Django 的 ForeignKey 建立了标准的外键关联关系,可以利用 ORM 的 select_related 做关联查询优化。
4.2.2 向量检索实现
向量检索的核心逻辑在 apps/knowledge/vector/pg_vector.py,PGVector 类封装了三种搜索模式:
向量检索(EmbeddingSearch):
def search(self, query_text, query_embedding, knowledge_id_list, ...):
# 1. 对查询文本进行向量化
text_embedding = embedding.embed_query(query_text)
# 2. 通过加载 .sql 文件执行原生 SQL 查询(而非 Django ORM)
# 这样可以充分利用 pgvector 的 HNSW 索引和自定义排序逻辑
exec_sql, exec_params = generate_sql_by_query_dict(
{"embedding_query": query_set},
select_string=get_file_content(
os.path.join(PROJECT_DIR, "apps", "knowledge", "sql", "embedding_search.sql")
),
with_table_name=True,
)
# 3. 按知识库逐个查询(利用每个知识库的 HNSW 索引)
embedding_model = select_list(exec_sql, exec_params)
MaxKB 选择使用原生 SQL 文件(.sql)而非 Django ORM 来执行向量检索,这是因为 pgvector 的距离计算、索引提示和结果排序逻辑用原生 SQL 表达更为高效和可控。
关键词检索(KeywordsSearch):利用 PostgreSQL 的 tsvector 和 tsquery,结合 jieba 中文分词和自定义术语表进行全文检索。to_ts_vector() 和 to_query() 函数封装了分词逻辑:
# 自定义分词:先加载术语表中的自定义词,再用 jieba 分词
terms = list(QuerySet(Termbase).filter(knowledge_id=knowledge_id)
.values_list("content", flat=True))
search_vector = SearchVector(Value(to_ts_vector(text, user_words=terms)))
混合检索(BlendSearch):同时执行向量检索和关键词检索,然后通过 RRF(Reciprocal Rank Fusion)算法或加权融合算法合并结果。
4.2.3 异步向量化任务
向量化是计算密集型任务,MaxKB 通过 Celery 异步执行:
# apps/knowledge/task/embedding.py
@app.task(base=QueueOnce, once={'graceful': True})
def embedding_by_paragraph(paragraph_id, knowledge_id, ...):
"""单段落向量化"""
embedding_model = get_embedding_model(...)
pg_vector.save(text, embedding_model)
@app.task(base=QueueOnce, once={'graceful': True})
def embedding_by_document(document_id, knowledge_id, ...):
"""文档级向量化:遍历所有段落"""
paragraphs = QuerySet(Paragraph).filter(document_id=document_id)
for paragraph in paragraphs:
embedding_by_paragraph.delay(paragraph.id, ...)
这里有一个重要的设计决策:使用了 celery_once.QueueOnce 作为任务基类,通过 Redis 锁防止同一个文档或段落的重复向量化任务被提交。这在用户频繁点击"重新向量化"按钮时尤为重要。
4.3 Models Provider 模块——模型适配的抽象层
路径:apps/models_provider/
4.3.1 三层抽象架构
Models Provider 模块采用了经典的三层抽象设计,使得添加新供应商变得非常简单:
第一层:IModelProvider(供应商接口)
class IModelProvider(ABC):
@abstractmethod
def get_model_info_manage(self):
"""返回 ModelInfoManage 对象,包含该供应商支持的所有模型信息"""
pass
def get_model(self, model_type, model_name, model_credential, **kwargs):
"""根据模型类型和名称,实例化并返回模型对象"""
model_info = self.get_model_info_manage().get_model_info(model_type, model_name)
return model_info.model_class.new_instance(model_type, model_name, model_credential, **kwargs)
def is_valid_credential(self, model_type, model_name, model_credential, ...):
"""验证凭证有效性(通常是发送一个简单请求测试连通性)"""
model_info = self.get_model_info_manage().get_model_info(model_type, model_name)
return model_info.model_credential.is_valid(...)
第二层:MaxKBBaseModel(模型实例基类)
class MaxKBBaseModel(ABC):
@staticmethod
@abstractmethod
def new_instance(model_type, model_name, model_credential, **model_kwargs):
"""工厂方法:根据凭证创建具体的模型实例"""
pass
第三层:BaseModelCredential(凭证管理基类)
class BaseModelCredential(ABC):
@abstractmethod
def is_valid(self, model_type, model_name, model_credential, ...):
"""验证凭证是否有效"""
pass
def encryption(self, model_credential):
"""加密敏感凭证(API Key 等)"""
pass
4.3.2 供应商实现模式
每个供应商在 impl/ 目录下遵循统一的目录结构:
impl/{provider_name}/
├── {provider}_model_provider.py # 供应商类(注册支持的模型列表)
├── credential/ # 各模型类型的凭证校验类
│ ├── llm.py
│ ├── embedding.py
│ └── ...
├── model/ # 各模型类型的实际调用类
│ ├── llm.py # 封装 LangChain 的 ChatModel
│ ├── embedding.py
│ └── ...
└── icon/ # 供应商标图(SVG/PNG)
以 OpenAI 供应商为例,model/llm.py 内部实际是封装了 LangChain 的对话模型。MaxKB 定义了一个中间基类 BaseChatOpenAI,然后具体供应商继承它:
class OpenAIChatModel(MaxKBBaseModel, BaseChatOpenAI):
@staticmethod
def new_instance(model_type, model_name, model_credential, **kwargs):
return OpenAIChatModel(
model=model_name,
openai_api_key=model_credential.get('api_key'),
openai_api_base=model_credential.get('api_base'),
streaming=kwargs.get('streaming', False),
# ...
)
这种设计的好处是:新增一个供应商只需要实现"供应商类 + 凭证校验 + 模型封装"三个文件,而所有供应商的公共逻辑(模型查找、凭证加密、参数校验)都由基类处理。
4.3.3 凭证安全
模型的 API Key 等敏感信息在存储前会被加密。BaseModelCredential 提供了 encryption() 方法,使用 common.utils.common.encryption() 函数(基于 cryptography 库)进行加密。数据库中存储的是加密后的凭证,使用时再解密。
4.4 Chat 模块——对话的入口
路径:apps/chat/
Chat 模块是对话请求的处理入口,它的核心逻辑在 apps/chat/serializers/chat.py 中:
class ChatSerializers(serializers.Serializer):
def chat(self, instance: dict, base_to_response: BaseToResponse = SystemToResponse()):
"""对话主入口:获取 ChatInfo 后根据应用类型分发"""
chat_info = self.get_chat_info() # 从缓存/数据库获取对话上下文
application = chat_info.application
if application.type == ApplicationTypeChoices.SIMPLE:
return self.chat_simple(chat_info, instance, base_to_response)
else:
return self.chat_work_flow(chat_info, instance, base_to_response)
def chat_simple(self, chat_info, instance, base_to_response):
"""简易模式:构建对话流水线执行"""
# 使用 PipelineManage.builder() 构建流水线
# 根据是否开启问题优化决定是否添加 ResetProblemStep
pipeline = PipelineManage.builder()...build()
return pipeline.run(...)
def chat_work_flow(self, chat_info, instance, base_to_response):
"""工作流模式:创建 WorkflowManage 实例执行"""
work_flow = Workflow.new_instance(application.work_flow)
workflow = WorkflowManage(flow=work_flow, params={...})
if stream:
return workflow.run_stream()
else:
return workflow.run_block()
Chat 模块还实现了 OpenAI 兼容 API(OpenAIChatSerializer),使得 MaxKB 智能体可以作为 OpenAI API 的替代被调用:
POST /api/application/{api_key}/chat
Content-Type: application/json
{
"model": "maxkb",
"messages": [{"role": "user", "content": "你好"}],
"stream": true
}
此外,Chat 模块还包含 MCP Server 实现(apps/chat/mcp/tools.py),将 MaxKB 应用暴露为标准 MCP 工具,支持 initialize() / list_tools() / call_tool() 协议。
4.5 Common 模块——基础设施层
路径:apps/common/
Common 模块是整个系统的地基,包含 30+ 个子模块,为上层业务提供通用能力:
认证框架(auth/):采用策略模式,通过配置动态加载认证处理器。支持三种认证方式:UserToken(用户 Token)、ChatAnonymousUserToken(匿名对话)、ApplicationKey(API Key)。
缓存层(cache/):封装了 Redis 操作,提供统一的缓存接口。对话上下文(ChatInfo)就缓存在 Redis 中,避免每次都查数据库。
文档分块(chunk/):实现了多种文档分段策略,支持按段落、按固定长度、按自定义分隔符等方式拆分文档。
分布式锁(lock/):基于 Redis 实现的分布式锁,用于防止并发任务冲突。
全局异常处理(exception/):handle_exception 函数统一处理所有 API 异常,确保返回格式一致的错误响应。
统一响应封装(result/):Result 类封装了统一的 API 响应格式:{code, message, data}。
4.6 Trigger 模块——自动化调度
路径:apps/trigger/
触发器模块的设计简洁而实用。核心模型 Trigger 支持两种类型:
class Trigger(models.Model):
name = models.CharField(max_length=128)
trigger_type = models.CharField() # SCHEDULED 或 EVENT
trigger_setting = models.JSONField() # 调度配置(cron 表达式、间隔等)
is_active = models.BooleanField()
user = models.ForeignKey(User) # 创建者
desc = models.TextField() # 描述
meta = models.JSONField() # 元数据
class TriggerTask(models.Model):
trigger = models.ForeignKey(Trigger)
source_type = models.CharField() # APPLICATION 或 TOOL
source_id = models.UUIDField() # 关联的智能体或工具 ID
parameter = models.JSONField() # 输入参数
is_active = models.BooleanField()
class TaskRecord(models.Model):
"""触发器任务执行记录"""
trigger = models.ForeignKey(Trigger)
trigger_task = models.ForeignKey(TriggerTask)
state = models.CharField() # PENDING/STARTED/SUCCESS/FAILURE 等
run_time = models.FloatField() # 执行耗时
定时触发器的实现基于 APScheduler:
class ScheduledTrigger:
def deploy(self, trigger):
"""将触发器部署到调度器"""
config = trigger.trigger_setting
if config['type'] == 'cron':
scheduler.add_job(
func=ScheduledTrigger.execute,
trigger='cron',
kwargs={"trigger": trigger, "trigger_task": task},
**config['cron_params']
)
为了防止定时任务的并发执行(比如一个任务还没跑完,下一次调度又触发了),系统使用了 Redis 分布式锁:
def execute(trigger, trigger_task):
rlock = RedisLock()
lock_key = f'{trigger.id}:{trigger_task.source_id}'
if rlock.try_lock(lock_key, 30 * 30): # 锁超时 30 分钟
try:
# 根据 source_type 决定执行智能体对话还是工具调用
if trigger_task.source_type == 'APPLICATION':
# 调用智能体
...
elif trigger_task.source_type == 'TOOL':
# 调用工具
...
finally:
rlock.un_lock(lock_key)
第五部分:设计模式与架构决策分析
5.1 数据库设计亮点
UUID7 主键:全局使用 uuid_utils.compat.uuid7 生成主键。UUID7 是时间有序的 UUID,相比传统 UUID4,对数据库 B-tree 索引更加友好(新插入的记录不会导致索引页分裂),同时又保持了全局唯一性。
JSONField 的广泛使用:work_flow、model_setting、knowledge_setting、meta 等灵活配置都使用 JSONField 存储。这种"关系型 + 半结构化"的混合设计,既保证了核心数据的查询效率,又给予了业务配置的灵活性。
PostgreSQL Large Object:File 模型使用 PostgreSQL 的 Large Object 机制存储大文件(文档、图片等),配合 zip 压缩,避免了文件系统管理的复杂性。
pgvector 向量存储:直接在 PostgreSQL 中存储向量数据,利用 HNSW 索引实现高效的近似最近邻搜索。这避免了引入独立的向量数据库(如 Milvus、Pinecone),降低了部署复杂性。
5.2 异步任务设计
MaxKB 的 Celery 配置有几个值得学习的设计:
HMAC 签名序列化:Celery 消息使用自定义的 hmac_signed_serializer 进行序列化,每个消息都带有 HMAC 签名,防止消息在 Redis 传输过程中被篡改。
防重复执行:使用 celery_once.QueueOnce 基类,基于 Redis 锁确保同一参数的任务不会被重复提交。例如,同一文档的向量化任务在队列中只会存在一份。
软超时机制:配置了 3600 秒(1 小时)的软超时,超时后 Celery 会抛出 SoftTimeLimitExceeded 异常,任务可以捕获该异常进行清理工作。
5.3 工作流引擎的设计哲学
MaxKB 的工作流引擎没有使用现有的工作流引擎(如 Airflow、Temporal),而是自研了一个轻量级的 DAG 执行器。这个决策的背后有几个考量:
流式支持:传统工作流引擎主要面向批处理场景,不支持流式输出。而 MaxKB 的 AI 对话场景需要从 LLM 获取流式 token 并实时推送给前端,这要求工作流引擎与 SSE 机制深度集成。
轻量高效:自研引擎基于线程池执行,没有引入额外的调度器和状态机,启动快、资源占用低。
表单中断:AI 工作流经常需要人工介入(如确认、补充信息),form-node 的中断-恢复机制在传统工作流引擎中实现起来比较复杂。
5.4 多租户设计
MaxKB 使用 workspace_id 字段实现工作空间级别的多租户隔离。每个数据模型(知识库、智能体、模型、工具等)都带有 workspace_id 字段,查询时自动过滤。WorkspaceUserPermission 模型管理用户与工作空间之间的权限关系。
第六部分:前端架构简析
6.1 技术栈与构建
前端位于 ui/ 目录,使用 Vue 3 + TypeScript + Vite 构建,UI 组件库为 Element Plus。构建分为两个入口:
build:构建管理后台(admin),包含智能体管理、知识库管理、模型管理等完整功能build-chat:构建对话前台(chat),仅包含对话界面,体积更小
6.2 工作流编辑器
工作流可视化编辑器基于 LogicFlow 实现,这是一个国产的开源流程编辑框架。MaxKB 在其基础上自定义了 30+ 种节点类型和相应的图标、配置面板。前端 workflow/ 目录包含自定义节点的组件定义和插件。
6.3 状态管理
使用 Pinia 作为状态管理方案(Vue 3 官方推荐),管理全局状态如当前用户、当前工作空间、主题设置等。
6.4 国际化
通过 vue-i18n 支持多语言,后端的 apps/locales/ 目录包含 Django 的翻译文件(.po/.mo),实现了前后端统一的国际化方案。
第七部分:开发实践指南
7.1 本地开发环境搭建
后端环境:
# 1. 安装依赖(使用 uv 包管理器)
pip install uv
cd D:\data\coder\MaxKB
uv sync
# 2. 启动 PostgreSQL(需要 pgvector 扩展)
docker run -d --name=maxkb-pg -p 5432:5432 \
-e POSTGRES_DB=maxkb -e POSTGRES_USER=maxkb -e POSTGRES_PASSWORD=maxkb \
pgvector/pgvector:pg16
# 3. 启动 Redis
docker run -d --name=maxkb-redis -p 6379:6379 redis:7
# 4. 配置数据库连接
# 编辑 apps/maxkb/config.yaml 或设置环境变量
# 5. 启动 Web 服务(开发模式)
python main.py dev web
# 6. 启动 Celery Worker(另开终端)
python main.py dev celery
前端环境:
cd D:\data\coder\MaxKB\ui
npm install
npm run dev # 启动开发服务器
7.2 新增一个模型供应商的步骤
假设你要新增一个名为 “MyAI” 的模型供应商:
- 在
apps/models_provider/impl/下创建my_ai/目录 - 创建
my_ai_model_provider.py,继承IModelProvider,注册支持的模型列表 - 在
credential/下创建各模型类型的凭证校验类(继承BaseModelCredential) - 在
model/下创建模型封装类(继承MaxKBBaseModel和对应的 LangChain 模型类) - 在
icon/下放供应商标图 - 在供应商注册处添加新供应商的引用
7.3 新增一个工作流节点的步骤
假设你要新增一个 “sentiment-analysis-node”(情感分析节点):
- 在
apps/application/flow/step_node/下创建sentiment_analysis_node/目录 - 创建节点类,继承
INode,实现execute()方法 - 在
apps/application/flow/step_node/__init__.py中注册节点类型 - 在前端
ui/src/workflow/下创建对应的自定义节点组件
7.4 API 文档访问
MaxKB 使用 drf-spectacular 自动生成 OpenAPI 3 文档。启动服务后访问:
http://localhost:8080/admin/api/schema/swagger-ui/
可以查看所有 API 接口的详细参数说明,并支持在线调试。
第八部分:核心数据流全链路追踪
8.1 一次完整的 RAG 对话流程
让我们追踪一次用户提问 “MaxKB 支持哪些模型?” 的完整处理过程:
1. 用户请求到达
POST /chat/api/application/{api_key}/chat
Body: {"message": "MaxKB 支持哪些模型?", "stream": true}
2. Chat 模块接收请求
ChatSerializers.chat()
→ 判断应用类型为 SIMPLE
→ 调用 chat_simple()
3. 对话流水线启动
PipelineManage.run()
4. 问题优化(可选)
ResetProblemStep.execute()
→ 使用 LLM 重写问题(如果开启了此功能)
→ 输出优化后的问题:"MaxKB 智能体平台支持的模型供应商和模型类型有哪些?"
5. 知识库检索
SearchDatasetStep.execute()
→ 调用 PGVector.search()
→ 文本向量化:embedding.embed_query("MaxKB 智能体平台支持的模型...")
→ 在 pgvector 中执行余弦相似度搜索
→ 同时执行全文检索(如果配置了混合检索)
→ 融合排序,返回 Top K 个段落
6. Prompt 组装
GenerateHumanMessageStep.execute()
→ 加载提示词模板
→ 填充变量:{data} = 检索到的段落, {question} = 用户问题
→ 渲染 Jinja2 模板
7. LLM 调用
ChatStep.execute()
→ 获取 LLM 模型实例(通过 models_provider 模块)
→ 调用 model.stream(messages) 获取流式响应
→ 逐 token 通过 SSE 推送给前端
8. 后处理
WorkFlowPostHandler.handler()
→ 保存 ChatRecord(问题、回答、Token 消耗、运行详情)
→ 更新 Redis 对话缓存
→ 异步触发长期记忆提取(Celery 任务)
9. SSE 响应完成
→ 前端收到 [DONE] 信号
→ 渲染完整的回答
8.2 一次工作流对话的执行流程
对于工作流模式的智能体,执行路径有所不同:
第九部分:学习路径建议
9.1 功能学习者路径
如果你主要是想用好 MaxKB 的功能:
- 先用 Docker 一键部署跑起来
- 按照"五步上手流程"完成模型接入、知识库创建、智能体创建
- 重点学习知识库的分段策略和检索模式调优(这直接决定 RAG 效果)
- 尝试工作流编排,从简单的"检索→生成→回复"流程开始,逐步添加条件分支、循环等复杂节点
- 学习触发器配置,实现自动化任务
9.2 源码学习者路径
如果你想深入理解 MaxKB 的源码:
- 先读入口:从
main.py开始理解启动流程,然后看apps/maxkb/settings/理解配置体系 - 读模型定义:按
Application → Knowledge → Document → Paragraph → Embedding的顺序阅读数据模型,理解数据关系 - 读对话流程:从
ChatSerializers.chat()出发,分别追踪简易模式和工作流模式的执行路径 - 读工作流引擎:这是最复杂的部分,建议先理解
WorkflowManage的整体调度逻辑,再逐个看节点实现 - 读模型适配层:选择一个供应商(如 OpenAI),从
IModelProvider到MaxKBBaseModel到具体的ChatOpenAI,理解三层抽象 - 读基础设施:Common 模块中的认证、缓存、异常处理等,理解系统的横切关注点
9.3 二次开发者路径
如果你想在 MaxKB 基础上做二次开发:
- 完成源码学习路径的前 3 步
- 实践"新增一个工作流节点"(最简单)
- 实践"新增一个模型供应商"(中等难度)
- 学习 API 文档(
/admin/api/schema/swagger-ui/),理解接口规范 - 阅读前端
ui/src/workflow/目录,理解工作流编辑器的扩展方式 - 阅读
apps/common/模块,了解可用的基础设施组件
附录 A:关键配置项说明
MaxKB 的配置通过 apps/maxkb/conf.py 的 ConfigManager 管理,支持 YAML 文件和环境变量两种方式:
| 配置项 | 说明 | 默认值 |
|---|---|---|
DB_HOST |
PostgreSQL 地址 | 127.0.0.1 |
DB_PORT |
PostgreSQL 端口 | 5432 |
DB_USER |
数据库用户 | root |
DB_PASSWORD |
数据库密码 | Password123@postgres |
DB_NAME |
数据库名 | 从 YAML 配置文件加载 |
REDIS_HOST |
Redis 地址 | 127.0.0.1 |
REDIS_PORT |
Redis 端口 | 6379 |
REDIS_PASSWORD |
Redis 密码 | Password123@redis |
MAXKB_CORE_WORKER |
Gunicorn Worker 数量 | 系统 CPU 核数 |
LOCAL_MODEL_HOST |
本地模型服务地址 | 127.0.0.1 |
LOCAL_MODEL_PORT |
本地模型服务端口 | 11636 |
EMBEDDING_MODEL_NAME |
默认 Embedding 模型 | 各供应商默认模型 |
附录 B:Celery 任务清单
| 任务名 | 功能 | 所在文件 |
|---|---|---|
embedding_by_paragraph |
单段落向量化 | knowledge/task/embedding.py |
embedding_by_document |
文档级向量化 | knowledge/task/embedding.py |
embedding_by_knowledge |
知识库级全量向量化 | knowledge/task/embedding.py |
tokenize_by_document |
分词索引构建 | knowledge/task/embedding.py |
extract_long_term_memory |
长期记忆提取 | application/long_term_memory/__init__.py |
deploy_scheduled_trigger |
定时触发器部署 | trigger/handler/impl/trigger/scheduled_trigger.py |
附录 C:参考资料
- MaxKB 官方文档:https://maxkb.cn/docs/v2/
- MaxKB GitHub 仓库:https://github.com/1Panel-dev/MaxKB
- MaxKB 论坛:https://bbs.fit2cloud.com/c/mk/11
- LangChain 官方文档:https://python.langchain.com/docs/
- Django REST Framework 文档:https://www.django-rest-framework.org/
- pgvector 文档:https://github.com/pgvector/pgvector
- LogicFlow 文档:https://site.logic-flow.cn/
- Celery 文档:https://docs.celeryq.dev/
评论区