大模型应用开发必知必会:RAG、向量数据库与实战

本文整理自我和ai的对话,大概介绍了RAG、向量数据库的原理与选型,并附上Chroma和Milvus的详细代码示例,让我好快速上手()


1. RAG:检索增强生成

1.1 前世今生:为什么需要RAG?

大语言模型(LLM)虽然强大,但有三个固有缺陷:

  • 知识截止日期:模型训练完成后便无法获取新知识。
  • 幻觉问题:对于不确定的问题,模型可能会”编造”答案。
  • 无法访问私有数据:企业内部文档、实时数据等不在训练集中。

为了解决这些问题,RAG(Retrieval-Augmented Generation,检索增强生成) 应运而生。它的核心思想是:在LLM回答问题之前,先从外部知识库中检索出相关文档,然后将文档与问题一起交给LLM生成答案。这样既保证了答案的时效性和准确性,又无需频繁重新训练模型。

RAG最早由Facebook AI在2020年的论文《Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks》中提出,随后迅速成为大模型落地的主流范式。如今,几乎所有的AI助手、客服机器人、企业知识库问答系统都采用了RAG架构。

1.2 原理与结构

一个典型的RAG系统包含三个核心模块:

  1. 索引模块

    • 将原始文档(PDF、网页、数据库记录等)切分成较小的文本块(Chunking)。
    • 嵌入模型(如 text-embedding-ada-002)将每个文本块转换为向量(高维空间中的点)。
    • 将向量及其对应的文本块存入向量数据库
  2. 检索模块

    • 用户提问时,用相同的嵌入模型将问题转换为向量。
    • 在向量数据库中进行相似性搜索(余弦距离、欧氏距离等),找到与问题向量最相似的K个文本块。
    • 返回这些文本块作为”检索结果”。
  3. 生成模块

    • 将检索到的文本块与原始问题组合成一个提示(Prompt),交给LLM。
    • LLM基于提供的上下文生成最终答案。
1
2
3
4
5
6
7
graph LR
A[用户问题] --> B[嵌入模型]
B --> C[向量数据库<br>相似性搜索]
C --> D[检索到的文本块]
D --> E[提示词构造]
E --> F[大语言模型]
F --> G[最终答案]

这种结构让模型既能利用内部参数知识,又能动态获取外部信息,大幅提升回答质量。


2. 向量数据库:RAG的基石

2.1 为什么不能用传统数据库?

你可能会问:”向量不也就是一堆数字吗?存在SQL表里,查询时算一下距离不就行了?”理论上可行,但实际中有三大障碍:

  1. 缺乏高效索引
    SQL数据库的B-Tree索引专为精确匹配(如等值查询)和范围查询(如大于、小于)设计,无法处理”找相似向量”这种模糊查询。如果要计算查询向量与表中每一行的距离(全表扫描),当数据量达到百万甚至亿级时,响应时间会从毫秒级变成分钟级。

  2. 高维诅咒
    向量通常是几百到上千维,计算两个高维向量的距离本身就很耗时。全表扫描的计算量随数据量线性增长,无法支撑实时应用。

  3. 近似最近邻(ANN)
    实际场景中,我们并不需要绝对精确的最近邻,只要”足够相似”即可。但传统SQL无法利用近似算法(如HNSW、IVF)来大幅加速,这些算法可以通过牺牲微小的精度来换取数量级的性能提升。

因此,向量数据库应运而生,它专门为高维向量的相似性检索而设计,内置了高效的索引结构和距离计算优化。

2.2 索引结构及算法原理

向量数据库的核心在于近似最近邻(ANN)索引,常见算法有:

算法 原理 特点
IVF (倒排文件) 将向量空间通过聚类(K-Means)划分为多个区域(聚类中心)。查询时,先找到离查询向量最近的几个聚类中心,然后只在这些聚类的内部搜索。 速度快,适合中等精度需求,可配合乘积量化(PQ)进一步压缩内存。
HNSW (分层可导航小世界) 构建多层图结构:上层是”高速公路”,用长距离边快速定位到目标区域;下层是”精细街道”,进行精确搜索。搜索时从上层开始,逐层向下细化。 目前最流行的算法之一,精度高、速度快,适合需要高召回率的场景。
PQ (乘积量化) 将高维向量分解成多个子空间,分别量化,从而大幅压缩向量大小(减少内存占用)。查询时通过查表快速计算近似距离。 节省内存,适合超大规模数据集(十亿级以上),但精度有所损失。
ScaNN (Google) 结合树结构和量化,通过重排机制进一步提升精度。 性能优异,但实现复杂,主要用于Google内部。

