侧边栏壁纸
  • 累计撰写 62 篇文章
  • 累计创建 15 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Chroma 向量数据库从入门到精通完整教程

温馨提示:
部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Chroma 向量数据库从入门到精通完整教程

基于 Chroma 1.5.9 最新版本 · 涵盖 Python / TypeScript / Rust 多语言实战
最后更新:2026 年 6 月


前言:教程定位与学习路径

0.1 适用人群

本教程面向三类核心读者。第一类是 AI 应用开发者和 RAG 系统工程师,他们需要在检索增强生成管道中使用向量数据库作为核心存储引擎。第二类是对向量数据库零基础的后端开发者,他们有扎实的编程功底但从未系统接触过向量检索技术。第三类是需要将 Chroma 从原型阶段迁移到生产环境的技术团队,他们需要理解部署架构、性能调优和运维监控的全套知识。

0.2 前置知识

学习本教程前,你应具备以下基础:Python 基础编程能力(能使用 pip 安装依赖、编写函数和类);对向量、嵌入(Embedding)、相似度等机器学习概念有初步了解;了解大语言模型与 RAG 的基本原理(知道"为什么需要向量数据库"即可)。如果你对这些概念完全陌生,建议先阅读相关入门材料,但本教程在第一章也会对核心概念进行详细讲解。

0.3 2026 年 Chroma 技术生态概览

版本演进历程。 Chroma 从 0.4.x 时代起步,经历了数次重大架构变革。0.4.x 版本将底层存储从 DuckDB/ClickHouse 迁移到 SQLite,并引入了全新的客户端构造函数(EphemeralClientPersistentClientHttpClient)。0.5.x 系列完善了持久化和客户端-服务器模式。0.6.x 对 list_collections 的返回行为做了破坏性变更。而 2025 年 3 月发布的 v1.0.0 是一个里程碑式的版本——Chroma 的核心引擎被完全用 Rust 重写,带来了数量级的性能提升,同时配置方式从环境变量迁移到 YAML 配置文件,内置的 Python 认证中间件被移除。此后 1.x 系列快速迭代,到 2026 年 5 月的 v1.5.9 已成为功能最完备的稳定版本。

定位与边界。 Chroma 的核心定位是"AI 应用的开源数据基础设施",采用 Apache 2.0 许可证。它支持嵌入式(in-process)和客户端-服务器两种运行模式。嵌入式模式零依赖、开箱即用,非常适合原型开发和中小规模应用(单机可承载约 1500 万条 1536 维向量)。当数据量超过单机上限或需要高可用时,可以部署分布式模式或迁移到 Chroma Cloud。但如果你面对的是亿级以上的向量规模,可能需要评估 Milvus、Qdrant 等专为超大规模设计的系统——这一点在第 19 章会详细讨论。

2026 年技术栈。 Chroma 提供 Python、TypeScript/JavaScript、Rust 和 Go 四种语言的官方客户端。Python 客户端功能最完整(包括多模态嵌入支持),TypeScript 客户端适合全栈和前端开发者,Rust 客户端提供最低延迟的访问能力。Chroma Cloud 提供零运维的托管服务,支持集合分叉(Forking)、数据同步(Sync)等企业级特性。


第一阶段:入门基础(Day 1-2)


第 1 章 向量数据库与 Chroma 概述

1.1 向量数据库基础概念

什么是向量嵌入(Embedding)

向量嵌入是将非结构化数据(文本、图像、音频等)映射到高维数学空间中的稠密数值数组的过程。这个数组就是"向量"或"嵌入向量",它捕获了原始数据的语义信息。语义相近的数据在向量空间中的距离也会更近。

举个直观的例子:假设我们用某个嵌入模型将两段文本编码为向量:

"机器学习是人工智能的子领域"  → [0.23, -0.15, 0.87, ..., 0.42]  (1536维)
"深度学习属于AI研究领域"    → [0.21, -0.12, 0.85, ..., 0.40]  (1536维)
"今天的天气非常好"           → [-0.65, 0.33, -0.11, ..., 0.78]  (1536维)

前两段文本语义相似,它们的向量在空间中的距离很近;而第三段文本语义无关,向量距离较远。嵌入模型(如 OpenAI 的 text-embedding-3-small、开源的 all-MiniLM-L6-v2)在训练时通过对比学习或掩码语言建模等目标函数,学会了将语义信息编码到向量空间中。

嵌入的生成流程通常包含四个步骤:文本分词(Tokenize)→ 模型前向传播(Forward Pass)→ 池化聚合(Pooling,通常取 [CLS] token 或做平均池化)→ 可选的归一化(L2 Normalize)。最终得到一个固定维度的浮点数数组。

常见的嵌入模型对比如下:all-MiniLM-L6-v2 是开源轻量模型,384 维,适合本地开发和资源受限场景;text-embedding-3-small 是 OpenAI 提供的 1536 维模型,性价比极高;text-embedding-3-large 是 OpenAI 的 3072 维旗舰模型,精度最高;Cohere embed-v3 支持 1024 维,对多语言支持优秀;Google Gemini embedding 支持长达 8192 token 的上下文窗口。

相似度计算

向量数据库的核心操作是"找到与查询向量最相似的 K 个向量",这就需要定义"相似"的度量方式。Chroma 支持三种度量空间:

余弦相似度(Cosine Similarity) 衡量两个向量之间的夹角余弦值,与向量的模长无关,只关注方向。公式为:

cosine_similarity(A, B) = (A · B) / (|A| × |B|)

值域为 [-1, 1],1 表示完全相同方向,-1 表示完全相反。Chroma 存储的是余弦距离 1 - cosine_similarity,值越小越相似。这是文本检索场景的默认选择,因为嵌入模型通常在归一化后的向量空间中训练,方向比模长更有语义意义。

欧氏距离(L2 / Euclidean Distance) 是两点之间的直线距离:

L2(A, B) = √(Σ(aᵢ - bᵢ)²)

它对向量的绝对位置敏感,在图像特征向量的检索中更常用。如果你的嵌入模型没有做 L2 归一化,且向量的模长携带重要信息(比如某些 CLIP 变体),选择 L2 度量更合适。

内积(Inner Product / Dot Product) 直接计算两个向量的点积:

IP(A, B) = Σ(aᵢ × bᵢ)

Chroma 存储的是负内积 -IP,使得值越小越相似。当向量已经归一化(模长为 1)时,内积等价于余弦相似度,但计算更快(省去了归一化步骤)。适合对性能要求极高的场景。

选择建议: 对于文本嵌入,默认使用 cosine;对于图像特征向量,考虑 l2;如果你的向量已归一化且追求极致性能,使用 ip。最重要的是,度量空间应该匹配嵌入模型的训练目标——如果模型在 cosine 目标下训练,就应该用 cosine 度量。

近似最近邻搜索(ANN)原理

精确最近邻搜索(暴力搜索)需要计算查询向量与数据库中每一个向量的距离,时间复杂度为 O(n×d),其中 n 是数据量,d 是向量维度。当 n 达到百万级时,每次查询需要数十亿次浮点运算,延迟不可接受。

近似最近邻搜索(Approximate Nearest Neighbor, ANN)通过构建特殊的数据结构,牺牲少量精度(通常用"召回率"衡量)来大幅加速查询。主流的 ANN 算法家族包括:

  • 树形方法(KD-Tree, Ball-Tree):通过空间划分构建二叉树,适合低维数据,但在高维空间(>100 维)中性能退化严重,因为"维度灾难"导致空间划分效率骤降。
  • 局部敏感哈希(LSH):设计特殊的哈希函数使相似的向量有高概率碰撞到同一个桶中。内存效率好但召回率不稳定。
  • 倒排文件索引(IVF):先对向量做聚类(如 K-Means),查询时只搜索最近的几个簇。常与乘积量化(PQ)结合使用。
  • 图方法(HNSW, NSG):构建近邻图,查询时在图上贪心遍历。这是当前生产环境中主流的选择,HNSW 更是其中的王者算法。

Chroma 默认使用 HNSW 算法(基于 HNSWlib 库),这也是当前向量数据库领域应用最广泛的索引算法。我们将在第 3 章和第 18 章深入讲解 HNSW 的原理和调优。

向量数据库 vs 传统数据库

传统关系型数据库(MySQL, PostgreSQL)通过精确匹配(WHERE name = 'xxx')或全文索引(LIKE '%keyword%')检索数据,本质上是符号匹配——要么完全命中,要么完全错过。向量数据库则通过语义相似度检索数据,能发现"意思相近但措辞不同"的内容。

核心区别可以总结为:传统数据库的查询是确定性的(匹配或不匹配),向量数据库的查询是概率性的(相似度排名)。传统数据库擅长结构化数据的精确操作(事务、JOIN、聚合),向量数据库擅长非结构化数据的语义检索。在现代 AI 应用中,两者通常是互补的——向量数据库存储嵌入向量用于语义检索,关系型数据库存储业务元数据用于精确过滤。

1.2 Chroma 核心定位与优势

设计理念

Chroma 的设计哲学可以概括为三个关键词:极简、嵌入式、快速迭代

极简 API 体现在:3 行代码即可完成向量存储与查询,无需手动管理嵌入过程(内置默认嵌入函数)。嵌入式模式意味着 Chroma 可以直接作为 Python 库导入使用,无需启动独立的数据库进程——这一点对于快速原型开发和 Jupyter Notebook 实验特别友好。快速迭代则体现在 Chroma 的版本更新节奏上,从 v1.0 到 v1.5.9 仅用了一年多时间,每次迭代都带来实质性的功能增强。

三大运行模式

Chroma 提供三种运行模式,覆盖从开发到生产的完整生命周期:

内存模式(Ephemeral):数据完全存储在内存中,进程退出即丢失。适合单元测试、快速实验和一次性脚本。

import chromadb
client = chromadb.Client()  # 纯内存,零配置

持久化模式(Persistent):数据持久化到本地磁盘(SQLite + 文件),进程重启后数据仍在。适合单机应用和开发调试。

client = chromadb.PersistentClient(path="./my_chroma_db")

客户端-服务器模式(Client-Server):Chroma 作为独立服务运行,客户端通过 HTTP/REST API 访问。适合多应用共享、团队协作和生产部署。

client = chromadb.HttpClient(host="localhost", port=8000)

多语言支持

2026 年的 Chroma 提供四种语言的官方客户端。Python 客户端功能最完整,支持所有嵌入函数(包括多模态的 OpenCLIP)、全部 CRUD 操作和高级特性。TypeScript/JavaScript 客户端适合 Node.js 后端和浏览器前端(通过 HTTP 连接)。Rust 客户端提供最低延迟的访问能力和自动只读后端故障转移。Go 客户端也在积极开发中。

1.3 Chroma 1.5.x 核心新特性

Chroma 1.5.x 系列(截至 v1.5.9)引入了大量重要特性,以下是核心亮点的概述,后续章节将逐一展开。

Metadata Arrays(元数据数组)。 原生支持在元数据字段中存储数组值。例如一个文档可以有 tags: ["python", "ai", "vector-db"] 这样的多值属性,配合 $contains / $not_contains 运算符进行过滤。数组必须类型统一且不能为空。

Sparse Vector Search(稀疏向量搜索)。 支持 BM25 和 SPLADE 两种稀疏检索方式,可以与稠密向量检索组合实现混合搜索(Hybrid Search)。这是提升检索质量的关键特性,特别是在关键词精确匹配和语义理解需要兼得的场景中。

GroupBy 聚合。 按元数据字段对检索结果进行分组,支持组内 Top-K 筛选(MinK / MaxK)。这对搜索结果多样化、去重和分类感知排序非常有用。v1.5.9 进一步支持了分片环境下的 GroupBy。

WAL3 与 MCMR。 WAL3 是基于对象存储的第三代写入日志架构,v1.5.9 引入了 MCMR(Multi-Collection Multi-Region)支持用于日志垃圾回收,使跨集合、跨区域的日志清理更加高效。

Indexing Status(索引状态监控)。 可以实时查询索引构建进度,了解数据写入后何时完成索引化。v1.5.9 增强了分片集合的重建支持和压缩游标中的租户/数据库去规范化追踪。

Read Level(读取一致性级别)。 提供 full(强一致性,合并存储和日志)和 index_only(仅查询已索引数据,更低延迟)两种模式。v1.5.9 在 Rust 客户端中增加了自动只读后端故障转移能力。

MaxScore 索引。 v1.5.9 新增的索引类型,使用 MaxScore 算法配合 SIMD 硬件加速实现高效的稀疏向量搜索早停(Early Termination)。


第 2 章 环境搭建与 Hello World

2.1 安装 Chroma

Python 客户端安装

最简安装只需一行命令:

pip install chromadb

这会自动安装所有核心依赖,包括 HNSWlib(C++ 实现,自动编译或通过预编译 wheel 安装)、SQLite 绑定和 ONNX Runtime(用于默认的 all-MiniLM-L6-v2 嵌入模型)。如果你只需要连接远程 Chroma 服务器(不需要本地嵌入功能),可以安装轻量版:

pip install chromadb-client

这个轻量包去除了嵌入模型相关的依赖(ONNX Runtime、Tokenizers 等),体积更小,适合生产环境中应用服务器只作为客户端的场景。

验证安装成功:

import chromadb
print(chromadb.__version__)  # 应输出 1.5.9 或类似版本号

本地持久化模式配置

持久化模式不需要额外安装,创建客户端时指定路径即可:

client = chromadb.PersistentClient(path="./my_chroma_db")

Chroma 会在指定路径下创建 SQLite 数据库文件和 HNSW 索引文件。请确保该路径有足够的磁盘空间,且进程对该路径有读写权限。

Docker 服务端部署

如果你需要以独立服务的方式运行 Chroma(供多个应用连接),Docker 是最便捷的方式:

docker run -d \
  --name chroma-server \
  -v ./chroma-data:/data \
  -p 8000:8000 \
  chromadb/chroma:1.5.9

这条命令会拉取 Chroma 官方镜像,将容器的 8000 端口映射到主机,并通过 -v 将数据目录挂载到主机实现持久化。容器启动后,可以通过 http://localhost:8000/api/v2/heartbeat 检查服务健康状态。

生产级 Docker Compose 配置建议增加健康检查和资源限制:

version: '3.8'
services:
  chroma:
    image: chromadb/chroma:1.5.9
    container_name: chroma-server
    ports:
      - "8000:8000"
    volumes:
      - chroma-data:/data
    environment:
      - IS_PERSISTENT=TRUE
      - ANONYMIZED_TELEMETRY=FALSE
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v2/heartbeat"]
      interval: 30s
      timeout: 10s
      retries: 3
    deploy:
      resources:
        limits:
          memory: 4G
        reservations:
          memory: 2G

volumes:
  chroma-data:
    driver: local

Chroma Cloud 云端快速体验

