侧边栏壁纸
  • 累计撰写 62 篇文章
  • 累计创建 15 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

精通 Milvus 向量库:从入门到生产级实战

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

精通 Milvus 向量库:从入门到生产级实战

基于 Milvus 2.6.18 LTS 长期支持版本 | 覆盖开源社区版 + Zilliz Cloud 企业版


前言:教程定位与学习路径

0.1 适用人群

本教程面向以下技术群体:

  • RAG 系统工程师:正在构建或优化检索增强生成系统,需要生产级向量检索能力
  • AI 应用后端开发者:需要在应用中集成语义搜索、相似度匹配等向量检索能力
  • 分布式系统运维工程师:负责 Milvus 集群的部署、监控、调优与故障排查
  • 数据库管理员:从传统数据库领域扩展到向量数据库管理
  • 架构师:需要构建亿级以上向量检索系统,进行技术选型与架构设计
  • 技术团队负责人:计划从原型向量库(Chroma、FAISS)迁移到生产级方案

0.2 前置知识

知识领域 最低要求 推荐掌握
编程语言 Python 或 Java 基础语法 Python 异步编程、类型注解
机器学习 了解向量、嵌入概念 理解 Embedding 模型原理、相似度计算
分布式系统 了解基本概念 Kubernetes 编排、微服务架构
大语言模型 了解 RAG 基本原理 LangChain/LlamaIndex 框架使用经验
数据库 SQL 基础 NoSQL 数据库使用经验

0.3 2026 年 Milvus 技术生态全景

版本演进

Milvus 1.0 (2019)    Milvus 2.0 (2021)    Milvus 2.5.x (2024-2025)   Milvus 2.6.x (2025-2026)
┌──────────────┐ ┌──────────────────┐ ┌──────────────────────┐ ┌────────────────────────────────┐
│  单机架构     │ │  云原生重构       │ │  全能力增强期         │ │  规模化降本期                   │
│  共享存储     │ │  存算分离         │ │  全文检索(BM25)      │ │  RaBitQ 1-bit量化(内存降72%)   │
│  仅支持CPU    │ │  分布式架构       │ │  稀疏向量原生支持     │ │  冷热分层存储(成本降50%)        │
│  有限索引类型  │ │  多索引体系       │ │  分层存储             │ │  Woodpecker零磁盘WAL           │
│              │ │  K8s原生部署      │ │  GPU CAGRA索引       │ │  Path Index(JSON过滤快100倍)   │
│              │ │  混合检索         │ │  增强RBAC与安全      │ │  StreamingNode流批分离架构      │
│              │ │                  │ │  动态Schema成熟      │ │  Int8向量/HNSW压缩             │
│              │ │                  │ │  Group By分组查询    │ │  Nullable Vector可空向量        │
│              │ │                  │ │                      │ │  Geometry地理空间字段           │
│              │ │                  │ │                      │ │  Struct Array结构化数组         │
│              │ │                  │ │                      │ │  动态添加字段(Add Field)        │
│              │ │                  │ │                      │ │  多语言BM25分析器               │
│              │ │                  │ │                      │ │  10万+集合多租户支持            │
└──────────────┘ └──────────────────┘ └──────────────────────┘ └────────────────────────────────┘

产品矩阵

产品 定位 数据规模 适用场景
Milvus Lite 嵌入式版,pip 安装 万级向量 本地开发、Jupyter 原型、单元测试
Milvus 开源版 分布式自托管 百万~十亿级向量 生产环境、私有化部署、定制化需求
Zilliz Cloud 全托管云服务 百万~百亿级向量 快速上线、免运维、弹性伸缩

核心能力边界

  • 百万级向量:单机 Docker Compose 即可胜任,HNSW 索引 P99 延迟 < 10ms
  • 千万级向量:小规模 K8s 集群(3~5 节点),需要合理的分片与索引设计
  • 亿级向量:标准分布式集群(10+ 节点),需要存算分离 + DiskANN + 冷热分层
  • 十亿级向量:超大规模集群 + Zilliz Cloud 企业版,需要多层级架构设计

行业定位

Milvus 是全球最主流的开源云原生向量数据库,截至 2026 年:

  • GitHub Star 数超过 40,000+
  • 全球生产部署超过 1,000 家企业
  • LF AI & Data 基金会毕业项目
  • Milvus 2.6 实现内存降低 72%、全文检索比 Elasticsearch 快 4 倍、JSON 过滤快 100 倍

0.4 学习路线规划

30 天系统化学习路径

Week 1 (Day 1-6)     Week 2 (Day 7-12)    Week 3 (Day 13-18)   Week 4 (Day 19-24)   Week 5 (Day 25-30)
┌─────────────┐     ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│ 入门基础     │     │ 进阶功能     │      │ 分布式架构   │      │ 性能优化     │      │ 生态实战     │
│ ·向量数据库  │     │ ·索引调优    │      │ ·架构深度    │      │ ·写入优化    │      │ ·多语言客户端│
│ ·环境搭建   │     │ ·混合检索    │      │ ·生产部署    │      │ ·查询优化    │      │ ·RAG集成    │
│ ·数据模型   │     │ ·高级特性    │      │ ·集群运维    │      │ ·大规模架构  │      │ ·项目实战    │
│ ·数据写入   │     │             │      │             │      │             │      │ ·底层原理    │
│ ·索引基础   │     │             │      │             │      │             │      │ ·前沿技术    │
│ ·查询检索   │     │             │      │             │      │             │      │             │
└─────────────┘     └─────────────┘      └─────────────┘      └─────────────┘      └─────────────┘

学习方法建议

  1. 理论 + 实操结合:每章先理解原理,再动手实践
  2. 渐进式深入:从单机到集群,从基础到高级
  3. 项目驱动:第七阶段的实战项目是检验学习成果的关键
  4. 社区参与:加入 Milvus 社区(Slack/Discord),参与讨论与贡献

第一阶段:入门基础(Day 1-2)

第 1 章 向量数据库与 Milvus 概述

1.1 向量数据库核心基础

1.1.1 向量嵌入(Embedding)与语义检索原理

向量嵌入是将非结构化数据(文本、图像、音频)转换为高维数值向量的过程。这些向量在数学空间中捕获了数据的语义信息——语义相近的数据在向量空间中距离更近。

原始数据                    嵌入模型                    向量空间
┌──────────┐            ┌──────────┐            ┌──────────────────┐
│ "猫在睡觉" │  ──────→  │ Embedding │  ──────→  │ [0.12, -0.34,    │
│          │            │  Model    │            │  0.56, 0.78, ...] │
└──────────┘            │(BGE/E5/   │            │  (768维/1024维)   │
┌──────────┐            │ OpenAI...)│            └──────────────────┘
│ "小猫打盹" │  ──────→  │          │  ──────→  ┌──────────────────┐
│          │            │          │            │ [0.11, -0.32,    │
└──────────┘            └──────────┘            │  0.55, 0.79, ...] │
                                                └──────────────────┘
                                                  ↑ 距离很近,语义相似

核心流程

  1. 嵌入生成:使用预训练模型(如 BGE-large、text-embedding-3-large)将原始数据转为向量
  2. 向量存储:将向量及其元数据存入向量数据库
  3. 相似度计算:查询时将查询文本同样转为向量,与库中向量计算相似度
  4. 结果排序:按相似度从高到低返回 Top-K 结果

1.1.2 相似度度量方式

度量方式 公式 值域 特点 适用场景
余弦距离 (COSINE) 1 - cos(a,b) [0, 2] 只关注方向,忽略幅度 文本语义搜索(最常用)
欧氏距离 (L2) √Σ(aᵢ-bᵢ)² [0, +∞) 关注绝对距离 图像检索、空间数据
内积 (IP) Σaᵢbᵢ (-∞, +∞) 值越大越相似 归一化向量等价余弦
汉明距离 (HAMMING) 异或位计数 [0, dim] 按位比较 二值向量、哈希检索
JACCARD |A∩B|/|A∪B| [0, 1] 集合相似度 稀疏二值向量
import numpy as np

def cosine_distance(a, b):
    """计算余弦距离"""
    dot_product = np.dot(a, b)
    norm_a = np.linalg.norm(a)
    norm_b = np.linalg.norm(b)
    cosine_similarity = dot_product / (norm_a * norm_b)
    return 1 - cosine_similarity

vec_cat = np.array([0.12, -0.34, 0.56, 0.78])
vec_kitten = np.array([0.11, -0.32, 0.55, 0.79])
vec_stock = np.array([0.89, 0.12, -0.45, 0.33])

print(f"猫 vs 小猫: {cosine_distance(vec_cat, vec_kitten):.4f}")  # ≈ 0.002
print(f"猫 vs 股票: {cosine_distance(vec_cat, vec_stock):.4f}")   # ≈ 1.2

重要提示:当向量已经 L2 归一化后,内积(IP)与余弦相似度等价,此时使用 IP 可以省去归一化计算,性能更优。

1.1.3 近似最近邻搜索(ANN)核心思想

精确最近邻搜索(Exact NN)时间复杂度 O(N),在亿级数据上不可接受。ANN 通过牺牲少量精度换取数量级的性能提升:

精确搜索 (Exact NN)              近似搜索 (ANN)
┌─────────────────────┐        ┌─────────────────────┐
│ 查询向量 vs 所有向量   │        │ 构建索引结构          │
│ 时间复杂度: O(N)      │        │ 查询时只搜索子集      │
│ 召回率: 100%         │   vs   │ 时间复杂度: O(log N)  │
│ 延迟: 高             │        │ 召回率: 95%~99.5%    │
│ 适用: 小数据集        │        │ 延迟: 低             │
└─────────────────────┘        │ 适用: 大规模生产      │
                                └─────────────────────┘

ANN 的三大技术路线

  1. 基于图的方法(HNSW、NSW):构建导航图,沿边逼近最近邻,高召回率
  2. 基于量化的方法(IVF_PQ、SQ8):压缩向量降低内存,牺牲部分精度
  3. 基于分区的方法(IVF_FLAT):将空间分区,只搜索相关分区

1.1.4 向量数据库 vs 传统数据库 vs 全文搜索引擎

维度 关系型数据库 全文搜索引擎 向量数据库
数据类型 结构化数据 文本数据 向量+标量混合
查询方式 精确匹配 关键词匹配 语义相似度
索引结构 B+ Tree 倒排索引 ANN 索引
查询结果 精确结果 排序结果 Top-K 近似结果
语义理解 有限(词频) 强(向量语义)
典型产品 MySQL/PG Elasticsearch Milvus/Qdrant
适用场景 事务处理 关键词搜索 语义检索/RAG

为什么不能只用 Elasticsearch?

  • ES 的向量检索(kNN)是后加功能,非原生设计,性能远不如专用向量库
  • ES 不支持稀疏向量+稠密向量混合检索
  • Milvus 2.5 的 BM25 全文检索在百万向量上仅需 6ms,ES 需要 200ms

1.1.5 向量检索的技术挑战:召回率、延迟、成本的三角平衡

                    召回率 (Recall)
                        ▲
                    ┌───┼───┐
                    │ 不可能 │
                    │ 三角形 │
                    └───┼───┘
                   ╱    │    ╲
    低成本 ◄────────────┼────────────► 低延迟
    (Cost)              │           (Latency)

Milvus 的破局之道

  • 多索引体系:不同场景选择不同索引
  • 混合检索:向量+标量联合过滤,缩小搜索空间
  • 存算分离:按需加载,降低内存成本
  • 分层存储:冷数据放磁盘,热数据放内存

1.2 Milvus 产品定位与发展

1.2.1 设计理念

  1. 云原生(Cloud Native):全面拥抱云原生架构,所有组件容器化,支持 K8s 编排
  2. 存算分离(Disaggregated Storage & Compute):存储和计算完全解耦,可独立扩展
  3. 水平扩展(Horizontal Scaling):无状态节点可随意扩缩容
  4. 高性能(High Performance):C++ 核心引擎,SIMD 指令优化,GPU 加速支持

1.2.2 2026 年 2.6.x 核心新特性总览

Milvus 2.6 是一次重大版本升级,围绕"降本、增效、规模化"三大主题,带来了突破性创新:

特性 说明 影响
RaBitQ 1-bit 量化 将 FP32 向量压缩为 1-bit 表示,内存降低 72%,查询速度提升 4 倍 亿级向量成本大幅下降
冷热分层存储 自动识别冷热数据,冷数据按需加载,存储成本降低 50% 大规模场景成本优化
Woodpecker 零磁盘 WAL 自研云原生 WAL 引擎,替代 Kafka/Pulsar,零磁盘架构 运维复杂度大幅降低
Path Index JSON 路径索引,嵌套元数据过滤速度提升 100 倍 复杂 JSON 过滤场景质变
StreamingNode 流批分离 新增流式节点,实时写入即时可查,写入吞吐大幅提升 实时性要求场景受益
Int8 向量压缩 HNSW 索引支持 Int8 向量存储,内存减半 高精度场景成本优化
Nullable Vector 向量字段支持可空,缺失嵌入不占存储,搜索自动跳过 Schema 演变更灵活
动态添加字段 (Add Field) 无需停机即可向已有集合添加新标量字段 业务迭代零停机
Geometry 地理空间字段 支持 POINT/LINESTRING/POLYGON 类型 LBS + 语义检索融合
Struct Array 结构化数组字段,支持元素级向量搜索 多向量实体建模更自然
多语言 BM25 分析器 自动识别语言并应用对应分词器,多语言全文检索 全球化业务场景
CAGRA+Vamana 混合索引 GPU 构建 + CPU 查询,降低 GPU 部署成本 GPU 成本优化
10 万+集合多租户 单集群支持超过 10 万个 Collection SaaS 多租户场景突破
Storage v2 新存储引擎,降低 IOPS 和内存占用 整体性能提升

2.5.x 继承特性:全文检索(BM25)、稀疏向量原生支持、GPU CAGRA 索引、增强 RBAC、Group By 分组查询等特性在 2.6.x 中全部继承并增强。

1.2.3 开源社区版 vs Zilliz Cloud 企业版 vs Milvus Lite

维度 Milvus Lite Milvus 开源版 Zilliz Cloud
部署方式 pip install Docker/K8s 自托管 全托管云服务
数据规模 万级 百万~十亿级 百万~百亿级
高可用 手动配置 自动
运维成本
成本 免费 服务器成本 按用量付费
SLA 自行保障 99.9%+
适用阶段 开发原型 生产环境 快速上线

1.3 核心优势与适用场景

1.3.1 核心技术优势

  1. 多索引体系:FLAT、HNSW、IVF 系列、DiskANN、GPU 索引、稀疏索引,覆盖所有场景
  2. 混合检索:向量检索+标量过滤+全文检索,一站式解决
  3. 存算分离:存储和计算独立扩展,资源利用率最优
  4. 弹性伸缩:无状态节点秒级扩缩容,有状态数据自动均衡
  5. 多语言 SDK:Python、Java、Go、Node.js、RESTful API
  6. 生态丰富:LangChain、LlamaIndex、Haystack、Dify 等深度集成

1.3.2 典型业务场景

场景一:RAG 知识库

用户提问 → Embedding → Milvus 语义检索 → Top-K 文档 → LLM 生成回答
                      ↘ BM25 关键词检索 ↗
                      (混合检索 + Rerank)

场景二:图像检索

上传图片 → CLIP/ViT Embedding → Milvus 向量检索 → 相似图片列表

场景三:推荐召回

用户行为 → 用户向量 → Milvus ANN 检索 → 候选物品集 → 排序 → 推荐

1.3.3 选型边界

选择 Milvus 的场景

  • 数据规模超过百万级向量
  • 需要分布式部署和高可用
  • 需要混合检索(向量+标量+全文)
  • 需要 GPU 加速或磁盘索引
  • 企业级安全与 RBAC 需求

可以考虑其他方案的场景

  • 原型验证阶段,数据量小 → Chroma(更轻量)
  • 单机部署,QPS 要求不高 → Qdrant(Rust 单机性能好)
  • 纯云服务,不想运维 → Pinecone(全托管,但成本高)
  • 仅需本地向量检索库 → FAISS(无数据库功能)—

第 2 章 环境搭建与快速上手

2.1 环境准备与版本选择

硬件要求

规模 CPU 内存 存储 GPU(可选)
开发测试 4 核+ 8 GB+ 50 GB SSD
百万级生产 8 核+ 32 GB+ 500 GB SSD
千万级生产 16 核+ 64 GB+ 1 TB NVMe 可选
亿级生产 32 核+ 128 GB+ 多 TB NVMe 推荐 A100/H100

软件依赖

组件 最低版本 推荐版本
Python 3.8+ 3.10+
Docker 20.10+ 24.0+
Kubernetes 1.20+ 1.28+
Helm 3.0+ 3.12+

版本选择原则

  • LTS 长期支持版(推荐):2.6.18 系列,稳定可靠,安全补丁持续更新,包含 RaBitQ、Woodpecker 等重大特性
  • 开发版:包含最新功能,但可能存在不稳定性,仅用于功能预览

2.2 四种部署方式快速启动

方式一:Milvus Lite 嵌入式版

最简单的入门方式,零依赖,适合本地开发和原型验证:

# 安装 PyMilvus(内置 Milvus Lite)
pip install pymilvus

# 使用 Milvus Lite,只需指定本地文件路径
from pymilvus import MilvusClient

client = MilvusClient("./milvus_demo.db")  # 数据存储在本地文件

# 使用完毕后关闭
client.close()

Milvus Lite 特点

  • 无需 Docker、无需服务器
  • 数据存储在本地 SQLite 文件中
  • 支持完整的 CRUD 和向量检索 API
  • 不支持分布式、高可用、GPU 索引
  • 适合 Jupyter Notebook 开发和单元测试

方式二:Docker Compose 单机版

适合开发测试环境,包含所有依赖组件:

# 下载 docker-compose 配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.6.18/milvus-standalone-docker-compose.yml -O docker-compose.yml

# 启动 Milvus 单机版
docker compose up -d

# 查看服务状态
docker compose ps

# 停止服务
docker compose down

配置文件核心参数说明

# docker-compose.yml 关键配置
services:
  etcd:
    # 元数据存储
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
    volumes:
      - etcd_data:/etcd

  minio:
    # 对象存储
    environment:
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    volumes:
      - minio_data:/minio/data

  standalone:
    # Milvus 主服务
    ports:
      - "19530:19530"  # gRPC 端口
      - "9091:9091"    # Metrics 端口
    volumes:
      - milvus_data:/var/lib/milvus

方式三:Helm Chart Kubernetes 集群版

生产环境推荐方式,支持完整分布式部署:

# 添加 Milvus Helm 仓库
helm repo add milvus https://zilliztech.github.io/milvus-helm/
helm repo update

# 创建命名空间
kubectl create namespace milvus

# 部署 Milvus 分布式集群
helm install milvus milvus/milvus \
  --namespace milvus \
  --set cluster.enabled=true \
  --set proxy.service.type=LoadBalancer \
  --set queryNode.replicas=2 \
  --set dataNode.replicas=1 \
  --set indexNode.replicas=1

# 查看部署状态
kubectl get pods -n milvus

# 端口转发(本地访问)
kubectl port-forward svc/milvus 19530:19530 -n milvus

生产级 Helm values 配置示例

# values-prod.yaml (Milvus 2.6.18)
cluster:
  enabled: true

proxy:
  replicas: 2
  resources:
    requests:
      cpu: "2"
      memory: "4Gi"
    limits:
      cpu: "4"
      memory: "8Gi"

