MassTransit 的架構是一個基於事件驅動和 Message 傳遞的分佈式系統架構,主要是要解偶服務之間的通訊。它使用 Message Broker(如 RabbitMQ、ActiveMQ、Kafaka、Azure Service Bus、Amazon SQS 等)來傳遞不同服務之間的訊息,它大大的簡化事件驅動的開發門檻
MassTransit 與 RabbitMQ 架構圖
下圖出自:MassTransit 知多少 | .NET 分布式应用框架 - 「圣杰」 - 博客园 (cnblogs.com)
消息流動過程
Producer,發布消息:
- 生產者向 RabbitMQ 發布一個消息(如事件或命令)。
- 通過 IPublishEndpoint.Publish:將事件廣播給所有訂閱了該事件類型的消費者。
- 通過 ISendEndpoint.Send:將訊息發送到指定的佇列,通常用於點對點模式,確保只有一個消費者處理該訊息。
Message Broker (RabbitMQ):
- 消息到達 RabbitMQ 的 Exchange,Exchange 根據配置的路由規則將消息轉發到一個或多個 Queue。這些規則可以根據消息類型、路由鍵等來進行配置。
- Transport:跟 Message Boker 通訊的角色,負責發送和接收消息。
- 支多個 Message Boker 例如:RabbitMQ、Azure Service Bus 或 Kafka。
Consumer,處理消息:
- 消費者從 RabbitMQ 的 Queue 中取出消息,並通過實作 IConsumer<T> 的方式來處理收到的消息。
- IConsumer<T>:處理從 Queue 接收到的事件或命令的消費者。
- IReceiveEndpoint:接收端點,從 Transport 接收訊息反序列化後傳給消費者。
Message:消息合约,定義消息生產者和消息消費者之間的內容規範。
開發環境
- Windows 11 Home
- Rider 2024.2.6
- .NET 8
- rabbitmq:3-management
建立 RabbitMQ
docker-compose 內容如下
version: '3.8'
services:
rabbitmq:
container_name: rabbitmq.3
image: "rabbitmq:3-management"
ports:
- "5672:5672" # RabbitMQ 主要連接埠
- "15672:15672" # 管理介面連接埠
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
GUI 頁面如下所示
快速開始
建立一個 .NET 8 的 Web API 專案,
安裝套件
dotnet add package MassTransit.RabbitMQ --version 8.2.5
dotnet add package Microsoft.Extensions.Hosting --version 8.0.1
生產者發佈訊息
新增一個端點,使用 IPublishEndpoint.Publish 發佈事件
public class OrderCreated
{
public Guid OrderId { get; set; }
public DateTime CreatedAt { get; set; }
public decimal TotalAmount { get; set; }
}
public class CreateOrderRequest
{
public decimal TotalAmount { get; set; }
}
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly IPublishEndpoint _publishEndpoint;
public OrdersController(IPublishEndpoint publishEndpoint)
{
this._publishEndpoint = publishEndpoint;
}
[HttpPost]
public async Task<ActionResult> CreateOrder([FromBody] CreateOrderRequest request)
{
if (request == null)
{
return this.BadRequest("Invalid order data");
}
var orderCreatedEvent = new OrderCreated
{
OrderId = Guid.NewGuid(),
CreatedAt = DateTime.UtcNow,
TotalAmount = request.TotalAmount
};
// 生產者,發布 OrderCreated 事件
await this._publishEndpoint.Publish(orderCreatedEvent);
return this.Ok($"Order created with ID: {orderCreatedEvent.OrderId}");
}
}
消費者處理訊息
消費者實作 IConsumer<OrderCreated>,並用 ConsumeContext<OrderCreated> 處理訊息
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
public Task Consume(ConsumeContext<OrderCreated> context)
{
Console.WriteLine($"Order created: {context.Message.OrderId}, Total Amount: {context.Message.TotalAmount}");
return Task.CompletedTask;
}
}
經中斷得知 ConsumeContext 內容如下
在 DI Container 配置 MassTransit 的 Message Boker
builder.Services.AddMassTransit(x =>
{
// 註冊消費者
x.AddConsumer<OrderCreatedConsumer>();
// 配置 MassTransit 使用 RabbitMQ
x.UsingRabbitMq((context, config) =>
{
config.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
// 註冊消費者
config.ReceiveEndpoint("order-created-event", e =>
{
e.ConfigureConsumer<OrderCreatedConsumer>(context);
});
});
});
builder.Services.AddControllers();
完成之後應該就可以,試著尻 API
curl -X 'POST' `
'https://localhost:7293/api/Orders' `
-H 'Content-Type: application/json' `
-d '{
"totalAmount": 2
}'
結果如下
PS C:\Users\yao> curl -X 'POST' `
>> 'https://localhost:7293/api/Orders' `
>> -H 'Content-Type: application/json' `
>> -d '{
>> "totalAmount": 2
>> }'
Order created with ID: edc992db-3e78-4d79-8d17-5ac42f46a758
生產者與消費者分開
上一個簡單的例子是把生產者跟消費者放在同一個專案,沒意外的話,真實的案場,兩者應該都是分開的
生產者(Producer)
ISendEndpoint:提供了 Send 方法,用於發送命令,由特定的消費者接收。
IPublishEndpoint:提供了 Publish 方法,用事件廣播,有訂閱的消費者都可以接收。
發送命令有幾種寫法
透過 DI Container 註冊後,建構子開一個洞讓它依賴以下
- IBus
- ISendEndpointProvider
ISendEndpointProvider
public class MessageSenderService : IHostedService
{
private readonly ISendEndpointProvider _sendEndpointProvider;
public MessageSenderService(ISendEndpointProvider sendEndpointProvider)
{
this._sendEndpointProvider = sendEndpointProvider;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// 取得 SendEndpoint 並發送訊息到指定佇列
// var address = new Uri("queue:order-submitted-queue");
var address = new Uri("rabbitmq://localhost/order-submitted-queue");
var sendEndpoint =
await this._sendEndpointProvider.GetSendEndpoint(address);
var message = new OrderSubmitted
{
OrderId = Guid.NewGuid(),
Timestamp = DateTime.UtcNow
};
await sendEndpoint.Send(message, cancellationToken);
Console.WriteLine("OrderSubmitted event sent to order-submitted-queue.");
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
IBus
public class MessageSenderService2 : IHostedService
{
private readonly IBus _bus;
public MessageSenderService2(IBus bus)
{
this._bus = bus;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// 使用 Publish 發佈事件,所有訂閱者都能接收此事件
var orderSubmitted = new OrderSubmitted
{
OrderId = Guid.NewGuid(),
Timestamp = DateTime.UtcNow
};
// ===Publish===
// await this._bus.Publish(orderSubmitted, cancellationToken);
// Console.WriteLine("OrderSubmitted event published.");
// ===Send===
// EndpointConvention.Map<OrderSubmitted>(new Uri("queue:order-submitted-queue"));
EndpointConvention.Map<OrderSubmitted>(new Uri("rabbitmq://localhost/order-submitted-queue"));
await this._bus.Send(orderSubmitted, cancellationToken);
Console.WriteLine("OrderSubmitted event sent.");
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
IBusControl
IBusControl 實作 IBus,除了可以透過 DI Container 之外,也可以自行建立執行個體
public class MessagePublishService4 : IHostedService
{
public async Task StartAsync(CancellationToken cancellationToken)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(config =>
{
config.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
});
await busControl.StartAsync(cancellationToken);
var orderSubmitted = new OrderSubmitted
{
OrderId = Guid.NewGuid(),
Timestamp = DateTime.UtcNow
};
// 發佈事件
await busControl.Publish(new OrderSubmitted
{
OrderId = Guid.NewGuid(),
Timestamp = DateTime.UtcNow
}, cancellationToken);
// ===Publish===
// 使用 Publish 發佈事件,所有訂閱者都能接收此事件
await busControl.Publish(orderSubmitted, cancellationToken);
Console.WriteLine("OrderSubmitted event published.");
// ===Send===
EndpointConvention.Map<OrderSubmitted>(new Uri("rabbitmq://localhost/order-submitted-queue"));
await busControl.Send(orderSubmitted, cancellationToken);
Console.WriteLine("OrderSubmitted event sent.");
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
ConsumeContext
或者是在消費者端再次發送訊息,ConsumeContext 可以 Send 也可以 Publish
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
public async Task Consume(ConsumeContext<OrderSubmitted> context)
{
var destinationAddress = new Uri("rabbitmq://localhost/order-submitted-queue");
var command = new OrderSubmitted()
{
OrderId = context.Message.OrderId,
Timestamp = context.Message.Timestamp
};
await context.Send(destinationAddress, command);
// var endpoint = await context.GetSendEndpoint(destinationAddress);
// await endpoint.Send(command);
Console.WriteLine($"Order received: {context.Message.OrderId} at {context.Message.Timestamp}");
}
}
注意:上面這個例子,會造成無窮迴圈
消費者(Consumer)
Consumer,消費者用來消化消息,消費者可以訂閱某一個生產者的命令 quere,也可以訂閱生產者的廣播訊息,在 Hosting 配置好接收路徑,以及收到訊息要做甚麼事,基本上就設定好了。
public static async Task Main(string[] args)
{
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<OrderSubmittedConsumer>();
x.UsingRabbitMq((context, config) =>
{
config.Host("rabbitmq://localhost", h =>
{
h.Username("guest");
h.Password("guest");
});
// 設置接收端點,並消費 `OrderSubmitted`
config.ReceiveEndpoint("order-submitted-queue", endpoint =>
{
endpoint.ConfigureConsumer<OrderSubmittedConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
})
.Build();
try
{
Console.WriteLine("Listening for OrderSubmitted events...");
await host.RunAsync();
}
finally
{
await host.StopAsync();
}
}
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
public async Task Consume(ConsumeContext<OrderSubmitted> context)
{
Console.WriteLine($"Order received: {context.Message.OrderId} at {context.Message.Timestamp}");
}
}
案例位置
若有謬誤,煩請告知,新手發帖請多包涵
Microsoft MVP Award 2010~2017 C# 第四季
Microsoft MVP Award 2018~2022 .NET