前一篇「使用 Wolverine 實作生產者-消費者模式(Producer-Consumer Pattern)」介紹了將原本使用 Channels + BackgroundService 的生產者-消費者模式改用 Wolverine 來實作。
就以現實的問題來看,如果服務是相當頻繁地被使用時,如果 MessageCreatedEventHandler 消費者的處理速度有所延遲,那麼 Channels 裡就會堆積大量的 MessageCreatedEvent 事件,一旦服務出現問題而重新啟動或是有版本更新而需要重新部署,那麼佇列在 Channels 裡的事件就會消失不見了,這可是不得了的事情。所以這篇就來用 WolverineFx.RabbitMQ,生產者將事件發送到 RabbitMQ 裡,然後消費者再去接收存放在 RabbitMQ 裡的訊息,如此的改變就是為了避免因為服務重新啟動而讓佇列在 Channels 裡的事件消失不見。
WolverineFx.RabbitMQ
- https://www.nuget.org/packages/WolverineFx.RabbitMQ
- https://github.com/JasperFx/wolverine/tree/main/src/Transports/RabbitMQ/Wolverine.RabbitMQ
準備 RabbitMQ
之前有寫了這麼一篇「快速建立開發環境的 RabbitMQ Cluster - 使用 bat 執行檔的方式」,重新整理了那篇所使用的一些檔案內容,然後將 RabbitMQ Management 版本改為使用 4.0.2
Dockerfile
FROM rabbitmq:4.0.2-management
COPY definitions.json /etc/rabbitmq/
rabbitmq_cluster_setup.sh
#!/bin/bash
rabbitmqctl stop_app
rabbitmqctl join_cluster --ram rabbit@node1
rabbitmqctl start_app
definitions.json
{
"rabbit_version": "4.0.2",
"rabbitmq_version": "4.0.2",
"product_name": "RabbitMQ",
"product_version": "4.0.2",
"users": [
{
"name": "admin",
"password_hash": "rrTrQZDcgif07x5jHy+8mlDRil4yv+CY/VJi8UUAFzGmpgfy",
"hashing_algorithm": "rabbit_password_hashing_sha256",
"tags": [
"administrator"
],
"limits": {}
}
],
"vhosts": [
{
"name": "/",
"description": "Default virtual host",
"tags": [],
"metadata": {
"description": "Default virtual host",
"tags": []
}
}
],
"permissions": [
{
"user": "admin",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"topic_permissions": [],
"parameters": [],
"global_parameters": [
{
"name": "internal_cluster_id",
"value": "rabbitmq-cluster-id-6n6jOh8xpgnG9JpYoDNNnA"
}
],
"policies": [],
"queues": [],
"exchanges": [],
"bindings": []
}
init_rabbitmq_cluster.bat
@echo off
chcp 65001 > nul
:: 檢查是否存在 rabbitmq:4.0.2-management 映像
docker images | findstr "^rabbitmq" | findstr "4.0.2-management" > nul
if %errorlevel% neq 0 (
echo "rabbitmq:4.0.2-management 映像不存在,正在拉取..."
docker pull rabbitmq:4.0.2-management
) else (
echo "rabbitmq:4.0.2-management 映像已存在,不需要重新拉取。"
)
:: 檢查是否存在 my-custom-rabbitmq:4.0.2-management 映像
docker images | findstr "my-custom-rabbitmq:4.0.2-management" > nul
if %errorlevel% equ 0 (
echo "my-custom-rabbitmq:4.0.2-management 映像已存在,不需要重新構建。"
) else (
:: 建立 RabbitMQ Docker 映像
echo "建立 RabbitMQ Docker 映像..."
docker build -t my-custom-rabbitmq:4.0.2-management .
)
:: 啟動 Docker 容器 1
echo "啟動 Docker 容器 1"
docker run -d --restart=always --name rabbitmq1 --hostname node1 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data1:/var/lib/rabbitmq:z" -p "4369:4369" -p "5671:5671" -p "5672:5672" -p "15671:15671" -p "15672:15672" -p "25672:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit my-custom-rabbitmq:4.0.2-management
:: 啟動 Docker 容器 2
echo "啟動 Docker 容器 2"
docker run -d --restart=always --name rabbitmq2 --hostname node2 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data2:/var/lib/rabbitmq:z" -p "5673:5672" -p "15673:15672" -p "25673:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit --link rabbitmq1:node1 my-custom-rabbitmq:4.0.2-management
:: 啟動 Docker 容器 3
echo "啟動 Docker 容器 3"
docker run -d --restart=always --name rabbitmq3 --hostname node3 --log-opt max-size=10m --log-opt max-file=3 -v "%CD%\data3:/var/lib/rabbitmq:z" -p "5674:5672" -p "15674:15672" -p "25674:25672" -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=mypassword -e RABBITMQ_ERLANG_COOKIE=mysecretcookie -e RABBITMQ_NODENAME=rabbit --link rabbitmq1:node1 --link rabbitmq2:node2 my-custom-rabbitmq:4.0.2-management
echo "所有 Docker 容器已建立"
:: 等待 RabbitMQ 服務啟動完成
echo "等待 RabbitMQ 服務啟動..."
timeout /t 30 /nobreak
:: 建立 RabbitMQ Cluster
echo "Starting to build rabbitmq cluster."
:: 在 rabbitmq2 容器内執行脚本
docker cp rabbitmq_cluster_setup.sh rabbitmq2:/rabbitmq_cluster_setup.sh
docker exec rabbitmq2 /bin/bash -c '/rabbitmq_cluster_setup.sh'
:: 在 rabbitmq3 容器内執行脚本
docker cp rabbitmq_cluster_setup.sh rabbitmq3:/rabbitmq_cluster_setup.sh
docker exec rabbitmq3 /bin/bash -c '/rabbitmq_cluster_setup.sh'
echo "完成 RabbitMQ Cluster"
:: check cluster status
echo "Check cluster status:"
docker exec rabbitmq1 /bin/bash -c "rabbitmqctl cluster_status"
docker exec rabbitmq2 /bin/bash -c "rabbitmqctl cluster_status"
docker exec rabbitmq3 /bin/bash -c "rabbitmqctl cluster_status"
:: 等待使用者按下任意鍵以關閉視窗
pause
在 Windows 環境下使用 CMD (命令提示字元) 去執行 init_rabbitmq_cluster.bat 這個檔案,就可以建立好 RabbitMQ Cluster,管理者的登入帳號和密碼都在上面。
Program.cs 設定 Wolverine
在完成 RabbitMQ Cluster 的架設後,先建立名稱為「wolverine」的 Virtual Host,並且增加使用者帳號「wolverine_user」和密碼,然後在 Program.cs 裡的 builder.Host.UseWolverine() 裡去增加 RabbitMQ 的 Connection 以及 PublishMessage 和 ListenToRabbitQueue 的設定
// 使用 Wolverine 註冊訊息處理
builder.Host.UseWolverine(opts =>
{
// 設定 RabbitMQ 連線資訊,包含 VirtualHost
opts.UseRabbitMq(connectionFactory =>
{
connectionFactory.VirtualHost = "wolverine";
connectionFactory.UserName = "wolverine_user";
connectionFactory.Password = "1q2w3e4r5t_";
connectionFactory.EndpointResolverFactory = _ => new DefaultEndpointResolver([
new AmqpTcpEndpoint("127.0.0.1", 5672),
new AmqpTcpEndpoint("127.0.0.1", 5673),
new AmqpTcpEndpoint("127.0.0.1", 5674)
]);
})
.AutoProvision();
// 發送 MessageCreatedEvent 到指定的 exchange 和 queue
opts.PublishMessage<MessageCreatedEvent>()
.ToRabbitExchange(
exchangeName: "message_created_exchange",
configure: exchange =>
{
exchange.ExchangeType = ExchangeType.Direct;
exchange.BindQueue(queueName: "message_created_queue", bindingKey: "message.created");
});
// 接收來自 RabbitMQ 的 MessageCreatedEvent
opts.ListenToRabbitQueue(queueName: "message_created_queue", configure: queue =>
{
queue.TimeToLive(1024.Minutes());
});
});
RabbitMQ 連線設定:
- 使用 UseRabbitMq 配置 ConnectionFactory,設置了 VirtualHost、UserName 和 Password。
- 使用 EndpointResolverFactory 來配置多個 RabbitMQ 節點,使得在集群中如果有節點不可用,仍然能夠自動切換到其他節點。
MessageCreatedEvent 的發送:
- 設定了 PublishMessage<MessageCreatedEvent>(),將事件發送到一個名為 "message_created_exchange" 的 exchange,並配置它為 Direct 類型。
- 使用 exchange.BindQueue 綁定了 "message_created_queue" 並設置了 bindingKey 為 "message.created",這樣確保了該事件能夠正確路由到對應的 queue。
接收 MessageCreatedEvent:
- 使用 ListenToRabbitQueue 設定了監聽 "message_created_queue"。
- 使用 TimeToLive(5.Minutes()) 設置消息的存活時間,讓未處理的消息在 queue 中的壽命為 1024 分鐘。
發送者 - MessageController
基本上跟前一篇的內容是一樣的,沒有做任何的變化
using Microsoft.AspNetCore.Mvc;
using WebApplication1.Infrastructure.Events;
using WebApplication1.Infrastructure.Models;
using Wolverine;
namespace WebApplication1.Controllers;
/// <summary>
/// class MessageController
/// </summary>
[ApiController]
[Route("api/[controller]")]
public class MessageController : ControllerBase
{
private readonly ILogger<MessageController> _logger;
private readonly TimeProvider _timeProvider;
private readonly IMessageBus _messageBus;
/// <summary>
/// Initializes a new instance of the <see cref="MessageController"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
/// <param name="messageBus">The message bus</param>
public MessageController(ILogger<MessageController> logger, TimeProvider timeProvider, IMessageBus messageBus)
{
this._logger = logger;
this._timeProvider = timeProvider;
this._messageBus = messageBus;
}
/// <summary>
/// 建立 Message 資料
/// </summary>
/// <param name="parameter">parameter</param>
/// <returns></returns>
[HttpPost("create")]
[Produces("application/json", "text/json")]
[ProducesResponseType(200, Type = typeof(ResponseMessageOutputModel))]
[ProducesResponseType(500, Type = typeof(ResponseMessageOutputModel))]
public async Task<IActionResult> CreateAsync([FromBody] MessageCreateParameter parameter)
{
this._logger.LogInformation("{CurrentTime} Message Created - Content: {Content}",
$"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff}",
parameter.Content);
var messageEvent = new MessageCreatedEvent(Guid.NewGuid(), parameter.Content);
try
{
// 使用 Wolverine 發送事件
await this._messageBus.PublishAsync(messageEvent);
return this.Ok(new ResponseMessageOutputModel { Message = "Event produced successfully" });
}
catch (Exception ex)
{
this._logger.LogError(ex, "Failed to publish message event.");
return this.StatusCode(500, new ResponseMessageOutputModel { Message = "Failed to produce event" });
}
}
}
消費者 - MessageCreatedEventHandler
這也是和前一篇的內容是一樣的,沒有做任何的改變
using System.Text.Json;
using WebApplication1.Infrastructure.Events;
namespace WebApplication1.Infrastructure.EventHandlers;
/// <summary>
/// class MessageCreatedEventHandler
/// </summary>
/// <remarks>
/// 透過 Wolverine 的 MessageBus 接收處理 MessageCreatedEvent 事件
/// </remarks>
public class MessageCreatedEventHandler
{
private readonly ILogger<MessageCreatedEventHandler> _logger;
private readonly TimeProvider _timeProvider;
/// <summary>
/// Initializes a new instance of the <see cref="MessageCreatedEventHandler"/> class
/// </summary>
/// <param name="logger">The logger</param>
/// <param name="timeProvider">The time provider</param>
public MessageCreatedEventHandler(ILogger<MessageCreatedEventHandler> logger, TimeProvider timeProvider)
{
this._logger = logger;
this._timeProvider = timeProvider;
}
/// <summary>
/// 接收並處理 MessageCreatedEvent 事件
/// </summary>
/// <param name="messageCreatedEvent">The messageCreatedEvent</param>
public async Task HandleAsync(MessageCreatedEvent messageCreatedEvent)
{
try
{
var message = string.Concat($"{this._timeProvider.GetLocalNow().DateTime:yyyy-MM-dd HH:mm:ss.fff} ",
"Got a MessageCreatedEvent");
this._logger.LogInformation("{Message} - {MessageCreatedEvent}",
message,
JsonSerializer.Serialize(messageCreatedEvent));
// 可以在此處進行其他的異步操作
await Task.CompletedTask;
}
catch (Exception ex)
{
this._logger.LogError(ex, "An error occurred while processing the event.");
}
}
}
啟動服務並觀察
服務啟動後,在 console log 裡就可以看到相關的 RabbitMQ 設定處理的 log 紀錄
可以到 RabbitMQ Mamagement 的 Connection 裡看到有 Client 端的連線,因為我們的服務有接收端要收 message,所以就會有連線
然後透過 Swagger 或 Scalar 或使用 Postman 去執行 /message/create.POST,觀察 conolse log 就可以看到有發送並且 MessageCreatedEventHandler 有接收並處理的 log 紀錄
也可以觀察 RabbitMQ 的 Queues and Streams 內容,確定是有將 message 發送到 RabbitMQ 裡並且是有被 Consumer 接收處理的
上面就是改用 WolverineFx.RabbitMQ 將 MessageCreatedEvent 發送到 RabbitMQ,並且讓 MessageCreatedEventHander 從 RabbitMQ 接收 MessageCreatedEvent 並做處理。
以上
純粹是在寫興趣的,用寫程式、寫文章來抒解工作壓力