Skip to content

Commit

Permalink
feat(unstable): kv.watch() (denoland#21147)
Browse files Browse the repository at this point in the history
This commit adds support for a new `kv.watch()` method that allows
watching for changes to a key-value pair. This is useful for cases
where you want to be notified when a key-value pair changes, but
don't want to have to poll for changes.

---------

Co-authored-by: losfair <[email protected]>
  • Loading branch information
lucacasonato and losfair committed Dec 5, 2023
1 parent a24d3e8 commit 74e39a9
Show file tree
Hide file tree
Showing 10 changed files with 357 additions and 56 deletions.
18 changes: 12 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ test_util = { path = "./test_util" }
deno_lockfile = "0.17.2"
deno_media_type = { version = "0.1.1", features = ["module_specifier"] }

denokv_proto = "0.4.0"
denokv_proto = "0.5.0"
# denokv_sqlite brings in bundled sqlite if we don't disable the default features
denokv_sqlite = { default-features = false, version = "0.4.0" }
denokv_remote = "0.4.0"
denokv_sqlite = { default-features = false, version = "0.5.0" }
denokv_remote = "0.5.0"

# exts
deno_broadcast_channel = { version = "0.120.0", path = "./ext/broadcast_channel" }
Expand Down
44 changes: 44 additions & 0 deletions cli/tests/unit/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2137,3 +2137,47 @@ Deno.test(
// calling [Symbol.dispose] after manual close is a no-op
},
);

