Skip to content

Commit

Permalink
Add live data logging to mongodb.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiago Siebler committed Jan 1, 2018
1 parent 6536c15 commit 35be925
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 49 deletions.
19 changes: 9 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,16 @@ require('./lib/DBCore')(logger, (err, db)=>{
if(err){
throw "Refusing to continue without MongoDB connection: " + err;
process.exit(1);

}else{
ctrl.storage.db = db;

ctrl.options.storage.logHistory = true;
ctrl.UI = require('./lib/UI')(ctrl.options),
ctrl.events = require('./lib/EventsCore')(ctrl);

// We're ready to start. Load up the webhook streams and start making it rain.
require('./lib/StreamsCore')(ctrl);
}

ctrl.storage.db = db;
ctrl.options.storage.logHistory = true;

ctrl.UI = require('./lib/UI')(ctrl.options),
ctrl.events = require('./lib/EventsCore')(ctrl);

// We're ready to start. Load up the webhook streams and start making it rain.
require('./lib/StreamsCore')(ctrl);
});


Expand Down
77 changes: 55 additions & 22 deletions lib/CurrencyCore.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,23 @@ CurrencyCore.getCurrencyFromStream = (stream, fromCur, toCur)=>{
currency.flipped = true;
currency.rate = (1/currency.b);
}
currency.stepFrom = fromCur;
currency.toCurrency = toCur;

return currency;
}
CurrencyCore.getArbitageRate = (stream, step1, step2, step3)=>{
if(!stream || !step1 || !step2 || !step3) return;

var a = CurrencyCore.getCurrencyFromStream(stream, step1, step2);
var b = CurrencyCore.getCurrencyFromStream(stream, step2, step3);
var c = CurrencyCore.getCurrencyFromStream(stream, step3, step1);

if(!a || !b || !c) return;

var d = (a.rate) * (b.rate) * (c.rate);
return d;
var ret = {
a: CurrencyCore.getCurrencyFromStream(stream, step1, step2),
b: CurrencyCore.getCurrencyFromStream(stream, step2, step3),
c: CurrencyCore.getCurrencyFromStream(stream, step3, step1)
};

if(!ret.a || !ret.b || !ret.c) return;

ret.rate = (ret.a.rate) * (ret.b.rate) * (ret.c.rate);
return ret;
}

CurrencyCore.getCandidatesFromStreamViaPath = (stream, aPair, bPair)=>{
Expand Down Expand Up @@ -125,22 +128,54 @@ CurrencyCore.getCandidatesFromStreamViaPath = (stream, aPair, bPair)=>{
if(stepC){
keys.c = match.key;

var rate = CurrencyCore.getArbitageRate(stream, keys.a, keys.b, keys.c);
if(isNaN(rate)){
// debugger;
}else{
var comparison = CurrencyCore.getArbitageRate(stream, keys.a, keys.b, keys.c);
if(comparison){
var dt = new Date();
var triangle = {
a: keys.a,
b: keys.b,
c: keys.c,
rate: rate
ws_ts: comparison.a.E,
ts: +dt,
dt: dt,

// these are for storage later
a: comparison.a,
a_symbol: comparison.a.s,
a_step: comparison.a.key,
a_bid_price: comparison.a.b,
a_bid_quantity: comparison.a.B,
a_ask_price: comparison.a.a,
a_ask_quantity: comparison.a.A,
a_volume: comparison.a.v,
a_trades: comparison.a.n,

b: comparison.b,
b_symbol: comparison.b.s,
b_step: comparison.b.key,
b_bid_price: comparison.b.b,
b_bid_quantity: comparison.b.B,
b_ask_price: comparison.b.a,
b_ask_quantity: comparison.b.A,
b_volume: comparison.b.v,
b_trades: comparison.b.n,

c: comparison.c,
c_symbol: comparison.c.s,
c_step: comparison.c.key,
c_bid_price: comparison.c.b,
c_bid_quantity: comparison.c.B,
c_ask_price: comparison.c.a,
c_ask_quantity: comparison.c.A,
c_volume: comparison.c.v,
c_trades: comparison.c.n,

rate: comparison.rate
};

bmatches.push(triangle);
bmatches.push(triangle);
}
}
}
}
}

if(bmatches.length){
bmatches.sort(function(a, b) { return parseFloat(b.rate) - parseFloat(a.rate); });
}
Expand All @@ -156,8 +191,6 @@ CurrencyCore.getDynamicCandidatesFromStream = (stream, options)=>{
// console.log("adding: " + pMatches.length + " to : " + matches.length);
}