如果你不想自己管理基础设施,Chroma Cloud 提供零运维的托管服务。注册后获取 API Key,即可通过 CloudClient 连接:

client = chromadb.CloudClient(
    tenant="your-tenant",
    database="your-database",
    api_key="your-api-key"
)

Chroma Cloud 支持多租户隔离、集合分叉、自动数据同步(GitHub / S3 / 网页爬虫)等高级特性,且按用量计费(写入 $2.50/GiB,查询 $0.0075/TiB,存储 $0.33/GiB/月)。

2.2 三种客户端模式详解

Chroma 的 Python SDK 提供五种客户端构造方式,每种对应不同的使用场景。

EphemeralClient(纯内存客户端)

import chromadb

# 纯内存模式 - 进程退出数据丢失
client = chromadb.Client()

适用场景:单元测试(每次测试一个干净的数据库)、Jupyter Notebook 实验、一次性数据处理脚本。数据完全存储在内存中,不产生任何磁盘 I/O,速度最快。

PersistentClient(本地持久化客户端)

client = chromadb.PersistentClient(path="./chroma_db")

数据持久化到本地磁盘,使用 SQLite 存储元数据和集合目录,HNSW 索引文件存储在子目录中,Parquet 文件存储嵌入向量和文档文本。进程重启后数据完整保留。适用场景:单机应用、开发调试、小规模生产(百万级以下数据量)。

HttpClient(远程 HTTP 客户端)

client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    # 可选:认证
    # settings=Settings(chroma_client_auth_credentials="your-token")
)

连接到以独立服务运行的 Chroma 实例。支持同步和异步两种方式(AsyncHttpClient)。适用场景:多应用共享同一个 Chroma 实例、团队协作、生产部署。

AsyncHttpClient(异步 HTTP 客户端)

client = chromadb.AsyncHttpClient(host="localhost", port=8000)

# 所有操作都是 async/await 的
collection = await client.get_or_create_collection("docs")
results = await collection.query(query_texts=["hello"], n_results=5)

完全异步的客户端,适合在 FastAPI、aiohttp 等异步框架中使用,避免阻塞事件循环。

CloudClient(Chroma Cloud 客户端)

client = chromadb.CloudClient(
    tenant="my-tenant",
    database="default_database",
    api_key="ck-xxxxxxxxxxxxxxxx"
)

专门用于连接 Chroma Cloud 托管服务,自动处理认证和租户路由。

2.3 第一个向量检索程序

让我们用 Chroma 编写一个完整的向量检索示例,从创建客户端到查询结果,核心只需几行代码。

import chromadb

# 步骤 1:创建客户端(这里使用内存模式快速演示)
client = chromadb.Client()

# 步骤 2:创建集合
# Chroma 默认使用 all-MiniLM-L6-v2 嵌入函数
# 它会自动将文本转换为 384 维的嵌入向量
collection = client.create_collection(name="knowledge_base")

# 步骤 3:写入数据
collection.add(
    documents=[
        "Chroma 是一个开源的向量数据库,专为 AI 应用设计",
        "Python 是最流行的机器学习和数据科学编程语言",
        "向量数据库通过语义相似度检索非结构化数据",
        "HNSW 是目前最广泛使用的近似最近邻搜索算法",
        "RAG(检索增强生成)结合检索和大语言模型提升回答质量",
    ],
    ids=["doc1", "doc2", "doc3", "doc4", "doc5"]
)

# 步骤 4:语义查询
results = collection.query(
    query_texts=["什么是用于 AI 的数据库"],
    n_results=3
)

# 步骤 5:查看结果
print(results)

输出结果结构如下:

{
    'ids': [['doc1', 'doc3', 'doc5']],
    'documents': [[
        'Chroma 是一个开源的向量数据库,专为 AI 应用设计',
        '向量数据库通过语义相似度检索非结构化数据',
        'RAG(检索增强生成)结合检索和大语言模型提升回答质量'
    ]],
    'distances': [[0.3241, 0.4523, 0.6187]],
    'metadatas': [[None, None, None]],
    'embeddings': None
}

核心对象解读。 Client 是入口对象,管理与 Chroma 后端的连接。Collection 是数据的容器,类似于关系型数据库中的"表",但存储的是向量数据。每个集合有一个名称、一个距离度量空间(默认 L2)和一个嵌入函数。

默认嵌入函数。 当你不指定嵌入函数时,Chroma 自动使用 all-MiniLM-L6-v2——这是一个由 Sentence Transformers 库提供的轻量级开源模型,384 维,在多种语言的任务上表现良好。它通过 ONNX Runtime 在本地运行,无需 API Key,首次使用时会自动下载模型文件(约 80MB)。

查询结果结构。 query() 返回的结果是一个字典,每个字段都是二维列表(外层对应多个查询,内层对应每个查询的多个结果)。ids 是匹配的文档 ID,documents 是原始文本,distances 是距离分数(越小越相似),metadatas 是元数据。


第二阶段:核心操作(Day 3-5)


第 3 章 集合(Collection)管理

集合(Collection)是 Chroma 中数据组织的基本单元。你可以把它类比为关系型数据库中的"表"或 MongoDB 中的"集合",但它有一个关键区别:集合中的数据是向量化存储的,支持语义相似度检索。

3.1 集合的创建与配置

create_collection vs get_or_create_collection

# 创建新集合 - 如果同名集合已存在则抛出异常
collection = client.create_collection(name="my_docs")

# 获取或创建 - 如果已存在则返回现有集合,否则创建新的
collection = client.get_or_create_collection(name="my_docs")

在生产代码中,强烈推荐使用 get_or_create_collection,它避免了"集合已存在"的错误,使代码具有幂等性——无论你运行多少次都是安全的。create_collection 更适合在初始化脚本中明确表达"这个集合必须是新创建的"这一意图。

集合命名规范

集合名称必须满足以下规则:长度在 3 到 512 个字符之间;只能包含字母、数字、连字符(-)、下划线(_)和点(.);必须以字母或数字开头和结尾(不能以连字符、下划线或点开头/结尾)。这些限制是因为集合名称会被用于构建存储路径和 API URL。

# 合法名称
"documents", "user-embeddings-v2", "my_app.docs"

# 非法名称
"ab"           # 太短(<3 字符)
"-docs"        # 以连字符开头
"docs."        # 以点结尾
"my collection" # 包含空格

距离度量空间选择

创建集合时通过 metadata 参数指定距离度量:

collection = client.create_collection(
    name="text_docs",
    metadata={"hnsw:space": "cosine"}  # 可选: "cosine", "l2", "ip"
)

选择指南:文本嵌入用 cosine(这是最常见的场景),图像特征用 l2,已归一化的向量用 ip(速度最快)。一旦集合创建,距离度量不可更改——如果需要切换,必须创建新集合并迁移数据。

集合元数据配置

除了 HNSW 参数外,集合的 metadata 字段还可以存储自定义信息:

collection = client.create_collection(
    name="production_docs",
    metadata={
        "hnsw:space": "cosine",
        "description": "生产环境文档库",
        "version": "2.0",
        "owner": "ai-team"
    }
)

3.2 HNSW 索引参数配置

HNSW(Hierarchical Navigable Small World)是 Chroma 的核心索引算法。理解并正确配置 HNSW 参数对检索性能至关重要。这里先给出配置方法,第 18 章会深入讲解算法原理。

collection = client.create_collection(
    name="optimized_collection",
    metadata={
        # 距离度量空间
        "hnsw:space": "cosine",
        
        # 构建阶段参数
        "hnsw:construction_ef": 128,   # 构建时的搜索宽度
        "hnsw:M": 16,                  # 每个节点的连接数
        
        # 查询阶段参数
        "hnsw:search_ef": 64,          # 查询时的搜索宽度
        
        # 运行参数
        "hnsw:num_threads": 8,         # 并行线程数
        "hnsw:batch_size": 100,        # 批量操作大小
        "hnsw:sync_threshold": 1000,   # 同步到磁盘的阈值
        "hnsw:resize_factor": 1.2      # 索引扩容系数
    }
)

各参数的详细含义和调优指南:

hnsw:construction_ef(构建阶段搜索宽度)。 控制构建索引时搜索近邻的范围。值越大,构建出的索引质量越高(图连接更优),但构建速度越慢。取值范围通常 50-500,默认值 100。建议在生产环境中设置为 128-256。这个参数只在数据写入时起作用,不影响查询性能。

hnsw:search_ef(查询阶段搜索宽度)。 控制查询时搜索候选近邻的范围。值越大,召回率越高(越接近精确搜索),但查询延迟越大。取值范围通常 10-200,默认值 10。这是你最常调优的参数——当发现召回率不够时,首先增大 search_ef。建议从 32 开始,逐步增大直到满意。

hnsw:M(每层邻居节点数)。 控制图中每个节点的最大连接数。值越大,图连通性越好,查询越准确,但内存占用越大(每个连接需要存储两个整数指针)。取值范围通常 12-64,默认值 16。对于大多数场景 16 已经足够,高维数据(>1000 维)可以考虑增大到 24-32。

参数调优的核心原则是在"召回率"和"性能"之间找到平衡点。一个实用的方法是:先用默认参数导入数据,然后用一组标注好的测试查询评估召回率(Recall@10),如果不够就逐步增大 search_ef,如果查询太慢就适当减小。construction_ef 影响的是索引质量上限,一旦建好就无法在不重建索引的情况下修改。

3.3 集合的生命周期

获取与列出集合

# 获取已有集合
collection = client.get_collection("my_docs")

# 如果指定了自定义嵌入函数,获取时需要传入
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
ef = OpenAIEmbeddingFunction(api_key="sk-...", model_name="text-embedding-3-small")
collection = client.get_collection("my_docs", embedding_function=ef)

# 列出所有集合(支持分页)
collections = client.list_collections()       # 返回 Collection 对象列表
collections = client.list_collections(limit=10, offset=0)  # 分页

注意:从 v0.6.0 开始,list_collections 返回的是 Collection 对象列表(而非早期的集合名称字符串列表)。

删除集合

client.delete_collection("my_docs")

删除操作是不可逆的,集合中的所有数据都会被永久删除。在生产环境中建议做好备份再执行删除。

集合重命名与修改

# 修改集合名称和元数据
collection = client.get_collection("old_name")
collection.modify(
    name="new_name",
    metadata={"hnsw:search_ef": 128, "description": "已重命名"}
)

modify 方法可以修改集合的名称和元数据(包括 HNSW 参数),但不能修改距离度量空间。如果需要更改距离度量,必须创建新集合并迁移数据。


第 4 章 数据写入操作

4.1 数据模型详解

Chroma 中的每条数据(称为"记录"或"文档")由以下字段组成:

ids(必填)。 每条记录的唯一标识符,字符串类型。在同一个集合内必须唯一。如果你执行 add() 时传入已存在的 ID,Chroma 会抛出异常(应使用 upsert() 来更新)。ID 的设计建议:使用有意义的前缀加 UUID,如 "doc_550e8400-e29b-41d4-a716-446655440000",避免使用纯数字序列(缺乏自描述性),也避免使用原始文档标题(可能包含非法字符且容易重复)。

documents(可选但推荐)。 原始文本内容。如果你不传 embeddings,Chroma 会使用集合配置的嵌入函数自动将 documents 转换为向量。即使你提供了预计算的 embeddings,也建议同时存储 documents,这样查询时可以直接返回原始文本,省去回查原始数据源的开销。

embeddings(可选)。 预计算的嵌入向量,浮点数列表。维度必须与集合的嵌入函数输出维度一致。当你使用自己的嵌入模型(或需要批量预计算优化)时传入此字段。

metadatas(可选)。 附加的键值对字典,用于过滤和组织数据。支持的值类型包括:字符串、整数、浮点数、布尔值,以及 v1.5.x 新增的数组类型。元数据有两类用途:一是查询时的过滤条件(如 where={"category": "finance"}),二是业务逻辑中的展示信息(如来源、作者、创建时间)。

uris(可选)。 数据资源的 URI 路径,主要用于多模态场景(如图像文件路径)。配合 DataLoader 使用,Chroma 会在嵌入时加载 URI 指向的数据。

4.2 写入 API 全解析

add():新增数据

collection.add(
    ids=["doc1", "doc2", "doc3"],
    documents=[
        "Chroma 支持多种嵌入模型",
        "向量检索比全文搜索更理解语义",
        "HNSW 算法构建分层导航图"
    ],
    metadatas=[
        {"source": "docs", "category": "database", "priority": 1},
        {"source": "blog", "category": "search", "priority": 2},
        {"source": "paper", "category": "algorithm", "priority": 3}
    ]
)

如果传入的 ID 已存在,add() 会抛出 UniqueConstraintError。如果你不确定 ID 是否已存在,使用 upsert()

upsert():更新或插入

collection.upsert(
    ids=["doc1", "doc_new"],
    documents=[
        "Chroma 支持超过 20 种嵌入模型",  # 更新已有 doc1
        "这是一条全新的文档"                  # 插入新文档
    ]
)

upsert 是 “update or insert” 的缩写:如果 ID 已存在,则更新该记录的所有字段(document、embedding、metadata);如果 ID 不存在,则插入新记录。这是最安全的写入方式,特别适合增量更新和幂等操作。

update():更新已有数据

# 只更新元数据,不改变文档内容和嵌入
collection.update(
    ids=["doc1"],
    metadatas=[{"source": "updated_docs", "category": "database", "priority": 5}]
)

# 更新文档内容 - 会自动重新生成嵌入向量
collection.update(
    ids=["doc1"],
    documents=["Chroma 支持超过 25 种嵌入模型,包括最新的多模态模型"]
)

update() 只能更新已存在的记录。你可以部分更新——只传 metadatas 不传 documents,或者只传 documents 不传 metadatas。如果更新了 documents,Chroma 会自动重新计算嵌入向量。

批量写入与性能最佳实践

批量写入是提升性能的关键。单次写入一条记录和批量写入 200 条记录的吞吐量差距可以达到 10 倍以上,因为 HNSW 索引构建可以批量化处理。

最优批量大小:50-250 条/批。 这是 Chroma 官方推荐的单节点最优批量范围。太小(<50)会导致频繁的小事务,太大(>500)会导致内存峰值和单次操作超时。在客户端-服务器模式下,可以适当增大到 500-1000。

import chromadb

client = chromadb.PersistentClient(path="./chroma_db")
collection = client.get_or_create_collection("large_dataset")

# 假设 documents 和 embeddings 是预先准备好的大列表
all_documents = [...]   # 10 万条文档
all_embeddings = [...]  # 对应的嵌入向量
all_ids = [f"doc_{i}" for i in range(len(all_documents))]

