首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >LangGraph之知识库调用、记忆持久化及Agent可视化

LangGraph之知识库调用、记忆持久化及Agent可视化

作者头像
Wangzy
发布2026-06-22 18:54:36
发布2026-06-22 18:54:36
230
举报

前记:上篇文章结尾提到还有LangGraph 调用知识库、记忆持久化、智能体链路可视化等还没介绍,本文就接着介绍这些内容。

01

LangGraph 知识库查询 和 Active RAG (主动式 RAG)

1、LangGraph知识库(向量库)查询

LangGraph 本身不具备查询功能,它只是通过 Python 函数去调用 LangChain 的组件(Retriever)或 数据库的原生 SDK。

在 LangGraph 中,查询向量库通常被封装在一个标准的Python 函数(Node)中。这个过程分为三步:

a、入参:从 LangGraph 的State中拿到用户的问题。

b、动作:在函数内部调用向量库接口(这是最关键的一步)。

c、出参:把查到的文档(Documents)塞回State,供后续节点使用。

常见实现方式有如下两种:

1)、直接调用 Retriever(适合自定义工作流)

2)、作为 Tool 供 Agent 调用(适合 ReAct 架构)

本节主要介绍下第一种直接调用Retriever的方式,代码如下:

代码语言:javascript
复制
# 1. 准备工作:在图构建之外,先定义好 Retriever
# 这一步纯粹是 LangChain 的写法,跟 LangGraph 无关
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

# 假设这里是你连接向量库的代码
vectorstore = Chroma(persist_directory="./db", embedding_function=OpenAIEmbeddings())
# 将向量库转换为检索器对象
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# ==========================================
# 2. 核心:在 LangGraph 节点中调用它
# ==========================================

def retrieve_node(state):
    """
    这个函数就是 LangGraph 中的一个 Node。
    """
    print("--- 正在查询向量库 ---")

    # [第一步] 从 State 获取问题
    question = state["question"]

    # [第二步] 执行查询动作
    # 这里直接调用 LangChain 的 invoke 接口
    # 它会自动完成:问题Embedding -> 向量搜索 -> 返回Document列表
    documents = retriever.invoke(question)

    # [第三步] 更新 State
    # 返回的字典会自动合并到全局 State 中
    return {"context": documents}

接下来我们使用LangChain 的InMemoryVectorStore(内存向量库)和FakeEmbeddings(模拟 Embeddings),来直接运行一个示例,无需 LLM API Key 或外部向量库,模拟了一个最简化的流程:

1)构建知识库:初始化一个内存向量库,存入几条关于“公司政策”的 Mock 数据。

2)定义图:使用 LangGraph 构建一个包含retrieve节点的图。

3)运行:输入问题,查看图是如何把向量库的数据查出来并更新到状态(State)里的。

代码如下(安装基础依赖pip install langgraph langchain langchain-community):

代码语言:javascript
复制
import operator
from typing import Annotated, List, TypedDict

# LangChain 组件
from langchain_core.documents import Document
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.runnables import RunnableLambda

# LangGraph 组件
from langgraph.graph import StateGraph, END

# --- 1. 准备 Mock 向量库数据 ---
print("--- [1] 初始化向量库 (Mock) ---")

# 使用 FakeEmbeddings 避免需要 API Key
# 它的原理是生成随机向量,只要维度一致即可运行,但语义匹配效果是随机的
# (但在本示例的精确匹配场景下,InMemoryVectorStore 也能工作)
embeddings = FakeEmbeddings(size=768)
vector_store = InMemoryVectorStore(embeddings)

# 模拟一些公司政策文档
mock_docs = [
    Document(page_content="差旅费报销政策:所有超过 500 元的支出都需要发票。", metadata={"source": "policy_v1"}),
    Document(page_content="年假政策:员工入职满一年后,每年享有 10 天带薪年假。", metadata={"source": "hr_handbook"}),
    Document(page_content="远程办公政策:每周三允许员工在家办公。", metadata={"source": "remote_work"}),
]

vector_store.add_documents(mock_docs)
print(f"成功存入 {len(mock_docs)} 条文档到内存向量库。\n")

# 创建检索器 (Retriever)
# k=1 表示每次只查最相关的那 1 条
retriever = vector_store.as_retriever(search_kwargs={"k": 1})


# --- 2. 定义 LangGraph 的状态 (State) ---
class AgentState(TypedDict):
    question: str                # 用户输入的问题
    retrieved_docs: List[str]    # 存储检索到的结果


