From 53c8e88efa8abd2f8007c75e43b3e2d96cdd3aa2 Mon Sep 17 00:00:00 2001 From: Derrick Farris Date: Mon, 8 Apr 2024 17:09:38 -0700 Subject: [PATCH] feat(server): add both `serial` and `parallel` modes for seed --- packages/server/src/seed.ts | 15 ++++-- packages/server/src/seeds/common.ts | 16 +++++++ packages/server/src/seeds/searchparameters.ts | 23 ++++++--- .../server/src/seeds/structuredefinitions.ts | 35 +++++++++++--- packages/server/src/seeds/valuesets.ts | 47 ++++++++++++++----- 5 files changed, 106 insertions(+), 30 deletions(-) create mode 100644 packages/server/src/seeds/common.ts diff --git a/packages/server/src/seed.ts b/packages/server/src/seed.ts index 38d60ab5e9..18016632e2 100644 --- a/packages/server/src/seed.ts +++ b/packages/server/src/seed.ts @@ -4,13 +4,20 @@ import { NIL as nullUuid, v5 } from 'uuid'; import { bcryptHashPassword } from './auth/utils'; import { getSystemRepo } from './fhir/repo'; import { globalLogger } from './logger'; +import { RebuildOptions } from './seeds/common'; import { rebuildR4SearchParameters } from './seeds/searchparameters'; import { rebuildR4StructureDefinitions } from './seeds/structuredefinitions'; import { rebuildR4ValueSets } from './seeds/valuesets'; export const r4ProjectId = v5('R4', nullUuid); -export async function seedDatabase(): Promise { +/** + * Seeds the database with system resources. + * + * @param options - Optional options for seeding the database. + * @returns A Promise that resolves when seeding is done. + */ +export async function seedDatabase(options?: RebuildOptions): Promise { if (await isSeeded()) { globalLogger.info('Already seeded'); return; @@ -77,7 +84,7 @@ export async function seedDatabase(): Promise { performance.mark('Starting rebuilds'); performance.mark('Starting rebuildR4StructureDefinitions'); - await rebuildR4StructureDefinitions(); + await rebuildR4StructureDefinitions({ parallel: true, ...options }); const sdStats = performance.measure( 'Finished rebuildR4StructureDefinitions', 'Starting rebuildR4StructureDefinitions' @@ -87,12 +94,12 @@ export async function seedDatabase(): Promise { }); performance.mark('Starting rebuildR4ValueSets'); - await rebuildR4ValueSets(); + await rebuildR4ValueSets({ parallel: true, ...options }); const valueSetsStats = performance.measure('Finished rebuildR4ValueSets', 'Starting rebuildR4ValueSets'); globalLogger.info('Finished rebuildR4ValueSets', { duration: `${Math.ceil(valueSetsStats.duration)} ms` }); performance.mark('Starting rebuildR4SearchParameters'); - await rebuildR4SearchParameters(); + await rebuildR4SearchParameters({ parallel: true, ...options }); const searchParamsStats = performance.measure( 'Finished rebuildR4SearchParameters', 'Starting rebuildR4SearchParameters' diff --git a/packages/server/src/seeds/common.ts b/packages/server/src/seeds/common.ts new file mode 100644 index 0000000000..6d9db22550 --- /dev/null +++ b/packages/server/src/seeds/common.ts @@ -0,0 +1,16 @@ +export interface RebuildOptions { + /** + * Whether the resources should be created in parallel. + * + * **WARNING: Can be CPU intensive and/or clog up the connection pool.** + */ + parallel: boolean; +} + +const defaultOptions = { + parallel: false, +}; + +export function buildRebuildOptions(options?: Partial): RebuildOptions { + return { ...defaultOptions, ...options }; +} diff --git a/packages/server/src/seeds/searchparameters.ts b/packages/server/src/seeds/searchparameters.ts index 506e125ab4..c19659df70 100644 --- a/packages/server/src/seeds/searchparameters.ts +++ b/packages/server/src/seeds/searchparameters.ts @@ -4,23 +4,34 @@ import { getDatabasePool } from '../database'; import { Repository, getSystemRepo } from '../fhir/repo'; import { globalLogger } from '../logger'; import { r4ProjectId } from '../seed'; +import { RebuildOptions, buildRebuildOptions } from './common'; /** * Creates all SearchParameter resources. + * @param options - Optional options for how rebuild should be done. */ -export async function rebuildR4SearchParameters(): Promise { +export async function rebuildR4SearchParameters(options?: Partial): Promise { + const finalOptions = buildRebuildOptions(options); const client = getDatabasePool(); await client.query('DELETE FROM "SearchParameter" WHERE "projectId" = $1', [r4ProjectId]); const systemRepo = getSystemRepo(); - const promises = []; - for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) { - for (const entry of readJson(filename).entry as BundleEntry[]) { - promises.push(createParameter(systemRepo, entry.resource as SearchParameter)); + if (finalOptions.parallel) { + const promises = []; + for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) { + for (const entry of readJson(filename).entry as BundleEntry[]) { + promises.push(createParameter(systemRepo, entry.resource as SearchParameter)); + } + } + await Promise.all(promises); + } else { + for (const filename of SEARCH_PARAMETER_BUNDLE_FILES) { + for (const entry of readJson(filename).entry as BundleEntry[]) { + await createParameter(systemRepo, entry.resource as SearchParameter); + } } } - await Promise.all(promises); } async function createParameter(systemRepo: Repository, param: SearchParameter): Promise { diff --git a/packages/server/src/seeds/structuredefinitions.ts b/packages/server/src/seeds/structuredefinitions.ts index 4eb89df68f..9baa37112d 100644 --- a/packages/server/src/seeds/structuredefinitions.ts +++ b/packages/server/src/seeds/structuredefinitions.ts @@ -4,23 +4,32 @@ import { getDatabasePool } from '../database'; import { Repository, getSystemRepo } from '../fhir/repo'; import { globalLogger } from '../logger'; import { r4ProjectId } from '../seed'; +import { RebuildOptions, buildRebuildOptions } from './common'; /** * Creates all StructureDefinition resources. + * @param options - Optional options for how rebuild should be done. */ -export async function rebuildR4StructureDefinitions(): Promise { +export async function rebuildR4StructureDefinitions(options?: Partial): Promise { + const finalOptions = buildRebuildOptions(options) as RebuildOptions; const client = getDatabasePool(); await client.query(`DELETE FROM "StructureDefinition" WHERE "projectId" = $1`, [r4ProjectId]); const systemRepo = getSystemRepo(); - await Promise.all([ - createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle), - createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle), - createStructureDefinitionsForBundle(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle), - ]); + if (finalOptions.parallel) { + await Promise.all([ + createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle), + createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle), + createStructureDefinitionsForBundleParallel(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle), + ]); + } else { + await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-resources.json') as Bundle); + await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-medplum.json') as Bundle); + await createStructureDefinitionsForBundleSerial(systemRepo, readJson('fhir/r4/profiles-others.json') as Bundle); + } } -async function createStructureDefinitionsForBundle( +async function createStructureDefinitionsForBundleParallel( systemRepo: Repository, structureDefinitions: Bundle ): Promise { @@ -34,6 +43,18 @@ async function createStructureDefinitionsForBundle( await Promise.all(promises); } +async function createStructureDefinitionsForBundleSerial( + systemRepo: Repository, + structureDefinitions: Bundle +): Promise { + for (const entry of structureDefinitions.entry as BundleEntry[]) { + const resource = entry.resource as Resource; + if (resource.resourceType === 'StructureDefinition' && resource.name) { + await createAndLogStructureDefinition(systemRepo, resource); + } + } +} + async function createAndLogStructureDefinition(systemRepo: Repository, resource: StructureDefinition): Promise { globalLogger.debug('[StructureDefinition] creation started: ' + resource.name); const result = await systemRepo.createResource({ diff --git a/packages/server/src/seeds/valuesets.ts b/packages/server/src/seeds/valuesets.ts index d52d2a19cc..aa6792cccb 100644 --- a/packages/server/src/seeds/valuesets.ts +++ b/packages/server/src/seeds/valuesets.ts @@ -3,25 +3,38 @@ import { readJson } from '@medplum/definitions'; import { Bundle, BundleEntry, CodeSystem, ValueSet } from '@medplum/fhirtypes'; import { Repository, getSystemRepo } from '../fhir/repo'; import { r4ProjectId } from '../seed'; +import { RebuildOptions, buildRebuildOptions } from './common'; /** * Imports all built-in ValueSets and CodeSystems into the database. + * @param options - Optional options for how rebuild should be done. */ -export async function rebuildR4ValueSets(): Promise { +export async function rebuildR4ValueSets(options?: Partial): Promise { + const finalOptions = buildRebuildOptions(options) as RebuildOptions; const systemRepo = getSystemRepo(); const files = ['v2-tables.json', 'v3-codesystems.json', 'valuesets.json', 'valuesets-medplum.json']; for (const file of files) { const bundle = readJson('fhir/r4/' + file) as Bundle; - const promises = []; - for (const entry of bundle.entry as BundleEntry[]) { - promises.push(overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet)); + if (finalOptions.parallel) { + const promises = []; + for (const entry of bundle.entry as BundleEntry[]) { + promises.push(overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet, finalOptions)); + } + await Promise.all(promises); + } else { + for (const entry of bundle.entry as BundleEntry[]) { + await overwriteResource(systemRepo, entry.resource as CodeSystem | ValueSet, finalOptions); + } } - await Promise.all(promises); } } -async function overwriteResource(systemRepo: Repository, resource: CodeSystem | ValueSet): Promise { - await deleteExisting(systemRepo, resource, r4ProjectId); +async function overwriteResource( + systemRepo: Repository, + resource: CodeSystem | ValueSet, + options: RebuildOptions +): Promise { + await deleteExisting(systemRepo, resource, r4ProjectId, options); await systemRepo.createResource({ ...resource, meta: { @@ -36,7 +49,8 @@ async function overwriteResource(systemRepo: Repository, resource: CodeSystem | async function deleteExisting( systemRepo: Repository, resource: CodeSystem | ValueSet, - projectId: string + projectId: string, + options: RebuildOptions ): Promise { const bundle = await systemRepo.search({ resourceType: resource.resourceType, @@ -45,12 +59,19 @@ async function deleteExisting( { code: '_project', operator: Operator.EQUALS, value: projectId }, ], }); - const promises = []; if (bundle.entry && bundle.entry.length > 0) { - for (const entry of bundle.entry) { - const existing = entry.resource as CodeSystem | ValueSet; - promises.push(systemRepo.deleteResource(existing.resourceType, existing.id as string)); + if (options.parallel) { + const promises = []; + for (const entry of bundle.entry) { + const existing = entry.resource as CodeSystem | ValueSet; + promises.push(systemRepo.deleteResource(existing.resourceType, existing.id as string)); + } + await Promise.all(promises); + } else { + for (const entry of bundle.entry) { + const existing = entry.resource as CodeSystem | ValueSet; + await systemRepo.deleteResource(existing.resourceType, existing.id as string); + } } } - await Promise.all(promises); }