我之前在開發Consumer API時,對於Message的接收是使用callback function+event,code review時,
同事建議為什麼不用RX來簡化code,並更簡單處理非同步error handle和multiple threads的concurrency問題。
沒想到MSDN有一系列的RX文章,看來我真的太Lag了,下面我來紀錄一下自己使用Rx非同步開發心得。
RX(Reactive programming)是一個基於觀察者的非同步程式設計模型,
我覺得它讓我更方便使用非同步來處理資料和error handle,
同時省下我開發去預防multiple threads的concurrency問題,
撰寫風格上就是Observables + LINQ+Schedulers,
它讓我更關注business logic並使用message set來操作資料(彈性更高),
而不用擔心底層safe thread、thread deadlock和IO blocking…等問題,
而我第一版的code大多在關心非同步threads問題(blocking or deadlock),
但使用Rx改寫後真的簡化不少,也不用擔心concurrency 問題,
下面簡單來驗證一下Rx的核心,非同步和觀察者模式。
先安裝RX assembly如下圖
沒有使用scheduler(Rx預設為單一執行緒)
static void SequenceBehavior()
{
Console.WriteLine($"Starting on threadId:{ Thread.CurrentThread.ManagedThreadId}");
IObservable<int> sourceObservable = Observable.Range(1, 10);
IObserver<int> observer = Observer.Create<int>(
x => Console.WriteLine($"OnNext: {x} on threadId:{Thread.CurrentThread.ManagedThreadId}"),
ex => Console.WriteLine($"OnError: {ex.Message}"),
() => Console.WriteLine($"OnCompleted on threadId:{Thread.CurrentThread.ManagedThreadId}"));
IDisposable subscription = sourceObservable.Subscribe(observer);
Console.WriteLine($"Subscribed on threadId:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
我們可以看到,當我沒有使用scheduler就沒有非同步執行,
所有的操作都在同一條執行緒且依序處理(Scheduler算是為了非同步而生)。
現在來看看怎麼達成非同步(因為我絕對不想封鎖主執行緒)
static void SchedulerBehavior()
{
Console.WriteLine($"Starting on threadId:{ Thread.CurrentThread.ManagedThreadId}");
IObservable<int> sourceObservable = Observable.Range(1, 10);
IObserver<int> observer = Observer.Create<int>(
x => Console.WriteLine($"OnNext: {x} on threadId:{Thread.CurrentThread.ManagedThreadId}"),
ex => Console.WriteLine($"OnError: {ex.Message}"),
() => Console.WriteLine($"OnCompleted on threadId:{Thread.CurrentThread.ManagedThreadId}"));
IDisposable subscription = sourceObservable
.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(observer);
Console.WriteLine($"Subscribed on threadId:{Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
可以看到我只加了SubscribeOn(NewThreadScheduler.Default)一行,
我的AP就變成非同步設計了,Rx真的好威。
參考
Implementing the Observer Pattern using Rx.
Reactive Extensions (Rx) – Part 1 – Replacing C# Events
Rx - can/should I replace .NET events with Observables?