Skip to content

Commit

Permalink
feat(server): add both serial and parallel modes for seed
Browse files Browse the repository at this point in the history
  • Loading branch information
ThatOneBro committed Apr 9, 2024
1 parent 6937194 commit 53c8e88
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 30 deletions.
15 changes: 11 additions & 4 deletions packages/server/src/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ import { NIL as nullUuid, v5 } from 'uuid';
import { bcryptHashPassword } from './auth/utils';
import { getSystemRepo } from './fhir/repo';
import { globalLogger } from './logger';
import { RebuildOptions } from './seeds/common';
import { rebuildR4SearchParameters } from './seeds/searchparameters';
import { rebuildR4StructureDefinitions } from './seeds/structuredefinitions';
import { rebuildR4ValueSets } from './seeds/valuesets';

export const r4ProjectId = v5('R4', nullUuid);

export async function seedDatabase(): Promise<void> {
/**
* Seeds the database with system resources.
*
* @param options - Optional options for seeding the database.
* @returns A Promise that resolves when seeding is done.
*/
export async function seedDatabase(options?: RebuildOptions): Promise<void> {
if (await isSeeded()) {
globalLogger.info('Already seeded');
return;
Expand Down Expand Up @@ -77,7 +84,7 @@ export async function seedDatabase(): Promise<void> {
performance.mark('Starting rebuilds');

performance.mark('Starting rebuildR4StructureDefinitions');
await rebuildR4StructureDefinitions();
await rebuildR4StructureDefinitions({ parallel: true, ...options });
const sdStats = performance.measure(
'Finished rebuildR4StructureDefinitions',
'Starting rebuildR4StructureDefinitions'
Expand All @@ -87,12 +94,12 @@ export async function seedDatabase(): Promise<void> {
});

performance.mark('Starting rebuildR4ValueSets');
await rebuildR4ValueSets();
await rebuildR4ValueSets({ parallel: true, ...options });
const valueSetsStats = performance.measure('Finished rebuildR4ValueSets', 'Starting rebuildR4ValueSets');
globalLogger.info('Finished rebuildR4ValueSets', { duration: `${Math.ceil(valueSetsStats.duration)} ms` });

performance.mark('Starting rebuildR4SearchParameters');
await rebuildR4SearchParameters();
await rebuildR4SearchParameters({ parallel: true, ...options });
const searchParamsStats = performance.measure(
'Finished rebuildR4SearchParameters',
'Starting rebuildR4SearchParameters'
Expand Down
16 changes: 16 additions & 0 deletions packages/server/src/seeds/common.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
export interface RebuildOptions {
/**
* Whether the resources should be created in parallel.
*
* **WARNING: Can be CPU intensive and/or clog up the connection pool.**
*/
parallel: boolean;
}

const defaultOptions = {
parallel: false,
};

export function buildRebuildOptions(options?: Partial<RebuildOptions>): RebuildOptions {
return { ...defaultOptions, ...options };
}
23 changes: 17 additions & 6 deletions packages/server/src/seeds/searchparameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,34 @@ import { getDatabasePool } from '../database';
import { Repository, getSystemRepo } from '../fhir/repo';
import { globalLogger } from '../logger';
import { r4ProjectId } from '../seed';
import { RebuildOptions, buildRebuildOptions } from './common';

/**
* Creates all SearchParameter resources.
* @param options - Optional options for how rebuild should be done.
*/
export async function rebuildR4SearchParameters(): Promise<void> {
export async function rebuildR4SearchParameters(options?: Partial<RebuildOptions>): Promise<void> {
const finalOptions = buildRebuildOptions(options);
const client = getDatabasePool();
await client.query('DELETE FROM "SearchParameter" WHERE "projectId" = $1', [r4ProjectId]);

const systemRepo = getSystemRepo();

const promises = [];
for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) {
for (const entry of readJson(filename).entry as BundleEntry[]) {
promises.push(createParameter(systemRepo, entry.resource as SearchParameter));
if (finalOptions.parallel) {
const promises = [];
for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) {
for (const entry of readJson(filename).entry as BundleEntry[]) {
promises.push(createParameter(systemRepo, entry.resource as SearchParameter));
}
}
await Promise.all(promises);
} else {
for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) {
for (const entry of readJson(filename).entry as BundleEntry[]) {
await createParameter(systemRepo, entry.resource as SearchParameter);
}
}
}
await Promise.all(promises);
}

async function createParameter(systemRepo: Repository, param: SearchParameter): Promise<void> {
Expand Down
35 changes: 28 additions & 7 deletions packages/server/src/seeds/structuredefinitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,32 @@ import { getDatabasePool } from '../database';
import { Repository, getSystemRepo } from '../fhir/repo';
import { globalLogger } from '../logger';
import { r4ProjectId } from '../seed';
import { RebuildOptions, buildRebuildOptions } from './common';

/**
* Creates all StructureDefinition resources.
* @param options - Optional options for how rebuild should be done.
*/
export async function rebuildR4StructureDefinitions(): Promise<void> {
export async function rebuildR4StructureDefinitions(options?: Partial<RebuildOptions>): Promise<void> {
const finalOptions = buildRebuildOptions(options) as RebuildOptions;
const client = getDatabasePool();
await client.query(`DELETE FROM "StructureDefinition" WHERE "projectId" = $1`, [r4ProjectId]);

const systemRepo = getSystemRepo();
await Promise.all([
createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle),
createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle),
createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle),
]);
if (finalOptions.parallel) {
await Promise.all([
createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle),
createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle),
createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle),
]);
} else {
await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle);
await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle);
await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle);
}
}