# 分批写入
batch_size = 200
for i in range(0, len(all_documents), batch_size):
    end = min(i + batch_size, len(all_documents))
    collection.add(
        ids=all_ids[i:end],
        documents=all_documents[i:end],
        embeddings=all_embeddings[i:end]
    )
    if (i // batch_size) % 50 == 0:
        print(f"已写入 {end}/{len(all_documents)} 条")

4.3 嵌入函数(Embedding Function)

嵌入函数是 Chroma 的核心组件之一,负责将文本(或其他模态的数据)转换为嵌入向量。Chroma 内置了超过 20 种嵌入函数适配器,覆盖主流的嵌入模型提供商。

默认嵌入函数:Sentence Transformers (all-MiniLM-L6-v2)

当你在创建集合时不指定嵌入函数,Chroma 自动使用 all-MiniLM-L6-v2。这是一个 22M 参数的轻量模型,输出 384 维向量,在多种语言和任务上表现良好。它通过 ONNX Runtime 在本地运行,无需 API Key,首次使用时自动下载模型文件。

# 等价于不传 embedding_function
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

ef = SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2",
    device="cpu"  # 可选 "cuda" 使用 GPU 加速
)

collection = client.create_collection(
    name="my_docs",
    embedding_function=ef
)

OpenAI 嵌入

from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction

ef = OpenAIEmbeddingFunction(
    api_key="sk-...",
    model_name="text-embedding-3-small"  # 1536 维,性价比高
    # model_name="text-embedding-3-large"  # 3072 维,精度最高
)

collection = client.create_collection(
    name="openai_docs",
    embedding_function=ef
)

OpenAI 的嵌入模型需要通过 API 调用,有网络延迟和成本(text-embedding-3-small 约 $0.02/百万 token),但质量优秀且维度灵活(支持 dimensions 参数降维)。

Azure OpenAI 嵌入

from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction

ef = OpenAIEmbeddingFunction(
    api_key="your-azure-key",
    api_base="https://your-resource.openai.azure.com/",
    api_type="azure",
    api_version="2024-02-01",
    model_name="text-embedding-3-small"
)

其他内置嵌入函数

Chroma 还支持 Cohere(多语言优秀)、HuggingFace(通过 Inference API 或本地模型)、Google Gemini(支持长上下文)、Jina(支持多任务嵌入)、Mistral、Ollama(本地大模型嵌入)、VoyageAI(专为 RAG 优化的嵌入)等。使用方式类似,都是创建对应的嵌入函数实例并传入集合。

# Cohere
from chromadb.utils.embedding_functions import CohereEmbeddingFunction
ef = CohereEmbeddingFunction(api_key="...", model_name="embed-multilingual-v3.0")

# Google Gemini
from chromadb.utils.embedding_functions import GoogleGenerativeAiEmbeddingFunction
ef = GoogleGenerativeAiEmbeddingFunction(api_key="...", model_name="models/embedding-001")

# Ollama(本地运行)
from chromadb.utils.embedding_functions import OllamaEmbeddingFunction
ef = OllamaEmbeddingFunction(url="http://localhost:11434/api/embeddings", model_name="nomic-embed-text")

自定义嵌入函数开发

如果内置的嵌入函数都不满足需求(比如你使用自研模型或私有部署的嵌入服务),可以开发自定义嵌入函数。需要实现以下接口:

from chromadb import EmbeddingFunction
from chromadb.embeddings import Embeddings

class MyCustomEmbeddingFunction(EmbeddingFunction):
    def __init__(self, model_url: str, **kwargs):
        self.model_url = model_url
    
    def __call__(self, input: list[str]) -> Embeddings:
        """将文本列表转换为嵌入向量列表"""
        # 调用你的嵌入服务
        embeddings = call_my_embedding_service(self.model_url, input)
        return embeddings
    
    def name(self) -> str:
        return "my_custom_ef"
    
    def get_config(self) -> dict:
        return {"model_url": self.model_url}
    
    @staticmethod
    def build_from_config(config: dict) -> "MyCustomEmbeddingFunction":
        return MyCustomEmbeddingFunction(**config)

嵌入维度与集合的匹配规则

一个集合只能对应一个嵌入维度。当你第一次向集合写入数据时,嵌入维度就被确定下来了(无论是通过嵌入函数自动计算还是通过 embeddings 参数手动指定)。后续写入的数据必须具有相同的维度,否则会抛出维度不匹配错误。

如果你需要切换嵌入模型(维度不同),必须创建新集合并迁移数据。如果新旧模型维度相同但语义空间不同(如从 text-embedding-ada-002 切换到 text-embedding-3-small,都是 1536 维),也建议创建新集合——因为不同模型的向量空间不兼容,混用会导致检索质量严重下降。


第 5 章 向量查询与检索

5.1 基础语义检索

query() 方法是 Chroma 最核心的 API,它执行语义相似度搜索:

results = collection.query(
    query_texts=["什么是向量数据库"],  # 查询文本(可传多个)
    n_results=5,                        # 返回的最相似结果数量
    include=["documents", "metadatas", "distances"],  # 返回的字段
    # where={"category": "database"},   # 可选:元数据过滤
    # where_document={"$contains": "Chroma"}  # 可选:文档内容过滤
)

query_texts。 可以传入单个字符串或字符串列表。传入列表时,每个查询文本独立执行一次搜索,结果按查询分组返回。

n_results。 返回的最相似结果数量,默认 10。注意这个值不能超过集合中的记录总数,否则会抛出错误。

include。 控制返回结果中包含哪些字段。可选值:"documents"(原始文本)、"metadatas"(元数据)、"embeddings"(嵌入向量)、"distances"(距离分数)。默认包含 documentsmetadatasdistances。如果你不需要原始文本(比如在管道中只需要 ID 和分数),可以省略 documents 来减少数据传输量。

距离分数解读。 距离的含义取决于集合配置的距离度量空间。对于 cosine 度量,距离 = 1 - 余弦相似度,范围 [0, 2],0 表示完全相同;对于 l2 度量,距离是欧氏距离的平方(注意不是开方后的值),值越大差异越大;对于 ip 度量,距离是负内积,值越小越相似。在所有度量下,距离越小越相似

5.2 元数据过滤(Metadata Filtering)

元数据过滤允许你在向量搜索的同时施加结构化条件,类似于 SQL 的 WHERE 子句。这是实现精确控制检索范围的关键手段。

基础比较运算符

# 等于(默认运算符,可省略 $eq)
collection.query(
    query_texts=["..."],
    where={"category": "database"}           # 简写
    # where={"category": {"$eq": "database"}} # 完整写法
)

# 不等于
where={"status": {"$ne": "archived"}}

# 大于 / 大于等于
where={"priority": {"$gt": 5}}
where={"created_at": {"$gte": 1700000000}}

# 小于 / 小于等于
where={"price": {"$lt": 100.0}}
where={"score": {"$lte": 0.5}}

集合运算符

# 包含于列表
where={"category": {"$in": ["database", "search", "ml"]}}

# 不包含于列表
where={"status": {"$nin": ["deleted", "archived"]}}

逻辑运算符

# AND 组合
where={
    "$and": [
        {"category": "database"},
        {"priority": {"$gte": 3}},
        {"status": {"$ne": "archived"}}
    ]
}

# OR 组合
where={
    "$or": [
        {"source": "official_docs"},
        {"source": "blog", "priority": {"$gte": 5}}
    ]
}

# NOT
where={
    "$not": {"status": "draft"}
}

# 复杂嵌套
where={
    "$and": [
        {"$or": [
            {"category": "database"},
            {"category": "search"}
        ]},
        {"priority": {"$gte": 3}},
        {"$not": {"status": "deleted"}}
    ]
}

Metadata Arrays 数组过滤(v1.5.x 新特性)

当元数据字段存储的是数组时,可以使用 $contains$not_contains 运算符:

# 写入带数组元数据的数据
collection.add(
    ids=["doc1", "doc2"],
    documents=["Python 向量数据库教程", "Java Web 开发指南"],
    metadatas=[
        {"tags": ["python", "ai", "vector-db"], "level": "intermediate"},
        {"tags": ["java", "web", "spring"], "level": "beginner"}
    ]
)

# 查询包含特定标签的文档
results = collection.query(
    query_texts=["数据库教程"],
    where={"tags": {"$contains": "python"}},
    n_results=5
)

# 查询不包含某标签的文档
results = collection.get(
    where={"tags": {"$not_contains": "java"}}
)

数组过滤使用与标量值相同的倒排索引结构,因此性能开销与标量过滤相当。但需要注意数组必须类型统一(不能混用字符串和数字)且不能为空。

文档内容过滤

除了元数据过滤外,Chroma 还支持对文档文本内容进行过滤:

# 文档包含特定子串
collection.query(
    query_texts=["..."],
    where_document={"$contains": "Chroma"}
)

# 文档不包含特定子串
where_document={"$not_contains": "deprecated"}

# 正则表达式匹配
where_document={"$regex": "v\\d+\\.\\d+"}     # 匹配版本号
where_document={"$not_regex": "^\\[DRAFT\\]"}  # 排除草稿

# 组合文档过滤和元数据过滤
results = collection.query(
    query_texts=["向量检索"],
    where={"category": "tutorial"},
    where_document={"$contains": "HNSW"},
    n_results=5
)

5.3 其他查询方式

get():按 ID 精确获取

# 按 ID 获取
results = collection.get(ids=["doc1", "doc2", "doc3"])

# 按元数据过滤获取(不做向量搜索)
results = collection.get(
    where={"source": "official_docs"},
    limit=100,
    offset=0,  # 分页
    include=["documents", "metadatas"]
)

get() 不做向量搜索,只按 ID 或元数据条件检索,速度非常快。适合在知道文档 ID 后回查详情,或按元数据条件列出文档。

peek():快速预览

# 预览集合中的前 N 条数据
results = collection.peek(limit=5)

等价于 get(limit=5),用于快速查看集合中的数据样本。

count():统计总量

total = collection.count()
print(f"集合中共有 {total} 条记录")

delete():按条件删除

# 按 ID 删除
collection.delete(ids=["doc1", "doc2"])

# 按元数据条件删除
collection.delete(where={"status": "archived"})

# 按文档内容条件删除
collection.delete(where_document={"$contains": "DEPRECATED"})

删除操作是永久的。在生产环境中建议使用软删除模式:在元数据中添加 "deleted": true 标记,查询时过滤掉已删除记录,定期执行物理删除释放空间。


第三阶段:进阶功能(Day 6-10)


第 6 章 混合搜索(Hybrid Search)

6.1 稀疏向量搜索原理

稠密向量 vs 稀疏向量

前几章中我们使用的都是稠密向量(Dense Vector)——一个每个维度都有非零值的浮点数数组。稠密向量擅长捕获语义相似性,比如"汽车"和"轿车"在稠密向量空间中距离很近。但稠密向量有一个弱点:对精确的关键词匹配不够敏感。比如查询"Chroma 1.5.9 的 WAL3 特性",稠密检索可能会返回讨论 WAL(写入日志)的文档,但可能错过精确提到"WAL3"这个术语的文档。

稀疏向量(Sparse Vector)则相反——它是一个极高维度的向量(通常维度等于词汇表大小,数万到数十万),但绝大多数维度为零,只有少数维度有非零值。每个非零维度对应一个词汇项(term),值表示该项的重要性。稀疏向量天然擅长精确的关键词匹配。

BM25 算法

BM25(Best Matching 25)是信息检索领域最经典的排序算法之一,由 Robertson 和 Walker 于 1994 年提出。它是 TF-IDF 的改进版本,引入了两个关键优化:

词频饱和(Term Frequency Saturation)。 一个词在文档中出现的次数越多,相关性分数越高,但增长会逐渐饱和。这通过参数 k₁ 控制:

TF_component = (f(q, D) × (k₁ + 1)) / (f(q, D) + k₁)

其中 f(q, D) 是词 q 在文档 D 中的出现次数。当 k₁ = 0 时,词频完全不考虑(只看是否出现);k₁ 越大,高频词的权重增长越快。通常设置 k₁ = 1.2。

文档长度归一化。 较长的文档天然有更高的词频,BM25 通过参数 b 对此进行补偿:

normalized_TF = (f(q, D) × (k₁ + 1)) / (f(q, D) + k₁ × (1 - b + b × |D|/avgdl))

其中 |D| 是文档长度,avgdl 是平均文档长度。b = 0 表示不做长度归一化,b = 1 表示完全归一化。通常设置 b = 0.75。

最终的 BM25 分数是查询中所有词的 TF 分数乘以 IDF(逆文档频率)之和。IDF 衡量一个词的稀有程度——越稀有的词对匹配的贡献越大。

SPLADE 模型

SPLADE(SParse Lexical AnD Expansion)是一种学习式稀疏表示模型,它使用预训练的 BERT 的 MLM(Masked Language Model)头部来为每个词分配权重。与 BM25 不同,SPLADE 还能为文档中没有出现的词分配权重(文档扩展),从而在保持精确匹配优势的同时具备一定的语义理解能力。

SPLADE 的权重计算公式为:

w_j = max_i log(1 + ReLU(W_ij))

其中 W_ij 是第 i 个 token 对第 j 个词汇项的预测分数。max 操作确保每个词汇项取最高的 token 级权重。

混合检索的优势场景

混合检索将稠密向量(语义匹配)和稀疏向量(关键词匹配)的结果融合,在以下场景中效果显著:专有名词和技术术语的检索(如产品名、API 名称、版本号);需要同时理解意图和匹配关键词的问答系统;法律、医疗等领域文档检索(精确术语 + 语义理解同等重要)。

6.2 Chroma 稀疏向量实战

BM25 嵌入函数配置

Chroma 内置了 BM25 嵌入函数,使用 Snowball 词干提取器进行文本归一化:

from chromadb.utils.embedding_functions import ChromaBM25

bm25_ef = ChromaBM25(
    k=1.2,              # 词频饱和参数
    b=0.75,             # 长度归一化参数
    avg_doc_length=256,  # 平均文档长度
    token_max_length=40  # 最大 token 长度
)

这个嵌入函数完全在本地运行,不需要 API Key,支持 Python、TypeScript 和 Rust 客户端。

稀疏 + 稠密向量联合检索(Hybrid Search with RRF)

在 Chroma Cloud 的新 Search API 中,混合搜索通过 RRF(Reciprocal Rank Fusion)实现:

from chromadb import Search, K, Knn, Rrf

# 定义稠密向量搜索(语义匹配)
dense_rank = Knn(
    query="Chroma WAL3 写入日志架构",
    return_rank=True,  # RRF 要求必须设为 True
    limit=200          # 先取较多候选再融合
)

# 定义稀疏向量搜索(关键词匹配)
sparse_rank = Knn(
    query="Chroma WAL3 写入日志架构",
    key="sparse_embedding",  # 引用稀疏索引
    return_rank=True,
    limit=200
)

