.net4.0引入了IObservable(publisher)和IObserver(Subscriber)兩個介面,
並提供實作publish subscribe pattern標準範本,這樣的模式非常適合分散式推入型通知,
而且每個元件職責相當明確,這篇紀錄如何使用這兩個介面。
Real-World中,我們可能需要監控資料或狀態的變更,並在變更發生後處理後續的商業邏輯,
.net本身所提供的event和delegate也是在處理相同事情,
例如textchanged event,監控textbox中文字改變後,
開發人員後續要處理的邏輯可能會包在如Ontextchanged method中,
這篇文章我參考MSDN sample code,透過IObservable和IObserver這兩個介面,
簡單實現訊息推送訂閱服務。
Publisher職責
提供訂閱者Topic的訂閱和取消
推送訊息到Topic並通知所有訂閱者(可以推送到不同的Topic)
/// <summary>
/// Minimal <see cref="IObservable{T}"/> implementation that keeps a list of
/// observers and pushes values to each of them.
/// </summary>
/// <typeparam name="T">Type of the value delivered to observers.</typeparam>
public class Publisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers;
    private readonly string _name;

    /// <summary>Creates a publisher with no observers.</summary>
    /// <param name="name">Name used in the subscribe/unsubscribe console messages.</param>
    public Publisher(string name)
    {
        _observers = new List<IObserver<T>>();
        _name = name;
    }

    /// <summary>
    /// Registers <paramref name="observer"/> (at most once) and returns a token
    /// whose <see cref="IDisposable.Dispose"/> removes it again.
    /// </summary>
    /// <param name="observer">Observer that will receive notifications.</param>
    /// <returns>A disposable that unsubscribes the observer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Reject null up front; a stored null would only fail later, inside Notification.
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (!_observers.Contains(observer))
        {
            _observers.Add(observer);
        }

        return new UnSubscribe(_name, _observers, observer);
    }

    /// <summary>Delivers <paramref name="obj"/> to every currently registered observer.</summary>
    /// <param name="obj">Value passed to each observer's <c>OnNext</c>.</param>
    protected void Notification(T obj)
    {
        // Iterate over a snapshot so an observer may subscribe or unsubscribe
        // from within OnNext without invalidating the enumeration.
        foreach (IObserver<T> observer in _observers.ToArray())
        {
            observer.OnNext(obj);
        }
    }

    /// <summary>
    /// Subscription token: removes one observer from the publisher's list when disposed.
    /// </summary>
    private class UnSubscribe : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;
        private readonly string _name;

        public UnSubscribe(string name, List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
            _name = name;
            Console.WriteLine($"{_name} Subscribed");
        }

        /// <summary>Removes the observer; calling it again is a no-op.</summary>
        public void Dispose()
        {
            // Remove returns false when the observer is no longer registered,
            // so the message is printed only for an actual removal.
            if (_observers.Remove(_observer))
            {
                Console.WriteLine($"{_name} UnSubscribed");
            }
        }
    }
}
Subscriber職責
接收不同Topic的訊息後(可以by partition也可以broadcast),
進行相對應商業邏輯處理(我這裡就簡單顯示Message)
/// <summary>
/// Observer of <c>ManagerPublisher</c> that prints each received message to the
/// console, optionally filtered by partition.
/// </summary>
public class Subscriber : IObserver<ManagerPublisher>
{
    private readonly string _name;
    private readonly int _partition;

    /// <summary>Creates a subscriber.</summary>
    /// <param name="name">Subscriber name shown in the console output.</param>
    /// <param name="parition">
    /// Partition to listen to; a value of 0 or less accepts messages from every partition.
    /// (The parameter name is kept as-is for compatibility with named-argument callers.)
    /// </param>
    public Subscriber(string name, int parition)
    {
        _name = name;
        _partition = parition;
    }

    /// <summary>Prints a notice that the publisher has finished pushing.</summary>
    void IObserver<ManagerPublisher>.OnCompleted()
    {
        Console.WriteLine("Push Completed");
    }

