我們對於 Message Queue 的既定印象就是先進先出,先發送的訊息就先傳遞出去,這兩天收到一個需求是希望通知能夠在指定的時間傳遞出去,以往這類型的需求我們會跳過 Message Queue,改用定期去看資料庫或是某個資料夾,如果有資料或檔案內容的指定時間符合當下的時間,我們就發送通知,但我現在想讓 Message Queue 處理延後傳遞的訊息。
我用的 Mesaage Queue 系統是 RabbitMQ,版本是 3.6.11,安裝在 Windows Server 2016 作業系統上,RabbitMQ 有一個 Plugin 叫 rabbitmq_delayed_message_exchange
,我們可以利用這個 Plugin 來發送延後傳遞的訊息。
安裝
我們從 Community Plugins — RabbitMQ 找到 rabbitmq_delayed_message_exchange 並且把它下載回來,要注意的是 RabbitMQ 的版本至少要 3.6 以上,下載之後就解壓縮到 RabbitMQ 安裝目錄下的 plugins
資料夾
打開命令提示字元,切換到 RabbitMQ 安裝目錄底下的 sbin
資料夾,執行下列指令來啟動 rabbitmq_delayed_message_exchange Plugin。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
用管理介面來綁定 Queue 到 x-delayed-message Exchange
如果我們有啟動 rabbitmq_management Plugin,那麼我們可以選擇從管理介面來新增 x-delayed-message
Exchange,並且將 Queue 綁定到 Exchange 上。
新增 x-delayed-message Exchange
Exchange 的 Name
就自訂一個自己喜歡的名稱,Type
選擇 x-delayed-message,需要新增一個 Argument,Key 為 x-delayed-type
,Value 為 direct
。
新增 Queue
我們新增一個 Queue,用來從 x-delayed-message Exchange 接收延遲傳遞的訊息。
綁定 Queue 到 x-delayed-message Exchange
在 From exchange
欄位輸入剛剛新增的 x-delayed-message Exchange 名稱,自訂一個 Routing Key
。
用程式碼來綁定 Queue 到 x-delayed-message Exchange
除了用 RabbitMQ 的管理介面之外,我們還可以用程式碼來綁定 Queue 到 x-delayed-message Exchange。
// 新增 x-delayed-message Exchange
var exchangeName = "delayed-message";
var args = new Dictionary<string, object> { ["x-delayed-type"] = "direct" };
channel.ExchangeDeclare(exchangeName, "x-delayed-message", true, false, args);
// 新增 Queue
var queueName = "delayed";
channel.QueueDeclare(queueName, true, false, false, null);
// 綁定 Queue 到 x-delayed-message Exchange
var routingKey = "delayed-notification";
channel.QueueBind(queueName, exchangeName, routingKey, null);
發送延後傳遞的訊息
Queue 綁定好之後,我們在發送的訊息中要加入一個 Header,Key 是 x-delay
,Value 是毫秒。
var delay = new Dictionary<string, object> { ["x-delay"] = 5000 };
channel.BasicPublish(
exchangeName,
routingKey,
new BasicProperties { Headers = delay },
Encoding.UTF8.GetBytes($"Hello, Send: {DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}"));
我分別測試了 5 秒、1 小時、2 小時,訊息皆在預期的時間傳遞過來。
發送特定時間傳遞的訊息
我們把指定的時間減掉當前時間的總毫秒數,拿來當 x-delay 的參數值,就可以做到在特定時間才把訊息傳遞出去,根據文件的說明,最大毫秒數為 (2^32)-1
毫秒。
var delayMilliseconds = Convert.ToInt32(new DateTime(2019, 5, 16, 17, 55, 0).Subtract(DateTime.Now).TotalMilliseconds);
var delay = new Dictionary<string, object> { ["x-delay"] = delayMilliseconds };
channel.BasicPublish(
exchangeName,
routingKey,
new BasicProperties { Headers = delay },
Encoding.UTF8.GetBytes($"Hello, Send: {DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}"));
另外就是相信各位應該有注意到,那個送達的時間有些許的落差,也就是說不是百分之百地精準,由於這個主要是靠 Countdown 來做到在特定時間傳遞訊息,所以 RabbitMQ Server 的 Performance,以及 Client 端與 Server 端時間的不同步,都會造成時間差,因此如果對時間敏感的需求,建議不要使用 RabbitMQ 的 rabbitmq_delayed_message_exchange Plugin 來處理延後傳遞的訊息。