Skip to content

Commit

Permalink
perf(streams): add single-character fast path for DelimiterStream() (denoland#3739)
Browse files Browse the repository at this point in the history

* perf(DelimiterStream): Implement fast path for char delimiter streams

* factor out flush and simplify constructor

---------

Co-authored-by: Asher Gomez <[email protected]>
  • Loading branch information
aapoalas and iuioiua committed Nov 10, 2023
1 parent 943f1a3 commit 1d99bac
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 9 deletions.
111 changes: 102 additions & 9 deletions streams/delimiter_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,23 @@ export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
#bufs: Uint8Array[] = [];
#delimiter: Uint8Array;
#matchIndex = 0;
#delimLPS: Uint8Array;
#delimLPS: Uint8Array | null;
#disp: DelimiterDisposition;

constructor(
delimiter: Uint8Array,
options?: DelimiterStreamOptions,
) {
super({
transform: (chunk, controller) => {
this.#handle(chunk, controller);
},
flush: (controller) => {
controller.enqueue(concat(...this.#bufs));
},
transform: (chunk, controller) =>
delimiter.length === 1
? this.#handleChar(chunk, controller)
: this.#handle(chunk, controller),
flush: (controller) => this.#flush(controller),
});

this.#delimiter = delimiter;
this.#delimLPS = createLPS(delimiter);
this.#delimLPS = delimiter.length > 1 ? createLPS(delimiter) : null;
this.#disp = options?.disposition ?? "discard";
}

Expand All @@ -85,7 +84,7 @@ export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
const disposition = this.#disp;
const delimiter = this.#delimiter;
const delimLen = delimiter.length;
const lps = this.#delimLPS;
const lps = this.#delimLPS as Uint8Array;
let chunkStart = 0;
let matchIndex = this.#matchIndex;
let inspectIndex = 0;
Expand Down Expand Up @@ -189,4 +188,98 @@ export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
bufs.push(chunk.subarray(chunkStart));
}
}

/**
 * Optimized handler for a char delimited stream:
 *
 * For char delimited streams we do not need to keep track of
 * the match index, removing the need for a fair bit of work.
 *
 * Scans `chunk` byte-by-byte for the single-byte delimiter, emitting one
 * delimited chunk per match. Partial (unmatched) trailing data is stashed in
 * `this.#bufs` and prepended to the next chunk's output.
 */
#handleChar(
  chunk: Uint8Array,
  controller: TransformStreamDefaultController<Uint8Array>,
) {
  const bufs = this.#bufs;
  const length = chunk.byteLength;
  const disposition = this.#disp;
  // Single-byte delimiter: compare raw byte values, no LPS table needed.
  const delimiter = this.#delimiter[0];
  // Start of the not-yet-emitted region within `chunk`.
  let chunkStart = 0;
  let inspectIndex = 0;
  while (inspectIndex < length) {
    if (chunk[inspectIndex] === delimiter) {
      // Next byte matched our next delimiter
      inspectIndex++;
      /**
       * End (exclusive) of the delimited chunk within `chunk`.
       * "suffix" keeps the delimiter byte at the end; otherwise it is
       * excluded. Always non-negative (inspectIndex >= 1 here).
       */
      const delimitedChunkEnd = disposition === "suffix"
        ? inspectIndex
        : inspectIndex - 1;
      if (delimitedChunkEnd === 0 && bufs.length === 0) {
        // Our chunk started with a delimiter and no previous chunks exist:
        // Enqueue an empty chunk.
        controller.enqueue(new Uint8Array());
        // "prefix" keeps the delimiter for the NEXT chunk, so do not skip it.
        chunkStart = disposition === "prefix" ? 0 : 1;
      } else if (delimitedChunkEnd > 0 && bufs.length === 0) {
        // No previous chunks, slice from current chunk.
        controller.enqueue(chunk.subarray(chunkStart, delimitedChunkEnd));
        // Our chunk may have more than one delimiter; we must remember where
        // the next delimited chunk begins.
        chunkStart = disposition === "prefix"
          ? inspectIndex - 1
          : inspectIndex;
      } else if (delimitedChunkEnd === 0 && bufs.length > 0) {
        // Our chunk started with a delimiter, previous chunks are passed as
        // they are (with concatenation).
        if (bufs.length === 1) {
          // Concat not needed when a single buffer is passed.
          controller.enqueue(bufs[0]);
        } else {
          controller.enqueue(concat(...bufs));
        }
        // Drop all previous chunks.
        bufs.length = 0;
        if (disposition !== "prefix") {
          // suffix or discard: The next chunk starts where our inspection finished.
          // We should only ever end up here with a discard disposition as
          // for a suffix disposition this branch would mean that the previous
          // chunk ended with a full match but was not enqueued.
          chunkStart = inspectIndex;
        }
      } else if (delimitedChunkEnd > 0 && bufs.length > 0) {
        // Previous chunks and current chunk together form a delimited chunk.
        const chunkSliced = chunk.subarray(chunkStart, delimitedChunkEnd);
        const result = concat(...bufs, chunkSliced);
        bufs.length = 0;
        chunkStart = disposition === "prefix"
          ? delimitedChunkEnd
          : inspectIndex;
        controller.enqueue(result);
      } else {
        // delimitedChunkEnd >= 0 and bufs.length >= 0, so every combination
        // is covered by the branches above.
        throw new Error("unreachable");
      }
    } else {
      inspectIndex++;
    }
  }
  if (chunkStart === 0) {
    // No delimiter consumed anything from this chunk: buffer it whole.
    bufs.push(chunk);
  } else if (chunkStart < length) {
    // If we matched partially somewhere in the middle of our chunk
    // then the remnants should be pushed into buffers.
    bufs.push(chunk.subarray(chunkStart));
  }
}

