Skip to content

Commit

Permalink
Prettify all files
Browse files Browse the repository at this point in the history
  • Loading branch information
jbmusso committed Aug 29, 2017
1 parent 5ee724e commit 39a1d15
Show file tree
Hide file tree
Showing 14 changed files with 184 additions and 153 deletions.
102 changes: 56 additions & 46 deletions src/GremlinClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import MessageStream from './MessageStream';
import executeHandler from './executeHandler';
import * as Utils from './utils';


class GremlinClient extends EventEmitter {
constructor(port = 8182, host = 'localhost', options = {}) {
super();
Expand All @@ -37,8 +36,8 @@ class GremlinClient extends EventEmitter {
user: '',
password: '',
...options,
path: path && path.length && !path.startsWith('/') ? `/${path}` : path
}
path: path && path.length && !path.startsWith('/') ? `/${path}` : path,
};

this.useSession = this.options.session;
this.user = this.options.user;
Expand All @@ -65,12 +64,18 @@ class GremlinClient extends EventEmitter {
}

createConnection({ port, host, path, ssl, rejectUnauthorized }) {
const connection = new WebSocketGremlinConnection({ port, host, path, ssl, rejectUnauthorized });
const connection = new WebSocketGremlinConnection({
port,
host,
path,
ssl,
rejectUnauthorized,
});

connection.on('open', () => this.onConnectionOpen());
connection.on('error', (error) => this.handleError(error));
connection.on('message', (message) => this.handleProtocolMessage(message));
connection.on('close', (event) => this.handleDisconnection(event))
connection.on('error', error => this.handleError(error));
connection.on('message', message => this.handleProtocolMessage(message));
connection.on('close', event => this.handleDisconnection(event));

return connection;
}
Expand All @@ -87,7 +92,7 @@ class GremlinClient extends EventEmitter {
warn(code, message) {
this.emit('warning', {
code,
message
message,
});
}

Expand All @@ -114,7 +119,10 @@ class GremlinClient extends EventEmitter {
// If we didn't find a stream for this response, emit a warning on the
// client
if (!this.commands[requestId]) {
this.warn('OrphanedResponse', `Received response for missing or closed request: ${requestId}`);
this.warn(
'OrphanedResponse',
`Received response for missing or closed request: ${requestId}`,
);
return;
}

Expand All @@ -139,7 +147,10 @@ class GremlinClient extends EventEmitter {
break;
default:
delete this.commands[requestId];
messageStream.emit('error', new Error(statusMessage + ' (Error '+ statusCode +')'));
messageStream.emit(
'error',
new Error(statusMessage + ' (Error ' + statusCode + ')'),
);
break;
}
}
Expand All @@ -153,17 +164,17 @@ class GremlinClient extends EventEmitter {
this.emit('connect');

this.executeQueue();
};
}

/**
* @param {CloseEvent} event
*/
handleDisconnection(event) {
this.cancelPendingCommands({
message: 'WebSocket closed',
details: event
details: event,
});
};
}

/**
* Process the current command queue, sending commands to Gremlin Server
Expand All @@ -174,7 +185,7 @@ class GremlinClient extends EventEmitter {
let { message } = this.queue.shift();
this.sendMessage(message);
}
};
}

/**
* @param {Object} reason
Expand All @@ -189,11 +200,11 @@ class GremlinClient extends EventEmitter {
this.queue.length = 0;
this.commands = {};

Object.keys(commands).forEach((key) => {
Object.keys(commands).forEach(key => {
command = commands[key];
command.messageStream.emit('error', error);
});
};
}

/**
* For a given script string and optional bound parameters, build a protocol
Expand All @@ -204,7 +215,10 @@ class GremlinClient extends EventEmitter {
* @param {Object} message
*/
buildMessage(rawScript, rawBindings = {}, baseMessage = {}) {
let { gremlin, bindings } = Utils.buildQueryFromSignature(rawScript, rawBindings);
let { gremlin, bindings } = Utils.buildQueryFromSignature(
rawScript,
rawBindings,
);
const { processor, op, accept, language, aliases } = this.options;

const baseArgs = { gremlin, bindings, accept, language, aliases };
Expand All @@ -215,7 +229,7 @@ class GremlinClient extends EventEmitter {
processor,
op,
args,
...baseMessage
...baseMessage,
};

if (this.useSession) {
Expand All @@ -225,21 +239,21 @@ class GremlinClient extends EventEmitter {
}

return message;
};
}

buildChallengeResponse(requestId) {
const { processor, op, accept, language, aliases } = this.options;
var args = { SASL: utf8.encode('\0' + this.user + '\0' + this.password) };

const message = {
requestId: requestId,
processor,
op: 'authentication',
args
args,
};

return message;
};
}

sendMessage(message) {
let serializedMessage = this.options.accept + JSON.stringify(message);
Expand All @@ -255,7 +269,7 @@ class GremlinClient extends EventEmitter {
}

this.connection.sendMessage(binaryMessage);
};
}