# RRF 融合
hybrid_rank = Rrf(
    ranks=[dense_rank, sparse_rank],
    weights=[0.7, 0.3],  # 70% 语义,30% 关键词
    k=60,                # 平滑参数
    normalize=True        # 权重归一化
)

# 执行搜索
results = collection.search(
    Search()
        .rank(hybrid_rank)
        .limit(10)
        .where(K("status") == "active")
)

RRF 融合算法详解

RRF(Reciprocal Rank Fusion)的公式为:

RRF_score(d) = -Σ(wᵢ / (k + rᵢ(d)))

其中 wᵢ 是第 i 个排名信号的权重,k 是平滑参数(默认 60),rᵢ(d) 是文档 d 在第 i 个排名中的位置(从 1 开始)。

RRF 的精妙之处在于它只使用排名位置而非原始分数来融合。这解决了不同检索系统分数尺度不一致的问题——BM25 的分数可能是 0-30,而稠密向量的余弦距离是 0-2,直接加权没有意义。但通过转化为排名位置,两者可以在同一尺度下融合。

举例说明:假设文档 A 在稠密检索中排第 1,在稀疏检索中排第 5,权重各为 0.7 和 0.3,k=60:

RRF(A) = -(0.7/(60+1) + 0.3/(60+5))
       = -(0.7/61 + 0.3/65)
       = -(0.01148 + 0.00462)
       = -0.01610

文档 B 在稠密检索中排第 3,在稀疏检索中排第 1:

RRF(B) = -(0.7/(60+3) + 0.3/(60+1))
       = -(0.7/63 + 0.3/61)
       = -(0.01111 + 0.00492)
       = -0.01603

A 的 RRF 分数绝对值更大(-0.01610 vs -0.01603),所以 A 排名更靠前。这反映了 A 在权重更高的稠密检索中排名第 1 的优势。

权重调优建议

权重选择取决于你的应用场景。对于通用问答系统,建议 0.6-0.7 给稠密向量(语义理解更重要);对于技术文档检索(专有名词多),可以 0.5/0.5 或偏向稀疏(关键词精确匹配更重要);对于搜索型应用(用户输入的是查询词而非自然语言问题),可以 0.3/0.7(偏向关键词匹配)。


第 7 章 多模态向量检索

7.1 多模态嵌入基础

统一向量空间概念

多模态嵌入的核心思想是将不同类型的数据(文本、图像、音频)映射到同一个向量空间中,使得跨模态的语义相似度计算成为可能。比如一张"海滩日落"的照片和一段描述"海边日落美景"的文字,在统一向量空间中的距离应该很近。

CLIP / OpenCLIP 模型原理

CLIP(Contrastive Language-Image Pre-training)是 OpenAI 于 2021 年提出的多模态模型,它同时训练了一个图像编码器和一个文本编码器,使得配对的图像-文本对在向量空间中距离相近,不配对的距离较远。训练使用 InfoNCE 对比损失函数:

Loss = -log(exp(sim(image, text) / τ) / Σ exp(sim(image_i, text_j) / τ))

其中 τ 是温度参数。训练完成后,图像编码器和文本编码器可以将各自的输入映射到同一个向量空间中。

Chroma 使用 OpenCLIP(开源复现版本)作为多模态嵌入函数,默认使用 ViT-B-32 架构和 laion2b_s34b_b79k 预训练权重。

7.2 Chroma 多模态实战

注意:多模态嵌入功能目前仅支持 Python 客户端,需要额外安装 open-clip-torchtorchpillow 依赖。

OpenCLIPEmbeddingFunction 配置

from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction

multimodal_ef = OpenCLIPEmbeddingFunction(
    model_name="ViT-B-32",          # 模型架构
    checkpoint="laion2b_s34b_b79k",  # 预训练权重
    device="cpu"                      # "cpu" 或 "cuda"
)

可选的模型架构还包括 ViT-L-14(更大、更准确、更慢)。device="cuda" 在有 GPU 的环境中可以显著加速嵌入计算。

图像数据入库

from chromadb.utils.data_loaders import ImageLoader

loader = ImageLoader()

# 创建多模态集合
collection = client.create_collection(
    name="image_gallery",
    embedding_function=multimodal_ef
)

# 通过 URI 添加图像
collection.add(
    ids=["sunset_001", "city_001", "forest_001"],
    uris=[
        "./images/sunset.jpg",
        "./images/city_skyline.jpg",
        "./images/forest_path.jpg"
    ],
    metadatas=[
        {"description": "海边日落", "location": "三亚", "type": "nature"},
        {"description": "城市夜景", "location": "上海", "type": "urban"},
        {"description": "森林小径", "location": "张家界", "type": "nature"}
    ],
    loader=loader
)

ImageLoader 负责从 URI 加载图像数据供嵌入函数处理。注意 Chroma 不存储原始图像数据——它只存储嵌入向量和元数据。URI 指向的图像文件需要在查询时仍然可访问(或你通过元数据自行管理)。

跨模态检索

# 文本查图像 - "以文搜图"
results = collection.query(
    query_texts=["海边的夕阳美景"],
    n_results=3,
    loader=loader
)
# 返回最相似的图像(通过 URI 访问)

# 图像查图像 - "以图搜图"
results = collection.query(
    query_uris=["./images/query_sunset.jpg"],
    n_results=3,
    loader=loader
)

文本 + 图像混合集合

你可以在同一个集合中混合存储文本和图像数据,因为它们共享同一个向量空间:

# 添加文本描述
collection.add(
    ids=["text_desc_001"],
    documents=["三亚的海滩是观赏日落的绝佳地点,金色的阳光洒在海面上"],
    metadatas=[{"type": "text_description", "location": "三亚"}]
)

# 现在可以用文本查询同时匹配文本和图像
results = collection.query(
    query_texts=["三亚日落"],
    n_results=5,
    loader=loader
)
# 结果可能同时包含图像记录和文本记录

第 8 章 高级查询与聚合

8.1 GroupBy 聚合查询(1.5.x 新特性)

GroupBy 是 Chroma v1.5.x 引入的强大聚合功能,它将搜索结果按元数据字段分组,并在每个组内应用聚合策略。这对于搜索结果多样化、去重和分类感知的排序非常有用。

基本用法

from chromadb import Search, K, Knn, GroupBy, MinK

# 按 category 分组,每组保留分数最小的 3 个结果
search = (
    Search()
    .rank(Knn(query="机器学习算法教程"))
    .group_by(
        GroupBy(
            keys=K("category"),         # 按 category 字段分组
            aggregate=MinK(
                keys=K.SCORE,           # 按分数排序
                k=3                     # 每组保留 3 个
            )
        )
    )
    .limit(30)                           # 总共返回 30 个结果
)

results = collection.search(search)

使用场景

搜索结果多样化。 假设你搜索技术文档,不希望前 10 个结果全部来自同一个来源。通过按 source 字段 GroupBy,可以确保每个来源都有代表出现在结果中。

去重。 同一个内容可能有多个版本(草稿、审核版、发布版),按 content_id 分组并取 MinK(score, k=1) 可以实现每个内容只返回最优版本。

分类感知排序。 电商场景中按 product_category 分组,每个类别返回 Top-5 商品,让用户一次搜索看到多个类别的结果。

8.2 读取一致性控制

Chroma 提供两种读取一致性级别(Read Level),允许你在数据新鲜度和查询延迟之间做权衡。

full(强一致性读取)。 查询时同时读取已索引的数据和未提交的日志条目(WAL),确保返回的结果包含所有已确认的写入操作。这是默认模式,保证数据的完整性,但查询延迟略高(需要合并两个数据源)。

index_only(索引优先低延迟)。 只查询已完成索引的数据,跳过尚未被 Compactor 处理的日志条目。查询延迟更低,但可能看不到最近写入的数据(直到 Compactor 完成索引构建)。

# 在 Chroma Cloud 或分布式模式中使用
results = collection.search(
    Search()
    .rank(Knn(query="..."))
    .read_level("index_only")  # 低延迟模式
    .limit(10)
)

选择建议: 对于面向用户的实时搜索(如电商商品搜索),如果最近写入的数据有几秒的可见延迟是可接受的,使用 index_only 获得更低延迟。对于需要保证数据完整性的场景(如审计查询、数据验证),使用 full

8.3 索引状态监控

当大量数据写入后,HNSW 索引的构建是异步进行的。Chroma 提供了索引进度查询能力:

# 查询索引构建进度
progress = collection.index_progress()
print(f"已索引: {progress.indexed_count}")
print(f"待索引: {progress.pending_count}")
print(f"进度: {progress.percentage}%")

这个 API 在生产环境中非常有用:你可以在大批量导入后监控索引进度,在索引完成前给用户展示"正在更新索引"的提示,或在索引完成后触发下游任务。

v1.5.9 增强了分片集合的重建支持,并在压缩游标中去规范化了租户和数据库信息,使得多租户环境下的索引追踪更加精确。


第 9 章 数据处理最佳实践

9.1 文档分块策略

在 RAG 系统中,原始文档通常需要被切分成较小的块(Chunk)再入库。分块策略直接影响检索质量——块太大,向量表示不够精确,检索精度下降;块太小,上下文信息丢失,生成质量降低。

分块大小对检索效果的影响

经验法则:256-1024 token 是大多数场景的最优分块大小。对于技术文档,512 token(约 300-400 个英文单词或 500-800 个中文字符)是一个好的起点。对于对话式数据,可以按对话轮次自然切分。对于代码文档,按函数或类为单位切分。

主流分块策略

固定大小分块。 按固定字符数或 token 数切分,实现简单但可能在句子中间截断。适合结构均匀的文本。

递归字符分割(推荐默认策略)。 按层级分隔符(\n\n\n. )递归切分,尽量在自然边界处断开。这是 LangChain 的 RecursiveCharacterTextSplitter 使用的策略,适合大多数场景。

from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,      # 每块最大字符数
    chunk_overlap=50,    # 相邻块重叠字符数
    separators=["\n\n", "\n", "。", ",", " ", ""]
)
chunks = splitter.split_text(full_text)

语义分块。 使用嵌入模型计算相邻句子的语义相似度,在相似度骤降处切分。效果最好但计算成本最高。

父子分块。 将文档组织为层级结构:小块(子块)用于检索,大块(父块)用于提供给 LLM 作为上下文。这样既能精确检索,又能提供充足的上下文。

分块重叠率设置

建议重叠率设置为分块大小的 10-20%。512 token 的块设置 50-100 token 的重叠。重叠确保了关键信息不会因为切分而丢失,也保证了相邻块之间的语义连续性。

9.2 元数据设计方法论

元数据分类

元数据字段按用途可分为两类:

过滤型元数据: 用于查询时的精确过滤,如 categorystatustenant_idcreated_at。这类字段应选择低基数(可选值有限)的类型,因为高基数(如 UUID、URL)的过滤效率较低。

展示型元数据: 用于结果展示和业务逻辑,如 titleauthorurlsummary。这类字段不参与过滤,但查询时需要返回。

高基数 vs 低基数字段

低基数字段(如 category 只有 5-10 个值)非常适合过滤——Chroma 的倒排索引在这种场景下效率极高。高基数字段(如 user_id 有百万个不同值)虽然也可以过滤,但索引效率会下降。如果必须用高基数字段过滤,考虑将其作为 ID 的一部分而非元数据。

避免过度元数据的性能陷阱

每条记录的元数据字段越多,写入时构建倒排索引的开销越大,存储空间也越大。建议:只为需要过滤的字段建立元数据,纯展示信息可以放在 documents 中或从外部系统查询;避免在元数据中存储大段文本(使用字符串类型,不超过几 KB);定期评估哪些元数据字段实际被用在过滤条件中,移除不使用的字段。

9.3 数据更新与删除

增量更新策略

对于持续更新的文档集合(如知识库、帮助中心),推荐使用以下增量更新模式:

  1. 为每个文档块生成确定性 ID(如基于文档路径和分块位置的哈希值)。
  2. 使用 upsert() 写入——新内容自动插入,变更内容自动更新,删除的内容通过对比新旧 ID 集合发现。
  3. 维护一个版本元数据字段,便于追踪数据时效性。
import hashlib

def generate_chunk_id(doc_path: str, chunk_index: int) -> str:
    """生成确定性的分块 ID"""
    raw = f"{doc_path}::chunk_{chunk_index}"
    return hashlib.sha256(raw.encode()).hexdigest()[:16]

# 增量同步
old_ids = set(collection.get()["ids"])
new_ids = set()

for doc_path, chunks in new_documents.items():
    for i, chunk in enumerate(chunks):
        chunk_id = generate_chunk_id(doc_path, i)
        new_ids.add(chunk_id)
        collection.upsert(
            ids=[chunk_id],
            documents=[chunk],
            metadatas=[{"source": doc_path, "chunk_index": i, "version": "2.0"}]
        )

# 删除不再存在的文档块
deleted_ids = list(old_ids - new_ids)
if deleted_ids:
    collection.delete(ids=deleted_ids)

软删除 vs 硬删除

软删除 是在元数据中标记删除状态(如 "deleted": true),查询时通过 where={"deleted": {"$ne": true}} 过滤。优点是可以恢复,不影响索引完整性;缺点是占用存储空间,索引中仍包含已删除向量。

硬删除 是调用 collection.delete(ids=[...]) 物理移除记录。优点是释放存储空间;缺点是不可恢复,频繁删除可能导致索引碎片。

推荐方案: 日常操作使用软删除(安全优先),定期(如每周一次)执行硬删除 + vacuum 清理空间。


第四阶段:生产部署与优化(Day 11-15)


第 10 章 Chroma 服务端部署

10.1 Docker 单机部署

Docker 是部署 Chroma 服务最便捷的方式。以下是一个生产级的 Docker Compose 配置:

version: '3.8'
services:
  chroma:
    image: chromadb/chroma:1.5.9
    container_name: chroma-prod
    ports:
      - "8000:8000"
    volumes:
      - chroma-data:/data
      - ./config.yaml:/config.yaml
    environment:
      - IS_PERSISTENT=TRUE
      - CHROMA_OPEN_TELEMETRY__ENDPOINT=http://otel-collector:4317
      - CHROMA_OPEN_TELEMETRY__SERVICE_NAME=chroma-prod
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v2/heartbeat"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 30s
    deploy:
      resources:
        limits:
          memory: 8G
          cpus: '4'
        reservations:
          memory: 4G
          cpus: '2'

volumes:
  chroma-data:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /mnt/ssd/chroma-data  # 使用 SSD 存储

关键配置要点:数据卷应使用 SSD 存储(HNSW 索引对磁盘 I/O 敏感);设置合理的内存限制(HNSW 索引必须完全加载到内存中);配置健康检查确保服务可用;通过 OpenTelemetry 集成实现分布式追踪。

也可以通过命令行直接启动(适合开发测试):