async function createStructureDefinitionsForBundle(
async function createStructureDefinitionsForBundleParallel(
systemRepo: Repository,
structureDefinitions: Bundle
): Promise<void> {
Expand All @@ -34,6 +43,18 @@ async function createStructureDefinitionsForBundle(
await Promise.all(promises);
}

async function createStructureDefinitionsForBundleSerial(
systemRepo: Repository,
structureDefinitions: Bundle
): Promise<void> {
for (const entry of structureDefinitions.entry as BundleEntry[]) {
const resource = entry.resource as Resource;
if (resource.resourceType === 'StructureDefinition' && resource.name) {
await createAndLogStructureDefinition(systemRepo, resource);
}
}
}

async function createAndLogStructureDefinition(systemRepo: Repository, resource: StructureDefinition): Promise<void> {
globalLogger.debug('[StructureDefinition] creation started: ' + resource.name);
const result = await systemRepo.createResource<StructureDefinition>({
Expand Down
47 changes: 34 additions & 13 deletions packages/server/src/seeds/valuesets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@ import { readJson } from '@medplum/definitions';
import { Bundle, BundleEntry, CodeSystem, ValueSet } from '@medplum/fhirtypes';
import { Repository, getSystemRepo } from '../fhir/repo';
import { r4ProjectId } from '../seed';
import { RebuildOptions, buildRebuildOptions } from './common';

/**
* Imports all built-in ValueSets and CodeSystems into the database.
* @param options - Optional options for how rebuild should be done.
*/
export async function rebuildR4ValueSets(): Promise<void> {
export async function rebuildR4ValueSets(options?: Partial<RebuildOptions>): Promise<void> {
const finalOptions = buildRebuildOptions(options) as RebuildOptions;
const systemRepo = getSystemRepo();
const files = ['v2-tables.json', 'v3-codesystems.json', 'valuesets.json', 'valuesets-medplum.json'];
for (const file of files) {
const bundle = readJson('fhir/r4/' + file) as Bundle<CodeSystem | ValueSet>;
const promises = [];
for (const entry of bundle.entry as BundleEntry<CodeSystem | ValueSet>[]) {
promises.push(overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet));
if (finalOptions.parallel) {
const promises = [];
for (const entry of bundle.entry as BundleEntry<CodeSystem | ValueSet>[]) {
promises.push(overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet, finalOptions));
}
await Promise.all(promises);
} else {
for (const entry of bundle.entry as BundleEntry<CodeSystem | ValueSet>[]) {
await overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet, finalOptions);
}
}
await Promise.all(promises);
}
}

async function overwriteResource(systemRepo: Repository, resource: CodeSystem | ValueSet): Promise<void> {
await deleteExisting(systemRepo, resource, r4ProjectId);
async function overwriteResource(
systemRepo: Repository,
resource: CodeSystem | ValueSet,
options: RebuildOptions
): Promise<void> {
await deleteExisting(systemRepo, resource, r4ProjectId, options);
await systemRepo.createResource({
...resource,
meta: {
Expand All @@ -36,7 +49,8 @@ async function overwriteResource(systemRepo: Repository, resource: CodeSystem |
async function deleteExisting(
systemRepo: Repository,
resource: CodeSystem | ValueSet,
projectId: string
projectId: string,
options: RebuildOptions
): Promise<void> {
const bundle = await systemRepo.search({
resourceType: resource.resourceType,
Expand All @@ -45,12 +59,19 @@ async function deleteExisting(
{ code: '_project', operator: Operator.EQUALS, value: projectId },
],
});
const promises = [];
if (bundle.entry && bundle.entry.length > 0) {
for (const entry of bundle.entry) {
const existing = entry.resource as CodeSystem | ValueSet;
promises.push(systemRepo.deleteResource(existing.resourceType, existing.id as string));
if (options.parallel) {
const promises = [];
for (const entry of bundle.entry) {
const existing = entry.resource as CodeSystem | ValueSet;
promises.push(systemRepo.deleteResource(existing.resourceType, existing.id as string));
}
await Promise.all(promises);
} else {
for (const entry of bundle.entry) {
const existing = entry.resource as CodeSystem | ValueSet;
await systemRepo.deleteResource(existing.resourceType, existing.id as string);
}
}
}
await Promise.all(promises);
}

0 comments on commit 53c8e88

Please sign in to comment.