Skip to content

Commit

Permalink
refactor(metrics): move to core (denoland#12386)
Browse files Browse the repository at this point in the history
Avoids overhead of wrapping ops (and allocs when inspecting async-op futures)
  • Loading branch information
AaronO committed Oct 10, 2021
1 parent f2ac7ff commit 5a8a989
Show file tree
Hide file tree
Showing 19 changed files with 162 additions and 270 deletions.
6 changes: 5 additions & 1 deletion cli/dts/lib.deno.ns.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1792,7 +1792,7 @@ declare namespace Deno {
* Requires `allow-write` permission. */
export function truncate(name: string, len?: number): Promise<void>;

export interface Metrics {
export interface OpMetrics {
opsDispatched: number;
opsDispatchedSync: number;
opsDispatchedAsync: number;
Expand All @@ -1806,6 +1806,10 @@ declare namespace Deno {
bytesReceived: number;
}

export interface Metrics extends OpMetrics {
ops: Record<string, OpMetrics>;
}

/** Receive metrics from the privileged side of Deno. This is primarily used
* in the development of Deno. 'Ops', also called 'bindings', are the go-between
* between Deno JavaScript and Deno Rust.
Expand Down
18 changes: 0 additions & 18 deletions cli/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -804,24 +804,6 @@ declare namespace Deno {
*/
export function sleepSync(millis: number): void;

export interface Metrics extends OpMetrics {
ops: Record<string, OpMetrics>;
}

export interface OpMetrics {
opsDispatched: number;
opsDispatchedSync: number;
opsDispatchedAsync: number;
opsDispatchedAsyncUnref: number;
opsCompleted: number;
opsCompletedSync: number;
opsCompletedAsync: number;
opsCompletedAsyncUnref: number;
bytesSentControl: number;
bytesSentData: number;
bytesReceived: number;
}

/** **UNSTABLE**: New option, yet to be vetted. */
export interface TestDefinition {
/** Specifies the permissions that should be used to run the test.
Expand Down
12 changes: 12 additions & 0 deletions core/01_core.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
Map,
Array,
ArrayPrototypeFill,
ArrayPrototypeMap,
ErrorCaptureStackTrace,
Promise,
ObjectEntries,
ObjectFreeze,
ObjectFromEntries,
MapPrototypeGet,
Expand Down Expand Up @@ -152,6 +154,15 @@
opSync("op_print", str, isErr);
}

function metrics() {
const [aggregate, perOps] = opSync("op_metrics");
aggregate.ops = ObjectFromEntries(ArrayPrototypeMap(
ObjectEntries(opsCache),
([opName, opId]) => [opName, perOps[opId]],
));
return aggregate;
}

// Some "extensions" rely on "BadResource" and "Interrupted" errors in the
// JS code (eg. "deno_net") so they are provided in "Deno.core" but later
// reexported on "Deno.errors"
Expand All @@ -178,6 +189,7 @@
tryClose,
print,
resources,
metrics,
registerErrorBuilder,
registerErrorClass,
opresolve,
Expand Down
7 changes: 6 additions & 1 deletion core/bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ fn opcall_sync<'s>(
mut rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
let state = state_rc.borrow();
let state = state_rc.borrow_mut();

let op_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map(|l| l.value() as OpId)
Expand Down Expand Up @@ -344,11 +344,13 @@ fn opcall_sync<'s>(
scope,
a,
b,
op_id,
promise_id: 0,
};
let op = OpTable::route_op(op_id, state.op_state.clone(), payload);
match op {
Op::Sync(result) => {
state.op_state.borrow_mut().tracker.track_sync(op_id);
rv.set(result.to_v8(scope).unwrap());
}
Op::NotFound => {
Expand Down Expand Up @@ -405,6 +407,7 @@ fn opcall_async<'s>(
scope,
a,
b,
op_id,
promise_id,
};
let op = OpTable::route_op(op_id, state.op_state.clone(), payload);
Expand All @@ -417,10 +420,12 @@ fn opcall_async<'s>(
OpResult::Err(_) => rv.set(result.to_v8(scope).unwrap()),
},
Op::Async(fut) => {
state.op_state.borrow_mut().tracker.track_async(op_id);
state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
state.op_state.borrow_mut().tracker.track_unref(op_id);
state.pending_unref_ops.push(fut);
state.have_unpolled_ops = true;
}
Expand Down
1 change: 1 addition & 0 deletions core/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod normalize_path;
mod ops;
mod ops_builtin;
mod ops_json;
mod ops_metrics;
mod resources;
mod runtime;

Expand Down
2 changes: 1 addition & 1 deletion core/modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,7 +1008,7 @@ mod tests {
dispatch_count_.fetch_add(1, Ordering::Relaxed);
let (control, _): (u8, ()) = payload.deserialize().unwrap();
assert_eq!(control, 42);
let resp = (0, serialize_op_result(Ok(43), state));
let resp = (0, 1, serialize_op_result(Ok(43), state));
Op::Async(Box::pin(futures::future::ready(resp)))
};

Expand Down
9 changes: 8 additions & 1 deletion core/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::error::type_error;
use crate::error::AnyError;
use crate::gotham_state::GothamState;
use crate::ops_metrics::OpsTracker;
use crate::resources::ResourceTable;
use crate::runtime::GetErrorClassFn;
use futures::Future;
Expand All @@ -18,14 +19,16 @@ use std::pin::Pin;
use std::rc::Rc;

pub type PromiseId = u64;
pub type OpAsyncFuture = Pin<Box<dyn Future<Output = (PromiseId, OpResult)>>>;
pub type OpAsyncFuture =
Pin<Box<dyn Future<Output = (PromiseId, OpId, OpResult)>>>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;

pub struct OpPayload<'a, 'b, 'c> {
pub(crate) scope: &'a mut v8::HandleScope<'b>,
pub(crate) a: v8::Local<'c, v8::Value>,
pub(crate) b: v8::Local<'c, v8::Value>,
pub(crate) op_id: OpId,
pub(crate) promise_id: PromiseId,
}

Expand Down Expand Up @@ -96,6 +99,7 @@ pub struct OpState {
pub resource_table: ResourceTable,
pub op_table: OpTable,
pub get_error_class_fn: GetErrorClassFn,
pub(crate) tracker: OpsTracker,
gotham_state: GothamState,
}

Expand All @@ -105,6 +109,9 @@ impl OpState {
resource_table: Default::default(),
op_table: OpTable::default(),
get_error_class_fn: &|_| "Error",
tracker: OpsTracker {
ops: Vec::with_capacity(256),
},
gotham_state: Default::default(),
}
}
Expand Down
12 changes: 12 additions & 0 deletions core/ops_builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::type_error;
use crate::error::AnyError;
use crate::include_js_files;
use crate::op_sync;
use crate::ops_metrics::OpMetrics;
use crate::resources::ResourceId;
use crate::void_op_async;
use crate::void_op_sync;
Expand Down Expand Up @@ -32,6 +33,7 @@ pub(crate) fn init_builtins() -> Extension {
"op_wasm_streaming_set_url",
op_sync(op_wasm_streaming_set_url),
),
("op_metrics", op_sync(op_metrics)),
("op_void_sync", void_op_sync()),
("op_void_async", void_op_async()),
])
Expand Down Expand Up @@ -158,3 +160,13 @@ pub fn op_wasm_streaming_set_url(

Ok(())
}

pub fn op_metrics(
state: &mut OpState,
_: (),
_: (),
) -> Result<(OpMetrics, Vec<OpMetrics>), AnyError> {
let aggregate = state.tracker.aggregate();
let per_op = state.tracker.per_op();
Ok((aggregate, per_op))
}
9 changes: 6 additions & 3 deletions core/ops_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ pub fn void_op_async() -> Box<OpFn> {
// to deserialize to the unit type instead of failing with `ExpectedNull`
// op_async(|_, _: (), _: ()| futures::future::ok(()))
Box::new(move |state, payload| -> Op {
let op_id = payload.op_id;
let pid = payload.promise_id;
let op_result = serialize_op_result(Ok(()), state);
Op::Async(Box::pin(futures::future::ready((pid, op_result))))
Op::Async(Box::pin(futures::future::ready((pid, op_id, op_result))))
})
}

Expand Down Expand Up @@ -112,6 +113,7 @@ where
RV: Serialize + 'static,
{
Box::new(move |state, payload| -> Op {
let op_id = payload.op_id;
let pid = payload.promise_id;
// Deserialize args, sync error on failure
let args = match payload.deserialize() {
Expand All @@ -124,7 +126,7 @@ where

use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, serialize_op_result(result, state)));
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::Async(Box::pin(fut))
})
}
Expand All @@ -143,6 +145,7 @@ where
RV: Serialize + 'static,
{
Box::new(move |state, payload| -> Op {
let op_id = payload.op_id;
let pid = payload.promise_id;
// Deserialize args, sync error on failure
let args = match payload.deserialize() {
Expand All @@ -155,7 +158,7 @@ where

use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, serialize_op_result(result, state)));
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::AsyncUnref(Box::pin(fut))
})
}
Expand Down
96 changes: 96 additions & 0 deletions core/ops_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::serde::Serialize;
use crate::OpId;

// TODO(@AaronO): split into AggregateMetrics & PerOpMetrics
#[derive(Clone, Default, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OpMetrics {
pub ops_dispatched: u64,
pub ops_dispatched_sync: u64,
pub ops_dispatched_async: u64,
pub ops_dispatched_async_unref: u64,
pub ops_completed: u64,
pub ops_completed_sync: u64,
pub ops_completed_async: u64,
pub ops_completed_async_unref: u64,
pub bytes_sent_control: u64,
pub bytes_sent_data: u64,
pub bytes_received: u64,
}

// TODO(@AaronO): track errors
#[derive(Default, Debug)]
pub struct OpsTracker {
pub ops: Vec<OpMetrics>,
}

impl OpsTracker {
pub fn per_op(&self) -> Vec<OpMetrics> {
self.ops.clone()
}

pub fn aggregate(&self) -> OpMetrics {
let mut sum = OpMetrics::default();

for metrics in self.ops.iter() {
sum.ops_dispatched += metrics.ops_dispatched;
sum.ops_dispatched_sync += metrics.ops_dispatched_sync;
sum.ops_dispatched_async += metrics.ops_dispatched_async;
sum.ops_dispatched_async_unref += metrics.ops_dispatched_async_unref;
sum.ops_completed += metrics.ops_completed;
sum.ops_completed_sync += metrics.ops_completed_sync;
sum.ops_completed_async += metrics.ops_completed_async;
sum.ops_completed_async_unref += metrics.ops_completed_async_unref;
sum.bytes_sent_control += metrics.bytes_sent_control;
sum.bytes_sent_data += metrics.bytes_sent_data;
sum.bytes_received += metrics.bytes_received;
}

sum
}

fn ensure_capacity(&mut self, op_id: OpId) {
if op_id >= self.ops.len() {
let delta_len = 1 + op_id - self.ops.len();
self.ops.extend(vec![OpMetrics::default(); delta_len])
}
}

fn metrics_mut(&mut self, id: OpId) -> &mut OpMetrics {
self.ensure_capacity(id);
self.ops.get_mut(id).unwrap()
}

pub fn track_sync(&mut self, id: OpId) {
let metrics = self.metrics_mut(id);
metrics.ops_dispatched += 1;
metrics.ops_completed += 1;
metrics.ops_dispatched_sync += 1;
metrics.ops_completed_sync += 1;
}

pub fn track_async(&mut self, id: OpId) {
let metrics = self.metrics_mut(id);
metrics.ops_dispatched += 1;
metrics.ops_dispatched_async += 1;
}

pub fn track_async_completed(&mut self, id: OpId) {
let metrics = self.metrics_mut(id);
metrics.ops_completed += 1;
metrics.ops_completed_async += 1;
}

pub fn track_unref(&mut self, id: OpId) {
let metrics = self.metrics_mut(id);
metrics.ops_dispatched += 1;
metrics.ops_dispatched_async_unref += 1;
}

pub fn track_unref_completed(&mut self, id: OpId) {
let metrics = self.metrics_mut(id);
metrics.ops_completed += 1;
metrics.ops_completed_async_unref += 1;
}
}
Loading

0 comments on commit 5a8a989

Please sign in to comment.