[C#]生產者vs消費者的典型案例(使用TPL的DataFlow來加快速度)

[C#]生產者vs消費者的典型案例(使用TPL的DataFlow來加快速度)

前言

上一篇已經介紹過傳統使用TPL的方式來處理生產者消費者的需求,但是有一種更強大的數據流的方式,更適合來處理這種消費者和生產者的例子,這篇並不會解釋得很清楚什麼是生產者和消費者,如果你覺得此篇文章內容看不懂的話,請移駕到(https://dotblogs.com.tw/kinanson/2017/05/17/145521),不過DataFlow算是比較進階的用法,所以必須對async await的掌握度夠高,才比較好懂和發揮它的優勢。

安裝DataFlow

雖然DataFlow也是微軟做的package,但是卻不在原本的framework裡面,我們必須使用nuget來下載安裝

使用DataFlow來改善生產者vs消費者

首先因為數據流提供了更多async的方法供我們使用,在一個比較複雜的邏輯裡面,這些非同步的方法對我們會有很大的幫助,接下來先給一下最原先的案例完全沒優化過的程式碼,可能會是長這個樣子

void Main()
{
	Producer();
	Consumer();
}

private static List<int> data = new List<int>(); //共享資料

private static void Producer() //生產者

{
	for (int ctr = 0; ctr < 20; ctr++)
	{
		Thread.Sleep(100);
		data.Add(ctr);
	}
}

private static void Consumer() //消費者
{
	foreach (var item in data)
	{
		Console.WriteLine(item);
		Thread.Sleep(100);
	}
}

接著我們把此段改成DataFlow的方式

void Main()
{
	var buffer = new BufferBlock<Int32>(); //數據流提供的類別
	var consumer = ConsumeData(buffer);//把數據流丟到消費者做處理
	for (int i = 0; i < 20; i++)
	{
		buffer.Post(i); //生產者處理部份
		Thread.Sleep(100);
	}
	buffer.Complete(); //生產完畢
	consumer.Wait(); //這邊不處理的話,就是射後不理了
}

static async Task ConsumeData(ISourceBlock<Int32> source)
{
	while (await source.OutputAvailableAsync())
	{
		Int32 data =source.Receive(); //取得資料
		Console.WriteLine(data);
	}
}

結果就是馬上生產馬上就消費,不會等全部生產完了才做消費的概念

接下來我們再進化一下,把迴圈增加到200筆,如果以目前的方式來進行的話,100*200應該要20秒才能進行完畢

接下來改成Parallel.for的方式來進行看看

void Main()
{
	var buffer = new BufferBlock<Int32>(); //數據流提供的類別
	var consumer = ConsumeData(buffer);//把數據流丟到消費者做處理
	Parallel.For(0, 200, i =>
	{
		buffer.Post(i); //生產者處理部份
		Thread.Sleep(100);
	});
	buffer.Complete(); //生產完畢
	consumer.Wait(); //這邊不處理的話,就是射後不理了
}

static async Task ConsumeData(ISourceBlock<Int32> source)
{
	while (await source.OutputAvailableAsync())
	{
		Int32 data =source.Receive(); //取得資料
		Console.WriteLine(data);
	}
}

筆者執行了四次,可以看到四次的執行速度都不一定,但相對20秒來說,最快甚至不用1秒就完成是快速很多,不過為何會有速度上的差異呢?我在之前的文章有說過了,Parallel和tpl都是使用threadpool的原因,如果對相關議題有興趣,請移駕到(https://dotblogs.com.tw/kinanson/2017/04/25/083020)

最後來說明一下DataFlow帶給了我們很多預設的async方法,比如之前的範例我在生產者的部份可以使用射後不理的做法

void Main()
{
	var buffer = new BufferBlock<Int32>();
	var consumer = ConsumeData(buffer);
	Parallel.For(0, 200, i =>
	{
		buffer.SendAsync(i); //改成async的方法,並完全不做處理,射後不理
		Thread.Sleep(100);
	});
	buffer.Complete(); //生產完畢
	consumer.Wait(); //這邊不處理的話,就是射後不理了
}

static async Task ConsumeData(ISourceBlock<Int32> source)
{
	while (await source.OutputAvailableAsync())
	{
		Int32 data =await source.ReceiveAsync(); //改成async的方法
		Console.WriteLine(data);
	}
}

結論

這個例子如果沒有先看另一篇文章,可能很難理解筆者在做什麼,所以強烈建議先看上一篇再看這一篇,這邊的範例算是比較簡單,如果在比較複雜的案例裡面,提供給我們的async方法越多的話,就都會是我們提升效能的更多利器,如果此篇對您有幫助,請按讚分享,如果有誤的話再請多多指導。