Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Package: Analytics Engine #399

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6319887
first up
CraigglesO Oct 1, 2022
e6351c5
remove TODOs; adjust readmes
CraigglesO Oct 1, 2022
6eac0a4
migrate to execute sql data inside engine code
CraigglesO Oct 1, 2022
3ae1bfd
parse INTERVAL; add test cases
CraigglesO Oct 2, 2022
da11a07
interval, complete test cases
CraigglesO Oct 2, 2022
b307341
add miniflare api access + tests; drop 'UNIQUE'
CraigglesO Oct 2, 2022
a5d2fee
fix options; all analytics exists inside singular db now
CraigglesO Oct 2, 2022
cc1547d
add jest testings
CraigglesO Oct 2, 2022
5fb6b54
add jest testings
CraigglesO Oct 2, 2022
b2810df
fix get->all; add http testing in minfilare; remove unique
CraigglesO Oct 2, 2022
11b2692
add vitest tests
CraigglesO Oct 2, 2022
13916f2
fix internal problem
CraigglesO Oct 2, 2022
d798479
TODATETIME fix
CraigglesO Oct 2, 2022
9b359ac
support QUANTILEWEIGHTED
CraigglesO Oct 3, 2022
39ed5cf
support QUANTILEWEIGHTED
CraigglesO Oct 3, 2022
7cb5d40
add formatting; minor fixes/adjustments
CraigglesO Oct 3, 2022
556d5eb
import sorting fix
CraigglesO Oct 3, 2022
16b28f0
more error cases
CraigglesO Oct 4, 2022
7abffeb
re-arrange
CraigglesO Oct 4, 2022
8e1e310
writeDataPoint is sync
CraigglesO Oct 4, 2022
44dc3d8
minor fixes
CraigglesO Oct 4, 2022
c25b639
ensure added functions & keywords are working
CraigglesO Oct 5, 2022
5865e9e
temporarily edit npx-import to get passing tests
CraigglesO Oct 6, 2022
62f63d5
pre-add better-sqlite
CraigglesO Oct 6, 2022
55b1d93
tmp move npx-import
CraigglesO Oct 6, 2022
ca5fe4f
ensure TODATETIME sticks to UTC
CraigglesO Oct 6, 2022
aabd9fb
update to latest master
CraigglesO Oct 24, 2022
c9b272d
first fix set
CraigglesO Oct 24, 2022
f93f54e
bug fixes
CraigglesO Oct 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
add formatting; minor fixes/adjustments
  • Loading branch information
CraigglesO committed Oct 3, 2022
commit 7cb5d4043aa8976d6bfff7299970c8f161c69cda
112 changes: 98 additions & 14 deletions packages/analytics-engine/src/engine.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { TextDecoder } from "util";
import type { SqliteDB } from "@miniflare/shared";
import analytics from "./analytics";
import buildSQLFunctions from "./functions";
import buildSQLFunctions, { isDate } from "./functions";

/** Output formats that can be requested with a trailing `FORMAT <name>` clause in a query. */
export type Format = "JSON" | "JSONEachRow" | "TabSeparated";

/** Column type names reported in the `meta` section of a JSON-formatted response. */
export type MetaType = "DateTime" | "String" | "Float64";

