在多线程编程中,数据共享和线程同步是两个关键问题。C# 提供了一些强大的工具来帮助开发人员管理这些挑战,其中之一就是 BlockingCollection<T>
。这篇博客将介绍 BlockingCollection
的基本概念、用法以及它在多线程环境中的优势。
BlockingCollection<T>
是 .NET 中的一个线程安全集合类,通常用于生产者-消费者模式。它是一个高级的集合类,内部使用了 IProducerConsumerCollection<T>
接口,可以让多个线程安全地添加和移除数据。
BlockingCollection
内部实现了锁机制,确保在多线程环境中操作集合时不会出现竞争条件。Add
和 Take
方法,允许线程在集合为空或已满时等待。CancellationToken
来取消等待操作。下面是一个简单的例子,演示如何使用 BlockingCollection<T>
实现生产者-消费者模式。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
// 创建一个容量为5的 BlockingCollection
BlockingCollection<int> blockingCollection = new BlockingCollection<int>(5);
// 启动生产者任务
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
blockingCollection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100); // 模拟生产时间
}
blockingCollection.CompleteAdding();
});
// 启动消费者任务
Task consumer = Task.Run(() =>
{
while (!blockingCollection.IsCompleted)
{
try
{
int item = blockingCollection.Take();
Console.WriteLine($"Consumed: {item}");
}
catch (InvalidOperationException)
{
// 当集合完成添加且为空时抛出
break;
}
Thread.Sleep(150); // 模拟消费时间
}
});
Task.WaitAll(producer, consumer);
}
}
BlockingCollection<int>(5)
创建一个最大容量为 5 的集合。Add
方法会阻塞,直到有空余位置。Take
方法会阻塞,直到有新数据可用。CompleteAdding
方法表示不再有新数据添加,消费者可以结束。InvalidOperationException
,用于捕获集合完成添加后的访问。BlockingCollection
是基于锁的集合,适用于需要线程安全但不要求极高性能的场景。如果有新的数据持续进入,我们可以使用循环或其他控制机制来处理不断到达的数据,而不使用 CompleteAdding
,直到系统关闭为止。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main()
{
// 创建一个无上限的 BlockingCollection
BlockingCollection<int> dataQueue = new BlockingCollection<int>();
// 启动消费者线程
Task consumerTask = Task.Run(() =>
{
while (true)
{
// 检查是否有数据可消费
if (dataQueue.TryTake(out int data, Timeout.Infinite))
{
// 模拟数据处理
Console.WriteLine($"Processing data: {data}");
Thread.Sleep(100); // 模拟处理延迟
}
}
});
// 启动多个生产者线程
Task[] producerTasks = new Task[3];
for (int i = 0; i < producerTasks.Length; i++)
{
int producerId = i;
producerTasks[i] = Task.Run(() =>
{
for (int j = 0; j < 15; j++)
{
int data = producerId * 100 + j;
dataQueue.Add(data);
Console.WriteLine($"Sensor {producerId} produced: {data}");
Thread.Sleep(20); // 模拟数据生成延迟
}
});
}
// 等待所有生产者完成
Task.WaitAll(producerTasks);
// 这里不调用 CompleteAdding,因为我们模拟的是持续数据流
// 让消费者运行一段时间
Thread.Sleep(5000); // 运行5秒后停止
// 停止消费者线程
consumerTask.Wait();
}
}
CompleteAdding
:因为我们模拟的是持续数据流,所以不调用 CompleteAdding
。Thread.Sleep
只是为了演示。BlockingCollection<T>
是一个非常实用的工具,特别适合在多线程环境中实现生产者-消费者模式。它简化了线程同步和数据共享的复杂性,让开发者可以专注于核心逻辑的实现。如果你的应用程序需要在多个线程之间安全地传递数据,不妨尝试使用 BlockingCollection
。