forked from zone-eu/zone-mta
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.js
142 lines (116 loc) · 4.14 KB
/
sender.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
'use strict';
// NB! This script is ran as a separate process, so no direct access to the queue, no data
// sharing with other part of the code etc.
const SendingZone = require('./lib/sending-zone').SendingZone;
const config = require('config');
const log = require('npmlog');
// initialize plugin system
const plugins = require('./lib/plugins');
plugins.init('sender');
const Sender = require('./lib/sender');
const crypto = require('crypto');
const QueueClient = require('./lib/transport/client');
const queueClient = new QueueClient(config.queueServer);
const RemoteQueue = require('./lib/remote-queue');
const senders = new Set();
let cmdId = 0;
let responseHandlers = new Map();
let closing = false;
let zone;
// Read command line arguments
let currentZone = (process.argv[2] || '').toString().trim().toLowerCase();
let clientId = (process.argv[3] || '').toString().trim().toLowerCase() || crypto.randomBytes(10).toString('hex');
// Find and setup correct Sending Zone
Object.keys(config.zones || {}).find(zoneName => {
let zoneData = config.zones[zoneName];
if (zoneName === currentZone) {
zone = new SendingZone(zoneName, zoneData, false);
return true;
}
return false;
});
if (!zone) {
require('./lib/logger'); // eslint-disable-line global-require
log.error('Sender/' + process.pid, 'Unknown Zone %s', currentZone);
return process.exit(5);
}
let logName = 'Sender/' + zone.name + '/' + process.pid;
log.level = 'logLevel' in zone ? zone.logLevel : config.log.level;
require('./lib/logger'); // eslint-disable-line global-require
log.info(logName, '[%s] Starting sending for %s', clientId, zone.name);
process.title = config.ident + ': sender/' + currentZone;
let sendCommand = (cmd, callback) => {
let id = ++cmdId;
let data = {
req: id
};
if (typeof cmd === 'string') {
cmd = {
cmd
};
}
Object.keys(cmd).forEach(key => data[key] = cmd[key]);
responseHandlers.set(id, callback);
queueClient.send(data);
};
queueClient.connect(err => {
if (err) {
log.error(logName, 'Could not connect to Queue server. %s', err.message);
process.exit(1);
}
queueClient.on('close', () => {
if (!closing) {
log.error(logName, 'Connection to Queue server closed unexpectedly');
process.exit(1);
}
});
queueClient.on('error', err => {
if (!closing) {
log.error(logName, 'Connection to Queue server ended with error %s', err.message);
process.exit(1);
}
});
queueClient.onData = (data, next) => {
let callback;
if (responseHandlers.has(data.req)) {
callback = responseHandlers.get(data.req);
responseHandlers.delete(data.req);
setImmediate(() => callback(data.error ? new Error(data.error) : null, !data.error && data.response));
}
next();
};
// Notify the server about the details of this client
queueClient.send({
cmd: 'HELLO',
zone: zone.name,
id: clientId
});
let queue = new RemoteQueue();
queue.init(sendCommand, err => {
if (err) {
log.error(logName, 'Queue error %s', err.message);
return process.exit(1);
}
plugins.handler.queue = queue;
plugins.handler.load(() => {
log.info(logName, '%s plugins loaded', plugins.handler.loaded.length);
});
// start sending instances
for (let i = 0; i < zone.connections; i++) {
// use artificial delay to lower the chance of races
setTimeout(() => {
let sender = new Sender(clientId, i + 1, zone, sendCommand, queue);
senders.add(sender);
sender.once('error', err => {
log.info(logName, 'Sender error. %s', err.message);
closing = true;
senders.forEach(sender => {
sender.removeAllListeners('error');
sender.close();
});
senders.clear();
});
}, Math.random() * 1500);
}
});
});