# 2.6 新增:StreamingNode 流式节点
streamingNode:
  replicas: 2
  resources:
    requests:
      cpu: "2"
      memory: "8Gi"
    limits:
      cpu: "4"
      memory: "16Gi"

queryNode:
  replicas: 3
  resources:
    requests:
      cpu: "4"
      memory: "16Gi"
    limits:
      cpu: "8"
      memory: "32Gi"

dataNode:
  replicas: 2
  resources:
    requests:
      cpu: "2"
      memory: "8Gi"

indexNode:
  replicas: 1
  resources:
    requests:
      cpu: "4"
      memory: "16Gi"

etcd:
  replicas: 3

minio:
  mode: distributed
  replicas: 4

# 2.6 新增:Woodpecker 零磁盘 WAL(可替代 Pulsar/Kafka)
woodpecker:
  enabled: true

# 如果仍需 Pulsar,可保留(与 Woodpecker 二选一)
pulsar:
  enabled: false

方式四:Zilliz Cloud 免费托管版

无需运维,快速体验:

  1. 访问 cloud.zilliz.com 注册账号
  2. 创建免费集群(Free Tier 提供 1 个 CU)
  3. 获取连接端点和 API Key
  4. 使用 PyMilvus 连接
from pymilvus import MilvusClient

# Zilliz Cloud 连接
client = MilvusClient(
    uri="https://in03-xxxxx.aws-us-west-2.vectordb.zillizcloud.com:19530",
    token="db_name:api_key"  # 数据库名:API密钥
)

2.3 可视化与命令行工具

Attu 可视化管理平台

Attu 是 Milvus 官方可视化管理工具,提供图形化界面管理集合、数据、索引:

# Docker 启动 Attu
docker run -d \
  --name attu \
  -p 3000:3000 \
  -e MILVUS_URL=host.docker.internal:19530 \
  zilliz/attu:latest

# 访问 http://localhost:3000

Attu 核心功能

  • 集合管理:创建、删除、查看集合详情
  • 数据浏览:查看、搜索数据
  • 索引管理:创建、查看索引
  • 系统监控:节点状态、系统信息

Milvus CLI 命令行工具

# 安装
pip install milvus-cli

# 连接 Milvus
milvus_cli > connect -h localhost -p 19530

# 常用命令
milvus_cli > list collections
milvus_cli > describe collection -c my_collection
milvus_cli > query -c my_collection -f "id > 100"

Birdwatcher 调试工具

用于 Milvus 运维调试的高级工具,可直接连接 etcd 查看元数据:

# 连接 etcd 查看元数据
./birdwatcher --etcd=127.0.0.1:2379 --rootPath=by-dev

# 查看集合信息
show collections
show segments

2.4 Hello World:第一个向量检索程序

Python 客户端 PyMilvus 安装

pip install pymilvus
# 验证安装
python -c "import pymilvus; print(pymilvus.__version__)"

连接 Milvus 服务的三种方式

from pymilvus import MilvusClient

# 方式一:使用 MilvusClient(推荐,2.4+ 新 API)
client = MilvusClient(
    uri="http://localhost:19530",  # Milvus 服务地址
    token="root:Milvus"            # 认证信息(如果启用)
)

# 方式二:Milvus Lite 本地文件
client = MilvusClient("./milvus_local.db")

# 方式三:Zilliz Cloud
client = MilvusClient(
    uri="https://xxx.vectordb.zillizcloud.com:19530",
    token="db_name:api_key"
)

全流程实操

from pymilvus import MilvusClient

# 1. 连接 Milvus
client = MilvusClient("http://localhost:19530")

# 2. 创建集合(快速模式)
client.create_collection(
    collection_name="demo_collection",
    dimension=768  # 向量维度,与嵌入模型输出维度一致
)

# 3. 插入数据
data = [
    {"id": 1, "vector": [0.1] * 768, "text": "Milvus是开源向量数据库", "category": "数据库"},
    {"id": 2, "vector": [0.2] * 768, "text": "向量检索用于语义搜索", "category": "搜索"},
    {"id": 3, "vector": [0.3] * 768, "text": "深度学习模型生成嵌入向量", "category": "AI"},
]

client.insert(collection_name="demo_collection", data=data)

# 4. 向量查询
query_vector = [0.15] * 768

results = client.search(
    collection_name="demo_collection",
    data=[query_vector],       # 支持批量查询
    limit=3,                   # 返回 Top-3
    output_fields=["text", "category"]
)

# 5. 查看结果
for hits in results:
    for hit in hits:
        print(f"ID: {hit['id']}, 距离: {hit['distance']:.4f}, "
              f"文本: {hit['entity']['text']}")

# 6. 清理
client.drop_collection("demo_collection")

核心对象解析

Milvus 数据层级结构:

Milvus 实例
└── 数据库 (Database)           ← 逻辑隔离,类似 MySQL 的 Database
    └── 集合 (Collection)       ← 类似 MySQL 的 Table
        ├── 分区 (Partition)    ← 数据按规则分区,加速查询
        │   └── 段 (Segment)    ← 数据存储的最小单元
        │       ├── Growing 段  ← 正在写入的段(存在内存中)
        │       └── Sealed 段   ← 已封存的段(已持久化到对象存储)
        └── 索引 (Index)        ← 加速检索的数据结构
对象 说明 类比
Connection 与 Milvus 服务的连接 数据库连接
Database 逻辑命名空间 MySQL Database
Collection 数据集合,包含 Schema MySQL Table
Partition 集合内的数据分区 分区表
Segment 数据存储的物理单元 数据文件
Index 加速检索的索引结构 B+ Tree Index

第二阶段:核心操作与数据模型(Day 3-6)

第 3 章 数据模型与集合设计

3.1 Milvus 核心数据概念

层级关系

Database (数据库)
  └── Collection (集合) ─── Schema + 索引 + 配置
        ├── Partition (分区) ─── 数据按规则划分
        │     └── Segment (段) ─── 物理存储单元
        │           ├── Growing Segment ─── 正在写入,在内存
        │           └── Sealed Segment ─── 已封存,在对象存储
        └── Shard (分片) ─── 基于 Pulsar 的写入通道

关键概念解释

  • 集合(Collection):Milvus 中存储数据的基本单元,类似关系数据库的表
  • 分区(Partition):集合内的数据分区,可按业务规则手动创建或通过分区键自动管理
  • 分片(Shard):基于消息队列(Pulsar/Kafka)的写入通道,分片数量决定写入吞吐量上限
  • 段(Segment):数据存储的最小物理单元,每个段默认包含最多 512MB 数据

字段分类

字段类型 说明 示例
主键字段 唯一标识每条记录,必选 id (INT64 或 VARCHAR)
标量字段 存储非向量的结构化数据 title (VARCHAR), price (FLOAT)
向量字段 存储向量嵌入数据 embedding (FLOAT_VECTOR)

3.2 Schema 字段设计详解

主键字段

from pymilvus import MilvusClient, DataType

# 方式一:INT64 自增主键(推荐,性能最优)
schema = MilvusClient.create_schema()
schema.add_field(
    field_name="id",
    datatype=DataType.INT64,
    is_primary=True,
    auto_id=True  # 自动生成递增ID
)

# 方式二:VARCHAR 自定义主键(适合有自然键的场景)
schema.add_field(
    field_name="doc_id",
    datatype=DataType.VARCHAR,
    max_length=256,
    is_primary=True,
    auto_id=False  # 需要手动指定
)

选型建议

  • 优先使用 INT64 自增主键,性能最优
  • 当需要与外部系统数据关联时(如文档ID),使用 VARCHAR 主键
  • VARCHAR 主键在过滤查询中性能略低于 INT64

标量字段类型全解

schema = MilvusClient.create_schema()

# 整数类型
schema.add_field(field_name="age", datatype=DataType.INT64)       # 64位整数
schema.add_field(field_name="count", datatype=DataType.INT32)     # 32位整数
schema.add_field(field_name="big_id", datatype=DataType.INT16)    # 16位整数
schema.add_field(field_name="flag", datatype=DataType.INT8)       # 8位整数

# 浮点类型
schema.add_field(field_name="score", datatype=DataType.FLOAT)     # 32位浮点
schema.add_field(field_name="price", datatype=DataType.DOUBLE)    # 64位浮点

# 布尔类型
schema.add_field(field_name="is_active", datatype=DataType.BOOL)

# 字符串类型
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)

# JSON 类型(灵活存储半结构化数据)
schema.add_field(field_name="metadata", datatype=DataType.JSON)

# 数组类型
schema.add_field(
    field_name="tags",
    datatype=DataType.ARRAY,
    element_type=DataType.VARCHAR,
    max_capacity=20,       # 数组最大容量
    max_length=64          # 元素最大长度
)

JSON 字段使用详解

# JSON 字段支持嵌套结构,适合存储动态元数据
data = [
    {
        "id": 1,
        "vector": [0.1] * 128,
        "metadata": {
            "source": "wiki",
            "author": "张三",
            "tags": ["AI", "数据库"],
            "stats": {"views": 1000, "likes": 50}
        }
    }
]

# JSON 字段支持嵌套过滤
results = client.search(
    collection_name="my_collection",
    data=[query_vector],
    filter='metadata["source"] == "wiki" && metadata["stats"]["views"] > 500',
    limit=10
)

向量字段类型

# 浮点稠密向量(最常用)
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768
)

# 二值向量(用于哈希检索场景)
schema.add_field(
    field_name="binary_hash",
    datatype=DataType.BINARY_VECTOR,
    dim=256  # 维度必须是8的倍数
)

# 稀疏向量(2.4+ 支持,用于关键词检索)
schema.add_field(
    field_name="sparse_embedding",
    datatype=DataType.SPARSE_FLOAT_VECTOR
    # 稀疏向量无需指定 dim,维度动态
)

# Float16 向量(2.5+ 支持,节省内存)
schema.add_field(
    field_name="fp16_embedding",
    datatype=DataType.FLOAT16_VECTOR,
    dim=768
)

# BFloat16 向量(2.5+ 支持)
schema.add_field(
    field_name="bf16_embedding",
    datatype=DataType.BFLOAT16_VECTOR,
    dim=768
)

# Int8 向量(2.6 新增,内存减半)
schema.add_field(
    field_name="int8_embedding",
    datatype=DataType.INT8_VECTOR,
    dim=768
)

2.6 新增字段类型

# Geometry 地理空间字段(2.6 新增)
schema.add_field(
    field_name="location",
    datatype=DataType.GEOMETRY  # 支持 POINT/LINESTRING/POLYGON
)

# Struct Array 结构化数组(2.6 新增)
# 一个实体包含多个子项,每个子项有独立向量
schema.add_field(
    field_name="page_embeddings",
    datatype=DataType.ARRAY,
    element_type=DataType.STRUCT,
    struct_fields=[
        FieldSchema("page_text", DataType.VARCHAR, max_length=4096),
        FieldSchema("page_vector", DataType.FLOAT_VECTOR, dim=768),
        FieldSchema("page_number", DataType.INT64),
    ],
    max_capacity=50
)

# Nullable Vector 可空向量(2.6.18 新增)
# 允许插入时向量字段为空,搜索时自动跳过 NULL 向量
schema.add_field(
    field_name="embedding",
    datatype=DataType.FLOAT_VECTOR,
    dim=768,
    nullable=True  # 2.6.18 新增:向量字段可空
)

Nullable Vector 使用场景

  • 嵌入尚未生成时先插入元数据,后续回填向量
  • 动态添加向量字段到已有集合(新字段对旧数据为 NULL)
  • 部分实体不需要向量检索(如纯元数据记录)
# 插入含 NULL 向量的数据
data = [
    {"id": 1, "text": "已处理文档", "embedding": [0.1] * 768},
    {"id": 2, "text": "待处理文档", "embedding": None},  # NULL 向量
]
client.insert(collection_name="docs", data=data)

# 搜索时自动跳过 NULL 向量,不会报错
results = client.search(
    collection_name="docs",
    data=[[0.1] * 768],
    anns_field="embedding",
    limit=10
)

向量维度选型建议

嵌入模型 输出维度 推荐向量类型 内存占用(每百万条)
OpenAI text-embedding-3-small 1536 FLOAT_VECTOR ~5.9 GB
OpenAI text-embedding-3-large 3072 FLOAT_VECTOR / INT8_VECTOR ~11.7 GB / ~2.9 GB
BGE-large-zh 1024 FLOAT_VECTOR ~3.9 GB
BGE-base-zh 768 FLOAT_VECTOR / INT8_VECTOR ~2.9 GB / ~0.7 GB
SPLADE 稀疏向量 动态 SPARSE_FLOAT_VECTOR ~0.5 GB
Cohere embed-v4 1024 FLOAT_VECTOR ~3.9 GB

3.3 集合创建与核心配置

create_collection 参数全解

from pymilvus import MilvusClient, DataType

# 1. 创建 Schema
schema = MilvusClient.create_schema(auto_id=False, enable_dynamic_field=True)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=2048)
schema.add_field(field_name="category", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="year", datatype=DataType.INT32)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=1024)

# 2. 创建索引参数
index_params = client.prepare_index_params()

index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="COSINE",
    params={"M": 16, "efConstruction": 256}
)

index_params.add_index(
    field_name="category",
    index_type="INVERTED"
)

# 3. 创建集合
client.create_collection(
    collection_name="knowledge_base",
    schema=schema,
    index_params=index_params,
    shards_num=2,                    # 分片数量
    num_partitions=64,               # 分区数量(使用分区键时)
    partition_key_field="category",  # 分区键字段
    consistency_level="Bounded"      # 一致性级别
)

分片数量(shards_num)配置原则

数据规模 推荐分片数 说明
< 100万 1 单分片足够
100万~1000万 2~4 中等写入量
1000万~1亿 4~8 高写入量
> 1亿 8~16 超高写入量

注意:分片数量创建后不可修改,请根据预估数据量和写入速率提前规划。

分区键(Partition Key)设计

# 分区键自动分区:按 category 字段自动分区
schema.add_field(
    field_name="category",
    datatype=DataType.VARCHAR,
    max_length=64,
    is_partition_key=True  # 标记为分区键
)

# 创建集合时指定分区数量
client.create_collection(
    collection_name="auto_partition_demo",
    schema=schema,
    index_params=index_params,
    num_partitions=64  # 自动创建64个分区
)

分区键 vs 手动分区

维度 分区键 手动分区
管理方式 自动 手动创建/管理
分区数量 创建时指定,可很多 通常少量
过滤优化 自动按分区裁剪 需指定分区名
灵活性 较低 较高
适用场景 枚举值少的字段 复杂业务分区

动态字段 enable_dynamic_field

# 启用动态字段后,可以插入 Schema 中未定义的字段
schema = MilvusClient.create_schema(enable_dynamic_field=True)

data = [
    {
        "id": 1,
        "embedding": [0.1] * 768,
        "text": "示例文本",
        # 以下字段不在 Schema 中,但可以存储
        "author": "张三",
        "priority": 5,
    }
]
# 动态字段存储在 $meta 字段中,可以通过 filter 过滤

建议:高频过滤的字段使用静态字段,低频/探索性字段使用动态字段。

集合别名(Alias)

# 创建别名(用于平滑切换、灰度发布)
client.create_alias(
    collection_name="knowledge_base_v2",
    alias="knowledge_base"
)

# 修改别名指向(灰度切换)
client.alter_alias(
    collection_name="knowledge_base_v3",
    alias="knowledge_base"
)

# 删除别名
client.drop_alias(alias="knowledge_base")

动态添加字段(2.6 新增 Add Field)

Milvus 2.6 支持在不停止服务的情况下向已有集合添加新标量字段,无需重建集合:

# 向已有集合添加新字段(2.6 新增)
client.add_collection_field(
    collection_name="knowledge_base",
    field_name="priority",
    data_type=DataType.INT64,
    nullable=True,       # 新增字段必须可空(旧数据该字段值为 NULL)
    default_value=0      # 可选:设置默认值
)

# 添加 VARCHAR 字段
client.add_collection_field(
    collection_name="knowledge_base",
    field_name="department",
    data_type=DataType.VARCHAR,
    max_length=128,
    nullable=True
)

Add Field 限制与注意事项

  • 只能添加标量字段(INT64、VARCHAR、FLOAT、DOUBLE 等),不能添加向量字段
  • 新增字段必须设置为 nullable=True,因为旧数据没有该字段值
  • 字段名必须唯一,不可与已有字段重名
  • 每个集合的字段总数有上限(通常不超过 1024 个)
  • 不可通过 Add Field 启用动态字段功能,需在创建时设置

与 Nullable Vector 配合:2.6.18 支持向量字段可空后,可以先用 Add Field 添加标量字段,再添加新的可空向量字段,实现 Schema 的渐进式演进。

3.4 分区管理

分区的核心作用

分区是 Milvus 中缩小查询范围的核心机制。当查询指定分区时,Milvus 只在对应分区内搜索,大幅减少计算量。

无分区查询:搜索全部数据                    指定分区查询:只搜索目标分区
┌─────────────────────────┐            ┌───────────┬───────────┐
│    全部数据 (1亿条)      │            │ 分区A      │ 分区B      │
│    搜索范围: 1亿         │     →      │ 搜索范围:  │           │
│                         │            │ 1000万 ✓  │           │
└─────────────────────────┘            └───────────┴───────────┘

手动分区操作

# 创建分区
client.create_partition(
    collection_name="knowledge_base",
    partition_name="partition_2024"
)

# 插入数据到指定分区
client.insert(
    collection_name="knowledge_base",
    data=data_2024,
    partition_name="partition_2024"
)

# 在指定分区中搜索
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    partition_names=["partition_2024"],
    limit=10
)

# 释放分区(释放内存)
client.release_partitions(
    collection_name="knowledge_base",
    partition_names=["partition_2023"]
)

# 删除分区
client.drop_partition(
    collection_name="knowledge_base",
    partition_name="partition_2022"
)

TTL 配置

# 设置集合的数据生存时间(TTL),过期数据自动清理
client.alter_collection_properties(
    collection_name="logs_collection",
    properties={"collection.ttl.seconds": 2592000}  # 30天
)

3.5 集合生命周期管理

加载与释放

# 加载集合到内存(查询前必须加载)
client.load_collection(
    collection_name="knowledge_base",
    replica_number=2  # 可选:指定加载的副本数(读高可用)
)

# 释放集合(释放内存,数据仍在对象存储中)
client.release_collection(
    collection_name="knowledge_base"
)

# 查看集合加载状态
info = client.describe_collection("knowledge_base")
print(info["load_state"])  # Loaded / NotLoad / Loading

重要概念

  • Load:将数据从对象存储加载到 QueryNode 内存,使其可被查询
  • Release:从 QueryNode 内存卸载数据,释放计算资源,数据不会丢失
  • 数据写入后不需要手动 Load,Growing 段自动在内存中
  • Sealed 段需要 Load 后才能被查询到

查看与管理

# 列出所有集合
collections = client.list_collections()

# 查看集合详情
info = client.describe_collection("knowledge_base")

# 检查集合是否存在
exists = client.has_collection("knowledge_base")

# 删除集合(不可恢复!)
client.drop_collection("knowledge_base")

第 4 章 数据写入与更新

4.1 基础写入 API 全解析

insert:批量新增数据

data = [
    {"id": 1, "text": "文档1", "category": "技术", "embedding": [0.1] * 768},
    {"id": 2, "text": "文档2", "category": "产品", "embedding": [0.2] * 768},
    {"id": 3, "text": "文档3", "category": "技术", "embedding": [0.3] * 768},
]