chroma run --path /data/chroma_db

10.2 认证与安全

API Key 认证配置

Chroma v1.0.0 的 Rust 重写移除了内置的 Python 认证中间件。在生产环境中,认证通常通过以下方式实现:

方式一:反向代理层认证。 在 Nginx、Traefik 或云 API Gateway 层配置认证,验证 API Key 后转发到 Chroma。

server {
    listen 443 ssl;
    server_name chroma.yourdomain.com;
    
    ssl_certificate /etc/ssl/certs/chroma.pem;
    ssl_certificate_key /etc/ssl/private/chroma.key;
    
    location / {
        # 验证 API Key
        if ($http_x_chroma_token != "your-secret-token") {
            return 401 '{"error": "Unauthorized"}';
        }
        
        proxy_pass http://chroma:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}

方式二:RBAC 认证(Legacy 模式)。 对于仍使用旧版认证系统的部署:

# auth_config.yaml
roles:
  admin:
    actions:
      - "system:reset"
      - "collection:*"
  reader:
    actions:
      - "collection:query"
      - "collection:get"
  writer:
    actions:
      - "collection:add"
      - "collection:upsert"
      - "collection:query"
      - "collection:get"

users:
  - id: app-server
    role: writer
    tokens: ["app-writer-token-xyz"]
  - id: search-service
    role: reader
    tokens: ["search-reader-token-abc"]

HTTPS 加密传输

永远不要将 Chroma 端口直接暴露在公网上。务必通过反向代理终止 TLS,或使用 Chroma Cloud 的内置 HTTPS 支持。在 Kubernetes 环境中,使用 cert-manager 自动管理 TLS 证书。

网络隔离

在 Docker Compose 中使用自定义网络隔离:

networks:
  internal:
    driver: bridge
    internal: true  # 不可访问外部网络
  external:
    driver: bridge

services:
  chroma:
    networks: [internal]  # 只能被同网络的服务访问
  app:
    networks: [internal, external]  # 可以访问内外网络

10.3 Kubernetes 生产部署

对于需要高可用和弹性伸缩的环境,Kubernetes 是标准选择。

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: chroma
  namespace: ai-platform
spec:
  serviceName: chroma
  replicas: 1  # Chroma 单节点模式
  selector:
    matchLabels:
      app: chroma
  template:
    metadata:
      labels:
        app: chroma
    spec:
      containers:
        - name: chroma
          image: chromadb/chroma:1.5.9
          ports:
            - containerPort: 8000
          volumeMounts:
            - name: data
              mountPath: /data
          resources:
            requests:
              memory: "4Gi"
              cpu: "2"
            limits:
              memory: "8Gi"
              cpu: "4"
          livenessProbe:
            httpGet:
              path: /api/v2/heartbeat
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /api/v2/heartbeat
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 10
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        storageClassName: ssd  # 使用 SSD 存储类
        resources:
          requests:
            storage: 100Gi
---
apiVersion: v1
kind: Service
metadata:
  name: chroma
  namespace: ai-platform
spec:
  selector:
    app: chroma
  ports:
    - port: 8000
      targetPort: 8000
  type: ClusterIP  # 集群内访问

注意使用 StatefulSet 而非 Deployment,因为 Chroma 的持久化数据需要有状态存储。对于读写分离架构,可以部署多个查询副本(通过 HttpClient 连接),但写入必须指向主节点。


第 11 章 分布式架构(Chroma Distributed)

11.1 分布式架构五大组件

当数据量超过单机上限或需要高可用时,Chroma 的分布式架构开始发挥作用。它由五个核心组件协作运行:

                      ┌──────────┐
                      │ Gateway  │  ← API 入口、认证、路由
                      └────┬─────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────┴─────┐ ┌───┴───┐ ┌─────┴─────┐
        │  Log (WAL) │ │ SysDB │ │   Query   │
        │  写入日志   │ │ 目录库 │ │  查询节点  │
        └─────┬─────┘ └───────┘ └─────┬─────┘
              │                        │
        ┌─────┴─────┐                  │
        │ Compactor │                  │
        │  压缩节点  │                  │
        └─────┬─────┘                  │
              │                        │
        ┌─────┴────────────────────────┴─────┐
        │    Object Storage (S3 / GCS)       │
        │    对象存储(持久化)                 │
        └────────────────────────────────────┘

Gateway(网关层)。 用 Rust 实现的高性能 API 网关,负责认证授权、请求路由和限流。它使用 Rendezvous Hashing(会合点哈希)将请求路由到正确的分片。Gateway 是无状态的,可以水平扩展——前面放一个负载均衡器即可。

Distributed Log(分布式写入日志)。 所有写操作(add、update、delete)首先被追加到分布式日志中,确保持久性。基于 Spanner 的日志服务实现,支持 MCMR(Multi-Collection Multi-Region)的日志垃圾回收。写入操作在日志落盘后立即返回确认,索引构建异步进行。

Query Nodes(查询执行节点)。 处理所有读操作(query、get、search)。维护本地 SSD 缓存加速热数据访问。查询时同时读取已索引数据和未压缩的日志条目(在 full 读级别下),确保数据一致性。查询并行化到 vCPU 数量。

Compactor Nodes(压缩节点)。 异步地将日志条目构建成不可变的 HNSW 索引段。这个过程不阻塞读写操作——新的索引版本构建完成后原子性地替换旧版本。v1.5.9 引入了 Foundation CLI 来管理压缩操作。

SysDB(系统数据库)。 存储所有集合、租户、数据库的目录信息。追踪每个集合的压缩游标。v1.5.9 对租户和数据库信息进行了去规范化处理,提升了查询性能。

11.2 读写路径详解

写入路径

  1. 客户端发送写操作请求到 Gateway
  2. Gateway 认证请求,通过 Rendezvous Hashing 确定目标分片
  3. 写操作被追加到 Distributed Log(WAL)
  4. 立即向客户端返回确认——此时数据已持久化,但尚未被索引
  5. Compactor 异步读取日志条目,构建新的 HNSW 索引段
  6. 压缩完成的索引段写入对象存储(S3/GCS)
  7. 旧索引段被标记为可回收

这意味着写入操作具有非常低的延迟(只需写日志),但新写入的数据在 Compactor 完成索引前可能不会出现在查询结果中(取决于 Read Level 设置)。

读取路径

  1. 客户端发送查询请求到 Gateway
  2. Gateway 认证并路由到正确的查询节点
  3. 查询节点从对象存储读取已索引数据(SSD 缓存加速)
  4. 如果 Read Level 为 full,还从未压缩的日志条目中读取最新数据
  5. 合并两个数据源的结果,返回给客户端

11.3 水平扩展方案

读副本扩展

增加 Query Node 的数量可以提升查询吞吐量。每个 Query Node 是无状态的(索引数据来自对象存储),可以独立扩缩容。在 Gateway 前面放一个负载均衡器(如 ALB/Nginx),查询请求在多个节点间轮询分发。

分片策略

Chroma 的分布式模式支持通过集合级别的分片实现工作负载隔离。每个集合可以被路由到不同的存储分片。对于超大规模场景,可以在应用层实现哈希分片:

import hashlib

def get_shard_collection(client, doc_id: str, num_shards: int = 4):
    """根据文档 ID 的哈希值确定分片"""
    shard = int(hashlib.md5(doc_id.encode()).hexdigest(), 16) % num_shards
    return client.get_or_create_collection(f"docs_shard_{shard}")

查询时并行搜索所有分片并合并结果。

Chroma 原生分片限制

Chroma 原生的分片能力主要基于集合级别,单个集合内部的分片支持仍在发展中。对于需要大规模单集合分片的场景,建议在应用层实现分片逻辑(如上所示),或评估 Milvus/Qdrant 等原生支持集合内分片的系统。


第 12 章 性能优化实战

12.1 写入性能优化

批量大小最优值

Chroma 官方推荐的单节点最优批量大小为 50-250 条/批。这个范围的依据是:太小(<50)会导致每次写入都触发 HNSW 图更新,图的频繁重建带来大量开销;太大(>500)会导致单次操作的内存峰值过高,甚至 OOM。在客户端-服务器模式下,由于网络开销占比较大,可以适当增大到 500-1000 来减少网络往返次数。

# 写入性能对比测试
import time

# 不推荐:逐条写入
for doc_id, doc_text in zip(ids, documents):
    collection.add(ids=[doc_id], documents=[doc_text])
# 1 万条约需 300+ 秒

# 推荐:批量写入
batch_size = 200
for i in range(0, len(ids), batch_size):
    collection.add(
        ids=ids[i:i+batch_size],
        documents=documents[i:i+batch_size]
    )
# 1 万条约需 15-30 秒(10-20 倍提升)

并发写入控制

在持久化模式下,Chroma 使用 SQLite 的 WAL 模式实现并发读写。SQLite 支持多个并发读者但只允许一个写入者。如果你需要高并发写入,建议:使用客户端-服务器模式(Chroma 服务端有写入队列和锁管理);或在应用层使用写入队列串行化写操作;或在分布式模式中使用分布式日志(不存在 SQLite 锁竞争问题)。

大批量导入最佳实践

当需要一次性导入百万级数据时,推荐流程:

  1. 预计算所有嵌入向量(使用 GPU 加速或批处理 API)。
  2. 创建集合时设置较高的 construction_ef(如 256),因为这是一次性成本。
  3. 使用 200 条/批的批量大小写入。
  4. 导入完成后,将 search_ef 调整到适合查询的值。
  5. 执行 vacuum 操作优化存储。
# 大批量导入模板
collection = client.create_collection(
    name="million_docs",
    metadata={
        "hnsw:space": "cosine",
        "hnsw:construction_ef": 256,  # 高质量索引
        "hnsw:M": 24                  # 更多连接
    }
)

batch_size = 200
start = time.time()
for i in range(0, total_docs, batch_size):
    collection.add(
        ids=all_ids[i:i+batch_size],
        embeddings=all_embeddings[i:i+batch_size],
        documents=all_documents[i:i+batch_size],
        metadatas=all_metadatas[i:i+batch_size]
    )
    if (i // batch_size) % 100 == 0:
        elapsed = time.time() - start
        throughput = (i + batch_size) / elapsed
        print(f"进度: {i+batch_size}/{total_docs} "
              f"({(i+batch_size)/total_docs*100:.1f}%) "
              f"吞吐: {throughput:.0f} docs/s")

12.2 查询性能优化

ef_search 参数调优

hnsw:search_ef 是最常调优的查询参数,直接控制查询时的搜索范围:

# 动态调整 search_ef
collection = client.get_collection("my_docs")
collection.modify(metadata={"hnsw:search_ef": 128})  # 增大搜索宽度

调优方法:准备一组标注好的测试查询(至少 100 条),测量不同 search_ef 值下的召回率(Recall@10)和平均延迟。通常 ef_search = 32-64 就能达到 95%+ 的召回率,ef_search = 100-200 可以达到 99%+。

索引预热

Chroma 在首次访问集合时需要将 HNSW 索引加载到内存中。对于大型集合(百万级以上),首次加载可能需要几秒到几十秒。在生产环境中,建议在服务启动时主动触发预热:

# 服务启动时预热索引
_ = collection.peek(limit=1)  # 触发索引加载
_ = collection.count()         # 确认集合可用

批量查询

当需要同时执行多个查询时,使用批量查询 API 可以获得 3-10 倍的速度提升:

from chromadb import Search, Knn

searches = [
    Search().rank(Knn(query=q)).limit(10)
    for q in query_list  # 20 个查询
]
results = collection.search_batch(searches)
# results[i] 对应 searches[i] 的结果

12.3 内存优化

HNSW 索引内存估算

HNSW 索引必须完全加载到内存中才能工作。内存使用量的估算公式为:

每条向量的内存 ≈ M × 2 × sizeof(int) + dim × sizeof(float) + overhead
             = M × 8 + dim × 4 + 64  (字节)

以 1536 维向量、M=16 为例:

每条向量 = 16 × 8 + 1536 × 4 + 64 = 128 + 6144 + 64 = 6336 字节 ≈ 6.2 KB
100 万条 = 6.2 GB
1000 万条 = 62 GB

Chroma 的经验公式是:每 GB 内存可存储约 0.245 百万条 1536 维向量。也就是说,64 GB 内存可以承载约 1500 万条向量。

大集合内存溢出解决方案

当集合大小超过可用内存时,有几个策略:一是增大机器内存(最直接);二是使用维度更低的嵌入模型(如从 1536 维降到 384 维,内存需求降低 4 倍);三是将数据分片到多个节点;四是冷数据降级——将不常查询的历史数据移到独立的低配节点。

12.4 性能基准测试

建议在部署前执行基准测试,验证系统是否满足 SLA 要求:

import time
import numpy as np

# 构建测试数据
test_queries = ["query text 1", "query text 2", ...]  # 100+ 条
ground_truth = [...]  # 每条查询的真实最近邻 ID

# 测量延迟
latencies = []
for q in test_queries:
    start = time.perf_counter()
    results = collection.query(query_texts=[q], n_results=10)
    latency = time.perf_counter() - start
    latencies.append(latency)

print(f"平均延迟: {np.mean(latencies)*1000:.1f}ms")
print(f"P50 延迟: {np.percentile(latencies, 50)*1000:.1f}ms")
print(f"P95 延迟: {np.percentile(latencies, 95)*1000:.1f}ms")
print(f"P99 延迟: {np.percentile(latencies, 99)*1000:.1f}ms")

# 测量召回率
recalls = []
for q, truth in zip(test_queries, ground_truth):
    results = collection.query(query_texts=[q], n_results=10)
    retrieved_ids = set(results["ids"][0])
    recall = len(retrieved_ids & set(truth)) / len(truth)
    recalls.append(recall)

print(f"平均 Recall@10: {np.mean(recalls):.4f}")

Chroma 单节点的参考性能指标:在 64 GB 内存、1500 万条 1536 维向量的条件下,平均查询延迟约 5ms,P99 延迟低于 200ms。


第 13 章 监控与运维

13.1 可观测性体系

Chroma 内置了 OpenTelemetry 集成,可以将指标和追踪数据发送到 Jaeger、Zipkin 或任何兼容的 OTLP 收集器。

Prometheus 指标接入

配置 Chroma 将指标暴露给 Prometheus:

# docker-compose.yml 环境变量
environment:
  - CHROMA_OPEN_TELEMETRY__ENDPOINT=http://otel-collector:4317
  - CHROMA_OPEN_TELEMETRY__SERVICE_NAME=chroma-production

关键监控指标

在生产环境中需要关注的核心指标包括:

  • QPS(Queries Per Second):查询吞吐量,反映系统负载
  • 查询延迟 P50/P95/P99:响应时间的分位值,P99 应低于 200ms
  • 写入吞吐量:每秒写入的文档数
  • 内存使用率:HNSW 索引占用内存 / 总可用内存,应保持低于 80%
  • 磁盘使用率:数据目录占用空间,应保持低于 80%
  • 索引构建进度:pending_count 应持续下降
  • SQLite 锁等待时间:(持久化模式)高值表示写入竞争

告警规则配置

# Prometheus alerting rules
groups:
  - name: chroma_alerts
    rules:
      - alert: HighQueryLatency
        expr: histogram_quantile(0.99, chroma_query_duration_seconds) > 0.5
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Chroma P99 查询延迟超过 500ms"
      
      - alert: HighMemoryUsage
        expr: chroma_memory_usage_bytes / chroma_memory_limit_bytes > 0.85
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Chroma 内存使用率超过 85%"
      
      - alert: IndexBuildStalled
        expr: chroma_pending_index_count > 0 and rate(chroma_indexed_count[5m]) == 0
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "索引构建停滞,有待处理的数据但未进展"

13.2 常见故障排查

SQLite 锁竞争问题

现象: 持久化模式下,并发写入时出现 database is locked 错误,写入延迟飙升。

原因: SQLite 虽然支持 WAL 模式的并发读取,但写操作仍然需要获取独占锁。多个并发写入者会竞争锁。

解决: 迁移到客户端-服务器模式(Chroma 服务端有写入队列管理);或在应用层串行化写操作(使用写入队列或锁);或升级到分布式模式(使用分布式日志替代 SQLite)。

索引损坏与修复

现象: 查询结果异常(缺失结果或返回不相关结果),日志中出现索引相关错误。

原因: 进程异常终止(如 OOM Kill、断电)导致 HNSW 索引文件不完整。

修复: 尝试重建索引——删除损坏的索引文件(在集合 UUID 目录下),重启 Chroma 服务,它会自动从 SQLite 元数据和 WAL 重建。如果重建失败,可能需要从备份恢复。

查询超时定位

排查步骤: 首先检查 search_ef 是否设置过高(>200 会显著增加延迟);然后检查集合大小是否超过内存限制(导致部分索引在磁盘上);最后检查是否有大量 pending 索引(查询需要合并日志数据,增加延迟)。

数据一致性校验

# 比较集合记录数和预期数量
actual = collection.count()
expected = len(expected_ids)
if actual != expected:
    print(f"数据不一致: 预期 {expected},实际 {actual}")

# 抽样验证
sample_ids = random.sample(expected_ids, min(100, len(expected_ids)))
results = collection.get(ids=sample_ids)
if len(results["ids"]) != len(sample_ids):
    missing = set(sample_ids) - set(results["ids"])
    print(f"丢失了 {len(missing)} 条记录: {list(missing)[:10]}")

13.3 数据备份与恢复

全量备份

对于持久化模式,备份整个数据目录即可:

# 备份(建议在低负载时段执行)
tar -czf chroma-backup-$(date +%Y%m%d).tar.gz /data/chroma_db/

# 恢复到新机器
tar -xzf chroma-backup-20260614.tar.gz -C /data/

对于 Docker 部署:

docker exec chroma-server tar -czf /data/backup.tar.gz /data/
docker cp chroma-server:/data/backup.tar.gz ./backup.tar.gz

增量备份

SQLite 支持在线备份(.backup 命令),可以在不影响服务的情况下创建一致性快照。建议结合 WAL 日志实现时间点恢复(PITR)。

灾难恢复流程

  1. 检测到故障(主节点不可用或数据损坏)
  2. 从最近的全量备份恢复数据到备用节点
  3. 应用增量备份和 WAL 日志追赶到最新状态
  4. 切换流量到备用节点
  5. 调查故障原因并修复

第五阶段:高级应用与实战(Day 16-25)


第 14 章 RAG 系统集成

14.1 LangChain 集成

LangChain 是最流行的 LLM 应用开发框架,Chroma 是其官方支持的向量存储后端之一。

安装与基础用法

pip install langchain-chroma langchain-openai langchain-text-splitters
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

# 1. 加载文档
loader = PyPDFLoader("technical_manual.pdf")
docs = loader.load()

# 2. 分块
splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50
)
chunks = splitter.split_documents(docs)

# 3. 创建 Chroma 向量存储并导入数据
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="tech_manual",
    persist_directory="./chroma_langchain_db"
)

# 4. 相似度搜索
results = vectorstore.similarity_search(
    query="如何配置 HNSW 索引参数",
    k=5
)

# 5. 带相关性分数的搜索
results_with_scores = vectorstore.similarity_search_with_relevance_scores(
    query="如何配置 HNSW 索引参数",
    k=5
)
for doc, score in results_with_scores:
    print(f"[{score:.4f}] {doc.page_content[:100]}")

Retriever 检索器配置

# 将 Chroma 封装为 LangChain Retriever
retriever = vectorstore.as_retriever(
    search_type="similarity",       # 或 "mmr"(最大边际相关性)
    search_kwargs={
        "k": 5,
        "filter": {"source": "technical_manual.pdf"}  # 元数据过滤
    }
)

# 在 RAG 链中使用
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("""
基于以下参考资料回答问题。如果资料中没有相关信息,请说明。

参考资料:
{context}

问题:{question}

回答:
""")

llm = ChatOpenAI(model="gpt-4o", temperature=0)

rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

answer = rag_chain.invoke("Chroma 的批量写入最优大小是多少?")

MMR 多查询检索

MMR(Maximal Marginal Relevance)在检索时兼顾相关性和多样性,避免返回过于相似的结果:

retriever_mmr = vectorstore.as_retriever(
    search_type="mmr",
    search_kwargs={
        "k": 5,
        "fetch_k": 20,          # 先检索 20 个候选
        "lambda_mult": 0.5      # 0=最大多样性,1=最大相关性
    }
)

14.2 LlamaIndex 集成

pip install llama-index-vector-stores-chroma llama-index
from llama_index.core import VectorStoreIndex, StorageContext, SimpleDirectoryReader
from llama_index.vector_stores.chroma import ChromaVectorStore
import chromadb

# 初始化 Chroma
chroma_client = chromadb.PersistentClient(path="./chroma_llamaindex")
chroma_collection = chroma_client.get_or_create_collection("llama_docs")

# 创建向量存储
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
storage_context = StorageContext.from_defaults(vector_store=vector_store)

# 从文件加载并构建索引
documents = SimpleDirectoryReader("./my_docs").load_data()
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context
)

