静态知识库过时了!Agent+Milvus:动态记忆与按需检索实战指南

最近在技术社区看到一个常见问题:为什么我的 AI 助手明明安装了向量数据库,但无法记住历史对话?
原因在于,采用RAG 架构的AI助手,最初只是为 LLM 提供外部知识查询能力,不是为动态记忆管理设计的。
那么拥有动态记忆的AI助手要如何搭建,RAG架构、agentic RAG、agent的演化路线是如何的,我们要如何对其进行选型以及记忆能力的配置?
本文将梳理从 RAG 到智能体记忆的演进路径,以及 Milvus 在每个阶段的核心使用方式。
01
第一阶段:RAG——只读的外部知识库
RAG 的本质是什么?
RAG(检索增强生成)本质就是给 LLM 配备外部知识库,让它回答问题前先查询资料。2020 年 Lewis 等人提出这个概念,用于解决 LLM 的知识截止日期问题。
RAG 的工作流程:先在离线阶段将文档切分成小块并转换为向量存入数据库,运行时将用户问题也转换为向量并检索最相似的 Top-K 文档,最后将检索结果和用户问题一起输入 LLM 生成答案。
不难发现,在实现 RAG 时,最大的技术挑战是:如何在百万级/亿级向量中实现毫秒级检索?
传统数据库在这个场景下存在明显短板,要么不支持向量索引,要么只采用简单的向量检索插件,在百万级数据上检索延迟巨大,无法满足实时性要求。
Milvus在内向量数据库则可以 提供专业的向量索引(HNSW、IVF_FLAT 等),做到百亿数据的毫秒级延迟。
Milvus 实现 RAG 的核心代码如下:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataTypeimport openai# ===== 工具函数:统一的向量化接口 =====def embed(text): """将文本转换为向量""" response = openai.Embedding.create( input=text, model="text-embedding-ada-002" ) return response["data"][0]["embedding"]# ===== 步骤1:连接Milvus并创建Collection =====connections.connect(host="localhost", port="19530")fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512),]schema = CollectionSchema(fields=fields, description="RAG Knowledge Base")collection = Collection(name="rag_knowledge", schema=schema)# ===== 步骤2:创建索引 =====index_params = { "index_type": "HNSW", "metric_type": "COSINE", "params": {"M": 16, "efConstruction": 256}}collection.create_index(field_name="embedding", index_params=index_params)# ===== 步骤3:离线索引——批量插入文档 =====def ingest_documents(documents): """批量插入文档到向量库""" texts = [] embeddings = [] sources = [] for doc in documents: texts.append(doc["text"]) embeddings.append(embed(doc["text"])) sources.append(doc["source"]) data = [texts, embeddings, sources] collection.insert(data) collection.flush() # 确保数据持久化 print(f"✅ 已索引 {len(documents)} 条文档")# ===== 步骤4:在线检索——RAG查询 =====def rag_retrieval(query, top_k=5): """传统RAG检索:每次必检索""" collection.load() # 加载到内存 query_embedding = embed(query) search_params = {"metric_type": "COSINE", "params": {"ef": 100}} results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=top_k, output_fields=["text", "source"] ) return results[0]# ===== 使用示例 =====docs = [ {"text": "Milvus是开源向量数据库,支持HNSW索引", "source": "docs/intro.md"}, {"text": "RAG通过检索增强生成提升LLM能力", "source": "docs/rag.md"}]ingest_documents(docs)results = rag_retrieval("什么是向量数据库?")for hit in results: print(f"相似度: {hit.score:.3f} | 内容: {hit.entity.get('text')}")
通过以上架构示意图和代码,我们可以发现,在传统的simple RAG架构中,每次查询都强制检索,并且知识库在离线阶段构建后运行时无法更新,且所有知识来自同一向量库无法动态切换数据源。
这些问题的根源在于:RAG 把检索当作必选项,而不是可选的精细化分类的工具。
这些问题在简单问答场景影响不大,但在复杂智能体系统中会成为瓶颈。
那么能否让系统像人类一样,根据问题类型决定是否需要查资料?这就是agentic RAG 的核心思想。
02
第二阶段:agentic RAG —按需检索
传统simple RAG 每次查询都强制调用检索,不管是否真的需要额外知识。agentic RAG的突破是:把检索变成可选工具,Agent 自主决策是否需要检索、检索什么来源、结果是否可信。
相应的,在这个过程中,我们需要引入多 Collection 架构。
因为,单 Collection 架构存在明显问题:检索结果混杂不同领域内容,不同领域的向量混在同一语义空间导致召回准确率下降,且无法针对不同类型知识设置差异化检索策略。
多 Collection 方案的优势在于:每个领域独立索引(product_docs、api_reference、customer_cases等),Agent 根据问题类型精准路由,不同领域使用同的索引参数和过滤条件。
下面通过实际代码展示如何用 Milvus 实现这套多 Collection 架构。
核心要点是:为每个领域创建独立 Collection,Agent 根据问题类型动态路由检索。
from pymilvus import connections, Collectionconnections.connect(host='localhost', port='19530')# ===== 创建多个专业领域的Collection =====class MultiSourceRAG: def __init__(self): self.collections = { "product_docs": Collection("product_docs"), # 产品文档 "api_reference": Collection("api_reference"), # API参考 "customer_cases": Collection("customer_cases"), # 客户案例 "tech_blogs": Collection("tech_blogs") # 技术博客 } # 加载所有Collection到内存 for coll in self.collections.values(): coll.load()# ===== Agent决策:智能检索路由 =====def smart_retrieve(question, agent_decision): """ Agent决策示例: { "need_retrieval": True, "target_collections": ["api_reference", "tech_blogs"], "top_k": 5, "filters": {"publish_date": ">= 2024-01-01"} } """ if not agent_decision["need_retrieval"]: return [] # Agent判断不需要检索 rag = MultiSourceRAG() results = [] for coll_name in agent_decision["target_collections"]: collection = rag.collections[coll_name] # 构建动态过滤表达式 filter_expr = None if "filters" in agent_decision: filters = agent_decision["filters"] if "publish_date" in filters: filter_expr = f'publish_date {filters["publish_date"]}' # 执行检索 search_params = {"metric_type": "IP", "params": {"nprobe": 16}} search_results = collection.search( data=[embed(question)], anns_field="embedding", param=search_params, limit=agent_decision["top_k"], expr=filter_expr, # Milvus支持标量过滤 output_fields=["text", "source", "publish_date"] ) results.extend(search_results[0]) return results# ===== 检索质量评估 =====def retrieve_with_quality_check(question, threshold=0.7): """Agent评估检索质量,决定下一步行动""" collection = Collection("product_docs") collection.load() results = collection.search( data=[embed(question)], anns_field="embedding", param={"metric_type": "IP", "params": {"nprobe": 16}}, limit=5 ) # 过滤低质量结果 high_quality_results = [ hit for hit in results[0] if hit.score >= threshold ] # Agent决策 if not high_quality_results: return {"action": "FALLBACK_TO_WEB_SEARCH", "reason": "本地知识库召回质量不足"} return {"action": "USE_RESULTS", "data": high_quality_results}
尽管agentic RAG 在检索决策上实现了突破,但仍有一个核心问题未解决:
智能体 RAG 的核心问题未解决:知识库依然是只读的。Agent 可以决定什么时候读、读什么,但不能写入新知识、更新旧知识、删除过时知识。这引出了下一阶段:agent memory系统。
03
第三阶段:agent memory
Agent 记忆需要完整的 CRUD 能力:实时保存对话中的偏好和事件,检索历史会话中的相关记忆,修正用户提供的新信息,清理过期或无效记录。这要求底层存储系统支持运行时的写入和更新操作。
但是实践中,不同类型的记忆无法使用统一策略。比如,用户说"我喜欢简洁的回复"是长期偏好,需保留数月甚至数年;但"今天天气怎么样"这类对话只需保留几天。
如果混合存储,会导致:查询"用户沟通偏好"时结果混杂大量无关对话,检索精度下降;无法设置差异化过期策略,要么误删长期偏好,要么历史对话无限膨胀拖垮性能。
解决方案是按生命周期分类:程序性记忆 Collection 存储长期偏好(importance > 0.8);情景记忆 Collection 存储对话历史(30-90 天过期);语义记忆 Collection 存储事实知识(长期有效可修正)。
以下是借助Milvus 实现多 Collection 隔离 + 混合索引 + 动态 CRUD架构如何应用于agent memory的参考。
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collectionfrom datetime import datetimeconnections.connect(host='localhost', port='19530')# ===== Collection Schema定义 =====def create_memory_collection(name, description): """创建标准化的记忆Collection""" fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="user_id", dtype=DataType.VARCHAR, max_length=64), FieldSchema(name="content", dtype=DataType.VARCHAR, max_length=5000), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=768), FieldSchema(name="importance", dtype=DataType.FLOAT), FieldSchema(name="created_at", dtype=DataType.INT64), FieldSchema(name="metadata", dtype=DataType.JSON), ] schema = CollectionSchema(fields=fields, description=description) collection = Collection(name=name, schema=schema) # 创建向量索引 collection.create_index( field_name="embedding", index_params={"index_type": "HNSW", "metric_type": "IP", "params": {"M": 16}} ) # 创建标量索引(加速user_id过滤) collection.create_index(field_name="user_id", index_params={"index_type": "TRIE"}) collection.load() return collection# ===== 初始化三种记忆类型 =====class AgentMemorySystem: def __init__(self): self.procedural_memory = create_memory_collection( "procedural_memory", "用户偏好与行为规则" ) self.episodic_memory = create_memory_collection( "episodic_memory", "对话历史与事件记录" ) self.semantic_memory = create_memory_collection( "semantic_memory", "事实性知识" )memory_system = AgentMemorySystem()
核心操作:记忆写入(Create)
def store_memory(memory_type, user_id, content, importance=0.5, metadata=None): """实时写入记忆""" # 选择对应的Collection if memory_type == "procedural": collection = memory_system.procedural_memory elif memory_type == "episodic": collection = memory_system.episodic_memory else: collection = memory_system.semantic_memory # 准备数据 data = [{ "user_id": user_id, "content": content, "embedding": embed(content), "importance": importance, "created_at": int(datetime.now().timestamp()), "metadata": metadata or {} }] # 实时插入 collection.insert(data) collection.flush() # 确保持久化 print(f"✅ 已存储{memory_type}记忆: {content[:50]}...")# ===== 使用场景 =====# 场景1:Agent从对话中提取用户偏好store_memory( memory_type="procedural", user_id="user_123", content="用户喜欢简洁的回复,多用emoji", importance=0.8, metadata={"category": "communication_style"})# 场景2:记录对话事件store_memory( memory_type="episodic", user_id="user_123", content="用户提到10月30日要去巴黎旅行,需要推荐景点", importance=0.9, metadata={"event_type": "travel_plan", "date": "2024-10-30"})# 场景3:存储事实性知识store_memory( memory_type="semantic", user_id="user_123", content="埃菲尔铁塔位于法国巴黎,高330米,建于1889年", importance=0.7, metadata={"entity": "埃菲尔铁塔", "source": "wikipedia"})
核心操作:记忆检索(Read)
def retrieve_memories(user_id, query, memory_type="all", top_k=5, min_importance=0.3): """智能记忆检索:支持多类型+过滤""" query_embedding = embed(query) results = {} # 构建过滤表达式(用户隔离 + 重要性过滤) filter_expr = f'user_id == "{user_id}" && importance >= {min_importance}' search_params = {"metric_type": "IP", "params": {"ef": 100}} # 选择要检索的Collection collections_to_search = [] if memory_type in ["all", "procedural"]: collections_to_search.append(("procedural", memory_system.procedural_memory)) if memory_type in ["all", "episodic"]: collections_to_search.append(("episodic", memory_system.episodic_memory)) if memory_type in ["all", "semantic"]: collections_to_search.append(("semantic", memory_system.semantic_memory)) # 执行检索 for mem_type, collection in collections_to_search: search_results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=top_k, expr=filter_expr, output_fields=["content", "importance", "created_at", "metadata"] ) results[mem_type] = search_results[0] return results# ===== 使用场景 =====memories = retrieve_memories( user_id="user_123", query="用户想了解巴黎旅游信息", memory_type="all", min_importance=0.7)print("🧠 召回的记忆:")for mem_type, hits in memories.items(): print(f"\n【{mem_type}】:") for hit in hits[:2]: print(f" 相似度: {hit.score:.3f} | {hit.entity.get('content')[:60]}...")
核心操作:记忆更新与删除(Update & Delete)
def update_memory(collection, memory_id, new_content, new_importance=None): """ 更新记忆(Milvus 2.3+支持upsert) 注意:生产环境需考虑一致性问题 当前"先删后插"方案存在风险: - 删除成功但插入失败 → 记忆丢失 - 并发更新 → 数据竞争 推荐:使用Milvus的upsert操作(原子性) """ # 先删除旧记忆 collection.delete(expr=f"id == {memory_id}") # 插入新记忆 data = [{ "content": new_content, "embedding": embed(new_content), "importance": new_importance or 0.5, "created_at": int(datetime.now().timestamp()) }] collection.insert(data) collection.flush()def forget_memory(collection, criteria): """ 选择性遗忘记忆 策略示例: - 时间衰减:删除90天前的低重要性情景记忆 - 置信度过滤:删除置信度<0.6的语义记忆 """ # 示例:删除过期的情景记忆 if "older_than_days" in criteria: cutoff_time = int(datetime.now().timestamp()) - criteria["older_than_days"] * 86400 filter_expr = f"created_at < {cutoff_time} && importance < {criteria.get('min_importance', 0.5)}" collection.delete(expr=filter_expr) print(f"🗑️ 已清理 {criteria['older_than_days']} 天前的低重要性记忆")# ===== 使用场景 =====# 场景1:用户修正信息update_memory( collection=memory_system.episodic_memory, memory_id=12345, new_content="用户旅行时间改为11月15日", new_importance=0.9)# 场景2:定期清理过期记忆forget_memory( collection=memory_system.episodic_memory, criteria={"older_than_days": 90, "min_importance": 0.4})
04
三个阶段的本质差异
以下是从 RAG 到记忆的技术演进总结
1.webp
回顾整个演进过程,核心变化体现在两个维度:数据更新时机和操作权限。
传统 RAG 的知识库在离线阶段完成索引构建,运行时只支持查询操作。如果知识内容需要更新,必须停止服务重新构建索引。
agentic RAG 引入了检索决策机制。系统会先判断是否需要检索、以及从哪个数据源检索,避免了无效查询。但数据本身依然是只读的,运行时无法修改知识库内容。
Agent memory 阶段实现了关键突破:系统具备了运行时的写入能力。它可以在对话过程中创建新记忆、更新过期信息、删除无用记录。这种从只读到读写的转变,使得系统能够动态维护用户的个性化知识库。
回顾这三个阶段的演进,有个反直觉的发现:技术难度在下降,工程难度在上升。
传统 RAG 的核心是怎么检索得更准——索引算法、相似度计算。到了智能体记忆阶段,技术问题反而简单了(Milvus 把 CRUD 都封装好了),真正棘手的是工程决策:什么信息值得记?记多久?什么时候更新?什么时候忘掉?
实际项目中,很多团队没想清楚记忆策略。用户的随口抱怨要不要记?记错了怎么办?三个月不登录的用户,记忆要不要清理?
这些问题没有标准答案,完全取决于业务场景。真正的壁垒是对用户记忆管理的理解深度。

技术干货
当一个程序员决定穿上粉裤子
如何找到和你时尚风格相似的明星?AI + Milvus=?
2023-8-23
技术干货
GPTCache 悬赏令!寻找最佳捉虫猎手,豪华赏格等你来拿!
捉虫数量越多,奖品越丰厚!
2023-8-2
技术干货
我决定给 ChatGPT 做个缓存层 >>> Hello GPTCache
我们从自己的开源项目 Milvus 和一顿没有任何目的午饭中分别获得了灵感,做出了 OSSChat、GPTCache。在这个过程中,我们也在不断接受「从 0 到 1」的考验。作为茫茫 AI 领域开发者和探索者中的一员,我很愿意与诸位分享这背后的故事、逻辑和设计思考,希望大家能避坑避雷、有所收获。
2023-4-14




