Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Package: Analytics Engine #399

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6319887
first up
CraigglesO Oct 1, 2022
e6351c5
remove TODOs; adjust readmes
CraigglesO Oct 1, 2022
6eac0a4
migrate to execute sql data inside engine code
CraigglesO Oct 1, 2022
3ae1bfd
parse INTERVAL; add test cases
CraigglesO Oct 2, 2022
da11a07
interval, complete test cases
CraigglesO Oct 2, 2022
b307341
add miniflare api access + tests; drop 'UNIQUE'
CraigglesO Oct 2, 2022
a5d2fee
fix options; all analytics exists inside singular db now
CraigglesO Oct 2, 2022
cc1547d
add jest testings
CraigglesO Oct 2, 2022
5fb6b54
add jest testings
CraigglesO Oct 2, 2022
b2810df
fix get->all; add http testing in minfilare; remove unique
CraigglesO Oct 2, 2022
11b2692
add vitest tests
CraigglesO Oct 2, 2022
13916f2
fix internal problem
CraigglesO Oct 2, 2022
d798479
TODATETIME fix
CraigglesO Oct 2, 2022
9b359ac
support QUANTILEWEIGHTED
CraigglesO Oct 3, 2022
39ed5cf
support QUANTILEWEIGHTED
CraigglesO Oct 3, 2022
7cb5d40
add formatting; minor fixes/adjustments
CraigglesO Oct 3, 2022
556d5eb
import sorting fix
CraigglesO Oct 3, 2022
16b28f0
more error cases
CraigglesO Oct 4, 2022
7abffeb
re-arrange
CraigglesO Oct 4, 2022
8e1e310
writeDataPoint is sync
CraigglesO Oct 4, 2022
44dc3d8
minor fixes
CraigglesO Oct 4, 2022
c25b639
ensure added functions & keywords are working
CraigglesO Oct 5, 2022
5865e9e
temporarily edit npx-import to get passing tests
CraigglesO Oct 6, 2022
62f63d5
pre-add better-sqlite
CraigglesO Oct 6, 2022
55b1d93
tmp move npx-import
CraigglesO Oct 6, 2022
ca5fe4f
ensure TODATETIME sticks to UTC
CraigglesO Oct 6, 2022
aabd9fb
update to latest master
CraigglesO Oct 24, 2022
c9b272d
first fix set
CraigglesO Oct 24, 2022
f93f54e
bug fixes
CraigglesO Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix options; all analytics exists inside singular db now
  • Loading branch information
CraigglesO committed Oct 2, 2022
commit a5d2feed2179cab7555a11c72d7e8f91c1fa124c
2 changes: 1 addition & 1 deletion packages/analytics-engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ fun, full-featured, fully-local simulator for Cloudflare Workers. See
import { AnalyticsEngine } from "@miniflare/analytics-engine";
import { createSQLiteDB } from "@miniflare/shared";

const db = new AnalyticsEngine("TEST_BINDING", await createSQLiteDB(":memory:"));
const db = new AnalyticsEngine("DATASET_NAME", await createSQLiteDB(":memory:"));

