Microsoft Agent Framework (MAF) 提供了一套强大的 Workflow(工作流) 框架,用于编排和协调多个智能体(Agent)或处理组件的执行流程。
本课将以通俗易懂的方式,帮助你理解 MAF Workflow 的核心概念:
概念 | 说明 | 类比 |
|---|---|---|
Workflow | 工作流定义 | 📋 流程图 |
Executor | 执行器/处理节点 | 🔧 工人 |
Edge | 连接边 | ➡️ 传送带 |
SuperStep | 超步/批量处理 | ⏱️ 处理周期 |
WorkflowContext | 工作流上下文 | 📦 工作台 |
WorkflowEvent | 工作流事件 | 📣 通知 |
Run | 运行实例 | 🏃 一次执行 |
Checkpoint | 检查点 | 💾 存档点 |

让我们逐一深入了解这些概念!
在构建 AI 智能体应用时,我们经常需要:
传统方式的问题:
// ❌ 硬编码的流程,难以维护和扩展
var result1 = await agent1.ProcessAsync(input);
if (result1.Success)
{
var result2 = await agent2.ProcessAsync(result1.Data);
// ... 越来越复杂
}使用 Workflow 的优势:
// ✅ 声明式定义,清晰可维护
var workflow = new WorkflowBuilder(startExecutor)
.AddEdge(executor1, executor2)
.AddEdge(executor2, executor3, condition: x => x.Success)
.Build();
Executor(执行器) 是 Workflow 中的最小工作单元,类似于:
类比 | 说明 |
|---|---|
🧑🏭 工厂里的工人 | 每个工人负责一道工序 |
🧱 乐高积木块 | 每个积木有特定功能,组合成整体 |
🔌 电路中的元件 | 接收输入信号,输出处理结果 |

ConfigureRoutes 方法定义能处理哪些类型的消息IWorkflowContext 访问和修改工作流状态MAF 提供了多种 Executor 类型,满足不同场景需求:
类型 | 用途 | 示例场景 |
|---|---|---|
Executor<TInput> | 处理消息,无返回值 | 日志记录、通知发送 |
Executor<TInput, TOutput> | 处理消息,有返回值 | 数据转换、AI 调用 |
FunctionExecutor<T> | 用委托函数快速创建 | 简单处理逻辑 |
StatefulExecutor<TState> | 需要维护状态的执行器 | 会话管理、计数器 |
从源码中可以看到 Executor 的核心设计:
// 来自 Executor.cs
public abstract class Executor : IIdentified
{
// 唯一标识符
public string Id { get; }
// 配置消息路由(子类必须实现)
protected abstract RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder);
// 执行消息处理
public async ValueTask<object?> ExecuteAsync(
object message,
TypeId messageType,
IWorkflowContext context,
CancellationToken cancellationToken = default)
{
// 记录调用事件
await context.AddEventAsync(new ExecutorInvokedEvent(this.Id, message));
// 路由消息到正确的处理器
CallResult? result = await this.Router.RouteMessageAsync(message, context, ...);
// 记录完成或失败事件
ExecutorEvent executionResult = result?.IsSuccess is not false
? new ExecutorCompletedEvent(this.Id, result?.Result)
: new ExecutorFailedEvent(this.Id, result.Exception);
await context.AddEventAsync(executionResult);
return result.Result;
}
}关键点:
ConfigureRoutes 声明能处理的消息类型ExecutorInvokedEvent、ExecutorCompletedEvent 等)Edge(边) 是连接两个 Executor 的消息通道,类似于:
类比 | 说明 |
|---|---|
🏭 工厂传送带 | 把上一道工序的产品传送到下一道工序 |
🔗 水管 | 把水从一个容器引导到另一个容器 |
📨 邮路 | 把信件从发件人送到收件人 |

MAF 支持三种类型的 Edge,适用于不同的流程模式:

类型 | 说明 | 使用场景 |
|---|---|---|
Direct(直连) | 一对一连接 | 顺序处理流程 |
FanOut(扇出) | 一对多连接 | 并行分发任务 |
FanIn(扇入) | 多对一连接 | 汇聚多个结果 |
从源码可以看到 Edge 的核心结构:
// 来自 Edge.cs
public enum EdgeKind
{
Direct, // 直连:一对一
FanOut, // 扇出:一对多
FanIn // 扇入:多对一
}
public sealed class Edge
{
public EdgeKind Kind { get; init; } // 边的类型
public EdgeData Data { get; init; } // 边的具体数据
}
// 来自 DirectEdgeData.cs - 直连边的数据
public sealed class DirectEdgeData : EdgeData
{
public string SourceId { get; } // 源 Executor ID
public string SinkId { get; } // 目标 Executor ID
public Func<object?, bool>? Condition; // 可选的条件判断
}Direct Edge 示意图:

关键点:
Workflow(工作流) 是将多个 Executor 通过 Edge 连接起来的完整流程定义,类似于:
类比 | 说明 |
|---|---|
📋 流程图 | 定义了从开始到结束的完整流程 |
🏭 生产线 | 多个工位通过传送带连接成完整生产线 |
🎼 乐谱 | 规定了演奏的顺序和节奏 |

