精通 Milvus 向量库:从入门到生产级实战
基于 Milvus 2.6.18 LTS 长期支持版本 | 覆盖开源社区版 + Zilliz Cloud 企业版
前言:教程定位与学习路径
0.1 适用人群
本教程面向以下技术群体:
- RAG 系统工程师:正在构建或优化检索增强生成系统,需要生产级向量检索能力
- AI 应用后端开发者:需要在应用中集成语义搜索、相似度匹配等向量检索能力
- 分布式系统运维工程师:负责 Milvus 集群的部署、监控、调优与故障排查
- 数据库管理员:从传统数据库领域扩展到向量数据库管理
- 架构师:需要构建亿级以上向量检索系统,进行技术选型与架构设计
- 技术团队负责人:计划从原型向量库(Chroma、FAISS)迁移到生产级方案
0.2 前置知识
| 知识领域 | 最低要求 | 推荐掌握 |
|---|---|---|
| 编程语言 | Python 或 Java 基础语法 | Python 异步编程、类型注解 |
| 机器学习 | 了解向量、嵌入概念 | 理解 Embedding 模型原理、相似度计算 |
| 分布式系统 | 了解基本概念 | Kubernetes 编排、微服务架构 |
| 大语言模型 | 了解 RAG 基本原理 | LangChain/LlamaIndex 框架使用经验 |
| 数据库 | SQL 基础 | NoSQL 数据库使用经验 |
0.3 2026 年 Milvus 技术生态全景
版本演进
Milvus 1.0 (2019) Milvus 2.0 (2021) Milvus 2.5.x (2024-2025) Milvus 2.6.x (2025-2026)
┌──────────────┐ ┌──────────────────┐ ┌──────────────────────┐ ┌────────────────────────────────┐
│ 单机架构 │ │ 云原生重构 │ │ 全能力增强期 │ │ 规模化降本期 │
│ 共享存储 │ │ 存算分离 │ │ 全文检索(BM25) │ │ RaBitQ 1-bit量化(内存降72%) │
│ 仅支持CPU │ │ 分布式架构 │ │ 稀疏向量原生支持 │ │ 冷热分层存储(成本降50%) │
│ 有限索引类型 │ │ 多索引体系 │ │ 分层存储 │ │ Woodpecker零磁盘WAL │
│ │ │ K8s原生部署 │ │ GPU CAGRA索引 │ │ Path Index(JSON过滤快100倍) │
│ │ │ 混合检索 │ │ 增强RBAC与安全 │ │ StreamingNode流批分离架构 │
│ │ │ │ │ 动态Schema成熟 │ │ Int8向量/HNSW压缩 │
│ │ │ │ │ Group By分组查询 │ │ Nullable Vector可空向量 │
│ │ │ │ │ │ │ Geometry地理空间字段 │
│ │ │ │ │ │ │ Struct Array结构化数组 │
│ │ │ │ │ │ │ 动态添加字段(Add Field) │
│ │ │ │ │ │ │ 多语言BM25分析器 │
│ │ │ │ │ │ │ 10万+集合多租户支持 │
└──────────────┘ └──────────────────┘ └──────────────────────┘ └────────────────────────────────┘
产品矩阵
| 产品 | 定位 | 数据规模 | 适用场景 |
|---|---|---|---|
| Milvus Lite | 嵌入式版,pip 安装 | 万级向量 | 本地开发、Jupyter 原型、单元测试 |
| Milvus 开源版 | 分布式自托管 | 百万~十亿级向量 | 生产环境、私有化部署、定制化需求 |
| Zilliz Cloud | 全托管云服务 | 百万~百亿级向量 | 快速上线、免运维、弹性伸缩 |
核心能力边界
- 百万级向量:单机 Docker Compose 即可胜任,HNSW 索引 P99 延迟 < 10ms
- 千万级向量:小规模 K8s 集群(3~5 节点),需要合理的分片与索引设计
- 亿级向量:标准分布式集群(10+ 节点),需要存算分离 + DiskANN + 冷热分层
- 十亿级向量:超大规模集群 + Zilliz Cloud 企业版,需要多层级架构设计
行业定位
Milvus 是全球最主流的开源云原生向量数据库,截至 2026 年:
- GitHub Star 数超过 40,000+
- 全球生产部署超过 1,000 家企业
- LF AI & Data 基金会毕业项目
- Milvus 2.6 实现内存降低 72%、全文检索比 Elasticsearch 快 4 倍、JSON 过滤快 100 倍
0.4 学习路线规划
30 天系统化学习路径
Week 1 (Day 1-6) Week 2 (Day 7-12) Week 3 (Day 13-18) Week 4 (Day 19-24) Week 5 (Day 25-30)
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 入门基础 │ │ 进阶功能 │ │ 分布式架构 │ │ 性能优化 │ │ 生态实战 │
│ ·向量数据库 │ │ ·索引调优 │ │ ·架构深度 │ │ ·写入优化 │ │ ·多语言客户端│
│ ·环境搭建 │ │ ·混合检索 │ │ ·生产部署 │ │ ·查询优化 │ │ ·RAG集成 │
│ ·数据模型 │ │ ·高级特性 │ │ ·集群运维 │ │ ·大规模架构 │ │ ·项目实战 │
│ ·数据写入 │ │ │ │ │ │ │ │ ·底层原理 │
│ ·索引基础 │ │ │ │ │ │ │ │ ·前沿技术 │
│ ·查询检索 │ │ │ │ │ │ │ │ │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
学习方法建议
- 理论 + 实操结合:每章先理解原理,再动手实践
- 渐进式深入:从单机到集群,从基础到高级
- 项目驱动:第七阶段的实战项目是检验学习成果的关键
- 社区参与:加入 Milvus 社区(Slack/Discord),参与讨论与贡献
第一阶段:入门基础(Day 1-2)
第 1 章 向量数据库与 Milvus 概述
1.1 向量数据库核心基础
1.1.1 向量嵌入(Embedding)与语义检索原理
向量嵌入是将非结构化数据(文本、图像、音频)转换为高维数值向量的过程。这些向量在数学空间中捕获了数据的语义信息——语义相近的数据在向量空间中距离更近。
原始数据 嵌入模型 向量空间
┌──────────┐ ┌──────────┐ ┌──────────────────┐
│ "猫在睡觉" │ ──────→ │ Embedding │ ──────→ │ [0.12, -0.34, │
│ │ │ Model │ │ 0.56, 0.78, ...] │
└──────────┘ │(BGE/E5/ │ │ (768维/1024维) │
┌──────────┐ │ OpenAI...)│ └──────────────────┘
│ "小猫打盹" │ ──────→ │ │ ──────→ ┌──────────────────┐
│ │ │ │ │ [0.11, -0.32, │
└──────────┘ └──────────┘ │ 0.55, 0.79, ...] │
└──────────────────┘
↑ 距离很近,语义相似
核心流程:
- 嵌入生成:使用预训练模型(如 BGE-large、text-embedding-3-large)将原始数据转为向量
- 向量存储:将向量及其元数据存入向量数据库
- 相似度计算:查询时将查询文本同样转为向量,与库中向量计算相似度
- 结果排序:按相似度从高到低返回 Top-K 结果
1.1.2 相似度度量方式
| 度量方式 | 公式 | 值域 | 特点 | 适用场景 |
|---|---|---|---|---|
| 余弦距离 (COSINE) | 1 - cos(a,b) | [0, 2] | 只关注方向,忽略幅度 | 文本语义搜索(最常用) |
| 欧氏距离 (L2) | √Σ(aᵢ-bᵢ)² | [0, +∞) | 关注绝对距离 | 图像检索、空间数据 |
| 内积 (IP) | Σaᵢbᵢ | (-∞, +∞) | 值越大越相似 | 归一化向量等价余弦 |
| 汉明距离 (HAMMING) | 异或位计数 | [0, dim] | 按位比较 | 二值向量、哈希检索 |
| JACCARD | |A∩B|/|A∪B| | [0, 1] | 集合相似度 | 稀疏二值向量 |
import numpy as np
def cosine_distance(a, b):
"""计算余弦距离"""
dot_product = np.dot(a, b)
norm_a = np.linalg.norm(a)
norm_b = np.linalg.norm(b)
cosine_similarity = dot_product / (norm_a * norm_b)
return 1 - cosine_similarity
vec_cat = np.array([0.12, -0.34, 0.56, 0.78])
vec_kitten = np.array([0.11, -0.32, 0.55, 0.79])
vec_stock = np.array([0.89, 0.12, -0.45, 0.33])
print(f"猫 vs 小猫: {cosine_distance(vec_cat, vec_kitten):.4f}") # ≈ 0.002
print(f"猫 vs 股票: {cosine_distance(vec_cat, vec_stock):.4f}") # ≈ 1.2
重要提示:当向量已经 L2 归一化后,内积(IP)与余弦相似度等价,此时使用 IP 可以省去归一化计算,性能更优。
1.1.3 近似最近邻搜索(ANN)核心思想
精确最近邻搜索(Exact NN)时间复杂度 O(N),在亿级数据上不可接受。ANN 通过牺牲少量精度换取数量级的性能提升:
精确搜索 (Exact NN) 近似搜索 (ANN)
┌─────────────────────┐ ┌─────────────────────┐
│ 查询向量 vs 所有向量 │ │ 构建索引结构 │
│ 时间复杂度: O(N) │ │ 查询时只搜索子集 │
│ 召回率: 100% │ vs │ 时间复杂度: O(log N) │
│ 延迟: 高 │ │ 召回率: 95%~99.5% │
│ 适用: 小数据集 │ │ 延迟: 低 │
└─────────────────────┘ │ 适用: 大规模生产 │
└─────────────────────┘
ANN 的三大技术路线:
- 基于图的方法(HNSW、NSW):构建导航图,沿边逼近最近邻,高召回率
- 基于量化的方法(IVF_PQ、SQ8):压缩向量降低内存,牺牲部分精度
- 基于分区的方法(IVF_FLAT):将空间分区,只搜索相关分区
1.1.4 向量数据库 vs 传统数据库 vs 全文搜索引擎
| 维度 | 关系型数据库 | 全文搜索引擎 | 向量数据库 |
|---|---|---|---|
| 数据类型 | 结构化数据 | 文本数据 | 向量+标量混合 |
| 查询方式 | 精确匹配 | 关键词匹配 | 语义相似度 |
| 索引结构 | B+ Tree | 倒排索引 | ANN 索引 |
| 查询结果 | 精确结果 | 排序结果 | Top-K 近似结果 |
| 语义理解 | 无 | 有限(词频) | 强(向量语义) |
| 典型产品 | MySQL/PG | Elasticsearch | Milvus/Qdrant |
| 适用场景 | 事务处理 | 关键词搜索 | 语义检索/RAG |
为什么不能只用 Elasticsearch?
- ES 的向量检索(kNN)是后加功能,非原生设计,性能远不如专用向量库
- ES 不支持稀疏向量+稠密向量混合检索
- Milvus 2.5 的 BM25 全文检索在百万向量上仅需 6ms,ES 需要 200ms
1.1.5 向量检索的技术挑战:召回率、延迟、成本的三角平衡
召回率 (Recall)
▲
┌───┼───┐
│ 不可能 │
│ 三角形 │
└───┼───┘
╱ │ ╲
低成本 ◄────────────┼────────────► 低延迟
(Cost) │ (Latency)
Milvus 的破局之道:
- 多索引体系:不同场景选择不同索引
- 混合检索:向量+标量联合过滤,缩小搜索空间
- 存算分离:按需加载,降低内存成本
- 分层存储:冷数据放磁盘,热数据放内存
1.2 Milvus 产品定位与发展
1.2.1 设计理念
- 云原生(Cloud Native):全面拥抱云原生架构,所有组件容器化,支持 K8s 编排
- 存算分离(Disaggregated Storage & Compute):存储和计算完全解耦,可独立扩展
- 水平扩展(Horizontal Scaling):无状态节点可随意扩缩容
- 高性能(High Performance):C++ 核心引擎,SIMD 指令优化,GPU 加速支持
1.2.2 2026 年 2.6.x 核心新特性总览
Milvus 2.6 是一次重大版本升级,围绕"降本、增效、规模化"三大主题,带来了突破性创新:
| 特性 | 说明 | 影响 |
|---|---|---|
| RaBitQ 1-bit 量化 | 将 FP32 向量压缩为 1-bit 表示,内存降低 72%,查询速度提升 4 倍 | 亿级向量成本大幅下降 |
| 冷热分层存储 | 自动识别冷热数据,冷数据按需加载,存储成本降低 50% | 大规模场景成本优化 |
| Woodpecker 零磁盘 WAL | 自研云原生 WAL 引擎,替代 Kafka/Pulsar,零磁盘架构 | 运维复杂度大幅降低 |
| Path Index | JSON 路径索引,嵌套元数据过滤速度提升 100 倍 | 复杂 JSON 过滤场景质变 |
| StreamingNode 流批分离 | 新增流式节点,实时写入即时可查,写入吞吐大幅提升 | 实时性要求场景受益 |
| Int8 向量压缩 | HNSW 索引支持 Int8 向量存储,内存减半 | 高精度场景成本优化 |
| Nullable Vector | 向量字段支持可空,缺失嵌入不占存储,搜索自动跳过 | Schema 演变更灵活 |
| 动态添加字段 (Add Field) | 无需停机即可向已有集合添加新标量字段 | 业务迭代零停机 |
| Geometry 地理空间字段 | 支持 POINT/LINESTRING/POLYGON 类型 | LBS + 语义检索融合 |
| Struct Array | 结构化数组字段,支持元素级向量搜索 | 多向量实体建模更自然 |
| 多语言 BM25 分析器 | 自动识别语言并应用对应分词器,多语言全文检索 | 全球化业务场景 |
| CAGRA+Vamana 混合索引 | GPU 构建 + CPU 查询,降低 GPU 部署成本 | GPU 成本优化 |
| 10 万+集合多租户 | 单集群支持超过 10 万个 Collection | SaaS 多租户场景突破 |
| Storage v2 | 新存储引擎,降低 IOPS 和内存占用 | 整体性能提升 |
2.5.x 继承特性:全文检索(BM25)、稀疏向量原生支持、GPU CAGRA 索引、增强 RBAC、Group By 分组查询等特性在 2.6.x 中全部继承并增强。
1.2.3 开源社区版 vs Zilliz Cloud 企业版 vs Milvus Lite
| 维度 | Milvus Lite | Milvus 开源版 | Zilliz Cloud |
|---|---|---|---|
| 部署方式 | pip install | Docker/K8s 自托管 | 全托管云服务 |
| 数据规模 | 万级 | 百万~十亿级 | 百万~百亿级 |
| 高可用 | 无 | 手动配置 | 自动 |
| 运维成本 | 零 | 高 | 低 |
| 成本 | 免费 | 服务器成本 | 按用量付费 |
| SLA | 无 | 自行保障 | 99.9%+ |
| 适用阶段 | 开发原型 | 生产环境 | 快速上线 |
1.3 核心优势与适用场景
1.3.1 核心技术优势
- 多索引体系:FLAT、HNSW、IVF 系列、DiskANN、GPU 索引、稀疏索引,覆盖所有场景
- 混合检索:向量检索+标量过滤+全文检索,一站式解决
- 存算分离:存储和计算独立扩展,资源利用率最优
- 弹性伸缩:无状态节点秒级扩缩容,有状态数据自动均衡
- 多语言 SDK:Python、Java、Go、Node.js、RESTful API
- 生态丰富:LangChain、LlamaIndex、Haystack、Dify 等深度集成
1.3.2 典型业务场景
场景一:RAG 知识库
用户提问 → Embedding → Milvus 语义检索 → Top-K 文档 → LLM 生成回答
↘ BM25 关键词检索 ↗
(混合检索 + Rerank)
场景二:图像检索
上传图片 → CLIP/ViT Embedding → Milvus 向量检索 → 相似图片列表
场景三:推荐召回
用户行为 → 用户向量 → Milvus ANN 检索 → 候选物品集 → 排序 → 推荐
1.3.3 选型边界
选择 Milvus 的场景:
- 数据规模超过百万级向量
- 需要分布式部署和高可用
- 需要混合检索(向量+标量+全文)
- 需要 GPU 加速或磁盘索引
- 企业级安全与 RBAC 需求
可以考虑其他方案的场景:
- 原型验证阶段,数据量小 → Chroma(更轻量)
- 单机部署,QPS 要求不高 → Qdrant(Rust 单机性能好)
- 纯云服务,不想运维 → Pinecone(全托管,但成本高)
- 仅需本地向量检索库 → FAISS(无数据库功能)—
第 2 章 环境搭建与快速上手
2.1 环境准备与版本选择
硬件要求
| 规模 | CPU | 内存 | 存储 | GPU(可选) |
|---|---|---|---|---|
| 开发测试 | 4 核+ | 8 GB+ | 50 GB SSD | 无 |
| 百万级生产 | 8 核+ | 32 GB+ | 500 GB SSD | 无 |
| 千万级生产 | 16 核+ | 64 GB+ | 1 TB NVMe | 可选 |
| 亿级生产 | 32 核+ | 128 GB+ | 多 TB NVMe | 推荐 A100/H100 |
软件依赖
| 组件 | 最低版本 | 推荐版本 |
|---|---|---|
| Python | 3.8+ | 3.10+ |
| Docker | 20.10+ | 24.0+ |
| Kubernetes | 1.20+ | 1.28+ |
| Helm | 3.0+ | 3.12+ |
版本选择原则
- LTS 长期支持版(推荐):2.6.18 系列,稳定可靠,安全补丁持续更新,包含 RaBitQ、Woodpecker 等重大特性
- 开发版:包含最新功能,但可能存在不稳定性,仅用于功能预览
2.2 四种部署方式快速启动
方式一:Milvus Lite 嵌入式版
最简单的入门方式,零依赖,适合本地开发和原型验证:
# 安装 PyMilvus(内置 Milvus Lite)
pip install pymilvus
# 使用 Milvus Lite,只需指定本地文件路径
from pymilvus import MilvusClient
client = MilvusClient("./milvus_demo.db") # 数据存储在本地文件
# 使用完毕后关闭
client.close()
Milvus Lite 特点:
- 无需 Docker、无需服务器
- 数据存储在本地 SQLite 文件中
- 支持完整的 CRUD 和向量检索 API
- 不支持分布式、高可用、GPU 索引
- 适合 Jupyter Notebook 开发和单元测试
方式二:Docker Compose 单机版
适合开发测试环境,包含所有依赖组件:
# 下载 docker-compose 配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.6.18/milvus-standalone-docker-compose.yml -O docker-compose.yml
# 启动 Milvus 单机版
docker compose up -d
# 查看服务状态
docker compose ps
# 停止服务
docker compose down
配置文件核心参数说明:
# docker-compose.yml 关键配置
services:
etcd:
# 元数据存储
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
volumes:
- etcd_data:/etcd
minio:
# 对象存储
environment:
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
volumes:
- minio_data:/minio/data
standalone:
# Milvus 主服务
ports:
- "19530:19530" # gRPC 端口
- "9091:9091" # Metrics 端口
volumes:
- milvus_data:/var/lib/milvus
方式三:Helm Chart Kubernetes 集群版
生产环境推荐方式,支持完整分布式部署:
# 添加 Milvus Helm 仓库
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update
# 创建命名空间
kubectl create namespace milvus
# 部署 Milvus 分布式集群
helm install milvus milvus/milvus \
--namespace milvus \
--set cluster.enabled=true \
--set proxy.service.type=LoadBalancer \
--set queryNode.replicas=2 \
--set dataNode.replicas=1 \
--set indexNode.replicas=1
# 查看部署状态
kubectl get pods -n milvus
# 端口转发(本地访问)
kubectl port-forward svc/milvus 19530:19530 -n milvus
生产级 Helm values 配置示例:
# values-prod.yaml (Milvus 2.6.18)
cluster:
enabled: true
proxy:
replicas: 2
resources:
requests:
cpu: "2"
memory: "4Gi"
limits:
cpu: "4"
memory: "8Gi"
# 2.6 新增:StreamingNode 流式节点
streamingNode:
replicas: 2
resources:
requests:
cpu: "2"
memory: "8Gi"
limits:
cpu: "4"
memory: "16Gi"
queryNode:
replicas: 3
resources:
requests:
cpu: "4"
memory: "16Gi"
limits:
cpu: "8"
memory: "32Gi"
dataNode:
replicas: 2
resources:
requests:
cpu: "2"
memory: "8Gi"
indexNode:
replicas: 1
resources:
requests:
cpu: "4"
memory: "16Gi"
etcd:
replicas: 3
minio:
mode: distributed
replicas: 4
# 2.6 新增:Woodpecker 零磁盘 WAL(可替代 Pulsar/Kafka)
woodpecker:
enabled: true
# 如果仍需 Pulsar,可保留(与 Woodpecker 二选一)
pulsar:
enabled: false
方式四:Zilliz Cloud 免费托管版
无需运维,快速体验:
- 访问 cloud.zilliz.com 注册账号
- 创建免费集群(Free Tier 提供 1 个 CU)
- 获取连接端点和 API Key
- 使用 PyMilvus 连接
from pymilvus import MilvusClient
# Zilliz Cloud 连接
client = MilvusClient(
uri="https://in03-xxxxx.aws-us-west-2.vectordb.zillizcloud.com:19530",
token="db_name:api_key" # 数据库名:API密钥
)
2.3 可视化与命令行工具
Attu 可视化管理平台
Attu 是 Milvus 官方可视化管理工具,提供图形化界面管理集合、数据、索引:
# Docker 启动 Attu
docker run -d \
--name attu \
-p 3000:3000 \
-e MILVUS_URL=host.docker.internal:19530 \
zilliz/attu:latest
# 访问 http://localhost:3000
Attu 核心功能:
- 集合管理:创建、删除、查看集合详情
- 数据浏览:查看、搜索数据
- 索引管理:创建、查看索引
- 系统监控:节点状态、系统信息
Milvus CLI 命令行工具
# 安装
pip install milvus-cli
# 连接 Milvus
milvus_cli > connect -h localhost -p 19530
# 常用命令
milvus_cli > list collections
milvus_cli > describe collection -c my_collection
milvus_cli > query -c my_collection -f "id > 100"
Birdwatcher 调试工具
用于 Milvus 运维调试的高级工具,可直接连接 etcd 查看元数据:
# 连接 etcd 查看元数据
./birdwatcher --etcd=127.0.0.1:2379 --rootPath=by-dev
# 查看集合信息
show collections
show segments
2.4 Hello World:第一个向量检索程序
Python 客户端 PyMilvus 安装
pip install pymilvus
# 验证安装
python -c "import pymilvus; print(pymilvus.__version__)"
连接 Milvus 服务的三种方式
from pymilvus import MilvusClient
# 方式一:使用 MilvusClient(推荐,2.4+ 新 API)
client = MilvusClient(
uri="http://localhost:19530", # Milvus 服务地址
token="root:Milvus" # 认证信息(如果启用)
)
# 方式二:Milvus Lite 本地文件
client = MilvusClient("./milvus_local.db")
# 方式三:Zilliz Cloud
client = MilvusClient(
uri="https://xxx.vectordb.zillizcloud.com:19530",
token="db_name:api_key"
)
全流程实操
from pymilvus import MilvusClient
# 1. 连接 Milvus
client = MilvusClient("http://localhost:19530")
# 2. 创建集合(快速模式)
client.create_collection(
collection_name="demo_collection",
dimension=768 # 向量维度,与嵌入模型输出维度一致
)
# 3. 插入数据
data = [
{"id": 1, "vector": [0.1] * 768, "text": "Milvus是开源向量数据库", "category": "数据库"},
{"id": 2, "vector": [0.2] * 768, "text": "向量检索用于语义搜索", "category": "搜索"},
{"id": 3, "vector": [0.3] * 768, "text": "深度学习模型生成嵌入向量", "category": "AI"},
]
client.insert(collection_name="demo_collection", data=data)
# 4. 向量查询
query_vector = [0.15] * 768
results = client.search(
collection_name="demo_collection",
data=[query_vector], # 支持批量查询
limit=3, # 返回 Top-3
output_fields=["text", "category"]
)
# 5. 查看结果
for hits in results:
for hit in hits:
print(f"ID: {hit['id']}, 距离: {hit['distance']:.4f}, "
f"文本: {hit['entity']['text']}")
# 6. 清理
client.drop_collection("demo_collection")
核心对象解析
Milvus 数据层级结构:
Milvus 实例
└── 数据库 (Database) ← 逻辑隔离,类似 MySQL 的 Database
└── 集合 (Collection) ← 类似 MySQL 的 Table
├── 分区 (Partition) ← 数据按规则分区,加速查询
│ └── 段 (Segment) ← 数据存储的最小单元
│ ├── Growing 段 ← 正在写入的段(存在内存中)
│ └── Sealed 段 ← 已封存的段(已持久化到对象存储)
└── 索引 (Index) ← 加速检索的数据结构
| 对象 | 说明 | 类比 |
|---|---|---|
| Connection | 与 Milvus 服务的连接 | 数据库连接 |
| Database | 逻辑命名空间 | MySQL Database |
| Collection | 数据集合,包含 Schema | MySQL Table |
| Partition | 集合内的数据分区 | 分区表 |
| Segment | 数据存储的物理单元 | 数据文件 |
| Index | 加速检索的索引结构 | B+ Tree Index |
第二阶段:核心操作与数据模型(Day 3-6)
第 3 章 数据模型与集合设计
3.1 Milvus 核心数据概念
层级关系
Database (数据库)
└── Collection (集合) ─── Schema + 索引 + 配置
├── Partition (分区) ─── 数据按规则划分
│ └── Segment (段) ─── 物理存储单元
│ ├── Growing Segment ─── 正在写入,在内存
│ └── Sealed Segment ─── 已封存,在对象存储
└── Shard (分片) ─── 基于 Pulsar 的写入通道
关键概念解释:
- 集合(Collection):Milvus 中存储数据的基本单元,类似关系数据库的表
- 分区(Partition):集合内的数据分区,可按业务规则手动创建或通过分区键自动管理
- 分片(Shard):基于消息队列(Pulsar/Kafka)的写入通道,分片数量决定写入吞吐量上限
- 段(Segment):数据存储的最小物理单元,每个段默认包含最多 512MB 数据
字段分类
| 字段类型 | 说明 | 示例 |
|---|---|---|
| 主键字段 | 唯一标识每条记录,必选 | id (INT64 或 VARCHAR) |
| 标量字段 | 存储非向量的结构化数据 | title (VARCHAR), price (FLOAT) |
| 向量字段 | 存储向量嵌入数据 | embedding (FLOAT_VECTOR) |
3.2 Schema 字段设计详解
主键字段
from pymilvus import MilvusClient, DataType
# 方式一:INT64 自增主键(推荐,性能最优)
schema = MilvusClient.create_schema()
schema.add_field(
field_name="id",
datatype=DataType.INT64,
is_primary=True,
auto_id=True # 自动生成递增ID
)
# 方式二:VARCHAR 自定义主键(适合有自然键的场景)
schema.add_field(
field_name="doc_id",
datatype=DataType.VARCHAR,
max_length=256,
is_primary=True,
auto_id=False # 需要手动指定
)
选型建议:
- 优先使用 INT64 自增主键,性能最优
- 当需要与外部系统数据关联时(如文档ID),使用 VARCHAR 主键
- VARCHAR 主键在过滤查询中性能略低于 INT64
标量字段类型全解
schema = MilvusClient.create_schema()
# 整数类型
schema.add_field(field_name="age", datatype=DataType.INT64) # 64位整数
schema.add_field(field_name="count", datatype=DataType.INT32) # 32位整数
schema.add_field(field_name="big_id", datatype=DataType.INT16) # 16位整数
schema.add_field(field_name="flag", datatype=DataType.INT8) # 8位整数
# 浮点类型
schema.add_field(field_name="score", datatype=DataType.FLOAT) # 32位浮点
schema.add_field(field_name="price", datatype=DataType.DOUBLE) # 64位浮点
# 布尔类型
schema.add_field(field_name="is_active", datatype=DataType.BOOL)
# 字符串类型
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
# JSON 类型(灵活存储半结构化数据)
schema.add_field(field_name="metadata", datatype=DataType.JSON)
# 数组类型
schema.add_field(
field_name="tags",
datatype=DataType.ARRAY,
element_type=DataType.VARCHAR,
max_capacity=20, # 数组最大容量
max_length=64 # 元素最大长度
)
JSON 字段使用详解:
# JSON 字段支持嵌套结构,适合存储动态元数据
data = [
{
"id": 1,
"vector": [0.1] * 128,
"metadata": {
"source": "wiki",
"author": "张三",
"tags": ["AI", "数据库"],
"stats": {"views": 1000, "likes": 50}
}
}
]
# JSON 字段支持嵌套过滤
results = client.search(
collection_name="my_collection",
data=[query_vector],
filter='metadata["source"] == "wiki" && metadata["stats"]["views"] > 500',
limit=10
)
向量字段类型
# 浮点稠密向量(最常用)
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768
)
# 二值向量(用于哈希检索场景)
schema.add_field(
field_name="binary_hash",
datatype=DataType.BINARY_VECTOR,
dim=256 # 维度必须是8的倍数
)
# 稀疏向量(2.4+ 支持,用于关键词检索)
schema.add_field(
field_name="sparse_embedding",
datatype=DataType.SPARSE_FLOAT_VECTOR
# 稀疏向量无需指定 dim,维度动态
)
# Float16 向量(2.5+ 支持,节省内存)
schema.add_field(
field_name="fp16_embedding",
datatype=DataType.FLOAT16_VECTOR,
dim=768
)
# BFloat16 向量(2.5+ 支持)
schema.add_field(
field_name="bf16_embedding",
datatype=DataType.BFLOAT16_VECTOR,
dim=768
)
# Int8 向量(2.6 新增,内存减半)
schema.add_field(
field_name="int8_embedding",
datatype=DataType.INT8_VECTOR,
dim=768
)
2.6 新增字段类型:
# Geometry 地理空间字段(2.6 新增)
schema.add_field(
field_name="location",
datatype=DataType.GEOMETRY # 支持 POINT/LINESTRING/POLYGON
)
# Struct Array 结构化数组(2.6 新增)
# 一个实体包含多个子项,每个子项有独立向量
schema.add_field(
field_name="page_embeddings",
datatype=DataType.ARRAY,
element_type=DataType.STRUCT,
struct_fields=[
FieldSchema("page_text", DataType.VARCHAR, max_length=4096),
FieldSchema("page_vector", DataType.FLOAT_VECTOR, dim=768),
FieldSchema("page_number", DataType.INT64),
],
max_capacity=50
)
# Nullable Vector 可空向量(2.6.18 新增)
# 允许插入时向量字段为空,搜索时自动跳过 NULL 向量
schema.add_field(
field_name="embedding",
datatype=DataType.FLOAT_VECTOR,
dim=768,
nullable=True # 2.6.18 新增:向量字段可空
)
Nullable Vector 使用场景:
- 嵌入尚未生成时先插入元数据,后续回填向量
- 动态添加向量字段到已有集合(新字段对旧数据为 NULL)
- 部分实体不需要向量检索(如纯元数据记录)
# 插入含 NULL 向量的数据
data = [
{"id": 1, "text": "已处理文档", "embedding": [0.1] * 768},
{"id": 2, "text": "待处理文档", "embedding": None}, # NULL 向量
]
client.insert(collection_name="docs", data=data)
# 搜索时自动跳过 NULL 向量,不会报错
results = client.search(
collection_name="docs",
data=[[0.1] * 768],
anns_field="embedding",
limit=10
)
向量维度选型建议:
| 嵌入模型 | 输出维度 | 推荐向量类型 | 内存占用(每百万条) |
|---|---|---|---|
| OpenAI text-embedding-3-small | 1536 | FLOAT_VECTOR | ~5.9 GB |
| OpenAI text-embedding-3-large | 3072 | FLOAT_VECTOR / INT8_VECTOR | ~11.7 GB / ~2.9 GB |
| BGE-large-zh | 1024 | FLOAT_VECTOR | ~3.9 GB |
| BGE-base-zh | 768 | FLOAT_VECTOR / INT8_VECTOR | ~2.9 GB / ~0.7 GB |
| SPLADE 稀疏向量 | 动态 | SPARSE_FLOAT_VECTOR | ~0.5 GB |
| Cohere embed-v4 | 1024 | FLOAT_VECTOR | ~3.9 GB |
3.3 集合创建与核心配置
create_collection 参数全解
from pymilvus import MilvusClient, DataType
# 1. 创建 Schema
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=2048)
schema.add_field(field_name="category", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="year", datatype=DataType.INT32)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=1024)
# 2. 创建索引参数
index_params = client.prepare_index_params()
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={"M": 16, "efConstruction": 256}
)
index_params.add_index(
field_name="category",
index_type="INVERTED"
)
# 3. 创建集合
client.create_collection(
collection_name="knowledge_base",
schema=schema,
index_params=index_params,
shards_num=2, # 分片数量
num_partitions=64, # 分区数量(使用分区键时)
partition_key_field="category", # 分区键字段
consistency_level="Bounded" # 一致性级别
)
分片数量(shards_num)配置原则
| 数据规模 | 推荐分片数 | 说明 |
|---|---|---|
| < 100万 | 1 | 单分片足够 |
| 100万~1000万 | 2~4 | 中等写入量 |
| 1000万~1亿 | 4~8 | 高写入量 |
| > 1亿 | 8~16 | 超高写入量 |
注意:分片数量创建后不可修改,请根据预估数据量和写入速率提前规划。
分区键(Partition Key)设计
# 分区键自动分区:按 category 字段自动分区
schema.add_field(
field_name="category",
datatype=DataType.VARCHAR,
max_length=64,
is_partition_key=True # 标记为分区键
)
# 创建集合时指定分区数量
client.create_collection(
collection_name="auto_partition_demo",
schema=schema,
index_params=index_params,
num_partitions=64 # 自动创建64个分区
)
分区键 vs 手动分区:
| 维度 | 分区键 | 手动分区 |
|---|---|---|
| 管理方式 | 自动 | 手动创建/管理 |
| 分区数量 | 创建时指定,可很多 | 通常少量 |
| 过滤优化 | 自动按分区裁剪 | 需指定分区名 |
| 灵活性 | 较低 | 较高 |
| 适用场景 | 枚举值少的字段 | 复杂业务分区 |
动态字段 enable_dynamic_field
# 启用动态字段后,可以插入 Schema 中未定义的字段
schema = MilvusClient.create_schema(enable_dynamic_field=True)
data = [
{
"id": 1,
"embedding": [0.1] * 768,
"text": "示例文本",
# 以下字段不在 Schema 中,但可以存储
"author": "张三",
"priority": 5,
}
]
# 动态字段存储在 $meta 字段中,可以通过 filter 过滤
建议:高频过滤的字段使用静态字段,低频/探索性字段使用动态字段。
集合别名(Alias)
# 创建别名(用于平滑切换、灰度发布)
client.create_alias(
collection_name="knowledge_base_v2",
alias="knowledge_base"
)
# 修改别名指向(灰度切换)
client.alter_alias(
collection_name="knowledge_base_v3",
alias="knowledge_base"
)
# 删除别名
client.drop_alias(alias="knowledge_base")
动态添加字段(2.6 新增 Add Field)
Milvus 2.6 支持在不停止服务的情况下向已有集合添加新标量字段,无需重建集合:
# 向已有集合添加新字段(2.6 新增)
client.add_collection_field(
collection_name="knowledge_base",
field_name="priority",
data_type=DataType.INT64,
nullable=True, # 新增字段必须可空(旧数据该字段值为 NULL)
default_value=0 # 可选:设置默认值
)
# 添加 VARCHAR 字段
client.add_collection_field(
collection_name="knowledge_base",
field_name="department",
data_type=DataType.VARCHAR,
max_length=128,
nullable=True
)
Add Field 限制与注意事项:
- 只能添加标量字段(INT64、VARCHAR、FLOAT、DOUBLE 等),不能添加向量字段
- 新增字段必须设置为
nullable=True,因为旧数据没有该字段值 - 字段名必须唯一,不可与已有字段重名
- 每个集合的字段总数有上限(通常不超过 1024 个)
- 不可通过 Add Field 启用动态字段功能,需在创建时设置
与 Nullable Vector 配合:2.6.18 支持向量字段可空后,可以先用 Add Field 添加标量字段,再添加新的可空向量字段,实现 Schema 的渐进式演进。
3.4 分区管理
分区的核心作用
分区是 Milvus 中缩小查询范围的核心机制。当查询指定分区时,Milvus 只在对应分区内搜索,大幅减少计算量。
无分区查询:搜索全部数据 指定分区查询:只搜索目标分区
┌─────────────────────────┐ ┌───────────┬───────────┐
│ 全部数据 (1亿条) │ │ 分区A │ 分区B │
│ 搜索范围: 1亿 │ → │ 搜索范围: │ │
│ │ │ 1000万 ✓ │ │
└─────────────────────────┘ └───────────┴───────────┘
手动分区操作
# 创建分区
client.create_partition(
collection_name="knowledge_base",
partition_name="partition_2024"
)
# 插入数据到指定分区
client.insert(
collection_name="knowledge_base",
data=data_2024,
partition_name="partition_2024"
)
# 在指定分区中搜索
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
partition_names=["partition_2024"],
limit=10
)
# 释放分区(释放内存)
client.release_partitions(
collection_name="knowledge_base",
partition_names=["partition_2023"]
)
# 删除分区
client.drop_partition(
collection_name="knowledge_base",
partition_name="partition_2022"
)
TTL 配置
# 设置集合的数据生存时间(TTL),过期数据自动清理
client.alter_collection_properties(
collection_name="logs_collection",
properties={"collection.ttl.seconds": 2592000} # 30天
)
3.5 集合生命周期管理
加载与释放
# 加载集合到内存(查询前必须加载)
client.load_collection(
collection_name="knowledge_base",
replica_number=2 # 可选:指定加载的副本数(读高可用)
)
# 释放集合(释放内存,数据仍在对象存储中)
client.release_collection(
collection_name="knowledge_base"
)
# 查看集合加载状态
info = client.describe_collection("knowledge_base")
print(info["load_state"]) # Loaded / NotLoad / Loading
重要概念:
- Load:将数据从对象存储加载到 QueryNode 内存,使其可被查询
- Release:从 QueryNode 内存卸载数据,释放计算资源,数据不会丢失
- 数据写入后不需要手动 Load,Growing 段自动在内存中
- Sealed 段需要 Load 后才能被查询到
查看与管理
# 列出所有集合
collections = client.list_collections()
# 查看集合详情
info = client.describe_collection("knowledge_base")
# 检查集合是否存在
exists = client.has_collection("knowledge_base")
# 删除集合(不可恢复!)
client.drop_collection("knowledge_base")
第 4 章 数据写入与更新
4.1 基础写入 API 全解析
insert:批量新增数据
data = [
{"id": 1, "text": "文档1", "category": "技术", "embedding": [0.1] * 768},
{"id": 2, "text": "文档2", "category": "产品", "embedding": [0.2] * 768},
{"id": 3, "text": "文档3", "category": "技术", "embedding": [0.3] * 768},
]
result = client.insert(
collection_name="knowledge_base",
data=data
)
print(f"插入数量: {result['insert_count']}")
upsert:更新插入(幂等写入)
# upsert:如果主键已存在则更新,不存在则插入
upsert_data = [
{"id": 1, "text": "文档1-更新版", "category": "技术", "embedding": [0.15] * 768},
]
result = client.upsert(
collection_name="knowledge_base",
data=upsert_data
)
insert vs upsert:
insert:主键冲突时会报错upsert:主键冲突时更新,不冲突时插入,适合幂等写入场景upsert性能略低于insert,因为需要先查询主键是否存在
delete:按主键/条件删除
# 按主键删除
client.delete(
collection_name="knowledge_base",
ids=[1, 2, 3]
)
# 按条件删除
client.delete(
collection_name="knowledge_base",
filter='category == "过期" && year < 2020'
)
数据写入的一致性级别与可见性延迟
Milvus 采用 WAL(Write-Ahead Log)机制保证数据持久性:
客户端写入 → Proxy → Pulsar/Kafka → DataNode → 对象存储(MinIO/S3)
↓
Growing Segment(立即可查)
↓
Sealed Segment(持久化后可查)
4.2 批量写入最佳实践
单批次大小优化
import numpy as np
BATCH_SIZE = 1000 # 每批1000条,平衡吞吐与内存
def batch_insert(client, collection_name, all_data, batch_size=BATCH_SIZE):
"""分批写入数据"""
total = len(all_data)
for i in range(0, total, batch_size):
batch = all_data[i:i+batch_size]
result = client.insert(
collection_name=collection_name,
data=batch
)
print(f"已写入: {min(i+batch_size, total)}/{total}")
批次大小建议:
- 小批次(< 100):延迟低但吞吐低,适合实时写入
- 中批次(100~5000):吞吐与延迟平衡,推荐大多数场景
- 大批次(> 5000):吞吐高但内存占用大,适合离线导入
并发写入与连接池
from concurrent.futures import ThreadPoolExecutor, as_completed
def insert_batch(client, collection_name, batch_data):
try:
result = client.insert(collection_name=collection_name, data=batch_data)
return result['insert_count']
except Exception as e:
print(f"写入失败: {e}")
return 0
def concurrent_insert(client, collection_name, all_data,
batch_size=1000, max_workers=4):
batches = [all_data[i:i+batch_size]
for i in range(0, len(all_data), batch_size)]
total_inserted = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(insert_batch, client, collection_name, batch)
for batch in batches
]
for future in as_completed(futures):
total_inserted += future.result()
print(f"总写入: {total_inserted}")
4.3 数据更新与删除底层机制
软删除与硬删除原理
Milvus 的删除操作采用软删除机制:
删除流程:
1. 客户端发送 Delete 请求
2. Proxy 将删除标记写入 Pulsar/Kafka
3. DataNode 消费删除标记,写入 Delete Log
4. 查询时,QueryNode 根据删除标记过滤结果
5. Compaction 时,真正从物理文件中移除已删除数据
删除标记(Delete Log):
┌──────────────────────────────────────┐
│ Segment ID │ Primary Key │ Timestamp │
│ seg_001 │ 42 │ 1700000 │
│ seg_001 │ 87 │ 1700001 │
│ seg_002 │ 15 │ 1700002 │
└──────────────────────────────────────┘
关键点:
- 删除是异步的,删除后数据不会立即从磁盘消失
- 查询时通过 Delete Log 过滤已删除数据
- 物理删除在 Compaction 时执行
- 大量删除后建议触发 Compaction 回收空间
Compaction 数据合并机制
# 手动触发 Compaction
client.compact(
collection_name="knowledge_base",
)
# 查看 Compaction 状态
status = client.get_compaction_state(collection_name="knowledge_base")
Compaction 类型:
| 类型 | 说明 | 触发条件 |
|---|---|---|
| 段合并 | 将多个小段合并为大段 | 段数量超过阈值 |
| 垃圾回收 | 清理已删除数据和过期段 | 删除数据量超过阈值 |
| 索引重建 | 合并后重建索引 | 段合并完成后自动触发 |
4.4 海量数据离线导入(Bulk Insert)
当需要导入百万级以上数据时,使用 Bulk Insert 比 insert API 效率更高:
import json
# 1. 准备数据文件(JSON 行格式)
bulk_data = []
for i in range(1000000):
bulk_data.append({
"id": i,
"text": f"文档_{i}",
"embedding": np.random.randn(768).tolist()
})
with open("bulk_data.json", "w") as f:
for item in bulk_data:
f.write(json.dumps(item) + "\n")
# 2. 上传到对象存储(MinIO/S3)
# mc cp bulk_data.json myminio/milvus-bucket/bulk/
# 3. 调用 BulkInsert API
支持的文件格式:
| 格式 | 适用场景 | 压缩支持 |
|---|---|---|
| JSON Rows | 小规模、灵活格式 | 否 |
| Parquet | 大规模、列式存储 | 内置压缩 |
| CSV | 简单数据、兼容性好 | 否 |
4.5 时间旅行(Time Travel)
Milvus 支持按时间戳查询历史数据快照:
import time
timestamp_before = int(time.time())
client.insert(collection_name="knowledge_base", data=new_data)
timestamp_after = int(time.time())
# 查询插入前的数据(不包含新插入的数据)
results = client.query(
collection_name="knowledge_base",
filter="id > 0",
travel_timestamp=timestamp_before
)
# 查询插入后的数据(包含新插入的数据)
results = client.query(
collection_name="knowledge_base",
filter="id > 0",
travel_timestamp=timestamp_after
)
时间旅行应用场景:
- 数据误删恢复:查询删除前的数据快照
- 版本对比:比较不同时间点的数据差异
- 审计追踪:查看历史数据状态
注意:时间旅行依赖 Compaction 前的段数据,已 Compaction 的旧数据可能无法回溯。—
第 5 章 向量索引基础
5.1 索引的核心作用与分类
为什么必须构建索引
无索引(暴力搜索):
查询向量 vs 1亿条向量 → 逐一计算相似度 → O(N) → 延迟: 秒级
有索引(ANN搜索):
查询向量 → 索引结构导航 → 只搜索相关子集 → O(log N) → 延迟: 毫秒级
索引分类
Milvus 索引体系(2.6.18)
├── 向量索引(加速向量相似度搜索)
│ ├── FLAT 暴力搜索,100%召回
│ ├── HNSW 图索引,高召回高性能
│ ├── IVF_FLAT 倒排索引,均衡型
│ ├── IVF_SQ8 量化压缩,省内存
│ ├── IVF_PQ 乘积量化,极致压缩
│ ├── IVF_RABITQ 1-bit量化索引(2.6新增),内存降72%
│ ├── DiskANN 磁盘索引,超大规模
│ ├── GPU_IVF_FLAT GPU加速
│ ├── GPU_IVF_PQ GPU加速+量化
│ ├── GPU_CAGRA GPU图索引(2.5+)
│ ├── SPARSE_INVERTED_INDEX 稀疏倒排索引
│ └── SPARSE_WAND 稀疏WAND索引
│
└── 标量索引(加速标量过滤)
├── INVERTED 倒排索引
├── STL_SORT 排序索引(数值)
├── TRIE 前缀索引(字符串)
└── PATH_INDEX JSON路径索引(2.6新增),过滤快100倍
段与索引的关系
Growing Segment(增长段):
- 正在写入的段,数据在内存中
- 使用暴力搜索(FLAT)进行查询
- 不构建索引
Sealed Segment(封存段):
- 已持久化到对象存储的段
- 触发索引构建任务
- 索引构建完成后,查询使用索引加速
5.2 向量索引选型全景
FLAT 暴力索引
index_params.add_index(
field_name="embedding",
index_type="FLAT",
metric_type="COSINE"
)
- 召回率:100%(精确搜索)
- 查询速度:最慢,O(N)
- 内存占用:原始向量大小
- 适用场景:小数据集(< 10万)、验证基准、需要精确结果的场景
HNSW 索引(主流生产选型)
index_params.add_index(
field_name="embedding",
index_type="HNSW",
metric_type="COSINE",
params={
"M": 16, # 图的连接度
"efConstruction": 256 # 构建时的搜索宽度
}
)
- 召回率:99%+(参数调优后)
- 查询速度:极快,亚毫秒级
- 内存占用:较高(约为原始向量的 1.5~2 倍)
- 适用场景:百万~千万级数据,延迟敏感型应用
HNSW 原理简述:
HNSW(Hierarchical Navigable Small World)构建了一个多层导航图:
- 上层是稀疏的"高速公路",用于快速跳转
- 下层是稠密的"城市道路",用于精确定位
- 查询时从顶层开始,逐层向下逼近最近邻
Layer 2: ○────────────────○ (稀疏,快速跳转)
│ │
Layer 1: ○────○────○────○────○ (中等密度)
│ │ │ │
Layer 0: ○──○──○──○──○──○──○──○ (稠密,精确定位)
IVF 系列索引
# IVF_FLAT:倒排+原始向量
index_params.add_index(
field_name="embedding",
index_type="IVF_FLAT",
metric_type="L2",
params={"nlist": 1024} # 聚类中心数量
)
# IVF_SQ8:倒排+标量量化(8位整数)
index_params.add_index(
field_name="embedding",
index_type="IVF_SQ8",
metric_type="L2",
params={"nlist": 1024}
)
# IVF_PQ:倒排+乘积量化
index_params.add_index(
field_name="embedding",
index_type="IVF_PQ",
metric_type="L2",
params={
"nlist": 1024,
"m": 16, # 子向量数量(dim必须能被m整除)
"nbits": 8 # 每个子量化的位数
}
)
| 索引 | 内存占用 | 召回率 | 查询速度 | 适用场景 |
|---|---|---|---|---|
| IVF_FLAT | 原始大小 | 高 | 中 | 均衡型,中等数据量 |
| IVF_SQ8 | 原始的 25% | 中高 | 中 | 内存受限场景 |
| IVF_PQ | 原始的 5~10% | 中 | 中 | 极端内存受限 |
IVF_RABITQ 索引(2.6 新增,重大突破)
RaBitQ(Random Bit Quantization)是 Milvus 2.6 引入的革命性量化索引,将 FP32 向量压缩为 1-bit 表示:
# IVF_RABITQ:1-bit 量化索引(2.6 新增)
index_params.add_index(
field_name="embedding",
index_type="IVF_RABITQ",
metric_type="COSINE",
params={
"nlist": 1024, # 聚类中心数量
"nbits": 1, # 量化位数(1-bit,默认)
"refine": True, # 可选:启用SQ8精排,提升召回率
"refine_type": "SQ8" # 精排量化类型
}
)
RaBitQ 核心原理:
- 将高维向量归一化后,每个维度用 1 个 bit 表示(正/负)
- 压缩比达到 32:1(FP32 → 1-bit)
- 结合 SQ8 精排(refine),在保持 ~95% 召回率的同时大幅降低内存
性能数据:
| 指标 | HNSW (FP32) | IVF_RABITQ (1-bit) | IVF_RABITQ + SQ8精排 |
|---|---|---|---|
| 内存占用 | 100% | ~8% | ~28% |
| 查询 QPS | 1x | 4x | 4x |
| 召回率 | 99%+ | ~85% | ~95% |
| 适用场景 | 延迟敏感 | 成本敏感 | 均衡 |
选型建议:IVF_RABITQ 是 IVF_SQ8 和 IVF_FLAT 的理想替代方案,在内存成本和召回率之间取得了更好的平衡。
DiskANN 索引
index_params.add_index(
field_name="embedding",
index_type="DISKANN",
metric_type="L2",
params={
"search_list": 100 # 搜索列表大小,影响召回率
}
)
- 核心优势:向量数据存储在 SSD 上,内存只存压缩后的图结构
- 内存占用:约为原始向量的 5~10%
- 查询速度:依赖 SSD 性能,P99 约 5~20ms
- 适用场景:亿级以上数据,成本敏感,可接受稍高延迟
GPU 索引
# GPU_IVF_FLAT
index_params.add_index(
field_name="embedding",
index_type="GPU_IVF_FLAT",
metric_type="L2",
params={"nlist": 1024}
)
# GPU_CAGRA(2.5+ 新增,NVIDIA CAGRA 图索引)
index_params.add_index(
field_name="embedding",
index_type="GPU_CAGRA",
metric_type="L2",
params={
"intermediate_graph_degree": 64,
"graph_degree": 32
}
)
- GPU_CAGRA:Milvus 2.5 新增,索引构建速度比 CPU HNSW 快 40 倍
- CAGRA+Vamana 混合模式(2.6 新增):在 GPU 上构建图索引,但查询时使用 CPU,大幅降低 GPU 部署成本。无需常驻 GPU 即可享受 GPU 构建速度
- 适用场景:高并发批量查询、索引需要频繁重建
稀疏向量索引
# SPARSE_INVERTED_INDEX:稀疏倒排索引
index_params.add_index(
field_name="sparse_embedding",
index_type="SPARSE_INVERTED_INDEX",
metric_type="IP",
params={"drop_ratio_build": 0.2} # 构建时丢弃最小20%的值
)
# SPARSE_WAND:WAND算法索引,查询更快
index_params.add_index(
field_name="sparse_embedding",
index_type="SPARSE_WAND",
metric_type="IP",
params={"drop_ratio_build": 0.2}
)
5.3 标量索引
# 自动索引(Milvus 自动选择最优索引类型)
index_params.add_index(
field_name="category",
index_type="AUTOINDEX"
)
# 倒排索引(推荐,支持等值和范围查询)
index_params.add_index(
field_name="category",
index_type="INVERTED"
)
# 排序索引(数值字段,加速范围查询)
index_params.add_index(
field_name="year",
index_type="STL_SORT"
)
# 前缀索引(VARCHAR字段,加速前缀匹配)
index_params.add_index(
field_name="title",
index_type="TRIE"
)
# JSON Path Index(2.6 新增,JSON嵌套过滤快100倍)
index_params.add_index(
field_name="metadata",
index_type="PATH_INDEX",
params={
"path": "author.name", # JSON路径
"type": "VARCHAR" # 路径值类型
}
)
Path Index 详解(2.6 新增):
Path Index 是 Milvus 2.6 为 JSON 字段引入的专用索引,解决了嵌套 JSON 过滤性能差的痛点:
# 创建带 Path Index 的集合
schema.add_field(field_name="metadata", datatype=DataType.JSON)
index_params.add_index(
field_name="metadata",
index_type="PATH_INDEX",
params={"path": "region", "type": "VARCHAR"}
)
index_params.add_index(
field_name="metadata",
index_type="PATH_INDEX",
params={"path": "stats.views", "type": "INT64"}
)
# 过滤查询自动使用 Path Index
results = client.search(
collection_name="docs",
data=[query_vector],
filter='metadata["region"] == "cn" && metadata["stats"]["views"] > 1000',
limit=10
)
# 无 Path Index: 全集扫描,JSON解析开销大
# 有 Path Index: 索引直接定位,速度提升 100 倍
| 对比 | 无 Path Index | 有 Path Index |
|---|---|---|
| 嵌套 JSON 过滤 | 全集扫描 | 索引定位 |
| 过滤性能 | 基准 | 快 100 倍 |
| 存储开销 | 无 | 轻微增加 |
| 适用场景 | 简单过滤 | 复杂嵌套 JSON 过滤 |
5.4 索引构建与管理
# 创建索引
client.create_index(
collection_name="knowledge_base",
index_params=index_params
)
# 查看索引信息
indexes = client.list_indexes(collection_name="knowledge_base")
for index in indexes:
print(f"字段: {index['field_name']}, 类型: {index['index_type']}")
# 删除索引
client.drop_index(
collection_name="knowledge_base",
index_name="embedding"
)
索引内存占用估算:
HNSW 索引内存 ≈ 原始向量大小 × (1 + M × 2 × 8 / dim)
以 768 维 FLOAT 向量、M=16 为例:
原始向量: 768 × 4 bytes = 3,072 bytes/条
图结构: 16 × 2 × 8 = 256 bytes/条
总计: ~3,328 bytes/条
百万条: ~3.2 GB
千万条: ~32 GB
第 6 章 向量查询与检索
6.1 基础向量检索
search() 方法参数全解
results = client.search(
collection_name="knowledge_base", # 集合名称
data=[query_vector], # 查询向量列表(支持批量)
limit=10, # 返回Top-K结果
output_fields=["text", "category"], # 返回的字段
search_params={ # 搜索参数
"metric_type": "COSINE",
"params": {
"ef_search": 128, # HNSW搜索宽度
}
},
filter='category == "技术"', # 标量过滤条件
partition_names=["partition_2024"], # 指定分区
timeout=10.0, # 超时时间(秒)
consistency_level="Bounded", # 一致性级别
)
距离分数解读
不同度量方式下,distance 值的含义不同:
| 度量方式 | distance 含义 | 完全相同 | 完全无关 | 排序 |
|---|---|---|---|---|
| COSINE | 余弦距离 | 0 | 2 | 升序(越小越相似) |
| L2 | 欧氏距离 | 0 | +∞ | 升序(越小越相似) |
| IP | 内积 | 最大值 | 负值 | 降序(越大越相似) |
# 距离阈值过滤示例
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
limit=20,
search_params={
"metric_type": "COSINE",
"params": {"ef_search": 128}
}
)
# 过滤距离大于0.5的结果(余弦距离越小越相似)
filtered = [
hit for hits in results for hit in hits
if hit['distance'] < 0.5
]
6.2 标量过滤混合检索
过滤表达式语法全解
Milvus 的过滤表达式支持丰富的操作符:
# 比较运算符
filter = 'age > 18'
filter = 'price <= 100.0'
filter = 'category == "技术"'
filter = 'category != "过期"'
# 逻辑运算符
filter = 'age > 18 && category == "技术"' # AND
filter = 'category == "技术" || category == "产品"' # OR
filter = 'not (category == "过期")' # NOT
# IN / NOT IN
filter = 'category in ["技术", "产品", "运营"]'
filter = 'status not in ["deleted", "archived"]'
# LIKE 模式匹配
filter = 'title like "%向量%"'
filter = 'title like "Milvus%"'
# JSON 字段嵌套过滤
filter = 'metadata["source"] == "wiki"'
filter = 'metadata["stats"]["views"] > 1000'
filter = 'metadata["tags"][0] == "AI"'
# 数组字段过滤
filter = 'array_contains(tags, "AI")'
filter = 'array_contains_all(tags, ["AI", "数据库"])'
filter = 'array_contains_any(tags, ["AI", "ML"])'
filter = 'array_length(tags) > 3'
前置过滤 vs 后置过滤原理
前置过滤(Pre-filtering):
1. 先根据标量条件过滤出候选集
2. 在候选集上进行向量搜索
3. 适合高选择性过滤(过滤掉大部分数据)
后置过滤(Post-filtering):
1. 先进行向量搜索得到 Top-K
2. 再根据标量条件过滤
3. 适合低选择性过滤(过滤掉少量数据)
Milvus 2.5 默认使用前置过滤(过滤下推优化)
6.3 其他查询能力
query():标量条件精确查询
# 按条件查询(不涉及向量检索)
results = client.query(
collection_name="knowledge_base",
filter='category == "技术" && year >= 2024',
output_fields=["id", "text", "category", "year"],
limit=100
)
# 按主键查询
results = client.query(
collection_name="knowledge_base",
ids=[1, 2, 3],
output_fields=["id", "text"]
)
get():按主键批量获取数据
# 按主键获取完整数据
results = client.get(
collection_name="knowledge_base",
ids=[1, 2, 3],
output_fields=["id", "text", "category"]
)
分页查询
# 方式一:limit + offset 分页(2.5+ 支持)
results = client.query(
collection_name="knowledge_base",
filter='category == "技术"',
limit=10,
offset=20 # 跳过前20条,返回第21~30条
)
# 方式二:游标分页(基于主键,性能更好)
last_id = 0
page_size = 10
while True:
results = client.query(
collection_name="knowledge_base",
filter=f'category == "技术" && id > {last_id}',
limit=page_size,
output_fields=["id", "text"]
)
if not results:
break
last_id = results[-1]["id"]
process_results(results)
分页方式对比:
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| limit+offset | 简单直观 | 深度分页性能差 | 浅分页 |
| 游标分页 | 深度分页性能好 | 不能跳页 | 流式遍历 |
分组查询(Group By)
Milvus 2.5 新增 Group By 功能,按元数据分组返回 Top-K:
# 按分类分组,每组返回最相似的1条
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
limit=5, # 最多返回5组
group_by_field="category", # 按category分组
output_fields=["text", "category"]
)
# 应用场景:
# - 搜索结果去重:按文档ID分组,避免同一文档的多个片段
# - 分类检索:每个分类返回最相关的一条
# - 多样性保证:确保结果来自不同来源
6.4 高级检索特性
多向量字段联合检索
Milvus 支持在同一个集合中存储多个向量字段,并进行联合检索:
from pymilvus import AnnSearchRequest, WeightedRanker
# 假设集合同时有 dense_embedding 和 sparse_embedding 两个向量字段
# 创建多个搜索请求
req_dense = AnnSearchRequest(
data=[query_dense_vector],
anns_field="dense_embedding",
param={"metric_type": "COSINE", "params": {"ef_search": 128}},
limit=20
)
req_sparse = AnnSearchRequest(
data=[query_sparse_vector],
anns_field="sparse_embedding",
param={"metric_type": "IP"},
limit=20
)
# 使用加权融合
results = client.hybrid_search(
collection_name="knowledge_base",
reqs=[req_dense, req_sparse],
ranker=WeightedRanker(0.7, 0.3), # 稠密向量权重0.7,稀疏向量权重0.3
limit=10
)
稀疏向量 + 稠密向量混合检索
这是 RAG 系统中最常用的混合检索模式:
from pymilvus import AnnSearchRequest, WeightedRanker, RRFRanker
# 稠密向量搜索请求(语义检索)
req_dense = AnnSearchRequest(
data=[dense_query_vector],
anns_field="dense_embedding",
param={"metric_type": "COSINE", "params": {"ef_search": 128}},
limit=20
)
# 稀疏向量搜索请求(关键词检索)
req_sparse = AnnSearchRequest(
data=[sparse_query_vector],
anns_field="sparse_embedding",
param={"metric_type": "IP"},
limit=20
)
# 方式一:加权融合
results_weighted = client.hybrid_search(
collection_name="knowledge_base",
reqs=[req_dense, req_sparse],
ranker=WeightedRanker(0.7, 0.3),
limit=10
)
# 方式二:RRF 融合(Reciprocal Rank Fusion)
results_rrf = client.hybrid_search(
collection_name="knowledge_base",
reqs=[req_dense, req_sparse],
ranker=RRFRanker(k=60), # k是RRF的平滑参数
limit=10
)
RRF 融合算法原理:
RRF Score = Σ 1/(k + rank_i)
其中 rank_i 是第 i 个检索结果的排名,k 是平滑参数(默认60)
示例:
文档A: 稠密检索排名1, 稀疏检索排名3
RRF Score = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
文档B: 稠密检索排名5, 稀疏检索排名1
RRF Score = 1/(60+5) + 1/(60+1) = 0.0154 + 0.0164 = 0.0318
文档A 的 RRF 分数更高,因为两个检索都排名靠前
全文检索(BM25)实战
Milvus 2.5 新增原生 BM25 全文检索功能:
from pymilvus import MilvusClient, DataType
from pymilvus import Function, FunctionType
# 1. 创建带 BM25 功能的集合
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
# 添加 BM25 函数:自动将文本转为稀疏向量
schema.add_function(Function(
name="bm25_func",
input_field_names=["text"],
output_field_names=["sparse_vector"],
function_type=FunctionType.BM25,
))
# 创建索引
index_params = client.prepare_index_params()
index_params.add_index(
field_name="sparse_vector",
index_type="SPARSE_INVERTED_INDEX",
metric_type="BM25" # 使用BM25度量
)
client.create_collection(
collection_name="fulltext_demo",
schema=schema,
index_params=index_params
)
# 2. 插入数据(只需提供文本,稀疏向量自动生成)
data = [
{"text": "Milvus是开源的云原生向量数据库"},
{"text": "向量检索在RAG系统中发挥关键作用"},
{"text": "BM25是经典的关键词检索算法"},
]
client.insert(collection_name="fulltext_demo", data=data)
# 3. 全文检索
results = client.search(
collection_name="fulltext_demo",
data=["向量数据库"], # 直接输入查询文本
anns_field="sparse_vector",
limit=5,
output_fields=["text"]
)
BM25 全文检索原理:
Milvus 2.5 的 BM25 实现基于稀疏向量,核心创新点:
- 插入时自动对文本分词,生成词频(TF)稀疏向量
- 查询时动态计算 BM25 分数(考虑文档长度归一化)
- 不需要预计算 IDF,查询时实时计算
- 支持与稠密向量混合检索,实现语义+关键词双重匹配—
第三阶段:进阶功能与深度使用(Day 7-12)
第 7 章 索引深度解析与参数调优
7.1 HNSW 索引深度
M、efConstruction、efSearch 三大核心参数原理
M(连接度):
- 控制图中每个节点的最大连接数
- M 越大,图的连通性越好,召回率越高
- 但 M 越大,内存占用和构建时间也越大
- 推荐范围:8~64,常用值 16 或 32
efConstruction(构建搜索宽度):
- 构建索引时搜索最近邻的范围
- efConstruction 越大,构建的图质量越高,召回率越高
- 但构建时间显著增加
- 推荐范围:100~500,常用值 256
efSearch(查询搜索宽度):
- 查询时搜索最近邻的范围
- efSearch 越大,查询召回率越高,但延迟也越高
- 可以在查询时动态调整,无需重建索引
- 推荐范围:64~512,常用值 128
参数对性能的量化影响
测试条件:100万条 768维向量,COSINE 度量
M=16, efConstruction=256:
┌──────────┬──────────┬───────────┬──────────┐
│ efSearch │ 召回率@10│ 延迟(ms) │ 内存(GB) │
├──────────┼──────────┼───────────┼──────────┤
│ 64 │ 97.2% │ 0.8 │ 3.2 │
│ 128 │ 98.8% │ 1.2 │ 3.2 │
│ 256 │ 99.5% │ 2.1 │ 3.2 │
│ 512 │ 99.8% │ 3.8 │ 3.2 │
└──────────┴──────────┴───────────┴──────────┘
M=32, efConstruction=512:
┌──────────┬──────────┬───────────┬──────────┐
│ efSearch │ 召回率@10│ 延迟(ms) │ 内存(GB) │
├──────────┼──────────┼───────────┼──────────┤
│ 64 │ 98.5% │ 1.1 │ 4.8 │
│ 128 │ 99.4% │ 1.8 │ 4.8 │
│ 256 │ 99.8% │ 3.2 │ 4.8 │
│ 512 │ 99.9% │ 5.6 │ 4.8 │
└──────────┴──────────┴───────────┴──────────┘
不同数据规模下的参数推荐
| 数据规模 | M | efConstruction | efSearch(默认) | 内存估算 |
|---|---|---|---|---|
| < 100万 | 16 | 256 | 128 | 原始 × 1.5 |
| 100万~1000万 | 16~32 | 256~512 | 128~256 | 原始 × 1.5~2 |
| 1000万~1亿 | 32 | 512 | 256 | 原始 × 2 |
| > 1亿 | 32~64 | 512 | 256~512 | 原始 × 2~2.5 |
HNSW 内存占用计算公式
内存占用 = N × (dim × 4 + M × 2 × 8) bytes
其中:
N = 向量数量
dim = 向量维度
M = 连接度参数
4 = float32 字节数
8 = int64 字节数(邻居指针)
示例:100万条 768维向量,M=16
= 1,000,000 × (768 × 4 + 16 × 2 × 8)
= 1,000,000 × (3,072 + 256)
= 1,000,000 × 3,328
= 3.1 GB
7.2 IVF 系列索引深度
nlist、nprobe 参数原理
nlist(聚类中心数量):
- 将向量空间划分为 nlist 个聚类(Voronoi 单元)
- nlist 越大,每个聚类越小,查询越精确
- nlist 推荐:√N ~ 4√N(N 为数据量)
nprobe(查询时搜索的聚类数):
- 查询时搜索 nprobe 个最近的聚类
- nprobe 越大,召回率越高,延迟越高
- nprobe ≤ nlist
nlist=1024, nprobe=32:
搜索范围 = 32/1024 = 3.1% 的数据
召回率约 95%
nlist=1024, nprobe=64:
搜索范围 = 64/1024 = 6.25% 的数据
召回率约 98%
乘积量化(PQ)原理与精度损失
PQ 将高维向量拆分为多个子向量,分别量化:
原始向量 (768维, float32):
[0.12, -0.34, 0.56, ..., 0.78] → 768 × 4 = 3,072 bytes
PQ 量化 (m=16, nbits=8):
将768维拆分为16个子空间,每个48维
每个子空间用256个聚类中心表示
量化后: 16 × 1 = 16 bytes
压缩比: 3,072 / 16 = 192倍!
精度损失: 召回率下降约 5~15%
IVF 索引选型决策树
是否内存充足?
├── 是 → HNSW(最佳性能)
└── 否 → 是否可接受 SSD 延迟?
├── 是 → DiskANN
└── 否 → 对召回率要求?
├── 高 → IVF_FLAT
├── 中 → IVF_SQ8
└── 低 → IVF_PQ
7.3 DiskANN 索引深度
磁盘索引的技术突破
DiskANN 的核心思想是将向量数据存储在 SSD 上,内存中只保留压缩后的图结构(PQ 量化向量):
内存中:PQ 压缩向量 + 图结构
PQ 向量: 16 bytes/条(vs 原始 3072 bytes)
图结构: 约 64 bytes/条
总计: ~80 bytes/条
SSD 上:原始向量数据
原始向量: 3072 bytes/条
查询流程:
1. 在内存中用 PQ 向量做粗排,确定候选集
2. 从 SSD 读取候选集的原始向量
3. 用原始向量做精排,返回最终结果
核心参数配置
index_params.add_index(
field_name="embedding",
index_type="DISKANN",
metric_type="L2",
params={
"search_list": 100, # 搜索列表大小,越大召回越高
"pq_code_budget_gb": 8 # PQ 编码内存预算(GB)
}
)
| search_list | 召回率@10 | 延迟(ms) | 适用场景 |
|---|---|---|---|
| 50 | 92% | 3~5 | 低延迟优先 |
| 100 | 96% | 5~10 | 均衡 |
| 200 | 98% | 10~20 | 高召回优先 |
7.4 GPU 索引深度
GPU 索引的硬件要求
| 组件 | 最低要求 | 推荐 |
|---|---|---|
| GPU | NVIDIA T4 (16GB) | A100 (80GB) |
| CUDA | 11.8+ | 12.0+ |
| 驱动 | 525+ | 535+ |
| 内存 | GPU 内存 ≥ 向量数据 | GPU 内存 ≥ 2× 向量数据 |
GPU_CAGRA 索引(2.5+ 新增)
index_params.add_index(
field_name="embedding",
index_type="GPU_CAGRA",
metric_type="L2",
params={
"intermediate_graph_degree": 64, # 中间图度数
"graph_degree": 32, # 最终图度数
"build_algo": "IVF_PQ", # 构建算法
"cache_dataset_on_device": "true" # 缓存数据到GPU
}
)
GPU_CAGRA 性能对比:
| 操作 | CPU HNSW | GPU_CAGRA | 提升倍数 |
|---|---|---|---|
| 索引构建(100万条768维) | ~120s | ~3s | 40× |
| 查询 QPS(Top-10) | ~2000 | ~8000 | 4× |
| 批量查询(1000条) | ~500ms | ~50ms | 10× |
7.5 索引选型方法论
选型公式
如果 数据量 < 100万 且 延迟要求 < 5ms:
→ HNSW (M=16, efConstruction=256)
如果 100万 < 数据量 < 1000万 且 内存充足:
→ HNSW (M=16~32, efConstruction=256~512)
如果 1000万 < 数据量 < 1亿 且 内存受限:
→ DiskANN 或 IVF_SQ8
如果 数据量 > 1亿:
→ DiskANN + 分层存储
如果有 GPU 且 需要高QPS:
→ GPU_CAGRA
如果需要关键词检索:
→ SPARSE_INVERTED_INDEX 或 SPARSE_WAND
索引选型常见误区
- 误区:数据量小就不需要索引 → 即使 10 万条,HNSW 也比 FLAT 快 10 倍
- 误区:HNSW 总是最好的 → 亿级数据 HNSW 内存可能不够
- 误区:DiskANN 很慢 → 现代 NVMe SSD 上 DiskANN P99 可达 10ms
- 误区:GPU 索引总是更快 → 单条查询 GPU 开销可能更大,批量查询才占优
第 8 章 混合搜索与高级检索
8.1 混合检索架构原理
向量检索 + 标量过滤的执行全流程
客户端请求: search(vector, filter='category == "技术"')
│
▼
Proxy: 请求路由与参数校验
│
▼
QueryNode:
┌─────────────────────────────────────────┐
│ 1. 分区裁剪:根据 filter 裁剪不相关分区 │
│ 2. 段级过滤:在每个 Segment 上执行 │
│ a. 标量过滤:根据 filter 过滤候选集 │
│ b. 向量搜索:在过滤后的候选集中搜索 │
│ 3. 结果合并:合并多个 Segment 的结果 │
└─────────────────────────────────────────┘
│
▼
Proxy: 结果聚合与排序返回
Milvus 的过滤下推优化机制
Milvus 2.5 优化了过滤执行策略,将过滤操作下推到段级别执行:
- 高选择性过滤(过滤掉 > 90% 数据):先过滤再搜索,效率高
- 低选择性过滤(过滤掉 < 10% 数据):先搜索再过滤,避免索引失效
- Milvus 自动根据统计信息选择最优策略
8.2 稀疏向量检索实战
稀疏向量生成方案
方案一:BM25(Milvus 内置)
# Milvus 2.5 内置 BM25,插入文本时自动生成稀疏向量
# 无需额外模型,详见第6章全文检索部分
方案二:SPLADE(学习型稀疏向量)
from transformers import AutoModelForMaskedLM, AutoTokenizer
import torch
# 加载 SPLADE 模型
tokenizer = AutoTokenizer.from_pretrained("naver/splade-cocondenser-ensembledistil")
model = AutoModelForMaskedLM.from_pretrained("naver/splade-cocondenser-ensembledistil")
def get_sparse_vector(text):
"""使用SPLADE生成稀疏向量"""
tokens = tokenizer(text, return_tensors="pt")
with torch.no_grad():
output = model(**tokens)
logits = output.logits
# SPLADE: max pooling + ReLU + log
sparse_vec = torch.max(
torch.log1p(torch.relu(logits)) * tokens["attention_mask"].unsqueeze(-1),
dim=1
).values.squeeze()
# 转为稀疏格式 {token_id: weight}
nonzero = sparse_vec.nonzero().squeeze()
weights = sparse_vec[nonzero]
return {int(k): float(v) for k, v in zip(nonzero.tolist(), weights.tolist())}
# 生成稀疏向量
sparse_vec = get_sparse_vector("向量数据库在RAG系统中的应用")
# 输出格式: {1023: 2.34, 4567: 1.56, ...} → token_id: weight
关键词 + 语义混合检索完整实现
from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker
# 1. 创建集合(稠密+稀疏双向量)
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="HNSW",
metric_type="COSINE", params={"M": 16, "efConstruction": 256})
index_params.add_index(field_name="sparse_vector", index_type="SPARSE_INVERTED_INDEX",
metric_type="IP")
client.create_collection("hybrid_search_demo", schema=schema, index_params=index_params)
# 2. 插入数据
from sentence_transformers import SentenceTransformer
dense_model = SentenceTransformer('BAAI/bge-large-zh-v1.5')
texts = ["Milvus向量数据库架构设计", "RAG系统最佳实践", "深度学习模型优化"]
data = []
for text in texts:
dense_vec = dense_model.encode(text).tolist()
sparse_vec = get_sparse_vector(text) # 使用SPLADE
data.append({"text": text, "dense_vector": dense_vec, "sparse_vector": sparse_vec})
client.insert("hybrid_search_demo", data)
# 3. 混合检索
query_text = "向量数据库"
dense_query = dense_model.encode(query_text).tolist()
sparse_query = get_sparse_vector(query_text)
req_dense = AnnSearchRequest(
data=[dense_query],
anns_field="dense_vector",
param={"metric_type": "COSINE", "params": {"ef_search": 128}},
limit=20
)
req_sparse = AnnSearchRequest(
data=[sparse_query],
anns_field="sparse_vector",
param={"metric_type": "IP"},
limit=20
)
results = client.hybrid_search(
collection_name="hybrid_search_demo",
reqs=[req_dense, req_sparse],
ranker=WeightedRanker(0.7, 0.3),
limit=10
)
多语言 BM25 分析器(2.6 新增)
Milvus 2.6 为 BM25 全文检索引入了多语言分析器支持,自动识别文本语言并应用对应的分词策略:
# 2.6 新增:创建集合时指定多语言 BM25 分析器
from pymilvus import MilvusClient, DataType, Function, FunctionType
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
# 多语言 BM25 函数(2.6 新增)
schema.add_function(Function(
name="bm25_multilang",
input_field_names=["text"],
output_field_names=["sparse_vector"],
function_type=FunctionType.BM25,
params={
"analyzer_params": {
"type": "multi_language" # 自动检测语言并分词
}
}
))
支持的分析器类型:
| 分析器 | 说明 | 适用语言 |
|---|---|---|
standard |
默认通用分词器 | 英语等空格分隔语言 |
chinese |
中文分词器 | 中文 |
multi_language |
自动语言检测 | 多语言混合 |
| 自定义 | 指定分词器和参数 | 特定需求 |
多语言 BM25 性能提升:
- 相比 Elasticsearch,BM25 全文检索速度提升 4 倍(部分数据集可达 7 倍)
- 索引大小仅为 Elasticsearch 的约 1/3
- 支持短语匹配(Phrase Match),精确匹配连续词组
8.3 多模态向量检索
图文跨模态检索完整实现
from pymilvus import MilvusClient, DataType
from sentence_transformers import SentenceTransformer
# 使用 CLIP 模型实现图文跨模态检索
model = SentenceTransformer('clip-ViT-B-32')
# 创建集合
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="image_path", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="description", datatype=DataType.VARCHAR, max_length=1024)
schema.add_field(field_name="clip_vector", datatype=DataType.FLOAT_VECTOR, dim=512)
index_params = client.prepare_index_params()
index_params.add_index(field_name="clip_vector", index_type="HNSW",
metric_type="COSINE", params={"M": 16, "efConstruction": 256})
client.create_collection("image_search", schema=schema, index_params=index_params)
# 插入图片数据
from PIL import Image
images = [
{"path": "cat.jpg", "desc": "一只橘猫在阳光下睡觉"},
{"path": "dog.jpg", "desc": "金毛犬在草地上奔跑"},
]
for img in images:
image = Image.open(img["path"])
vector = model.encode(image).tolist()
client.insert("image_search", [{
"image_path": img["path"],
"description": img["desc"],
"clip_vector": vector
}])
# 用文本搜索图片
query_vector = model.encode("可爱的猫咪").tolist()
results = client.search(
collection_name="image_search",
data=[query_vector],
limit=5,
output_fields=["image_path", "description"]
)
8.4 两阶段检索与重排序
粗排 + 精排的两阶段检索架构
第一阶段(粗排):Milvus ANN 检索
→ 返回 Top-100 候选结果
→ 延迟低,召回率高
第二阶段(精排):Reranker 模型
→ 对100个候选结果精排
→ 返回 Top-10 最终结果
→ 精度高,延迟可控
from sentence_transformers import CrossEncoder
# 使用 Cross-Encoder 做精排
reranker = CrossEncoder('BAAI/bge-reranker-large')
# 第一阶段:Milvus 粗排
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
limit=100, # 多召回一些候选
output_fields=["text"]
)
# 提取候选文档
candidates = []
for hits in results:
for hit in hits:
candidates.append(hit['entity']['text'])
# 第二阶段:Reranker 精排
pairs = [[query_text, doc] for doc in candidates]
scores = reranker.predict(pairs)
# 按精排分数排序
ranked_results = sorted(
zip(candidates, scores),
key=lambda x: x[1],
reverse=True
)[:10]
for doc, score in ranked_results:
print(f"分数: {score:.4f}, 文本: {doc[:50]}...")
HyDE 检索增强技术
# HyDE (Hypothetical Document Embeddings)
# 核心思想:让LLM生成假设性答案,用假设答案的向量做检索
def hyde_search(query_text, llm_client, milvus_client, collection_name):
# 1. 让LLM生成假设性答案
prompt = f"请回答以下问题,给出详细答案:\n{query_text}"
hypothetical_answer = llm_client.generate(prompt)
# 2. 用假设答案的向量做检索(比直接用问题向量效果更好)
query_vector = dense_model.encode(hypothetical_answer).tolist()
results = milvus_client.search(
collection_name=collection_name,
data=[query_vector],
limit=10,
output_fields=["text"]
)
return results
第 9 章 数据管理高级特性
9.1 数据一致性体系
四种一致性级别
Milvus 提供四种一致性级别,基于 Guarantee Timestamp(GuaranteeTs)机制实现:
| 一致性级别 | GuaranteeTs 设置 | 特点 | 适用场景 |
|---|---|---|---|
| Strong | 最新时间戳 | 读取所有已写入数据,延迟最高 | 金融交易、实时监控 |
| Bounded | 最新时间戳 - 容忍窗口 | 允许短时间内的数据不一致 | 大多数业务场景(推荐) |
| Session | 客户端最后写入时间戳 | 自己写入的数据立即可见 | 交互式应用 |
| Eventually | 极小值(如1) | 不保证一致性,延迟最低 | 日志分析、推荐召回 |
# 在查询时指定一致性级别
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
limit=10,
consistency_level="Strong" # Strong / Bounded / Session / Eventually
)
# 在创建集合时指定默认一致性级别
client.create_collection(
collection_name="knowledge_base",
schema=schema,
index_params=index_params,
consistency_level="Bounded"
)
一致性级别性能对比:
测试条件:100万条数据,持续写入
Strong: 查询延迟 P99 = 15ms (等待数据同步完成)
Bounded: 查询延迟 P99 = 5ms (容忍2秒窗口)
Session: 查询延迟 P99 = 5ms (自己写入立即可见)
Eventually: 查询延迟 P99 = 3ms (不等待同步)
9.2 存储优化与数据生命周期
向量量化压缩技术
Milvus 支持多种向量压缩方式来降低内存占用:
| 压缩方式 | 压缩比 | 精度损失 | 实现方式 |
|---|---|---|---|
| SQ8(标量量化) | 4× | 低 | float32 → int8 |
| PQ(乘积量化) | 12~48× | 中 | 子向量聚类编码 |
| Float16 | 2× | 极低 | float32 → float16 |
| BFloat16 | 2× | 低 | float32 → bfloat16 |
# 使用 Float16 向量字段节省内存
schema.add_field(
field_name="fp16_embedding",
datatype=DataType.FLOAT16_VECTOR,
dim=768
)
# 内存占用减半,精度损失极小
冷热数据分层存储架构
Milvus 2.5 引入分层存储(Tiered Storage),允许冷数据按需从磁盘加载:
热数据层(内存) 冷数据层(SSD)
┌──────────────┐ ┌──────────────┐
│ 最近访问的段 │ │ 不常访问的段 │
│ 索引结构 │ │ 原始向量数据 │
│ 高频查询数据 │ │ 低频查询数据 │
│ │ │ │
│ 延迟: <5ms │ │ 延迟: 5~20ms │
│ 成本: 高 │ │ 成本: 低 │
└──────────────┘ └──────────────┘
分层存储配置:
# Helm values 中配置分层存储
queryNode:
config:
queryNode:
lazyLoadEnabled: true # 启用懒加载
loadingMemoryPortion: 0.3 # 内存中保留30%热数据
9.3 数据备份与迁移
全量备份方案
# 使用 Milvus Backup 工具
# 安装
pip install milvus-backup
# 创建备份
milvus-backup create -n backup_20260614 --milvus-config config.yaml
# 恢复备份
milvus-backup restore -n backup_20260614 --milvus-config config.yaml
跨集群数据迁移
# 使用 Milvus Backup 进行跨集群迁移
# 1. 在源集群创建备份
milvus-backup create -n migration_backup --milvus-config source_config.yaml
# 2. 将备份数据复制到目标集群的对象存储
# 使用 rclone 或 mc 工具
# 3. 在目标集群恢复
milvus-backup restore -n migration_backup --milvus-config target_config.yaml
9.4 多租户与资源隔离
资源组(Resource Group)机制
# 创建资源组
client.create_resource_group("rg_tenant_a", config={
"node_num": 2 # 分配2个QueryNode
})
# 将集合加载到指定资源组
client.load_collection(
collection_name="tenant_a_collection",
resource_groups=["rg_tenant_a"]
)
多租户隔离方案
| 方案 | 隔离级别 | 资源开销 | 适用场景 |
|---|---|---|---|
| 每租户一个集合 | 强隔离 | 高 | 大租户 |
| 每租户一个分区 | 中隔离 | 中 | 中等租户 |
| 每租户一个字段值 | 弱隔离 | 低 | 小租户 |
| 每租户一个数据库 | 强隔离 | 高 | 企业级 |
# 方案一:每租户一个集合(强隔离)
client.create_collection(f"tenant_{tenant_id}_docs", schema=schema, ...)
# 方案二:每租户一个分区(中隔离)
client.create_partition("shared_docs", f"tenant_{tenant_id}")
# 查询时指定分区
client.search("shared_docs", partition_names=[f"tenant_{tenant_id}"], ...)
# 方案三:分区键自动隔离(推荐)
# 使用 tenant_id 作为分区键
schema.add_field(field_name="tenant_id", datatype=DataType.VARCHAR,
max_length=64, is_partition_key=True)
# 查询时自动分区裁剪
client.search("shared_docs", filter=f'tenant_id == "{tenant_id}"', ...)
9.5 安全与合规
传输加密 TLS 配置
# Milvus Helm values 中启用 TLS
proxy:
tls:
enabled: true
certSecret: milvus-tls-secret
# 创建 TLS 证书 Secret
kubectl create secret generic milvus-tls-secret \
--from-file=tls.crt=server.crt \
--from-file=tls.key=server.key \
--from-file=ca.crt=ca.crt \
-n milvus
身份认证
# 启用用户名密码认证
common:
security:
authorizationEnabled: true
# 连接时提供认证信息
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus" # 用户名:密码
)
RBAC 权限控制
Milvus 2.5 提供 9 个内置特权组,覆盖从只读到管理的所有权限:
| 特权组 | 级别 | 权限范围 |
|---|---|---|
| COLL_RO | 集合 | 读取集合数据 |
| COLL_RW | 集合 | 读写集合数据 |
| COLL_ADMIN | 集合 | 集合管理(含DDL) |
| DB_RO | 数据库 | 读取数据库 |
| DB_RW | 数据库 | 读写数据库 |
| DB_Admin | 数据库 | 数据库管理 |
| Cluster_RO | 集群 | 全局只读 |
| Cluster_RW | 集群 | 全局读写 |
| Cluster_Admin | 集群 | 全局管理 |
# 创建用户和角色
client.create_user("analyst", "secure_password")
client.create_role("data_analyst")
# 授予角色权限
client.grant_privilege("data_analyst", "COLL_RO", "knowledge_base", "*")
# 将角色分配给用户
client.grant_role("analyst", "data_analyst")
# 验证权限
client.describe_role("data_analyst")
```---
# 第四阶段:分布式架构与生产部署(Day 13-18)
## 第 10 章 Milvus 分布式架构深度解析
### 10.1 云原生存算分离架构总览
Milvus 2.6 在 2.5 四层架构基础上,引入了 StreamingNode 流式节点和 Woodpecker WAL 引擎,实现了流批分离架构:
┌──────────────────────────────────────────────────────────────┐
│ 访问层 (Access Layer) │
│ Proxy (无状态网关) │
├──────────────────────────────────────────────────────────────┤
│ 协调层 (Coordinator Layer) │
│ RootCoord │ DataCoord │ QueryCoord │ IndexCoord │
├──────────────────────────────────────────────────────────────┤
│ 计算层 (Worker Layer) │
│ StreamingNode(2.6新增) │ DataNode │ QueryNode │ IndexNode │
├──────────────────────────────────────────────────────────────┤
│ 存储层 (Storage Layer) │
│ etcd │ Woodpecker(2.6新增)/Pulsar/Kafka │ MinIO/S3 │
└──────────────────────────────────────────────────────────────┘
**2.6 架构升级要点**:
1. **StreamingNode(流式节点)**:新增的流式处理节点,负责实时数据摄入和即时索引
2. **Woodpecker(零磁盘 WAL)**:替代 Kafka/Pulsar 的自研 WAL 引擎,零磁盘架构
3. **Storage v2**:新存储引擎,降低 IOPS 和内存占用
4. **流批分离**:实时写入走 StreamingNode,批量处理走 DataNode
**存算分离的核心优势**:
1. **独立扩展**:计算节点和存储可以独立扩缩容
2. **资源优化**:计算节点无状态,可按需弹性伸缩
3. **高可用**:存储层由专业组件保障,计算节点故障可快速恢复
4. **成本优化**:冷数据存对象存储,热数据存内存
### 10.2 访问层:Proxy 节点
Proxy 是 Milvus 的无状态网关,是客户端请求的唯一入口:
**核心职责**:
1. **请求路由**:接收客户端请求,路由到正确的协调器
2. **参数校验**:校验请求参数的合法性
3. **结果聚合**:MPP 架构下,聚合多个 QueryNode 的中间结果
4. **限流熔断**:保护后端服务不被过载请求打垮
客户端请求流程:
Client → Load Balancer → Proxy
│
├── 参数校验
├── 请求路由
├── 转发到 QueryNode/DataNode
├── 收集各节点结果
└── 聚合排序返回客户端
**运维要点**:
- Proxy 无状态,可随意水平扩展
- 生产环境建议至少部署 2 个 Proxy + 负载均衡器
- 通过 Nginx 或 K8s Ingress 实现负载均衡
### 10.3 协调层:Coord 组件
协调层是 Milvus 的"大脑",负责元数据管理和任务调度:
#### RootCoord:元数据管理中枢
- **DDL 执行**:处理 Collection、Partition、Index 的创建/删除
- **全局时间戳分配**:通过 TSO(Timestamp Oracle)分配全局单调递增时间戳
- **元数据管理**:维护所有 Collection 的 Schema、索引定义等元信息
#### DataCoord:数据调度中枢
- **段生命周期管理**:管理 Segment 从 Growing → Sealed → Flushed 的状态转换
- **数据节点调度**:分配写入通道(Channel)给 DataNode
- **Compaction 调度**:触发和监控段合并任务
- **垃圾回收**:清理过期的段文件和删除标记
#### QueryCoord:查询调度中枢
- **段加载调度**:决定哪些 Segment 加载到哪些 QueryNode
- **负载均衡**:在 QueryNode 之间均衡段分布
- **副本管理**:管理段的多个副本,保证高可用
#### IndexCoord:索引调度中枢
- **索引任务调度**:将索引构建任务分配给 IndexNode
- **索引版本管理**:管理索引的版本和状态
### 10.4 计算层:Node 节点
#### DataNode:数据写入处理
数据写入流程:
Pulsar/Kafka → DataNode 消费消息
│
├── 写入 Growing Segment(内存缓冲区)
├── 当段大小达到阈值 → Flush 到对象存储
└── 通知 DataCoord 段状态变更
- 每个 DataNode 消费一个或多个写入通道(Channel)
- Growing Segment 在内存中累积,达到阈值后 Flush
- Flush 后的 Segment 变为 Sealed,触发索引构建
#### QueryNode:查询执行引擎
查询执行流程:
Proxy → QueryNode
│
├── 加载 Sealed Segment 到内存
├── 在 Growing + Sealed Segment 上执行搜索
├── 应用 Delete Log 过滤已删除数据
├── 应用标量过滤条件
└── 返回 Top-K 中间结果给 Proxy
- QueryNode 是内存密集型节点,负责向量查询和标量过滤
- 支持多副本:同一 Segment 可加载到多个 QueryNode
- 分层存储:冷数据按需从磁盘加载
#### IndexNode:索引构建执行器
- 接收 IndexCoord 分配的索引构建任务
- 从对象存储读取 Sealed Segment 数据
- 构建索引文件并写回对象存储
- 通知 IndexCoord 构建完成
### 10.5 存储层:核心依赖组件
#### 元数据存储:etcd
etcd 存储内容:
├── Collection 元数据(Schema、索引定义、配置)
├── Segment 元数据(状态、大小、行数)
├── 索引元数据(类型、参数、构建状态)
├── Checkpoint 信息(消息消费进度)
└── 全局时间戳信息
**运维要点**:
- etcd 是有状态组件,建议 3 节点集群部署
- 定期备份数据,etcd 数据丢失会导致集群不可用
- 控制 etcd 数据量,Milvus 会自动压缩历史修订版
#### 消息存储:Woodpecker / Pulsar / Kafka
**Woodpecker(2.6 新增,推荐)**:
Woodpecker 是 Milvus 2.6 自研的云原生 WAL 引擎,采用 Zero-Disk 零磁盘架构,可直接替代 Kafka/Pulsar:
Woodpecker 核心设计:
┌─────────────────────────────────────────┐
│ Zero-Disk 架构 │
│ ├── 所有日志数据存储在对象存储(S3/GCS) │
│ ├── 元数据由 etcd 管理 │
│ ├── 无本地磁盘依赖 │
│ └── 完美对齐云原生原则 │
└─────────────────────────────────────────┘
部署模式:
-
MemoryBuffer 模式(轻量级)
- 客户端内存缓冲写入,定期 Flush 到对象存储
- 无需独立部署,维护成本为零
- 适合中小规模场景
-
Independent 部署模式(高性能)
- 独立 Woodpecker 服务
- 更高写入吞吐
- 适合大规模生产环境
**Woodpecker vs Pulsar vs Kafka**:
| 维度 | Woodpecker | Pulsar | Kafka |
|------|-----------|--------|-------|
| 部署复杂度 | 极低 | 高 | 中 |
| 本地磁盘 | 不需要 | 需要 | 需要 |
| 运维成本 | 极低 | 高 | 中 |
| 写入性能 | 高 | 高 | 高 |
| 云原生 | 原生 | 部分 | 部分 |
| 2.6 推荐度 | ★★★★★ | ★★★ | ★★★ |
**Pulsar vs Kafka 选择(如不使用 Woodpecker)**:
| 维度 | Pulsar | Kafka |
|------|--------|-------|
| 部署复杂度 | 较高 | 中等 |
| 消息保序 | 支持 | 支持 |
| 多租户 | 原生支持 | 需要额外配置 |
| 社区支持 | Milvus 默认 | 2.5+ 支持 |
| 适用场景 | 大规模生产 | 中等规模 |
#### 对象存储:MinIO / S3
对象存储文件结构:
milvus-bucket/
├── insert_log/ # 插入数据日志
│ └── {collection_id}/
│ └── {partition_id}/
│ └── {segment_id}/
│ ├── {field_id}_1.binlog # 字段数据
│ └── {field_id}_2.binlog
├── delta_log/ # 删除标记日志
│ └── {segment_id}/
│ └── delete_log.binlog
├── index_files/ # 索引文件
│ └── {index_id}/
│ └── index_file.bin
└── stats_log/ # 统计信息
### 10.6 核心全链路流程
#### 数据写入全链路
**2.6 流批分离架构下的写入流程**:
实时写入路径(StreamingNode):
- 客户端调用 insert(data)
- Proxy 校验参数,分配主键(如果 auto_id)
- Proxy 将数据写入 Woodpecker/Pulsar 对应 Shard 的 Channel
- StreamingNode 消费 Channel 消息(2.6 新增)
- StreamingNode 即时构建 Growing Segment 索引,数据立即可查
- Growing Segment 达到阈值,Flush 到对象存储
- DataCoord 更新 Segment 状态为 Sealed
- IndexCoord 触发索引构建任务
- IndexNode 构建索引,写入对象存储
- QueryCoord 调度 QueryNode 加载新的 Sealed Segment
关键改进(2.6):
- StreamingNode 实时消费并构建索引,写入后毫秒级可查
- Woodpecker 替代 Kafka/Pulsar,减少运维复杂度
- Storage v2 降低 IOPS 和内存占用
#### 数据查询全链路
- 客户端调用 search(vector, filter, top_k)
- Proxy 校验参数,解析过滤条件
- Proxy 将请求路由到所有持有相关 Segment 的 QueryNode
- QueryNode 在本地执行搜索:
a. 在 Growing Segment 上暴力搜索
b. 在 Sealed Segment 上使用索引搜索
c. 应用 Delete Log 过滤
d. 应用标量过滤条件
e. 返回 Top-K 中间结果 - Proxy 聚合所有 QueryNode 的中间结果
- Proxy 全局排序,返回最终 Top-K 结果
#### Segment 生命周期详解
Growing → Sealed → Indexed → Loaded → Compacted
Growing: 正在写入,数据在内存
↓ 达到大小阈值或手动 Flush
Sealed: 已封存,数据持久化到对象存储
↓ IndexNode 构建索引
Indexed: 索引构建完成,可被查询
↓ QueryNode 加载到内存
Loaded: 已加载到 QueryNode 内存,可被查询
↓ Compaction 合并
Compacted: 已合并,旧 Segment 标记为待清理
---
## 第 11 章 生产环境部署
### 11.1 部署方案选型
| 方案 | 适用场景 | 数据规模 | 运维复杂度 |
|------|---------|---------|----------|
| Docker Compose 单机 | 开发测试 | < 500万 | 低 |
| 分布式物理机 | 私有化部署 | 千万~亿级 | 高 |
| Kubernetes 集群 | 生产环境(推荐) | 百万~十亿级 | 中 |
| Zilliz Cloud | 快速上线 | 任意规模 | 极低 |
### 11.2 Kubernetes 集群部署详解
#### Helm Chart 部署与配置
```bash
# 生产级部署命令
helm install milvus milvus/milvus \
--namespace milvus \
-f values-prod.yaml \
--timeout 10m
values-prod.yaml 完整配置:
# 集群模式
cluster:
enabled: true
# Proxy 配置
proxy:
replicas: 2
service:
type: LoadBalancer
resources:
requests: { cpu: "2", memory: "4Gi" }
limits: { cpu: "4", memory: "8Gi" }
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
app: milvus-proxy
topologyKey: kubernetes.io/hostname
# StreamingNode 配置(2.6 新增,流式处理节点)
streamingNode:
replicas: 2
resources:
requests: { cpu: "2", memory: "8Gi" }
limits: { cpu: "4", memory: "16Gi" }
# QueryNode 配置(内存密集型)
queryNode:
replicas: 3
resources:
requests: { cpu: "4", memory: "16Gi" }
limits: { cpu: "8", memory: "32Gi" }
nodeSelector:
node-type: memory-optimized
tolerations:
- key: "memory-optimized"
operator: "Equal"
value: "true"
effect: "NoSchedule"
# DataNode 配置
dataNode:
replicas: 2
resources:
requests: { cpu: "2", memory: "8Gi" }
limits: { cpu: "4", memory: "16Gi" }
# IndexNode 配置(CPU密集型)
indexNode:
replicas: 2
resources:
requests: { cpu: "4", memory: "16Gi" }
limits: { cpu: "8", memory: "32Gi" }
nodeSelector:
node-type: cpu-optimized
# etcd 高可用
etcd:
replicas: 3
persistence:
enabled: true
storageClass: local-ssd
size: 10Gi
# MinIO 分布式
minio:
mode: distributed
replicas: 4
persistence:
enabled: true
storageClass: local-ssd
size: 500Gi
# Woodpecker 配置(2.6 新增,替代 Pulsar/Kafka)
woodpecker:
enabled: true
# MemoryBuffer 模式:轻量级,无需独立部署
# Independent 模式:高性能,需要独立部署
mode: memorybuffer
# 如果仍需 Pulsar(与 Woodpecker 二选一)
pulsar:
enabled: false
# Milvus 配置
extraConfigFiles:
user.yaml: |
common:
security:
authorizationEnabled: true
queryNode:
lazyLoadEnabled: true
loadingMemoryPortion: 0.5
# 2.6 新增:冷热分层存储配置
tieredStorage:
enabled: true
hotDataTTL: 7d # 热数据保留7天
warmDataTTL: 30d # 温数据保留30天
存储类配置
# 本地 SSD StorageClass(推荐用于 etcd 和热数据)
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: local-ssd
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Retain
11.3 集群高可用设计
所有组件的多副本部署
| 组件 | 最小副本数 | 推荐副本数 | 故障影响 |
|---|---|---|---|
| Proxy | 2 | 2~4 | 请求路由不受影响 |
| RootCoord | 1 | 2 | 主备切换 |
| DataCoord | 1 | 2 | 主备切换 |
| QueryCoord | 1 | 2 | 主备切换 |
| IndexCoord | 1 | 2 | 主备切换 |
| StreamingNode | 1 | 2~4 | 写入通道自动迁移(2.6新增) |
| DataNode | 1 | 2~4 | 写入通道自动迁移 |
| QueryNode | 2 | 3~6 | 查询副本自动切换 |
| IndexNode | 1 | 2 | 索引任务自动重分配 |
| etcd | 3 | 3~5 | Raft 协议保证 |
| Woodpecker | 1 | 2~3 | 日志服务自动恢复(2.6新增) |
| MinIO | 4 | 4~8 | 纠删码保证 |
单节点故障自动转移机制
QueryNode 故障恢复流程:
1. QueryCoord 检测到 QueryNode 心跳超时
2. QueryCoord 将该节点上的 Segment 标记为未加载
3. QueryCoord 调度其他 QueryNode 加载这些 Segment
4. 新的查询请求路由到健康的 QueryNode
5. 整个过程约 30~60 秒
11.4 网络与安全加固
# NetworkPolicy 配置
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: milvus-network-policy
namespace: milvus
spec:
podSelector:
matchLabels:
app: milvus
policyTypes:
- Ingress
- Egress
ingress:
- from:
- namespaceSelector:
matchLabels:
name: production
ports:
- port: 19530
protocol: TCP
egress:
- to:
- podSelector:
matchLabels:
app: etcd
ports:
- port: 2379
11.5 容灾部署
同城双活架构
┌─────────────────┐
│ 全局负载均衡 │
└────────┬────────┘
│
┌────────────┼────────────┐
│ │
┌───────▼──────┐ ┌───────▼──────┐
│ 可用区 A │ │ 可用区 B │
│ Milvus集群 │ │ Milvus集群 │
│ (主) │ │ (从) │
│ Proxy ×2 │ │ Proxy ×2 │
│ QueryNode×3 │ │ QueryNode×3 │
│ DataNode×2 │ │ DataNode×2 │
└───────┬──────┘ └───────┬──────┘
│ │
┌───────▼──────┐ ┌───────▼──────┐
│ etcd ×3 │ │ etcd ×3 │
│ MinIO ×4 │ │ MinIO ×4 │
└──────────────┘ └──────────────┘
│ │
└───────── 对象存储同步 ─────┘
RPO/RTO 指标:
- 同城双活:RPO < 1分钟,RTO < 5分钟
- 异地灾备:RPO < 15分钟,RTO < 30分钟
第 12 章 集群运维与监控
12.1 可观测性体系搭建
Prometheus 指标采集配置
# Prometheus ServiceMonitor
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
name: milvus-metrics
namespace: milvus
spec:
selector:
matchLabels:
app: milvus
endpoints:
- port: metrics
interval: 15s
path: /metrics
核心监控指标分类
集群健康指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| milvus_proxy_num_online | 在线 Proxy 数量 | < 2 |
| milvus_querynode_num_online | 在线 QueryNode 数量 | < 配置副本数 |
| milvus_datanode_num_online | 在线 DataNode 数量 | < 1 |
| etcd_server_has_leader | etcd 是否有 leader | 0 |
写入指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| milvus_proxy_insert_vectors_count | 写入向量数/秒 | 异常下降 |
| milvus_proxy_insert_latency | 写入延迟 P99 | > 500ms |
| milvus_msgstream_consume_lag | 消息积压量 | > 100000 |
查询指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| milvus_proxy_search_vectors_count | 查询 QPS | 异常下降 |
| milvus_proxy_search_latency | 查询延迟 P99 | > 100ms |
| milvus_querynode_search_latency | QueryNode 搜索延迟 | > 50ms |
资源指标:
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| milvus_querynode_memory_used | QueryNode 内存使用 | > 85% |
| milvus_querynode_cpu_usage | QueryNode CPU 使用率 | > 80% |
| milvus_indexnode_task_num | 索引构建任务数 | 积压过多 |
Grafana 官方监控大盘
# 导入 Milvus 官方 Grafana Dashboard
# Dashboard ID: 17535 (Milvus Cluster Overview)
# Dashboard ID: 17536 (Milvus Query Node)
# Dashboard ID: 17537 (Milvus Data Node)
# 在 Grafana 中导入
# Configuration → Data Sources → Add Prometheus
# Dashboards → Import → 输入 Dashboard ID
12.2 日志管理
# 使用 Loki 采集 Milvus 日志
# Promtail 配置
scrape_configs:
- job_name: milvus
kubernetes_sd_configs:
- role: pod
namespaces:
names: [milvus]
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
target_label: app
pipeline_stages:
- cri: {}
- labels:
level:
component:
关键错误日志识别:
| 日志关键字 | 含义 | 处理方式 |
|---|---|---|
OOM killed |
内存溢出 | 增加内存或优化查询 |
segment not loaded |
段未加载 | 检查 QueryNode 状态 |
msgstream consume failed |
消息消费失败 | 检查 Pulsar/Kafka |
etcd server timeout |
etcd 超时 | 检查 etcd 集群健康 |
index build failed |
索引构建失败 | 检查 IndexNode 资源 |
12.3 集群扩缩容
# 扩容 QueryNode(在线扩容,无需停机)
kubectl scale deployment milvus-querynode --replicas=5 -n milvus
# QueryCoord 自动检测新节点并重新平衡段分布
# 缩容 QueryNode
kubectl scale deployment milvus-querynode --replicas=3 -n milvus
# QueryCoord 自动将段迁移到剩余节点
# 扩容 Proxy
kubectl scale deployment milvus-proxy --replicas=4 -n milvus
# 存储扩容(MinIO)
# 修改 PVC 大小
kubectl patch pvc data-milvus-minio-0 -p '{"spec":{"resources":{"requests":{"storage":"1Ti"}}}}' -n milvus
扩缩容注意事项:
- QueryNode 扩容后需要等待段加载完成才能接收查询
- 缩容 QueryNode 前确保剩余节点内存充足
- DataNode 扩容后 DataCoord 会自动重新分配 Channel
12.4 版本升级
滚动升级流程
# 1. 备份 etcd 数据
ETCDCTL_API=3 etcdctl snapshot save backup.db
# 2. 更新 Helm 仓库
helm repo update
# 3. 滚动升级
helm upgrade milvus milvus/milvus \
--namespace milvus \
-f values-prod.yaml \
--version 2.6.18 \
--timeout 15m
# 4. 验证升级
kubectl get pods -n milvus
kubectl logs -l app=milvus -n milvus --tail=100
2.5 → 2.6 升级特别注意事项:
从 Milvus 2.5 升级到 2.6 是一次中版本升级,需要注意以下关键变更:
| 变更项 | 说明 | 处理方式 |
|---|---|---|
| StreamingNode | 2.6 新增组件 | 升级后自动部署,确保资源充足 |
| Woodpecker | 可选替代 Pulsar/Kafka | 建议先保留 Pulsar,验证后再切换 |
| Storage v2 | 新存储引擎 | 自动迁移,升级期间 IOPS 可能增加 |
| IVF_RABITQ | 新索引类型 | 需要手动重建索引才能使用 |
| Path Index | 新标量索引 | 需要手动创建,不影响现有索引 |
| Nullable Vector | 向量字段可空 | 新功能,不影响现有集合 |
| Add Field | 动态添加字段 | 新功能,不影响现有集合 |
升级风险评估:
- 小版本升级(2.6.17 → 2.6.18):低风险,仅 bug 修复
- 中版本升级(2.5.x → 2.6.x):中风险,需要关注架构变更
- 大版本升级(2.x → 3.x):高风险,需要全面测试
回滚方案:
# Helm 回滚
helm rollback milvus -n milvus
# 如果 etcd 数据已变更,需要从备份恢复
ETCDCTL_API=3 etcdctl snapshot restore backup.db
12.5 常见故障排查
节点宕机与自动恢复验证
# 模拟 QueryNode 故障
kubectl delete pod milvus-querynode-xxxxx -n milvus
# 观察自动恢复
kubectl get pods -n milvus -w
# 检查段重新加载
kubectl logs -l app=milvus-querynode -n milvus | grep "load segment"
查询超时与慢查询定位
# 1. 检查 QueryNode 资源使用
kubectl top pods -n milvus -l app=milvus-querynode
# 2. 检查段加载状态
# 通过 Birdwatcher 查看
show segment-loaded
# 3. 检查查询延迟分布
# 通过 Prometheus 查询
histogram_quantile(0.99, rate(milvus_proxy_search_latency_bucket[5m]))
写入失败与消息积压排查
# 1. 检查 DataNode 状态
kubectl get pods -n milvus -l app=milvus-datanode
# 2. 检查消息积压
# Prometheus 查询
milvus_msgstream_consume_lag
# 3. 检查 Pulsar/Kafka 状态
kubectl get pods -n milvus -l app=pulsar-broker
内存 OOM 问题排查
OOM 排查步骤:
1. 检查 QueryNode 内存配置
kubectl describe pod milvus-querynode-xxx -n milvus
2. 分析内存使用
- 向量数据大小:N × dim × 4 bytes
- HNSW 图结构:N × M × 2 × 8 bytes
- 标量数据:根据字段类型估算
- Growing Segment 缓冲区
3. 优化方案
- 增加 QueryNode 内存
- 使用 DiskANN 减少内存占用
- 启用分层存储
- 减少 replica_number
- 使用量化压缩(SQ8/PQ)
```---
# 第五阶段:性能优化与大规模实践(Day 19-24)
## 第 13 章 性能优化实战
### 13.1 写入性能优化
#### 批次大小与并发数最优配比
写入性能测试结果(768维向量,Docker Compose 单机版):
┌──────────┬──────────┬──────────────┬──────────┐
│ 批次大小 │ 并发数 │ 吞吐(条/秒) │ P99延迟 │
├──────────┼──────────┼──────────────┼──────────┤
│ 100 │ 1 │ 2,000 │ 50ms │
│ 100 │ 4 │ 6,000 │ 80ms │
│ 1000 │ 1 │ 8,000 │ 130ms │
│ 1000 │ 4 │ 25,000 │ 200ms │
│ 5000 │ 1 │ 12,000 │ 420ms │
│ 5000 │ 4 │ 35,000 │ 600ms │
│ 10000 │ 1 │ 10,000 │ 1000ms │
│ 10000 │ 4 │ 30,000 │ 1500ms │
└──────────┴──────────┴──────────────┴──────────┘
最优配比:批次大小 1000~5000,并发数 4~8
#### 消息队列参数调优
```yaml
# Pulsar 调优参数
pulsar:
broker:
configData:
managedLedgerDefaultEnsembleSize: "3"
managedLedgerDefaultWriteQuorum: "3"
managedLedgerDefaultAckQuorum: "2"
dispatchThrottlingRatePerTopicInMsg: "50000"
DataNode 并发与刷盘策略
# Milvus DataNode 配置
dataNode:
config:
dataNode:
flushInsertBufferSize: 16777216 # 16MB 缓冲区
flushDeleteBufferSize: 4194304 # 4MB 删除缓冲区
channelWorkPoolSize: 16 # 并发处理Channel数
13.2 查询性能优化
索引参数调优方法论
调优步骤:
- 建立基准:使用 FLAT 索引测量 100% 召回率下的延迟
- 选择索引:根据数据规模和延迟要求选择索引类型
- 调优构建参数:先固定查询参数,调优构建参数(M、efConstruction、nlist)
- 调优查询参数:在构建参数确定后,调优查询参数(efSearch、nprobe)
- 验证召回率:确保召回率满足业务要求
# 召回率测试脚本
def measure_recall(client, collection_name, test_queries, ground_truth, k=10):
"""测量搜索召回率"""
total_recall = 0
for query, gt_ids in zip(test_queries, ground_truth):
results = client.search(
collection_name=collection_name,
data=[query],
limit=k,
search_params={"metric_type": "COSINE", "params": {"ef_search": 128}}
)
result_ids = set(hit['id'] for hit in results[0])
gt_set = set(gt_ids[:k])
recall = len(result_ids & gt_set) / k
total_recall += recall
return total_recall / len(test_queries)
查询参数动态调优
# 根据业务场景动态调整 efSearch
def adaptive_search(client, collection_name, query_vector, target_latency_ms=10):
"""自适应搜索:在延迟约束下最大化召回率"""
ef_search_values = [64, 128, 256, 512]
for ef in ef_search_values:
start = time.time()
results = client.search(
collection_name=collection_name,
data=[query_vector],
limit=10,
search_params={"metric_type": "COSINE", "params": {"ef_search": ef}}
)
latency = (time.time() - start) * 1000
if latency <= target_latency_ms:
return results, ef, latency
# 如果最低 ef_search 也超时,返回最后的结果
return results, ef, latency
缓存机制与数据预热
# 数据预热:在低峰期提前加载集合
def warmup_collection(client, collection_name, sample_queries, num_warmup=100):
"""用样本查询预热缓存"""
for i in range(num_warmup):
query = sample_queries[i % len(sample_queries)]
client.search(
collection_name=collection_name,
data=[query],
limit=1 # 只需触发缓存加载
)
print(f"预热完成: {num_warmup} 次查询")
# 首次查询通常较慢(冷缓存),预热后延迟降低 50%+
高并发场景下的查询优化
# 使用连接池和异步查询
import asyncio
from pymilvus import AsyncMilvusClient
async def concurrent_search():
"""异步并发查询"""
client = AsyncMilvusClient("http://localhost:19530")
# 并发发送多个查询
tasks = [
client.search(
collection_name="knowledge_base",
data=[query_vectors[i]],
limit=10
)
for i in range(100)
]
results = await asyncio.gather(*tasks)
return results
# 高并发优化要点:
# 1. 使用异步客户端(AsyncMilvusClient)
# 2. 增加 Proxy 副本数
# 3. 增加 QueryNode 副本数
# 4. 调整 Proxy 的 maxReceiveSize 和 maxResultSize
13.3 内存优化
索引内存占用精准估算
内存估算公式:
HNSW (FP32):
总内存 = N × (dim × 4 + M × 2 × 8) bytes
HNSW (Int8, 2.6新增):
总内存 = N × (dim × 1 + M × 2 × 8) bytes → 内存减半
IVF_FLAT:
总内存 = N × dim × 4 + nlist × dim × 4 bytes
IVF_RABITQ (2.6新增):
总内存 = N × (dim / 8) + nlist × dim × 4 bytes → 内存降72%
+ SQ8精排: N × dim × 1 bytes
DiskANN:
内存 = N × PQ_code_size + N × graph_degree × 8 bytes
(PQ_code_size 通常为 16~32 bytes)
示例:1000万条 768维向量
HNSW (FP32, M=16): ~32 GB
HNSW (Int8, M=16): ~16 GB (2.6, 内存减半)
IVF_FLAT: ~29 GB
IVF_RABITQ (1-bit): ~2.3 GB (2.6, 内存降72%)
IVF_RABITQ + SQ8精排: ~9.5 GB (2.6, 召回率~95%)
DiskANN: ~1.5 GB (内存) + SSD 存储
2.6 内存优化方案对比
| 方案 | 内存占用 | 召回率 | 查询速度 | 适用场景 |
|---|---|---|---|---|
| HNSW (FP32) | 100% | 99%+ | 极快 | 延迟敏感 |
| HNSW (Int8) | ~50% | 98%+ | 极快 | 延迟敏感+成本优化 |
| IVF_RABITQ | ~8% | ~85% | 快 | 成本敏感 |
| IVF_RABITQ + SQ8 | ~28% | ~95% | 快 | 均衡 |
| DiskANN | ~5% | 95%+ | 中 | 超大规模 |
QueryNode 内存配置最佳实践
# QueryNode 内存配置原则
queryNode:
resources:
requests:
memory: "计算值 × 1.5" # 留50%余量
limits:
memory: "计算值 × 2" # 留100%余量
# 计算公式:
# 总内存 = 索引内存 + Growing段内存 + 标量数据内存 + 系统开销
# 系统开销 ≈ 2~4 GB
OOM 问题根因与规避方案
OOM 常见原因:
1. 加载的段总大小超过 QueryNode 内存
2. Growing Segment 积压过多
3. 查询中间结果占用大量内存
4. 标量数据未压缩
规避方案:
1. 启用分层存储,冷数据按需加载
2. 使用 DiskANN 减少内存占用
3. 使用量化压缩(SQ8/PQ/Float16/Int8/RaBitQ)
4. 使用 IVF_RABITQ 替代 IVF_FLAT/IVF_SQ8(2.6推荐)
5. 限制单次查询的 top_k 和返回字段
6. 增加 QueryNode 副本数分摊内存
7. 设置合理的内存水位告警(85%)
13.4 存储与索引优化
对象存储性能调优
# MinIO 性能优化
minio:
config:
MINIO_API_REQUESTS_MAX: 1600
MINIO_API_REQUESTS_DEADLINE: "2m"
persistence:
storageClass: local-ssd # 使用本地SSD
段合并策略优化
# Compaction 策略配置
extraConfigFiles:
user.yaml: |
dataCoord:
compaction:
enableAutoCompaction: true
compactionInterval: 60 # 每60秒检查一次
minSegmentToMerge: 3 # 最少3个段才触发合并
maxSegmentToMerge: 30 # 最多合并30个段
targetSegmentSize: 536870912 # 目标段大小512MB
13.5 混合检索性能优化
过滤下推优化
# 高选择性过滤:先过滤再搜索(Milvus 自动优化)
# 当过滤条件能排除 >90% 数据时,性能最优
# 优化前:低选择性过滤
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
filter='year > 2020', # 只排除少量数据
limit=10
)
# 优化后:增加更精确的过滤条件
results = client.search(
collection_name="knowledge_base",
data=[query_vector],
filter='year > 2020 && category == "技术"', # 更高选择性
limit=10
)
复合过滤场景的索引设计
# 为高频过滤字段创建标量索引
index_params.add_index(field_name="category", index_type="INVERTED")
index_params.add_index(field_name="year", index_type="STL_SORT")
# 使用分区键加速等值过滤
schema.add_field(
field_name="category",
datatype=DataType.VARCHAR,
max_length=64,
is_partition_key=True # 等值过滤自动分区裁剪
)
第 14 章 大规模向量系统架构设计
14.1 不同量级的方案选型
百万级向量:单机优化方案
架构:Docker Compose 单机版
硬件:16核 CPU, 64GB 内存, 1TB SSD
索引:HNSW (M=16, efConstruction=256)
预期性能:
- 写入: ~10,000 条/秒
- 查询: P99 < 5ms, QPS ~500
- 内存: ~5 GB (768维, 100万条)
千万级向量:小规模集群方案
架构:K8s 3~5 节点集群
硬件:每节点 16核, 64GB 内存, 2TB NVMe
组件配置:
- Proxy ×2
- QueryNode ×3 (每节点32GB)
- DataNode ×2
- IndexNode ×1
- etcd ×3
- MinIO ×4
索引:HNSW (M=16, efConstruction=256)
预期性能:
- 写入: ~50,000 条/秒
- 查询: P99 < 10ms, QPS ~1000
- 内存: ~50 GB (768维, 1000万条)
亿级向量:标准分布式集群方案
架构:K8s 10+ 节点集群
硬件:每节点 32核, 128GB 内存, 4TB NVMe
组件配置:
- Proxy ×4
- QueryNode ×6 (每节点64GB)
- DataNode ×4
- IndexNode ×2
- etcd ×5
- Pulsar Broker ×3
- MinIO ×8
索引:DiskANN + HNSW(热数据)
分层存储:热数据内存,冷数据SSD
预期性能:
- 写入: ~200,000 条/秒
- 查询: P99 < 20ms, QPS ~2000
- 内存: ~200 GB (768维, 1亿条, 含DiskANN压缩)
十亿级向量:超大规模分层方案
架构:多集群 + Zilliz Cloud
设计要点:
1. 按业务分集群(如按地域、按业务线)
2. 热数据集群:HNSW,内存存储
3. 温数据集群:DiskANN,SSD存储
4. 冷数据归档:对象存储
5. 全局路由层:根据请求路由到对应集群
6. 读写分离:写集群 + 多读副本
预期性能:
- 写入: ~500,000 条/秒(多集群并行)
- 查询: P99 < 50ms, QPS ~5000
- 总内存: ~1 TB (10亿条, DiskANN压缩)
14.2 分片策略设计
哈希分片原理
Milvus 默认使用哈希分片,将主键哈希值映射到不同的 Shard(写入通道):
主键 → Hash(主键) % shards_num → Shard ID
优点:
- 数据均匀分布
- 写入负载均衡
- 实现简单
局限:
- 无法按业务逻辑分片
- 跨分片查询需要聚合
- 分片数量不可修改
分片数量规划原则
分片数量 = max(写入QPS / 单分片写入QPS, 1)
单分片写入QPS参考值:
- Docker Compose: ~10,000 条/秒
- K8s 集群: ~30,000 条/秒
示例:
目标写入QPS = 100,000 条/秒
单分片QPS = 30,000 条/秒
分片数量 = ceil(100000/30000) = 4
14.3 读写分离架构
┌──────────────┐
│ 写入请求 │
└──────┬───────┘
│
┌──────▼───────┐
│ Proxy (写) │
└──────┬───────┘
│
┌────────────┼────────────┐
│ │ │
┌─────▼────┐ ┌────▼─────┐ ┌───▼──────┐
│ DataNode │ │ DataNode │ │ DataNode │
└─────┬────┘ └────┬─────┘ └───┬──────┘
│ │ │
└────────────┼────────────┘
│
对象存储 (MinIO/S3)
│
┌────────────┼────────────┐
│ │ │
┌─────▼────┐ ┌────▼─────┐ ┌───▼──────┐
│QueryNode │ │QueryNode │ │QueryNode │ ← 读副本
│(主) │ │(副本1) │ │(副本2) │
└─────┬────┘ └────┬─────┘ └───┬──────┘
│ │ │
└────────────┼────────────┘
│
┌──────▼───────┐
│ Proxy (读) │
└──────┬───────┘
│
┌──────▼───────┐
│ 查询请求 │
└──────────────┘
# 配置读副本
client.load_collection(
collection_name="knowledge_base",
replica_number=3 # 3个读副本
)
# 查询自动路由到不同副本,实现读负载均衡
14.4 冷热分层架构
冷热数据划分标准:
- 热数据:最近7天访问过的数据,占总量约10~20%
- 温数据:7~30天未访问的数据,占总量约30%
- 冷数据:30天以上未访问的数据,占总量约50~60%
Milvus 分层存储实现:
┌─────────────────────────────────────┐
│ QueryNode 内存 │
│ ┌─────────────────────────────┐ │
│ │ 热数据:HNSW 索引 + 原始向量 │ │
│ │ 延迟: <5ms │ │
│ └─────────────────────────────┘ │
│ ┌─────────────────────────────┐ │
│ │ 温数据:PQ 压缩向量 │ │
│ │ 延迟: 5~10ms │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│ SSD 存储 │
│ ┌─────────────────────────────┐ │
│ │ 冷数据:DiskANN 索引 │ │
│ │ 延迟: 10~20ms │ │
│ └─────────────────────────────┘ │
└─────────────────────────────────────┘
成本收益:
- 内存需求降低 80%
- 查询延迟增加 2~5 倍
- 总成本降低 60~70%
14.5 多地域部署
全球部署架构:
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ 亚太区域 │ │ 欧洲区域 │ │ 美洲区域 │
│ Milvus集群 │ │ Milvus集群 │ │ Milvus集群 │
│ (本地数据) │ │ (本地数据) │ │ (本地数据) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└────────────────────┼────────────────────┘
│
┌────────▼────────┐
│ 全局路由层 │
│ (就近访问) │
└────────┬────────┘
│
┌────────▼────────┐
│ 对象存储同步 │
│ (跨地域复制) │
└─────────────────┘
关键设计:
1. 用户请求就近路由到最近的 Milvus 集群
2. 各区域数据通过对象存储跨地域复制
3. 写入请求路由到主区域,异步同步到从区域
4. 读请求从就近区域返回,接受短暂不一致
```---
# 第六阶段:生态集成与项目实战(Day 25-28)
## 第 15 章 多语言客户端开发
### 15.1 Python 客户端深度使用
#### PyMilvus API 进阶用法
```python
from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker, RRFRanker
# 连接管理
client = MilvusClient(
uri="http://localhost:19530",
token="root:Milvus",
db_name="default", # 指定数据库
timeout=10.0, # 超时时间
server_version="2.6.x" # 服务端版本
)
# 数据库管理
client.create_database("production")
client.using_database("production")
client.list_databases()
# 批量操作优化
def bulk_insert_optimized(client, collection_name, data, batch_size=5000):
"""优化的批量插入"""
for i in range(0, len(data), batch_size):
batch = data[i:i+batch_size]
try:
result = client.insert(collection_name=collection_name, data=batch)
yield result
except Exception as e:
print(f"批次 {i} 失败: {e}")
# 重试逻辑
result = client.insert(collection_name=collection_name, data=batch)
yield result
异步客户端使用
import asyncio
from pymilvus import AsyncMilvusClient
async def async_search_demo():
"""异步查询示例"""
client = AsyncMilvusClient("http://localhost:19530")
# 并发查询
queries = [query_vector_1, query_vector_2, query_vector_3]
tasks = [
client.search(
collection_name="knowledge_base",
data=[q],
limit=10,
output_fields=["text"]
)
for q in queries
]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"查询 {i} 失败: {result}")
else:
print(f"查询 {i} 返回 {len(result[0])} 条结果")
await client.close()
asyncio.run(async_search_demo())
连接池与重试机制
from pymilvus import connections, Collection
import tenacity
# 连接池配置
connections.connect(
alias="default",
host="localhost",
port="19530",
user="root",
password="Milvus",
pool_size=10, # 连接池大小
max_retry=3, # 最大重试次数
retry_on_failure=True # 失败时重试
)
# 使用 tenacity 实现自定义重试
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(multiplier=1, min=1, max=10),
retry=tenacity.retry_if_exception_type(Exception)
)
def search_with_retry(client, collection_name, query_vector, **kwargs):
return client.search(
collection_name=collection_name,
data=[query_vector],
**kwargs
)
15.2 Java / Go 客户端
Java 客户端生产级集成
// Maven 依赖
// <dependency>
// <groupId>io.milvus</groupId>
// <artifactId>milvus-sdk-java</artifactId>
// <version>2.6.18</version>
// </dependency>
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.service.vector.request.SearchReq;
import io.milvus.v2.service.vector.response.SearchResp;
// 连接配置
ConnectConfig config = ConnectConfig.builder()
.uri("http://localhost:19530")
.token("root:Milvus")
.build();
MilvusClientV2 client = new MilvusClientV2(config);
// 向量搜索
List<Float> queryVector = Arrays.asList(0.1f, 0.2f, ...);
SearchReq searchReq = SearchReq.builder()
.collectionName("knowledge_base")
.data(Collections.singletonList(queryVector))
.topK(10)
.filter("category == '技术'")
.outputFields(Arrays.asList("text", "category"))
.build();
SearchResp searchResp = client.search(searchReq);
Go 客户端
// go get github.com/milvus-io/milvus-sdk-go/v2
import (
"context"
"github.com/milvus-io/milvus-sdk-go/v2/client"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
)
// 连接
c, err := client.NewClient(context.Background(), client.Config{
Address: "localhost:19530",
})
if err != nil {
log.Fatal(err)
}
// 搜索
results, err := c.Search(
context.Background(),
"knowledge_base",
[]string{},
"category == '技术'",
[]string{"text", "category"},
[]entity.Vector{entity.FloatVector(queryVector)},
"embedding",
entity.COSINE,
10,
map[string]interface{}{"ef_search": 128},
)
15.3 Node.js 与 REST API
// npm install @zilliz/milvus2-sdk-node
const { MilvusClient } = require('@zilliz/milvus2-sdk-node');
const client = new MilvusClient({
address: 'localhost:19530',
token: 'root:Milvus'
});
// 搜索
const results = await client.search({
collection_name: 'knowledge_base',
data: [queryVector],
limit: 10,
output_fields: ['text', 'category'],
filter: 'category == "技术"'
});
第 16 章 AI 生态与 RAG 集成
16.1 LangChain 集成
Milvus 向量存储配置
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
# 创建 Milvus 向量存储
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Milvus(
embedding_function=embeddings,
connection_args={
"uri": "http://localhost:19530",
},
collection_name="langchain_docs",
auto_id=True,
)
# 添加文档
from langchain_core.documents import Document
docs = [
Document(page_content="Milvus是开源向量数据库", metadata={"source": "wiki"}),
Document(page_content="RAG系统结合检索与生成", metadata={"source": "blog"}),
]
vector_store.add_documents(docs)
# 相似度搜索
results = vector_store.similarity_search(
query="向量数据库",
k=5,
expr='source == "wiki"' # Milvus 过滤表达式
)
混合检索管道搭建
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
# 创建向量存储(支持稠密+稀疏混合检索)
vector_store = Milvus(
embedding_function=embeddings,
connection_args={"uri": "http://localhost:19530"},
collection_name="hybrid_rag",
)
# RAG 链
template = """基于以下检索到的文档内容回答问题:
{context}
问题:{question}
请基于文档内容回答,如果文档中没有相关信息,请说明。"""
prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o")
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
rag_chain = (
{"context": vector_store.as_retriever(search_kwargs={"k": 5}) | format_docs,
"question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 执行 RAG
answer = rag_chain.invoke("什么是向量数据库?")
16.2 LlamaIndex 集成
from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.core import StorageContext, VectorStoreIndex, SimpleDirectoryReader
# 创建 Milvus 向量存储
vector_store = MilvusVectorStore(
uri="http://localhost:19530",
collection_name="llamaindex_docs",
dim=1536,
overwrite=False,
)
# 加载文档
documents = SimpleDirectoryReader("./docs").load_data()
# 构建索引
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
)
# 查询
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("向量数据库的核心优势是什么?")
print(response)
16.3 其他生态集成
Dify 低代码平台对接
Dify 配置 Milvus 步骤:
1. 进入 Dify 管理后台 → 知识库 → 数据源
2. 选择 "Milvus" 作为向量存储
3. 配置连接参数:
- Host: milvus-service.milvus.svc.cluster.local
- Port: 19530
- Database: default
- 用户名/密码(如果启用认证)
4. 创建知识库时选择 Milvus 作为存储后端
5. 上传文档,Dify 自动分块、向量化、存入 Milvus
16.4 RAG 系统最佳实践
文档分块 + 向量化流水线设计
from langchain_text_splitters import RecursiveCharacterTextSplitter
def build_rag_pipeline(docs_path, milvus_uri, embedding_model):
"""构建完整的 RAG 数据处理流水线"""
# 1. 文档加载
from langchain_community.document_loaders import DirectoryLoader
loader = DirectoryLoader(docs_path, glob="**/*.md")
documents = loader.load()
# 2. 文档分块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每块500字符
chunk_overlap=50, # 块间重叠50字符
separators=["\n## ", "\n### ", "\n\n", "\n", "。", "!", "?"]
)
chunks = text_splitter.split_documents(documents)
# 3. 向量化并存储
vector_store = Milvus(
embedding_function=embedding_model,
connection_args={"uri": milvus_uri},
collection_name="rag_knowledge_base",
)
vector_store.add_documents(chunks)
return vector_store, len(chunks)
检索效果评估
def evaluate_retrieval(vector_store, test_queries, ground_truth, k=10):
"""评估检索效果"""
recall_scores = []
mrr_scores = []
for query, relevant_ids in zip(test_queries, ground_truth):
results = vector_store.similarity_search(query, k=k)
result_ids = [doc.metadata.get("doc_id") for doc in results]
# Recall@K
relevant_set = set(relevant_ids)
result_set = set(result_ids[:k])
recall = len(relevant_set & result_set) / len(relevant_set) if relevant_set else 0
recall_scores.append(recall)
# MRR (Mean Reciprocal Rank)
mrr = 0
for i, rid in enumerate(result_ids):
if rid in relevant_set:
mrr = 1 / (i + 1)
break
mrr_scores.append(mrr)
return {
"Recall@K": sum(recall_scores) / len(recall_scores),
"MRR": sum(mrr_scores) / len(mrr_scores),
}
第 17 章 实战项目
项目一:企业级知识库 RAG 系统
需求分析与架构设计
需求:
- 支持百万级文档的语义检索
- 支持关键词+语义混合检索
- 支持权限过滤(部门级隔离)
- 支持 RAG 问答
- 查询延迟 P99 < 200ms
架构设计:
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ 前端UI │ → │ API网关 │ → │ RAG服务 │ → │ LLM API │
│ │ │ (Nginx) │ │ (Python) │ │ (GPT-4o) │
└──────────┘ └──────────┘ └────┬─────┘ └──────────┘
│
┌──────▼──────┐
│ Milvus │
│ (K8s集群) │
└─────────────┘
向量库 Schema 与索引设计
from pymilvus import MilvusClient, DataType, Function, FunctionType
schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="doc_id", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="chunk_text", datatype=DataType.VARCHAR, max_length=4096)
schema.add_field(field_name="department", datatype=DataType.VARCHAR, max_length=64,
is_partition_key=True) # 按部门分区
schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=256)
schema.add_field(field_name="created_at", datatype=DataType.INT64)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=1024)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)
# BM25 函数
schema.add_function(Function(
name="bm25_func",
input_field_names=["chunk_text"],
output_field_names=["sparse_vector"],
function_type=FunctionType.BM25,
))
# 索引
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="HNSW",
metric_type="COSINE", params={"M": 16, "efConstruction": 256})
index_params.add_index(field_name="sparse_vector", index_type="SPARSE_INVERTED_INDEX",
metric_type="BM25")
index_params.add_index(field_name="source", index_type="INVERTED")
client.create_collection("enterprise_kb", schema=schema, index_params=index_params,
num_partitions=64)
权限过滤与检索问答完整实现
def search_with_permission(client, query_text, user_department, top_k=10):
"""带权限过滤的混合检索"""
dense_vector = dense_model.encode(query_text).tolist()
# 稠密向量搜索
req_dense = AnnSearchRequest(
data=[dense_vector],
anns_field="dense_vector",
param={"metric_type": "COSINE", "params": {"ef_search": 128}},
limit=top_k * 2,
expr=f'department == "{user_department}"' # 权限过滤
)
# BM25 全文搜索
req_sparse = AnnSearchRequest(
data=[query_text],
anns_field="sparse_vector",
param={"metric_type": "BM25"},
limit=top_k * 2,
expr=f'department == "{user_department}"'
)
results = client.hybrid_search(
collection_name="enterprise_kb",
reqs=[req_dense, req_sparse],
ranker=RRFRanker(k=60),
limit=top_k,
output_fields=["chunk_text", "doc_id", "source", "department"]
)
return results
项目二:亿级图像相似度检索系统
集群规模规划与分片设计
数据规模:1亿张图片,512维 CLIP 向量
存储估算:
- 向量数据: 1亿 × 512 × 4 = 200 GB
- HNSW 索引: ~300 GB
- 总内存需求: ~500 GB
集群规划:
- QueryNode ×8 (每节点 64GB 内存)
- DataNode ×4
- IndexNode ×2
- Proxy ×4
- 分片数: 8
- 索引: HNSW (M=32, efConstruction=512)
检索性能优化与压测
import asyncio
import time
from pymilvus import AsyncMilvusClient
async def benchmark_search(concurrency=100, total_queries=10000):
"""搜索性能压测"""
client = AsyncMilvusClient("http://localhost:19530")
sem = asyncio.Semaphore(concurrency)
latencies = []
async def single_query(query_vector):
async with sem:
start = time.time()
await client.search(
collection_name="image_search",
data=[query_vector],
limit=20,
output_fields=["image_path"]
)
latencies.append((time.time() - start) * 1000)
# 生成测试查询向量
query_vectors = [np.random.randn(512).tolist() for _ in range(total_queries)]
start = time.time()
tasks = [single_query(qv) for qv in query_vectors]
await asyncio.gather(*tasks)
total_time = time.time() - start
latencies.sort()
print(f"QPS: {total_queries / total_time:.1f}")
print(f"P50: {latencies[len(latencies)//2]:.1f}ms")
print(f"P99: {latencies[int(len(latencies)*0.99)]:.1f}ms")
项目三:个性化推荐召回系统
# 用户/物品向量设计
schema = MilvusClient.create_schema()
schema.add_field(field_name="item_id", datatype=DataType.VARCHAR, max_length=64, is_primary=True)
schema.add_field(field_name="item_type", datatype=DataType.VARCHAR, max_length=32,
is_partition_key=True)
schema.add_field(field_name="category", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=256)
schema.add_field(field_name="created_at", datatype=DataType.INT64)
# 实时向量更新方案
def update_item_vector(client, item_id, new_embedding):
"""更新物品向量(使用 upsert)"""
client.upsert(
collection_name="recommendation_items",
data=[{
"item_id": item_id,
"embedding": new_embedding,
}]
)
# 召回查询
def recall_items(client, user_vector, category=None, top_k=200):
"""个性化召回"""
filter_expr = ''
if category:
filter_expr = f'category == "{category}"'
results = client.search(
collection_name="recommendation_items",
data=[user_vector],
limit=top_k,
filter=filter_expr,
output_fields=["item_id", "item_type", "category"]
)
return results
项目四:多租户 SaaS 向量检索平台
# 多租户隔离方案:分区键隔离
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="tenant_id", datatype=DataType.VARCHAR, max_length=64,
is_partition_key=True) # 租户ID作为分区键
schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=4096)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=768)
# 查询时自动分区裁剪
def tenant_search(client, tenant_id, query_vector, top_k=10):
"""租户隔离的向量检索"""
results = client.search(
collection_name="saas_platform",
data=[query_vector],
limit=top_k,
filter=f'tenant_id == "{tenant_id}"', # 自动分区裁剪
output_fields=["content"]
)
return results
# 资源配额设计
# 通过 Resource Group 实现租户资源隔离
def setup_tenant_resources(client, tenant_id, node_num=1):
"""为租户分配计算资源"""
rg_name = f"rg_{tenant_id}"
client.create_resource_group(rg_name, config={"node_num": node_num})
return rg_name
第七阶段:底层精通与前沿(Day 29-30)
第 18 章 底层原理与源码导读
18.1 存储引擎原理
Segment 文件内部结构
Segment 目录结构:
segment_{id}/
├── insert_log/
│ ├── {field_id}_1.binlog # 字段数据(列式存储)
│ ├── {field_id}_2.binlog
│ └── ...
├── delta_log/
│ └── {field_id}_delete.binlog # 删除标记
├── stats_log/
│ └── {field_id}_stats.binlog # 统计信息(min/max/null_count)
└── index_files/
└── {index_id}/
└── index_file.bin # 索引文件
Binlog 格式:
┌──────────┬──────────┬──────────┬──────────┐
│ Header │ Event 1 │ Event 2 │ ... │
│ (魔数+版本) │ (数据块1) │ (数据块2) │ │
└──────────┴──────────┴──────────┴──────────┘
向量数据存储格式
浮点向量存储(列式):
每个向量字段一个 Binlog 文件
数据按行存储,每行 dim × 4 bytes
支持批量读取,SIMD 友好
二值向量存储:
每行 dim / 8 bytes
位运算加速汉明距离计算
稀疏向量存储:
CSR 格式(Compressed Sparse Row)
只存储非零元素的位置和值
18.2 ANN 算法深度
HNSW 分层导航小世界图数学原理
HNSW 的核心思想是构建一个多层概率图,每层的节点以指数递减的概率出现:
层级分配概率:P(level = l) = (1/M) ^ l
层级0:所有节点(100%)
层级1:约 1/M 的节点
层级2:约 1/M² 的节点
层级3:约 1/M³ 的节点
...
查询算法:
1. 从最高层的入口点开始
2. 在当前层贪心搜索最近邻
3. 找到当前层最近邻后,跳到下一层
4. 重复直到层级0
5. 在层级0执行精确的 beam search
插入算法:
1. 随机分配层级 l(按上述概率)
2. 从最高层到 l+1 层:贪心搜索最近邻
3. 从 l 层到 0 层:搜索 efConstruction 个最近邻
4. 在每层选择最近的 M 个邻居建立连接
5. 如果邻居的连接数超过 Mmax,执行启发式剪枝
PQ 乘积量化压缩原理
乘积量化(Product Quantization)步骤:
1. 将 d 维向量拆分为 m 个子向量
原始: [x₁, x₂, ..., x_d] → [x₁...x_d/m], [x_d/m+1...x_2d/m], ..., [x_(m-1)d/m+1...x_d]
2. 对每个子空间独立进行 K-means 聚类(K=2^nbits)
每个子空间生成 256 个聚类中心(nbits=8)
3. 编码:每个子向量用最近的聚类中心 ID 表示
原始子向量 → 1 byte 聚类中心 ID
4. 存储编码:m 个 byte 表示一个向量
压缩比: d × 4 / m = 768 × 4 / 16 = 192 倍
5. 距离计算(非对称距离):
查询向量与所有聚类中心预计算距离表
查表 + 求和得到近似距离
时间复杂度: O(m × K + m) per query
DiskANN 算法核心思想
DiskANN 关键创新:
1. 图结构存储在内存(PQ 压缩向量 + 邻居列表)
2. 原始向量存储在 SSD
3. 查询时:
a. 在内存中用 PQ 向量做粗排
b. 从 SSD 读取候选向量的原始数据
c. 用原始向量重新计算距离
d. 返回精确结果
4. Vamana 图构建算法:
a. 随机初始化图
b. 迭代优化:对每个节点,搜索其最近邻,建立更优连接
c. 剪枝:限制每个节点的最大连接数
5. 内存占用:
PQ 编码: m bytes/条 (通常 16~32 bytes)
邻居列表: R × 4 bytes/条 (R 通常 32~64)
总计: ~80~300 bytes/条 (vs 原始 3072 bytes)
18.3 分布式调度原理
负载均衡策略
QueryCoord 负载均衡策略:
1. 基于段分布的均衡
- 计算每个 QueryNode 的段数量和内存占用
- 将段从负载高的节点迁移到负载低的节点
- 迁移过程中保证至少一个副本可用
2. 基于副本的均衡
- 同一段的多个副本分布在不同节点
- 节点故障时自动切换到其他副本
3. 均衡触发条件
- 新节点加入集群
- 节点故障恢复
- 段数量变化
- 手动触发
故障检测与转移机制
心跳检测:
- 每个节点每 3 秒发送心跳
- Coord 连续 15 秒未收到心跳 → 标记节点为 Offline
- 触发故障转移流程
故障转移:
1. QueryCoord:将 Offline 节点的段重新分配
2. DataCoord:将 Offline 节点的 Channel 重新分配
3. IndexCoord:将 Offline 节点的索引任务重新分配
4. 自动恢复:节点重新上线后,Coord 自动分配任务
18.4 源码结构与扩展
项目代码结构总览
milvus/
├── cmd/ # 入口程序
│ ├── milvus/ # 主程序
│ └── components/ # 各组件入口
├── internal/ # 核心实现
│ ├── proxy/ # Proxy 节点
│ ├── rootcoord/ # RootCoord
│ ├── datacoord/ # DataCoord
│ ├── querycoord/ # QueryCoord
│ ├── indexcoord/ # IndexCoord
│ ├── datanode/ # DataNode
│ ├── querynode/ # QueryNode
│ ├── indexnode/ # IndexNode
│ ├── storage/ # 存储层
│ ├── msgstream/ # 消息流
│ └── core/ # C++ 核心引擎(SegCore)
├── pkg/ # 公共包
│ ├── common/ # 公共定义
│ ├── util/ # 工具函数
│ └── proto/ # Protobuf 定义
├── deployments/ # 部署配置
│ ├── docker/ # Docker 配置
│ └── helm/ # Helm Chart
└── tests/ # 测试
第 19 章 前沿技术与选型对比
19.1 向量数据库前沿技术
向量量化技术演进
量化技术演进路线:
SQ8 (2019) → PQ (2020) → OPQ (2021) → Residual PQ (2022) → LSQ (2023)
未来方向:
1. 学习型量化:用神经网络学习最优量化方案
2. 混合精度量化:不同维度使用不同精度
3. 自适应量化:根据数据分布自动选择量化策略
4. 量化感知训练:在模型训练时考虑量化损失
检索增强新范式
1. Agentic RAG:AI Agent 自主决定检索策略
2. Multi-modal RAG:文本+图像+音频多模态检索
3. Graph RAG:结合知识图谱的检索增强
4. Adaptive RAG:根据查询复杂度自适应选择检索策略
5. Streaming RAG:流式检索与生成
19.2 Milvus 未来路线图
Milvus 2.6 已实现 ✅ / 未来规划 🔮:
1. Serverless 架构 🔮
- 按查询付费,零成本待机
- 自动弹性伸缩
- 冷启动优化
2. 原生多模态能力增强 ✅+🔮
- ✅ Geometry 地理空间字段(2.6)
- ✅ Struct Array 多向量实体(2.6)
- 🔮 内置图像/音频/视频嵌入
- 🔮 跨模态检索原生支持
3. 湖仓一体集成 🔮
- 与 Iceberg/Hudi/Delta Lake 集成
- 向量数据湖方案
- 统一元数据管理
4. 更强的混合检索 ✅+🔮
- ✅ 向量+全文+结构化联合检索(2.6 BM25增强)
- ✅ 多语言 BM25 分析器(2.6)
- ✅ Path Index JSON过滤(2.6)
- 🔮 学习型排序
- 🔮 个性化检索
5. 成本优化 ✅(2.6 重大突破)
- ✅ RaBitQ 1-bit 量化(内存降72%)
- ✅ 冷热分层存储(成本降50%)
- ✅ Woodpecker 零磁盘WAL(运维降本)
- ✅ Int8 向量压缩(内存减半)
- ✅ CAGRA+Vamana 混合索引(GPU成本优化)
6. 架构演进 ✅+🔮
- ✅ StreamingNode 流批分离(2.6)
- ✅ Storage v2 存储引擎(2.6)
- ✅ 10万+集合多租户(2.6)
- ✅ Nullable Vector / Add Field(2.6)
- 🔮 Milvus 3.0 进一步架构升级
19.3 主流向量数据库横向对比
| 维度 | Milvus 2.6 | Qdrant | Weaviate | Pinecone | Chroma |
|---|---|---|---|---|---|
| 开源 | ✅ | ✅ | ✅ | ❌ | ✅ |
| 分布式 | ✅ | 有限 | ✅ | ✅ | ❌ |
| 存算分离 | ✅ | ❌ | ❌ | ✅ | ❌ |
| 索引类型 | 12+ | 4 | 3 | 有限 | 2 |
| 混合检索 | ✅ | ✅ | ✅ | 有限 | ❌ |
| 全文检索 | ✅(BM25+多语言) | ❌ | ✅ | ❌ | ❌ |
| GPU加速 | ✅ | ❌ | ❌ | ✅ | ❌ |
| 磁盘索引 | ✅(DiskANN) | ❌ | ❌ | ✅ | ❌ |
| 1-bit量化 | ✅(RaBitQ) | ❌ | ❌ | ❌ | ❌ |
| JSON路径索引 | ✅(Path Index) | ❌ | ❌ | ❌ | ❌ |
| 零磁盘WAL | ✅(Woodpecker) | ❌ | ❌ | ❌ | ❌ |
| 地理空间字段 | ✅(Geometry) | ❌ | ❌ | ❌ | ❌ |
| 动态添加字段 | ✅(Add Field) | ❌ | ❌ | ❌ | ❌ |
| 可空向量 | ✅(Nullable) | ❌ | ❌ | ❌ | ❌ |
| 多语言SDK | 5+ | 4+ | 4+ | 3+ | 2 |
| 社区规模 | 最大(40K+ Stars) | 中等 | 中等 | 商业 | 小 |
| 生产成熟度 | 高 | 中 | 中 | 高 | 低 |
| 十亿级支持 | ✅ | ❌ | 有限 | ✅ | ❌ |
| 多租户集合数 | 10万+ | 有限 | 有限 | 有限 | ❌ |
选型决策树:
是否需要分布式?
├── 否 → 数据规模?
│ ├── < 10万 → Chroma
│ └── 10万~100万 → Qdrant
└── 是 → 是否需要全托管?
├── 是 → Pinecone 或 Zilliz Cloud
└── 否 → 是否需要混合检索?
├── 是 → Milvus(最佳选择)
└── 否 → Milvus 或 Weaviate
第 20 章 避坑指南与最佳实践
20.1 设计阶段避坑
Schema 设计常见错误
| 错误 | 后果 | 正确做法 |
|---|---|---|
| VARCHAR 长度设置过大 | 浪费内存 | 根据实际最大长度设置 |
| 未设置分区键 | 全集扫描 | 高频过滤字段设为分区键 |
| 过多标量字段 | 内存浪费 | 低频字段用动态字段 |
| 主键选择不当 | 性能问题 | 优先 INT64 自增主键 |
| 向量维度选择过大 | 内存翻倍 | 选择合适的嵌入模型 |
索引选型误区
| 误区 | 正确理解 |
|---|---|
| “HNSW 总是最好的” | 亿级数据内存可能不够,考虑 DiskANN |
| “PQ 压缩不影响召回” | PQ 可能导致 5~15% 召回率下降 |
| “GPU 索引总是更快” | 单条查询 GPU 开销更大,批量查询才占优 |
| “索引参数越大越好” | M/efConstruction 过大导致构建慢、内存高 |
分片数量规划失误
常见失误:
1. 分片数过少 → 写入吞吐瓶颈
2. 分片数过多 → 协调开销增大,资源浪费
3. 创建后无法修改 → 必须提前规划
正确做法:
1. 根据写入QPS预估分片数
2. 预留 50% 的写入余量
3. 使用公式:分片数 = ceil(目标写入QPS / 单分片QPS) × 1.5
20.2 开发阶段避坑
写入性能常见坑
| 坑 | 原因 | 解决方案 |
|---|---|---|
| 单条写入 | 网络开销大 | 批量写入(1000条/批) |
| 批次过大 | 内存溢出 | 控制批次大小 < 5000 |
| 无并发 | 吞吐低 | 使用多线程/协程并发写入 |
| 频繁 upsert | 产生大量删除标记 | 减少 upsert,定期 Compaction |
| 未等 Flush | 数据可能丢失 | 关键数据确认 Flush |
查询过滤常见坑
| 坑 | 原因 | 解决方案 |
|---|---|---|
| 低选择性过滤 | 过滤后数据仍很多 | 增加过滤条件提高选择性 |
| JSON 深层过滤 | 性能差 | 高频字段提升为静态字段 |
| 未建标量索引 | 全集扫描 | 为过滤字段创建 INVERTED 索引 |
| offset 过大 | 深度分页性能差 | 使用游标分页 |
数据一致性常见坑
| 坑 | 原因 | 解决方案 |
|---|---|---|
| 写入后立即查不到 | 一致性级别太低 | 使用 Session 或 Bounded 级别 |
| 查询结果不一致 | 多副本间延迟 | 使用 Strong 级别(牺牲性能) |
| 删除后仍能查到 | 软删除延迟 | 触发 Compaction 加速清理 |
20.3 生产运维避坑
集群部署配置坑
| 坑 | 后果 | 解决方案 |
|---|---|---|
| etcd 单节点 | 元数据丢失 | 至少 3 节点集群 |
| MinIO 单节点 | 数据丢失 | 至少 4 节点分布式 |
| 未配置资源限制 | 节点 OOM | 设置 Requests/Limits |
| 未配置亲和性 | 调度不均衡 | 配置节点亲和性和反亲和 |
| 未启用认证 | 安全风险 | 启用 RBAC 和 TLS |
内存与资源配置坑
常见问题:
1. QueryNode 内存不足 → OOM Kill
解决:精确估算内存,预留50%余量
2. IndexNode 内存不足 → 索引构建失败
解决:IndexNode 内存 ≥ 最大段大小 × 2
3. etcd 磁盘不足 → 集群不可用
解决:定期压缩,监控磁盘使用
4. MinIO 磁盘不足 → 写入失败
解决:监控磁盘使用,设置扩容告警
升级与迁移风险
升级前检查清单:
□ 备份 etcd 数据
□ 备份对象存储数据
□ 阅读升级文档和变更日志
□ 在测试环境验证升级
□ 准备回滚方案
□ 通知相关团队
□ 选择低峰期执行
迁移前检查清单:
□ 评估数据量和迁移时间
□ 验证目标集群版本兼容性
□ 测试迁移工具
□ 制定数据一致性验证方案
□ 准备回滚方案
20.4 大规模场景避坑
亿级数据扩容坑
| 坑 | 解决方案 |
|---|---|
| 扩容后段分布不均 | 等待 QueryCoord 自动均衡 |
| 新节点内存不足 | 预先计算内存需求 |
| 扩容期间查询超时 | 分批迁移段,避免同时迁移过多 |
| 索引重建耗时过长 | 使用 GPU_CAGRA 加速索引构建 |
冷热分层常见问题
| 问题 | 解决方案 |
|---|---|
| 冷数据查询延迟高 | 预热常用冷数据到内存 |
| 分层策略不合理 | 根据访问频率动态调整 |
| 冷热切换抖动 | 设置缓冲区,避免频繁切换 |
| 分层存储配置复杂 | 参考官方文档,逐步启用 |
高可用容灾误区
| 误区 | 正确理解 |
|---|---|
| “多副本就是高可用” | 还需要自动故障检测和转移 |
| “同城双活 = 零停机” | 切换期间可能有短暂不可用 |
| “数据备份 = 容灾” | 还需要定期演练恢复流程 |
| “云服务不需要容灾” | 云服务也可能出故障,需要跨区域部署 |
附录
A. Milvus 2.6.18 常用配置参数速查
| 配置项 | 默认值 | 说明 |
|---|---|---|
common.security.authorizationEnabled |
false | 启用 RBAC |
queryNode.lazyLoadEnabled |
false | 启用分层存储 |
queryNode.loadingMemoryPortion |
0.5 | 内存中热数据比例 |
dataCoord.compaction.enableAutoCompaction |
true | 自动 Compaction |
proxy.maxReceiveSize |
67108864 | Proxy 最大接收大小 |
proxy.maxResultSize |
104857600 | Proxy 最大返回大小 |
tieredStorage.enabled |
false | 启用冷热分层存储(2.6新增) |
tieredStorage.hotDataTTL |
7d | 热数据保留时间(2.6新增) |
woodpecker.enabled |
false | 启用 Woodpecker WAL(2.6新增) |
streamingNode.enabled |
true | 启用流式节点(2.6新增) |
B. 性能基准参考
| 数据规模 | 维度 | 索引 | QPS | P99延迟 | 内存 |
|---|---|---|---|---|---|
| 100万 | 768 | HNSW | 2000 | 5ms | 3.2GB |
| 1000万 | 768 | HNSW | 1500 | 10ms | 32GB |
| 1000万 | 768 | IVF_RABITQ+SQ8 | 6000 | 8ms | 9.5GB |
| 1亿 | 768 | DiskANN | 1000 | 20ms | 50GB |
| 1亿 | 768 | IVF_RABITQ+SQ8 | 4000 | 25ms | 95GB |
C. 常用命令速查
# 集群管理
kubectl get pods -n milvus
kubectl logs -f milvus-proxy-xxx -n milvus
kubectl scale deployment milvus-querynode --replicas=5 -n milvus
# Milvus 运维
helm upgrade milvus milvus/milvus -n milvus -f values.yaml
helm rollback milvus -n milvus
helm uninstall milvus -n milvus
# 数据备份
milvus-backup create -n backup_name
milvus-backup restore -n backup_name
# 调试工具
birdwatcher --etcd=127.0.0.1:2379
D. 推荐学习资源
- 官方文档:milvus.io/docs
- GitHub 仓库:github.com/milvus-io/milvus
- Milvus 社区:slack.milvus.io
- Zilliz Cloud 文档:docs.zilliz.com
- Milvus Bootcamp:github.com/milvus-io/bootcamp
🎉 恭喜你完成了 2026 精通 Milvus 向量库的 30 天学习之旅!
从向量数据库基础概念到分布式架构深度解析,从单机开发到亿级生产部署,你已经掌握了 Milvus 的全栈知识体系。
接下来,建议你:
- 在实际项目中应用所学知识
- 参与 Milvus 开源社区贡献
- 持续关注 Milvus 版本更新和新特性
- 分享你的实践经验,帮助更多人成长
评论区