await db.writeDataPoint({
indexes: ["a3cd45"], // Sensor ID
Expand Down
4 changes: 2 additions & 2 deletions packages/analytics-engine/src/analytics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ CREATE TABLE IF NOT EXISTS {{BINDING}} (
blob20 TEXT
);

CREATE INDEX IF NOT EXISTS {{BINDING}}_index ON {{BINDING}} (dataset, timestamp);
CREATE INDEX IF NOT EXISTS {{BINDING}}_index ON {{BINDING}} (dataset, index1, timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS {{BINDING}}_index ON {{BINDING}} (dataset, timestamp);
CREATE UNIQUE INDEX IF NOT EXISTS {{BINDING}}_index ON {{BINDING}} (dataset, index1, timestamp);

COMMIT;
`;
Expand Down
70 changes: 17 additions & 53 deletions packages/analytics-engine/src/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,59 +54,17 @@ interface DataPoint {
blob20?: string | null;
}

type Doubles =
| "double1"
| "double2"
| "double3"
| "double4"
| "double5"
| "double6"
| "double7"
| "double8"
| "double9"
| "double10"
| "double11"
| "double12"
| "double13"
| "double14"
| "double15"
| "double16"
| "double17"
| "double18"
| "double19"
| "double20";

type Blobs =
| "blob1"
| "blob2"
| "blob3"
| "blob4"
| "blob5"
| "blob6"
| "blob7"
| "blob8"
| "blob9"
| "blob10"
| "blob11"
| "blob12"
| "blob13"
| "blob14"
| "blob15"
| "blob16"
| "blob17"
| "blob18"
| "blob19"
| "blob20";
export const kQuery = Symbol("kQuery");

export class AnalyticsEngine {
readonly #name: string;
readonly #dataset: string;
readonly #db: SqliteDB;
#decoder = new TextDecoder();

constructor(name: string, db: SqliteDB) {
this.#name = name;
constructor(dataset: string, db: SqliteDB) {
this.#dataset = dataset;
this.#db = db;
db.exec(analytics.replaceAll("{{BINDING}}", name));
db.exec(analytics.replaceAll("{{BINDING}}", dataset));
buildSQLFunctions(db);
}

Expand Down Expand Up @@ -146,30 +104,30 @@ export class AnalyticsEngine {
}
// prep insert
const insertData: DataPoint = {
dataset: this.#name,
dataset: this.#dataset,
index1: indexes[0],
};
// prep doubles
const doublesKeys: string[] = [];
const doublesValues: string[] = [];
doubles.forEach((double, i) => {
const key = `double${i + 1}` as Doubles;
insertData[key] = double;
const key = `double${i + 1}` as keyof DataPoint;
(insertData[key] as any) = double;
doublesKeys.push(key);
doublesValues.push(`@${key}`);
});
// prep blobs
const blobsKeys: string[] = [];
const blobsValues: string[] = [];
_blobs.forEach((blob, i) => {
const key = `blob${i + 1}` as Blobs;
insertData[key] = blob;
const key = `blob${i + 1}` as keyof DataPoint;
(insertData[key] as any) = blob;
blobsKeys.push(key);
blobsValues.push(`@${key}`);
});

const input = prepare(
`INSERT INTO ${this.#name} (dataset, index1${
`INSERT INTO ${this.#dataset} (dataset, index1${
doublesKeys.length > 0 ? `, ${doublesKeys}` : ""
}${
blobsKeys.length > 0 ? `, ${blobsKeys}` : ""
Expand All @@ -181,6 +139,12 @@ export class AnalyticsEngine {

insert.run(insertData);
}

async [kQuery](input: string): Promise<any> {
const query = this.#db.prepare(prepare(input));

return query.get();
}
}

/** @internal */
Expand Down
60 changes: 47 additions & 13 deletions packages/analytics-engine/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import {
StorageFactory,
resolveStoragePersist,
} from "@miniflare/shared";
import type { SqliteDB } from "@miniflare/shared";
import { AnalyticsEngine } from "./engine";

export type ProcessedAnalyticsEngine = Record<string, string>; // { [name]: dataset }
CraigglesO marked this conversation as resolved.
Show resolved Hide resolved

export interface AnalyticsEngineOptions {
analyticsEngines?: string[];
analyticsEngines?: ProcessedAnalyticsEngine;
aePersist?: boolean | string;
}

Expand All @@ -20,14 +23,25 @@ export class AnalyticsEnginePlugin
implements AnalyticsEngineOptions
{
@Option({
type: OptionType.ARRAY,
name: "analyticsEngine",
description: "Analytics Engine namespace to bind",
logName: "Analytics Engine Namespaces",
fromWrangler: ({ analytics_engines }) =>
analytics_engines?.map(({ binding }) => binding),
type: OptionType.OBJECT,
typeFormat: "NAME=DATASET",
name: "ae",
alias: "a",
description: "Analytics Engine to bind",
logName: "Analytics Engine Names",
fromEntries: (entries) =>
Object.fromEntries(
entries.map(([name, datasetName]) => {
return [name, datasetName];
})
),
fromWrangler: ({ bindings }) =>
bindings?.reduce((objects, { type, name, dataset }) => {
if (type === "analytics_engine") objects[name] = dataset;
return objects;
}, {} as ProcessedAnalyticsEngine),
})
analyticsEngines?: string[];
analyticsEngines?: ProcessedAnalyticsEngine;

@Option({
type: OptionType.BOOLEAN_STRING,
CraigglesO marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -38,6 +52,8 @@ export class AnalyticsEnginePlugin
aePersist?: boolean | string;
CraigglesO marked this conversation as resolved.
Show resolved Hide resolved
readonly #persist?: boolean | string;

#db?: SqliteDB;

constructor(ctx: PluginContext, options?: AnalyticsEngineOptions) {
super(ctx);
this.assignOptions(options);
Expand All @@ -46,17 +62,35 @@ export class AnalyticsEnginePlugin

async getAnalyticsEngine(
storageFactory: StorageFactory,
dbName: string
name: string
): Promise<AnalyticsEngine> {
const storage = storageFactory.storage(dbName, this.#persist);
return new AnalyticsEngine(dbName, await storage.getSqliteDatabase());
const dataset = this.analyticsEngines?.[name];
if (dataset === undefined) {
throw new Error(`Analytics Engine "${name}" does not exist.`);
}
await this.#setup(storageFactory);
// @ts-expect-error: #setup already ensures #db exists.
return new AnalyticsEngine(dataset, this.#db);
}

async setup(storageFactory: StorageFactory): Promise<SetupResult> {
await this.#setup(storageFactory);
const bindings: Context = {};
for (const dbName of this.analyticsEngines ?? []) {
bindings[dbName] = await this.getAnalyticsEngine(storageFactory, dbName);
for (const name of Object.keys(this.analyticsEngines ?? {})) {
bindings[name] = await this.getAnalyticsEngine(storageFactory, name);
}
return { bindings };
}

async #setup(storageFactory: StorageFactory): Promise<void> {
if (this.#db === undefined) {
// grab storage
const storage = storageFactory.storage(
"__MINIFLARE_ANALYTICS_ENGINE_STORAGE__",
CraigglesO marked this conversation as resolved.
Show resolved Hide resolved
this.#persist
);
// setup db
this.#db = await storage.getSqliteDatabase();
}
}
}
20 changes: 10 additions & 10 deletions packages/analytics-engine/test/engine.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const test = anyTest as TestInterface<Context>;
test.beforeEach(async (t) => {
const storage = new MemoryStorage(undefined, testClock);
const db = new AnalyticsEngine(
"TEST_BINDING",
"TEST_DATASET",
await storage.getSqliteDatabase()
);
t.context = { storage, db };
Expand All @@ -32,12 +32,12 @@ test("Analytics Engine: Write a data point using indexes, blobs, and doubles.",
});

// grab data from sqliteDB
const stmt = sqliteDB.prepare("SELECT * FROM TEST_BINDING WHERE index1 = ?");
const stmt = sqliteDB.prepare("SELECT * FROM TEST_DATASET WHERE index1 = ?");
const res = stmt.get("t1");
t.true(typeof res.timestamp === "string");
delete res.timestamp;
t.deepEqual(res, {
dataset: "TEST_BINDING",
dataset: "TEST_DATASET",
index1: "t1",
_sample_interval: 1,
blob1: "a",
Expand Down Expand Up @@ -91,12 +91,12 @@ test("Analytics Engine: Write a data point with no data provided.", async (t) =>
await db.writeDataPoint({});

// grab data from sqliteDB
const stmt = sqliteDB.prepare("SELECT * FROM TEST_BINDING");
const stmt = sqliteDB.prepare("SELECT * FROM TEST_DATASET");
const res = stmt.get();
t.true(typeof res.timestamp === "string");
delete res.timestamp;
t.deepEqual(res, {
dataset: "TEST_BINDING",
dataset: "TEST_DATASET",
index1: null,
_sample_interval: 1,
blob1: null,
Expand Down Expand Up @@ -177,12 +177,12 @@ test("Analytics Engine: Write a data point filling indexes, blobs, and doubles."
});

// grab data from sqliteDB
const stmt = sqliteDB.prepare("SELECT * FROM TEST_BINDING WHERE index1 = ?");
const stmt = sqliteDB.prepare("SELECT * FROM TEST_DATASET WHERE index1 = ?");
const res = stmt.get("t1");
t.true(typeof res.timestamp === "string");
delete res.timestamp;
t.deepEqual(res, {
dataset: "TEST_BINDING",
dataset: "TEST_DATASET",
index1: "t1",
_sample_interval: 1,
blob1: "a",
Expand Down Expand Up @@ -241,7 +241,7 @@ test("Analytics Engine: Store AB", async (t) => {
});

const stmt = sqliteDB.prepare(
"SELECT blob1 FROM TEST_BINDING WHERE index1 = ?"
"SELECT blob1 FROM TEST_DATASET WHERE index1 = ?"
);
const res = stmt.get("t1");
t.is(res.blob1, "test string");
Expand All @@ -263,7 +263,7 @@ test("Analytics Engine: Minimal example test.", async (t) => {

// grab data from sqliteDB
const stmt = sqliteDB.prepare(
"SELECT blob1 AS city, SUM(_sample_interval * double1) / SUM(_sample_interval) AS avg_humidity FROM TEST_BINDING WHERE double1 > 0 GROUP BY city ORDER BY avg_humidity DESC LIMIT 10"
"SELECT blob1 AS city, SUM(_sample_interval * double1) / SUM(_sample_interval) AS avg_humidity FROM TEST_DATASET WHERE double1 > 0 GROUP BY city ORDER BY avg_humidity DESC LIMIT 10"
);
const res = stmt.get();
delete res.timestamp;
Expand All @@ -274,7 +274,7 @@ test("Analytics Engine: Minimal example test.", async (t) => {

// USING TIME SERIES DATA
const stmt2 = sqliteDB.prepare(
"SELECT intDiv(toUInt32(timestamp), 300) * 300 AS t, blob1 AS city, SUM(_sample_interval * double1) / SUM(_sample_interval) AS avg_humidity FROM TEST_BINDING WHERE timestamp >= NOW() AND double1 > 0 GROUP BY t, city ORDER BY t, avg_humidity DESC"
"SELECT intDiv(toUInt32(timestamp), 300) * 300 AS t, blob1 AS city, SUM(_sample_interval * double1) / SUM(_sample_interval) AS avg_humidity FROM TEST_DATASET WHERE timestamp >= NOW() AND double1 > 0 GROUP BY t, city ORDER BY t, avg_humidity DESC"
);
const res2 = stmt2.get();
// console.log("res2", res2);
Expand Down
Loading