result = client.insert(
    collection_name="knowledge_base",
    data=data
)
print(f"插入数量: {result['insert_count']}")

upsert:更新插入(幂等写入)

# upsert:如果主键已存在则更新,不存在则插入
upsert_data = [
    {"id": 1, "text": "文档1-更新版", "category": "技术", "embedding": [0.15] * 768},
]

result = client.upsert(
    collection_name="knowledge_base",
    data=upsert_data
)

insert vs upsert

  • insert:主键冲突时会报错
  • upsert:主键冲突时更新,不冲突时插入,适合幂等写入场景
  • upsert 性能略低于 insert,因为需要先查询主键是否存在

delete:按主键/条件删除

# 按主键删除
client.delete(
    collection_name="knowledge_base",
    ids=[1, 2, 3]
)

# 按条件删除
client.delete(
    collection_name="knowledge_base",
    filter='category == "过期" && year < 2020'
)

数据写入的一致性级别与可见性延迟

Milvus 采用 WAL(Write-Ahead Log)机制保证数据持久性:

客户端写入 → Proxy → Pulsar/Kafka → DataNode → 对象存储(MinIO/S3)
                                              ↓
                                    Growing Segment(立即可查)
                                              ↓
                                    Sealed Segment(持久化后可查)

4.2 批量写入最佳实践

单批次大小优化

import numpy as np

BATCH_SIZE = 1000  # 每批1000条,平衡吞吐与内存

def batch_insert(client, collection_name, all_data, batch_size=BATCH_SIZE):
    """分批写入数据"""
    total = len(all_data)
    for i in range(0, total, batch_size):
        batch = all_data[i:i+batch_size]
        result = client.insert(
            collection_name=collection_name,
            data=batch
        )
        print(f"已写入: {min(i+batch_size, total)}/{total}")

批次大小建议

  • 小批次(< 100):延迟低但吞吐低,适合实时写入
  • 中批次(100~5000):吞吐与延迟平衡,推荐大多数场景
  • 大批次(> 5000):吞吐高但内存占用大,适合离线导入

并发写入与连接池

from concurrent.futures import ThreadPoolExecutor, as_completed

def insert_batch(client, collection_name, batch_data):
    try:
        result = client.insert(collection_name=collection_name, data=batch_data)
        return result['insert_count']
    except Exception as e:
        print(f"写入失败: {e}")
        return 0

def concurrent_insert(client, collection_name, all_data,
                      batch_size=1000, max_workers=4):
    batches = [all_data[i:i+batch_size]
               for i in range(0, len(all_data), batch_size)]

    total_inserted = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(insert_batch, client, collection_name, batch)
            for batch in batches
        ]
        for future in as_completed(futures):
            total_inserted += future.result()

    print(f"总写入: {total_inserted}")

4.3 数据更新与删除底层机制

软删除与硬删除原理

Milvus 的删除操作采用软删除机制:

删除流程:
1. 客户端发送 Delete 请求
2. Proxy 将删除标记写入 Pulsar/Kafka
3. DataNode 消费删除标记,写入 Delete Log
4. 查询时,QueryNode 根据删除标记过滤结果
5. Compaction 时,真正从物理文件中移除已删除数据

删除标记(Delete Log):
┌──────────────────────────────────────┐
│ Segment ID │ Primary Key │ Timestamp │
│ seg_001    │ 42          │ 1700000   │
│ seg_001    │ 87          │ 1700001   │
│ seg_002    │ 15          │ 1700002   │
└──────────────────────────────────────┘

关键点

  • 删除是异步的,删除后数据不会立即从磁盘消失
  • 查询时通过 Delete Log 过滤已删除数据
  • 物理删除在 Compaction 时执行
  • 大量删除后建议触发 Compaction 回收空间

Compaction 数据合并机制

# 手动触发 Compaction
client.compact(
    collection_name="knowledge_base",
)

# 查看 Compaction 状态
status = client.get_compaction_state(collection_name="knowledge_base")

Compaction 类型

类型 说明 触发条件
段合并 将多个小段合并为大段 段数量超过阈值
垃圾回收 清理已删除数据和过期段 删除数据量超过阈值
索引重建 合并后重建索引 段合并完成后自动触发

4.4 海量数据离线导入(Bulk Insert)

当需要导入百万级以上数据时,使用 Bulk Insert 比 insert API 效率更高:

import json

# 1. 准备数据文件(JSON 行格式)
bulk_data = []
for i in range(1000000):
    bulk_data.append({
        "id": i,
        "text": f"文档_{i}",
        "embedding": np.random.randn(768).tolist()
    })

with open("bulk_data.json", "w") as f:
    for item in bulk_data:
        f.write(json.dumps(item) + "\n")

# 2. 上传到对象存储(MinIO/S3)
# mc cp bulk_data.json myminio/milvus-bucket/bulk/

# 3. 调用 BulkInsert API

支持的文件格式

格式 适用场景 压缩支持
JSON Rows 小规模、灵活格式
Parquet 大规模、列式存储 内置压缩
CSV 简单数据、兼容性好

4.5 时间旅行(Time Travel)

Milvus 支持按时间戳查询历史数据快照:

import time

timestamp_before = int(time.time())

client.insert(collection_name="knowledge_base", data=new_data)

timestamp_after = int(time.time())

# 查询插入前的数据(不包含新插入的数据)
results = client.query(
    collection_name="knowledge_base",
    filter="id > 0",
    travel_timestamp=timestamp_before
)

# 查询插入后的数据(包含新插入的数据)
results = client.query(
    collection_name="knowledge_base",
    filter="id > 0",
    travel_timestamp=timestamp_after
)

时间旅行应用场景

  • 数据误删恢复:查询删除前的数据快照
  • 版本对比:比较不同时间点的数据差异
  • 审计追踪:查看历史数据状态

注意:时间旅行依赖 Compaction 前的段数据,已 Compaction 的旧数据可能无法回溯。—

第 5 章 向量索引基础

5.1 索引的核心作用与分类

为什么必须构建索引

无索引(暴力搜索):
  查询向量 vs 1亿条向量 → 逐一计算相似度 → O(N) → 延迟: 秒级

有索引(ANN搜索):
  查询向量 → 索引结构导航 → 只搜索相关子集 → O(log N) → 延迟: 毫秒级

索引分类

Milvus 索引体系(2.6.18)
├── 向量索引(加速向量相似度搜索)
│   ├── FLAT              暴力搜索,100%召回
│   ├── HNSW              图索引,高召回高性能
│   ├── IVF_FLAT          倒排索引,均衡型
│   ├── IVF_SQ8           量化压缩,省内存
│   ├── IVF_PQ            乘积量化,极致压缩
│   ├── IVF_RABITQ        1-bit量化索引(2.6新增),内存降72%
│   ├── DiskANN           磁盘索引,超大规模
│   ├── GPU_IVF_FLAT      GPU加速
│   ├── GPU_IVF_PQ        GPU加速+量化
│   ├── GPU_CAGRA         GPU图索引(2.5+)
│   ├── SPARSE_INVERTED_INDEX  稀疏倒排索引
│   └── SPARSE_WAND           稀疏WAND索引
│
└── 标量索引(加速标量过滤)
    ├── INVERTED          倒排索引
    ├── STL_SORT          排序索引(数值)
    ├── TRIE              前缀索引(字符串)
    └── PATH_INDEX        JSON路径索引(2.6新增),过滤快100倍

段与索引的关系

Growing Segment(增长段):
  - 正在写入的段,数据在内存中
  - 使用暴力搜索(FLAT)进行查询
  - 不构建索引

Sealed Segment(封存段):
  - 已持久化到对象存储的段
  - 触发索引构建任务
  - 索引构建完成后,查询使用索引加速

5.2 向量索引选型全景

FLAT 暴力索引

index_params.add_index(
    field_name="embedding",
    index_type="FLAT",
    metric_type="COSINE"
)
  • 召回率:100%(精确搜索)
  • 查询速度:最慢,O(N)
  • 内存占用:原始向量大小
  • 适用场景:小数据集(< 10万)、验证基准、需要精确结果的场景

HNSW 索引(主流生产选型)

index_params.add_index(
    field_name="embedding",
    index_type="HNSW",
    metric_type="COSINE",
    params={
        "M": 16,              # 图的连接度
        "efConstruction": 256  # 构建时的搜索宽度
    }
)
  • 召回率:99%+(参数调优后)
  • 查询速度:极快,亚毫秒级
  • 内存占用:较高(约为原始向量的 1.5~2 倍)
  • 适用场景:百万~千万级数据,延迟敏感型应用

HNSW 原理简述

HNSW(Hierarchical Navigable Small World)构建了一个多层导航图:

  • 上层是稀疏的"高速公路",用于快速跳转
  • 下层是稠密的"城市道路",用于精确定位
  • 查询时从顶层开始,逐层向下逼近最近邻
Layer 2:  ○────────────────○         (稀疏,快速跳转)
          │                │
Layer 1:  ○────○────○────○────○      (中等密度)
          │    │    │    │
Layer 0:  ○──○──○──○──○──○──○──○     (稠密,精确定位)

IVF 系列索引

# IVF_FLAT:倒排+原始向量
index_params.add_index(
    field_name="embedding",
    index_type="IVF_FLAT",
    metric_type="L2",
    params={"nlist": 1024}  # 聚类中心数量
)

# IVF_SQ8:倒排+标量量化(8位整数)
index_params.add_index(
    field_name="embedding",
    index_type="IVF_SQ8",
    metric_type="L2",
    params={"nlist": 1024}
)

# IVF_PQ:倒排+乘积量化
index_params.add_index(
    field_name="embedding",
    index_type="IVF_PQ",
    metric_type="L2",
    params={
        "nlist": 1024,
        "m": 16,       # 子向量数量(dim必须能被m整除)
        "nbits": 8     # 每个子量化的位数
    }
)
索引 内存占用 召回率 查询速度 适用场景
IVF_FLAT 原始大小 均衡型,中等数据量
IVF_SQ8 原始的 25% 中高 内存受限场景
IVF_PQ 原始的 5~10% 极端内存受限

IVF_RABITQ 索引(2.6 新增,重大突破)

RaBitQ(Random Bit Quantization)是 Milvus 2.6 引入的革命性量化索引,将 FP32 向量压缩为 1-bit 表示:

# IVF_RABITQ:1-bit 量化索引(2.6 新增)
index_params.add_index(
    field_name="embedding",
    index_type="IVF_RABITQ",
    metric_type="COSINE",
    params={
        "nlist": 1024,       # 聚类中心数量
        "nbits": 1,          # 量化位数(1-bit,默认)
        "refine": True,      # 可选:启用SQ8精排,提升召回率
        "refine_type": "SQ8" # 精排量化类型
    }
)

RaBitQ 核心原理

  • 将高维向量归一化后,每个维度用 1 个 bit 表示(正/负)
  • 压缩比达到 32:1(FP32 → 1-bit)
  • 结合 SQ8 精排(refine),在保持 ~95% 召回率的同时大幅降低内存

性能数据

指标 HNSW (FP32) IVF_RABITQ (1-bit) IVF_RABITQ + SQ8精排
内存占用 100% ~8% ~28%
查询 QPS 1x 4x 4x
召回率 99%+ ~85% ~95%
适用场景 延迟敏感 成本敏感 均衡

选型建议:IVF_RABITQ 是 IVF_SQ8 和 IVF_FLAT 的理想替代方案,在内存成本和召回率之间取得了更好的平衡。

DiskANN 索引

index_params.add_index(
    field_name="embedding",
    index_type="DISKANN",
    metric_type="L2",
    params={
        "search_list": 100  # 搜索列表大小,影响召回率
    }
)
  • 核心优势:向量数据存储在 SSD 上,内存只存压缩后的图结构
  • 内存占用:约为原始向量的 5~10%
  • 查询速度:依赖 SSD 性能,P99 约 5~20ms
  • 适用场景:亿级以上数据,成本敏感,可接受稍高延迟

GPU 索引

# GPU_IVF_FLAT
index_params.add_index(
    field_name="embedding",
    index_type="GPU_IVF_FLAT",
    metric_type="L2",
    params={"nlist": 1024}
)

# GPU_CAGRA(2.5+ 新增,NVIDIA CAGRA 图索引)
index_params.add_index(
    field_name="embedding",
    index_type="GPU_CAGRA",
    metric_type="L2",
    params={
        "intermediate_graph_degree": 64,
        "graph_degree": 32
    }
)
  • GPU_CAGRA:Milvus 2.5 新增,索引构建速度比 CPU HNSW 快 40 倍
  • CAGRA+Vamana 混合模式(2.6 新增):在 GPU 上构建图索引,但查询时使用 CPU,大幅降低 GPU 部署成本。无需常驻 GPU 即可享受 GPU 构建速度
  • 适用场景:高并发批量查询、索引需要频繁重建

稀疏向量索引

# SPARSE_INVERTED_INDEX:稀疏倒排索引
index_params.add_index(
    field_name="sparse_embedding",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="IP",
    params={"drop_ratio_build": 0.2}  # 构建时丢弃最小20%的值
)

# SPARSE_WAND:WAND算法索引,查询更快
index_params.add_index(
    field_name="sparse_embedding",
    index_type="SPARSE_WAND",
    metric_type="IP",
    params={"drop_ratio_build": 0.2}
)

5.3 标量索引

# 自动索引(Milvus 自动选择最优索引类型)
index_params.add_index(
    field_name="category",
    index_type="AUTOINDEX"
)

# 倒排索引(推荐,支持等值和范围查询)
index_params.add_index(
    field_name="category",
    index_type="INVERTED"
)

# 排序索引(数值字段,加速范围查询)
index_params.add_index(
    field_name="year",
    index_type="STL_SORT"
)

# 前缀索引(VARCHAR字段,加速前缀匹配)
index_params.add_index(
    field_name="title",
    index_type="TRIE"
)

# JSON Path Index(2.6 新增,JSON嵌套过滤快100倍)
index_params.add_index(
    field_name="metadata",
    index_type="PATH_INDEX",
    params={
        "path": "author.name",    # JSON路径
        "type": "VARCHAR"         # 路径值类型
    }
)

Path Index 详解(2.6 新增)

Path Index 是 Milvus 2.6 为 JSON 字段引入的专用索引,解决了嵌套 JSON 过滤性能差的痛点:

# 创建带 Path Index 的集合
schema.add_field(field_name="metadata", datatype=DataType.JSON)

index_params.add_index(
    field_name="metadata",
    index_type="PATH_INDEX",
    params={"path": "region", "type": "VARCHAR"}
)
index_params.add_index(
    field_name="metadata",
    index_type="PATH_INDEX",
    params={"path": "stats.views", "type": "INT64"}
)

# 过滤查询自动使用 Path Index
results = client.search(
    collection_name="docs",
    data=[query_vector],
    filter='metadata["region"] == "cn" && metadata["stats"]["views"] > 1000',
    limit=10
)
# 无 Path Index: 全集扫描,JSON解析开销大
# 有 Path Index: 索引直接定位,速度提升 100 倍
对比 无 Path Index 有 Path Index
嵌套 JSON 过滤 全集扫描 索引定位
过滤性能 基准 快 100 倍
存储开销 轻微增加
适用场景 简单过滤 复杂嵌套 JSON 过滤

5.4 索引构建与管理

# 创建索引
client.create_index(
    collection_name="knowledge_base",
    index_params=index_params
)

# 查看索引信息
indexes = client.list_indexes(collection_name="knowledge_base")
for index in indexes:
    print(f"字段: {index['field_name']}, 类型: {index['index_type']}")

# 删除索引
client.drop_index(
    collection_name="knowledge_base",
    index_name="embedding"
)

索引内存占用估算

HNSW 索引内存 ≈ 原始向量大小 × (1 + M × 2 × 8 / dim)
以 768 维 FLOAT 向量、M=16 为例:
  原始向量: 768 × 4 bytes = 3,072 bytes/条
  图结构: 16 × 2 × 8 = 256 bytes/条
  总计: ~3,328 bytes/条
  百万条: ~3.2 GB
  千万条: ~32 GB

第 6 章 向量查询与检索

6.1 基础向量检索

search() 方法参数全解

results = client.search(
    collection_name="knowledge_base",   # 集合名称
    data=[query_vector],                # 查询向量列表(支持批量)
    limit=10,                           # 返回Top-K结果
    output_fields=["text", "category"], # 返回的字段
    search_params={                      # 搜索参数
        "metric_type": "COSINE",
        "params": {
            "ef_search": 128,            # HNSW搜索宽度
        }
    },
    filter='category == "技术"',         # 标量过滤条件
    partition_names=["partition_2024"],  # 指定分区
    timeout=10.0,                       # 超时时间(秒)
    consistency_level="Bounded",        # 一致性级别
)

距离分数解读

不同度量方式下,distance 值的含义不同:

度量方式 distance 含义 完全相同 完全无关 排序
COSINE 余弦距离 0 2 升序(越小越相似)
L2 欧氏距离 0 +∞ 升序(越小越相似)
IP 内积 最大值 负值 降序(越大越相似)
# 距离阈值过滤示例
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    limit=20,
    search_params={
        "metric_type": "COSINE",
        "params": {"ef_search": 128}
    }
)

# 过滤距离大于0.5的结果(余弦距离越小越相似)
filtered = [
    hit for hits in results for hit in hits
    if hit['distance'] < 0.5
]

6.2 标量过滤混合检索

过滤表达式语法全解

Milvus 的过滤表达式支持丰富的操作符:

# 比较运算符
filter = 'age > 18'
filter = 'price <= 100.0'
filter = 'category == "技术"'
filter = 'category != "过期"'

# 逻辑运算符
filter = 'age > 18 && category == "技术"'  # AND
filter = 'category == "技术" || category == "产品"'  # OR
filter = 'not (category == "过期")'  # NOT

# IN / NOT IN
filter = 'category in ["技术", "产品", "运营"]'
filter = 'status not in ["deleted", "archived"]'

# LIKE 模式匹配
filter = 'title like "%向量%"'
filter = 'title like "Milvus%"'

# JSON 字段嵌套过滤
filter = 'metadata["source"] == "wiki"'
filter = 'metadata["stats"]["views"] > 1000'
filter = 'metadata["tags"][0] == "AI"'

# 数组字段过滤
filter = 'array_contains(tags, "AI")'
filter = 'array_contains_all(tags, ["AI", "数据库"])'
filter = 'array_contains_any(tags, ["AI", "ML"])'
filter = 'array_length(tags) > 3'

前置过滤 vs 后置过滤原理

前置过滤(Pre-filtering):
  1. 先根据标量条件过滤出候选集
  2. 在候选集上进行向量搜索
  3. 适合高选择性过滤(过滤掉大部分数据)

后置过滤(Post-filtering):
  1. 先进行向量搜索得到 Top-K
  2. 再根据标量条件过滤
  3. 适合低选择性过滤(过滤掉少量数据)

Milvus 2.5 默认使用前置过滤(过滤下推优化)

6.3 其他查询能力

query():标量条件精确查询

# 按条件查询(不涉及向量检索)
results = client.query(
    collection_name="knowledge_base",
    filter='category == "技术" && year >= 2024',
    output_fields=["id", "text", "category", "year"],
    limit=100
)

# 按主键查询
results = client.query(
    collection_name="knowledge_base",
    ids=[1, 2, 3],
    output_fields=["id", "text"]
)

get():按主键批量获取数据