从源码中可以看到 Workflow 的核心结构:
// 来自 Workflow.cs
public class Workflow
{
// 起始 Executor 的 ID
public string StartExecutorId { get; }
// 工作流名称(可选)
public string? Name { get; internal init; }
// 工作流描述(可选)
public string? Description { get; internal init; }
// Executor 绑定字典
internal Dictionary<string, ExecutorBinding> ExecutorBindings { get; init; }
// 边的集合(按源节点分组)
internal Dictionary<string, HashSet<Edge>> Edges { get; init; }
// 输出 Executor 集合
internal HashSet<string> OutputExecutors { get; init; }
}MAF 采用 建造者模式(Builder Pattern) 来构建 Workflow,这使得工作流的定义更加直观:
// 创建工作流示例
var workflow = new WorkflowBuilder(startExecutor) // 指定起点
.WithName("订单处理工作流") // 设置名称
.WithDescription("处理电商订单的完整流程") // 设置描述
.AddEdge(receiveOrder, validateOrder) // 添加边
.AddEdge(validateOrder, processPayment,
condition: x => x.IsValid) // 条件边
.AddEdge(processPayment, sendNotification)
.WithOutputFrom(sendNotification) // 指定输出节点
.Build(); // 构建工作流
关键方法:
方法 | 说明 |
|---|---|
AddEdge(source, sink) | 添加直连边 |
AddEdge(..., condition) | 添加条件边 |
AddFanOut(source, sinks) | 添加扇出边 |
AddFanIn(sources, sink) | 添加扇入边 |
WithOutputFrom(executor) | 指定输出节点 |
BindExecutor(executor) | 绑定占位符执行器 |
Build() | 构建最终的 Workflow |
SuperStep(超步) 是 Workflow 执行的基本处理周期。可以类比为:
类比 | 说明 |
|---|---|
🎮 游戏中的"回合" | 每个回合内所有玩家同时行动 |
⏰ 工厂的"班次" | 每个班次内完成一批任务 |
🌊 海浪的"一波" | 一波消息被处理,然后产生下一波 |

每个 SuperStep 内部执行的步骤:

关键事件:
SuperStepStartedEvent:超步开始SuperStepCompletedEvent:超步完成// SuperStep 事件定义
public class SuperStepEvent(int stepNumber, object? data = null) : WorkflowEvent(data)
{
// 超步的序号(从 0 开始)
public int StepNumber => stepNumber;
}WorkflowContext(工作流上下文) 是 Executor 执行时的运行环境,类似于:
类比 | 说明 |
|---|---|
🛠️ 工人的工作台 | 提供工具、材料和通信渠道 |
📞 通信枢纽 | 允许各个工位之间传递信息 |
💾 共享内存 | 存储和读取状态数据 |

// 来自 IWorkflowContext.cs
public interface IWorkflowContext
{
// 添加工作流事件(在当前 SuperStep 结束时触发)
ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default);
// 发送消息给下游 Executor(在下一个 SuperStep 处理)
ValueTask SendMessageAsync(object message, string? targetId, CancellationToken cancellationToken = default);
// 输出工作流结果
ValueTask YieldOutputAsync(object output, CancellationToken cancellationToken = default);
// 请求在当前 SuperStep 结束时停止工作流
ValueTask RequestHaltAsync();
// 读取状态
ValueTask<T?> ReadStateAsync<T>(string key, string? scopeName = null, CancellationToken cancellationToken = default);
// 读取或初始化状态
ValueTask<T> ReadOrInitStateAsync<T>(string key, Func<T> initialStateFactory, string? scopeName = null, CancellationToken cancellationToken = default);
// 更新状态(排队更新,在 SuperStep 结束时应用)
ValueTask QueueStateUpdateAsync<T>(string key, T value, string? scopeName = null, CancellationToken cancellationToken = default);
}关键点:
SendMessageAsync 在 Executor 之间传递消息AddEventAsync 发出事件RequestHaltAsync 停止工作流WorkflowEvent(工作流事件) 是工作流执行过程中产生的通知消息,类似于:
类比 | 说明 |
|---|---|
📢 广播通知 | 向所有人广播系统状态变化 |
📝 日志记录 | 记录系统执行过程中的关键节点 |
🔔 事件订阅 | 允许外部监听并响应特定事件 |

事件层级 | 事件类型 | 说明 |
|---|---|---|
工作流级别 | WorkflowStartedEvent | 工作流开始执行 |
WorkflowOutputEvent | 工作流产生输出 | |
WorkflowErrorEvent | 工作流发生错误 | |
WorkflowWarningEvent | 工作流产生警告 | |
超步级别 | SuperStepStartedEvent | 超步开始 |
SuperStepCompletedEvent | 超步完成 | |
执行器级别 | ExecutorInvokedEvent | Executor 被调用 |
ExecutorCompletedEvent | Executor 完成处理 | |
ExecutorFailedEvent | Executor 处理失败 |
Run(运行实例) 是 Workflow 的一次具体执行,类似于:
类比 | 说明 |
|---|---|
🎬 电影的一场放映 | 同一部电影可以放映多场 |
🏭 生产线的一个批次 | 同一条生产线可以生产多个批次 |
🎮 游戏的一局 | 同一个游戏可以玩多局 |

