[RABBITMQ] Persistent, Durable, Ack


1. 메시지 수신 자동 확인(ack , noAck)

작업을 수행하는 데 몇 초가 걸릴 수 있습니다.

소비자 중 한 명이 긴 작업을 시작하고 부분적으로 만 수행되어 사망하는 경우 어떻게되는지 궁금 할 수 있습니다.

현재의 코드(

noAck : true
)를 사용하면 RabbitMQ가 고객에게 메시지를 전달하면 바로 삭제 표시가됩니다.

이 경우 작업자를 죽이면 처리중인 메시지가 손실됩니다.

이 특정 작업자에게 발송되었지만 아직 처리되지 않은 모든 메시지도 손실됩니다.

noAck : false
ack를 전송하지 않고 소비자가 죽거나 (채널이 닫히거나 연결이 끊어 지거나 TCP 연결이 끊어지는 경우),

RabbitMQ는 메시지가 완전히 처리되지 않았 음을 인식하고 다시 대기합니다.

가끔씩 사망하더라도 메시지를 잃어 버리지 않을 것입니다.

메시지 시간 초과가 없습니다. RabbitMQ는 소비자가 사망 할 때 메시지를 재전송합니다.

메시지 처리가 매우 오랜 시간이 걸리는 경우에도 괜찮습니다.

앞의 예에서 메시지 수신 확인이 해제되었습니다. 작업을 마친 후 에는 noAck : false

옵션 을 사용하여 설정을 해제하고 작업자에게 적절한 응답을 보내야합니다.

ex) 자동 응답

ch.consume(q, function(msg) {
var secs = msg.content.toString().split(‘.’).length - 1;

console.log(“ [x] Received %s”, msg.content.toString());
setTimeout(function() {
console.log(“ [x] Done”);
}, secs * 1000);
}, {noAck: true});

ex) 자동 미응답 & ack(msg)

ch.consume(q, function(msg) {
var secs = msg.content.toString().split(‘.’).length - 1;

console.log(“ [x] Received %s”, msg.content.toString());
setTimeout(function() {
console.log(“ [x] Done”);
ch.ack(msg);
}, secs * 1000);
}, {noAck: false});


2. 메시지 내구성(durable) , 지속성(persistent)

우리는 소비자가 사망하더라도 작업이 손실되지 않도록하는 방법을 배웠습니다. 그러나 RabbitMQ 서버가 중지되면 우리의 작업은 여전히 손실됩니다.

RabbitMQ가 종료되거나 충돌하면 사용자가 알리지 않는 한 대기열과 메시지를 잊어 버리게됩니다. 메시지가 손실되지 않도록하려면 큐와 메시지를 모두 튼튼하게 표시해야합니다.

첫째, 우리는 RabbitMQ가 결코 우리 큐를 잃지 않도록해야합니다. 그렇게하기 위해서 우리는 그것을 durable 으로 선언 할 필요가있다 .

ex)

큐 할당시

ch.assertQueue ( ‘hello’ , { durable : true });
큐로 전송시 옵션

ch.sendToQueue(q, new Buffer(msg), {persistent: true});


3. 최종적으로 구현해야 할 옵션을 포함한 소스

new_task.js

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

worker.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'task_queue';

ch.assertQueue(q, {durable: true});
ch.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
var secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}, {noAck: false});
});
});