# 按主键获取完整数据
results = client.get(
    collection_name="knowledge_base",
    ids=[1, 2, 3],
    output_fields=["id", "text", "category"]
)

分页查询

# 方式一:limit + offset 分页(2.5+ 支持)
results = client.query(
    collection_name="knowledge_base",
    filter='category == "技术"',
    limit=10,
    offset=20  # 跳过前20条,返回第21~30条
)

# 方式二:游标分页(基于主键,性能更好)
last_id = 0
page_size = 10

while True:
    results = client.query(
        collection_name="knowledge_base",
        filter=f'category == "技术" && id > {last_id}',
        limit=page_size,
        output_fields=["id", "text"]
    )
    if not results:
        break
    last_id = results[-1]["id"]
    process_results(results)

分页方式对比

方式 优点 缺点 适用场景
limit+offset 简单直观 深度分页性能差 浅分页
游标分页 深度分页性能好 不能跳页 流式遍历

分组查询(Group By)

Milvus 2.5 新增 Group By 功能,按元数据分组返回 Top-K:

# 按分类分组,每组返回最相似的1条
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    limit=5,                    # 最多返回5组
    group_by_field="category",  # 按category分组
    output_fields=["text", "category"]
)

# 应用场景:
# - 搜索结果去重:按文档ID分组,避免同一文档的多个片段
# - 分类检索:每个分类返回最相关的一条
# - 多样性保证:确保结果来自不同来源

6.4 高级检索特性

多向量字段联合检索

Milvus 支持在同一个集合中存储多个向量字段,并进行联合检索:

from pymilvus import AnnSearchRequest, WeightedRanker

# 假设集合同时有 dense_embedding 和 sparse_embedding 两个向量字段

# 创建多个搜索请求
req_dense = AnnSearchRequest(
    data=[query_dense_vector],
    anns_field="dense_embedding",
    param={"metric_type": "COSINE", "params": {"ef_search": 128}},
    limit=20
)

req_sparse = AnnSearchRequest(
    data=[query_sparse_vector],
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=20
)

# 使用加权融合
results = client.hybrid_search(
    collection_name="knowledge_base",
    reqs=[req_dense, req_sparse],
    ranker=WeightedRanker(0.7, 0.3),  # 稠密向量权重0.7,稀疏向量权重0.3
    limit=10
)

稀疏向量 + 稠密向量混合检索

这是 RAG 系统中最常用的混合检索模式:

from pymilvus import AnnSearchRequest, WeightedRanker, RRFRanker

# 稠密向量搜索请求(语义检索)
req_dense = AnnSearchRequest(
    data=[dense_query_vector],
    anns_field="dense_embedding",
    param={"metric_type": "COSINE", "params": {"ef_search": 128}},
    limit=20
)

# 稀疏向量搜索请求(关键词检索)
req_sparse = AnnSearchRequest(
    data=[sparse_query_vector],
    anns_field="sparse_embedding",
    param={"metric_type": "IP"},
    limit=20
)

# 方式一:加权融合
results_weighted = client.hybrid_search(
    collection_name="knowledge_base",
    reqs=[req_dense, req_sparse],
    ranker=WeightedRanker(0.7, 0.3),
    limit=10
)

# 方式二:RRF 融合(Reciprocal Rank Fusion)
results_rrf = client.hybrid_search(
    collection_name="knowledge_base",
    reqs=[req_dense, req_sparse],
    ranker=RRFRanker(k=60),  # k是RRF的平滑参数
    limit=10
)

RRF 融合算法原理

RRF Score = Σ 1/(k + rank_i)

其中 rank_i 是第 i 个检索结果的排名,k 是平滑参数(默认60)

示例:
  文档A: 稠密检索排名1, 稀疏检索排名3
  RRF Score = 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323

  文档B: 稠密检索排名5, 稀疏检索排名1
  RRF Score = 1/(60+5) + 1/(60+1) = 0.0154 + 0.0164 = 0.0318

  文档A 的 RRF 分数更高,因为两个检索都排名靠前

全文检索(BM25)实战

Milvus 2.5 新增原生 BM25 全文检索功能:

from pymilvus import MilvusClient, DataType
from pymilvus import Function, FunctionType

# 1. 创建带 BM25 功能的集合
schema = MilvusClient.create_schema()

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

# 添加 BM25 函数:自动将文本转为稀疏向量
schema.add_function(Function(
    name="bm25_func",
    input_field_names=["text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25,
))

# 创建索引
index_params = client.prepare_index_params()
index_params.add_index(
    field_name="sparse_vector",
    index_type="SPARSE_INVERTED_INDEX",
    metric_type="BM25"  # 使用BM25度量
)

client.create_collection(
    collection_name="fulltext_demo",
    schema=schema,
    index_params=index_params
)

# 2. 插入数据(只需提供文本,稀疏向量自动生成)
data = [
    {"text": "Milvus是开源的云原生向量数据库"},
    {"text": "向量检索在RAG系统中发挥关键作用"},
    {"text": "BM25是经典的关键词检索算法"},
]
client.insert(collection_name="fulltext_demo", data=data)

# 3. 全文检索
results = client.search(
    collection_name="fulltext_demo",
    data=["向量数据库"],  # 直接输入查询文本
    anns_field="sparse_vector",
    limit=5,
    output_fields=["text"]
)

BM25 全文检索原理

Milvus 2.5 的 BM25 实现基于稀疏向量,核心创新点:

  • 插入时自动对文本分词,生成词频(TF)稀疏向量
  • 查询时动态计算 BM25 分数(考虑文档长度归一化)
  • 不需要预计算 IDF,查询时实时计算
  • 支持与稠密向量混合检索,实现语义+关键词双重匹配—

第三阶段:进阶功能与深度使用(Day 7-12)

第 7 章 索引深度解析与参数调优

7.1 HNSW 索引深度

M、efConstruction、efSearch 三大核心参数原理

M(连接度)

  • 控制图中每个节点的最大连接数
  • M 越大,图的连通性越好,召回率越高
  • 但 M 越大,内存占用和构建时间也越大
  • 推荐范围:8~64,常用值 16 或 32

efConstruction(构建搜索宽度)

  • 构建索引时搜索最近邻的范围
  • efConstruction 越大,构建的图质量越高,召回率越高
  • 但构建时间显著增加
  • 推荐范围:100~500,常用值 256

efSearch(查询搜索宽度)

  • 查询时搜索最近邻的范围
  • efSearch 越大,查询召回率越高,但延迟也越高
  • 可以在查询时动态调整,无需重建索引
  • 推荐范围:64~512,常用值 128

参数对性能的量化影响

测试条件:100万条 768维向量,COSINE 度量

M=16, efConstruction=256:
┌──────────┬──────────┬───────────┬──────────┐
│ efSearch │ 召回率@10│ 延迟(ms)  │ 内存(GB) │
├──────────┼──────────┼───────────┼──────────┤
│ 64       │ 97.2%    │ 0.8       │ 3.2      │
│ 128      │ 98.8%    │ 1.2       │ 3.2      │
│ 256      │ 99.5%    │ 2.1       │ 3.2      │
│ 512      │ 99.8%    │ 3.8       │ 3.2      │
└──────────┴──────────┴───────────┴──────────┘

M=32, efConstruction=512:
┌──────────┬──────────┬───────────┬──────────┐
│ efSearch │ 召回率@10│ 延迟(ms)  │ 内存(GB) │
├──────────┼──────────┼───────────┼──────────┤
│ 64       │ 98.5%    │ 1.1       │ 4.8      │
│ 128      │ 99.4%    │ 1.8       │ 4.8      │
│ 256      │ 99.8%    │ 3.2       │ 4.8      │
│ 512      │ 99.9%    │ 5.6       │ 4.8      │
└──────────┴──────────┴───────────┴──────────┘

不同数据规模下的参数推荐

数据规模 M efConstruction efSearch(默认) 内存估算
< 100万 16 256 128 原始 × 1.5
100万~1000万 16~32 256~512 128~256 原始 × 1.5~2
1000万~1亿 32 512 256 原始 × 2
> 1亿 32~64 512 256~512 原始 × 2~2.5

HNSW 内存占用计算公式

内存占用 = N × (dim × 4 + M × 2 × 8) bytes

其中:
  N = 向量数量
  dim = 向量维度
  M = 连接度参数
  4 = float32 字节数
  8 = int64 字节数(邻居指针)

示例:100万条 768维向量,M=16
  = 1,000,000 × (768 × 4 + 16 × 2 × 8)
  = 1,000,000 × (3,072 + 256)
  = 1,000,000 × 3,328
  = 3.1 GB

7.2 IVF 系列索引深度

nlist、nprobe 参数原理

nlist(聚类中心数量)

  • 将向量空间划分为 nlist 个聚类(Voronoi 单元)
  • nlist 越大,每个聚类越小,查询越精确
  • nlist 推荐:√N ~ 4√N(N 为数据量)

nprobe(查询时搜索的聚类数)

  • 查询时搜索 nprobe 个最近的聚类
  • nprobe 越大,召回率越高,延迟越高
  • nprobe ≤ nlist
nlist=1024, nprobe=32:
  搜索范围 = 32/1024 = 3.1% 的数据
  召回率约 95%

nlist=1024, nprobe=64:
  搜索范围 = 64/1024 = 6.25% 的数据
  召回率约 98%

乘积量化(PQ)原理与精度损失

PQ 将高维向量拆分为多个子向量,分别量化:

原始向量 (768维, float32):
[0.12, -0.34, 0.56, ..., 0.78]  → 768 × 4 = 3,072 bytes

PQ 量化 (m=16, nbits=8):
  将768维拆分为16个子空间,每个48维
  每个子空间用256个聚类中心表示
  量化后: 16 × 1 = 16 bytes

压缩比: 3,072 / 16 = 192倍!
精度损失: 召回率下降约 5~15%

IVF 索引选型决策树

是否内存充足?
├── 是 → HNSW(最佳性能)
└── 否 → 是否可接受 SSD 延迟?
    ├── 是 → DiskANN
    └── 否 → 对召回率要求?
        ├── 高 → IVF_FLAT
        ├── 中 → IVF_SQ8
        └── 低 → IVF_PQ

7.3 DiskANN 索引深度

磁盘索引的技术突破

DiskANN 的核心思想是将向量数据存储在 SSD 上,内存中只保留压缩后的图结构(PQ 量化向量):

内存中:PQ 压缩向量 + 图结构
  PQ 向量: 16 bytes/条(vs 原始 3072 bytes)
  图结构: 约 64 bytes/条
  总计: ~80 bytes/条

SSD 上:原始向量数据
  原始向量: 3072 bytes/条

查询流程:
  1. 在内存中用 PQ 向量做粗排,确定候选集
  2. 从 SSD 读取候选集的原始向量
  3. 用原始向量做精排,返回最终结果

核心参数配置

index_params.add_index(
    field_name="embedding",
    index_type="DISKANN",
    metric_type="L2",
    params={
        "search_list": 100,     # 搜索列表大小,越大召回越高
        "pq_code_budget_gb": 8  # PQ 编码内存预算(GB)
    }
)
search_list 召回率@10 延迟(ms) 适用场景
50 92% 3~5 低延迟优先
100 96% 5~10 均衡
200 98% 10~20 高召回优先

7.4 GPU 索引深度

GPU 索引的硬件要求

组件 最低要求 推荐
GPU NVIDIA T4 (16GB) A100 (80GB)
CUDA 11.8+ 12.0+
驱动 525+ 535+
内存 GPU 内存 ≥ 向量数据 GPU 内存 ≥ 2× 向量数据

GPU_CAGRA 索引(2.5+ 新增)

index_params.add_index(
    field_name="embedding",
    index_type="GPU_CAGRA",
    metric_type="L2",
    params={
        "intermediate_graph_degree": 64,  # 中间图度数
        "graph_degree": 32,               # 最终图度数
        "build_algo": "IVF_PQ",           # 构建算法
        "cache_dataset_on_device": "true" # 缓存数据到GPU
    }
)

GPU_CAGRA 性能对比

操作 CPU HNSW GPU_CAGRA 提升倍数
索引构建(100万条768维) ~120s ~3s 40×
查询 QPS(Top-10) ~2000 ~8000
批量查询(1000条) ~500ms ~50ms 10×

7.5 索引选型方法论

选型公式

如果 数据量 < 100万 且 延迟要求 < 5ms:
    → HNSW (M=16, efConstruction=256)

如果 100万 < 数据量 < 1000万 且 内存充足:
    → HNSW (M=16~32, efConstruction=256~512)

如果 1000万 < 数据量 < 1亿 且 内存受限:
    → DiskANN 或 IVF_SQ8

如果 数据量 > 1亿:
    → DiskANN + 分层存储

如果有 GPU 且 需要高QPS:
    → GPU_CAGRA

如果需要关键词检索:
    → SPARSE_INVERTED_INDEX 或 SPARSE_WAND

索引选型常见误区

  1. 误区:数据量小就不需要索引 → 即使 10 万条,HNSW 也比 FLAT 快 10 倍
  2. 误区:HNSW 总是最好的 → 亿级数据 HNSW 内存可能不够
  3. 误区:DiskANN 很慢 → 现代 NVMe SSD 上 DiskANN P99 可达 10ms
  4. 误区:GPU 索引总是更快 → 单条查询 GPU 开销可能更大,批量查询才占优

第 8 章 混合搜索与高级检索

8.1 混合检索架构原理

向量检索 + 标量过滤的执行全流程

客户端请求: search(vector, filter='category == "技术"')
    │
    ▼
Proxy: 请求路由与参数校验
    │
    ▼
QueryNode:
    ┌─────────────────────────────────────────┐
    │ 1. 分区裁剪:根据 filter 裁剪不相关分区    │
    │ 2. 段级过滤:在每个 Segment 上执行         │
    │    a. 标量过滤:根据 filter 过滤候选集      │
    │    b. 向量搜索:在过滤后的候选集中搜索      │
    │ 3. 结果合并:合并多个 Segment 的结果       │
    └─────────────────────────────────────────┘
    │
    ▼
Proxy: 结果聚合与排序返回

Milvus 的过滤下推优化机制

Milvus 2.5 优化了过滤执行策略,将过滤操作下推到段级别执行:

  • 高选择性过滤(过滤掉 > 90% 数据):先过滤再搜索,效率高
  • 低选择性过滤(过滤掉 < 10% 数据):先搜索再过滤,避免索引失效
  • Milvus 自动根据统计信息选择最优策略

8.2 稀疏向量检索实战

稀疏向量生成方案

方案一:BM25(Milvus 内置)

# Milvus 2.5 内置 BM25,插入文本时自动生成稀疏向量
# 无需额外模型,详见第6章全文检索部分

方案二:SPLADE(学习型稀疏向量)

from transformers import AutoModelForMaskedLM, AutoTokenizer
import torch

# 加载 SPLADE 模型
tokenizer = AutoTokenizer.from_pretrained("naver/splade-cocondenser-ensembledistil")
model = AutoModelForMaskedLM.from_pretrained("naver/splade-cocondenser-ensembledistil")

def get_sparse_vector(text):
    """使用SPLADE生成稀疏向量"""
    tokens = tokenizer(text, return_tensors="pt")
    with torch.no_grad():
        output = model(**tokens)
    logits = output.logits
    # SPLADE: max pooling + ReLU + log
    sparse_vec = torch.max(
        torch.log1p(torch.relu(logits)) * tokens["attention_mask"].unsqueeze(-1),
        dim=1
    ).values.squeeze()
    # 转为稀疏格式 {token_id: weight}
    nonzero = sparse_vec.nonzero().squeeze()
    weights = sparse_vec[nonzero]
    return {int(k): float(v) for k, v in zip(nonzero.tolist(), weights.tolist())}

# 生成稀疏向量
sparse_vec = get_sparse_vector("向量数据库在RAG系统中的应用")
# 输出格式: {1023: 2.34, 4567: 1.56, ...}  → token_id: weight

关键词 + 语义混合检索完整实现

from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker

# 1. 创建集合(稠密+稀疏双向量)
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="HNSW",
                       metric_type="COSINE", params={"M": 16, "efConstruction": 256})
index_params.add_index(field_name="sparse_vector", index_type="SPARSE_INVERTED_INDEX",
                       metric_type="IP")

client.create_collection("hybrid_search_demo", schema=schema, index_params=index_params)

# 2. 插入数据
from sentence_transformers import SentenceTransformer
dense_model = SentenceTransformer('BAAI/bge-large-zh-v1.5')

texts = ["Milvus向量数据库架构设计", "RAG系统最佳实践", "深度学习模型优化"]
data = []
for text in texts:
    dense_vec = dense_model.encode(text).tolist()
    sparse_vec = get_sparse_vector(text)  # 使用SPLADE
    data.append({"text": text, "dense_vector": dense_vec, "sparse_vector": sparse_vec})

client.insert("hybrid_search_demo", data)

# 3. 混合检索
query_text = "向量数据库"
dense_query = dense_model.encode(query_text).tolist()
sparse_query = get_sparse_vector(query_text)

req_dense = AnnSearchRequest(
    data=[dense_query],
    anns_field="dense_vector",
    param={"metric_type": "COSINE", "params": {"ef_search": 128}},
    limit=20
)

req_sparse = AnnSearchRequest(
    data=[sparse_query],
    anns_field="sparse_vector",
    param={"metric_type": "IP"},
    limit=20
)

results = client.hybrid_search(
    collection_name="hybrid_search_demo",
    reqs=[req_dense, req_sparse],
    ranker=WeightedRanker(0.7, 0.3),
    limit=10
)

多语言 BM25 分析器(2.6 新增)

Milvus 2.6 为 BM25 全文检索引入了多语言分析器支持,自动识别文本语言并应用对应的分词策略:

# 2.6 新增:创建集合时指定多语言 BM25 分析器
from pymilvus import MilvusClient, DataType, Function, FunctionType

schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=8192)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=768)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

# 多语言 BM25 函数(2.6 新增)
schema.add_function(Function(
    name="bm25_multilang",
    input_field_names=["text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25,
    params={
        "analyzer_params": {
            "type": "multi_language"  # 自动检测语言并分词
        }
    }
))

支持的分析器类型

分析器 说明 适用语言
standard 默认通用分词器 英语等空格分隔语言
chinese 中文分词器 中文
multi_language 自动语言检测 多语言混合
自定义 指定分词器和参数 特定需求

多语言 BM25 性能提升

  • 相比 Elasticsearch,BM25 全文检索速度提升 4 倍(部分数据集可达 7 倍)
  • 索引大小仅为 Elasticsearch 的约 1/3
  • 支持短语匹配(Phrase Match),精确匹配连续词组

8.3 多模态向量检索

图文跨模态检索完整实现

from pymilvus import MilvusClient, DataType
from sentence_transformers import SentenceTransformer

# 使用 CLIP 模型实现图文跨模态检索
model = SentenceTransformer('clip-ViT-B-32')

# 创建集合
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="image_path", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="description", datatype=DataType.VARCHAR, max_length=1024)
schema.add_field(field_name="clip_vector", datatype=DataType.FLOAT_VECTOR, dim=512)

index_params = client.prepare_index_params()
index_params.add_index(field_name="clip_vector", index_type="HNSW",
                       metric_type="COSINE", params={"M": 16, "efConstruction": 256})

client.create_collection("image_search", schema=schema, index_params=index_params)

# 插入图片数据
from PIL import Image
images = [
    {"path": "cat.jpg", "desc": "一只橘猫在阳光下睡觉"},
    {"path": "dog.jpg", "desc": "金毛犬在草地上奔跑"},
]

