BlockingCollection
是 .NET 集合框架中的一個類,它提供了一種線程安全的集合,可以用于在生產者和消費者之間傳遞數據。它可以處理數據流的方式如下:
BlockingCollection
中,而消費者則負責從 BlockingCollection
中獲取數據并進行處理。這種模式可以確保生產者和消費者之間的同步和數據一致性。using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{
static BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();
static void Main(string[] args)
{
// 創建生產者線程
Thread producerThread = new Thread(ProduceData);
producerThread.Start();
// 創建消費者線程
Thread consumerThread = new Thread(ConsumeData);
consumerThread.Start();
}
static void ProduceData()
{
for (int i = 0; i < 10; i++)
{
_blockingCollection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(1000); // 模擬生產數據所需的時間
}
// 生產完成,通知消費者
_blockingCollection.CompleteAdding();
}
static void ConsumeData()
{
foreach (var item in _blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(2000); // 模擬處理數據所需的時間
}
}
}
BlockingCollection
允許你設置一個最大容量,當集合達到這個容量時,嘗試添加數據的線程將被阻塞,直到有其他線程從集合中移除數據。這可以用于限制數據流的大小。using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{
static BlockingCollection<int> _blockingCollection = new BlockingCollection<int>(3); // 設置最大容量為3
static void Main(string[] args)
{
// 創建生產者線程
Thread producerThread = new Thread(ProduceData);
producerThread.Start();
// 創建消費者線程
Thread consumerThread = new Thread(ConsumeData);
consumerThread.Start();
}
static void ProduceData()
{
for (int i = 0; i < 10; i++)
{
_blockingCollection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(1000); // 模擬生產數據所需的時間
}
// 生產完成,通知消費者
_blockingCollection.CompleteAdding();
}
static void ConsumeData()
{
foreach (var item in _blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(2000); // 模擬處理數據所需的時間
}
}
}
BlockingCollection
提供了一些方法,如 TryAdd
和 TryTake
,允許你在指定的時間內嘗試添加或獲取數據。如果操作在指定時間內未完成,這些方法將返回一個布爾值,表示操作是否成功。這可以用于處理數據流的超時情況。using System;
using System.Collections.Concurrent;
using System.Threading;
class Program
{
static BlockingCollection<int> _blockingCollection = new BlockingCollection<int>();
static void Main(string[] args)
{
// 創建生產者線程
Thread producerThread = new Thread(ProduceData);
producerThread.Start();
// 創建消費者線程
Thread consumerThread = new Thread(ConsumeData);
consumerThread.Start();
}
static void ProduceData()
{
for (int i = 0; i < 10; i++)
{
bool success = _blockingCollection.TryAdd(i, TimeSpan.FromSeconds(1));
if (success)
{
Console.WriteLine($"Produced: {i}");
}
else
{
Console.WriteLine($"Failed to produce: {i}");
}
Thread.Sleep(1000); // 模擬生產數據所需的時間
}
// 生產完成,通知消費者
_blockingCollection.CompleteAdding();
}
static void ConsumeData()
{
foreach (var item in _blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(2000); // 模擬處理數據所需的時間
}
}
}
通過以上方法,你可以使用 BlockingCollection
來處理數據流。在實際應用中,你可能需要根據具體需求對這些示例進行調整。