选型建议

  • 如果数据量在百万级以内,IVF足以应对。
  • 如果需要极高精度且数据量适中,HNSW是首选。
  • 如果数据量达到亿级且内存有限,PQ或其变体(如IVF-PQ)是更好的选择。

2.3 SQL + 扩展实现向量检索

既然传统SQL不原生支持向量检索,但我们可以通过扩展来弥补。最典型的例子是 pgvector(PostgreSQL扩展):

  • 它允许在PostgreSQL中创建向量类型的列,并建立IVFFlat或HNSW索引。
  • 查询时使用 <-> 操作符计算欧氏距离,或 <=> 计算余弦距离。
  • 优点是可以复用现有的PostgreSQL基础设施,简化架构;缺点是性能不如专用向量数据库(尤其当数据量极大时)。

示例(pgvector):

1
2
3
4
5
6
7
8
9
10
11
-- 创建表
CREATE TABLE items (id bigserial PRIMARY KEY, embedding vector(384));

-- 插入向量
INSERT INTO items (embedding) VALUES ('[0.1,0.2,...]');

-- 建立索引
CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- 查询最近邻
SELECT * FROM items ORDER BY embedding <=> '[0.1,0.2,...]' LIMIT 5;

2.4 主流向量数据库对比与技术选型

数据库 部署方式 核心特性 适用场景 性能 生态
Chroma 轻量级/嵌入式/服务端 简单易用、Python原生、内存优先 原型开发、小数据量场景 中等 轻量
Milvus 分布式/云原生 高可用、水平扩展、多索引支持 生产环境、大数据量 完善
Pinecone 全托管SaaS 零运维、自动扩缩容 云原生应用 封闭
Weaviate 混合搜索 向量+全文检索、GraphQL API 语义搜索、知识图谱 中高 丰富
PGVector PostgreSQL扩展 兼容SQL、事务支持 已有PG生态、中小规模 中等 成熟

选型建议:

  • 学习和原型:Chroma(5分钟上手)
  • 中小型生产:Qdrant 或 Weaviate(兼顾性能和易用性)
  • 大型生产:Milvus(功能最全) 或 Pinecone(托管服务)

三、Chroma与Milvus实战教程

本章从零开始,手把手带你完成从安装到查询的全流程。每一步都有可直接运行的代码,按顺序执行即可。


3.1 Chroma 使用教程

Chroma 是最适合入门的向量数据库,无需启动额外服务,pip install 后即能用。

第一步:安装依赖

1
pip install chromadb sentence-transformers
  • chromadb:Chroma 数据库本体。
  • sentence-transformers:用于把文字转成向量的开源嵌入模型库。

安装过程中会下载模型权重(约90MB),需要保持网络畅通。如果速度慢,可以提前设置镜像:

1
pip install chromadb sentence-transformers -i https://pypi.tuna.tsinghua.edu.cn/simple

第二步:初始化客户端

Chroma 有两种运行模式,根据需求二选一:

1
2
3
4
5
6
7
8
import chromadb
from chromadb.utils import embedding_functions

# 模式一:内存模式(程序结束后数据清空,适合测试)
client = chromadb.Client()

# 模式二:持久化模式(数据保存到磁盘,推荐使用)
client = chromadb.PersistentClient(path="./chroma_db")

新手建议:先用持久化模式,这样数据不会因为程序重启而丢失,./chroma_db 会在当前目录下自动创建。

第三步:配置嵌入函数

嵌入函数负责将文字转换为向量。这里使用 all-MiniLM-L6-v2,它体积小、速度快,中英文都能处理,是入门首选。

1
2
3
4
# 首次运行会自动下载模型(约90MB),之后会缓存到本地
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2"
)

第四步:创建集合(Collection)

集合相当于数据库里的一张”表”,同一个集合里的所有文档使用相同的嵌入模型。

1
2
3
4
5
collection = client.get_or_create_collection(
name="my_documents", # 集合名称,可以自定义
embedding_function=sentence_transformer_ef,
metadata={"hnsw:space": "cosine"} # 使用余弦相似度衡量文本相关性
)

get_or_create_collection 很贴心:集合不存在时自动创建,已存在时直接获取,不会重复创建。