# --- 3. 定义节点 (Node) ---
# 这是核心:LangGraph 如何调用 retriever
def retrieve_node(state: AgentState):
    question = state["question"]
    print(f"--- [Node: Retrieve] 正在检索关于 '{question}' 的内容 ---")

    # 【关键动作】直接调用 retriever
    documents = retriever.invoke(question)

    # 打印检索结果看看
    for i, doc in enumerate(documents):
        print(f"    > 找到文档 {i+1}: {doc.page_content}")

    # 将结果存回 State
    # 注意:这里返回的字典会更新到全局 State 中
    return {"retrieved_docs": [doc.page_content for doc in documents]}


# --- 4. 构建图 (Graph) ---
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("retrieve_step", retrieve_node)

# 设置流程: 入口 -> retrieve_step -> 结束
workflow.set_entry_point("retrieve_step")
workflow.add_edge("retrieve_step", END)

# 编译应用
app = workflow.compile()


# --- 5. 运行测试 ---
if __name__ == "__main__":
    print("--- [5] 开始运行 LangGraph ---")

    # 模拟用户输入
    inputs = {"question": "差旅费报销"}

    # 运行图
    result = app.invoke(inputs)

    print("\n--- [6] 最终状态 (Final State) ---")
    print(f"用户问题: {result['question']}")
    print(f"检索结果: {result['retrieved_docs']}")

代码运行结果如下:

2、LangGraph + RAG = Active RAG( 主动式 RAG)

如果说 MCP 解决了“手脚”怎么动的问题,那么 LangGraph + RAG 就是解决了“大脑”怎么查阅资料并自我修正的问题。

在传统的 Chain(链式)结构中,RAG 往往是“一锤子买卖”:检索 -> 生成。如果检索错了,回答也就错了。

但在 LangGraph 中,RAG 变成了一个“不知疲倦的研究员”:它会先检索,读完觉得不对,换个关键词再检索,直到找到满意的资料为止。

这种模式被称为 Active RAG (主动式 RAG) 或 Self-RAG。

这个主动式RAG就先简单了解下概念,后续有机会再细研究总结。

02

LangGraph 记忆持久化

之前的文章有介绍过LangGraph的记忆有分为上下文管理和长期持久化记忆。

LangGraph的上下文是通过State来管理的,State就是上下文的窗口,所有节点函数代码中定义的state参数,就是当前的全部上下文。

基于 LangGraph 官方文档(Persistence 部分),可以将 LangGraph 的记忆与持久化机制清晰地划分为两个核心概念:Checkpoints(检查点/短时记忆) 和 Store(存储/长时记忆)。

1、Checkpoints(检查点):会话级持久化

Checkpoint 是对 Graph State(图状态) 的快照。它的核心作用是“记录当下,以便回溯”。它与特定的 thread_id(线程/会话 ID)强绑定。

主要有如下功能:

a、容错与恢复 (Fault Tolerance):如果程序崩溃,只要有thread_id,就可以从上一个步骤中断的地方继续运行,而不用从头开始。

b、时光倒流 (Time Travel):既然保存了每一步的快照,你可以随时查看(Get)甚至回滚到之前的某个步骤。

c、人机交互 (Human-in-the-loop):允许程序暂停(等待人类审批),人类修改状态后,程序基于修改后的状态继续运行。

2、Store(存储):跨会话/长久记忆

Store 是一个层级化的 Key-Value 数据库。它的核心作用是“跨越时空共享数据”。它不依赖于当前的对话上下文,数据可以跨 thread_id 存在。

主要有如下功能:

a、跨会话记忆 (Cross-Thread Memory): 用户昨天(Thread A)说喜欢红色,今天(Thread B)开启新对话时,Agent 依然能通过查询 Store 知道他喜欢红色。

b、共享知识: 可以存储所有 Agent 共享的全局规则或知识。

c、语义搜索: 高级的 Store 实现(如 PostgresStore)支持通过向量(Embeddings)进行语义检索,即使记不清 Key 也能查到相关记忆。

3、总结对比

4、LangGraph中两种持久化方式代码实战

安装依赖包 pip install langgraph langchain-core

代码如下:

代码语言:javascript
复制
import uuid
from typing import Annotated, Literal, TypedDict, Union, Dict, Any
from datetime import datetime

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, ToolMessage, BaseMessage
from langchain_core.runnables import RunnableConfig
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore


# ==========================================
# 1. 生产级数据结构定义 (Schema)
# ==========================================

# 用于 State (图的状态)
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]


# 用于 Store (长期记忆的数据结构)
# 在生产环境中,这通常对应数据库中的一张表
class UserProfile(TypedDict):
    name: str | None
    preference: str | None
    last_updated: str


