-
Notifications
You must be signed in to change notification settings - Fork 2
/
queue.js
92 lines (72 loc) · 2.44 KB
/
queue.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
'use strict';
const Promise = require('bluebird');
const amqp = require('amqplib');
const logger = require('./libs/logger');
const { AmqpError, code } = require('./libs/errors');
// Error codes attached to the AmqpError instances thrown by `connect`.
const codes = {
  CANNOT_CONNECT: code(1, 'Can\'t connect to AMQP server'),
  CANNOT_CREATE_CHANNEL: code(2, 'Can\'t create a new channel'),
  INVALID_TOPOLOGY: code(3, 'Invalid queues/exchange topology'),
};

// Channel shared by every function in this module; it stays `null` until
// `connect` assigns the channel it opened.
let channel = null;
/**
 * Connects to the AMQP server, opens a channel and declares the topology.
 *
 * The opened channel is stored in the module-level `channel` variable (the
 * other functions of this module use it) and its prefetch count is set to 1.
 * Every queue and exchange is asserted as durable; each exchange's bindings
 * are created once that exchange has been asserted.
 *
 * @param {string|Object} config - Passed unchanged to `amqp.connect`.
 * @param {Object} topology
 * @param {Array<{name: string, type: string, bindings?: Array<{queue: string, pattern: string}>}>} [topology.exchanges]
 *   Exchanges to assert, each with the optional queue bindings to create.
 * @param {string[]} [topology.queues] - Names of the queues to assert.
 * @returns {Promise<Array>} Resolves with the `assertQueue` results followed
 *   by, for each exchange, the array of its `bindQueue` results.
 * @throws {AmqpError} Rejects with `CANNOT_CONNECT`, `CANNOT_CREATE_CHANNEL`
 *   or `INVALID_TOPOLOGY` depending on the step that failed; the underlying
 *   error is logged before being replaced.
 */
function connect(config, { exchanges, queues }) {
  // Promise.resolve keeps the chain on the module's own Promise implementation
  // whatever promise type amqplib hands back.
  return Promise.resolve(amqp.connect(config))
    .catch(err => {
      // Log the cause: the AmqpError thrown below does not carry it.
      logger.log(err);
      throw new AmqpError(codes.CANNOT_CONNECT);
    })
    .then(conn => conn.createChannel())
    .catch(err => {
      if (err instanceof AmqpError) throw err;
      logger.log(err);
      throw new AmqpError(codes.CANNOT_CREATE_CHANNEL);
    })
    .then(chan => {
      channel = chan;
      // Keep the prefetch promise so that a failure is reported through the
      // returned promise instead of becoming an unhandled rejection.
      const prefetchSet = channel.prefetch(1);
      const queueAsserted = (queues || []).map(queue =>
        channel.assertQueue(queue, { durable: true })
      );
      const exchangeAsserted = (exchanges || []).map(exchange =>
        channel.assertExchange(exchange.name, exchange.type, { durable: true })
          // Plain Array#map + Promise.all: does not rely on the promise
          // returned by amqplib exposing Bluebird's `.map`.
          .then(() => Promise.all(
            (exchange.bindings || []).map(({ queue, pattern }) =>
              channel.bindQueue(queue, exchange.name, pattern)
            )
          ))
      );
      return Promise.all([
        prefetchSet,
        ...queueAsserted,
        ...exchangeAsserted
      ])
        // Drop the prefetch result so the resolved value keeps its shape.
        .then(([, ...declared]) => declared);
    })
    .catch(err => {
      if (err instanceof AmqpError) throw err;
      logger.log(err);
      throw new AmqpError(codes.INVALID_TOPOLOGY);
    });
}
/**
 * Starts consuming `queue` on the shared channel with explicit acknowledgements.
 *
 * Each message body is decoded as UTF-8 JSON and passed to `handler`. The
 * message is acknowledged once the handler has settled, whether it succeeded
 * or failed; handler failures are logged. A body that is not valid JSON is
 * logged and acknowledged without calling the handler.
 *
 * Requires `connect` to have completed, since it uses the module-level channel.
 *
 * @param {string} queue - Name of the queue to consume from.
 * @param {function(*): (*|Promise<*>)} handler - Receives the parsed message
 *   body; may return a promise.
 * @returns {*} Whatever `channel.consume` returns.
 */
function consume(queue, handler) {
  return channel.consume(queue, message => {
    // amqplib invokes the callback with `null` when the server cancels the
    // consumer; there is nothing to parse or acknowledge in that case.
    if (message === null) {
      logger.error('Consumer cancelled by server for queue: ', queue);
      return undefined;
    }

    const raw = message.content.toString('utf8');
    let data;
    try {
      data = JSON.parse(raw);
    } catch (err) {
      logger.error('Invalid message format: ', raw);
      return channel.ack(message);
    }

    // Running the handler inside the executor turns a synchronous throw into
    // a rejection, so the message is still acknowledged below.
    return new Promise(resolve => resolve(handler(data)))
      .catch(err => logger.error('Message handler failed: ', err))
      .finally(() => channel.ack(message));
  }, { noAck: false });
}
/**
 * Subscribes `handler` to every message published on `exchange`.
 *
 * Asserts an exclusive queue with a server-generated name, binds it to the
 * exchange with an empty pattern and starts consuming it through `consume`.
 *
 * Requires `connect` to have completed, since it uses the module-level channel.
 *
 * @param {string} exchange - Name of the exchange to bind to.
 * @param {function(*): (*|Promise<*>)} handler - Passed on to `consume`.
 * @returns {Promise<*>} Resolves with the result of `consume`.
 */
function subscribe(exchange, handler) {
  // Promise.resolve keeps the chain on the module's own Promise implementation
  // whatever promise type amqplib hands back.
  return Promise.resolve(channel.assertQueue('', { exclusive: true }))
    .then(q => {
      // The generated queue name is exposed as `queue` on the assertQueue reply.
      const queueName = q.queue;
      return Promise.resolve(channel.bindQueue(queueName, exchange, ''))
        .then(() => consume(queueName, handler));
    });
}
/**
 * Publishes `data` to `exchange` as a persistent JSON message, using an empty
 * routing key.
 *
 * Requires `connect` to have completed, since it uses the module-level channel.
 *
 * @param {string} exchange - Name of the target exchange.
 * @param {*} data - Value serialised with `JSON.stringify`.
 * @returns {*} Whatever `channel.publish` returns.
 */
function publish(exchange, data) {
  const serialised = JSON.stringify(data);
  const options = { persistent: true };
  return channel.publish(exchange, '', Buffer.from(serialised), options);
}
/**
 * Sends `data` directly to `queue` as a persistent JSON message.
 *
 * Requires `connect` to have completed, since it uses the module-level channel.
 *
 * @param {string} queue - Name of the target queue.
 * @param {*} data - Value serialised with `JSON.stringify`.
 * @returns {*} Whatever `channel.sendToQueue` returns.
 */
function send(queue, data) {
  const serialised = JSON.stringify(data);
  const options = { persistent: true };
  return channel.sendToQueue(queue, Buffer.from(serialised), options);
}
module.exports = { connect, subscribe, consume, send, publish };