第五步:插入文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 准备要存入的文档
documents = [
"RAG是检索增强生成的缩写",
"向量数据库用于存储和检索高维向量",
"Chroma是轻量级的向量数据库",
"Milvus支持分布式部署和大规模向量检索",
"PGVector是PostgreSQL的向量扩展"
]

# 每条文档必须有唯一ID
ids = [f"id{i}" for i in range(len(documents))]

# 元数据(可选),方便后续过滤
metadatas = [{"source": "rag_guide"} for _ in documents]

# 插入数据(Chroma会自动调用嵌入函数将文字转为向量)
collection.add(
documents=documents,
ids=ids,
metadatas=metadatas
)

print(f"已插入 {collection.count()} 条文档")
# 输出:已插入 5 条文档

第六步:查询相似文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
query = "什么是向量数据库?"

results = collection.query(
query_texts=[query], # 传入自然语言问题,Chroma自动转向量后检索
n_results=3, # 返回最相似的3条结果
# where={"source": "rag_guide"} # 可选:按元数据过滤
)

# 打印结果
for i, (doc, distance) in enumerate(zip(results["documents"][0], results["distances"][0])):
# distance是余弦距离,值越小越相似;相似度 = 1 - distance
similarity = 1 - distance
print(f"{i+1}. {doc}")
print(f" 相似度:{similarity:.4f}\n")

预期输出

1
2
3
4
5
6
7
8
1. 向量数据库用于存储和检索高维向量
相似度:0.8923

2. Chroma是轻量级的向量数据库
相似度:0.7856

3. Milvus支持分布式部署和大规模向量检索
相似度:0.6543

第七步:更新与删除

1
2
3
4
5
6
7
8
9
10
11
12
# 更新已有文档(通过ID指定)
collection.update(
ids=["id0"],
documents=["RAG(Retrieval-Augmented Generation)检索增强生成,结合检索和生成的AI技术"]
)

# 删除文档
collection.delete(ids=["id4"])

# 查看当前集合信息
print(f"文档数量: {collection.count()}")
# 输出:文档数量: 4

完整可运行代码

将以下代码保存为 chroma_demo.py,直接运行即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import chromadb
from chromadb.utils import embedding_functions

# 1. 初始化持久化客户端
client = chromadb.PersistentClient(path="./chroma_db")

# 2. 配置嵌入函数(首次运行会下载模型)
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2"
)

# 3. 创建集合
collection = client.get_or_create_collection(
name="my_documents",
embedding_function=sentence_transformer_ef,
metadata={"hnsw:space": "cosine"}
)

# 4. 插入文档
documents = [
"RAG是检索增强生成的缩写",
"向量数据库用于存储和检索高维向量",
"Chroma是轻量级的向量数据库",
"Milvus支持分布式部署和大规模向量检索",
"PGVector是PostgreSQL的向量扩展"
]
ids = [f"id{i}" for i in range(len(documents))]
metadatas = [{"source": "rag_guide"} for _ in documents]

collection.add(documents=documents, ids=ids, metadatas=metadatas)
print(f"已插入 {collection.count()} 条文档")

# 5. 查询
query = "什么是向量数据库?"
results = collection.query(query_texts=[query], n_results=3)

print(f"\n查询:{query}\n检索结果:")
for i, (doc, distance) in enumerate(zip(results["documents"][0], results["distances"][0])):
print(f"{i+1}. {doc} (相似度: {1 - distance:.4f})")

3.2 Milvus 使用教程

Milvus 是面向生产环境的向量数据库,支持亿级数据量和分布式部署。相比 Chroma,它的配置更复杂,但性能和扩展性都更强。

第一步:启动 Milvus 服务

Milvus 是独立服务,需要先用 Docker 启动,再从 Python 连接。

前提条件:已安装 Docker Desktop

1
2
3
4
5
# 下载 Milvus 的 Docker Compose 配置文件
wget https://github.com/milvus-io/milvus/releases/download/v2.4.0/milvus-standalone-docker-compose.yml -O docker-compose.yml

# 启动 Milvus(后台运行)
docker-compose up -d

Windows 用户:如果没有 wget,可以直接在浏览器打开上面的链接下载文件,保存为 docker-compose.yml,然后在同目录下运行 docker-compose up -d

启动后验证是否成功:

1
docker-compose ps

看到 milvus-standalone 状态为 Up 即表示启动成功。

第二步:安装 Python SDK

1
pip install pymilvus sentence-transformers

第三步:连接 Milvus