/**
* Asynchronously send a script to Gremlin Server for execution and fire
Expand Down Expand Up @@ -310,18 +324,18 @@ class GremlinClient extends EventEmitter {
// Create a local highland 'through' pipeline so we don't expose
// a Highland stream to the end user, but a standard Node.js Stream2
const through = _.pipeline(
_.map(({ result: { data }}) => data),
_.sequence()
_.map(({ result: { data } }) => data),
_.sequence(),
);

let rawStream = messageStream.pipe(through);

messageStream.on('error', (e) => {
messageStream.on('error', e => {
rawStream.emit('error', new Error(e));
});

return rawStream;
};
}

/**
* Execute the script and return a stream of raw messages returned by Gremlin
Expand All @@ -343,13 +357,13 @@ class GremlinClient extends EventEmitter {

const command = {
message: this.buildMessage(script, bindings, rawMessage),
messageStream: stream
messageStream: stream,
};

this.sendCommand(command); //todo improve for streams

return stream;
};
}

/**
* Send a command to Gremlin Server, or add it to queue if the connection
Expand All @@ -358,12 +372,7 @@ class GremlinClient extends EventEmitter {
* @param {Object} command
*/
sendCommand(command) {
const {
message,
message: {
requestId
}
} = command;
const { message, message: { requestId } } = command;

this.commands[requestId] = command;

Expand All @@ -372,7 +381,7 @@ class GremlinClient extends EventEmitter {
} else {
this.queue.push(command);
}
};
}

traversalSource() {
const { g } = gremlin;
Expand All @@ -382,15 +391,16 @@ class GremlinClient extends EventEmitter {
const awaitable = new Proxy(g, {
get: (traversal, name, receiver) => {
if (name === 'toPromise') {
return () => new Promise((resolve, reject) => {
const { query, params } = renderChain(chain);
this.execute(query, params, (err, result) => {
if (err) {
return reject(err);
}
resolve(result);
return () =>
new Promise((resolve, reject) => {
const { query, params } = renderChain(chain);
this.execute(query, params, (err, result) => {
if (err) {
return reject(err);
}
resolve(result);
});
});
});
}

chain = chain[name];
Expand All @@ -399,14 +409,14 @@ class GremlinClient extends EventEmitter {
get(target2, name2, receiver2) {
target2 = target2[name];
return awaitable;
}
},
})[name];
},
apply(traversal, thisArg, args) {
Reflect.apply(chain, null, args);

return awaitable;
}
},
});

return awaitable;
Expand Down
3 changes: 1 addition & 2 deletions src/MessageStream.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import ReadableStream from 'readable-stream';


class MessageStream extends ReadableStream {
constructor(...args) {
super(...args)
super(...args);
}

_read() {
Expand Down
15 changes: 7 additions & 8 deletions src/WebSocketGremlinConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@ import { EventEmitter } from 'events';

import WebSocket from 'ws';


export default class WebSocketGremlinConnection extends EventEmitter {
export default class WebSocketGremlinConnection extends EventEmitter {
constructor({ port, host, path, ssl, rejectUnauthorized }) {
super();

this.open = false;

const address = `ws${ssl ? 's' : ''}:https://${host}:${port}${path}`;
const options = {
rejectUnauthorized
rejectUnauthorized,
};

this.ws = new WebSocket(address, null, options);

this.ws.onopen = () => this.onOpen();
this.ws.onerror = (err) => this.handleError(err);
this.ws.onmessage = (message) => this.handleMessage(message);
this.ws.onclose = (event) => this.onClose(event);
this.ws.binaryType = "arraybuffer";
this.ws.onerror = err => this.handleError(err);
this.ws.onmessage = message => this.handleMessage(message);
this.ws.onclose = event => this.onClose(event);
this.ws.binaryType = 'arraybuffer';
}

onOpen() {
Expand All @@ -42,7 +41,7 @@ export default class WebSocketGremlinConnection extends EventEmitter {
}

sendMessage(message) {
this.ws.send(message, { mask: true, binary: true }, (err) => {
this.ws.send(message, { mask: true, binary: true }, err => {
if (err) {
this.handleError(err);
}
Expand Down
11 changes: 5 additions & 6 deletions src/bindForClient.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ require('chai').should();
import { createClient, bindForClient } from './';
import { assert } from 'chai';


const getByName = (name) => ({
const getByName = name => ({
gremlin: 'g.V().has("name", name)',
bindings: {
name
}
name,
},
});

describe('.bindForClient()', () => {
it('should return a map of bound functions', async (done) => {
it('should return a map of bound functions', async done => {
const client = createClient();
const queries = bindForClient(client, { getByName });
assert.isFunction(queries.getByName);
Expand All @@ -21,7 +20,7 @@ describe('.bindForClient()', () => {
assert.property(promise, 'query');

const result = await promise;
result.length.should.equal(1)
result.length.should.equal(1);
done();
});
});
Loading

0 comments on commit 39a1d15

Please sign in to comment.