
在当今世界,应用程序常常需要处理大量数据或进行实时更新。无论是股票价格的流式传输、日志处理,还是用户生成的内容,设计一个响应迅速且高效的数据管道都至关重要。借助 C# 的异步流和 IAsyncEnumerable,我们能够创建异步数据处理的无缝流程,同时保持出色的可读性和性能。
在本文中,我们将探讨异步流如何简化复杂的工作流程、实现数据处理管道以及应对现实世界中的各种挑战。
在 C# 8.0 中引入的异步流将异步编程的强大功能与可枚举集合相结合。它们使用 IAsyncEnumerable<T> 接口来允许对异步操作的集合进行迭代。
主要优点:
以下是一个简单示例:
async IAsyncEnumerable<int>GenerateNumbersAsync()
{
for(int i =; i <=; i++)
{
await Task.Delay();// 模拟异步工作
yieldreturn i;
}
}
awaitforeach(var number inGenerateNumbersAsync())
{
Console.WriteLine(number);
}
输出:
1
2
3
4
5
让我们构建一个用于实时处理日志文件的数据管道。想象一下,有一个系统会生成日志条目,这些条目需要经过筛选、转换,然后保存到数据库中。以下展示了异步流如何简化这一工作流程:
步骤 1:生成日志(数据源) 第一步是创建一个异步日志生成器。
async IAsyncEnumerable<string>GenerateLogsAsync()
{
string[] logLevels ={"INFO","WARN","ERROR"};
for(int i =; i <=; i++)
{
await Task.Delay();// 模拟日志生成延迟
yieldreturn$"{DateTime.UtcNow}: {logLevels[i % ]} Log entry {i}";
}
}
步骤 2:筛选日志(处理过程) 让我们筛选出级别为“INFO”的日志。
async IAsyncEnumerable<string>FilterLogsAsync(IAsyncEnumerable<string> logs)
{
awaitforeach(var log in logs)
{
if(!log.Contains("INFO"))
{
yieldreturn log;
}
}
}
步骤 3:转换日志(处理过程) 将日志条目转换为结构化格式。
record LogEntry(DateTime Timestamp,string Level,string Message);
asyncIAsyncEnumerable<LogEntry>TransformLogsAsync(IAsyncEnumerable<string> logs)
{
awaitforeach(var log in logs)
{
var parts = log.Split(' ',);
yieldreturnnewLogEntry(
DateTime.Parse(parts[]),
parts[].TrimEnd(':'),
parts[]
);
}
}
步骤 4:保存日志(数据汇) 最后,将日志存储到数据库中。
async Task SaveLogsAsync(IAsyncEnumerable<LogEntry> logs)
{
await foreach (var log in logs)
{
Console.WriteLine($"Saving log to DB: {log}");
await Task.Delay(); // 模拟数据库保存延迟
}
}
步骤 5:组装管道 将所有步骤整合在一起:
async TaskProcessLogsAsync()
{
var logs =GenerateLogsAsync();
var filteredLogs =FilterLogsAsync(logs);
var structuredLogs =TransformLogsAsync(filteredLogs);
awaitSaveLogsAsync(structuredLogs);
}
awaitProcessLogsAsync();
输出:
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:00Z, Level = WARN, Message = Log entry 2 }
Saving log to DB: LogEntry { Timestamp = 2024-11-22T12:00:01Z, Level = ERROR, Message = Log entry 3 }
...
在管道的任何阶段都可能出现错误。在 await foreach 循环内部或生成器中使用 try-catch 块:
async IAsyncEnumerable<string>SafeGenerateLogsAsync()
{
try
{
for(int i =; i <=; i++)
{
if(i ==)thrownewException("Simulated error");
yieldreturn$"Log {i}";
}
}
catch(Exception ex)
{
yieldreturn$"Error: {ex.Message}";
}
}
在管道中处理错误:
await foreach (var log in SafeGenerateLogsAsync())
{
Console.WriteLine(log);
}
Channel 或 Parallel.ForEachAsync 来进行可控的并行处理。以下是一个流式传输和处理股票价格的示例:
async IAsyncEnumerable<(string Symbol, decimal Price)>FetchStockPricesAsync(string[] symbols)
{
var random =newRandom();
foreach(var symbol in symbols)
{
await Task.Delay();// 模拟数据获取
yieldreturn(symbol, random.Next(,)+ random.NextDecimal());
}
}
转换并筛选数据:
async IAsyncEnumerable<string>AnalyzeStockPricesAsync(IAsyncEnumerable<(string Symbol, decimal Price)> prices)
{
awaitforeach(var(symbol, price)in prices)
{
if(price >)
{
yieldreturn$"{symbol}: {price} is above the threshold!";
}
}
}异步流 IAsyncEnumerable 为构建响应迅速且可扩展的数据管道提供了一种优雅的解决方案。从日志处理到实时数据分析,其潜在应用非常广泛。通过将异步编程与可枚举集合相结合,它们简化了复杂的工作流程,减少了内存开销,并提升了性能。