上一篇我们用 LangGraph 搭了科研 Multi-Agent,这一篇换个更接地气的场景:
“我每个月就能拿出 2000 块,想做基金定投,有没有一个 AI 帮我做筛选?”
传统做法:
这就是一个很适合 Multi-Agent 的场景:
把「信息搜集 → 指标计算 → 风险评估 → 资产配置建议 → 生成报告」 拆给不同的 Agent,最后 LangGraph 把流程编排成一张图。

先定一个非常具体的目标:
输入:
用户风险偏好(保守 / 稳健 / 激进) 理财目标(教育金 / 退休 / 买房) 输出: 一份 Markdown 基金分析报告(含候选基金列表、核心指标 & 一个简单定投建议)
我们拆成 4 个 Agent:
最外层再由 LangGraph 控制流程:
START → UniverseBuilder → FundDataAnalyst → RiskProfiler → PortfolioAdvisor → END
我们用 TypedDict + LangGraph 的 Annotated 来定义共享状态:
# state.py
from typing import TypedDict, List, Dict, Optional
from typing_extensions import Annotated
import operator
class FundInfo(TypedDict):
code: str
name: str
type: str # 指数 / 债券 / 混合等
class FundMetric(TypedDict):
code: str
annual_return: float
max_drawdown: float
sharpe: float
class FundRisk(TypedDict):
code: str
risk_level: str # Low / Medium / High
risk_score: float
class PortfolioSuggestion(TypedDict):
funds: List[Dict]
comment: str
class FundResearchState(TypedDict):
# 用户输入
user_risk_profile: str # "conservative" / "balanced" / "aggressive"
user_goal: str # "retirement" / "education" / ...
# 1) 候选基金池
universe: Annotated[List[FundInfo], operator.add]
# 2) 指标结果
metrics: Annotated[List[FundMetric], operator.add]
# 3) 风险评估结果
risks: Annotated[List[FundRisk], operator.add]
# 4) 最终建议
portfolio: Optional[PortfolioSuggestion]
# 日志方便排查
logs: Annotated[List[str], operator.add]为了代码能跑通,我们先造一个小小的 in-memory DB:
# mock_db.py
from typing import Dict, List
# 简单模拟几个基金(全是编的)
FUND_UNIVERSE = [
{
"code": "F001",
"name": "先锋宽基指数 A",
"type": "equity_index",
},
{
"code": "F002",
"name": "安稳纯债债券 C",
"type": "bond",
},
{
"code": "F003",
"name": "成长精选混合",
"type": "balanced",
},
{
"code": "F004",
"name": "科技创新指数",
"type": "sector_tech",
},
]
# 模拟每只基金过去三年的关键指标(编的)
FUND_METRICS = {
"F001": {"annual_return": 0.10, "max_drawdown": -0.18, "sharpe": 1.1},
"F002": {"annual_return": 0.045, "max_drawdown": -0.05, "sharpe": 0.9},
"F003": {"annual_return": 0.085, "max_drawdown": -0.15, "sharpe": 0.95},
"F004": {"annual_return": 0.14, "max_drawdown": -0.28, "sharpe": 1.2},
}真正上生产时,你可以用
tushare / 聚宽 / Wind / 自建 API来替换这里的 mock 数据。
规则先写简单一点:
# nodes.py
from typing import List
from state import FundResearchState, FundInfo
from mock_db import FUND_UNIVERSE, FUND_METRICS
def universe_builder_node(state: FundResearchState) -> FundResearchState:
risk = state["user_risk_profile"]
goal = state["user_goal"]
candidates: List[FundInfo] = []
if risk == "conservative":
# 债券 + 少量宽基
for f in FUND_UNIVERSE:
if f["type"] == "bond" or f["type"] == "equity_index":
candidates.append(FundInfo(**f))
elif risk == "balanced":
# 债券 + 宽基 + 混合
for f in FUND_UNIVERSE:
if f["type"] in ["bond", "equity_index", "balanced"]:
candidates.append(FundInfo(**f))
else: # aggressive
# 偏股 / 行业 / 成长
for f in FUND_UNIVERSE:
if f["type"] in ["equity_index", "sector_tech", "balanced"]:
candidates.append(FundInfo(**f))
state["universe"].extend(candidates)
state["logs"].append(
f"[UniverseBuilder] risk={risk}, goal={goal}, selected={len(candidates)} funds."
)
return state真实场景你会在这里拉历史净值然后用 pandas 算指标,这里我们先走通 pipeline:
from state import FundMetric
def metrics_node(state: FundResearchState) -> FundResearchState:
metrics_list: List[FundMetric] = []
for f in state["universe"]:
code = f["code"]
m = FUND_METRICS.get(code)
if not m:
continue
metrics = FundMetric(
code=code,
annual_return=m["annual_return"],
max_drawdown=m["max_drawdown"],
sharpe=m["sharpe"],
)
metrics_list.append(metrics)
state["metrics"].extend(metrics_list)
state["logs"].append(
f"[FundDataAnalyst] computed metrics for {len(metrics_list)} funds."
)
return state示意一下很简单的「规则 + LLM」混合方式:
先用规则快速算出一个 risk_score,再用 LLM 帮我们生成 level & 说明(可选)。
这里先写纯规则版(方便你本地随时跑):
from state import FundRisk
def risk_profiler_node(state: FundResearchState) -> FundResearchState:
risk_list: List[FundRisk] = []
for m in state["metrics"]:
# 简单构造一个 risk_score:收益高 + 回撤大 + 夏普高 = 高风险高收益
# 这里只是玩具公式
score = (
0.4 * (m["annual_return"] * 100) # 越高越高风险
- 0.3 * (m["max_drawdown"] * -100) # 回撤越深风险越高
- 0.3 * m["sharpe"] * 10 # 夏普越高风险得适当抵消
)
if score < 10:
level = "Low"
elif score < 20:
level = "Medium"
else:
level = "High"
risk_item = FundRisk(
code=m["code"],
risk_level=level,
risk_score=round(score, 2),
)
risk_list.append(risk_item)
state["risks"].extend(risk_list)
state["logs"].append(
f"[RiskProfiler] evaluated risk for {len(risk_list)} funds."
)
return state真正落地时,你可以把这里替换成: 「规则 + LLM 分析 + 回测结果」综合打分。
这里我们用 LLM 来生成最后的分析报告,把前面所有结果喂进去。
# advisor.py
from typing import Dict
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from llm_config import build_llm
from state import FundResearchState, PortfolioSuggestion
llm = build_llm()
report_prompt = PromptTemplate(
input_variables=["risk_profile", "goal", "universe", "metrics", "risks"],
template=(
"你是一个理财顾问,下面是关于用户和候选基金的数据:\n"
"用户风险偏好: {risk_profile}\n"
"用户目标: {goal}\n\n"
"候选基金列表 (JSON):\n{universe}\n\n"
"基金指标 (JSON):\n{metrics}\n\n"
"风险评估 (JSON):\n{risks}\n\n"
"请生成一份 Markdown 格式的基金分析报告,包含:\n"
"1. 用户画像简单描述\n"
"2. 按风险等级分类的基金列表(列出代码、名称、年化收益、最大回撤、风险等级)\n"
"3. 一个简单的定投建议组合(假设每月投入 2000 元,给出一个大致的权重,例如 40% 债券 + 60% 宽基),\n"
" 要和用户风险偏好匹配。\n"
"4. 用条目列出 3~5 条风险提示(比如:历史收益不代表未来,短期波动等)。\n"
"5. 在最后强调:本报告仅为 AI 示例,不构成任何投资建议。\n"
),
)
report_chain = LLMChain(llm=llm, prompt=report_prompt)
def portfolio_advisor_node(state: FundResearchState) -> FundResearchState:
import json
universe_json = json.dumps(state["universe"], ensure_ascii=False, indent=2)
metrics_json = json.dumps(state["metrics"], ensure_ascii=False, indent=2)
risks_json = json.dumps(state["risks"], ensure_ascii=False, indent=2)
report_md = report_chain.run(
risk_profile=state["user_risk_profile"],
goal=state["user_goal"],
universe=universe_json,
metrics=metrics_json,
risks=risks_json,
)
# 这里简单存一下
portfolio = PortfolioSuggestion(
funds=[], # 也可以让 LLM 顺便生成一个 JSON,这里先占位
comment=report_md,
)
state["portfolio"] = portfolio
state["logs"].append("[PortfolioAdvisor] generated markdown report.")
return state和科研篇一样,用 LangGraph 把节点组装成一张图:
# graph_build.py
from langgraph.graph import StateGraph, START, END
from state import FundResearchState
from nodes import (
universe_builder_node,
metrics_node,
risk_profiler_node,
)
from advisor import portfolio_advisor_node
def build_fund_graph():
builder = StateGraph(FundResearchState)
# 注册节点
builder.add_node("universe_builder", universe_builder_node)
builder.add_node("metrics_node", metrics_node)
builder.add_node("risk_profiler", risk_profiler_node)
builder.add_node("portfolio_advisor", portfolio_advisor_node)
# 定义边:START -> universe -> metrics -> risk -> advisor -> END
builder.add_edge(START, "universe_builder")
builder.add_edge("universe_builder", "metrics_node")
builder.add_edge("metrics_node", "risk_profiler")
builder.add_edge("risk_profiler", "portfolio_advisor")
builder.add_edge("portfolio_advisor", END)
graph = builder.compile()
return graph运行一个完整 demo:
# main.py
from graph_build import build_fund_graph
def main():
graph = build_fund_graph()
init_state = {
"user_risk_profile": "balanced", # conservative / balanced / aggressive
"user_goal": "retirement",
"universe": [],
"metrics": [],
"risks": [],
"portfolio": None,
"logs": [],
}
final_state = graph.invoke(init_state)
print("==== Logs ====")
for log in final_state["logs"]:
print(log)
print("\n==== Report (Markdown) ====\n")
if final_state["portfolio"]:
print(final_state["portfolio"]["comment"])
if __name__ == "__main__":
main()这套代码跑完,你会得到类似效果:
[UniverseBuilder] risk=balanced, goal=retirement, selected=3 funds.[FundDataAnalyst] computed metrics for 3 funds.[RiskProfiler] evaluated risk for 3 funds.[PortfolioAdvisor] generated markdown report.原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。