
—
LangGraph 知识库查询 和 Active RAG (主动式 RAG)
1、LangGraph知识库(向量库)查询
LangGraph 本身不具备查询功能,它只是通过 Python 函数去调用 LangChain 的组件(Retriever)或 数据库的原生 SDK。
在 LangGraph 中,查询向量库通常被封装在一个标准的Python 函数(Node)中。这个过程分为三步:
a、入参:从 LangGraph 的State中拿到用户的问题。
b、动作:在函数内部调用向量库接口(这是最关键的一步)。
c、出参:把查到的文档(Documents)塞回State,供后续节点使用。
常见实现方式有如下两种:
1)、直接调用 Retriever(适合自定义工作流)
2)、作为 Tool 供 Agent 调用(适合 ReAct 架构)
本节主要介绍下第一种直接调用Retriever的方式,代码如下:
# 1. 准备工作:在图构建之外,先定义好 Retriever
# 这一步纯粹是 LangChain 的写法,跟 LangGraph 无关
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
# 假设这里是你连接向量库的代码
vectorstore = Chroma(persist_directory="./db", embedding_function=OpenAIEmbeddings())
# 将向量库转换为检索器对象
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# ==========================================
# 2. 核心:在 LangGraph 节点中调用它
# ==========================================
def retrieve_node(state):
"""
这个函数就是 LangGraph 中的一个 Node。
"""
print("--- 正在查询向量库 ---")
# [第一步] 从 State 获取问题
question = state["question"]
# [第二步] 执行查询动作
# 这里直接调用 LangChain 的 invoke 接口
# 它会自动完成:问题Embedding -> 向量搜索 -> 返回Document列表
documents = retriever.invoke(question)
# [第三步] 更新 State
# 返回的字典会自动合并到全局 State 中
return {"context": documents}
接下来我们使用LangChain 的InMemoryVectorStore(内存向量库)和FakeEmbeddings(模拟 Embeddings),来直接运行一个示例,无需 LLM API Key 或外部向量库,模拟了一个最简化的流程:
1)构建知识库:初始化一个内存向量库,存入几条关于“公司政策”的 Mock 数据。
2)定义图:使用 LangGraph 构建一个包含retrieve节点的图。
3)运行:输入问题,查看图是如何把向量库的数据查出来并更新到状态(State)里的。
代码如下(安装基础依赖pip install langgraph langchain langchain-community):
import operator
from typing import Annotated, List, TypedDict
# LangChain 组件
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.runnables import RunnableLambda
# LangGraph 组件
from langgraph.graph import StateGraph, END
# --- 1. 准备 Mock 向量库数据 ---
print("--- [1] 初始化向量库 (Mock) ---")
# 使用 FakeEmbeddings 避免需要 API Key
# 它的原理是生成随机向量,只要维度一致即可运行,但语义匹配效果是随机的
# (但在本示例的精确匹配场景下,InMemoryVectorStore 也能工作)
embeddings = FakeEmbeddings(size=768)
vector_store = InMemoryVectorStore(embeddings)
# 模拟一些公司政策文档
mock_docs = [
Document(page_content="差旅费报销政策:所有超过 500 元的支出都需要发票。", metadata={"source": "policy_v1"}),
Document(page_content="年假政策:员工入职满一年后,每年享有 10 天带薪年假。", metadata={"source": "hr_handbook"}),
Document(page_content="远程办公政策:每周三允许员工在家办公。", metadata={"source": "remote_work"}),
]
vector_store.add_documents(mock_docs)
print(f"成功存入 {len(mock_docs)} 条文档到内存向量库。\n")
# 创建检索器 (Retriever)
# k=1 表示每次只查最相关的那 1 条
retriever = vector_store.as_retriever(search_kwargs={"k": 1})
# --- 2. 定义 LangGraph 的状态 (State) ---
class AgentState(TypedDict):
question: str # 用户输入的问题
retrieved_docs: List[str] # 存储检索到的结果
# --- 3. 定义节点 (Node) ---
# 这是核心:LangGraph 如何调用 retriever
def retrieve_node(state: AgentState):
question = state["question"]
print(f"--- [Node: Retrieve] 正在检索关于 '{question}' 的内容 ---")
# 【关键动作】直接调用 retriever
documents = retriever.invoke(question)
# 打印检索结果看看
for i, doc in enumerate(documents):
print(f" > 找到文档 {i+1}: {doc.page_content}")
# 将结果存回 State
# 注意:这里返回的字典会更新到全局 State 中
return {"retrieved_docs": [doc.page_content for doc in documents]}
# --- 4. 构建图 (Graph) ---
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("retrieve_step", retrieve_node)
# 设置流程: 入口 -> retrieve_step -> 结束
workflow.set_entry_point("retrieve_step")
workflow.add_edge("retrieve_step", END)
# 编译应用
app = workflow.compile()
# --- 5. 运行测试 ---
if __name__ == "__main__":
print("--- [5] 开始运行 LangGraph ---")
# 模拟用户输入
inputs = {"question": "差旅费报销"}
# 运行图
result = app.invoke(inputs)
print("\n--- [6] 最终状态 (Final State) ---")
print(f"用户问题: {result['question']}")
print(f"检索结果: {result['retrieved_docs']}")代码运行结果如下:

