Skip to content

Commit

Permalink
fix: denoland#2033 shared queue push bug
Browse files Browse the repository at this point in the history
  • Loading branch information
keroxp committed Apr 20, 2019
1 parent c8db224 commit 07ad690
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 7 deletions.
48 changes: 42 additions & 6 deletions core/shared_queue.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,25 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.

function debug(...msg) {
// console.log("js:shared_queue:", ...msg)
}

/*
SharedQueue Binary Layout
+-------------------------------+-------------------------------+
| NUM_RECORDS (32) |
+---------------------------------------------------------------+
| NUM_SHIFTED_OFF (32) |
+---------------------------------------------------------------+
| HEAD (32) |
+---------------------------------------------------------------+
| OFFSETS (32) |
+---------------------------------------------------------------+
| RECORD_ENDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
| RECORDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
*/
(window => {
const GLOBAL_NAMESPACE = "Deno";
const CORE_NAMESPACE = "core";
Expand All @@ -24,6 +44,7 @@
}

function reset() {
debug("reset");
shared32[INDEX_NUM_RECORDS] = 0;
shared32[INDEX_NUM_SHIFTED_OFF] = 0;
shared32[INDEX_HEAD] = HEAD_INIT;
Expand All @@ -41,6 +62,10 @@
return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}

function numShiftedOff() {
return shared32[INDEX_NUM_SHIFTED_OFF];
}

function setEnd(index, end) {
shared32[INDEX_OFFSETS + index] = end;
}
Expand All @@ -55,7 +80,7 @@

function getOffset(index) {
if (index < numRecords()) {
if (index == 0) {
if (index === 0) {
return HEAD_INIT;
} else {
return shared32[INDEX_OFFSETS + index - 1];
Expand All @@ -69,41 +94,51 @@
let off = head();
let end = off + buf.byteLength;
let index = numRecords();
if (end > shared32.byteLength) {
console.log("shared_queue.ts push fail");
if (end > shared32.byteLength || size() >= MAX_RECORDS) {
debug("push fail");
return false;
}
setEnd(index, end);
assert(end - off == buf.byteLength);
assert(end - off === buf.byteLength);
sharedBytes.set(buf, off);
shared32[INDEX_NUM_RECORDS] += 1;
shared32[INDEX_HEAD] = end;
debug(`push: num_records=${numRecords()}, index_head=${head()}`);
return true;
}

/// Returns null if empty.
function shift() {
let i = shared32[INDEX_NUM_SHIFTED_OFF];
if (size() == 0) {
assert(i == 0);
debug(`shift:begin: num_records=${numRecords()}, shifted_off=${i}`);
if (size() === 0) {
assert(i === 0);
return null;
}

let off = getOffset(i);
let end = getEnd(i);

debug(`shift:going off=${off}, end=${end}, ablen=${sharedBytes.length}`);
if (size() > 1) {
debug(`shift:incr size=${size()}, num_shifted_off=${numShiftedOff()}`);
shared32[INDEX_NUM_SHIFTED_OFF] += 1;
debug(`shift:done num_shifted_off=${numShiftedOff()}`);
} else {
debug(`shift: size=${size()}. will reset`);
reset();
}

assert(off != null);
assert(end != null);
debug(
`shift: num_records=${numRecords()}, num_shifted_off=${numShiftedOff()}`
);
return sharedBytes.subarray(off, end);
}

let asyncHandler;

function setAsyncHandler(cb) {
assert(asyncHandler == null);
asyncHandler = cb;
Expand All @@ -123,6 +158,7 @@
assert(shared.byteLength > 0);
assert(sharedBytes == null);
assert(shared32 == null);
debug(`init: ${shared.byteLength}`);
sharedBytes = new Uint8Array(shared);
shared32 = new Int32Array(shared);
// Callers should not call Deno.core.recv, use setAsyncHandler.
Expand Down
50 changes: 49 additions & 1 deletion core/shared_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
// Copyright 2018 the Deno authors. All rights reserved. MIT license.
/*
SharedQueue Binary Layout
+-------------------------------+-------------------------------+
| NUM_RECORDS (32) |
+---------------------------------------------------------------+
| NUM_SHIFTED_OFF (32) |
+---------------------------------------------------------------+
| HEAD (32) |
+---------------------------------------------------------------+
| OFFSETS (32) |
+---------------------------------------------------------------+
| RECORD_ENDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
| RECORDS (*MAX_RECORDS) ...
+---------------------------------------------------------------+
*/
use crate::libdeno::deno_buf;

const MAX_RECORDS: usize = 100;
Expand Down Expand Up @@ -36,6 +52,7 @@ impl SharedQueue {
}

fn reset(&mut self) {
debug!("rust:shared_queue:reset");
let s: &mut [u32] = self.as_u32_slice_mut();
s[INDEX_NUM_RECORDS] = 0;
s[INDEX_NUM_SHIFTED_OFF] = 0;
Expand Down Expand Up @@ -70,6 +87,11 @@ impl SharedQueue {
s[INDEX_NUM_RECORDS] as usize
}

fn num_shifted_off(&self) -> usize {
let s = self.as_u32_slice();
return s[INDEX_NUM_SHIFTED_OFF] as usize;
}

fn head(&self) -> usize {
let s = self.as_u32_slice();
s[INDEX_HEAD] as usize
Expand Down Expand Up @@ -120,14 +142,23 @@ impl SharedQueue {
} else {
self.reset();
}

debug!(
"rust:shared_queue:shift: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
Some(&self.bytes[off..end])
}

pub fn push(&mut self, record: &[u8]) -> bool {
let off = self.head();
let end = off + record.len();
let index = self.num_records();
if self.size() >= MAX_RECORDS {
debug!("WARNING the sharedQueue full records");
return false;
}
if end > self.bytes.len() {
debug!("WARNING the sharedQueue overflowed");
return false;
Expand All @@ -138,6 +169,12 @@ impl SharedQueue {
let u32_slice = self.as_u32_slice_mut();
u32_slice[INDEX_NUM_RECORDS] += 1;
u32_slice[INDEX_HEAD] = end as u32;
debug!(
"rust:shared_queue:push: num_records={}, num_shifted_off={}, head={}",
self.num_records(),
self.num_shifted_off(),
self.head()
);
true
}
}
Expand Down Expand Up @@ -213,4 +250,15 @@ mod tests {
assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.size(), 0);
}

#[test]
fn full_records() {
let mut q = SharedQueue::new(RECOMMENDED_SIZE);
for _ in 0..MAX_RECORDS {
assert!(q.push(&alloc_buf(1)))
}
assert_eq!(q.push(&alloc_buf(1)), false);
assert_eq!(q.shift().unwrap().len(), 1);
assert_eq!(q.push(&alloc_buf(1)), true);
}
}

0 comments on commit 07ad690

Please sign in to comment.