過去的工作專案為了要加快 Request 的回應速度,所以就想方設法地去做了很多的調整…
例如一個訊息建立後要做很多的事情,像是要許多相關資料 Table 的資料異動、透過 RabbitMQ 去發送 message 通知其他 Client、快取資料的更新等等等
這麼多的處理如果都是在一個 Request 裡全部處理完成後才回傳 Response 結果,因為每個處理都會花費一些時間,全部累積起來就相當可觀,於是就會想到是不是要往平行處理或多執行緒的方式來解決,但是在繁忙的網站服務去使用這些解決方案又有很多風險,程式沒有寫好的話就會出現嚴重錯誤。
所以就想到用發送事件的方式,當訊息建立完成後,就發送一個事件到一個佇列裡,然後有一個 BackgroundService 去專門接收訊息建立完成事件,當收到事件後就開始一連串的處理,這麼一來 Action 方法的回應時間就能夠加快一些。
生產者-消費者模式(Producer-Consumer Pattern)
- 定義:這是一種通過緩衝區(buffer)在生產者(Producer)和消費者(Consumer)之間解耦的設計模式。生產者生成資料並將其放入緩衝區,消費者從緩衝區中取出資料進行處理。生產者和消費者之間並不直接交互,它們各自只關注自己的工作。
- 使用情境:當生產資料和消費資料的速度不一致時,此模式可以讓生產者和消費者彼此不互相阻塞,通過緩衝區平衡兩者的速度差異。
- 實現方式:C# 中可以使用 BlockingCollection<T> 或 Channel<T> 來實現這種模式。
生產者-消費者模式適合用於內部任務排程和工作佇列。
Channels (通道)
相關連結
- 通道 - .NET | Microsoft Learn (了解 System.Threading.Channels 中適用於使用 .NET 產生者和取用者的官方同步資料結構。)
- C# Channel 简单实现消息队列的发布、订阅 - 天才卧龙 - 博客园
- 如何在 C# 中使用 Channels - 小菜农啊 - 博客园
- C# Channels — Efficient and secure communication between threads and tasks. | by Ben Witt | .Net Programming | Medium
- 善用Channels實現生產者/消費者模式. | by RiCo 技術農場 | RiCosNote | Medium
實作案例
ChannelServiceCollectionExtensions
將 Channel、Writer、Reader 都單獨註冊成服務,並且設置了 SingleReader = true
,這樣可以優化讀取性能,適合只有一個消費者進行資料處理的場景。
在設置裡使用了Channel.CreateUnbounded<T>
表示沒有固定的緩衝大小,可以持續寫入資料而不會被阻塞(除非記憶體不足)。
使用情境:適合於對生產速度和消費速度差異較大的場景,因為它不會因為緩衝區滿了而阻塞寫入者。但這也表示生產者生成的速度如果遠高於消費者的處理速度,可能會導致記憶體使用過多。
using System.Threading.Channels;
using WebApplication1.Infrastructure.Events;
namespace WebApplication1.Infrastructure.ServiceCollectionExtensions;
/// <summary>
/// class ChannelServiceCollectionExtensions
/// </summary>
public static class ChannelServiceCollectionExtensions
{
/// <summary>
/// Add the Channels
/// </summary>
/// <param name="services">The services</param>
/// <returns></returns>
public static IServiceCollection AddChannels(this IServiceCollection services)
{
services.AddSingleton(Channel.CreateUnbounded<MessageCreatedEvent>(new UnboundedChannelOptions { SingleReader = true }));
services.AddSingleton(x => x.GetRequiredService<Channel<MessageCreatedEvent>>().Writer);
services.AddSingleton(x => x.GetRequiredService<Channel<MessageCreatedEvent>>().Reader);
return services;
}
}
UnboundedChannelOptions
因為使用到 UnboundedChannelOptions 這個類別,所以特別說明。
UnboundedChannelOptions 是用來設定 Channel.CreateUnbounded<T> 方法時的選項。這些選項會影響 Channel 的行為,包括如何處理資料的寫入和讀取。有幾個重要的屬性可以設定:
SingleWriter(bool)
- 用途:指示是否保證只有一個寫入者(Producer)。
- 預設值:false(允許多個寫入者)
- 說明:如果設置為 true,則 Channel 會假設只有一個生產者進行寫入操作,這樣可以進行一些內部優化,以提高效能。當你確定只有一個寫入者時,可以將其設為 true。
SingleReader(bool)
- 用途:指示是否保證只有一個讀取者(Consumer)。
- 預設值:false(允許多個讀取者)
- 說明:如果設置為 true,則 Channel 會假設只有一個消費者進行讀取操作。這可以在內部優化某些併發控制,以提升讀取效率。如果你確定只有一個後台服務處理這些訊息,則應將其設為 true。
AllowSynchronousContinuations(bool)
- 用途:允許同步執行的延續操作(continuations)。
- 預設值:false
- 說明:如果設為 true,當某些操作(如 WriteAsync 或 ReadAsync)完成後,後續的回調操作可能會在同一個執行緒中執行。這可以在某些情況下降低上下文切換的成本,但也可能導致執行緒被佔用過久。因此,僅在有需要時才設置為 true。
MessageController (生產者)
在 MessageController 注入 ChannelWriter 以寫入事件, 然後 CreateAsync 方法負責接收 API 請求並將 MessageCreatedEvent 寫入 Channel。
使用 TryWrite 確保當 Channel 滿了或不可寫入時能夠快速回應 503 錯誤,避免阻塞 API 的請求,並對 Channel 寫入失敗的情況進行處理。
TryWrite 成功時直接返回 OK,失敗時返回 503 狀態碼。
using System.Threading.Channels;
using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;
namespace WebApplication1.Controllers;
/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly ILogger<MessageController> _logger;
private readonly TimeProvider _timeProvider;
private readonly ChannelWriter<MessageCreatedEvent> _messageCreatedEventChannelWriter;
/// <summary>
/// Initializes a new instance of the <see cref="MessageController"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The timeProvider</param>
/// <param name="messageCreatedEventChannelWriter">The messageCreatedEventChannelWriter</param>
public MessageController(ILogger<MessageController> logger,
TimeProvider timeProvider,
ChannelWriter<MessageCreatedEvent> messageCreatedEventChannelWriter)
{
this._logger = logger;
this._timeProvider = timeProvider;
this._messageCreatedEventChannelWriter = messageCreatedEventChannelWriter;
}
/// <summary>
/// 建立 Message 資料
/// </summary>
/// <param name="parameter">parameter</param>
/// <returns></returns>
[HttpPost("create")]
public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
{
this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
parameter.Content);
var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);
try
{
// 先嘗試使用 TryWrite 將事件寫入 Channel
if (this._messageCreatedEventChannelWriter.TryWrite(messageEvent))
{
// 如果 TryWrite 成功,直接返回 OK,無需再次執行 WriteAsync
return this.Ok(new { Message = "Event produced successfully" });
}
// 如果 TryWrite 失敗,記錄警告並返回 503 狀態碼
this._logger.LogWarning("Channel is full or completed, unable to write message event.");
return this.StatusCode(503, new { Message = "Service Unavailable. Try again later." });
}
catch (Exception ex)
{
this._logger.LogError(ex, "Failed to write message event to the channel.");
return this.StatusCode(500, new { Message = "Failed to produce event" });
}
}
}
MessageCreatedEvent 類別
namespace WebApplication1.Infrastructure.Events;
/// <summary>
/// class MessageCreatedEvent
/// </summary>
public class MessageCreatedEvent
{
/// <summary>
/// Initializes a new instance of the <see cref="MessageCreatedEvent"/> class
/// </summary>
/// <param name="id">The id</param>
/// <param name="content">The content</param>
public MessageCreatedEvent(Guid id, string content)
{
this.Id = id;
this.Content = content;
}
/// <summary>
/// Id
/// </summary>
public Guid Id { get; set; }
/// <summary>
/// Content
/// </summary>
public string Content { get; set; }
}
MessageCreatedEventConsumerBackgroundService (消費者)
這個 BackgroundService 類別負責持續監聽 Channel,使用 ChannelReader 讀取生產者寫入的事件。 透過 ReadAllAsync 方法逐一處理 MessageCreatedEvent 事件,實現非同步的消息處理,當事件到達時即時進行處理並記錄日誌,確保了消息的消費過程不會中斷。
using System.Text.Json;
using System.Threading.Channels;
using WebApplication1.Infrastructure.Events;
namespace WebApplication1.Infrastructure.BackgroundServices;
/// <summary>
/// class MessageCreatedEventConsumerBackgroundService
/// </summary>
public class MessageCreatedEventConsumerBackgroundService : BackgroundService
{
private readonly ILogger<MessageCreatedEventConsumerBackgroundService> _logger;
private readonly TimeProvider _timeProvider;
private readonly ChannelReader<MessageCreatedEvent> _channelReader;
/// <summary>
/// Initializes a new instance of the <see cref="MessageCreatedEventConsumerBackgroundService"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
/// <param name="channelReader">The channel reader</param>
public MessageCreatedEventConsumerBackgroundService(ILogger<MessageCreatedEventConsumerBackgroundService> logger,
TimeProvider timeProvider,
ChannelReader<MessageCreatedEvent> channelReader)
{
this._logger = logger;
this._timeProvider = timeProvider;
this._channelReader = channelReader;
}
/// <summary>
/// Executes the stopping token
/// </summary>
/// <param name="stoppingToken">The stopping token</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
this._logger.LogInformation(
"{DateTimeNow} {TypeName} is starting.",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
this.GetType().Name);
// 從 ChannelReader 讀取事件
await foreach (var messageCreatedEvent in this._channelReader.ReadAllAsync(stoppingToken))
{
try
{
// 處理讀取到的事件
var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
"Got a MessageCreatedEvent");
this._logger.LogInformation("{Message} - {MessageCreatedEvent}",
message,
JsonSerializer.Serialize(messageCreatedEvent));
}
catch (Exception ex)
{
this._logger.LogError(ex, "An error occurred while processing the event.");
}
}
}
/// <summary>
/// Stops the cancellation token
/// </summary>
/// <param name="cancellationToken">The cancellation token</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
this._logger.LogInformation(
"{DateTimeNow} {TypeName} is stopping.",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss}",
this.GetType().Name);
await base.StopAsync(cancellationToken);
}
}
最後要記得在 Program.cs 裡去註冊服務
觀察執行狀況
執行服務並觀察 Console Log 的內容
最後
在這個應用中,我們利用 C# 中的 Channels 在 ASP.NET Core Web API 裡實作了一個生產者-消費者模式。
P.S. 以下是使用 ChatGPT 幫我做個總結
這個基於 Channels 的生產者-消費者模式實現,利用了 ASP.NET Core 的依賴注入、非同步操作,以及 Channel 提供的高效線程安全通信能力:
- 高效通信:Channel 提供了高效且線程安全的生產者-消費者模式,可以在多線程環境下輕鬆傳遞消息而不需要顧及複雜的鎖定機制。
- 擴展性強:將 Channel 的註冊與使用封裝到擴展方法和 BackgroundService 中,使得此模式可以方便地複用和擴展到其他類型的事件處理。
- 簡化異步處理:使用 Channel 和 BackgroundService 的組合,使得消息處理的邏輯簡單且易於理解,並且能夠高效地處理大量請求。
這樣的設計不僅適合需要高效處理事件的場景,如佇列處理、數據流等,也能輕鬆適應多種不同的應用需求。對於希望在 ASP.NET Core 中構建高效、可擴展的應用程序的開發者而言,Channel 提供了一個非常有力的工具。
純粹是在寫興趣的,用寫程式、寫文章來抒解工作壓力