2、LangGraph + RAG = Active RAG( 主动式 RAG)
如果说 MCP 解决了“手脚”怎么动的问题,那么 LangGraph + RAG 就是解决了“大脑”怎么查阅资料并自我修正的问题。
在传统的 Chain(链式)结构中,RAG 往往是“一锤子买卖”:检索 -> 生成。如果检索错了,回答也就错了。
但在 LangGraph 中,RAG 变成了一个“不知疲倦的研究员”:它会先检索,读完觉得不对,换个关键词再检索,直到找到满意的资料为止。
这种模式被称为 Active RAG (主动式 RAG) 或 Self-RAG。
这个主动式RAG就先简单了解下概念,后续有机会再细研究总结。
02
—
LangGraph 记忆持久化
之前的文章有介绍过LangGraph的记忆有分为上下文管理和长期持久化记忆。
LangGraph的上下文是通过State来管理的,State就是上下文的窗口,所有节点函数代码中定义的state参数,就是当前的全部上下文。
基于 LangGraph 官方文档(Persistence 部分),可以将 LangGraph 的记忆与持久化机制清晰地划分为两个核心概念:Checkpoints(检查点/短时记忆) 和 Store(存储/长时记忆)。
1、Checkpoints(检查点):会话级持久化
Checkpoint 是对 Graph State(图状态) 的快照。它的核心作用是“记录当下,以便回溯”。它与特定的 thread_id(线程/会话 ID)强绑定。
主要有如下功能:
a、容错与恢复 (Fault Tolerance):如果程序崩溃,只要有thread_id,就可以从上一个步骤中断的地方继续运行,而不用从头开始。
b、时光倒流 (Time Travel):既然保存了每一步的快照,你可以随时查看(Get)甚至回滚到之前的某个步骤。
c、人机交互 (Human-in-the-loop):允许程序暂停(等待人类审批),人类修改状态后,程序基于修改后的状态继续运行。
2、Store(存储):跨会话/长久记忆
Store 是一个层级化的 Key-Value 数据库。它的核心作用是“跨越时空共享数据”。它不依赖于当前的对话上下文,数据可以跨 thread_id 存在。
主要有如下功能:
a、跨会话记忆 (Cross-Thread Memory): 用户昨天(Thread A)说喜欢红色,今天(Thread B)开启新对话时,Agent 依然能通过查询 Store 知道他喜欢红色。
b、共享知识: 可以存储所有 Agent 共享的全局规则或知识。
c、语义搜索: 高级的 Store 实现(如 PostgresStore)支持通过向量(Embeddings)进行语义检索,即使记不清 Key 也能查到相关记忆。
3、总结对比

