Skip to content

Commit

Permalink
refactor: Move bin ops to deno_core and unify logic with json ops (de…
Browse files Browse the repository at this point in the history
…noland#9457)

This commit moves implementation of bin ops to "deno_core" crates
as well as unifying logic between bin ops and json ops to reuse
as much code as possible (both in Rust and JavaScript).
  • Loading branch information
inteon committed Mar 20, 2021
1 parent 0d26a82 commit 1251c89
Show file tree
Hide file tree
Showing 16 changed files with 403 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import {

const readErrorStackPattern = new RegExp(
`^.*
at handleError \\(.*10_dispatch_buffer\\.js:.*\\)
at bufferOpParseResult \\(.*10_dispatch_buffer\\.js:.*\\)
at Array.<anonymous> \\(.*10_dispatch_buffer\\.js:.*\\).*$`,
at handleError \\(.*core\\.js:.*\\)
at binOpParseResult \\(.*core\\.js:.*\\)
at asyncHandle \\(.*core\\.js:.*\\).*$`,
"ms",
);

Expand All @@ -33,7 +33,7 @@ declare global {
}
}

unitTest(function bufferOpsHeaderTooShort(): void {
unitTest(function binOpsHeaderTooShort(): void {
for (const op of ["op_read_sync", "op_read_async"]) {
const readOpId = Deno.core.ops()[op];
const res = Deno.core.send(
Expand Down
4 changes: 2 additions & 2 deletions cli/tests/unit/dispatch_json_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ unitTest(function malformedJsonControlBuffer(): void {
assertMatch(resObj.err.message, /\bexpected value\b/);
});

unitTest(function invalidPromiseId(): void {
unitTest(function invalidRequestId(): void {
const opId = Deno.core.ops()["op_open_async"];
const reqBuf = new Uint8Array([0, 0, 0, 0, 0, 0, 0]);
const resBuf = Deno.core.send(opId, reqBuf);
Expand All @@ -28,5 +28,5 @@ unitTest(function invalidPromiseId(): void {
console.error(resText);
assertStrictEquals(resObj.ok, undefined);
assertStrictEquals(resObj.err.className, "TypeError");
assertMatch(resObj.err.message, /\bpromiseId\b/);
assertMatch(resObj.err.message, /\brequestId\b/);
});
2 changes: 1 addition & 1 deletion cli/tests/unit/unit_tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import "./console_test.ts";
import "./copy_file_test.ts";
import "./custom_event_test.ts";
import "./dir_test.ts";
import "./dispatch_buffer_test.ts";
import "./dispatch_bin_test.ts";
import "./dispatch_json_test.ts";
import "./error_stack_test.ts";
import "./event_test.ts";
Expand Down
266 changes: 197 additions & 69 deletions core/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,22 @@ SharedQueue Binary Layout
const core = window.Deno.core;
const { recv, send } = core;

////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////// Dispatch /////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const dispatch = send;
const dispatchByName = (opName, control, ...zeroCopy) =>
dispatch(opsCache[opName], control, ...zeroCopy);

////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////// Shared array buffer ///////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

let sharedBytes;
let shared32;

let asyncHandlers;

let opsCache = {};
const errorMap = {};

function init() {
const shared = core.shared;
Expand All @@ -45,6 +54,7 @@ SharedQueue Binary Layout
assert(shared32 == null);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);

asyncHandlers = [];
// Callers should not call core.recv, use setAsyncHandler.
recv(handleAsyncMsgFromRust);
Expand Down Expand Up @@ -150,124 +160,240 @@ SharedQueue Binary Layout
return [opId, buf];
}

////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Error handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const errorMap = {};

function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
}
errorMap[errorName] = [className, args ?? []];
}

function handleError(className, message) {
if (typeof errorMap[className] === "undefined") {
return new Error(
`Unregistered error class: "${className}"\n` +
` ${message}\n` +
` Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);
}

const [ErrorClass, args] = errorMap[className];
return new ErrorClass(message, ...args);
}

////////////////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////// Async handling //////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

let asyncHandlers = [];

function setAsyncHandler(opId, cb) {
assert(opId != null);
asyncHandlers[opId] = cb;
}