dbTest("key watch", async (db) => {
const changeHistory: Deno.KvEntryMaybe<number>[] = [];
const watcher: ReadableStream<Deno.KvEntryMaybe<number>[]> = db.watch<
number[]
>([["key"]]);

const reader = watcher.getReader();
const expectedChanges = 2;

const work = (async () => {
for (let i = 0; i < expectedChanges; i++) {
const message = await reader.read();
if (message.done) {
throw new Error("Unexpected end of stream");
}
changeHistory.push(message.value[0]);
}

await reader.cancel();
})();

while (changeHistory.length !== 1) {
await sleep(100);
}
assertEquals(changeHistory[0], {
key: ["key"],
value: null,
versionstamp: null,
});

const { versionstamp } = await db.set(["key"], 1);
while (changeHistory.length as number !== 2) {
await sleep(100);
}
assertEquals(changeHistory[1], {
key: ["key"],
value: 1,
versionstamp,
});

await work;
await reader.cancel();
});
42 changes: 42 additions & 0 deletions cli/tsc/dts/lib.deno.unstable.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,48 @@ declare namespace Deno {
*/
atomic(): AtomicOperation;

/**
* Watch for changes to the given keys in the database. The returned stream
* is a {@linkcode ReadableStream} that emits a new value whenever any of
* the watched keys change their versionstamp. The emitted value is an array
* of {@linkcode Deno.KvEntryMaybe} objects, with the same length and order
* as the `keys` array. If no value exists for a given key, the returned
* entry will have a `null` value and versionstamp.
*
* The returned stream does not return every single intermediate state of
* the watched keys, but rather only keeps you up to date with the latest
* state of the keys. This means that if a key is modified multiple times
* quickly, you may not receive a notification for every single change, but
* rather only the latest state of the key.
*
* ```ts
* const db = await Deno.openKv();
*
* const stream = db.watch([["foo"], ["bar"]]);
* for await (const entries of stream) {
* entries[0].key; // ["foo"]
* entries[0].value; // "bar"
* entries[0].versionstamp; // "00000000000000010000"
* entries[1].key; // ["bar"]
* entries[1].value; // null
* entries[1].versionstamp; // null
* }
* ```
*
* The `options` argument can be used to specify additional options for the
* watch operation. The `raw` option can be used to specify whether a new
* value should be emitted whenever a mutation occurs on any of the watched
* keys (even if the value of the key does not change, such as deleting a
* deleted key), or only when entries have observably changed in some way.
* When `raw: true` is used, it is possible for the stream to occasionally
* emit values even if no mutations have occurred on any of the watched
* keys. The default value for this option is `false`.
*/
watch<T extends readonly unknown[]>(
keys: readonly [...{ [K in keyof T]: KvKey }],
options?: { raw?: boolean },
): ReadableStream<{ [K in keyof T]: KvEntryMaybe<T[K]> }>;

/**
* Close the database connection. This will prevent any further operations
* from being performed on the database, and interrupt any in-flight
Expand Down
67 changes: 67 additions & 0 deletions ext/kv/01_db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ const {
SymbolFor,
SymbolToStringTag,
Uint8ArrayPrototype,
Error,
} = globalThis.__bootstrap.primordials;
import { SymbolDispose } from "ext:deno_web/00_infra.js";
import { ReadableStream } from "ext:deno_web/06_streams.js";
const core = Deno.core;
const ops = core.ops;

Expand Down Expand Up @@ -297,6 +299,71 @@ class Kv {
finishMessageOps.clear();
}

watch(keys: Deno.KvKey[], options = {}) {
const raw = options.raw ?? false;
const rid = ops.op_kv_watch(this.#rid, keys);
const lastEntries: (Deno.KvEntryMaybe<unknown> | undefined)[] = Array.from(
{ length: keys.length },
() => undefined,
);
return new ReadableStream({
async pull(controller) {
while (true) {
let updates;
try {
updates = await core.opAsync("op_kv_watch_next", rid);
} catch (err) {
core.tryClose(rid);
controller.error(err);
return;
}
if (updates === null) {
core.tryClose(rid);
controller.close();
return;
}
let changed = false;
for (let i = 0; i < keys.length; i++) {
if (updates[i] === "unchanged") {
if (lastEntries[i] === undefined) {
throw new Error(
"watch: invalid unchanged update (internal error)",
);
}
continue;
}
if (
lastEntries[i] !== undefined &&
(updates[i]?.versionstamp ?? null) ===
lastEntries[i]?.versionstamp
) {
continue;
}
changed = true;
if (updates[i] === null) {
lastEntries[i] = {
key: [...keys[i]],
value: null,
versionstamp: null,
};
} else {
lastEntries[i] = updates[i];
}
}
if (!changed && !raw) continue; // no change
const entries = lastEntries.map((entry) =>
entry.versionstamp === null ? { ...entry } : deserializeValue(entry)
);
controller.enqueue(entries);
return;
}
},
cancel() {
core.tryClose(rid);
},
});
}

close() {
core.close(this.#rid);
}
Expand Down
34 changes: 24 additions & 10 deletions ext/kv/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use deno_core::error::AnyError;
use deno_core::OpState;
use denokv_proto::CommitResult;
use denokv_proto::ReadRangeOutput;
use denokv_proto::WatchStream;

pub struct MultiBackendDbHandler {
backends: Vec<(&'static [&'static str], Box<dyn DynamicDbHandler>)>,
Expand Down Expand Up @@ -55,7 +56,7 @@ impl MultiBackendDbHandler {

#[async_trait(?Send)]
impl DatabaseHandler for MultiBackendDbHandler {
type DB = Box<dyn DynamicDb>;
type DB = RcDynamicDb;

async fn open(
&self,
Expand Down Expand Up @@ -88,12 +89,12 @@ pub trait DynamicDbHandler {
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError>;
) -> Result<RcDynamicDb, AnyError>;
}

#[async_trait(?Send)]
impl DatabaseHandler for Box<dyn DynamicDbHandler> {
type DB = Box<dyn DynamicDb>;
type DB = RcDynamicDb;

async fn open(
&self,
Expand All @@ -114,8 +115,8 @@ where
&self,
state: Rc<RefCell<OpState>>,
path: Option<String>,
) -> Result<Box<dyn DynamicDb>, AnyError> {
Ok(Box::new(self.open(state, path).await?))
) -> Result<RcDynamicDb, AnyError> {
Ok(RcDynamicDb(Rc::new(self.open(state, path).await?)))
}
}

Expand All @@ -136,36 +137,45 @@ pub trait DynamicDb {
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError>;

fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream;

fn dyn_close(&self);
}

#[derive(Clone)]
pub struct RcDynamicDb(Rc<dyn DynamicDb>);

#[async_trait(?Send)]
impl Database for Box<dyn DynamicDb> {
impl Database for RcDynamicDb {
type QMH = Box<dyn QueueMessageHandle>;

async fn snapshot_read(
&self,
requests: Vec<ReadRange>,
options: SnapshotReadOptions,
) -> Result<Vec<ReadRangeOutput>, AnyError> {
(**self).dyn_snapshot_read(requests, options).await
(*self.0).dyn_snapshot_read(requests, options).await
}

async fn atomic_write(
&self,
write: AtomicWrite,
) -> Result<Option<CommitResult>, AnyError> {
(**self).dyn_atomic_write(write).await
(*self.0).dyn_atomic_write(write).await
}

async fn dequeue_next_message(
&self,
) -> Result<Option<Box<dyn QueueMessageHandle>>, AnyError> {
(**self).dyn_dequeue_next_message().await
(*self.0).dyn_dequeue_next_message().await
}

fn watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
(*self.0).dyn_watch(keys)
}

fn close(&self) {
(**self).dyn_close()
(*self.0).dyn_close()
}
}

Expand Down Expand Up @@ -201,6 +211,10 @@ where
)
}

fn dyn_watch(&self, keys: Vec<Vec<u8>>) -> WatchStream {
self.watch(keys)
}

fn dyn_close(&self) {
self.close()
}
Expand Down
Loading

0 comments on commit 74e39a9

Please sign in to comment.