4、LangGraph中两种持久化方式代码实战
安装依赖包 pip install langgraph langchain-core
代码如下:
import uuid
from typing import Annotated, Literal, TypedDict, Union, Dict, Any
from datetime import datetime
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage, BaseMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
# ==========================================
# 1. 生产级数据结构定义 (Schema)
# ==========================================
# 用于 State (图的状态)
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
# 用于 Store (长期记忆的数据结构)
# 在生产环境中,这通常对应数据库中的一张表
class UserProfile(TypedDict):
name: str | None
preference: str | None
last_updated: str
# ==========================================
# 2. 模拟 LLM (为了让你无需 API Key 也能运行)
# ==========================================
# 在真实生产代码中,这里会是 ChatOpenAI 或 Claude
class MockLLM:
def invoke(self, messages: list, user_profile: dict):
last_msg = messages[-1].content
# 逻辑 A: 如果用户自报家门 -> 决定调用工具保存
if "我叫" in last_msg:
name = last_msg.split("我叫")[-1].strip()
print(f" [MockLLM] 💡 检测到用户信息,决定调用工具保存: {name}")
return AIMessage(
content="",
tool_calls=[{
"id": "call_" + str(uuid.uuid4())[:8],
"name": "update_profile",
"args": {"name": name}
}]
)
# 逻辑 B: 如果用户问我是谁 -> 读取长期记忆回答
elif "我是谁" in last_msg or "记得" in last_msg:
name = user_profile.get("name", "未知用户")
return AIMessage(content=f"你是 {name}啊!这是我从长期记忆里读出来的。")
# 逻辑 C: 如果用户问刚才说了什么 -> 从对话历史中查找(演示 Checkpointer 的作用)
elif "刚才" in last_msg and ("说" in last_msg or "什么" in last_msg):
# 从 messages 中查找用户之前说的话(跳过最后一条,因为那是当前问题)
user_messages = [msg for msg in messages[:-1] if isinstance(msg, HumanMessage)]
if user_messages:
previous_msg = user_messages[-1].content
print(f" [MockLLM] 💡 从 Checkpointer 恢复的对话历史中找到:{previous_msg}")
return AIMessage(content=f"你刚才说:'{previous_msg}'。这是我从 Checkpointer 保存的对话历史中读取的!")
else:
return AIMessage(content="我没有找到你之前说的话。")
# 逻辑 D: 普通对话
else:
return AIMessage(content=f"收到了:{last_msg}")
model = MockLLM()
# ==========================================
# 3. 定义节点 (Nodes)
# ==========================================
# 全局 store 变量,将在编译图时设置
long_term_store = None
# --- 节点 1: 思考与决策节点 ---
def agent_node(state: AgentState, config: RunnableConfig):
"""
核心 Agent 节点:
1. 从 Store 获取长期记忆 (用户画像)。
2. 将画像注入到 System Prompt。
3. 调用 LLM。
"""
user_id = config["configurable"]["user_id"]
# [关键步骤] 主动从 Store 读取跨会话记忆
# 命名空间设计: ("users", user_id)
stored_data = long_term_store.get(("users", user_id), "profile")
# 如果是新用户,给个默认空画像
user_profile = stored_data.value if stored_data else {"name": None}
print(f"--- [Node: Agent] 当前加载的用户画像: {user_profile} ---")
# 模拟调用 LLM (传入记忆)
response = model.invoke(state["messages"], user_profile)
return {"messages": [response]}
# --- 节点 2: 记忆写入工具节点 ---
def update_profile_tool(state: AgentState, config: RunnableConfig):
"""
工具节点:专门负责将提取到的信息写入 Store
"""
user_id = config["configurable"]["user_id"]
# 获取 LLM 想要调用的工具参数 (这里简化处理,取最后一个消息的工具调用)
last_message = state["messages"][-1]
tool_call = last_message.tool_calls[0]
new_name = tool_call["args"]["name"]
print(f"--- [Node: Tool] 正在将 '{new_name}' 写入长期存储 Store ---")
# [关键步骤] 写入 Store
new_profile: UserProfile = {
"name": new_name,
"preference": "default",
"last_updated": datetime.now().isoformat()
}
long_term_store.put(("users", user_id), "profile", new_profile)
# 返回工具执行结果
return {
"messages": [
ToolMessage(
tool_call_id=tool_call["id"],
content=f"已更新用户资料: Name={new_name}"
)
]
}
# --- 辅助逻辑: 路由边 (Conditional Edge) ---
def should_continue(state: AgentState) -> Literal["update_profile", "__end__"]:
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "update_profile"
return "__end__"
# ==========================================
# 4. 构建图 (Graph Construction)
# ==========================================
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("update_profile", update_profile_tool)
# 设置入口
workflow.set_entry_point("agent")
# 添加边
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("update_profile", "agent") # 工具执行完通过循环回到 Agent 确认
# ==========================================
# 5. 初始化持久化层
# ==========================================
# A. 负责短期会话上下文 (Checkpointer)
# Checkpointer 的作用:
# 1. 保存同一 thread_id 下的对话历史(短期记忆)
# 2. 在同一个 thread_id 下,每次调用都会自动恢复之前的对话状态
# 3. 不同 thread_id 之间的对话历史是隔离的(互不影响)
# 4. 适合保存:当前会话的对话记录、临时状态等
checkpointer = MemorySaver()
# B. 负责长期跨会话记忆 (Store)
# Store 的作用:
# 1. 保存跨 thread_id 的长期数据(长期记忆)
# 2. 所有 thread_id 共享同一个 Store(通过 user_id 区分用户)
# 3. 适合保存:用户画像、偏好设置、知识库等
long_term_store = InMemoryStore()
# 编译图:同时挂载两者
app = workflow.compile(
checkpointer=checkpointer,
store=long_term_store
)
# ==========================================
# 6. 运行模拟 (Simulation)
# ==========================================
if __name__ == "__main__":
# 定义全局的用户 ID (这个 ID 贯穿所有会话)
GLOBAL_USER_ID = "user_999"
print("\n" + "="*60)
print("📚 演示 1: Checkpointer 的作用 - 同一 thread_id 下的对话历史")
print("="*60)
print("\n========== 🟢 会话 1: 初次见面 (Thread ID: A) ==========")
# 场景:用户第一次来,告诉 Agent 名字
config_session_a = {
"configurable": {
"thread_id": "thread_A", # 会话 A
"user_id": GLOBAL_USER_ID # 用户 999
}
}
input_msg = {"messages": [HumanMessage(content="你好,我叫 Gemini")]}
for event in app.stream(input_msg, config=config_session_a):
pass
final_state = app.get_state(config_session_a)
print(f"🤖 AI 最后回复: {final_state.values['messages'][-1].content}")
# 显示当前会话的对话历史(Checkpointer 保存的)
print(f"\n📝 [Checkpointer] Thread A 的对话历史(共 {len(final_state.values['messages'])} 条消息):")
for i, msg in enumerate(final_state.values['messages'], 1):
msg_type = type(msg).__name__
content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
print(f" {i}. {msg_type}: {content}")
print("\n========== 🟡 会话 1 继续: 在同一 Thread A 下继续对话 ==========")
# 场景:用户在同一个 thread_id 下继续对话
# Checkpointer 会自动恢复之前的对话历史!
print(">>> 用户继续对话: '我刚才说了什么?'")
input_msg_continue = {"messages": [HumanMessage(content="我刚才说了什么?")]}
for event in app.stream(input_msg_continue, config=config_session_a):
pass
final_state_continue = app.get_state(config_session_a)
print(f"🤖 AI 最后回复: {final_state_continue.values['messages'][-1].content}")
# 显示更新后的对话历史
print(f"\n📝 [Checkpointer] Thread A 的对话历史(共 {len(final_state_continue.values['messages'])} 条消息):")
for i, msg in enumerate(final_state_continue.values['messages'], 1):
msg_type = type(msg).__name__
content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
print(f" {i}. {msg_type}: {content}")
print(" ✅ 可以看到,Checkpointer 保存了完整的对话历史!")
print("\n" + "="*60)
print("📚 演示 2: Checkpointer 的隔离性 - 不同 thread_id 之间对话历史隔离")
print("="*60)
print("\n========== 🔴 会话 2: 隔天再来 (Thread ID: B) ==========")
# 场景:用户换了一个新会话 (Thread B),旧的聊天记录(Checkpoint) 已经隔离了
# 但是!Store 是共享的。
print(">>> 注意:这是全新的 thread_id,Checkpointer 中的对话历史是空的!")
print(">>> 但是 Store 中的长期记忆是共享的(通过 user_id)")
config_session_b = {
"configurable": {
"thread_id": "thread_B", # 全新的会话 ID
"user_id": GLOBAL_USER_ID # 还是同一个用户 999
}
}
input_msg_2 = {"messages": [HumanMessage(content="你还记得我是谁吗?")]}
print(">>> 用户开启新会话: '你还记得我是谁吗?'")
for event in app.stream(input_msg_2, config=config_session_b):
pass
final_state_b = app.get_state(config_session_b)
print(f"🤖 AI 最后回复: {final_state_b.values['messages'][-1].content}")
# 显示新会话的对话历史(应该是空的,只有当前对话)
print(f"\n📝 [Checkpointer] Thread B 的对话历史(共 {len(final_state_b.values['messages'])} 条消息):")
for i, msg in enumerate(final_state_b.values['messages'], 1):
msg_type = type(msg).__name__
content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
print(f" {i}. {msg_type}: {content}")
print(" ✅ 可以看到,Thread B 的对话历史是独立的,不包含 Thread A 的对话!")
print(" ✅ 但是 AI 仍然知道用户是谁,因为 Store 是共享的!")
print("\n" + "="*60)
print("📚 演示 3: 对比 Thread A 和 Thread B 的对话历史")
print("="*60)
# 再次查看 Thread A 的对话历史
final_state_a_again = app.get_state(config_session_a)
print(f"\n📝 Thread A 的对话历史(共 {len(final_state_a_again.values['messages'])} 条消息)")
print(f"📝 Thread B 的对话历史(共 {len(final_state_b.values['messages'])} 条消息)")
print(" ✅ 两个 thread_id 的对话历史完全隔离,互不影响!")
print("\n========== 🔍 验证 Store 数据(长期记忆,跨 thread_id 共享) ==========")
print(f"Store 中存储的数据: {long_term_store.get(('users', GLOBAL_USER_ID), 'profile').value}")
print(" ✅ Store 数据在所有 thread_id 之间共享!")
print("\n" + "="*60)
print("📊 总结:")
print("="*60)
print("1. Checkpointer: 保存短期会话上下文,同一 thread_id 下可恢复对话历史")
print("2. Store: 保存长期跨会话记忆,所有 thread_id 共享(通过 user_id 区分用户)")
print("3. 两者配合使用:Checkpointer 管理对话流,Store 管理用户画像和知识库")
运行结果如下:


5、题外话:对比在DeepSeek和Gemini的个人记忆
本节内容和langgraph以及标题不太相关,纯粹是笔者好奇现在主流大模型关于记忆是如何处理的。
同时问了DeepSeek和Gemini相同的问题,“我在xxx的记忆有哪些?”
笔者日常工作和生活都有同时在使用这两个大模型,从如下两个截图来看两个大模型的回答,DeepSeek现在应该是没有开启记忆功能的,Gemini是有开启的。


03
—
LangGraph Agent 可视化 --LangSmith
1、LangSmith介绍
基于 LangChain 官方文档(LangSmith 部分),我们可以将 LangSmith 定义为:一个用于 LLM 应用全生命周期的 DevOps 平台。
如果说 LangChain/LangGraph 是用来“造车”的(构建应用),那么 LangSmith 就是“仪表盘 + 维修车间 + 试车场”(监控、调试、评估)。
在智能体开发中,我们经常面临“黑盒”问题:
LangSmith 就是为了解决这些可观测性(Observability)和评估(Evaluation)难题而生的。
主要有如下三个功能:
a、Tracing(全链路追踪/调试)
b、Evaluation(评估/测试)
c、Monitoring(生产监控)
LangSmith 目前大部分的核心功能应该可以免费使用试用,好像使用量(额度)和数据保留时间的限制,用于学习、开发调试和个人项目来说,应该是够用的。
2、LangSmith使用介绍
1)配置方法

2)启动应用

3)登陆LangSmith页面(https://smith.langchain.com/),选择我们绑定的项目名,查看页面提供的相关功能,页面具体功能笔者也没有详细去分析,提供些截图大家可以看看。
登录初始页面

历史上跑过的一些自主规划Agent数据

打开指定链路,查看具体的每个LangGraph 节点的耗时等监控数据

查看其他监控相关数据

04
—
结语
关于LangGraph系列的学习总结文章介绍的差不多了,最终还是要能落地到生产环境,能解决问题,或有业务价值的才是有意义的,不然就是空谈。
未来如果在生产环境真实落地后,再来做一些相关的分享。