# ==========================================
# 2. 模拟 LLM (为了让你无需 API Key 也能运行)
# ==========================================
# 在真实生产代码中,这里会是 ChatOpenAI 或 Claude
class MockLLM:
    def invoke(self, messages: list, user_profile: dict):
        last_msg = messages[-1].content

        # 逻辑 A: 如果用户自报家门 -> 决定调用工具保存
        if "我叫" in last_msg:
            name = last_msg.split("我叫")[-1].strip()
            print(f"  [MockLLM] 💡 检测到用户信息,决定调用工具保存: {name}")
            return AIMessage(
                content="",
                tool_calls=[{
                    "id": "call_" + str(uuid.uuid4())[:8],
                    "name": "update_profile",
                    "args": {"name": name}
                }]
            )

        # 逻辑 B: 如果用户问我是谁 -> 读取长期记忆回答
        elif "我是谁" in last_msg or "记得" in last_msg:
            name = user_profile.get("name", "未知用户")
            return AIMessage(content=f"你是 {name}啊!这是我从长期记忆里读出来的。")

        # 逻辑 C: 如果用户问刚才说了什么 -> 从对话历史中查找(演示 Checkpointer 的作用)
        elif "刚才" in last_msg and ("说" in last_msg or "什么" in last_msg):
            # 从 messages 中查找用户之前说的话(跳过最后一条,因为那是当前问题)
            user_messages = [msg for msg in messages[:-1] if isinstance(msg, HumanMessage)]
            if user_messages:
                previous_msg = user_messages[-1].content
                print(f"  [MockLLM] 💡 从 Checkpointer 恢复的对话历史中找到:{previous_msg}")
                return AIMessage(content=f"你刚才说:'{previous_msg}'。这是我从 Checkpointer 保存的对话历史中读取的!")
            else:
                return AIMessage(content="我没有找到你之前说的话。")

        # 逻辑 D: 普通对话
        else:
            return AIMessage(content=f"收到了:{last_msg}")


model = MockLLM()


# ==========================================
# 3. 定义节点 (Nodes)
# ==========================================

# 全局 store 变量,将在编译图时设置
long_term_store = None

# --- 节点 1: 思考与决策节点 ---
def agent_node(state: AgentState, config: RunnableConfig):
    """
    核心 Agent 节点:
    1. 从 Store 获取长期记忆 (用户画像)。
    2. 将画像注入到 System Prompt。
    3. 调用 LLM。
    """
    user_id = config["configurable"]["user_id"]

    # [关键步骤] 主动从 Store 读取跨会话记忆
    # 命名空间设计: ("users", user_id)
    stored_data = long_term_store.get(("users", user_id), "profile")

    # 如果是新用户,给个默认空画像
    user_profile = stored_data.value if stored_data else {"name": None}

    print(f"--- [Node: Agent] 当前加载的用户画像: {user_profile} ---")

    # 模拟调用 LLM (传入记忆)
    response = model.invoke(state["messages"], user_profile)

    return {"messages": [response]}


# --- 节点 2: 记忆写入工具节点 ---
def update_profile_tool(state: AgentState, config: RunnableConfig):
    """
    工具节点:专门负责将提取到的信息写入 Store
    """
    user_id = config["configurable"]["user_id"]

    # 获取 LLM 想要调用的工具参数 (这里简化处理,取最后一个消息的工具调用)
    last_message = state["messages"][-1]
    tool_call = last_message.tool_calls[0]
    new_name = tool_call["args"]["name"]

    print(f"--- [Node: Tool] 正在将 '{new_name}' 写入长期存储 Store ---")

    # [关键步骤] 写入 Store
    new_profile: UserProfile = {
        "name": new_name,
        "preference": "default",
        "last_updated": datetime.now().isoformat()
    }
    long_term_store.put(("users", user_id), "profile", new_profile)

    # 返回工具执行结果
    return {
        "messages": [
            ToolMessage(
                tool_call_id=tool_call["id"],
                content=f"已更新用户资料: Name={new_name}"
            )
        ]
    }


# --- 辅助逻辑: 路由边 (Conditional Edge) ---
def should_continue(state: AgentState) -> Literal["update_profile", "__end__"]:
    messages = state["messages"]
    last_message = messages[-1]

    if last_message.tool_calls:
        return "update_profile"
    return "__end__"


# ==========================================
# 4. 构建图 (Graph Construction)
# ==========================================

workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("update_profile", update_profile_tool)

# 设置入口
workflow.set_entry_point("agent")