// debugger;
if(matches.length){
matches.sort(function(a, b) { return parseFloat(b.rate) - parseFloat(a.rate); });
}
Expand Down
5 changes: 4 additions & 1 deletion lib/DBCore.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ DBCore.startupDB = (logger, cb)=>{

var u = 'mongodb:https://' + authStr + process.env.mongoHost + ':' + process.env.mongoPort + '/' + process.env.mongoDb + '?' + (authMechanism ? '&authMechanism=' + authMechanism : '' )

require('mongodb').MongoClient.connect(u, (err, db) => {
require('mongodb').MongoClient.connect(u, (err, client) => {
if (err) {
console.error('WARNING: MongoDB Connection Error: ', err)
console.error('WARNING: without MongoDB some features (such as history logging & indicators) may be disabled.')
Expand All @@ -28,6 +28,9 @@ DBCore.startupDB = (logger, cb)=>{
}else{
logger.info('--- \tConnected to MongoDB');
}

var db = client.db(process.env.mongoDb);

return cb(err, db);
})
}
Expand Down
33 changes: 26 additions & 7 deletions lib/StreamsCore.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,33 @@ module.exports = (ctrl)=>{
ctrl.currencyCore = require('./CurrencyCore')(ctrl);

ctrl.storage.candidates = [],
ctrl.storage.streams = [],
ctrl.storage.streamTick = async (stream, streamID)=>{
ctrl.storage.streams = [];

ctrl.storage.saveArbRows = (rows, db, cb)=>{
const ticks = db.collection('ticks');

ticks.insertMany(rows, cb);
}
ctrl.storage.streamTick = (stream, streamID)=>{
ctrl.storage.streams[streamID] = stream;
// Run logic to check for arbitrage opportunities
ctrl.storage.candidates = ctrl.currencyCore.getDynamicCandidatesFromStream(stream,ctrl.options.arbitrage);

// update UI with latest values per currency
ctrl.UI.updateArbitageOpportunities(ctrl.storage.candidates);

if(streamID == 'allMarketTickers'){
// Run logic to check for arbitrage opportunities
ctrl.storage.candidates = ctrl.currencyCore.getDynamicCandidatesFromStream(stream,ctrl.options.arbitrage);

// update UI with latest values per currency
ctrl.UI.updateArbitageOpportunities(ctrl.storage.candidates);

if(ctrl.options.storage.logHistory)
ctrl.storage.saveArbRows(ctrl.storage.candidates, ctrl.storage.db, (err, result)=>{
if(err){
return ctrl.logger.error('--- MongoDB Error in streamTick(): ' + err);
}
ctrl.logger.debug('----- Logged '+result.result.n+' arbitrage rows to DB');

});

}
};

ctrl.logger.info('----- Bot Startup Finished -----');
Expand Down
18 changes: 9 additions & 9 deletions lib/UI.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,30 @@ UI.updateArbitageOpportunities = (tickers)=>{
for(i=0;i<UI.maxRows;i++){
var ticker = tickers[i];
if(!ticker) return;

var color = clc.green;
if(ticker.rate && ticker.rate < 1) color = clc.red;

if(ticker.a){

var color = clc.green;
if(ticker.rate && ticker.rate < 1) color = clc.red;

var rate = ((ticker.rate - 1)* 100);
var fees1 = rate * 0.05; //bnb
var fRate1 = rate - fees1;

var fees2 = rate * 0.1; //other
var fRate2 = rate - fees2;

UI.line = new Line(UI.outputBuffer)
.column(ticker.a.toString(), UI.cols[0], [clc.cyan])
.column(ticker.b.toString(), UI.cols[0], [clc.cyan])
.column(ticker.c.toString(), UI.cols[0], [clc.cyan])
.column(ticker.a.key.toString(), UI.cols[0], [clc.cyan])
.column(ticker.b.stepFrom.toString(), UI.cols[0], [clc.cyan])
.column(ticker.c.stepFrom.toString(), UI.cols[0], [clc.cyan])

.column(rate.toFixed(3).toString() + '%', UI.cols[1], [clc.cyan])
.column(fees1.toFixed(3).toString() + '%', UI.cols[1], [clc.cyan])
.column(fRate1.toFixed(3).toString() + '%', 20, [color])

.column(fees2.toFixed(3).toString() + '%', 17, [clc.cyan])
.column(fRate2.toFixed(3).toString() + '%', 20, [color])


.fill()
.store();
}else{
Expand Down

0 comments on commit 35be925

Please sign in to comment.