在nodejs上實作rabbitMQ的client端
使用amqplib module
依照文件做一些紀錄
Overview
1.在與RabbitMQ建立連線connection後
在connection上是可以建立多個channel
2.RPC端透過AMQP與Server連接 都是透過非同步的方式的溝通
也就是說 RPC不是直接與RPC溝通 而是透過中介的RabbitMQ Server
傳送與接收
因此大多數amqplib提供的操作method 並不會由server取得回應
或只是接收到boolean的回傳值
Dealing with failure
大多數AMQP的操作,只要沒達到預期的狀態就會當作錯誤
因為錯誤可能造成channel關閉,錯誤發生或可能產生的效應
1.目前發生錯誤的RPC會被reject
2.使用的channel物件會因為錯誤 造成之後的操作發生問題
3.任何等待傳送的RPC 都會被reject
4.造成channel物件emit 'error'
5.造成channel物件emit 'close'
當channel發生錯誤有可能是其他的RPC所造成
而非當下連線的RPC造成
要去找到問題發生原因可以透過
try catch捕捉到錯誤e的stackAtStateChange會是有用的錯誤資訊
connection.createChannel().then(function(ch) {
ch.close();
try {
ch.close();
}
catch (alreadyClosed) {
console.log(alreadyClosed.stackAtStateChange);
}
});
Flow Control
channel的運作會類似於stream.Writable
1.當呼叫publish或sendToQueue時會回傳
true:代表 'keep sending'
false:代表 please wait for a 'drain' event
2.呼叫以下這些method則不會有任何server的回傳
包括ack, ackAll, nack, nackAll, and reject
3.ConfirmChannel則會以callback來接收boolean值
當server有確認了message時 callback會被呼叫
Argument handling
許多的method操作都會吃一個option參數
option本身有一些預設的option參數值
如果傳入非option定義的參數內容會被忽略
通常會把option合併成一個單一物件便於使用
var common_options = {durable: true, noAck: true};
var bar_opts = Object.create(common_options);
bar_opts.autoDelete = true;
var foo_consume_opts = Object.create(common_options);
// 使用arguments table方式priority,必須以x-格式寫,
foo_consume_opts.arguments = {'x-priority': 10};
var bar_consume_opts = Object.create(foo_consume_opts);
// 覆寫之前已經設定的prooprity值
bar_consume_opts.priority = 5;
Field table values
在publish與sendToQueue的option內
包括arguments與headers都是 field table值
都是key-value object型式
有一些特別的value型態會在table內被encode
當js內會以!語法來標明其型態
ex:
{'!': 'int8', value: 64}
{'!': 'decimal', value: {digits: 4999, places: 2}}