function setAsyncHandlerByName(opName, cb) {
setAsyncHandler(opsCache[opName], cb);
}

function handleAsyncMsgFromRust() {
while (true) {
const opIdBuf = shift();
if (opIdBuf == null) {
break;
}
assert(asyncHandlers[opIdBuf[0]] != null);
asyncHandlers[opIdBuf[0]](opIdBuf[1]);
asyncHandlers[opIdBuf[0]](opIdBuf[1], true);
}

for (let i = 0; i < arguments.length; i += 2) {
asyncHandlers[arguments[i]](arguments[i + 1]);
asyncHandlers[arguments[i]](arguments[i + 1], false);
}
}

function dispatch(opName, control, ...zeroCopy) {
return send(opsCache[opName], control, ...zeroCopy);
}
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////// General sync & async ops handling ////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

function registerErrorClass(errorName, className, args) {
if (typeof errorMap[errorName] !== "undefined") {
throw new TypeError(`Error class for "${errorName}" already registered`);
let nextRequestId = 1;
const promiseTable = {};

function asyncHandle(u8Array, isCopyNeeded, opResultParser) {
const [requestId, result, error] = opResultParser(u8Array, isCopyNeeded);
if (error !== null) {
promiseTable[requestId][1](error);
} else {
promiseTable[requestId][0](result);
}
errorMap[errorName] = [className, args ?? []];
delete promiseTable[requestId];
}

function getErrorClassAndArgs(errorName) {
return errorMap[errorName] ?? [undefined, []];
function opAsync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
// Make sure requests of this type are handled by the asyncHandler
// The asyncHandler's role is to call the "promiseTable[requestId]" function
if (typeof asyncHandlers[opId] === "undefined") {
asyncHandlers[opId] = (buffer, isCopyNeeded) =>
asyncHandle(buffer, isCopyNeeded, opResultParser);
}

const requestId = nextRequestId++;

// Create and store promise
const promise = new Promise((resolve, reject) => {
promiseTable[requestId] = [resolve, reject];
});

// Synchronously dispatch async request
core.dispatch(opId, ...opRequestBuilder(requestId));

// Wait for async response
return promise;
}

// Returns Uint8Array
function encodeJson(args) {
const s = JSON.stringify(args);
return core.encode(s);
function opSync(opName, opRequestBuilder, opResultParser) {
const opId = opsCache[opName];
const u8Array = core.dispatch(opId, ...opRequestBuilder());

const [_, result, error] = opResultParser(u8Array, false);
if (error !== null) throw error;
return result;
}

function decodeJson(ui8) {
const s = core.decode(ui8);
return JSON.parse(s);
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Bin ops handling /////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const binRequestHeaderByteLength = 8 + 4;
const scratchBuffer = new ArrayBuffer(binRequestHeaderByteLength);
const scratchView = new DataView(scratchBuffer);

function binOpBuildRequest(requestId, argument, zeroCopy) {
scratchView.setBigUint64(0, BigInt(requestId), true);
scratchView.setUint32(8, argument, true);
return [scratchView, ...zeroCopy];
}

let nextPromiseId = 1;
const promiseTable = {};
function binOpParseResult(u8Array, isCopyNeeded) {
// Decode header value from u8Array
const headerByteLength = 8 + 2 * 4;
assert(u8Array.byteLength >= headerByteLength);
assert(u8Array.byteLength % 4 == 0);
const view = new DataView(
u8Array.buffer,
u8Array.byteOffset + u8Array.byteLength - headerByteLength,
headerByteLength,
);

const requestId = Number(view.getBigUint64(0, true));
const status = view.getUint32(8, true);
const result = view.getUint32(12, true);

// Error handling
if (status !== 0) {
const className = core.decode(u8Array.subarray(0, result));
const message = core.decode(u8Array.subarray(result, -headerByteLength))
.trim();

return [requestId, null, handleError(className, message)];
}

function processResponse(res) {
if ("ok" in res) {
return res.ok;
if (u8Array.byteLength === headerByteLength) {
return [requestId, result, null];
}
const [ErrorClass, args] = getErrorClassAndArgs(res.err.className);
if (!ErrorClass) {
throw new Error(
`Unregistered error class: "${res.err.className}"\n ${res.err.message}\n Classes of errors returned from ops should be registered via Deno.core.registerErrorClass().`,
);

// Rest of response buffer is passed as reference or as a copy
let respBuffer = null;
if (isCopyNeeded) {
// Copy part of the response array (if sent through shared array buf)
respBuffer = u8Array.slice(0, result);
} else {
// Create view on existing array (if sent through overflow)
respBuffer = u8Array.subarray(0, result);
}
throw new ErrorClass(res.err.message, ...args);

return [requestId, respBuffer, null];
}

async function jsonOpAsync(opName, args = null, ...zeroCopy) {
setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);

const promiseId = nextPromiseId++;
const reqBuf = core.encode("\0".repeat(8) + JSON.stringify(args));
new DataView(reqBuf.buffer).setBigUint64(0, BigInt(promiseId));
dispatch(opName, reqBuf, ...zeroCopy);
let resolve, reject;
const promise = new Promise((resolve_, reject_) => {
resolve = resolve_;
reject = reject_;
});
promise.resolve = resolve;
promise.reject = reject;
promiseTable[promiseId] = promise;
return processResponse(await promise);
function binOpAsync(opName, argument = 0, ...zeroCopy) {
return opAsync(
opName,
(requestId) => binOpBuildRequest(requestId, argument, zeroCopy),
binOpParseResult,
);
}

function binOpSync(opName, argument = 0, ...zeroCopy) {
return opSync(
opName,
() => binOpBuildRequest(0, argument, zeroCopy),
binOpParseResult,
);
}

function jsonOpSync(opName, args = null, ...zeroCopy) {
const argsBuf = encodeJson(args);
const res = dispatch(opName, argsBuf, ...zeroCopy);
return processResponse(decodeJson(res));
////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////// Json ops handling ////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////////

const jsonRequestHeaderLength = 8;

function jsonOpBuildRequest(requestId, argument, zeroCopy) {
const u8Array = core.encode(
"\0".repeat(jsonRequestHeaderLength) + JSON.stringify(argument),
);
new DataView(u8Array.buffer).setBigUint64(0, BigInt(requestId), true);
return [u8Array, ...zeroCopy];
}

function jsonOpAsyncHandler(buf) {
// Json Op.
const res = decodeJson(buf);
const promise = promiseTable[res.promiseId];
delete promiseTable[res.promiseId];
promise.resolve(res);
function jsonOpParseResult(u8Array, _) {
const data = JSON.parse(core.decode(u8Array));

if ("err" in data) {
return [
data.requestId,
null,
handleError(data.err.className, data.err.message),
];
}

return [data.requestId, data.ok, null];
}

function jsonOpAsync(opName, argument = null, ...zeroCopy) {
return opAsync(
opName,
(requestId) => jsonOpBuildRequest(requestId, argument, zeroCopy),
jsonOpParseResult,
);
}

function jsonOpSync(opName, argument = null, ...zeroCopy) {
return opSync(
opName,
() => [core.encode(JSON.stringify(argument)), ...zeroCopy],
jsonOpParseResult,
);
}

function resources() {
return jsonOpSync("op_resources");
}

function close(rid) {
jsonOpSync("op_close", { rid });
return jsonOpSync("op_close", { rid });
}

Object.assign(window.Deno.core, {
jsonOpAsync,
jsonOpSync,
setAsyncHandler,
setAsyncHandlerByName,
dispatch: send,
dispatchByName: dispatch,
binOpAsync,
binOpSync,
dispatch,
dispatchByName,
ops,
close,
resources,
registerErrorClass,
getErrorClassAndArgs,
sharedQueueInit: init,
// sharedQueue is private but exposed for testing.
sharedQueue: {
Expand All @@ -279,5 +405,7 @@ SharedQueue Binary Layout
reset,
shift,
},
// setAsyncHandler is private but exposed for testing.
setAsyncHandler,
});
})(this);

0 comments on commit 1251c89

Please sign in to comment.