Skip to content

Commit

Permalink
refactor: simplify the handling of the "drain" event
Browse files Browse the repository at this point in the history
The two event handlers are merged into one.
  • Loading branch information
darrachequesne committed Jun 13, 2024
1 parent ef1c4c8 commit 407c3ad
Showing 1 changed file with 20 additions and 30 deletions.
50 changes: 20 additions & 30 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,25 +223,40 @@ export class Socket extends EventEmitter {
private setTransport(transport) {
const onError = this.onError.bind(this);
const onPacket = this.onPacket.bind(this);
const flush = this.flush.bind(this);
const onDrain = this.onDrain.bind(this);
const onClose = this.onClose.bind(this, "transport close");

this.transport = transport;
this.transport.once("error", onError);
this.transport.on("packet", onPacket);
this.transport.on("drain", flush);
this.transport.on("drain", onDrain);
this.transport.once("close", onClose);
// this function will manage packet events (also message callbacks)
this.setupSendCallback();

this.cleanupFn.push(function () {
transport.removeListener("error", onError);
transport.removeListener("packet", onPacket);
transport.removeListener("drain", flush);
transport.removeListener("drain", onDrain);
transport.removeListener("close", onClose);
});
}

/**
* Upon transport "drain" event
*
* @private
*/
private onDrain() {
this.flush();

if (this.sentCallbackFn.length > 0) {
debug("executing batch send callback");
const seqFn = this.sentCallbackFn.shift();
for (let i = 0; i < seqFn.length; i++) {
seqFn[i](this.transport);
}
}
}

/**
* Upgrades socket to the given transport
*
Expand Down Expand Up @@ -388,31 +403,6 @@ export class Socket extends EventEmitter {
}
}

/**
* Setup and manage send callback
*
* @api private
*/
private setupSendCallback() {
// the message was sent successfully, execute the callback
const onDrain = () => {
if (this.sentCallbackFn.length > 0) {
debug("executing batch send callback");
const seqFn = this.sentCallbackFn.shift();
const l = seqFn.length;
for (let i = 0; i < l; i++) {
seqFn[i](this.transport);
}
}
};

this.transport.on("drain", onDrain);

this.cleanupFn.push(() => {
this.transport.removeListener("drain", onDrain);
});
}

/**
* Sends a message packet.
*
Expand Down

0 comments on commit 407c3ad

Please sign in to comment.