# 添加边
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("update_profile", "agent")  # 工具执行完通过循环回到 Agent 确认

# ==========================================
# 5. 初始化持久化层
# ==========================================

# A. 负责短期会话上下文 (Checkpointer)
# Checkpointer 的作用:
# 1. 保存同一 thread_id 下的对话历史(短期记忆)
# 2. 在同一个 thread_id 下,每次调用都会自动恢复之前的对话状态
# 3. 不同 thread_id 之间的对话历史是隔离的(互不影响)
# 4. 适合保存:当前会话的对话记录、临时状态等
checkpointer = MemorySaver()

# B. 负责长期跨会话记忆 (Store)
# Store 的作用:
# 1. 保存跨 thread_id 的长期数据(长期记忆)
# 2. 所有 thread_id 共享同一个 Store(通过 user_id 区分用户)
# 3. 适合保存:用户画像、偏好设置、知识库等
long_term_store = InMemoryStore()

# 编译图:同时挂载两者
app = workflow.compile(
    checkpointer=checkpointer,
    store=long_term_store
)

# ==========================================
# 6. 运行模拟 (Simulation)
# ==========================================

if __name__ == "__main__":
    # 定义全局的用户 ID (这个 ID 贯穿所有会话)
    GLOBAL_USER_ID = "user_999"

    print("\n" + "="*60)
    print("📚 演示 1: Checkpointer 的作用 - 同一 thread_id 下的对话历史")
    print("="*60)

    print("\n========== 🟢 会话 1: 初次见面 (Thread ID: A) ==========")
    # 场景:用户第一次来,告诉 Agent 名字
    config_session_a = {
        "configurable": {
            "thread_id": "thread_A",  # 会话 A
            "user_id": GLOBAL_USER_ID  # 用户 999
        }
    }

    input_msg = {"messages": [HumanMessage(content="你好,我叫 Gemini")]}

    for event in app.stream(input_msg, config=config_session_a):
        pass

    final_state = app.get_state(config_session_a)
    print(f"🤖 AI 最后回复: {final_state.values['messages'][-1].content}")

    # 显示当前会话的对话历史(Checkpointer 保存的)
    print(f"\n📝 [Checkpointer] Thread A 的对话历史(共 {len(final_state.values['messages'])} 条消息):")
    for i, msg in enumerate(final_state.values['messages'], 1):
        msg_type = type(msg).__name__
        content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
        print(f"   {i}. {msg_type}: {content}")

    print("\n========== 🟡 会话 1 继续: 在同一 Thread A 下继续对话 ==========")
    # 场景:用户在同一个 thread_id 下继续对话
    # Checkpointer 会自动恢复之前的对话历史!
    print(">>> 用户继续对话: '我刚才说了什么?'")
    input_msg_continue = {"messages": [HumanMessage(content="我刚才说了什么?")]}

    for event in app.stream(input_msg_continue, config=config_session_a):
        pass

    final_state_continue = app.get_state(config_session_a)
    print(f"🤖 AI 最后回复: {final_state_continue.values['messages'][-1].content}")

    # 显示更新后的对话历史
    print(f"\n📝 [Checkpointer] Thread A 的对话历史(共 {len(final_state_continue.values['messages'])} 条消息):")
    for i, msg in enumerate(final_state_continue.values['messages'], 1):
        msg_type = type(msg).__name__
        content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
        print(f"   {i}. {msg_type}: {content}")
    print("   ✅ 可以看到,Checkpointer 保存了完整的对话历史!")

    print("\n" + "="*60)
    print("📚 演示 2: Checkpointer 的隔离性 - 不同 thread_id 之间对话历史隔离")
    print("="*60)

    print("\n========== 🔴 会话 2: 隔天再来 (Thread ID: B) ==========")
    # 场景:用户换了一个新会话 (Thread B),旧的聊天记录(Checkpoint) 已经隔离了
    # 但是!Store 是共享的。
    print(">>> 注意:这是全新的 thread_id,Checkpointer 中的对话历史是空的!")
    print(">>> 但是 Store 中的长期记忆是共享的(通过 user_id)")

    config_session_b = {
        "configurable": {
            "thread_id": "thread_B",  # 全新的会话 ID
            "user_id": GLOBAL_USER_ID  # 还是同一个用户 999
        }
    }

    input_msg_2 = {"messages": [HumanMessage(content="你还记得我是谁吗?")]}

    print(">>> 用户开启新会话: '你还记得我是谁吗?'")
    for event in app.stream(input_msg_2, config=config_session_b):
        pass

    final_state_b = app.get_state(config_session_b)
    print(f"🤖 AI 最后回复: {final_state_b.values['messages'][-1].content}")

    # 显示新会话的对话历史(应该是空的,只有当前对话)
    print(f"\n📝 [Checkpointer] Thread B 的对话历史(共 {len(final_state_b.values['messages'])} 条消息):")
    for i, msg in enumerate(final_state_b.values['messages'], 1):
        msg_type = type(msg).__name__
        content = msg.content[:50] + "..." if len(msg.content) > 50 else msg.content
        print(f"   {i}. {msg_type}: {content}")
    print("   ✅ 可以看到,Thread B 的对话历史是独立的,不包含 Thread A 的对话!")
    print("   ✅ 但是 AI 仍然知道用户是谁,因为 Store 是共享的!")

    print("\n" + "="*60)
    print("📚 演示 3: 对比 Thread A 和 Thread B 的对话历史")
    print("="*60)

    # 再次查看 Thread A 的对话历史
    final_state_a_again = app.get_state(config_session_a)
    print(f"\n📝 Thread A 的对话历史(共 {len(final_state_a_again.values['messages'])} 条消息)")
    print(f"📝 Thread B 的对话历史(共 {len(final_state_b.values['messages'])} 条消息)")
    print("   ✅ 两个 thread_id 的对话历史完全隔离,互不影响!")

    print("\n========== 🔍 验证 Store 数据(长期记忆,跨 thread_id 共享) ==========")
    print(f"Store 中存储的数据: {long_term_store.get(('users', GLOBAL_USER_ID), 'profile').value}")
    print("   ✅ Store 数据在所有 thread_id 之间共享!")

    print("\n" + "="*60)
    print("📊 总结:")
    print("="*60)
    print("1. Checkpointer: 保存短期会话上下文,同一 thread_id 下可恢复对话历史")
    print("2. Store: 保存长期跨会话记忆,所有 thread_id 共享(通过 user_id 区分用户)")
    print("3. 两者配合使用:Checkpointer 管理对话流,Store 管理用户画像和知识库")