for img in images:
    image = Image.open(img["path"])
    vector = model.encode(image).tolist()
    client.insert("image_search", [{
        "image_path": img["path"],
        "description": img["desc"],
        "clip_vector": vector
    }])

# 用文本搜索图片
query_vector = model.encode("可爱的猫咪").tolist()
results = client.search(
    collection_name="image_search",
    data=[query_vector],
    limit=5,
    output_fields=["image_path", "description"]
)

8.4 两阶段检索与重排序

粗排 + 精排的两阶段检索架构

第一阶段(粗排):Milvus ANN 检索
  → 返回 Top-100 候选结果
  → 延迟低,召回率高

第二阶段(精排):Reranker 模型
  → 对100个候选结果精排
  → 返回 Top-10 最终结果
  → 精度高,延迟可控
from sentence_transformers import CrossEncoder

# 使用 Cross-Encoder 做精排
reranker = CrossEncoder('BAAI/bge-reranker-large')

# 第一阶段:Milvus 粗排
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    limit=100,  # 多召回一些候选
    output_fields=["text"]
)

# 提取候选文档
candidates = []
for hits in results:
    for hit in hits:
        candidates.append(hit['entity']['text'])

# 第二阶段:Reranker 精排
pairs = [[query_text, doc] for doc in candidates]
scores = reranker.predict(pairs)

# 按精排分数排序
ranked_results = sorted(
    zip(candidates, scores),
    key=lambda x: x[1],
    reverse=True
)[:10]

for doc, score in ranked_results:
    print(f"分数: {score:.4f}, 文本: {doc[:50]}...")

HyDE 检索增强技术

# HyDE (Hypothetical Document Embeddings)
# 核心思想:让LLM生成假设性答案,用假设答案的向量做检索

def hyde_search(query_text, llm_client, milvus_client, collection_name):
    # 1. 让LLM生成假设性答案
    prompt = f"请回答以下问题,给出详细答案:\n{query_text}"
    hypothetical_answer = llm_client.generate(prompt)

    # 2. 用假设答案的向量做检索(比直接用问题向量效果更好)
    query_vector = dense_model.encode(hypothetical_answer).tolist()

    results = milvus_client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=10,
        output_fields=["text"]
    )
    return results

第 9 章 数据管理高级特性

9.1 数据一致性体系

四种一致性级别

Milvus 提供四种一致性级别,基于 Guarantee Timestamp(GuaranteeTs)机制实现:

一致性级别 GuaranteeTs 设置 特点 适用场景
Strong 最新时间戳 读取所有已写入数据,延迟最高 金融交易、实时监控
Bounded 最新时间戳 - 容忍窗口 允许短时间内的数据不一致 大多数业务场景(推荐)
Session 客户端最后写入时间戳 自己写入的数据立即可见 交互式应用
Eventually 极小值(如1) 不保证一致性,延迟最低 日志分析、推荐召回
# 在查询时指定一致性级别
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    limit=10,
    consistency_level="Strong"  # Strong / Bounded / Session / Eventually
)

# 在创建集合时指定默认一致性级别
client.create_collection(
    collection_name="knowledge_base",
    schema=schema,
    index_params=index_params,
    consistency_level="Bounded"
)

一致性级别性能对比

测试条件:100万条数据,持续写入

Strong:     查询延迟 P99 = 15ms  (等待数据同步完成)
Bounded:    查询延迟 P99 = 5ms   (容忍2秒窗口)
Session:    查询延迟 P99 = 5ms   (自己写入立即可见)
Eventually: 查询延迟 P99 = 3ms   (不等待同步)

9.2 存储优化与数据生命周期

向量量化压缩技术

Milvus 支持多种向量压缩方式来降低内存占用:

压缩方式 压缩比 精度损失 实现方式
SQ8(标量量化) float32 → int8
PQ(乘积量化) 12~48× 子向量聚类编码
Float16 极低 float32 → float16
BFloat16 float32 → bfloat16
# 使用 Float16 向量字段节省内存
schema.add_field(
    field_name="fp16_embedding",
    datatype=DataType.FLOAT16_VECTOR,
    dim=768
)
# 内存占用减半,精度损失极小

冷热数据分层存储架构

Milvus 2.5 引入分层存储(Tiered Storage),允许冷数据按需从磁盘加载:

热数据层(内存)          冷数据层(SSD)
┌──────────────┐        ┌──────────────┐
│ 最近访问的段   │        │ 不常访问的段   │
│ 索引结构      │        │ 原始向量数据   │
│ 高频查询数据   │        │ 低频查询数据   │
│              │        │              │
│ 延迟: <5ms   │        │ 延迟: 5~20ms │
│ 成本: 高     │        │ 成本: 低      │
└──────────────┘        └──────────────┘

分层存储配置

# Helm values 中配置分层存储
queryNode:
  config:
    queryNode:
      lazyLoadEnabled: true  # 启用懒加载
      loadingMemoryPortion: 0.3  # 内存中保留30%热数据

9.3 数据备份与迁移

全量备份方案

# 使用 Milvus Backup 工具
# 安装
pip install milvus-backup

# 创建备份
milvus-backup create -n backup_20260614 --milvus-config config.yaml

# 恢复备份
milvus-backup restore -n backup_20260614 --milvus-config config.yaml

跨集群数据迁移

# 使用 Milvus Backup 进行跨集群迁移
# 1. 在源集群创建备份
milvus-backup create -n migration_backup --milvus-config source_config.yaml

# 2. 将备份数据复制到目标集群的对象存储
# 使用 rclone 或 mc 工具

# 3. 在目标集群恢复
milvus-backup restore -n migration_backup --milvus-config target_config.yaml

9.4 多租户与资源隔离

资源组(Resource Group)机制

# 创建资源组
client.create_resource_group("rg_tenant_a", config={
    "node_num": 2  # 分配2个QueryNode
})

# 将集合加载到指定资源组
client.load_collection(
    collection_name="tenant_a_collection",
    resource_groups=["rg_tenant_a"]
)

多租户隔离方案

方案 隔离级别 资源开销 适用场景
每租户一个集合 强隔离 大租户
每租户一个分区 中隔离 中等租户
每租户一个字段值 弱隔离 小租户
每租户一个数据库 强隔离 企业级
# 方案一:每租户一个集合(强隔离)
client.create_collection(f"tenant_{tenant_id}_docs", schema=schema, ...)

# 方案二:每租户一个分区(中隔离)
client.create_partition("shared_docs", f"tenant_{tenant_id}")
# 查询时指定分区
client.search("shared_docs", partition_names=[f"tenant_{tenant_id}"], ...)

# 方案三:分区键自动隔离(推荐)
# 使用 tenant_id 作为分区键
schema.add_field(field_name="tenant_id", datatype=DataType.VARCHAR,
                 max_length=64, is_partition_key=True)
# 查询时自动分区裁剪
client.search("shared_docs", filter=f'tenant_id == "{tenant_id}"', ...)

9.5 安全与合规

传输加密 TLS 配置

# Milvus Helm values 中启用 TLS
proxy:
  tls:
    enabled: true
    certSecret: milvus-tls-secret

# 创建 TLS 证书 Secret
kubectl create secret generic milvus-tls-secret \
  --from-file=tls.crt=server.crt \
  --from-file=tls.key=server.key \
  --from-file=ca.crt=ca.crt \
  -n milvus

身份认证

# 启用用户名密码认证
common:
  security:
    authorizationEnabled: true
# 连接时提供认证信息
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus"  # 用户名:密码
)

RBAC 权限控制

Milvus 2.5 提供 9 个内置特权组,覆盖从只读到管理的所有权限:

特权组 级别 权限范围
COLL_RO 集合 读取集合数据
COLL_RW 集合 读写集合数据
COLL_ADMIN 集合 集合管理(含DDL)
DB_RO 数据库 读取数据库
DB_RW 数据库 读写数据库
DB_Admin 数据库 数据库管理
Cluster_RO 集群 全局只读
Cluster_RW 集群 全局读写
Cluster_Admin 集群 全局管理
# 创建用户和角色
client.create_user("analyst", "secure_password")
client.create_role("data_analyst")

# 授予角色权限
client.grant_privilege("data_analyst", "COLL_RO", "knowledge_base", "*")

# 将角色分配给用户
client.grant_role("analyst", "data_analyst")

# 验证权限
client.describe_role("data_analyst")
```---

# 第四阶段:分布式架构与生产部署(Day 13-18)

## 第 10 章 Milvus 分布式架构深度解析

### 10.1 云原生存算分离架构总览

Milvus 2.6 在 2.5 四层架构基础上,引入了 StreamingNode 流式节点和 Woodpecker WAL 引擎,实现了流批分离架构:

┌──────────────────────────────────────────────────────────────┐
│ 访问层 (Access Layer) │
│ Proxy (无状态网关) │
├──────────────────────────────────────────────────────────────┤
│ 协调层 (Coordinator Layer) │
│ RootCoord │ DataCoord │ QueryCoord │ IndexCoord │
├──────────────────────────────────────────────────────────────┤
│ 计算层 (Worker Layer) │
│ StreamingNode(2.6新增) │ DataNode │ QueryNode │ IndexNode │
├──────────────────────────────────────────────────────────────┤
│ 存储层 (Storage Layer) │
│ etcd │ Woodpecker(2.6新增)/Pulsar/Kafka │ MinIO/S3 │
└──────────────────────────────────────────────────────────────┘


**2.6 架构升级要点**:

1. **StreamingNode(流式节点)**:新增的流式处理节点,负责实时数据摄入和即时索引
2. **Woodpecker(零磁盘 WAL)**:替代 Kafka/Pulsar 的自研 WAL 引擎,零磁盘架构
3. **Storage v2**:新存储引擎,降低 IOPS 和内存占用
4. **流批分离**:实时写入走 StreamingNode,批量处理走 DataNode

**存算分离的核心优势**:

1. **独立扩展**:计算节点和存储可以独立扩缩容
2. **资源优化**:计算节点无状态,可按需弹性伸缩
3. **高可用**:存储层由专业组件保障,计算节点故障可快速恢复
4. **成本优化**:冷数据存对象存储,热数据存内存

### 10.2 访问层:Proxy 节点

Proxy 是 Milvus 的无状态网关,是客户端请求的唯一入口:

**核心职责**:

1. **请求路由**:接收客户端请求,路由到正确的协调器
2. **参数校验**:校验请求参数的合法性
3. **结果聚合**:MPP 架构下,聚合多个 QueryNode 的中间结果
4. **限流熔断**:保护后端服务不被过载请求打垮

客户端请求流程:
Client → Load Balancer → Proxy

├── 参数校验
├── 请求路由
├── 转发到 QueryNode/DataNode
├── 收集各节点结果
└── 聚合排序返回客户端


**运维要点**:
- Proxy 无状态,可随意水平扩展
- 生产环境建议至少部署 2 个 Proxy + 负载均衡器
- 通过 Nginx 或 K8s Ingress 实现负载均衡

### 10.3 协调层:Coord 组件

协调层是 Milvus 的"大脑",负责元数据管理和任务调度:

#### RootCoord:元数据管理中枢

- **DDL 执行**:处理 Collection、Partition、Index 的创建/删除
- **全局时间戳分配**:通过 TSO(Timestamp Oracle)分配全局单调递增时间戳
- **元数据管理**:维护所有 Collection 的 Schema、索引定义等元信息

#### DataCoord:数据调度中枢

- **段生命周期管理**:管理 Segment 从 Growing → Sealed → Flushed 的状态转换
- **数据节点调度**:分配写入通道(Channel)给 DataNode
- **Compaction 调度**:触发和监控段合并任务
- **垃圾回收**:清理过期的段文件和删除标记

#### QueryCoord:查询调度中枢

- **段加载调度**:决定哪些 Segment 加载到哪些 QueryNode
- **负载均衡**:在 QueryNode 之间均衡段分布
- **副本管理**:管理段的多个副本,保证高可用

#### IndexCoord:索引调度中枢

- **索引任务调度**:将索引构建任务分配给 IndexNode
- **索引版本管理**:管理索引的版本和状态

### 10.4 计算层:Node 节点

#### DataNode:数据写入处理

数据写入流程:
Pulsar/Kafka → DataNode 消费消息

├── 写入 Growing Segment(内存缓冲区)
├── 当段大小达到阈值 → Flush 到对象存储
└── 通知 DataCoord 段状态变更


- 每个 DataNode 消费一个或多个写入通道(Channel)
- Growing Segment 在内存中累积,达到阈值后 Flush
- Flush 后的 Segment 变为 Sealed,触发索引构建

#### QueryNode:查询执行引擎

查询执行流程:
Proxy → QueryNode

├── 加载 Sealed Segment 到内存
├── 在 Growing + Sealed Segment 上执行搜索
├── 应用 Delete Log 过滤已删除数据
├── 应用标量过滤条件
└── 返回 Top-K 中间结果给 Proxy


- QueryNode 是内存密集型节点,负责向量查询和标量过滤
- 支持多副本:同一 Segment 可加载到多个 QueryNode
- 分层存储:冷数据按需从磁盘加载

#### IndexNode:索引构建执行器

- 接收 IndexCoord 分配的索引构建任务
- 从对象存储读取 Sealed Segment 数据
- 构建索引文件并写回对象存储
- 通知 IndexCoord 构建完成

### 10.5 存储层:核心依赖组件

#### 元数据存储:etcd

etcd 存储内容:
├── Collection 元数据(Schema、索引定义、配置)
├── Segment 元数据(状态、大小、行数)
├── 索引元数据(类型、参数、构建状态)
├── Checkpoint 信息(消息消费进度)
└── 全局时间戳信息


**运维要点**:
- etcd 是有状态组件,建议 3 节点集群部署
- 定期备份数据,etcd 数据丢失会导致集群不可用
- 控制 etcd 数据量,Milvus 会自动压缩历史修订版

#### 消息存储:Woodpecker / Pulsar / Kafka

**Woodpecker(2.6 新增,推荐)**:

Woodpecker 是 Milvus 2.6 自研的云原生 WAL 引擎,采用 Zero-Disk 零磁盘架构,可直接替代 Kafka/Pulsar:

Woodpecker 核心设计:
┌─────────────────────────────────────────┐
│ Zero-Disk 架构 │
│ ├── 所有日志数据存储在对象存储(S3/GCS) │
│ ├── 元数据由 etcd 管理 │
│ ├── 无本地磁盘依赖 │
│ └── 完美对齐云原生原则 │
└─────────────────────────────────────────┘

部署模式:

  1. MemoryBuffer 模式(轻量级)

    • 客户端内存缓冲写入,定期 Flush 到对象存储
    • 无需独立部署,维护成本为零
    • 适合中小规模场景
  2. Independent 部署模式(高性能)

    • 独立 Woodpecker 服务
    • 更高写入吞吐
    • 适合大规模生产环境

**Woodpecker vs Pulsar vs Kafka**:

| 维度 | Woodpecker | Pulsar | Kafka |
|------|-----------|--------|-------|
| 部署复杂度 | 极低 | 高 | 中 |
| 本地磁盘 | 不需要 | 需要 | 需要 |
| 运维成本 | 极低 | 高 | 中 |
| 写入性能 | 高 | 高 | 高 |
| 云原生 | 原生 | 部分 | 部分 |
| 2.6 推荐度 | ★★★★★ | ★★★ | ★★★ |

**Pulsar vs Kafka 选择(如不使用 Woodpecker)**:

| 维度 | Pulsar | Kafka |
|------|--------|-------|
| 部署复杂度 | 较高 | 中等 |
| 消息保序 | 支持 | 支持 |
| 多租户 | 原生支持 | 需要额外配置 |
| 社区支持 | Milvus 默认 | 2.5+ 支持 |
| 适用场景 | 大规模生产 | 中等规模 |

#### 对象存储:MinIO / S3

对象存储文件结构:
milvus-bucket/
├── insert_log/ # 插入数据日志
│ └── {collection_id}/
│ └── {partition_id}/
│ └── {segment_id}/
│ ├── {field_id}_1.binlog # 字段数据
│ └── {field_id}_2.binlog
├── delta_log/ # 删除标记日志
│ └── {segment_id}/
│ └── delete_log.binlog
├── index_files/ # 索引文件
│ └── {index_id}/
│ └── index_file.bin
└── stats_log/ # 统计信息


### 10.6 核心全链路流程

#### 数据写入全链路

**2.6 流批分离架构下的写入流程**:

实时写入路径(StreamingNode):

  1. 客户端调用 insert(data)
  2. Proxy 校验参数,分配主键(如果 auto_id)
  3. Proxy 将数据写入 Woodpecker/Pulsar 对应 Shard 的 Channel
  4. StreamingNode 消费 Channel 消息(2.6 新增)
  5. StreamingNode 即时构建 Growing Segment 索引,数据立即可查
  6. Growing Segment 达到阈值,Flush 到对象存储
  7. DataCoord 更新 Segment 状态为 Sealed
  8. IndexCoord 触发索引构建任务
  9. IndexNode 构建索引,写入对象存储
  10. QueryCoord 调度 QueryNode 加载新的 Sealed Segment

关键改进(2.6):

  • StreamingNode 实时消费并构建索引,写入后毫秒级可查
  • Woodpecker 替代 Kafka/Pulsar,减少运维复杂度
  • Storage v2 降低 IOPS 和内存占用

#### 数据查询全链路

  1. 客户端调用 search(vector, filter, top_k)
  2. Proxy 校验参数,解析过滤条件
  3. Proxy 将请求路由到所有持有相关 Segment 的 QueryNode
  4. QueryNode 在本地执行搜索:
    a. 在 Growing Segment 上暴力搜索
    b. 在 Sealed Segment 上使用索引搜索
    c. 应用 Delete Log 过滤
    d. 应用标量过滤条件
    e. 返回 Top-K 中间结果
  5. Proxy 聚合所有 QueryNode 的中间结果
  6. Proxy 全局排序,返回最终 Top-K 结果

#### Segment 生命周期详解

Growing → Sealed → Indexed → Loaded → Compacted

Growing: 正在写入,数据在内存
↓ 达到大小阈值或手动 Flush
Sealed: 已封存,数据持久化到对象存储
↓ IndexNode 构建索引
Indexed: 索引构建完成,可被查询
↓ QueryNode 加载到内存
Loaded: 已加载到 QueryNode 内存,可被查询
↓ Compaction 合并
Compacted: 已合并,旧 Segment 标记为待清理


---

## 第 11 章 生产环境部署

### 11.1 部署方案选型

| 方案 | 适用场景 | 数据规模 | 运维复杂度 |
|------|---------|---------|----------|
| Docker Compose 单机 | 开发测试 | < 500万 | 低 |
| 分布式物理机 | 私有化部署 | 千万~亿级 | 高 |
| Kubernetes 集群 | 生产环境(推荐) | 百万~十亿级 | 中 |
| Zilliz Cloud | 快速上线 | 任意规模 | 极低 |

### 11.2 Kubernetes 集群部署详解

#### Helm Chart 部署与配置

```bash
# 生产级部署命令
helm install milvus milvus/milvus \
  --namespace milvus \
  -f values-prod.yaml \
  --timeout 10m

values-prod.yaml 完整配置

# 集群模式
cluster:
  enabled: true

# Proxy 配置
proxy:
  replicas: 2
  service:
    type: LoadBalancer
  resources:
    requests: { cpu: "2", memory: "4Gi" }
    limits: { cpu: "4", memory: "8Gi" }
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 100
          podAffinityTerm:
            labelSelector:
              matchLabels:
                app: milvus-proxy
            topologyKey: kubernetes.io/hostname

