Skip to content

Commit

Permalink
Improve robustness of fallback mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jlongster committed Aug 18, 2021
1 parent 76821b9 commit 33f9898
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 71 deletions.
158 changes: 91 additions & 67 deletions src/indexeddb/file-ops-fallback.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ async function openDb(name) {
});
}

export class FileOpsFallback {
constructor(filename) {
this.filename = filename;
this.dbName = this.filename.replace(/\//g, '-');
this.cachedFirstBlock = null;
this.blocks = new Map();
this.writeQueue = [];
this.lockType = 0;
// Using a separate class makes it easier to follow the code, and
// importantly it removes any reliance on internal state in
// `FileOpsFallback`. That would be problematic since these method
// happen async; the args to `write` must be closed over so they don't
// change
class Persistance {
constructor() {
this._openDb = null;
}

async getDb() {
Expand All @@ -57,16 +57,17 @@ export class FileOpsFallback {
}
}

async readIfFallback() {
// OK We need to fix this better - we don't block on the writes
// being flushed from closing the file, and we can't read in
// everything here because we might get old data. Need to track
// the last write and force it to be sequential
if (this.blocks.size > 0) {
return;
}
// Both `readAll` and `write` rely on IndexedDB transactional
// semantics to work, otherwise we'd have to coordinate them. If
// there are pending writes, the `readonly` transaction in `readAll`
// will block until they are all flushed out. If `write` is called
// multiple times, `readwrite` transactions can only run one at a
// time so it will naturally apply the writes sequentially (and
// atomically)

async readAll() {
let db = await this.getDb(this.dbName);
let blocks = new Map();

let trans = db.transaction(['data'], 'readonly');
let store = trans.objectStore('data');
Expand All @@ -78,33 +79,25 @@ export class FileOpsFallback {
req.onsuccess = e => {
let cursor = e.target.result;
if (cursor) {
this.blocks.set(cursor.key, cursor.value);
console.log('reading', cursor.key);
blocks.set(cursor.key, cursor.value);
cursor.continue();
} else {
resolve(this.readMeta());
console.log(
'read all',
blocks.get(-1),
new Uint8Array(blocks.get(0))
);
resolve(blocks);
}
};
});
}

queueWrite(key, value) {
this.writeQueue.push({ key, value });
}

// We need a snapshot of the current write + state in which it was
// written. We do writes async, so we can't check this state over
// time because it may change from underneath us
prepareFlush() {
let writeState = {
cachedFirstBlock: this.cachedFirstBlock,
writes: this.writeQueue,
lockType: this.lockType
};
this.writeQueue = [];
return writeState;
}
async write(writes, cachedFirstBlock, hasLocked) {
let db = await this.getDb(this.dbName);
console.log('fallback: writing');

async flushWriteState(db, writeState) {
// We need grab a readwrite lock on the db, and then read to check
// to make sure we can write to it
let trans = db.transaction(['data'], 'readwrite');
Expand All @@ -113,8 +106,8 @@ export class FileOpsFallback {
await new Promise((resolve, reject) => {
let req = store.get(0);
req.onsuccess = e => {
if (writeState.lockType > LOCK_TYPES.NONE) {
if (!isSafeToWrite(req.result, writeState.cachedFirstBlock)) {
if (hasLocked) {
if (!isSafeToWrite(req.result, cachedFirstBlock)) {
// TODO: We need to send a message to users somehow
console.log("OH NO WE CAN'T WRITE");
reject('screwed');
Expand All @@ -123,11 +116,13 @@ export class FileOpsFallback {
}

// Flush all the writes
for (let write of writeState.writes) {
console.log('flushing writes', writes.length);
for (let write of writes) {
store.put(write.value, write.key);
}

trans.onsuccess = () => {
console.log('done writing');
resolve();
};
trans.onerror = () => {
Expand All @@ -138,6 +133,27 @@ export class FileOpsFallback {
req.onerror = reject;
});
}
}

export class FileOpsFallback {
constructor(filename) {
this.filename = filename;
this.dbName = this.filename.replace(/\//g, '-');
this.cachedFirstBlock = null;
this.writeQueue = null;
this.blocks = new Map();
this.lockType = 0;
this.transferBlockOwnership = false;

this.persistance = new Persistance();
}

async readIfFallback() {
this.transferBlockOwnership = true;
this.blocks = await this.persistance.readAll();

return this.readMeta();
}

lock(lockType) {
// Locks always succeed here. Essentially we're only working
Expand All @@ -151,11 +167,11 @@ export class FileOpsFallback {

unlock(lockType) {
if (this.lockType > LOCK_TYPES.SHARED && lockType === LOCK_TYPES.SHARED) {
// Downgrading the lock from a write lock to a read lock. This
// is where we actually flush out all the writes async if
// possible
let writeState = this.prepareFlush();
this.getDb(this.dbName).then(db => this.flushWriteState(db, writeState));
// Within a write lock, we delay all writes until the end of the
// lock. We probably don't have to do this since we already
// delay writes until an `fsync`, however this is an extra
// measure to make sure we are writing everything atomically
this.flush();
}
this.lockType = lockType;
return true;
Expand All @@ -169,27 +185,21 @@ export class FileOpsFallback {
req.onsuccess = () => {};
}

open() {}
open() {
this.writeQueue = [];
this.lockType = 0;
}

close() {
// Clear out the in-memory data in close (it will have to be fully
// read in before opening again)
// this.buffer = null;

if (this._openDb) {
// The order is important here: we want to flush out any pending
// writes, and we expect the db to open. We use that and then
// immediately close it, but since we are going to close it we
// don't want anything else to use that db connection. So we
// clear it out and then close it later
let db = this._openDb;
this._openDb = null;
this.flush();

let writeState = this.prepareFlush();
this.flushWriteState(db, writeState).then(() => {
db.close();
});
if (this.transferBlockOwnership) {
this.transferBlockOwnership = false;
} else {
this.blocks = new Map();
}

this.persistance.closeDb();
}

readMeta() {
Expand All @@ -211,13 +221,6 @@ export class FileOpsFallback {
}

readBlocks(positions, blockSize) {
if (this.blocks == null) {
throw new Error(
"File was opened, but not read yet. This environment doesn't support SharedArrayWorker, " +
'so you must use `readIfFallback` to read the whole file into memory first'
);
}

let res = [];
for (let pos of positions) {
res.push({
Expand All @@ -234,5 +237,26 @@ export class FileOpsFallback {
this.blocks.set(key, write.data);
this.queueWrite(key, write.data);
}

// No write lock; flush them out immediately
if (this.lockType <= LOCK_TYPES.SHARED) {
this.flush();
}
}

queueWrite(key, value) {
this.writeQueue.push({ key, value });
}

flush() {
if (this.writeQueue.length > 0) {
this.persistance.write(
this.writeQueue,
this.cachedFirstBlock,
this.lockType > LOCK_TYPES.SHARED
);
this.writeQueue = [];
}
this.cachedFirstBlock = null;
}
}
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4774,10 +4774,10 @@ [email protected]:
resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.7.tgz#df604178005f522f15eb4490e7247a1bfaa67f8c"
integrity sha1-32BBeABfUi8V60SQ5yR6G/qmf4w=

perf-deets@^1.0.12:
version "1.0.12"
resolved "https://registry.yarnpkg.com/perf-deets/-/perf-deets-1.0.12.tgz#5144c11692f49ebd7891c55989bfbb31c693906a"
integrity sha512-yCMG6M8noEZeKri/+IeTTH3VkXwmPF69Z9bxWK5gbeCyCZUwQ9DXUtL0XytYC1Mge/CJXpA4k0dIpVSZKjqJZA==
perf-deets@^1.0.15:
version "1.0.15"
resolved "https://registry.yarnpkg.com/perf-deets/-/perf-deets-1.0.15.tgz#a766e62b7cf043a0615dd0166122b167c12fd018"
integrity sha512-9BD3d31eaQp2NGCLQGD/ja2COeg2KkQG/pLdPadSTB1oXHr+Ora+bt+hGZK9u8uvZ2mjwQ1eZSikIDgp03R8tA==
dependencies:
"@observablehq/plot" "^0.1.0"

Expand Down

0 comments on commit 33f9898

Please sign in to comment.