LangChain 是一个用于构建基于大语言模型(LLM)应用的开源框架,其核心设计理念是「模块化」和「可组合性」,通过统一的接口抽象和灵活的插件机制,使开发者能够轻松构建复杂的 AI 应用。
LangChain 采用 「monorepo(单仓库多包)」 架构组织代码:
langchain/
├── libs/ # 核心代码库
│ ├── core/ # 基础抽象和 LCEL 表达式语言
│ ├── langchain/ # 经典版 LangChain 包
│ ├── langchain_v1/ # 当前主版本包
│ ├── community/ # 第三方社区集成
│ ├── partners/ # 官方合作伙伴集成包
│ ├── text-splitters/ # 文本分割工具
│ ├── standard-tests/ # 标准化测试套件
│ └── cli/ # 命令行工具
├── docs/ # 文档
├── cookbook/ # 教程和示例
└── templates/ # 可部署的参考架构
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ (langchain / langchain_v1) │
│ Chains, Agents, Retrieval Strategies │
├─────────────────────────────────────────────────────────────┤
│ Integration Layer │
│ ┌──────────────────┐ ┌──────────────────────────────┐ │
│ │ langchain-openai │ │ langchain-community │ │
│ │ langchain-anthr. │ │ (第三方社区集成) │ │
│ │ langchain-google │ │ │ │
│ └──────────────────┘ └──────────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│ Core Layer │
│ (langchain-core) │
│ Runnable Interface, LCEL, Base Classes, Callbacks │
└─────────────────────────────────────────────────────────────┘
「Runnable」 是 LangChain 最核心的抽象接口,所有可执行组件都实现此接口,这是实现插件化的基础。
from abc import ABC, abstractmethod
from typing import Any, Optional, List, AsyncIterator, Iterator
class Runnable(ABC):
"""所有可运行组件的基类"""
@abstractmethod
def invoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""同步执行单个输入"""
pass
asyncdef ainvoke(self, input: Any, config: Optional[RunnableConfig] = None) -> Any:
"""异步执行单个输入"""
pass
def batch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""批量执行多个输入"""
pass
asyncdef abatch(self, inputs: List[Any], config: Optional[RunnableConfig] = None) -> List[Any]:
"""异步批量执行"""
pass
def stream(self, input: Any, config: Optional[RunnableConfig] = None) -> Iterator[Any]:
"""流式返回响应"""
pass
asyncdef astream(self, input: Any, config: Optional[RunnableConfig] = None) -> AsyncIterator[Any]:
"""异步流式返回"""
pass
LangChain 提供两种主要的 Runnable 组合方式:
「RunnableSequence(顺序执行)」:
from langchain_core.runnables import RunnableSequence
# 方式1:使用管道操作符
chain = prompt | llm | output_parser
# 方式2:显式构造
chain = RunnableSequence([prompt, llm, output_parser])
「RunnableParallel(并行执行)」:
from langchain_core.runnables import RunnableParallel
# 并行执行多个 Runnable
parallel = RunnableParallel({
"summary": summary_chain,
"translation": translation_chain
})
LCEL 是 LangChain 的声明式表达语言,通过管道操作符 | 串联实现了 Runnable 接口的组件。
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# 使用 LCEL 构建链
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个有帮助的助手"),
("human", "{input}")
])
model = ChatOpenAI(model="gpt-4")
output_parser = StrOutputParser()
# 管道操作符组合
chain = prompt | model | output_parser
# 执行
result = chain.invoke({"input": "你好"})
Tool 是 LangChain 插件架构的核心组件之一,用于扩展 LLM 的能力。
from abc import ABC, abstractmethod
from pydantic import BaseModel
from typing import Any, Optional, Type
class BaseTool(BaseModel, ABC):
"""工具基类"""
name: str # 工具名称(必须唯一)
description: str # 工具描述(供 LLM 理解)
args_schema: Optional[Type[BaseModel]] = None# 参数模式
return_direct: bool = False # 是否直接返回结果
@abstractmethod
def _run(self, *args, **kwargs) -> Any:
"""同步执行工具"""
pass
asyncdef _arun(self, *args, **kwargs) -> Any:
"""异步执行工具"""
raise NotImplementedError("异步执行未实现")
「方式1:@tool 装饰器(最简单)」
from langchain_core.tools import tool
@tool
def multiply(a: int, b: int) -> int:
"""将两个数字相乘"""
return a * b
print(multiply.name) # "multiply"
print(multiply.description) # "将两个数字相乘"
print(multiply.args) # {'a': {'type': 'integer'}, 'b': {'type': 'integer'}}
「方式2:StructuredTool.from_function(更多配置)」
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(description="搜索查询")
max_results: int = Field(default=10, description="最大结果数")
def search_function(query: str, max_results: int = 10) -> str:
returnf"搜索 '{query}' 的结果"
search_tool = StructuredTool.from_function(
func=search_function,
name="search",
description="在网络上搜索信息",
args_schema=SearchInput,
return_direct=False
)
「方式3:继承 BaseTool(最灵活)」
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
class CalculatorInput(BaseModel):
expression: str = Field(description="数学表达式")
class CalculatorTool(BaseTool):
name: str = "calculator"
description: str = "计算数学表达式"
args_schema: Type[BaseModel] = CalculatorInput
def _run(self, expression: str) -> str:
try:
result = eval(expression)
return str(result)
except Exception as e:
returnf"计算错误: {e}"
asyncdef _arun(self, expression: str) -> str:
return self._run(expression)
回调系统是 LangChain 的事件驱动机制,用于监控、日志记录和调试。
from typing import Any, Dict, List
from langchain_core.callbacks import BaseCallbackHandler
class BaseCallbackHandler:
"""回调处理器基类"""
def on_llm_start(
self,
serialized: Dict[str, Any],
prompts: List[str],
**kwargs
) -> Any:
"""LLM 开始运行时触发"""
pass
def on_llm_new_token(self, token: str, **kwargs) -> Any:
"""生成新 token 时触发(流式)"""
pass
def on_llm_end(self, response: Any, **kwargs) -> Any:
"""LLM 结束时触发"""
pass
def on_llm_error(self, error: Exception, **kwargs) -> Any:
"""LLM 出错时触发"""
pass
def on_chain_start(
self,
serialized: Dict[str, Any],
inputs: Dict[str, Any],
**kwargs
) -> Any:
"""Chain 开始时触发"""
pass
def on_chain_end(self, outputs: Dict[str, Any], **kwargs) -> Any:
"""Chain 结束时触发"""
pass
def on_tool_start(
self,
serialized: Dict[str, Any],
input_str: str,
**kwargs
) -> Any:
"""Tool 开始时触发"""
pass
def on_tool_end(self, output: str, **kwargs) -> Any:
"""Tool 结束时触发"""
pass
def on_agent_action(self, action: Any, **kwargs) -> Any:
"""Agent 执行动作时触发"""
pass
def on_agent_finish(self, finish: Any, **kwargs) -> Any:
"""Agent 完成时触发"""
pass
from langchain_core.callbacks import BaseCallbackHandler
class CustomHandler(BaseCallbackHandler):
"""自定义回调处理器"""
def on_llm_start(self, serialized, prompts, **kwargs):
print(f"[LLM开始] 提示词: {prompts}")
def on_llm_new_token(self, token, **kwargs):
print(f"[新Token] {token}", end="", flush=True)
def on_llm_end(self, response, **kwargs):
print(f"\n[LLM结束] 响应: {response}")
def on_tool_start(self, serialized, input_str, **kwargs):
print(f"[工具开始] 输入: {input_str}")
# 使用回调
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-4",
streaming=True,
callbacks=[CustomHandler()]
)
OutputParser 用于将 LLM 的原始文本输出转换为结构化数据。
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
T = TypeVar('T')
class BaseOutputParser(ABC, Generic[T]):
"""输出解析器基类"""
@abstractmethod
def parse(self, text: str) -> T:
"""解析 LLM 输出文本"""
pass
def get_format_instructions(self) -> str:
"""返回格式化指令,注入到提示词中"""
pass
def parse_with_prompt(self, completion: str, prompt: Any) -> T:
"""带提示词的解析"""
return self.parse(completion)
from langchain_core.output_parsers import (
StrOutputParser, # 字符串解析
JsonOutputParser, # JSON 解析
CommaSeparatedListOutputParser, # 逗号分隔列表
PydanticOutputParser, # Pydantic 模型解析
)
# JSON 解析器示例
from pydantic import BaseModel, Field
class MovieReview(BaseModel):
title: str = Field(description="电影标题")
rating: int = Field(description="评分 1-10")
summary: str = Field(description="简短评价")
parser = JsonOutputParser(pydantic_object=MovieReview)
# 获取格式化指令
instructions = parser.get_format_instructions()
# 输出: "请以 JSON 格式返回,包含以下字段: title, rating, summary..."
Memory 组件用于管理对话历史和状态。
from abc import ABC, abstractmethod
from typing import List
from langchain_core.messages import BaseMessage
class BaseChatMessageHistory(ABC):
"""聊天消息历史基类"""
@property
@abstractmethod
def messages(self) -> List[BaseMessage]:
"""获取所有消息"""
pass
@abstractmethod
def add_message(self, message: BaseMessage) -> None:
"""添加消息"""
pass
@abstractmethod
def clear(self) -> None:
"""清空历史"""
pass
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_community.chat_message_histories import (
FileChatMessageHistory, # 文件存储
RedisChatMessageHistory, # Redis 存储
SQLChatMessageHistory, # SQL 数据库存储
)
# 内存存储
memory = InMemoryChatMessageHistory()
memory.add_user_message("你好")
memory.add_ai_message("你好!有什么可以帮助你的?")
# 文件存储
file_memory = FileChatMessageHistory(file_path="chat_history.json")
LangChain 采用分层的集成包策略:
包类型 | 说明 | 示例 |
|---|---|---|
「langchain-core」 | 核心抽象,无第三方依赖 | Runnable, BaseTool, BaseCallbackHandler |
「Partner Packages」 | 官方维护的轻量级集成 | langchain-openai, langchain-anthropic |
「langchain-community」 | 社区维护的第三方集成 | 各种向量数据库、工具集成 |
创建新的集成包需要遵循以下结构:
langchain-myintegration/
├── pyproject.toml
├── README.md
├── langchain_myintegration/
│ ├── __init__.py
│ ├── chat_models.py # 聊天模型集成
│ ├── llms.py # LLM 集成
│ ├── embeddings.py # 嵌入模型集成
│ ├── vectorstores.py # 向量存储集成
│ └── tools.py # 工具集成
└── tests/
└── unit_tests/
from langchain_core.language_models.llms import LLM
from typing import Any, List, Optional
class MyCustomLLM(LLM):
"""自定义 LLM 集成"""
model_name: str = "my-model"
api_key: str = ""
@property
def _llm_type(self) -> str:
return"my_custom_llm"
def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""执行 LLM 调用"""
# 实现具体的 API 调用逻辑
response = self._make_api_call(prompt)
return response
asyncdef _acall(
self,
prompt: str,
stop: Optional[List[str]] = None,
**kwargs: Any,
) -> str:
"""异步执行 LLM 调用"""
response = await self._make_async_api_call(prompt)
return response
@property
def _identifying_params(self) -> dict:
"""返回标识参数"""
return {"model_name": self.model_name}
设计模式 | 应用场景 | 说明 |
|---|---|---|
「策略模式」 | Tool, LLM, Memory | 允许运行时切换不同实现 |
「装饰器模式」 | @tool, Callbacks | 动态添加功能 |
「工厂模式」 | from_function, from_template | 创建组件实例 |
「组合模式」 | RunnableSequence, RunnableParallel | 组合多个组件 |
「观察者模式」 | Callback 系统 | 监听和响应事件 |
「模板方法模式」 | BaseTool._run | 定义算法骨架 |
「责任链模式」 | LCEL 管道 | 请求沿链传递 |
扩展点 | 基类/接口 | 用途 |
|---|---|---|
语言模型 | BaseLLM, BaseChatModel | 集成新的 LLM |
工具 | BaseTool | 添加新的工具能力 |
向量存储 | VectorStore | 集成向量数据库 |
嵌入模型 | Embeddings | 集成嵌入模型 |
文档加载器 | BaseLoader | 加载不同格式文档 |
文本分割器 | TextSplitter | 自定义分割策略 |
输出解析器 | BaseOutputParser | 自定义输出格式 |
记忆存储 | BaseChatMessageHistory | 自定义存储后端 |
回调处理 | BaseCallbackHandler | 自定义事件处理 |
1. 确定扩展类型 → 2. 继承对应基类 → 3. 实现抽象方法 → 4. 注册/使用组件
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, Optional
class WeatherInput(BaseModel):
"""天气查询输入"""
city: str = Field(description="城市名称")
date: Optional[str] = Field(default=None, description="日期,格式 YYYY-MM-DD")
class WeatherTool(BaseTool):
"""天气查询工具"""
name: str = "weather"
description: str = "查询指定城市的天气信息"
args_schema: Type[BaseModel] = WeatherInput
api_key: str = ""# 配置项
def _run(self, city: str, date: Optional[str] = None) -> str:
# 实现天气查询逻辑
returnf"{city} 的天气是晴天,温度 25°C"
asyncdef _arun(self, city: str, date: Optional[str] = None) -> str:
# 异步实现
return self._run(city, date)
from langchain_core.callbacks import BaseCallbackHandler
from datetime import datetime
import json
class MetricsCallbackHandler(BaseCallbackHandler):
"""性能指标收集回调"""
def __init__(self):
self.metrics = []
self.start_time = None
def on_llm_start(self, serialized, prompts, **kwargs):
self.start_time = datetime.now()
def on_llm_end(self, response, **kwargs):
if self.start_time:
duration = (datetime.now() - self.start_time).total_seconds()
self.metrics.append({
"type": "llm_call",
"duration": duration,
"timestamp": datetime.now().isoformat()
})
def get_metrics(self) -> str:
return json.dumps(self.metrics, indent=2)
LangChain 的插件架构具有以下特点:
这种架构设计使 LangChain 成为一个高度可扩展、易于使用的 LLM 应用开发框架。