# StreamingNode 配置(2.6 新增,流式处理节点)
streamingNode:
  replicas: 2
  resources:
    requests: { cpu: "2", memory: "8Gi" }
    limits: { cpu: "4", memory: "16Gi" }

# QueryNode 配置(内存密集型)
queryNode:
  replicas: 3
  resources:
    requests: { cpu: "4", memory: "16Gi" }
    limits: { cpu: "8", memory: "32Gi" }
  nodeSelector:
    node-type: memory-optimized
  tolerations:
    - key: "memory-optimized"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"

# DataNode 配置
dataNode:
  replicas: 2
  resources:
    requests: { cpu: "2", memory: "8Gi" }
    limits: { cpu: "4", memory: "16Gi" }

# IndexNode 配置(CPU密集型)
indexNode:
  replicas: 2
  resources:
    requests: { cpu: "4", memory: "16Gi" }
    limits: { cpu: "8", memory: "32Gi" }
  nodeSelector:
    node-type: cpu-optimized

# etcd 高可用
etcd:
  replicas: 3
  persistence:
    enabled: true
    storageClass: local-ssd
    size: 10Gi

# MinIO 分布式
minio:
  mode: distributed
  replicas: 4
  persistence:
    enabled: true
    storageClass: local-ssd
    size: 500Gi

# Woodpecker 配置(2.6 新增,替代 Pulsar/Kafka)
woodpecker:
  enabled: true
  # MemoryBuffer 模式:轻量级,无需独立部署
  # Independent 模式:高性能,需要独立部署
  mode: memorybuffer

# 如果仍需 Pulsar(与 Woodpecker 二选一)
pulsar:
  enabled: false

# Milvus 配置
extraConfigFiles:
  user.yaml: |
    common:
      security:
        authorizationEnabled: true
    queryNode:
      lazyLoadEnabled: true
      loadingMemoryPortion: 0.5
    # 2.6 新增:冷热分层存储配置
    tieredStorage:
      enabled: true
      hotDataTTL: 7d       # 热数据保留7天
      warmDataTTL: 30d     # 温数据保留30天

存储类配置

# 本地 SSD StorageClass(推荐用于 etcd 和热数据)
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: local-ssd
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
reclaimPolicy: Retain

11.3 集群高可用设计

所有组件的多副本部署

组件 最小副本数 推荐副本数 故障影响
Proxy 2 2~4 请求路由不受影响
RootCoord 1 2 主备切换
DataCoord 1 2 主备切换
QueryCoord 1 2 主备切换
IndexCoord 1 2 主备切换
StreamingNode 1 2~4 写入通道自动迁移(2.6新增)
DataNode 1 2~4 写入通道自动迁移
QueryNode 2 3~6 查询副本自动切换
IndexNode 1 2 索引任务自动重分配
etcd 3 3~5 Raft 协议保证
Woodpecker 1 2~3 日志服务自动恢复(2.6新增)
MinIO 4 4~8 纠删码保证

单节点故障自动转移机制

QueryNode 故障恢复流程:
1. QueryCoord 检测到 QueryNode 心跳超时
2. QueryCoord 将该节点上的 Segment 标记为未加载
3. QueryCoord 调度其他 QueryNode 加载这些 Segment
4. 新的查询请求路由到健康的 QueryNode
5. 整个过程约 30~60 秒

11.4 网络与安全加固

# NetworkPolicy 配置
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: milvus-network-policy
  namespace: milvus
spec:
  podSelector:
    matchLabels:
      app: milvus
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - namespaceSelector:
            matchLabels:
              name: production
      ports:
        - port: 19530
          protocol: TCP
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: etcd
      ports:
        - port: 2379

11.5 容灾部署

同城双活架构

                    ┌─────────────────┐
                    │   全局负载均衡    │
                    └────────┬────────┘
                             │
                ┌────────────┼────────────┐
                │                         │
        ┌───────▼──────┐         ┌───────▼──────┐
        │  可用区 A     │         │  可用区 B     │
        │  Milvus集群   │         │  Milvus集群   │
        │  (主)        │         │  (从)        │
        │  Proxy ×2    │         │  Proxy ×2    │
        │  QueryNode×3 │         │  QueryNode×3 │
        │  DataNode×2  │         │  DataNode×2  │
        └───────┬──────┘         └───────┬──────┘
                │                         │
        ┌───────▼──────┐         ┌───────▼──────┐
        │  etcd ×3     │         │  etcd ×3     │
        │  MinIO ×4    │         │  MinIO ×4    │
        └──────────────┘         └──────────────┘
                │                         │
                └───────── 对象存储同步 ─────┘

RPO/RTO 指标

  • 同城双活:RPO < 1分钟,RTO < 5分钟
  • 异地灾备:RPO < 15分钟,RTO < 30分钟

第 12 章 集群运维与监控

12.1 可观测性体系搭建

Prometheus 指标采集配置

# Prometheus ServiceMonitor
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: milvus-metrics
  namespace: milvus
spec:
  selector:
    matchLabels:
      app: milvus
  endpoints:
    - port: metrics
      interval: 15s
      path: /metrics

核心监控指标分类

集群健康指标

指标 说明 告警阈值
milvus_proxy_num_online 在线 Proxy 数量 < 2
milvus_querynode_num_online 在线 QueryNode 数量 < 配置副本数
milvus_datanode_num_online 在线 DataNode 数量 < 1
etcd_server_has_leader etcd 是否有 leader 0

写入指标

指标 说明 告警阈值
milvus_proxy_insert_vectors_count 写入向量数/秒 异常下降
milvus_proxy_insert_latency 写入延迟 P99 > 500ms
milvus_msgstream_consume_lag 消息积压量 > 100000

查询指标

指标 说明 告警阈值
milvus_proxy_search_vectors_count 查询 QPS 异常下降
milvus_proxy_search_latency 查询延迟 P99 > 100ms
milvus_querynode_search_latency QueryNode 搜索延迟 > 50ms

资源指标

指标 说明 告警阈值
milvus_querynode_memory_used QueryNode 内存使用 > 85%
milvus_querynode_cpu_usage QueryNode CPU 使用率 > 80%
milvus_indexnode_task_num 索引构建任务数 积压过多

Grafana 官方监控大盘

# 导入 Milvus 官方 Grafana Dashboard
# Dashboard ID: 17535 (Milvus Cluster Overview)
# Dashboard ID: 17536 (Milvus Query Node)
# Dashboard ID: 17537 (Milvus Data Node)

# 在 Grafana 中导入
# Configuration → Data Sources → Add Prometheus
# Dashboards → Import → 输入 Dashboard ID

12.2 日志管理

# 使用 Loki 采集 Milvus 日志
# Promtail 配置
scrape_configs:
  - job_name: milvus
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: [milvus]
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        target_label: app
    pipeline_stages:
      - cri: {}
      - labels:
          level:
          component:

关键错误日志识别

日志关键字 含义 处理方式
OOM killed 内存溢出 增加内存或优化查询
segment not loaded 段未加载 检查 QueryNode 状态
msgstream consume failed 消息消费失败 检查 Pulsar/Kafka
etcd server timeout etcd 超时 检查 etcd 集群健康
index build failed 索引构建失败 检查 IndexNode 资源

12.3 集群扩缩容

# 扩容 QueryNode(在线扩容,无需停机)
kubectl scale deployment milvus-querynode --replicas=5 -n milvus

# QueryCoord 自动检测新节点并重新平衡段分布

# 缩容 QueryNode
kubectl scale deployment milvus-querynode --replicas=3 -n milvus
# QueryCoord 自动将段迁移到剩余节点

# 扩容 Proxy
kubectl scale deployment milvus-proxy --replicas=4 -n milvus

# 存储扩容(MinIO)
# 修改 PVC 大小
kubectl patch pvc data-milvus-minio-0 -p '{"spec":{"resources":{"requests":{"storage":"1Ti"}}}}' -n milvus

扩缩容注意事项

  • QueryNode 扩容后需要等待段加载完成才能接收查询
  • 缩容 QueryNode 前确保剩余节点内存充足
  • DataNode 扩容后 DataCoord 会自动重新分配 Channel

12.4 版本升级

滚动升级流程

# 1. 备份 etcd 数据
ETCDCTL_API=3 etcdctl snapshot save backup.db

# 2. 更新 Helm 仓库
helm repo update

# 3. 滚动升级
helm upgrade milvus milvus/milvus \
  --namespace milvus \
  -f values-prod.yaml \
  --version 2.6.18 \
  --timeout 15m

# 4. 验证升级
kubectl get pods -n milvus
kubectl logs -l app=milvus -n milvus --tail=100

2.5 → 2.6 升级特别注意事项

从 Milvus 2.5 升级到 2.6 是一次中版本升级,需要注意以下关键变更:

变更项 说明 处理方式
StreamingNode 2.6 新增组件 升级后自动部署,确保资源充足
Woodpecker 可选替代 Pulsar/Kafka 建议先保留 Pulsar,验证后再切换
Storage v2 新存储引擎 自动迁移,升级期间 IOPS 可能增加
IVF_RABITQ 新索引类型 需要手动重建索引才能使用
Path Index 新标量索引 需要手动创建,不影响现有索引
Nullable Vector 向量字段可空 新功能,不影响现有集合
Add Field 动态添加字段 新功能,不影响现有集合

升级风险评估

  • 小版本升级(2.6.17 → 2.6.18):低风险,仅 bug 修复
  • 中版本升级(2.5.x → 2.6.x):中风险,需要关注架构变更
  • 大版本升级(2.x → 3.x):高风险,需要全面测试

回滚方案

# Helm 回滚
helm rollback milvus -n milvus

# 如果 etcd 数据已变更,需要从备份恢复
ETCDCTL_API=3 etcdctl snapshot restore backup.db

12.5 常见故障排查

节点宕机与自动恢复验证

# 模拟 QueryNode 故障
kubectl delete pod milvus-querynode-xxxxx -n milvus

# 观察自动恢复
kubectl get pods -n milvus -w

# 检查段重新加载
kubectl logs -l app=milvus-querynode -n milvus | grep "load segment"

查询超时与慢查询定位

# 1. 检查 QueryNode 资源使用
kubectl top pods -n milvus -l app=milvus-querynode

# 2. 检查段加载状态
# 通过 Birdwatcher 查看
show segment-loaded

# 3. 检查查询延迟分布
# 通过 Prometheus 查询
histogram_quantile(0.99, rate(milvus_proxy_search_latency_bucket[5m]))

写入失败与消息积压排查

# 1. 检查 DataNode 状态
kubectl get pods -n milvus -l app=milvus-datanode

# 2. 检查消息积压
# Prometheus 查询
milvus_msgstream_consume_lag

# 3. 检查 Pulsar/Kafka 状态
kubectl get pods -n milvus -l app=pulsar-broker

内存 OOM 问题排查

OOM 排查步骤:
1. 检查 QueryNode 内存配置
   kubectl describe pod milvus-querynode-xxx -n milvus

2. 分析内存使用
   - 向量数据大小:N × dim × 4 bytes
   - HNSW 图结构:N × M × 2 × 8 bytes
   - 标量数据:根据字段类型估算
   - Growing Segment 缓冲区

3. 优化方案
   - 增加 QueryNode 内存
   - 使用 DiskANN 减少内存占用
   - 启用分层存储
   - 减少 replica_number
   - 使用量化压缩(SQ8/PQ)
```---

# 第五阶段:性能优化与大规模实践(Day 19-24)

## 第 13 章 性能优化实战

### 13.1 写入性能优化

#### 批次大小与并发数最优配比

写入性能测试结果(768维向量,Docker Compose 单机版):

┌──────────┬──────────┬──────────────┬──────────┐
│ 批次大小 │ 并发数 │ 吞吐(条/秒) │ P99延迟 │
├──────────┼──────────┼──────────────┼──────────┤
│ 100 │ 1 │ 2,000 │ 50ms │
│ 100 │ 4 │ 6,000 │ 80ms │
│ 1000 │ 1 │ 8,000 │ 130ms │
│ 1000 │ 4 │ 25,000 │ 200ms │
│ 5000 │ 1 │ 12,000 │ 420ms │
│ 5000 │ 4 │ 35,000 │ 600ms │
│ 10000 │ 1 │ 10,000 │ 1000ms │
│ 10000 │ 4 │ 30,000 │ 1500ms │
└──────────┴──────────┴──────────────┴──────────┘

最优配比:批次大小 1000~5000,并发数 4~8


#### 消息队列参数调优

```yaml
# Pulsar 调优参数
pulsar:
  broker:
    configData:
      managedLedgerDefaultEnsembleSize: "3"
      managedLedgerDefaultWriteQuorum: "3"
      managedLedgerDefaultAckQuorum: "2"
      dispatchThrottlingRatePerTopicInMsg: "50000"

DataNode 并发与刷盘策略

# Milvus DataNode 配置
dataNode:
  config:
    dataNode:
      flushInsertBufferSize: 16777216  # 16MB 缓冲区
      flushDeleteBufferSize: 4194304   # 4MB 删除缓冲区
      channelWorkPoolSize: 16          # 并发处理Channel数

13.2 查询性能优化

索引参数调优方法论

调优步骤

  1. 建立基准:使用 FLAT 索引测量 100% 召回率下的延迟
  2. 选择索引:根据数据规模和延迟要求选择索引类型
  3. 调优构建参数:先固定查询参数,调优构建参数(M、efConstruction、nlist)
  4. 调优查询参数:在构建参数确定后,调优查询参数(efSearch、nprobe)
  5. 验证召回率:确保召回率满足业务要求
# 召回率测试脚本
def measure_recall(client, collection_name, test_queries, ground_truth, k=10):
    """测量搜索召回率"""
    total_recall = 0
    for query, gt_ids in zip(test_queries, ground_truth):
        results = client.search(
            collection_name=collection_name,
            data=[query],
            limit=k,
            search_params={"metric_type": "COSINE", "params": {"ef_search": 128}}
        )
        result_ids = set(hit['id'] for hit in results[0])
        gt_set = set(gt_ids[:k])
        recall = len(result_ids & gt_set) / k
        total_recall += recall
    return total_recall / len(test_queries)

查询参数动态调优

# 根据业务场景动态调整 efSearch
def adaptive_search(client, collection_name, query_vector, target_latency_ms=10):
    """自适应搜索:在延迟约束下最大化召回率"""
    ef_search_values = [64, 128, 256, 512]

    for ef in ef_search_values:
        start = time.time()
        results = client.search(
            collection_name=collection_name,
            data=[query_vector],
            limit=10,
            search_params={"metric_type": "COSINE", "params": {"ef_search": ef}}
        )
        latency = (time.time() - start) * 1000

        if latency <= target_latency_ms:
            return results, ef, latency

    # 如果最低 ef_search 也超时,返回最后的结果
    return results, ef, latency

缓存机制与数据预热

# 数据预热:在低峰期提前加载集合
def warmup_collection(client, collection_name, sample_queries, num_warmup=100):
    """用样本查询预热缓存"""
    for i in range(num_warmup):
        query = sample_queries[i % len(sample_queries)]
        client.search(
            collection_name=collection_name,
            data=[query],
            limit=1  # 只需触发缓存加载
        )
    print(f"预热完成: {num_warmup} 次查询")

# 首次查询通常较慢(冷缓存),预热后延迟降低 50%+

高并发场景下的查询优化

# 使用连接池和异步查询
import asyncio
from pymilvus import AsyncMilvusClient

async def concurrent_search():
    """异步并发查询"""
    client = AsyncMilvusClient("http://localhost:19530")

    # 并发发送多个查询
    tasks = [
        client.search(
            collection_name="knowledge_base",
            data=[query_vectors[i]],
            limit=10
        )
        for i in range(100)
    ]

    results = await asyncio.gather(*tasks)
    return results

# 高并发优化要点:
# 1. 使用异步客户端(AsyncMilvusClient)
# 2. 增加 Proxy 副本数
# 3. 增加 QueryNode 副本数
# 4. 调整 Proxy 的 maxReceiveSize 和 maxResultSize

13.3 内存优化

索引内存占用精准估算

内存估算公式:

HNSW (FP32):
  总内存 = N × (dim × 4 + M × 2 × 8) bytes

HNSW (Int8, 2.6新增):
  总内存 = N × (dim × 1 + M × 2 × 8) bytes  → 内存减半

IVF_FLAT:
  总内存 = N × dim × 4 + nlist × dim × 4 bytes

IVF_RABITQ (2.6新增):
  总内存 = N × (dim / 8) + nlist × dim × 4 bytes  → 内存降72%
  + SQ8精排: N × dim × 1 bytes

DiskANN:
  内存 = N × PQ_code_size + N × graph_degree × 8 bytes
  (PQ_code_size 通常为 16~32 bytes)

示例:1000万条 768维向量
  HNSW (FP32, M=16): ~32 GB
  HNSW (Int8, M=16): ~16 GB (2.6, 内存减半)
  IVF_FLAT: ~29 GB
  IVF_RABITQ (1-bit): ~2.3 GB (2.6, 内存降72%)
  IVF_RABITQ + SQ8精排: ~9.5 GB (2.6, 召回率~95%)
  DiskANN: ~1.5 GB (内存) + SSD 存储

2.6 内存优化方案对比

方案 内存占用 召回率 查询速度 适用场景
HNSW (FP32) 100% 99%+ 极快 延迟敏感
HNSW (Int8) ~50% 98%+ 极快 延迟敏感+成本优化
IVF_RABITQ ~8% ~85% 成本敏感
IVF_RABITQ + SQ8 ~28% ~95% 均衡
DiskANN ~5% 95%+ 超大规模

QueryNode 内存配置最佳实践

# QueryNode 内存配置原则
queryNode:
  resources:
    requests:
      memory: "计算值 × 1.5"  # 留50%余量
    limits:
      memory: "计算值 × 2"    # 留100%余量

# 计算公式:
# 总内存 = 索引内存 + Growing段内存 + 标量数据内存 + 系统开销
# 系统开销 ≈ 2~4 GB

OOM 问题根因与规避方案

OOM 常见原因:
1. 加载的段总大小超过 QueryNode 内存
2. Growing Segment 积压过多
3. 查询中间结果占用大量内存
4. 标量数据未压缩

规避方案:
1. 启用分层存储,冷数据按需加载
2. 使用 DiskANN 减少内存占用
3. 使用量化压缩(SQ8/PQ/Float16/Int8/RaBitQ)
4. 使用 IVF_RABITQ 替代 IVF_FLAT/IVF_SQ8(2.6推荐)
5. 限制单次查询的 top_k 和返回字段
6. 增加 QueryNode 副本数分摊内存
7. 设置合理的内存水位告警(85%)

13.4 存储与索引优化

对象存储性能调优

# MinIO 性能优化
minio:
  config:
    MINIO_API_REQUESTS_MAX: 1600
    MINIO_API_REQUESTS_DEADLINE: "2m"
  persistence:
    storageClass: local-ssd  # 使用本地SSD

段合并策略优化

# Compaction 策略配置
extraConfigFiles:
  user.yaml: |
    dataCoord:
      compaction:
        enableAutoCompaction: true
        compactionInterval: 60        # 每60秒检查一次
        minSegmentToMerge: 3          # 最少3个段才触发合并
        maxSegmentToMerge: 30         # 最多合并30个段
        targetSegmentSize: 536870912  # 目标段大小512MB

13.5 混合检索性能优化

过滤下推优化