1
2
3
4
5
6
7
8
9
10
11
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
from sentence_transformers import SentenceTransformer

# 连接到本地 Milvus 服务
connections.connect(
alias="default",
host="localhost", # Milvus 服务地址
port="19530" # 默认端口
)

print("Milvus 连接成功!")

第四步:加载嵌入模型

1
2
3
4
5
6
# 加载嵌入模型(首次运行会自动下载,约90MB)
model = SentenceTransformer('all-MiniLM-L6-v2')

# 测试:将一句话转为向量
test_vector = model.encode(["测试文本"])
print(f"向量维度:{test_vector.shape[1]}") # 输出:向量维度:384

记住这个维度(384),后续定义集合结构时需要用到。

第五步:定义集合结构(Schema)

Milvus 中的集合需要预先定义字段,类似关系数据库的建表语句。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 定义每个字段的类型和约束
fields = [
# 主键字段(自动递增,不需要手动指定)
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
# 存储原始文本(最长500个字符)
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=500),
# 向量字段(dim必须与嵌入模型输出维度一致)
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
# 元数据字段(用于过滤)
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=100)
]

# 组合成 Schema
schema = CollectionSchema(
fields=fields,
description="RAG文档集合",
enable_dynamic_field=True # 允许插入 Schema 中未定义的额外字段
)

第六步:创建集合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
collection_name = "rag_documents"

# 如果已存在同名集合,先删除(开发阶段方便重置)
if utility.has_collection(collection_name):
utility.drop_collection(collection_name)
print(f"已删除旧集合:{collection_name}")

# 创建集合
collection = Collection(
name=collection_name,
schema=schema,
using='default' # 使用名为 default 的连接
)

print(f"集合 {collection_name} 创建成功!")

第七步:创建向量索引

索引决定了查询的速度和精度,插入数据前必须创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
index_params = {
"index_type": "HNSW", # 使用 HNSW 算法,精度高且速度快
"metric_type": "COSINE", # 余弦相似度(适合文本语义检索)
"params": {
"M": 8, # 每个节点的最大连接数,越大精度越高但内存占用也越多
"efConstruction": 64 # 构建索引时的搜索范围,越大索引质量越好但构建越慢
}
}

collection.create_index(
field_name="embedding", # 对向量字段建立索引
index_params=index_params
)

print("索引创建成功!")

第八步:插入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
documents = [
"RAG是检索增强生成的缩写",
"向量数据库用于存储和检索高维向量",
"Chroma是轻量级的向量数据库",
"Milvus支持分布式部署和大规模向量检索",
"PGVector是PostgreSQL的向量扩展"
]
sources = ["rag_guide"] * len(documents)

# 用嵌入模型将文本转为向量列表
embeddings = model.encode(documents).tolist()

# 插入数据(字段顺序需与 Schema 中非主键字段的顺序对应)
insert_result = collection.insert([
documents, # content 字段
embeddings, # embedding 字段
sources # source 字段
])

# flush() 确保数据从内存写入磁盘,持久化保存
collection.flush()

print(f"成功插入 {len(insert_result.primary_keys)} 条数据")
print(f"自动生成的ID:{insert_result.primary_keys}")

第九步:加载集合并查询

重要:Milvus 查询前必须先调用 load() 将集合加载到内存,否则会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 将集合加载到内存(每次启动程序后都需要执行)
collection.load()
print("集合已加载到内存,可以开始查询")

# 准备查询
query = "什么是向量数据库?"
query_embedding = model.encode([query]).tolist()

# 配置搜索参数
search_params = {
"metric_type": "COSINE", # 必须与索引的 metric_type 一致
"params": {"ef": 64} # ef 越大,搜索精度越高,但速度越慢
}

# 执行相似性搜索
results = collection.search(
data=query_embedding, # 查询向量
anns_field="embedding", # 在哪个字段上搜索
param=search_params,
limit=3, # 返回最相似的3条
output_fields=["content", "source"] # 返回哪些字段的值
)

# 打印结果
print(f"\n查询:{query}\n检索结果:")
for i, hit in enumerate(results[0]):
print(f"{i+1}. {hit.entity.get('content')}")
print(f" 相似度:{hit.score:.4f} | 来源:{hit.entity.get('source')}")
print(f" 文档ID:{hit.id}\n")

预期输出

1
2
3
4
5
6
7
8
9
10
11
12
13
查询:什么是向量数据库?
检索结果:
1. 向量数据库用于存储和检索高维向量
相似度:0.8923 | 来源:rag_guide
文档ID:448928598761420801

