之前寫了一篇「使用 Channels 與 BackgroundService 實作生產者-消費者模式(Producer-Consumer Pattern)」是使用 Channels 與 BackgroundService 來實作,而這一次就來改用 Wolverine 這個套件來試試看。
有關 Wolverine 這個套件我也看了好久,一直想拿來試試看,會想要用這個套件,主要是它是一個輕量化的工具,適合用於整合訊息佇列、事件驅動設計和後台任務處理。
而且也因為適合用於整合訊息佇列、事件驅動設計,所以支援了許多第三方服務,例如:RabbitMQ, Kafaka, MQTT, AzureServiceBux, AmazonSqs 等,所以打算之後也繼續玩玩 Wolverine。
Wolverine
- https://wolverinefx.net/
- https://github.com/JasperFx/wolverine
- Introduction to the Wolverine Library in .NET - Code Maze
- Wolverine: Your Next Messaging Library – Webinar Recording | The .NET Tools Blog
- Build Messaging in .NET with Wolverine | Nick Chapsas (Youtube)
以下是 ChatGPT 所列出的優缺點和總結
Wolverine 是一個專為 .NET 生態系統設計的訊息處理與微服務架構,旨在簡化和強化應用程式內的訊息傳遞和後端處理流程。它是一個輕量的工具,適合用於整合訊息佇列、事件驅動設計和後台任務處理。以下是 Wolverine 的主要用途與優缺點。
用途
- 訊息佇列與事件驅動:Wolverine 提供強大的訊息佇列功能,可以與 RabbitMQ、Azure Service Bus 等訊息中介系統整合,並且支援基於記憶體的訊息佇列,方便在開發環境進行測試。
- 後台任務與工作者處理:透過 Wolverine,開發者可以輕鬆地定義背景任務、排程任務,並處理異步作業(例如檔案處理、通知推送)。
- 內建的事件總線:Wolverine 具有內建的事件總線功能,適合用於事件驅動設計模式,開發者可以使用事件發布-訂閱模式來處理事件和命令。
- 內建的中介軟體模式支援:它也可以用於實作 Command-Query Responsibility Segregation(CQRS)模式,讓命令與查詢的處理流程更簡單。
優點
- 簡化訊息處理邏輯:Wolverine 提供一致且簡單的 API 介面,讓開發者不需撰寫過多樣板程式碼,即可輕鬆實作複雜的訊息處理邏輯。
- 效能優化:Wolverine 強調高效能,對訊息處理進行了許多優化,如使用並行處理訊息的設計,能夠在高負載環境下保持較低延遲。
- 內建支援多種訊息中介:它支援多種訊息中介軟體(例如 RabbitMQ、Azure Service Bus),開發者可以根據需求選擇最適合的佇列技術,並且能夠輕鬆切換。
- 開發者體驗友好:擁有清晰的文件和範例(如 Wolverine 官方網站和教學資源),學習曲線較低,並且能與 ASP.NET Core 無縫整合。
缺點
- 生態系統相對較新:相比於 MassTransit、NServiceBus 等傳統訊息總線方案,Wolverine 的生態系統和社群相對較新,第三方資源和支援可能較少。
- 文件尚在發展中:儘管 Wolverine 擁有不錯的文件支援,但隨著功能的發展,部分進階應用的範例與指引可能不如一些成熟的訊息中介方案。
- 缺少 GUI 管理工具:不像 RabbitMQ 等訊息中介本身有豐富的管理介面,Wolverine 更偏向程式邏輯上的處理,開發者需要依賴其他工具來監控佇列和訊息。
適用情境
- 若你需要一個高效且簡單的訊息佇列解決方案,且不想使用過於重量級的框架,Wolverine 是一個很好的選擇。
- 適合希望在 .NET Core 環境中實作 CQRS 或者事件驅動架構的團隊。
- 如果你習慣於 RabbitMQ 並需要在新的專案中採用更現代的訊息處理方式,Wolverine 可以是值得探索的替代方案。
總結來說,Wolverine 是一個現代化、輕量且專注於效能的訊息處理框架,適合希望簡化訊息處理和後台任務的開發者使用。然而,由於其生態系統相對較新,選擇時應考量現有團隊的經驗與專案需求。
生產者實作
首先在專案裡安裝 WolverinFx
接著修改 MessageController.cs,這邊就不需要再使用 Channels 了,而是直接使用 Wolverin 所提供的 IMessageBus
using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;
using Wolverine;
namespace WebApplication1.Controllers;
/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly ILogger<MessageController> _logger;
private readonly TimeProvider _timeProvider;
private readonly IMessageBus _messageBus;
/// <summary>
/// Initializes a new instance of the <see cref="MessageController"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
/// <param name="messageBus">The message bus</param>
public MessageController(ILogger<MessageController> logger, TimeProvider timeProvider, IMessageBus messageBus)
{
this._logger = logger;
this._timeProvider = timeProvider;
this._messageBus = messageBus;
}
/// <summary>
/// 建立 Message 資料
/// </summary>
/// <param name="parameter">parameter</param>
/// <returns></returns>
[HttpPost("create")]
public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
{
this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
parameter.Content);
var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);
try
{
// 使用 Wolverine 發送事件
await this._messageBus.PublishAsync(messageEvent);
return this.Ok(new { Message = "Event produced successfully" });
}
catch (Exception ex)
{
this._logger.LogError(ex, "Failed to publish message event.");
return this.StatusCode(500, new { Message = "Failed to produce event" });
}
}
}
MessageCreatedEvent.cs
namespace WebApplication1.Infrastructure.Events;
/// <summary>
/// record MessageCreatedEvent
/// </summary>
public record MessageCreatedEvent
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageCreatedEvent"/> class
/// </summary>
/// <param name="id">The id</param>
/// <param name="content">The content</param>
public MessageCreatedEvent(Guid id, string content)
{
this.Id = id;
this.Content = content;
}
/// <summary>
/// Id
/// </summary>
public Guid Id { get; set; }
/// <summary>
/// Content
/// </summary>
public string Content { get; set; }
}
這裡使用 Wolverine 的 IMesssageBus 取代 ChannelWriter,並透過 PublishAsync 方法將 MessageCreatedEvent 傳送到 MessageBus 裡。
消費者實作
這邊就不需要使用 BackgroundService + ChannelReader 來接收 MessageCreatedEvent 事件,因為 Wolverine 會去處理訂閱和訊息的處理邏輯,所以開發者只需要去定義事件處理的 Handler 就可以。
以下就是 MessageCreatedEventHandler 類別的實作
using System.Text.Json;
using WebApplication1.Infrastructure.Events;
namespace WebApplication1.Infrastructure.EventHandlers;
/// <summary>
/// class MessageCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 MessageCreatedEvent 事件
/// </remarks>
public class MessageCreatedEventHandler
{
private readonly ILogger<MessageCreatedEventHandler> _logger;
private readonly TimeProvider _timeProvider;
/// <summary>
/// Initializes a new instance of the <see cref="MessageCreatedEventHandler"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
public MessageCreatedEventHandler(ILogger<MessageCreatedEventHandler> logger, TimeProvider timeProvider)
{
this._logger = logger;
this._timeProvider = timeProvider;
}
/// <summary>
/// 處理 MessageCreatedEvent 的方法
/// </summary>
/// <param name="messageCreatedEvent">The messageCreatedEvent</param>
public async Task HandleAsync(MessageCreatedEvent messageCreatedEvent)
{
try
{
var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
"Got a MessageCreatedEvent");
this._logger.LogInformation("{Message} - {MessageCreatedEvent}",
message,
JsonSerializer.Serialize(messageCreatedEvent));
// 可以在此處進行其他的異步操作
await Task.CompletedTask;
}
catch (Exception ex)
{
this._logger.LogError(ex, "An error occurred while processing the event.");
}
}
}
Wolverine 會自動找到並執行 HandleAsync 方法,前提是要符合事件的參數簽名,即 Task HandleAsync(MessageCreatedEvent)。你可以改用其他名稱,只要方法參數匹配到,Wolverine 就能識別這是對應的事件處理 Handler 類別。
除了生產者與消費者的實作外,不要忘記還要到 Program.cs 裡去註冊使用 Wolverine,透過註冊之後,Wolverine 會去掃描或註冊事件處理 Handler。
上面的註冊設定會去掃描 Assembly 裡有使用到 IMessageBus 的事件類別以及接收處理事件類別的處理器,或者也可以直接指定掃描 Assembly
對了,有關事件類別處理器 (Handler) 裡接收事件類別的方法名稱,是有限制的,可以使用以下的名稱
有關 Message Handler Discovery 的內容,可以查看 Wolverine 的文件
執行並觀察
以下是服務剛啟動時的 conolse log 內容,這時候還沒有執行 controller 所以還沒有發送任何訊息
然後執行 api/message/create.POST 方法,一共執行了三次,以下是發送與接收的 log 紀錄
第一次執行時的發送到接收處理,這過程似乎長了點,不過後續的發送、接收處理的間隔時間都相當短
同場加映:使用 BackgroundService 做為生產者,並使用 PeroidicTimer 計時發送事件
每 60 秒鐘建立 IssueCreatedEvent 事件,並透過 Wolverine 的 MessageBus 發送。這邊的重點在於取得 IMessageBus,因為是 BackgroundService,所以要在類別建構式依賴注入 IServiceScopeFactory , 然後在 HandleAsync 方法裡使用 serviceScopeFactory 建立 scope 並取得 seviceProvider,然後再使用 serviceProvider 取得 IMessageBus
using WebApplication1.Infrastructure.Events;
using Wolverine;
namespace WebApplication1.Infrastructure.BackgroundServices;
/// <summary>
/// class IssueCreateBackgroundService
/// </summary>
/// <remarks>
/// 建立 IssueCreatedEvent 事件,並透過 Wolverine 的 MessageBus 發送
/// </remarks>
public class IssueCreateBackgroundService : BackgroundService
{
/// <summary>
/// 執行間隔時間 60 sec
/// </summary>
private static readonly TimeSpan IntervalTime = TimeSpan.FromSeconds(60);
private readonly ILogger<IssueCreateBackgroundService> _logger;
private readonly TimeProvider _timeProvider;
private readonly IServiceScopeFactory _serviceScopeFactory;
/// <summary>
/// Initializes a new instance of the <see cref="IssueCreateBackgroundService"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
/// <param name="serviceScopeFactory">The service scope factory</param>
public IssueCreateBackgroundService(ILogger<IssueCreateBackgroundService> logger,
TimeProvider timeProvider,
IServiceScopeFactory serviceScopeFactory)
{
this._logger = logger;
this._timeProvider = timeProvider;
this._serviceScopeFactory = serviceScopeFactory;
}
/// <summary>
/// Executes the stopping token
/// </summary>
/// <param name="stoppingToken">The stopping token</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using var scope = this._serviceScopeFactory.CreateScope();
var serviceProvider = scope.ServiceProvider;
var messageBus = serviceProvider.GetRequiredService<IMessageBus>();
this._logger.LogInformation(
"{DateTimeNow} {TypeName} is starting.",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
this.GetType().Name);
// 使用 PeriodicTimer
using PeriodicTimer periodicTimer = new(IntervalTime);
while (await periodicTimer.WaitForNextTickAsync(stoppingToken) && stoppingToken.IsCancellationRequested is false)
{
await messageBus.SendAsync(new IssueCreatedEvent
{
OriginatorId = Guid.NewGuid(),
Title = $"Issue Created at {this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
Description = $"Issue Created at {this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}"
});
this._logger.LogInformation(
"{DateTimeNow} {TypeName} Processing.",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
this.GetType().Name);
}
}
/// <summary>
/// Stops the cancellation token
/// </summary>
/// <param name="cancellationToken">The cancellation token</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
this._logger.LogInformation(
"{DateTimeNow} {TypeName} is stopping.",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
this.GetType().Name);
await base.StopAsync(cancellationToken);
}
}
建立 IssueCreatedEventHandler 類別,透過 Wolverine 的 MessageBus 接收處理 IssueCreatedEvent 事件
using System.Text.Json;
using WebApplication1.Infrastructure.Events;
namespace WebApplication1.Infrastructure.EventHandlers;
/// <summary>
/// class IssueCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 IssueCreatedEvent 事件
/// </remarks>
public class IssueCreatedEventHandler
{
private readonly ILogger<IssueCreatedEventHandler> _logger;
private readonly TimeProvider _timeProvider;
/// <summary>
/// Initializes a new instance of the <see cref="IssueCreatedEventHandler"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
public IssueCreatedEventHandler(ILogger<IssueCreatedEventHandler> logger,
TimeProvider timeProvider)
{
this._logger = logger;
this._timeProvider = timeProvider;
}
/// <summary>
/// 接收並處理 IssueCreatedEvent 事件
/// </summary>
/// <param name="issueCreatedEvent">The issueCreatedEvent</param>
public async Task HandleAsync(IssueCreatedEvent issueCreatedEvent)
{
try
{
var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
"Got a IssueCreatedEvent");
this._logger.LogInformation("{Message} - {IssueCreatedEvent}",
message,
JsonSerializer.Serialize(issueCreatedEvent));
// 可以在此處進行其他的異步操作
await Task.CompletedTask;
}
catch (Exception ex)
{
this._logger.LogError(ex, "An error occurred while processing the event.");
}
}
}
最後別忘了要註冊 BackgroundService
執行並觀察
以上就是分別在兩種應用情境裡使用 Wolverine 實作生產者-消費者模式(Producer-Consumer Pattern)的簡單練習。
純粹是在寫興趣的,用寫程式、寫文章來抒解工作壓力