# 高选择性过滤:先过滤再搜索(Milvus 自动优化)
# 当过滤条件能排除 >90% 数据时,性能最优

# 优化前:低选择性过滤
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    filter='year > 2020',  # 只排除少量数据
    limit=10
)

# 优化后:增加更精确的过滤条件
results = client.search(
    collection_name="knowledge_base",
    data=[query_vector],
    filter='year > 2020 && category == "技术"',  # 更高选择性
    limit=10
)

复合过滤场景的索引设计

# 为高频过滤字段创建标量索引
index_params.add_index(field_name="category", index_type="INVERTED")
index_params.add_index(field_name="year", index_type="STL_SORT")

# 使用分区键加速等值过滤
schema.add_field(
    field_name="category",
    datatype=DataType.VARCHAR,
    max_length=64,
    is_partition_key=True  # 等值过滤自动分区裁剪
)

第 14 章 大规模向量系统架构设计

14.1 不同量级的方案选型

百万级向量:单机优化方案

架构:Docker Compose 单机版
硬件:16核 CPU, 64GB 内存, 1TB SSD
索引:HNSW (M=16, efConstruction=256)
预期性能:
  - 写入: ~10,000 条/秒
  - 查询: P99 < 5ms, QPS ~500
  - 内存: ~5 GB (768维, 100万条)

千万级向量:小规模集群方案

架构:K8s 3~5 节点集群
硬件:每节点 16核, 64GB 内存, 2TB NVMe
组件配置:
  - Proxy ×2
  - QueryNode ×3 (每节点32GB)
  - DataNode ×2
  - IndexNode ×1
  - etcd ×3
  - MinIO ×4
索引:HNSW (M=16, efConstruction=256)
预期性能:
  - 写入: ~50,000 条/秒
  - 查询: P99 < 10ms, QPS ~1000
  - 内存: ~50 GB (768维, 1000万条)

亿级向量:标准分布式集群方案

架构:K8s 10+ 节点集群
硬件:每节点 32核, 128GB 内存, 4TB NVMe
组件配置:
  - Proxy ×4
  - QueryNode ×6 (每节点64GB)
  - DataNode ×4
  - IndexNode ×2
  - etcd ×5
  - Pulsar Broker ×3
  - MinIO ×8
索引:DiskANN + HNSW(热数据)
分层存储:热数据内存,冷数据SSD
预期性能:
  - 写入: ~200,000 条/秒
  - 查询: P99 < 20ms, QPS ~2000
  - 内存: ~200 GB (768维, 1亿条, 含DiskANN压缩)

十亿级向量:超大规模分层方案

架构:多集群 + Zilliz Cloud
设计要点:
  1. 按业务分集群(如按地域、按业务线)
  2. 热数据集群:HNSW,内存存储
  3. 温数据集群:DiskANN,SSD存储
  4. 冷数据归档:对象存储
  5. 全局路由层:根据请求路由到对应集群
  6. 读写分离:写集群 + 多读副本

预期性能:
  - 写入: ~500,000 条/秒(多集群并行)
  - 查询: P99 < 50ms, QPS ~5000
  - 总内存: ~1 TB (10亿条, DiskANN压缩)

14.2 分片策略设计

哈希分片原理

Milvus 默认使用哈希分片,将主键哈希值映射到不同的 Shard(写入通道):

主键 → Hash(主键) % shards_num → Shard ID

优点:
  - 数据均匀分布
  - 写入负载均衡
  - 实现简单

局限:
  - 无法按业务逻辑分片
  - 跨分片查询需要聚合
  - 分片数量不可修改

分片数量规划原则

分片数量 = max(写入QPS / 单分片写入QPS, 1)

单分片写入QPS参考值:
  - Docker Compose: ~10,000 条/秒
  - K8s 集群: ~30,000 条/秒

示例:
  目标写入QPS = 100,000 条/秒
  单分片QPS = 30,000 条/秒
  分片数量 = ceil(100000/30000) = 4

14.3 读写分离架构

                    ┌──────────────┐
                    │  写入请求     │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │  Proxy (写)  │
                    └──────┬───────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────▼────┐ ┌────▼─────┐ ┌───▼──────┐
        │ DataNode  │ │ DataNode │ │ DataNode │
        └─────┬────┘ └────┬─────┘ └───┬──────┘
              │            │            │
              └────────────┼────────────┘
                           │
                    对象存储 (MinIO/S3)
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────▼────┐ ┌────▼─────┐ ┌───▼──────┐
        │QueryNode │ │QueryNode │ │QueryNode │ ← 读副本
        │(主)      │ │(副本1)   │ │(副本2)   │
        └─────┬────┘ └────┬─────┘ └───┬──────┘
              │            │            │
              └────────────┼────────────┘
                           │
                    ┌──────▼───────┐
                    │  Proxy (读)  │
                    └──────┬───────┘
                           │
                    ┌──────▼───────┐
                    │  查询请求     │
                    └──────────────┘
# 配置读副本
client.load_collection(
    collection_name="knowledge_base",
    replica_number=3  # 3个读副本
)

# 查询自动路由到不同副本,实现读负载均衡

14.4 冷热分层架构

冷热数据划分标准:
- 热数据:最近7天访问过的数据,占总量约10~20%
- 温数据:7~30天未访问的数据,占总量约30%
- 冷数据:30天以上未访问的数据,占总量约50~60%

Milvus 分层存储实现:
┌─────────────────────────────────────┐
│         QueryNode 内存               │
│  ┌─────────────────────────────┐    │
│  │  热数据:HNSW 索引 + 原始向量 │    │
│  │  延迟: <5ms                 │    │
│  └─────────────────────────────┘    │
│  ┌─────────────────────────────┐    │
│  │  温数据:PQ 压缩向量         │    │
│  │  延迟: 5~10ms               │    │
│  └─────────────────────────────┘    │
└─────────────────────────────────────┘
┌─────────────────────────────────────┐
│         SSD 存储                     │
│  ┌─────────────────────────────┐    │
│  │  冷数据:DiskANN 索引        │    │
│  │  延迟: 10~20ms              │    │
│  └─────────────────────────────┘    │
└─────────────────────────────────────┘

成本收益:
  - 内存需求降低 80%
  - 查询延迟增加 2~5 倍
  - 总成本降低 60~70%

14.5 多地域部署

全球部署架构:

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  亚太区域     │     │  欧洲区域     │     │  美洲区域     │
│  Milvus集群   │     │  Milvus集群   │     │  Milvus集群   │
│  (本地数据)   │     │  (本地数据)   │     │  (本地数据)   │
└──────┬───────┘     └──────┬───────┘     └──────┬───────┘
       │                    │                    │
       └────────────────────┼────────────────────┘
                            │
                   ┌────────▼────────┐
                   │  全局路由层       │
                   │  (就近访问)       │
                   └────────┬────────┘
                            │
                   ┌────────▼────────┐
                   │  对象存储同步     │
                   │  (跨地域复制)     │
                   └─────────────────┘

关键设计:
1. 用户请求就近路由到最近的 Milvus 集群
2. 各区域数据通过对象存储跨地域复制
3. 写入请求路由到主区域,异步同步到从区域
4. 读请求从就近区域返回,接受短暂不一致
```---

# 第六阶段:生态集成与项目实战(Day 25-28)

## 第 15 章 多语言客户端开发

### 15.1 Python 客户端深度使用

#### PyMilvus API 进阶用法

```python
from pymilvus import MilvusClient, DataType, AnnSearchRequest, WeightedRanker, RRFRanker

# 连接管理
client = MilvusClient(
    uri="http://localhost:19530",
    token="root:Milvus",
    db_name="default",       # 指定数据库
    timeout=10.0,            # 超时时间
    server_version="2.6.x"  # 服务端版本
)

# 数据库管理
client.create_database("production")
client.using_database("production")
client.list_databases()

# 批量操作优化
def bulk_insert_optimized(client, collection_name, data, batch_size=5000):
    """优化的批量插入"""
    for i in range(0, len(data), batch_size):
        batch = data[i:i+batch_size]
        try:
            result = client.insert(collection_name=collection_name, data=batch)
            yield result
        except Exception as e:
            print(f"批次 {i} 失败: {e}")
            # 重试逻辑
            result = client.insert(collection_name=collection_name, data=batch)
            yield result

异步客户端使用

import asyncio
from pymilvus import AsyncMilvusClient

async def async_search_demo():
    """异步查询示例"""
    client = AsyncMilvusClient("http://localhost:19530")

    # 并发查询
    queries = [query_vector_1, query_vector_2, query_vector_3]
    tasks = [
        client.search(
            collection_name="knowledge_base",
            data=[q],
            limit=10,
            output_fields=["text"]
        )
        for q in queries
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"查询 {i} 失败: {result}")
        else:
            print(f"查询 {i} 返回 {len(result[0])} 条结果")

    await client.close()

asyncio.run(async_search_demo())

连接池与重试机制

from pymilvus import connections, Collection
import tenacity

# 连接池配置
connections.connect(
    alias="default",
    host="localhost",
    port="19530",
    user="root",
    password="Milvus",
    pool_size=10,           # 连接池大小
    max_retry=3,            # 最大重试次数
    retry_on_failure=True   # 失败时重试
)

# 使用 tenacity 实现自定义重试
@tenacity.retry(
    stop=tenacity.stop_after_attempt(3),
    wait=tenacity.wait_exponential(multiplier=1, min=1, max=10),
    retry=tenacity.retry_if_exception_type(Exception)
)
def search_with_retry(client, collection_name, query_vector, **kwargs):
    return client.search(
        collection_name=collection_name,
        data=[query_vector],
        **kwargs
    )

15.2 Java / Go 客户端

Java 客户端生产级集成

// Maven 依赖
// <dependency>
//     <groupId>io.milvus</groupId>
//     <artifactId>milvus-sdk-java</artifactId>
//     <version>2.6.18</version>
// </dependency>

import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.service.vector.request.SearchReq;
import io.milvus.v2.service.vector.response.SearchResp;

// 连接配置
ConnectConfig config = ConnectConfig.builder()
    .uri("http://localhost:19530")
    .token("root:Milvus")
    .build();

MilvusClientV2 client = new MilvusClientV2(config);

// 向量搜索
List<Float> queryVector = Arrays.asList(0.1f, 0.2f, ...);
SearchReq searchReq = SearchReq.builder()
    .collectionName("knowledge_base")
    .data(Collections.singletonList(queryVector))
    .topK(10)
    .filter("category == '技术'")
    .outputFields(Arrays.asList("text", "category"))
    .build();

SearchResp searchResp = client.search(searchReq);

Go 客户端

// go get github.com/milvus-io/milvus-sdk-go/v2

import (
    "context"
    "github.com/milvus-io/milvus-sdk-go/v2/client"
    "github.com/milvus-io/milvus-sdk-go/v2/entity"
)

// 连接
c, err := client.NewClient(context.Background(), client.Config{
    Address: "localhost:19530",
})
if err != nil {
    log.Fatal(err)
}

// 搜索
results, err := c.Search(
    context.Background(),
    "knowledge_base",
    []string{},
    "category == '技术'",
    []string{"text", "category"},
    []entity.Vector{entity.FloatVector(queryVector)},
    "embedding",
    entity.COSINE,
    10,
    map[string]interface{}{"ef_search": 128},
)

15.3 Node.js 与 REST API

// npm install @zilliz/milvus2-sdk-node

const { MilvusClient } = require('@zilliz/milvus2-sdk-node');

const client = new MilvusClient({
    address: 'localhost:19530',
    token: 'root:Milvus'
});

// 搜索
const results = await client.search({
    collection_name: 'knowledge_base',
    data: [queryVector],
    limit: 10,
    output_fields: ['text', 'category'],
    filter: 'category == "技术"'
});

第 16 章 AI 生态与 RAG 集成

16.1 LangChain 集成

Milvus 向量存储配置

from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings

# 创建 Milvus 向量存储
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vector_store = Milvus(
    embedding_function=embeddings,
    connection_args={
        "uri": "http://localhost:19530",
    },
    collection_name="langchain_docs",
    auto_id=True,
)

# 添加文档
from langchain_core.documents import Document

docs = [
    Document(page_content="Milvus是开源向量数据库", metadata={"source": "wiki"}),
    Document(page_content="RAG系统结合检索与生成", metadata={"source": "blog"}),
]
vector_store.add_documents(docs)

# 相似度搜索
results = vector_store.similarity_search(
    query="向量数据库",
    k=5,
    expr='source == "wiki"'  # Milvus 过滤表达式
)

混合检索管道搭建

from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# 创建向量存储(支持稠密+稀疏混合检索)
vector_store = Milvus(
    embedding_function=embeddings,
    connection_args={"uri": "http://localhost:19530"},
    collection_name="hybrid_rag",
)

# RAG 链
template = """基于以下检索到的文档内容回答问题:

{context}

问题:{question}

请基于文档内容回答,如果文档中没有相关信息,请说明。"""

prompt = ChatPromptTemplate.from_template(template)
llm = ChatOpenAI(model="gpt-4o")

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

rag_chain = (
    {"context": vector_store.as_retriever(search_kwargs={"k": 5}) | format_docs,
     "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

# 执行 RAG
answer = rag_chain.invoke("什么是向量数据库?")

16.2 LlamaIndex 集成

from llama_index.vector_stores.milvus import MilvusVectorStore
from llama_index.core import StorageContext, VectorStoreIndex, SimpleDirectoryReader

# 创建 Milvus 向量存储
vector_store = MilvusVectorStore(
    uri="http://localhost:19530",
    collection_name="llamaindex_docs",
    dim=1536,
    overwrite=False,
)

# 加载文档
documents = SimpleDirectoryReader("./docs").load_data()

# 构建索引
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
)

# 查询
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("向量数据库的核心优势是什么?")
print(response)

16.3 其他生态集成

Dify 低代码平台对接

Dify 配置 Milvus 步骤:
1. 进入 Dify 管理后台 → 知识库 → 数据源
2. 选择 "Milvus" 作为向量存储
3. 配置连接参数:
   - Host: milvus-service.milvus.svc.cluster.local
   - Port: 19530
   - Database: default
   - 用户名/密码(如果启用认证)
4. 创建知识库时选择 Milvus 作为存储后端
5. 上传文档,Dify 自动分块、向量化、存入 Milvus

16.4 RAG 系统最佳实践

文档分块 + 向量化流水线设计

from langchain_text_splitters import RecursiveCharacterTextSplitter

def build_rag_pipeline(docs_path, milvus_uri, embedding_model):
    """构建完整的 RAG 数据处理流水线"""

    # 1. 文档加载
    from langchain_community.document_loaders import DirectoryLoader
    loader = DirectoryLoader(docs_path, glob="**/*.md")
    documents = loader.load()

    # 2. 文档分块
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,       # 每块500字符
        chunk_overlap=50,     # 块间重叠50字符
        separators=["\n## ", "\n### ", "\n\n", "\n", "。", "!", "?"]
    )
    chunks = text_splitter.split_documents(documents)

    # 3. 向量化并存储
    vector_store = Milvus(
        embedding_function=embedding_model,
        connection_args={"uri": milvus_uri},
        collection_name="rag_knowledge_base",
    )
    vector_store.add_documents(chunks)

    return vector_store, len(chunks)

检索效果评估

def evaluate_retrieval(vector_store, test_queries, ground_truth, k=10):
    """评估检索效果"""
    recall_scores = []
    mrr_scores = []

    for query, relevant_ids in zip(test_queries, ground_truth):
        results = vector_store.similarity_search(query, k=k)
        result_ids = [doc.metadata.get("doc_id") for doc in results]

        # Recall@K
        relevant_set = set(relevant_ids)
        result_set = set(result_ids[:k])
        recall = len(relevant_set & result_set) / len(relevant_set) if relevant_set else 0
        recall_scores.append(recall)

        # MRR (Mean Reciprocal Rank)
        mrr = 0
        for i, rid in enumerate(result_ids):
            if rid in relevant_set:
                mrr = 1 / (i + 1)
                break
        mrr_scores.append(mrr)

    return {
        "Recall@K": sum(recall_scores) / len(recall_scores),
        "MRR": sum(mrr_scores) / len(mrr_scores),
    }

第 17 章 实战项目

项目一:企业级知识库 RAG 系统

需求分析与架构设计

需求:
- 支持百万级文档的语义检索
- 支持关键词+语义混合检索
- 支持权限过滤(部门级隔离)
- 支持 RAG 问答
- 查询延迟 P99 < 200ms

架构设计:
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  前端UI   │ →  │  API网关  │ →  │ RAG服务   │ →  │  LLM API │
│          │    │  (Nginx) │    │ (Python) │    │ (GPT-4o) │
└──────────┘    └──────────┘    └────┬─────┘    └──────────┘
                                     │
                              ┌──────▼──────┐
                              │   Milvus     │
                              │  (K8s集群)   │
                              └─────────────┘

向量库 Schema 与索引设计

from pymilvus import MilvusClient, DataType, Function, FunctionType

schema = MilvusClient.create_schema(auto_id=True, enable_dynamic_field=True)

schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="doc_id", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="chunk_text", datatype=DataType.VARCHAR, max_length=4096)
schema.add_field(field_name="department", datatype=DataType.VARCHAR, max_length=64,
                 is_partition_key=True)  # 按部门分区
schema.add_field(field_name="source", datatype=DataType.VARCHAR, max_length=256)
schema.add_field(field_name="created_at", datatype=DataType.INT64)
schema.add_field(field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=1024)
schema.add_field(field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR)

# BM25 函数
schema.add_function(Function(
    name="bm25_func",
    input_field_names=["chunk_text"],
    output_field_names=["sparse_vector"],
    function_type=FunctionType.BM25,
))

# 索引
index_params = client.prepare_index_params()
index_params.add_index(field_name="dense_vector", index_type="HNSW",
                       metric_type="COSINE", params={"M": 16, "efConstruction": 256})
index_params.add_index(field_name="sparse_vector", index_type="SPARSE_INVERTED_INDEX",
                       metric_type="BM25")
index_params.add_index(field_name="source", index_type="INVERTED")

client.create_collection("enterprise_kb", schema=schema, index_params=index_params,
                         num_partitions=64)

权限过滤与检索问答完整实现

def search_with_permission(client, query_text, user_department, top_k=10):
    """带权限过滤的混合检索"""
    dense_vector = dense_model.encode(query_text).tolist()

    # 稠密向量搜索
    req_dense = AnnSearchRequest(
        data=[dense_vector],
        anns_field="dense_vector",
        param={"metric_type": "COSINE", "params": {"ef_search": 128}},
        limit=top_k * 2,
        expr=f'department == "{user_department}"'  # 权限过滤
    )

    # BM25 全文搜索
    req_sparse = AnnSearchRequest(
        data=[query_text],
        anns_field="sparse_vector",
        param={"metric_type": "BM25"},
        limit=top_k * 2,
        expr=f'department == "{user_department}"'
    )

    results = client.hybrid_search(
        collection_name="enterprise_kb",
        reqs=[req_dense, req_sparse],
        ranker=RRFRanker(k=60),
        limit=top_k,
        output_fields=["chunk_text", "doc_id", "source", "department"]
    )
    return results

项目二:亿级图像相似度检索系统

集群规模规划与分片设计

数据规模:1亿张图片,512维 CLIP 向量
存储估算:
  - 向量数据: 1亿 × 512 × 4 = 200 GB
  - HNSW 索引: ~300 GB
  - 总内存需求: ~500 GB