2. Chroma是轻量级的向量数据库
相似度:0.7856 | 来源:rag_guide
文档ID:448928598761420802

3. Milvus支持分布式部署和大规模向量检索
相似度:0.6543 | 来源:rag_guide
文档ID:448928598761420803

第十步:带元数据过滤的混合检索

向量相似度检索可以结合精确的元数据过滤,实现更精准的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 只在 source == "rag_guide" 的文档中做相似性检索
filter_results = collection.search(
data=query_embedding,
anns_field="embedding",
param=search_params,
limit=3,
expr='source == "rag_guide"', # 过滤条件,支持 ==, !=, >, <, in 等
output_fields=["content", "source"]
)

print("带过滤条件的检索结果:")
for hit in filter_results[0]:
print(f"- {hit.entity.get('content')}")

过滤语法类似 SQL 的 WHERE 子句,例如:

  • source == "rag_guide" 精确匹配
  • source in ["rag_guide", "wiki"] 多值匹配
  • id > 100 数值范围过滤

第十一步:用完释放资源

1
2
3
4
5
6
7
# 将集合从内存中卸载(释放内存,但数据仍保留在磁盘)
collection.release()

# 断开连接(程序结束时调用)
connections.disconnect("default")

print("资源已释放")

完整可运行代码(Milvus)

确保 Docker 中 Milvus 已启动后运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility
from sentence_transformers import SentenceTransformer

# ===== 1. 连接 =====
connections.connect(alias="default", host="localhost", port="19530")
print("连接成功!")

# ===== 2. 加载嵌入模型 =====
model = SentenceTransformer('all-MiniLM-L6-v2')

# ===== 3. 定义集合结构 =====
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=500),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=100),
]
schema = CollectionSchema(fields=fields, description="RAG文档集合", enable_dynamic_field=True)

# ===== 4. 创建集合 =====
collection_name = "rag_documents"
if utility.has_collection(collection_name):
utility.drop_collection(collection_name)
collection = Collection(name=collection_name, schema=schema)
print(f"集合 {collection_name} 创建成功!")

# ===== 5. 创建索引 =====
collection.create_index(
field_name="embedding",
index_params={"index_type": "HNSW", "metric_type": "COSINE", "params": {"M": 8, "efConstruction": 64}}
)
print("索引创建成功!")

# ===== 6. 插入数据 =====
documents = [
"RAG是检索增强生成的缩写",
"向量数据库用于存储和检索高维向量",
"Chroma是轻量级的向量数据库",
"Milvus支持分布式部署和大规模向量检索",
"PGVector是PostgreSQL的向量扩展"
]
embeddings = model.encode(documents).tolist()
sources = ["rag_guide"] * len(documents)

result = collection.insert([documents, embeddings, sources])
collection.flush()
print(f"成功插入 {len(result.primary_keys)} 条数据")

# ===== 7. 加载并查询 =====
collection.load()

query = "什么是向量数据库?"
query_embedding = model.encode([query]).tolist()
search_params = {"metric_type": "COSINE", "params": {"ef": 64}}

results = collection.search(
data=query_embedding,
anns_field="embedding",
param=search_params,
limit=3,
output_fields=["content", "source"]
)

print(f"\n查询:{query}\n检索结果:")
for i, hit in enumerate(results[0]):
print(f"{i+1}. {hit.entity.get('content')} (相似度: {hit.score:.4f})")

# ===== 8. 释放资源 =====
collection.release()
connections.disconnect("default")

3.3 关键注意事项

Chroma 注意事项

  • 默认内存存储,重启后数据丢失,生产环境务必用 PersistentClient
  • get_or_create_collection 是幂等的,多次调用不会重复创建。
  • 内置多种嵌入函数(OpenAI、Cohere、HuggingFace),也可自定义。
  • 使用余弦相似度时,distances 返回的是距离(越小越相似),需要 1 - distance 才是相似度。

Milvus 注意事项

  • 必须先启动 Docker 中的 Milvus 服务,Python 代码才能连接。
  • 插入数据后调用 flush(),否则数据可能在程序崩溃时丢失。
  • 每次程序启动后,查询前必须调用 collection.load()
  • 字段数据插入顺序必须与 Schema 中非主键字段的定义顺序一致。
  • searchmetric_type 必须与创建索引时的 metric_type 保持一致。