/**
 * Emit whatever is still buffered as the final chunk of the stream.
 * An empty buffer list still produces one empty chunk; a single buffer
 * is passed through without the cost of concatenation.
 */
#flush(controller: TransformStreamDefaultController<Uint8Array>) {
  const buffered = this.#bufs;
  switch (buffered.length) {
    case 0:
      controller.enqueue(new Uint8Array());
      break;
    case 1:
      controller.enqueue(buffered[0]);
      break;
    default:
      controller.enqueue(concat(...buffered));
  }
}
}
70 changes: 70 additions & 0 deletions streams/delimiter_stream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,76 @@ Deno.test("[streams] DelimiterStream, prefix", async () => {
await testTransformStream(delimStream, DELIMITER_STREAM_INPUTS, outputs);
});

/**
 * Shared input chunks for the single-character ("_") delimiter tests below.
 * Each string is one chunk written to the stream; the inline note on each
 * line names the delimiter pattern that chunk exercises.
 */
const CHAR_DELIMITER_STREAM_INPUTS = [
  "a", // more than one subsequent chunks with no delimiters
  "b", // more than one subsequent chunks with no delimiters
  "c_", // more than one subsequent chunks with no delimiters
  "_", // chunk with only delimiter
  "qwertzu", // no delimiter
  "iopasd_mnbvc", // one delimiter in the middle
  "xylkjh_gfdsap_oiuzt", // two separate delimiters
  "euoi__aueiou", // two consecutive delimiters
  "rewq098765432", // more than one intermediate chunks with no delimiters
  "349012i491290", // more than one intermediate chunks with no delimiters
  "asdfghjkliop", // more than one intermediate chunks with no delimiters
  "ytrewq_mnbvcxz", // one delimiter in the middle after multiple chunks with no delimiters
  "_asd", // chunk starts with delimiter
].map((s) => new TextEncoder().encode(s));

Deno.test("[streams] DelimiterStream, char delimiter, discard", async () => {
const delim = new TextEncoder().encode("_");
const delimStream = new DelimiterStream(delim, { disposition: "discard" });
const outputs = [
"abc",
"",
"qwertzuiopasd",
"mnbvcxylkjh",
"gfdsap",
"oiuzteuoi",
"",
"aueiourewq098765432349012i491290asdfghjkliopytrewq",
"mnbvcxz",
"asd",
].map((s) => new TextEncoder().encode(s));
await testTransformStream(delimStream, CHAR_DELIMITER_STREAM_INPUTS, outputs);
});

Deno.test("[streams] DelimiterStream, char delimiter, suffix", async () => {
const delim = new TextEncoder().encode("_");
const delimStream = new DelimiterStream(delim, { disposition: "suffix" });
const outputs = [
"abc_",
"_",
"qwertzuiopasd_",
"mnbvcxylkjh_",
"gfdsap_",
"oiuzteuoi_",
"_",
"aueiourewq098765432349012i491290asdfghjkliopytrewq_",
"mnbvcxz_",
"asd",
].map((s) => new TextEncoder().encode(s));
await testTransformStream(delimStream, CHAR_DELIMITER_STREAM_INPUTS, outputs);
});

Deno.test("[streams] DelimiterStream, char delimiter, prefix", async () => {
const delim = new TextEncoder().encode("_");
const delimStream = new DelimiterStream(delim, { disposition: "prefix" });
const outputs = [
"abc",
"_",
"_qwertzuiopasd",
"_mnbvcxylkjh",
"_gfdsap",
"_oiuzteuoi",
"_",
"_aueiourewq098765432349012i491290asdfghjkliopytrewq",
"_mnbvcxz",
"_asd",
].map((s) => new TextEncoder().encode(s));
await testTransformStream(delimStream, CHAR_DELIMITER_STREAM_INPUTS, outputs);
});

Deno.test("[streams] DelimiterStream, regression 3609", async () => {
const delimStream = new DelimiterStream(new TextEncoder().encode(";"));
const inputs = [
Expand Down

0 comments on commit 1d99bac

Please sign in to comment.