# 查询
query_engine = index.as_query_engine(similarity_top_k=5)
response = query_engine.query("Chroma 支持哪些嵌入模型?")
print(response)

14.3 RAG 进阶技巧

父子文档分块检索

核心思想:用小块检索(精确匹配),用大块生成(充足上下文)。

from langchain_text_splitters import RecursiveCharacterTextSplitter

# 父块(大)和子块(小)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)

parent_docs = parent_splitter.split_documents(raw_docs)
child_docs = []
parent_id_map = {}

for parent in parent_docs:
    parent_id = hashlib.md5(parent.page_content.encode()).hexdigest()[:12]
    children = child_splitter.split_documents([parent])
    for child in children:
        child.metadata["parent_id"] = parent_id
        child_docs.append(child)
    parent_id_map[parent_id] = parent

# 子块入 Chroma
vectorstore.add_documents(child_docs)

# 检索时:找到子块 → 回溯父块 → 用父块作为 LLM 上下文

检索效果评估

核心指标包括 Recall@K(前 K 个结果中包含正确答案的比例)、MRR(Mean Reciprocal Rank,正确答案的排名倒数的平均值)和 NDCG(考虑排名位置的归一化折扣累积增益)。建议构建一个评估数据集(至少 100 条 query-relevant_doc 对),在每次调整参数后运行评估。


第 15 章 多语言客户端开发

15.1 TypeScript / JavaScript 客户端

安装与初始化

npm install chromadb
# 或
yarn add chromadb
import { ChromaClient, OpenAIEmbeddingFunction } from "chromadb";

// 内存模式(仅开发测试用)
const client = new ChromaClient();

// HTTP 客户端(连接 Chroma 服务)
const httpClient = new ChromaClient({
    path: "http://localhost:8000",
});

// Chroma Cloud 客户端
const cloudClient = new ChromaClient({
    path: "https://api.trychroma.com",
    tenant: "your-tenant",
    database: "your-database",
    token: "your-api-key",
});

完整操作示例

import { ChromaClient, OpenAIEmbeddingFunction } from "chromadb";

async function main() {
    const client = new ChromaClient({ path: "http://localhost:8000" });
    
    // 创建嵌入函数
    const embedder = new OpenAIEmbeddingFunction({
        openai_api_key: process.env.OPENAI_API_KEY!,
        openai_model: "text-embedding-3-small",
    });
    
    // 创建集合
    const collection = await client.getOrCreateCollection({
        name: "ts_documents",
        embeddingFunction: embedder,
        metadata: { "hnsw:space": "cosine" },
    });
    
    // 添加数据
    await collection.add({
        ids: ["doc1", "doc2", "doc3"],
        documents: [
            "TypeScript provides type safety for JavaScript",
            "Chroma supports multiple programming languages",
            "Vector databases enable semantic search",
        ],
        metadatas: [
            { category: "programming", lang: "typescript" },
            { category: "database", lang: "multi" },
            { category: "database", lang: "multi" },
        ],
    });
    
    // 语义查询
    const results = await collection.query({
        queryTexts: ["typesafe programming languages"],
        nResults: 3,
        where: { category: "programming" },
    });
    
    console.log("IDs:", results.ids);
    console.log("Documents:", results.documents);
    console.log("Distances:", results.distances);
    
    // 按 ID 获取
    const records = await collection.get({
        ids: ["doc1"],
        include: ["documents", "metadatas", "embeddings"],
    });
    
    // 统计
    const count = await collection.count();
    console.log(`Total records: ${count}`);
    
    // 更新
    await collection.update({
        ids: ["doc1"],
        documents: ["TypeScript is a typed superset of JavaScript"],
        metadatas: [{ category: "programming", lang: "typescript", updated: true }],
    });
    
    // 删除
    await collection.delete({ ids: ["doc3"] });
}

main().catch(console.error);

浏览器端使用

TypeScript 客户端可以通过 HTTP 连接在浏览器中直接使用。但要注意 CORS 配置——需要在 Chroma 服务端配置允许的来源:

# config.yaml
cors:
  allow_origins: ["https://your-frontend.com"]
  allow_methods: ["*"]
  allow_headers: ["*"]

安全警告:永远不要在前端代码中硬编码 Chroma Cloud API Key 或 OpenAI API Key。应该通过后端代理服务转发请求。

15.2 Rust / Go 客户端

Rust 客户端提供最低延迟的访问能力,特别适合对性能要求极高的后端服务。v1.5.9 新增了自动只读后端故障转移功能:

// Rust 客户端示例(概念性,API 可能随版本变化)
use chromadb::client::Client;

let client = Client::builder()
    .with_url("http://localhost:8000")
    .with_read_only_failover(true)  // v1.5.9: 自动故障转移到只读副本
    .build();

let collection = client.get_or_create_collection("docs").await?;

// 查询
let results = collection.query(
    vec!["What is vector search?"],
    5,
    None,  // where filter
    None,  // where_document filter
).await?;

Go 客户端也在活跃开发中,适合 Go 技术栈的团队。使用场景主要是微服务架构中需要直接访问 Chroma 的 Go 服务。


第 16 章 数据同步与自动化

16.1 Chroma Sync 生态

Chroma Cloud 提供了自动数据同步功能,可以从外部数据源自动解析、分块、嵌入并索引数据。

GitHub 仓库同步

将 GitHub 仓库的文档自动同步到 Chroma:

# 创建 GitHub 同步源
curl -X POST https://sync.trychroma.com/api/v1/sources \
  -H "x-chroma-token: $CHROMA_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "database_name": "my-db",
    "github": {
      "repo": "chroma-core/chroma",
      "branch": "main",
      "collection_name": "chroma-docs",
      "path_filter": "docs/**/*.md"
    }
  }'

同步后,仓库中的 Markdown 文件会被自动分块、嵌入并索引。支持两种模式:Direct Sync(使用 Chroma Cloud GitHub App)和 Platform Sync(自定义 GitHub App)。

网站爬虫同步(Web Sync)

自动爬取公开 URL,提取 Markdown 内容并索引:

curl -X POST https://sync.trychroma.com/api/v1/sources \
  -H "x-chroma-token: $CHROMA_API_KEY" \
  -d '{
    "database_name": "my-db",
    "web": {
      "url": "https://docs.trychroma.com",
      "collection_name": "chroma-docs-web",
      "max_depth": 3
    }
  }'

S3 数据同步

从 S3 存储桶自动同步文件:

curl -X POST https://sync.trychroma.com/api/v1/sources \
  -H "x-chroma-token: $CHROMA_API_KEY" \
  -d '{
    "database_name": "my-db",
    "s3": {
      "bucket_name": "my-docs-bucket",
      "region": "us-east-1",
      "collection_name": "s3-docs",
      "aws_credential_id": 42,
      "path_prefix": "documents/"
    }
  }'

配置 S3 Event Notifications 后,新上传的文件会自动触发索引。支持 .meta.json 伴生文件控制元数据。

16.2 自定义数据管道

对于自建 Chroma 部署(非 Cloud),需要自行构建数据管道。以下是一个通用的文档处理流水线模板:

import chromadb
from pathlib import Path
from langchain_text_splitters import RecursiveCharacterTextSplitter

class DocumentPipeline:
    """文档处理流水线:解析 → 分块 → 向量化 → 入库"""
    
    def __init__(self, chroma_path: str, collection_name: str):
        self.client = chromadb.PersistentClient(path=chroma_path)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=50
        )
    
    def process_file(self, file_path: str):
        """处理单个文件"""
        text = Path(file_path).read_text(encoding="utf-8")
        chunks = self.splitter.split_text(text)
        
        ids = []
        documents = []
        metadatas = []
        
        for i, chunk in enumerate(chunks):
            chunk_id = f"{Path(file_path).stem}_chunk_{i}"
            ids.append(chunk_id)
            documents.append(chunk)
            metadatas.append({
                "source": str(file_path),
                "chunk_index": i,
                "total_chunks": len(chunks)
            })
        
        # 批量 upsert
        self.collection.upsert(
            ids=ids,
            documents=documents,
            metadatas=metadatas
        )
        return len(chunks)
    
    def process_directory(self, dir_path: str, pattern: str = "**/*.md"):
        """批量处理目录中的所有文件"""
        total = 0
        for path in Path(dir_path).glob(pattern):
            chunks = self.process_file(str(path))
            total += chunks
            print(f"已处理: {path} ({chunks} 个分块)")
        return total

# 使用
pipeline = DocumentPipeline("./chroma_db", "docs_collection")
pipeline.process_directory("./my_docs/")