export interface AnalyticsEngineEvent {
readonly doubles?: number[]; // up to 20
Expand All @@ -11,7 +15,7 @@ export interface AnalyticsEngineEvent {

interface DataPoint {
dataset: string;
index1: string;
index1?: string;
double1?: number;
double2?: number;
double3?: number;
Expand Down Expand Up @@ -54,7 +58,15 @@ interface DataPoint {
blob20?: string | null;
}

export const kQuery = Symbol("kQuery");
/**
 * A single result row: each selected column (or its alias) mapped to its value.
 *
 * `null` is part of the value type because result rows can carry it: blob
 * columns are declared nullable, and the JSON formatter explicitly skips
 * `null` values when detecting column types.
 */
export interface ResponseData {
  [key: string]: number | string | null;
}

/** Shape of the value returned for the "JSON" output format. */
export interface FormatJSON {
  /** Detected type per column, keyed by column name. */
  meta: Record<string, MetaType>;
  /** The result rows themselves. */
  data: Array<ResponseData>;
  /** Number of entries in `data`. */
  rows: number;
}

export class AnalyticsEngine {
readonly #dataset: string;
Expand Down Expand Up @@ -126,7 +138,7 @@ export class AnalyticsEngine {
blobsValues.push(`@${key}`);
});

const input = _prepare(
const [input] = _prepare(
`INSERT INTO ${this.#dataset} (dataset, index1${
doublesKeys.length > 0 ? `, ${doublesKeys}` : ""
}${
Expand All @@ -139,16 +151,10 @@ export class AnalyticsEngine {

insert.run(insertData);
}

async [kQuery](input: string): Promise<any> {
const query = this.#db.prepare(_prepare(input));

return query.get();
}
}

/** @internal */
export function _prepare(input: string): string {
export function _prepare(input: string): [string, Format | undefined] {
// split
const pieces = input
.replaceAll("\n", " ") // convert new lines to spaces
Expand All @@ -160,13 +166,16 @@ export function _prepare(input: string): string {
// find all instances of "INTERVAL" and "QUANTILEWEIGHTED"
const intervalIndexes = [];
const quantileweigthedIndexes = [];
let formatIndex = -1;
for (let i = 0, pl = pieces.length; i < pl; i++) {
if (pieces[i].toLocaleLowerCase() === "interval") {
const piece = pieces[i].toLocaleLowerCase();
if (piece === "interval") {
intervalIndexes.push(i);
}
if (pieces[i].toLocaleLowerCase().includes("quantileweighted")) {
if (piece.includes("quantileweighted")) {
quantileweigthedIndexes.push(i);
}
if (piece === "format") formatIndex = i;
}

// for each instance, convert "INTERVAL X Y" to "INTERVAL(X, Y)"
Expand All @@ -182,9 +191,84 @@ export function _prepare(input: string): string {

// for each instance of quantileweighted, separately aggregate columns;
for (const qwIndex of quantileweigthedIndexes) {
// A space between the name and its parenthesis ("quantileweighted (") could cause an error,
// so normalise it to "quantileweighted("
if (pieces[qwIndex + 1] === "(") {
pieces.splice(qwIndex + 1, 1);
pieces[qwIndex] = `${pieces[qwIndex]}(`;
}
pieces[qwIndex + 3] = `__GET_QUANTILE_GROUP(${pieces[qwIndex + 3]})`;
pieces[qwIndex + 5] = `__GET_QUANTILE_GROUP(${pieces[qwIndex + 5]})`;
}

return pieces.join(" ");
// if FORMAT exists, grab type and remove it
let formatType: Format = "JSON";
if (formatIndex >= 0) {
// change to new type. revert to JSON if unknown name
formatType = pieces[formatIndex + 1] as Format;
if (
formatType !== "JSON" &&
formatType !== "JSONEachRow" &&
formatType !== "TabSeparated"
) {
formatType = "JSON";
}
// remove from string
pieces.splice(formatIndex, 1);
pieces.splice(formatIndex, 1);
}

return [pieces.join(" "), formatType];
}

/**
 * Render query rows in the requested output format.
 *
 * @param data - rows returned by the query; defaults to no rows
 * @param format - output format; defaults to "JSON"
 * @returns an object for "JSON", a string for every other format
 * @internal
 */
export function _format(
  data: ResponseData[] = [],
  format: Format = "JSON"
): string | FormatJSON {
  switch (format) {
    case "JSON":
      return _formatJSON(data);
    case "JSONEachRow":
      return _formatJSONEachRow(data);
    default:
      // every remaining value is rendered as tab-separated text
      return _formatTabSeparated(data);
  }
}

/**
 * Build the "JSON" response: per-column type metadata, the rows, and the row count.
 *
 * @param data - rows returned by the query
 * @returns the rows wrapped together with `meta` and `rows`
 */
function _formatJSON(data: ResponseData[]): FormatJSON {
  const meta: { [key: string]: MetaType } = {};

  // Scan every row rather than only the first: a column may be null in one
  // row and populated in another, and only a non-null value reveals a type.
  data.forEach((row) => {
    Object.entries(row).forEach(([column, cell]) => {
      if (cell !== null) meta[column] = _getType(cell);
    });
  });

  const rows = data.length;
  return { meta, data, rows };
}

/**
 * Serialise each row as one JSON document per line.
 *
 * @param data - rows returned by the query
 * @returns one `JSON.stringify`-ed row per line, each terminated by "\n";
 *          an empty string when there are no rows
 */
function _formatJSONEachRow(data: ResponseData[]): string {
  return data.map((row) => `${JSON.stringify(row)}\n`).join("");
}

/**
 * Serialise rows as tab-separated text: one row per line, values in column
 * order, columns separated by a tab, every line terminated by "\n".
 *
 * Values are escaped so they cannot break the row/column structure:
 * backslash, tab, newline and carriage return inside a value are written as
 * `\\`, `\t`, `\n` and `\r` escape sequences, and a missing value (`null`)
 * is written as `\N`, following the ClickHouse TabSeparated convention.
 *
 * @param data - rows returned by the query
 * @returns the encoded text; an empty string when there are no rows
 */
function _formatTabSeparated(data: ResponseData[]): string {
  const escapes: Record<string, string> = {
    "\\": "\\\\",
    "\t": "\\t",
    "\n": "\\n",
    "\r": "\\r",
  };

  // Encode one cell; typed as unknown because rows read from the database
  // may hold null even where the static row type does not say so.
  const encode = (value: unknown): string => {
    if (value === null || value === undefined) return "\\N";
    return String(value).replace(/[\\\t\n\r]/g, (ch) => escapes[ch] ?? ch);
  };

  return data
    .map((row) => `${Object.values(row).map(encode).join("\t")}\n`)
    .join("");
}

/**
 * Map a value to the type name reported in the JSON `meta` section.
 *
 * @param input - a non-null cell value
 * @returns "Float64" for numbers, "DateTime" for strings accepted by
 *          `isDate`, and "String" for all other strings
 */
function _getType(input: number | string): MetaType {
  if (typeof input !== "number") {
    return isDate(input) ? "DateTime" : "String";
  }
  return "Float64";
}
13 changes: 12 additions & 1 deletion packages/analytics-engine/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
resolveStoragePersist,
} from "@miniflare/shared";
import type { SqliteDB } from "@miniflare/shared";
import { AnalyticsEngine } from "./engine";
import { AnalyticsEngine, FormatJSON, _prepare, _format } from "./engine";

export type ProcessedAnalyticsEngine = Record<string, string>; // { [name]: dataset }
CraigglesO marked this conversation as resolved.
Show resolved Hide resolved

Expand Down Expand Up @@ -88,6 +88,17 @@ export class AnalyticsEnginePlugin
return this.#db;
}

/**
 * Run a query against the analytics database and return the formatted result.
 *
 * @param storageFactory - passed to `#setup` to make sure the database is available
 * @param input - the query text, optionally ending in a `FORMAT <name>` clause
 * @returns the rows rendered by `_format` in the format extracted by `_prepare`
 */
async query(
  storageFactory: StorageFactory,
  input: string
): Promise<string | FormatJSON> {
  await this.#setup(storageFactory);
  // _prepare rewrites the query text and extracts the requested output format
  const [sql, outputFormat] = _prepare(input);
  // @ts-expect-error: #setup already ensures #db exists.
  const rows = this.#db.prepare(sql).all();
  return _format(rows, outputFormat);
}

async #setup(storageFactory: StorageFactory): Promise<void> {
if (this.#db === undefined) {
// grab storage
Expand Down
101 changes: 100 additions & 1 deletion packages/analytics-engine/test/engine.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { AnalyticsEngine } from "@miniflare/analytics-engine";
import {
AnalyticsEngine,
_prepare,
_format,
} from "@miniflare/analytics-engine";
import { Storage } from "@miniflare/shared";
import { testClock } from "@miniflare/shared-test";
import { MemoryStorage } from "@miniflare/storage-memory";
Expand Down Expand Up @@ -376,3 +380,98 @@ test("Analytics Engine: More than 50kB of blob data fails.", async (t) => {
}
);
});

test("Analytics Engine: Test FORMAT JSON.", async (t) => {
  const { db, storage } = t.context;
  // @ts-expect-error: protected but does exist
  const { sqliteDB } = storage;

  // the index value tags the row so the WHERE clause below selects only it
  await db.writeDataPoint({
    indexes: ["formatJSON"],
    blobs: ["input"],
    doubles: [0, 3],
  });

  // _prepare strips the FORMAT clause and reports the requested format
  const [sql, requestedFormat] = _prepare(
    "SELECT double2 AS answer FROM TEST_DATASET WHERE index1 = ? FORMAT JSON"
  );

  const rows = sqliteDB.prepare(sql).all("formatJSON");
  t.deepEqual(rows, [{ answer: 3 }]);
  t.is(requestedFormat, "JSON");

  // JSON output wraps the rows with column metadata and a row count
  const expected = {
    meta: { answer: "Float64" },
    data: [{ answer: 3 }],
    rows: 1,
  };
  t.deepEqual(_format(rows, "JSON"), expected);
});

test("Analytics Engine: Test FORMAT JSONEachRow.", async (t) => {
  const { db, storage } = t.context;
  // @ts-expect-error: protected but does exist
  const { sqliteDB } = storage;

  // two rows sharing one index value so the WHERE clause below selects both
  await db.writeDataPoint({
    indexes: ["formatJSONEachRow"],
    blobs: ["input"],
    doubles: [0, 1],
  });
  await db.writeDataPoint({
    indexes: ["formatJSONEachRow"],
    blobs: ["input"],
    doubles: [2, 3],
  });

  // _prepare strips the FORMAT clause and reports the requested format
  const [sql, requestedFormat] = _prepare(
    "SELECT double1, double2 FROM TEST_DATASET WHERE index1 = ? FORMAT JSONEachRow"
  );

  const rows = sqliteDB.prepare(sql).all("formatJSONEachRow");
  t.deepEqual(rows, [
    { double1: 0, double2: 1 },
    { double1: 2, double2: 3 },
  ]);
  t.is(requestedFormat, "JSONEachRow");

  // JSONEachRow output is one JSON document per line
  const rendered = _format(rows, "JSONEachRow");
  t.is(rendered, `{"double1":0,"double2":1}\n{"double1":2,"double2":3}\n`);
});

test("Analytics Engine: Test FORMAT TabSeparated.", async (t) => {
  const { db, storage } = t.context;
  // @ts-expect-error: protected but does exist
  const { sqliteDB } = storage;

  // two rows sharing one index value so the WHERE clause below selects both
  await db.writeDataPoint({
    indexes: ["formatTabSeparated"],
    blobs: ["input"],
    doubles: [0, 1],
  });
  await db.writeDataPoint({
    indexes: ["formatTabSeparated"],
    blobs: ["input"],
    doubles: [2, 3],
  });

  // _prepare strips the FORMAT clause and reports the requested format
  const [sql, requestedFormat] = _prepare(
    "SELECT double1, double2 FROM TEST_DATASET WHERE index1 = ? FORMAT TabSeparated"
  );

  const rows = sqliteDB.prepare(sql).all("formatTabSeparated");
  t.deepEqual(rows, [
    { double1: 0, double2: 1 },
    { double1: 2, double2: 3 },
  ]);
  t.is(requestedFormat, "TabSeparated");

  // TabSeparated output is one line per row with tab-delimited values
  const rendered = _format(rows, "TabSeparated");
  t.is(rendered, `0\t1\n2\t3\n`);
});
Loading