集群规划:
  - QueryNode ×8 (每节点 64GB 内存)
  - DataNode ×4
  - IndexNode ×2
  - Proxy ×4
  - 分片数: 8
  - 索引: HNSW (M=32, efConstruction=512)

检索性能优化与压测

import asyncio
import time
from pymilvus import AsyncMilvusClient

async def benchmark_search(concurrency=100, total_queries=10000):
    """搜索性能压测"""
    client = AsyncMilvusClient("http://localhost:19530")

    sem = asyncio.Semaphore(concurrency)
    latencies = []

    async def single_query(query_vector):
        async with sem:
            start = time.time()
            await client.search(
                collection_name="image_search",
                data=[query_vector],
                limit=20,
                output_fields=["image_path"]
            )
            latencies.append((time.time() - start) * 1000)

    # 生成测试查询向量
    query_vectors = [np.random.randn(512).tolist() for _ in range(total_queries)]

    start = time.time()
    tasks = [single_query(qv) for qv in query_vectors]
    await asyncio.gather(*tasks)
    total_time = time.time() - start

    latencies.sort()
    print(f"QPS: {total_queries / total_time:.1f}")
    print(f"P50: {latencies[len(latencies)//2]:.1f}ms")
    print(f"P99: {latencies[int(len(latencies)*0.99)]:.1f}ms")

项目三:个性化推荐召回系统

# 用户/物品向量设计
schema = MilvusClient.create_schema()
schema.add_field(field_name="item_id", datatype=DataType.VARCHAR, max_length=64, is_primary=True)
schema.add_field(field_name="item_type", datatype=DataType.VARCHAR, max_length=32,
                 is_partition_key=True)
schema.add_field(field_name="category", datatype=DataType.VARCHAR, max_length=64)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=256)
schema.add_field(field_name="created_at", datatype=DataType.INT64)

# 实时向量更新方案
def update_item_vector(client, item_id, new_embedding):
    """更新物品向量(使用 upsert)"""
    client.upsert(
        collection_name="recommendation_items",
        data=[{
            "item_id": item_id,
            "embedding": new_embedding,
        }]
    )

# 召回查询
def recall_items(client, user_vector, category=None, top_k=200):
    """个性化召回"""
    filter_expr = ''
    if category:
        filter_expr = f'category == "{category}"'

    results = client.search(
        collection_name="recommendation_items",
        data=[user_vector],
        limit=top_k,
        filter=filter_expr,
        output_fields=["item_id", "item_type", "category"]
    )
    return results

项目四:多租户 SaaS 向量检索平台

# 多租户隔离方案:分区键隔离
schema = MilvusClient.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True, auto_id=True)
schema.add_field(field_name="tenant_id", datatype=DataType.VARCHAR, max_length=64,
                 is_partition_key=True)  # 租户ID作为分区键
schema.add_field(field_name="content", datatype=DataType.VARCHAR, max_length=4096)
schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=768)

# 查询时自动分区裁剪
def tenant_search(client, tenant_id, query_vector, top_k=10):
    """租户隔离的向量检索"""
    results = client.search(
        collection_name="saas_platform",
        data=[query_vector],
        limit=top_k,
        filter=f'tenant_id == "{tenant_id}"',  # 自动分区裁剪
        output_fields=["content"]
    )
    return results

# 资源配额设计
# 通过 Resource Group 实现租户资源隔离
def setup_tenant_resources(client, tenant_id, node_num=1):
    """为租户分配计算资源"""
    rg_name = f"rg_{tenant_id}"
    client.create_resource_group(rg_name, config={"node_num": node_num})
    return rg_name

第七阶段:底层精通与前沿(Day 29-30)

第 18 章 底层原理与源码导读

18.1 存储引擎原理

Segment 文件内部结构

Segment 目录结构:
segment_{id}/
├── insert_log/
│   ├── {field_id}_1.binlog    # 字段数据(列式存储)
│   ├── {field_id}_2.binlog
│   └── ...
├── delta_log/
│   └── {field_id}_delete.binlog  # 删除标记
├── stats_log/
│   └── {field_id}_stats.binlog   # 统计信息(min/max/null_count)
└── index_files/
    └── {index_id}/
        └── index_file.bin         # 索引文件

Binlog 格式:
┌──────────┬──────────┬──────────┬──────────┐
│ Header   │ Event 1  │ Event 2  │ ...      │
│ (魔数+版本) │ (数据块1) │ (数据块2) │          │
└──────────┴──────────┴──────────┴──────────┘

向量数据存储格式

浮点向量存储(列式):
  每个向量字段一个 Binlog 文件
  数据按行存储,每行 dim × 4 bytes
  支持批量读取,SIMD 友好

二值向量存储:
  每行 dim / 8 bytes
  位运算加速汉明距离计算

稀疏向量存储:
  CSR 格式(Compressed Sparse Row)
  只存储非零元素的位置和值

18.2 ANN 算法深度

HNSW 分层导航小世界图数学原理

HNSW 的核心思想是构建一个多层概率图,每层的节点以指数递减的概率出现:

层级分配概率:P(level = l) = (1/M) ^ l

层级0:所有节点(100%)
层级1:约 1/M 的节点
层级2:约 1/M² 的节点
层级3:约 1/M³ 的节点
...

查询算法:
1. 从最高层的入口点开始
2. 在当前层贪心搜索最近邻
3. 找到当前层最近邻后,跳到下一层
4. 重复直到层级0
5. 在层级0执行精确的 beam search

插入算法:
1. 随机分配层级 l(按上述概率)
2. 从最高层到 l+1 层:贪心搜索最近邻
3. 从 l 层到 0 层:搜索 efConstruction 个最近邻
4. 在每层选择最近的 M 个邻居建立连接
5. 如果邻居的连接数超过 Mmax,执行启发式剪枝

PQ 乘积量化压缩原理

乘积量化(Product Quantization)步骤:

1. 将 d 维向量拆分为 m 个子向量
   原始: [x₁, x₂, ..., x_d] → [x₁...x_d/m], [x_d/m+1...x_2d/m], ..., [x_(m-1)d/m+1...x_d]

2. 对每个子空间独立进行 K-means 聚类(K=2^nbits)
   每个子空间生成 256 个聚类中心(nbits=8)

3. 编码:每个子向量用最近的聚类中心 ID 表示
   原始子向量 → 1 byte 聚类中心 ID

4. 存储编码:m 个 byte 表示一个向量
   压缩比: d × 4 / m = 768 × 4 / 16 = 192 倍

5. 距离计算(非对称距离):
   查询向量与所有聚类中心预计算距离表
   查表 + 求和得到近似距离
   时间复杂度: O(m × K + m) per query

DiskANN 算法核心思想

DiskANN 关键创新:

1. 图结构存储在内存(PQ 压缩向量 + 邻居列表)
2. 原始向量存储在 SSD
3. 查询时:
   a. 在内存中用 PQ 向量做粗排
   b. 从 SSD 读取候选向量的原始数据
   c. 用原始向量重新计算距离
   d. 返回精确结果

4. Vamana 图构建算法:
   a. 随机初始化图
   b. 迭代优化:对每个节点,搜索其最近邻,建立更优连接
   c. 剪枝:限制每个节点的最大连接数

5. 内存占用:
   PQ 编码: m bytes/条 (通常 16~32 bytes)
   邻居列表: R × 4 bytes/条 (R 通常 32~64)
   总计: ~80~300 bytes/条 (vs 原始 3072 bytes)

18.3 分布式调度原理

负载均衡策略

QueryCoord 负载均衡策略:

1. 基于段分布的均衡
   - 计算每个 QueryNode 的段数量和内存占用
   - 将段从负载高的节点迁移到负载低的节点
   - 迁移过程中保证至少一个副本可用

2. 基于副本的均衡
   - 同一段的多个副本分布在不同节点
   - 节点故障时自动切换到其他副本

3. 均衡触发条件
   - 新节点加入集群
   - 节点故障恢复
   - 段数量变化
   - 手动触发

故障检测与转移机制

心跳检测:
  - 每个节点每 3 秒发送心跳
  - Coord 连续 15 秒未收到心跳 → 标记节点为 Offline
  - 触发故障转移流程

故障转移:
  1. QueryCoord:将 Offline 节点的段重新分配
  2. DataCoord:将 Offline 节点的 Channel 重新分配
  3. IndexCoord:将 Offline 节点的索引任务重新分配
  4. 自动恢复:节点重新上线后,Coord 自动分配任务

18.4 源码结构与扩展

项目代码结构总览

milvus/
├── cmd/                    # 入口程序
│   ├── milvus/            # 主程序
│   └── components/        # 各组件入口
├── internal/               # 核心实现
│   ├── proxy/             # Proxy 节点
│   ├── rootcoord/         # RootCoord
│   ├── datacoord/         # DataCoord
│   ├── querycoord/        # QueryCoord
│   ├── indexcoord/        # IndexCoord
│   ├── datanode/          # DataNode
│   ├── querynode/         # QueryNode
│   ├── indexnode/         # IndexNode
│   ├── storage/           # 存储层
│   ├── msgstream/         # 消息流
│   └── core/              # C++ 核心引擎(SegCore)
├── pkg/                    # 公共包
│   ├── common/            # 公共定义
│   ├── util/              # 工具函数
│   └── proto/             # Protobuf 定义
├── deployments/            # 部署配置
│   ├── docker/            # Docker 配置
│   └── helm/              # Helm Chart
└── tests/                  # 测试

第 19 章 前沿技术与选型对比

19.1 向量数据库前沿技术

向量量化技术演进

量化技术演进路线:
SQ8 (2019) → PQ (2020) → OPQ (2021) → Residual PQ (2022) → LSQ (2023)

未来方向:
1. 学习型量化:用神经网络学习最优量化方案
2. 混合精度量化:不同维度使用不同精度
3. 自适应量化:根据数据分布自动选择量化策略
4. 量化感知训练:在模型训练时考虑量化损失

检索增强新范式

1. Agentic RAG:AI Agent 自主决定检索策略
2. Multi-modal RAG:文本+图像+音频多模态检索
3. Graph RAG:结合知识图谱的检索增强
4. Adaptive RAG:根据查询复杂度自适应选择检索策略
5. Streaming RAG:流式检索与生成

19.2 Milvus 未来路线图

Milvus 2.6 已实现 ✅ / 未来规划 🔮:

1. Serverless 架构 🔮
   - 按查询付费,零成本待机
   - 自动弹性伸缩
   - 冷启动优化

2. 原生多模态能力增强 ✅+🔮
   - ✅ Geometry 地理空间字段(2.6)
   - ✅ Struct Array 多向量实体(2.6)
   - 🔮 内置图像/音频/视频嵌入
   - 🔮 跨模态检索原生支持

3. 湖仓一体集成 🔮
   - 与 Iceberg/Hudi/Delta Lake 集成
   - 向量数据湖方案
   - 统一元数据管理

4. 更强的混合检索 ✅+🔮
   - ✅ 向量+全文+结构化联合检索(2.6 BM25增强)
   - ✅ 多语言 BM25 分析器(2.6)
   - ✅ Path Index JSON过滤(2.6)
   - 🔮 学习型排序
   - 🔮 个性化检索

5. 成本优化 ✅(2.6 重大突破)
   - ✅ RaBitQ 1-bit 量化(内存降72%)
   - ✅ 冷热分层存储(成本降50%)
   - ✅ Woodpecker 零磁盘WAL(运维降本)
   - ✅ Int8 向量压缩(内存减半)
   - ✅ CAGRA+Vamana 混合索引(GPU成本优化)

6. 架构演进 ✅+🔮
   - ✅ StreamingNode 流批分离(2.6)
   - ✅ Storage v2 存储引擎(2.6)
   - ✅ 10万+集合多租户(2.6)
   - ✅ Nullable Vector / Add Field(2.6)
   - 🔮 Milvus 3.0 进一步架构升级

19.3 主流向量数据库横向对比

维度 Milvus 2.6 Qdrant Weaviate Pinecone Chroma
开源
分布式 有限
存算分离
索引类型 12+ 4 3 有限 2
混合检索 有限
全文检索 ✅(BM25+多语言)
GPU加速
磁盘索引 ✅(DiskANN)
1-bit量化 ✅(RaBitQ)
JSON路径索引 ✅(Path Index)
零磁盘WAL ✅(Woodpecker)
地理空间字段 ✅(Geometry)
动态添加字段 ✅(Add Field)
可空向量 ✅(Nullable)
多语言SDK 5+ 4+ 4+ 3+ 2
社区规模 最大(40K+ Stars) 中等 中等 商业
生产成熟度
十亿级支持 有限
多租户集合数 10万+ 有限 有限 有限

选型决策树

是否需要分布式?
├── 否 → 数据规模?
│   ├── < 10万 → Chroma
│   └── 10万~100万 → Qdrant
└── 是 → 是否需要全托管?
    ├── 是 → Pinecone 或 Zilliz Cloud
    └── 否 → 是否需要混合检索?
        ├── 是 → Milvus(最佳选择)
        └── 否 → Milvus 或 Weaviate

第 20 章 避坑指南与最佳实践

20.1 设计阶段避坑

Schema 设计常见错误

错误 后果 正确做法
VARCHAR 长度设置过大 浪费内存 根据实际最大长度设置
未设置分区键 全集扫描 高频过滤字段设为分区键
过多标量字段 内存浪费 低频字段用动态字段
主键选择不当 性能问题 优先 INT64 自增主键
向量维度选择过大 内存翻倍 选择合适的嵌入模型

索引选型误区

误区 正确理解
“HNSW 总是最好的” 亿级数据内存可能不够,考虑 DiskANN
“PQ 压缩不影响召回” PQ 可能导致 5~15% 召回率下降
“GPU 索引总是更快” 单条查询 GPU 开销更大,批量查询才占优
“索引参数越大越好” M/efConstruction 过大导致构建慢、内存高

分片数量规划失误

常见失误:
1. 分片数过少 → 写入吞吐瓶颈
2. 分片数过多 → 协调开销增大,资源浪费
3. 创建后无法修改 → 必须提前规划

正确做法:
1. 根据写入QPS预估分片数
2. 预留 50% 的写入余量
3. 使用公式:分片数 = ceil(目标写入QPS / 单分片QPS) × 1.5

20.2 开发阶段避坑

写入性能常见坑

原因 解决方案
单条写入 网络开销大 批量写入(1000条/批)
批次过大 内存溢出 控制批次大小 < 5000
无并发 吞吐低 使用多线程/协程并发写入
频繁 upsert 产生大量删除标记 减少 upsert,定期 Compaction
未等 Flush 数据可能丢失 关键数据确认 Flush

查询过滤常见坑

原因 解决方案
低选择性过滤 过滤后数据仍很多 增加过滤条件提高选择性
JSON 深层过滤 性能差 高频字段提升为静态字段
未建标量索引 全集扫描 为过滤字段创建 INVERTED 索引
offset 过大 深度分页性能差 使用游标分页

数据一致性常见坑

原因 解决方案
写入后立即查不到 一致性级别太低 使用 Session 或 Bounded 级别
查询结果不一致 多副本间延迟 使用 Strong 级别(牺牲性能)
删除后仍能查到 软删除延迟 触发 Compaction 加速清理

20.3 生产运维避坑

集群部署配置坑

后果 解决方案
etcd 单节点 元数据丢失 至少 3 节点集群
MinIO 单节点 数据丢失 至少 4 节点分布式
未配置资源限制 节点 OOM 设置 Requests/Limits
未配置亲和性 调度不均衡 配置节点亲和性和反亲和
未启用认证 安全风险 启用 RBAC 和 TLS

内存与资源配置坑

常见问题:
1. QueryNode 内存不足 → OOM Kill
   解决:精确估算内存,预留50%余量

2. IndexNode 内存不足 → 索引构建失败
   解决:IndexNode 内存 ≥ 最大段大小 × 2

3. etcd 磁盘不足 → 集群不可用
   解决:定期压缩,监控磁盘使用

4. MinIO 磁盘不足 → 写入失败
   解决:监控磁盘使用,设置扩容告警

升级与迁移风险

升级前检查清单:
□ 备份 etcd 数据
□ 备份对象存储数据
□ 阅读升级文档和变更日志
□ 在测试环境验证升级
□ 准备回滚方案
□ 通知相关团队
□ 选择低峰期执行

迁移前检查清单:
□ 评估数据量和迁移时间
□ 验证目标集群版本兼容性
□ 测试迁移工具
□ 制定数据一致性验证方案
□ 准备回滚方案

20.4 大规模场景避坑

亿级数据扩容坑

解决方案
扩容后段分布不均 等待 QueryCoord 自动均衡
新节点内存不足 预先计算内存需求
扩容期间查询超时 分批迁移段,避免同时迁移过多
索引重建耗时过长 使用 GPU_CAGRA 加速索引构建

冷热分层常见问题

问题 解决方案
冷数据查询延迟高 预热常用冷数据到内存
分层策略不合理 根据访问频率动态调整
冷热切换抖动 设置缓冲区,避免频繁切换
分层存储配置复杂 参考官方文档,逐步启用

高可用容灾误区

误区 正确理解
“多副本就是高可用” 还需要自动故障检测和转移
“同城双活 = 零停机” 切换期间可能有短暂不可用
“数据备份 = 容灾” 还需要定期演练恢复流程
“云服务不需要容灾” 云服务也可能出故障,需要跨区域部署

附录

A. Milvus 2.6.18 常用配置参数速查

配置项 默认值 说明
common.security.authorizationEnabled false 启用 RBAC
queryNode.lazyLoadEnabled false 启用分层存储
queryNode.loadingMemoryPortion 0.5 内存中热数据比例
dataCoord.compaction.enableAutoCompaction true 自动 Compaction
proxy.maxReceiveSize 67108864 Proxy 最大接收大小
proxy.maxResultSize 104857600 Proxy 最大返回大小
tieredStorage.enabled false 启用冷热分层存储(2.6新增)
tieredStorage.hotDataTTL 7d 热数据保留时间(2.6新增)
woodpecker.enabled false 启用 Woodpecker WAL(2.6新增)
streamingNode.enabled true 启用流式节点(2.6新增)

B. 性能基准参考

数据规模 维度 索引 QPS P99延迟 内存
100万 768 HNSW 2000 5ms 3.2GB
1000万 768 HNSW 1500 10ms 32GB
1000万 768 IVF_RABITQ+SQ8 6000 8ms 9.5GB
1亿 768 DiskANN 1000 20ms 50GB
1亿 768 IVF_RABITQ+SQ8 4000 25ms 95GB

C. 常用命令速查

# 集群管理
kubectl get pods -n milvus
kubectl logs -f milvus-proxy-xxx -n milvus
kubectl scale deployment milvus-querynode --replicas=5 -n milvus

# Milvus 运维
helm upgrade milvus milvus/milvus -n milvus -f values.yaml
helm rollback milvus -n milvus
helm uninstall milvus -n milvus

# 数据备份
milvus-backup create -n backup_name
milvus-backup restore -n backup_name

# 调试工具
birdwatcher --etcd=127.0.0.1:2379

D. 推荐学习资源


🎉 恭喜你完成了 2026 精通 Milvus 向量库的 30 天学习之旅!

从向量数据库基础概念到分布式架构深度解析,从单机开发到亿级生产部署,你已经掌握了 Milvus 的全栈知识体系。

接下来,建议你:

  1. 在实际项目中应用所学知识
  2. 参与 Milvus 开源社区贡献
  3. 持续关注 Milvus 版本更新和新特性
  4. 分享你的实践经验,帮助更多人成长
0

评论区