
在.NET异步编程中,处理大量数据的异步迭代是常见需求。IAsyncEnumerable<T> 提供了一种高效的异步迭代模式,解决了传统同步迭代在异步场景下的性能与资源问题。深入学习它,能帮助开发者避免异步迭代中的陷阱,构建高性能、响应式的应用程序。
在传统的同步编程中,IEnumerable<T> 用于迭代集合。但当涉及到异步操作,如从数据库或网络中读取大量数据时,使用同步迭代会阻塞线程,导致应用程序响应性变差。IAsyncEnumerable<T> 应运而生,它允许在不阻塞主线程的情况下异步迭代数据,特别适用于处理I/O密集型任务,如从远程服务分页获取数据或处理大型数据流。
IAsyncEnumerable<T> 基于迭代器模式的异步版本。它允许逐个异步地生成序列中的元素,而不是一次性加载整个集合。这意味着在处理大数据集时,内存占用可以保持在较低水平。IEnumerable<T> 类似,IAsyncEnumerable<T> 也是延迟执行的。只有当消费端开始迭代时,数据生成逻辑才会执行。这种特性在处理复杂或资源消耗大的数据生成操作时,能显著提升性能。IAsyncEnumerable<T> 接口仅定义了一个方法 GetAsyncEnumerator,该方法返回一个实现 IAsyncEnumerator<T> 接口的对象。public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}MoveNextAsync 用于推进到下一个元素,Current 获取当前元素,以及 DisposeAsync 用于释放资源。public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
ValueTask DisposeAsync();
}IAsyncEnumerable<T> 的异步方法时,编译器会生成一个状态机类,该类实现了 IAsyncEnumerator<T> 接口,管理异步迭代的状态和逻辑。using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async IAsyncEnumerable<int> GetNumbersAsync(int count)
{
for (int i = 0; i < count; i++)
{
// 模拟异步操作,如数据库查询
await Task.Delay(100);
yield return i;
}
}
static async Task Main()
{
await foreach (var number in GetNumbersAsync(5))
{
Console.WriteLine($"Received number: {number}");
}
}
}GetNumbersAsync 方法返回一个 IAsyncEnumerable<int>,在方法内部通过 await Task.Delay 模拟异步操作,并使用 yield return 返回每个元素。Main 方法使用 await foreach 异步迭代这个序列。GetPageAsync。using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class RemoteApi
{
public async Task<List<int>> GetPageAsync(int pageIndex, int pageSize)
{
// 模拟远程API调用
await Task.Delay(500);
var result = new List<int>();
for (int i = pageIndex * pageSize; i < (pageIndex + 1) * pageSize; i++)
{
result.Add(i);
}
return result;
}
}
class Program
{
static async IAsyncEnumerable<int> GetAllDataAsync(RemoteApi api, int pageSize)
{
int pageIndex = 0;
while (true)
{
var page = await api.GetPageAsync(pageIndex, pageSize);
if (page.Count == 0)
{
break;
}
foreach (var item in page)
{
yield return item;
}
pageIndex++;
}
}
static async Task Main()
{
var api = new RemoteApi();
await foreach (var data in GetAllDataAsync(api, 3))
{
Console.WriteLine($"Processed data: {data}");
}
}
}RemoteApi 类中的 GetPageAsync 方法模拟远程API调用。GetAllDataAsync 方法通过循环调用 GetPageAsync 并使用 yield return 逐个返回数据。Main 方法异步迭代获取到的所有数据并进行处理。using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async IAsyncEnumerable<int> GetNumbersAsync(int count, CancellationToken cancellationToken)
{
for (int i = 0; i < count; i++)
{
await Task.Delay(100);
yield return i;
}
}
static async Task Main()
{
var cancellationTokenSource = new CancellationTokenSource();
var task = Task.Run(async () =>
{
try
{
await foreach (var number in GetNumbersAsync(10, cancellationTokenSource.Token))
{
Console.WriteLine($"Received number: {number}");
if (number == 5)
{
cancellationTokenSource.Cancel();
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Iteration cancelled");
}
});
await task;
}
}GetNumbersAsync 方法虽然接受了取消令牌,但在内部并没有检查取消令牌,导致即使调用了 cancellationTokenSource.Cancel(),迭代也不会停止。using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static async IAsyncEnumerable<int> GetNumbersAsync(int count, CancellationToken cancellationToken)
{
for (int i = 0; i < count; i++)
{
cancellationToken.ThrowIfCancellationRequested();
await Task.Delay(100, cancellationToken);
yield return i;
}
}
static async Task Main()
{
var cancellationTokenSource = new CancellationTokenSource();
var task = Task.Run(async () =>
{
try
{
await foreach (var number in GetNumbersAsync(10, cancellationTokenSource.Token))
{
Console.WriteLine($"Received number: {number}");
if (number == 5)
{
cancellationTokenSource.Cancel();
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Iteration cancelled");
}
});
await task;
}
}GetNumbersAsync 方法中,每次循环开始时检查 cancellationToken,并在 Task.Delay 中传递取消令牌,这样当令牌取消时,能正确抛出 OperationCanceledException 停止迭代。OperationCanceledException 并输出 “Iteration cancelled”,停止迭代。IAsyncEnumerable<T> 在处理I/O密集型任务时性能优势明显。例如,从数据库中读取大量数据,同步迭代可能会阻塞线程,导致CPU利用率过高,而异步迭代可以释放线程资源,提高系统的并发处理能力。通过性能测试工具(如BenchmarkDotNet)测试,在处理10000条数据时,同步迭代可能需要几百毫秒甚至更长时间,而异步迭代可以在几十毫秒内完成,同时保持较低的内存占用。IAsyncEnumerable<T> 与 IEnumerable<T> 有什么区别? IEnumerable<T> 用于同步迭代,会阻塞线程;而 IAsyncEnumerable<T> 用于异步迭代,不会阻塞线程,适用于I/O密集型任务。await foreach 块中使用 try-catch 来捕获异步迭代过程中抛出的异常。IAsyncEnumerable<T> 转换为 IEnumerable<T>? IAsyncEnumerable<T> 是.NET异步编程中处理异步迭代的强大工具。其核心在于异步迭代和延迟执行的原理,通过状态机实现底层逻辑。在实践中,开发者需正确处理取消令牌、避免阻塞操作。随着.NET的发展,预计会进一步优化异步迭代的性能和易用性。适用于处理I/O密集型、大数据量的异步操作场景,但在计算密集型场景下,可能需要结合其他技术来提升性能。