定时增量同步

结合文件修改时间戳实现增量同步:

import os
import json
from datetime import datetime

SYNC_STATE_FILE = "sync_state.json"

def load_sync_state():
    if os.path.exists(SYNC_STATE_FILE):
        return json.load(open(SYNC_STATE_FILE))
    return {}

def save_sync_state(state):
    json.dump(state, open(SYNC_STATE_FILE, "w"))

def incremental_sync(dir_path: str):
    state = load_sync_state()
    changed_files = []
    
    for path in Path(dir_path).glob("**/*.md"):
        mtime = os.path.getmtime(path)
        key = str(path)
        if key not in state or state[key] < mtime:
            changed_files.append(str(path))
            state[key] = mtime
    
    if changed_files:
        pipeline = DocumentPipeline("./chroma_db", "docs_collection")
        for f in changed_files:
            pipeline.process_file(f)
        save_sync_state(state)
        print(f"同步了 {len(changed_files)} 个变更文件")
    else:
        print("没有需要同步的变更")

第 17 章 企业级特性

17.1 多租户架构设计

在 SaaS 应用中,需要为不同租户(客户组织)提供隔离的数据空间。Chroma 支持三种多租户模式。

模式一:集合级隔离(强隔离)

def get_tenant_collection(client, tenant_id: str):
    return client.get_or_create_collection(
        name=f"tenant_{tenant_id}_docs",
        metadata={"tenant_id": tenant_id}
    )

每个租户拥有独立的集合。优点是数据完全隔离,可以独立管理(删除租户只需删除其集合),查询性能不受其他租户影响。缺点是集合数量多时管理开销大,资源碎片化。适合租户数量较少(<100)且对隔离要求高的场景。

模式二:共享集合 + 元数据过滤(逻辑隔离)

shared = client.get_or_create_collection("all_tenants")

# 写入时标记 tenant_id
shared.add(
    ids=[f"t1_doc1", f"t2_doc1"],
    documents=["Tenant 1 doc", "Tenant 2 doc"],
    metadatas=[
        {"tenant_id": "t1"},
        {"tenant_id": "t2"}
    ]
)

# 查询时强制过滤 tenant_id
results = shared.query(
    query_texts=["search query"],
    where={"tenant_id": "t1"},  # 必须在中间件层强制注入
    n_results=5
)

资源利用率高,但需要中间件确保 tenant_id 过滤不被绕过。绝对不要信任客户端传入的过滤条件——必须在服务端中间件层强制注入。

class TenantMiddleware:
    def __init__(self, client, tenant_id):
        self.collection = client.get_or_create_collection("shared_data")
        self.tenant_id = tenant_id
    
    def _inject_filter(self, extra_where=None):
        tenant_filter = {"tenant_id": self.tenant_id}
        if extra_where:
            return {"$and": [tenant_filter, extra_where]}
        return tenant_filter
    
    def query(self, query_texts, where=None, **kwargs):
        return self.collection.query(
            query_texts=query_texts,
            where=self._inject_filter(where),
            **kwargs
        )
    
    def add(self, ids, documents, metadatas=None, **kwargs):
        metadatas = metadatas or [{} for _ in ids]
        for m in metadatas:
            m["tenant_id"] = self.tenant_id  # 强制注入
        return self.collection.add(ids=ids, documents=documents, metadatas=metadatas, **kwargs)

模式三:Chroma 原生多租户

admin_client = chromadb.AdminClient(url="http://localhost:8000")

# 创建租户和数据库
admin_client.create_tenant("company_a")
admin_client.create_database("production", tenant="company_a")

# 客户端限定到特定租户
tenant_client = chromadb.HttpClient(
    host="localhost",
    port=8000,
    tenant="company_a",
    database="production"
)

这是最干净的隔离方式,但需要 Chroma 服务端的租户管理功能支持。

17.2 合规与安全

客户管理加密密钥(CMEK)

对于高安全要求的环境,数据应在存储时加密。Chroma Cloud 支持 CMEK(Customer-Managed Encryption Keys),客户通过自己的 KMS(如 AWS KMS 或 GCP KMS)管理加密密钥,Chroma 无法访问明文密钥。

数据驻留

Chroma Cloud 支持指定部署区域(如 aws-us-east-1、EU 区域)。对于 GDPR 合规要求,确保数据存储在欧盟区域内的服务器上。自建部署则通过选择服务器物理位置来实现数据驻留。

审计日志

在生产环境中应记录所有数据访问操作:

import logging

audit_logger = logging.getLogger("chroma_audit")

class AuditedCollection:
    def __init__(self, collection, user_id):
        self.collection = collection
        self.user_id = user_id
    
    def query(self, *args, **kwargs):
        audit_logger.info(f"User {self.user_id} queried collection {self.collection.name}")
        return self.collection.query(*args, **kwargs)
    
    def add(self, *args, **kwargs):
        audit_logger.info(f"User {self.user_id} added data to {self.collection.name}")
        return self.collection.add(*args, **kwargs)
    
    def delete(self, *args, **kwargs):
        audit_logger.info(f"User {self.user_id} deleted from {self.collection.name}")
        return self.collection.delete(*args, **kwargs)

第六阶段:精通与架构设计(Day 26-30)


第 18 章 Chroma 底层原理深度解析

18.1 存储引擎内幕

Chroma 使用三层存储架构,理解这些层次有助于排查性能问题和优化部署。

SQLite 元数据存储

SQLite 存储了所有的元数据信息:集合目录(名称、UUID、配置)、每条记录的 ID 和元数据字段、倒排索引(用于元数据过滤)。SQLite 使用 WAL(Write-Ahead Logging)模式,允许多个并发读者与一个写入者同时工作。元数据过滤的倒排索引是 Chroma 自动构建和维护的,对每个字符串/整数/浮点数元数据字段建立独立的倒排索引,使得 where 条件可以在 O(log N) 时间内定位到匹配的文档 ID 集合。

SQLite 的局限性在于写入串行化:所有写操作都需要获取独占锁。在高并发写入场景下,这会成为瓶颈。解决方案包括使用客户端-服务器模式(服务端有写入队列)或分布式模式(使用分布式日志替代 SQLite)。

Parquet 列式向量存储

Chroma 使用 Apache Parquet 格式存储嵌入向量和文档文本。Parquet 是列式存储格式,对于向量化数据特别高效——所有向量的同一维度存储在连续的磁盘空间中,可以利用 SIMD 指令进行批量距离计算。Parquet 还内置了压缩(通常使用 Snappy 或 Zstd),显著减少磁盘占用。

HNSWlib 索引文件

HNSW 索引以 HNSWlib 的原生二进制格式存储在磁盘上。文件包含图的邻接表(每个节点的邻居列表)和原始向量数据。当集合被首次访问时,整个索引文件被加载到内存中。对于大型集合,这个加载过程可能需要数秒——这就是为什么生产环境中需要索引预热。

持久化目录下的文件布局如下:

./chroma_data/
  ├── chroma.sqlite3              # 元数据、集合目录、倒排索引
  ├── {collection_uuid}/
  │   ├── *.bin                   # HNSW 索引文件(图 + 向量)
  │   ├── header.bin              # 索引元信息(维度、距离度量、参数)
  │   └── *.parquet               # 文档文本和嵌入的列式存储
  └── wal/                        # Write-Ahead Log 段

WAL3 日志文件结构

WAL3 是 Chroma 分布式模式的第三代写入日志架构。与单机模式的 SQLite WAL 不同,WAL3 基于对象存储(S3/GCS)实现,支持跨多个集合和区域的日志管理。v1.5.9 引入的 MCMR(Multi-Collection Multi-Region)支持使垃圾回收器能够高效地处理多个集合的日志清理。

WAL 的核心设计原则是"写入即确认":一旦写操作被追加到 WAL 并持久化,就立即向客户端返回成功。索引构建在后台异步进行,Compactor 定期将 WAL 中的条目压缩为不可变的索引段。

18.2 HNSW 算法深度

分层导航小世界图原理

HNSW(Hierarchical Navigable Small World)算法由 Malkov 和 Yashunin 于 2016 年提出,其核心思想是构建一个多层的近邻图。可以把它类比为"跳表(Skip List)的图版本":

Layer 2(最稀疏,长程连接):
    A ──────────> D ──────────> G

Layer 1(中等密度):
    A ────> B ────> D ────> F ────> G

Layer 0(最稠密,包含所有节点):
    A → B → C → D → E → F → G → H

每一层都是一个"可导航的小世界图"——任意两个节点之间可以通过少量跳数(O(log N))到达。高层节点稀疏,跳过的距离大(类似高速公路);底层节点稠密,覆盖精细的局部近邻关系(类似城市街道)。

每个节点被随机分配到若干层。层数 l 的分配公式为:

l = floor(-ln(uniform(0,1)) × m_L)

其中 m_L = 1/ln(M) 是归一化因子,M 是最大连接数。大多数节点只在 Layer 0,少数节点到达 Layer 1,极少数到达更高层——形成指数递减的层级结构。

构建过程

当插入一个新节点 q 时:

  1. 随机确定 q 的最高层 l
  2. 从入口节点(最高层的某个节点)开始,在每一层贪心搜索最近的 f 个节点(f = ef_construction)
  3. 在 q 存在的每一层,将 q 与搜索到的最近节点建立双向连接
  4. 如果连接数超过 M,使用多样性启发式(Diversity Heuristic) 裁剪:优先保留那些彼此距离较远的邻居,避免所有邻居都聚集在一个方向上

多样性启发式是 HNSW 相比朴素近邻图的关键优化——它确保每个节点的邻居分布在不同方向上,从而在搜索时能有效覆盖整个空间。

查询过程

查询分为两个阶段:

阶段一:贪心下降(Greedy Descent)。 从最高层的入口节点开始,在每一层贪心地移动到最近的邻居,直到找不到更近的节点。然后下降到下一层,重复这个过程。这个阶段的目的是快速定位到查询向量附近的大致区域。

阶段二:底层精确搜索。 到达 Layer 0 后,使用 ef_search 个候选进行更精细的 beam search。维护一个大小为 ef_search 的优先队列,不断扩展最近的未探索节点,直到队列为空或达到终止条件。

参数对性能的数学影响

参数 增大效果 减小效果
M 索引质量提升,召回率增加,内存占用线性增长,构建速度变慢 索引变小,内存节省,构建更快,但召回率下降
ef_construction 索引图质量更优(更好的邻居选择),构建时间 O(N·log N·ef) 增长 构建更快,但图质量下降,影响查询精度上限
ef_search 召回率提升(更接近精确搜索),查询延迟线性增长 查询更快,但召回率下降

插入复杂度: O(log N × ef_construction)
查询复杂度: O(log N × ef_search)
内存复杂度: O(N × M × log N)(M 个连接 × log N 层)
构建复杂度: O(N × log N × ef_construction)

理解这些复杂度有助于在不同场景下做出正确的参数选择。例如,对于只读场景(数据一次性导入后不再变化),可以大胆提高 ef_construction 和 M,因为构建是一次性成本。对于低延迟场景(如实时搜索),需要在 ef_search 上仔细权衡,找到满足召回率要求的最小值。


第 19 章 大规模系统架构设计

19.1 百万级向量方案

单机优化上限

Chroma 单节点的实际上限约为 1500 万条 1536 维向量(需要 64 GB 内存)。在这个规模下,单机部署仍然是可行的,但需要注意几个优化要点。

首先是内存管理。HNSW 索引必须完全驻留在内存中,1500 万条 1536 维向量大约需要 62 GB 的 RAM(加上系统开销,总共需要 64 GB)。如果同时有其他服务运行在同一台机器上,内存竞争会导致性能严重下降。建议为 Chroma 专用一台机器。

其次是磁盘 I/O。虽然 HNSW 索引在查询时完全在内存中操作,但写入(WAL 刷盘)、索引构建(Compactor 将索引段写入磁盘)和首次加载(将索引从磁盘加载到内存)都涉及大量磁盘 I/O。使用 NVMe SSD 是必须的。

读写分离架构

当查询 QPS 超过单节点承载能力时,可以部署读写分离架构:

                    ┌─────────────┐
                    │  Load       │
                    │  Balancer   │
                    └──────┬──────┘
                           │
            ┌──────────────┼──────────────┐
            │              │              │
     ┌──────┴──────┐ ┌────┴────┐  ┌──────┴──────┐
     │  Writer     │ │ Reader  │  │  Reader     │
     │  (主节点)    │ │ (副本1)  │  │  (副本2)    │
     └──────┬──────┘ └────┬────┘  └──────┬──────┘
            │              │              │
     ┌──────┴──────────────┴──────────────┴──────┐
     │            共享存储 (NFS / 对象存储)         │
     └───────────────────────────────────────────┘

写入请求只发往主节点,读取请求在多个副本间负载均衡。副本通过共享存储或定期数据同步获取最新数据。这种架构适合读多写少的场景(如知识库搜索)。

19.2 千万级向量方案

应用层分片设计

当数据量超过单机上限(如 5000 万条向量)时,需要在应用层实现分片。每个分片是一个独立的 Chroma 实例,负责一部分数据:

import hashlib
import chromadb

class ShardedChroma:
    """应用层分片的 Chroma 管理器"""
    
    def __init__(self, hosts: list[str], collection_name: str):
        self.clients = [
            chromadb.HttpClient(host=host, port=8000)
            for host in hosts
        ]
        self.collection_name = collection_name
        self.num_shards = len(hosts)
    
    def _get_shard(self, doc_id: str) -> int:
        """一致性哈希确定分片"""
        h = int(hashlib.md5(doc_id.encode()).hexdigest(), 16)
        return h % self.num_shards
    
    def add(self, ids, documents, metadatas=None):
        """按分片分组写入"""
        shard_groups = {}
        for i, doc_id in enumerate(ids):
            shard = self._get_shard(doc_id)
            if shard not in shard_groups:
                shard_groups[shard] = {"ids": [], "documents": [], "metadatas": []}
            shard_groups[shard]["ids"].append(doc_id)
            shard_groups[shard]["documents"].append(documents[i])
            if metadatas:
                shard_groups[shard]["metadatas"].append(metadatas[i])
        
        for shard, data in shard_groups.items():
            collection = self.clients[shard].get_or_create_collection(self.collection_name)
            collection.add(**data)
    
    def query(self, query_texts, n_results=10):
        """并行查询所有分片并合并"""
        import concurrent.futures
        
        all_results = []
        
        def query_shard(client):
            collection = client.get_or_create_collection(self.collection_name)
            return collection.query(query_texts=query_texts, n_results=n_results)
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(query_shard, c) for c in self.clients]
            for f in concurrent.futures.as_completed(futures):
                all_results.append(f.result())
        
        return self._merge_results(all_results, n_results)
    
    def _merge_results(self, results_list, n_results):
        """合并多个分片的查询结果"""
        # 收集所有结果并按距离排序
        merged_ids = []
        merged_docs = []
        merged_dists = []
        
        for results in results_list:
            for i in range(len(results["ids"][0])):
                merged_ids.append(results["ids"][0][i])
                merged_docs.append(results["documents"][0][i])
                merged_dists.append(results["distances"][0][i])
        
        # 按距离排序取 Top-N
        combined = list(zip(merged_dists, merged_ids, merged_docs))
        combined.sort(key=lambda x: x[0])
        top_n = combined[:n_results]
        
        return {
            "ids": [[item[1] for item in top_n]],
            "documents": [[item[2] for item in top_n]],
            "distances": [[item[0] for item in top_n]]
        }