    /// <summary>Prints the message of the reported error.</summary>
    /// <param name="error">Error reported by the publisher.</param>
    void IObserver<ManagerPublisher>.OnError(Exception error)
    {
        Console.WriteLine($"Error:{error.Message}");
    }

    /// <summary>
    /// Prints the publisher's current message when it matches this subscriber's partition.
    /// </summary>
    /// <param name="value">Publisher carrying the new message; null is ignored.</param>
    void IObserver<ManagerPublisher>.OnNext(ManagerPublisher value)
    {
        if (ReferenceEquals(value, null))
        {
            return;
        }

        try
        {
            var message = value.NewMessage;

            // Nothing to show when the publisher carries no message.
            if (message == null)
            {
                return;
            }

            // A partition of 0 or less means "broadcast": accept every message.
            if (_partition <= 0 || message.Partition == _partition)
            {
                // NOTE(review): relies on ManagerPublisher exposing an accessible _name member;
                // ManagerPublisher is not shown here - confirm.
                Console.WriteLine(
                    $"Publisher:{value._name}, Subscriber:{_name}, Topic:{message.Topic}, Partition:{message.Partition}, Value:{message.Value}, Timestamp:{message.Timestamp}");
            }
        }
        catch (Exception e)
        {
            // Fixed format string: the original "${e.Message}" printed a stray '$'.
            Console.WriteLine($"Error onNext:{e.Message}");
            throw;
        }
    }
}
Message Model
/// <summary>
/// Payload pushed from a publisher to its subscribers.
/// </summary>
public class Message
{
    /// <summary>Name of the topic the message belongs to.</summary>
    public string Topic { get; set; }

    /// <summary>Message content.</summary>
    public string Value { get; set; }

    /// <summary>Partition number subscribers compare against when filtering.</summary>
    public int Partition { get; set; }

    /// <summary>Time attached to the message by its creator.</summary>
    public DateTime Timestamp { get; set; }
}
Client
// Two publishers, one per channel.
var loggerPublisher = new ManagerPublisher("Logger");
var bonusPublisher = new ManagerPublisher("Bonus");

// A partition of 0 makes the subscriber accept every message; otherwise it
// only prints messages whose partition matches its own.
var sherryAll = new Subscriber("Sherry", 0);
var fifi = new Subscriber("FiFi", 1);
var rico = new Subscriber("RicoIsme", 1);
var sherryBonus = new Subscriber("Sherry", 2);

// Keep the subscription tokens in subscription order so they can be released in that same order.
IDisposable[] subscriptions =
{
    loggerPublisher.Subscribe(sherryAll),
    loggerPublisher.Subscribe(fifi),
    bonusPublisher.Subscribe(rico),
    bonusPublisher.Subscribe(sherryBonus),
};

// Log1..Log3 go to partitions 0..2 of the "ricoLog" topic.
for (int partition = 0; partition <= 2; partition++)
{
    loggerPublisher.NewMessage = new Message
    {
        Topic = "ricoLog",
        Partition = partition,
        Value = $"Log{partition + 1}",
        Timestamp = DateTime.UtcNow,
    };
}

// Bonus1..Bonus2 go to partitions 1..2 of the "ricoBonus" topic.
for (int partition = 1; partition <= 2; partition++)
{
    bonusPublisher.NewMessage = new Message
    {
        Topic = "ricoBonus",
        Partition = partition,
        Value = $"Bonus{partition}",
        Timestamp = DateTime.UtcNow,
    };
}

// Cleanup
foreach (IDisposable subscription in subscriptions)
{
    subscription.Dispose();
}

Console.ReadLine();
結果
Sherry訂閱了ricoLog(broadcast)和ricoBonus(partition=2)兩個Topic
FiFi只訂閱了ricoLog且partition=1
RicoIsme只訂閱了ricoBonus且partition=1
透過這兩個標準介面,實現publish subscribe pattern,
讓我覺得程式可讀性提高不少,且開發上也更直覺。
參考
Publisher/Subscriber pattern with Event/Delegate and EventAggregator