// 来自 Run.cs
public sealed class Run : IAsyncDisposable
{
// 运行实例的唯一标识符
public string RunId => this._runHandle.RunId;
// 获取当前执行状态
public ValueTask<RunStatus> GetStatusAsync(CancellationToken cancellationToken = default);
// 获取所有产生的事件
public IEnumerable<WorkflowEvent> OutgoingEvents => this._eventSink;
// 获取自上次访问后的新事件
public IEnumerable<WorkflowEvent> NewEvents { get; }
// 恢复执行(带外部响应)
public async ValueTask<bool> ResumeAsync(IEnumerable<ExternalResponse> responses, CancellationToken cancellationToken = default);
}public enum RunStatus
{
NotStarted, // 尚未开始
Idle, // 空闲(已暂停,无待处理请求)
PendingRequests, // 等待外部响应
Ended, // 已结束
Running // 正在运行
}
Checkpoint(检查点) 是工作流在某个时刻的完整状态快照,类似于:
类比 | 说明 |
|---|---|
🎮 游戏存档 | 保存游戏进度,随时可以读档继续 |
📸 照片 | 记录某一时刻的完整状态 |
🔖 书签 | 标记阅读进度,下次从这里继续 |

// 来自 CheckpointInfo.cs
public sealed class CheckpointInfo
{
// 运行实例的唯一标识符
public string RunId { get; }
// 检查点的唯一标识符
public string CheckpointId { get; }
}
// 检查点的完整数据(来自 Checkpoint.cs)
internal sealed class Checkpoint
{
public int StepNumber { get; } // 超步编号
public WorkflowInfo Workflow { get; } // 工作流信息
public RunnerStateData RunnerData { get; } // 运行器状态
public Dictionary<ScopeKey, PortableValue> StateData { get; } // 状态数据
public Dictionary<EdgeId, PortableValue> EdgeStateData { get; } // 边状态数据
public CheckpointInfo? Parent { get; } // 父检查点
}场景 | 说明 |
|---|---|
故障恢复 | 系统崩溃后从最近的检查点恢复 |
长时间运行 | 分段执行,每段结束保存进度 |
人机交互 | 等待用户输入时保存状态 |
调试回放 | 从任意检查点重新执行 |
版本分支 | 从同一个检查点创建多个分支执行 |

让我们把所有核心概念联系起来,看看它们是如何协作的:

从工作流的完整生命周期来看:

从消息在工作流中的流动来看:

关键理解:
让我们通过一个实际场景来理解这些概念的应用:

概念 | 在此场景中的体现 |
|---|---|
Workflow | 整个订单处理流程 |
Executor | 每个处理步骤(接收、验证、支付等) |
Edge | 步骤之间的连接(含条件判断) |
SuperStep | 每一轮处理(如:超步1处理接收,超步2处理验证...) |
WorkflowContext | 提供订单状态读写、消息发送能力 |
WorkflowEvent | 每个步骤的开始/完成/失败事件 |
Run | 一个具体订单的处理过程 |
Checkpoint | 处理中途保存的状态(如支付完成后保存) |
在 AI Agent 场景中,Workflow 可以用来编排多个 Agent 的协作:

Workflow 的优势:
优势 | 说明 |
|---|---|
🔄 灵活编排 | 可以轻松调整 Agent 之间的协作关系 |
💾 状态管理 | 自动管理各 Agent 的状态和上下文 |
⏸️ 可中断/恢复 | 支持人机交互,随时暂停和恢复 |
🔍 可观测性 | 通过事件追踪整个执行过程 |
🛡️ 容错能力 | 通过检查点支持故障恢复 |
概念 | 定义 | 关键类 | 核心职责 |
|---|---|---|---|
Executor | 执行器/处理节点 | Executor, Executor<T>, FunctionExecutor<T> | 处理消息,产生输出 |
Edge | 连接边 | Edge, EdgeData, DirectEdgeData | 定义消息流向和条件 |
Workflow | 工作流定义 | Workflow, WorkflowBuilder | 组织 Executor 和 Edge |
SuperStep | 超步/批量处理周期 | SuperStepEvent, SuperStepStartedEvent | 批量处理消息 |
WorkflowContext | 工作流上下文 | IWorkflowContext | 提供运行时服务 |
WorkflowEvent | 工作流事件 | WorkflowEvent, ExecutorEvent | 通知执行状态 |
Run | 运行实例 | Run, RunStatus | 管理一次执行 |
Checkpoint | 检查点 | CheckpointInfo, ICheckpointStore | 保存和恢复状态 |
"工作流程边连接,执行器来做处理" "超步批量往前走,上下文中读写态" "事件通知全过程,运行实例跑一次" "检查点来存进度,随时恢复不怕挂"