分片策略选择

哈希分片(如上例):实现简单,数据分布均匀,但同一个文档的不同分块可能分散在不同分片上。适合 ID 无业务含义的场景。

业务分片:按业务维度(如租户、地域、时间)分片。例如按 tenant_id 分片使每个租户的数据集中在同一个分片上。优点是业务逻辑清晰,查询时只需访问相关分片;缺点是数据可能不均匀。

19.3 亿级以上选型建议

Chroma 的边界与局限

Chroma 的设计初衷是"AI 应用的数据基础设施",在百万到千万级向量规模上表现出色。但在亿级以上规模,以下局限需要考虑:

  • 分片能力有限: Chroma 原生支持的分片主要在集合级别,单集合内部的自动分片尚在发展中。亿级场景需要应用层分片,增加了系统复杂度。
  • 内存效率: HNSW 索引的内存占用在高维大数据量下非常可观。1 亿条 1536 维向量约需要 620 GB 内存——即使使用量化技术也很难压缩到单机可承载的范围。
  • 运维复杂度: 多节点 Chroma 集群的运维(备份、升级、故障恢复)需要自建工具链。

何时考虑迁移

当以下情况出现时,建议评估其他向量数据库:

  • 数据量持续增长,预计 1 年内超过 5000 万条
  • 需要原生分片和自动负载均衡
  • 需要向量级的量化压缩(如标量量化、乘积量化)以降低成本
  • 需要成熟的多租户管理和 RBAC

候选系统对比:

  • Milvus/Zilliz: 专为大规模设计,原生支持分片、副本和量化。社区活跃,但运维复杂度较高。
  • Qdrant: Rust 实现,性能优秀,原生支持分片和量化。API 设计简洁。
  • Pinecone: 全托管服务,零运维,但成本较高且不支持自托管。
  • Weaviate: 支持混合搜索和多模态,但大规模性能不如 Milvus/Qdrant。

平滑迁移方案

从 Chroma 迁移到其他系统的建议步骤:

  1. 导出 Chroma 中的所有数据(ID、文档、嵌入向量、元数据)
  2. 在目标系统中创建对应的集合和索引
  3. 分批导入数据到目标系统
  4. 双写验证:同时写入新旧系统一段时间,对比查询结果
  5. 逐步切换流量到新系统
  6. 确认无误后下线 Chroma 实例
# 数据导出脚本
def export_chroma_collection(collection, output_path: str):
    """导出 Chroma 集合的所有数据"""
    import json
    
    all_data = collection.get(include=["documents", "metadatas", "embeddings"])
    
    records = []
    for i in range(len(all_data["ids"])):
        records.append({
            "id": all_data["ids"][i],
            "document": all_data["documents"][i],
            "metadata": all_data["metadatas"][i],
            "embedding": all_data["embeddings"][i]
        })
    
    with open(output_path, "w") as f:
        json.dump(records, f)
    
    print(f"导出了 {len(records)} 条记录到 {output_path}")

第 20 章 向量检索前沿技术

20.1 量化技术

标量量化 vs 乘积量化

量化技术通过降低向量的存储精度来减少内存占用和加速距离计算,是大规模向量检索中的关键技术。

标量量化(Scalar Quantization, SQ) 将每个维度的浮点数映射为较低精度的表示。最常见的是 INT8 量化:将 float32(4 字节/维)映射为 int8(1 字节/维),内存占用降低 4 倍。量化过程为:

quantized_value = round((value - min) / (max - min) × 255)

INT8 量化通常只损失 1-2% 的召回率,是非常实用的优化手段。更激进的 INT4/Binary 量化可以进一步降低到 0.5 字节/维,但召回率损失更大(5-10%)。

乘积量化(Product Quantization, PQ) 将高维向量分割为多个子空间,对每个子空间独立做聚类量化。例如将 768 维向量分成 96 个 8 维的子空间,每个子空间用 256 个聚类中心(8 bit)表示。最终每个向量只需 96 字节(而非 3072 字节),压缩比达 32 倍。PQ 的缺点是量化过程需要训练(K-Means 聚类),且对高维向量的压缩效果依赖于各子空间之间的独立性。

Chroma 量化支持现状

截至 v1.5.9,Chroma 尚未内置标量量化或乘积量化的支持。HNSW 索引中的向量以 float32 精度存储。Chroma 的替代方案是 SPANN 索引(在 Chroma Cloud 的 Schema 系统中可选),它支持量化选项。开源社区的量化支持在路线图中,预计后续版本会引入。

如果你急需量化来降低内存,可以考虑:使用维度更低的嵌入模型(如从 1536 维降到 384 维);在应用层对嵌入向量做量化后再存入 Chroma(但可能影响检索精度);或迁移到原生支持量化的系统(如 Qdrant、Milvus)。

20.2 检索增强技术

重排序(Reranking)

重排序是提升检索精度的关键后处理步骤。基本流程是:先用向量检索粗排(Recall 阶段,取 Top-100),再用更精确但更慢的重排序模型精排(Precision 阶段,从 100 中选 Top-10)。

# 使用 Cohere Rerank
import cohere

co = cohere.Client("your-api-key")

# 第一阶段:粗排
initial_results = collection.query(
    query_texts=["Chroma 向量数据库的部署方案"],
    n_results=50
)

# 第二阶段:重排序
reranked = co.rerank(
    query="Chroma 向量数据库的部署方案",
    documents=[doc for doc in initial_results["documents"][0]],
    model="rerank-v3.5",
    top_n=10
)

# 使用重排序后的结果
final_docs = [initial_results["documents"][0][r.index] for r in reranked.results]

常用的重排序方案包括:Cohere Rerank API(开箱即用,效果好)、Cross-Encoder 模型(如 bge-reranker,可本地部署)、ColBERT(延迟交互模型,精度极高但开销大)。

假设性文档嵌入(HyDE)

HyDE(Hypothetical Document Embeddings)是一种查询改写技术:先让 LLM 生成一个"假设性答案文档",再用这个文档的嵌入去检索,而非用原始查询的嵌入。

from openai import OpenAI

client = OpenAI()

def hyde_query(original_query: str) -> str:
    """生成假设性文档"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"请回答以下问题(即使不确定也请给出最佳猜测):\n{original_query}"
        }],
        max_tokens=200
    )
    return response.choices[0].message.content

# 使用 HyDE
hypothetical_answer = hyde_query("Chroma 1.5.9 中的 MaxScore 索引有什么作用?")

# 用假设性答案的嵌入去检索(比原始问题的嵌入更接近真实文档)
results = collection.query(
    query_texts=[hypothetical_answer],
    n_results=10
)

HyDE 的原理是:用户的问题通常是短小的疑问句,而数据库中的文档是陈述句形式。两者的嵌入在向量空间中可能相距较远。通过生成一个陈述句形式的假设性答案,其嵌入会更接近真实文档的嵌入,从而提升检索召回率。

查询改写与扩展

查询改写通过扩展或重构用户的原始查询来提升检索效果:

def multi_query_expansion(query: str) -> list[str]:
    """多查询扩展:从不同角度重写查询"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"请从 3 个不同角度重写以下查询,以提升检索效果:\n{query}"
        }]
    )
    # 解析得到 3 个变体查询
    variants = parse_variants(response.choices[0].message.content)
    return variants

# 多路召回
variants = multi_query_expansion("如何优化 Chroma 的查询性能")
all_results = []
for v in variants:
    results = collection.query(query_texts=[v], n_results=10)
    all_results.extend(results["ids"][0])

# 去重 + RRF 融合
unique_ids = list(dict.fromkeys(all_results))  # 保持顺序去重

第 21 章 常见坑与避坑指南

坑 1:并发写入锁竞争问题

症状: 在持久化模式下,多个线程/进程同时写入时出现 database is locked 错误或写入延迟飙升。

根因: SQLite 的 WAL 模式支持并发读取,但写操作仍需获取排他锁。

解决方案:

  • 迁移到客户端-服务器模式(推荐)
  • 在应用层使用写入队列串行化写操作
  • 减少写入频率,增大批量大小

坑 2:嵌入维度不匹配错误

症状: 向集合写入数据时报错 Dimensionality error: expected 384, got 1536

根因: 集合的嵌入函数输出维度与写入的向量维度不一致。常见触发场景:更换了嵌入模型但没有创建新集合;或在 get_collection 时忘记传入嵌入函数。

解决方案:

  • 每个嵌入模型对应一个新集合,不要混用
  • get_collection 时必须传入与创建时相同的嵌入函数
  • 如果需要切换模型,创建新集合并迁移数据

坑 3:持久化路径容器化陷阱

症状: Docker 容器重启后数据丢失。

根因: 没有正确挂载数据卷。Chroma 在容器内的 /data 目录存储数据,如果不挂载到主机,容器销毁时数据也随之丢失。

解决方案:

# 正确:使用 -v 挂载数据卷
docker run -v /host/data:/data -p 8000:8000 chromadb/chroma

# 错误:没有挂载数据卷
docker run -p 8000:8000 chromadb/chroma  # 容器重启后数据丢失!

另一个常见错误是挂载路径不正确——Chroma 的持久化路径必须与容器内的 /data 对应,而非其他自定义路径。

坑 4:大集合首次加载延迟

症状: 应用启动后,第一次查询某个大集合时响应极慢(数秒甚至数十秒),后续查询恢复正常。

根因: Chroma 在首次访问集合时需要将整个 HNSW 索引从磁盘加载到内存中。百万级向量的索引文件可能有数 GB,加载需要时间。

解决方案:

  • 在应用启动时主动预热索引(执行一次 peek()count()
  • 配置健康检查时包含一次简单查询,确保索引已加载
  • 对于关键集合,考虑使用内存预加载脚本

坑 5:元数据过滤性能退化

症状: 添加了复杂的元数据过滤条件后,查询延迟显著增加。

根因: 元数据过滤需要先通过倒排索引找到满足条件的文档 ID 集合,然后再在这个子集上做向量搜索。如果过滤后的子集仍然很大(如 >100 万),或者过滤条件非常复杂(多层嵌套的 $and/$or),过滤本身的开销就会变得显著。

解决方案:

  • 优先使用低基数字段做过滤(categoryuser_id 快)
  • 避免过度嵌套的逻辑运算
  • 考虑将高基数的过滤条件移到应用层后处理
  • 使用 $in 代替多个 $or 条件

坑 6:忘记处理查询结果的二维结构

症状: 代码中出现 IndexErrorTypeError,因为对结果结构理解有误。

根因: Chroma 的 query() 返回的结果是二维列表——results["ids"][[id1, id2, ...]] 而非 [id1, id2, ...]。外层列表对应多个查询文本,内层列表对应每个查询的多个结果。

# 正确访问方式
first_query_results = results["ids"][0]      # 第一个查询的结果列表
first_result_id = results["ids"][0][0]       # 第一个查询的第一个结果 ID

# 错误访问方式
first_result_id = results["ids"][0]          # 这拿到的是整个列表!

坑 7:集合名称不合法

症状: 创建集合时报 InvalidCollectionName 错误。

根因: 名称太短(<3 字符)、包含非法字符(空格、特殊符号)、以连字符/点/下划线开头或结尾。

解决方案: 使用字母和数字开头的名称,长度 3-512 字符,只用字母、数字、连字符、下划线和点(且不以这些特殊字符开头/结尾)。


附录 A:Chroma 版本迁移指南

从 0.x 迁移到 1.x

v1.0.0 是一个破坏性版本,主要变更包括:

  1. 核心引擎 Rust 重写: 性能大幅提升,但 API 有细微差异。
  2. 配置方式变更: 从环境变量迁移到 YAML 配置文件。旧的 CHROMA_* 环境变量不再支持。
  3. 认证系统移除: 内置的 Python 认证中间件被移除。需要通过反向代理或 API 网关实现认证。
  4. list_collections 返回值: 从 v0.6.0 开始返回 Collection 对象列表(之前返回名称字符串列表)。
  5. 客户端构造函数: 使用 chromadb.Client()PersistentClient()HttpClient() 替代旧的 chromadb.Client(Settings(...)) 方式。

迁移步骤

# 1. 备份现有数据
tar -czf chroma-backup-before-migration.tar.gz /data/chroma_db/

# 2. 升级 chromadb
pip install --upgrade chromadb>=1.5.9

# 3. 更新代码中的客户端创建方式
# 旧版:
# client = chromadb.Client(chromadb.Settings(persist_directory="./db"))
# 新版:
# client = chromadb.PersistentClient(path="./db")

# 4. 创建 YAML 配置文件替代环境变量
# config.yaml:
#   server:
#     host: "0.0.0.0"
#     port: 8000
#   storage:
#     persist_directory: /data/chroma_db

# 5. 验证数据完整性
python -c "
import chromadb
client = chromadb.PersistentClient(path='./db')
for c in client.list_collections():
    print(f'{c.name}: {c.count()} records')
"

附录 B:推荐学习资源

官方资源:

技术论文:

  • Malkov & Yashunin (2016). “Efficient and robust approximate nearest neighbor search using Hierarchical Navigable Small World graphs” — HNSW 算法原始论文
  • Robertson & Walker (1994). “Some simple effective approximations to the 2-Poisson model for probabilistic weighted retrieval” — BM25 算法原始论文
  • Formal et al. (2021). “SPLADE: Sparse Lexical and Expansion Model for First Passage Retrieval” — SPLADE 模型论文
  • Radford et al. (2021). “Learning Transferable Visual Models From Natural Language Supervision” — CLIP 模型论文
  • Cormack et al. (2009). “Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods” — RRF 算法论文
  • Lewis et al. (2020). “Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks” — RAG 原始论文

社区教程:

  • LangChain + Chroma 官方集成文档
  • LlamaIndex + Chroma 集成指南
  • Chroma Docker 部署最佳实践

本教程基于 Chroma v1.5.9(2026 年 5 月发布)编写。Chroma 迭代速度较快,部分 API 和配置可能在新版本中有变化,请以官方文档为准。

如有问题或建议,欢迎通过 GitHub Issues 反馈。祝你在向量数据库的世界里探索愉快!

0

评论区