Skip to content

Commit

Permalink
perf(ext/node): native vectored write for server streams (denoland#19752
Browse files Browse the repository at this point in the history
)

```
# main
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 106182.250000
Msg/sec: 110279.750000
^C

# this PR
$ ./load_test 10 0.0.0.0 8080 0 0
Using message size of 20 bytes
Running benchmark now...
Msg/sec: 131632.250000
Msg/sec: 134754.250000
^C
```
  • Loading branch information
littledivy committed Jul 7, 2023
1 parent 7d022ad commit e4870d8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 2 deletions.
42 changes: 42 additions & 0 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
Expand Down Expand Up @@ -1034,6 +1035,34 @@ impl UpgradeStream {
.try_or_cancel(cancel_handle)
.await
}

async fn write_vectored(
self: Rc<Self>,
buf1: &[u8],
buf2: &[u8],
) -> Result<usize, AnyError> {
let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;

let total = buf1.len() + buf2.len();
let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
let mut nwritten = wr.write_vectored(&bufs).await?;
if nwritten == total {
return Ok(nwritten);
}

// Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
while nwritten <= buf1.len() {
bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
nwritten += wr.write_vectored(&bufs).await?;
}

// First buffer out of the way.
if nwritten < total && nwritten > buf1.len() {
wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
}

Ok(total)
}
}

impl Resource for UpgradeStream {
Expand All @@ -1048,3 +1077,16 @@ impl Resource for UpgradeStream {
self.cancel_handle.cancel();
}
}

#[op]
pub async fn op_raw_write_vectored(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
buf1: JsBuffer,
buf2: JsBuffer,
) -> Result<usize, AnyError> {
let resource: Rc<UpgradeStream> =
state.borrow().resource_table.get::<UpgradeStream>(rid)?;
let nwritten = resource.write_vectored(&buf1, &buf2).await?;
Ok(nwritten)
}
1 change: 1 addition & 0 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ deno_core::extension!(
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
http_next::op_raw_write_vectored,
http_next::op_http_try_wait,
http_next::op_http_wait,
],
Expand Down
4 changes: 3 additions & 1 deletion ext/node/polyfills/internal/stream_base_commons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ export function onStreamRead(
}
} else {
const offset = streamBaseState[kArrayBufferOffset];
const buf = Buffer.from(arrayBuffer, offset, nread);
// Performance note: Pass ArrayBuffer to Buffer#from to avoid
// copy.
const buf = Buffer.from(arrayBuffer.buffer, offset, nread);
result = stream.push(buf);
}

Expand Down
30 changes: 29 additions & 1 deletion ext/node/polyfills/internal_binding/stream_wrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";

const core = globalThis.Deno.core;
const { ops } = core;

interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
Expand All @@ -54,7 +57,7 @@ export interface Closer {

type Ref = { ref(): void; unref(): void };

enum StreamBaseStateFields {
const enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
Expand Down Expand Up @@ -195,6 +198,31 @@ export class LibuvStreamWrap extends HandleWrap {
chunks: Buffer[] | (string | Buffer)[],
allBuffers: boolean,
): number {
const supportsWritev = this.provider === providerType.TCPSERVERWRAP;
// Fast case optimization: two chunks, and all buffers.
if (chunks.length === 2 && allBuffers && supportsWritev) {
// String chunks.
if (typeof chunks[0] === "string") chunks[0] = Buffer.from(chunks[0]);
if (typeof chunks[1] === "string") chunks[1] = Buffer.from(chunks[1]);

ops.op_raw_write_vectored(
this[kStreamBaseField]!.rid,
chunks[0],
chunks[1],
).then((nwritten) => {
try {
req.oncomplete(0);
} catch {
// swallow callback errors.
}

streamBaseState[kBytesWritten] = nwritten;
this.bytesWritten += nwritten;
});

return 0;
}

const count = allBuffers ? chunks.length : chunks.length >> 1;
const buffers: Buffer[] = new Array(count);

Expand Down

0 comments on commit e4870d8

Please sign in to comment.