运行结果如下:

5、题外话:对比在DeepSeek和Gemini的个人记忆

本节内容和langgraph以及标题不太相关,纯粹是笔者好奇现在主流大模型关于记忆是如何处理的。

同时问了DeepSeek和Gemini相同的问题,“我在xxx的记忆有哪些?”

笔者日常工作和生活都有同时在使用这两个大模型,从如下两个截图来看两个大模型的回答,DeepSeek现在应该是没有开启记忆功能的,Gemini是有开启的。

03

LangGraph Agent 可视化 --LangSmith

1、LangSmith介绍

基于 LangChain 官方文档(LangSmith 部分),我们可以将 LangSmith 定义为:一个用于 LLM 应用全生命周期的 DevOps 平台。

如果说 LangChain/LangGraph 是用来“造车”的(构建应用),那么 LangSmith 就是“仪表盘 + 维修车间 + 试车场”(监控、调试、评估)。

在智能体开发中,我们经常面临“黑盒”问题:

  • “为什么这个 Agent 在这一步卡住了?”
  • “为什么这次回答的质量突然变差了?”
  • “这个复杂的 LangGraph 图到底走了哪条分支?”

LangSmith 就是为了解决这些可观测性(Observability)和评估(Evaluation)难题而生的。

主要有如下三个功能:

a、Tracing(全链路追踪/调试)

b、Evaluation(评估/测试)

c、Monitoring(生产监控)

LangSmith 目前大部分的核心功能应该可以免费使用试用,好像使用量(额度)和数据保留时间的限制,用于学习、开发调试和个人项目来说,应该是够用的。

2、LangSmith使用介绍

1)配置方法

2)启动应用

3)登陆LangSmith页面(https://smith.langchain.com/),选择我们绑定的项目名,查看页面提供的相关功能,页面具体功能笔者也没有详细去分析,提供些截图大家可以看看。

登录初始页面

历史上跑过的一些自主规划Agent数据

打开指定链路,查看具体的每个LangGraph 节点的耗时等监控数据

查看其他监控相关数据

04

结语

关于LangGraph系列的学习总结文章介绍的差不多了,最终还是要能落地到生产环境,能解决问题,或有业务价值的才是有意义的,不然就是空谈。

未来如果在生产环境真实落地后,再来做一些相关的分享。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-12-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 周银杂谈 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前记:上篇文章结尾提到还有LangGraph 调用知识库、记忆持久化、智能体链路可视化等还没介绍,本文就接着介绍这些内容。
    • 01
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档