[C#]生產者vs消費者的典型案例(使用TPL的DataFlow來加快速度)
前言
上一篇已經介紹過傳統使用TPL的方式來處理生產者消費者的需求,但是有一種更強大的數據流的方式,更適合來處理這種消費者和生產者的例子,這篇並不會解釋得很清楚什麼是生產者和消費者,如果你覺得此篇文章內容看不懂的話,請移駕到(https://dotblogs.com.tw/kinanson/2017/05/17/145521),不過DataFlow算是比較進階的用法,所以必須對async await的掌握度夠高,才比較好懂和發揮它的優勢。
安裝DataFlow
雖然DataFlow也是微軟做的package,但是卻不在原本的framework裡面,我們必須使用nuget來下載安裝
使用DataFlow來改善生產者vs消費者
首先因為數據流提供了更多async的方法供我們使用,在一個比較複雜的邏輯裡面,這些非同步的方法對我們會有很大的幫助,接下來先給一下最原先的案例完全沒優化過的程式碼,可能會是長這個樣子
void Main()
{
Producer();
Consumer();
}
private static List<int> data = new List<int>(); //共享資料
private static void Producer() //生產者
{
for (int ctr = 0; ctr < 20; ctr++)
{
Thread.Sleep(100);
data.Add(ctr);
}
}
private static void Consumer() //消費者
{
foreach (var item in data)
{
Console.WriteLine(item);
Thread.Sleep(100);
}
}
接著我們把此段改成DataFlow的方式
void Main()
{
var buffer = new BufferBlock<Int32>(); //數據流提供的類別
var consumer = ConsumeData(buffer);//把數據流丟到消費者做處理
for (int i = 0; i < 20; i++)
{
buffer.Post(i); //生產者處理部份
Thread.Sleep(100);
}
buffer.Complete(); //生產完畢
consumer.Wait(); //這邊不處理的話,就是射後不理了
}
static async Task ConsumeData(ISourceBlock<Int32> source)
{
while (await source.OutputAvailableAsync())
{
Int32 data =source.Receive(); //取得資料
Console.WriteLine(data);
}
}
結果就是馬上生產馬上就消費,不會等全部生產完了才做消費的概念
接下來我們再進化一下,把迴圈增加到200筆,如果以目前的方式來進行的話,100*200應該要20秒才能進行完畢
接下來改成Parallel.for的方式來進行看看
void Main()
{
var buffer = new BufferBlock<Int32>(); //數據流提供的類別
var consumer = ConsumeData(buffer);//把數據流丟到消費者做處理
Parallel.For(0, 200, i =>
{
buffer.Post(i); //生產者處理部份
Thread.Sleep(100);
});
buffer.Complete(); //生產完畢
consumer.Wait(); //這邊不處理的話,就是射後不理了
}
static async Task ConsumeData(ISourceBlock<Int32> source)
{
while (await source.OutputAvailableAsync())
{
Int32 data =source.Receive(); //取得資料
Console.WriteLine(data);
}
}
筆者執行了四次,可以看到四次的執行速度都不一定,但相對20秒來說,最快甚至不用1秒就完成是快速很多,不過為何會有速度上的差異呢?我在之前的文章有說過了,Parallel和tpl都是使用threadpool的原因,如果對相關議題有興趣,請移駕到(https://dotblogs.com.tw/kinanson/2017/04/25/083020)
最後來說明一下DataFlow帶給了我們很多預設的async方法,比如之前的範例我在生產者的部份可以使用射後不理的做法
void Main()
{
var buffer = new BufferBlock<Int32>();
var consumer = ConsumeData(buffer);
Parallel.For(0, 200, i =>
{
buffer.SendAsync(i); //改成async的方法,並完全不做處理,射後不理
Thread.Sleep(100);
});
buffer.Complete(); //生產完畢
consumer.Wait(); //這邊不處理的話,就是射後不理了
}
static async Task ConsumeData(ISourceBlock<Int32> source)
{
while (await source.OutputAvailableAsync())
{
Int32 data =await source.ReceiveAsync(); //改成async的方法
Console.WriteLine(data);
}
}
結論
這個例子如果沒有先看另一篇文章,可能很難理解筆者在做什麼,所以強烈建議先看上一篇再看這一篇,這邊的範例算是比較簡單,如果在比較複雜的案例裡面,提供給我們的async方法越多的話,就都會是我們提升效能的更多利器,如果此篇對您有幫助,請按讚分享,如果有誤的話再請多多指導。