Skip to content

Commit

Permalink
build: add write access to EC lambda +
Browse files Browse the repository at this point in the history
- add write access to EC lambda
- allow EC lambda to dryRun EC
- error handling on EC lambda
- log duration of EC execution through lambda

Ref: metriport/metriport-internal#1195
  • Loading branch information
leite08 committed Dec 12, 2023
1 parent cdb9469 commit ace58b7
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export async function checkStaleEnhancedCoverage(cxIds: string[]): Promise<void>
cxId,
patientIds: patientsOfCx.map(p => p.id),
cqLinkStatus: "linked",
context: "checkStaleEnhancedCoverage",
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,20 @@ export const completeEnhancedCoverage = async ({
cxId,
patientIds,
cqLinkStatus,
startedAt,
context,
}: {
cxId: string;
patientIds: string[];
cqLinkStatus: CQLinkStatus;
startedAt?: number;
/**
* Context/operation in which this function is being called
*/
context?: string;
}): Promise<void> => {
const startedAt = Date.now();
const { log } = out(`EC completer - cx ${cxId}`);
const startedAtLocal = Date.now();
const { log } = out(`EC completer - cx ${cxId}, ctxt ${context ?? "n/a"}`);
try {
log(
`Completing EC for ${patientIds.length} patients, to status: ${cqLinkStatus}, ` +
Expand All @@ -41,9 +48,14 @@ export const completeEnhancedCoverage = async ({
numberOfParallelExecutions: PARALLEL_UPDATES,
});
} finally {
const duration = Date.now() - startedAt;
const durationMin = dayjs.duration(duration).asMinutes();
log(`Done, duration: ${duration} ms / ${durationMin} min`);
const duration = startedAt ? Date.now() - startedAt : undefined;
const durationMin = duration ? dayjs.duration(duration).asMinutes() : undefined;
const durationLocal = Date.now() - startedAtLocal;
const durationLocalMin = dayjs.duration(durationLocal).asMinutes();
log(
`Done, total duration: ${duration} ms / ${durationMin} min ` +
`(just to complete: ${durationLocal} ms / ${durationLocalMin} min)`
);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CoverageEnhancementParams } from "@metriport/core/external/commonwell/c
import { CoverageEnhancerLocal } from "@metriport/core/external/commonwell/cq-bridge/coverage-enhancer-local";
import { CommonWellManagementAPI } from "@metriport/core/external/commonwell/management/api";
import { out } from "@metriport/core/util/log";
import { sleep } from "@metriport/core/util/sleep";
import { sleep } from "@metriport/shared";
import dayjs from "dayjs";
import duration from "dayjs/plugin/duration";
import { capture } from "../../../shared/notifications";
Expand Down Expand Up @@ -46,10 +46,12 @@ export class CoverageEnhancerApiLocal extends CoverageEnhancerLocal {
log(`Giving some time for patients to be updated @ CW... (${waitTime} ms)`);
await sleep(waitTime);

await completeEnhancedCoverage({ cxId, patientIds, cqLinkStatus: "linked" });

const duration = Date.now() - startedAt;
const durationMin = dayjs.duration(duration).asMinutes();
log(`Done, total time: ${duration} ms / ${durationMin} min`);
await completeEnhancedCoverage({
cxId,
patientIds,
cqLinkStatus: "linked",
startedAt,
context: "enhanceCoverage",
});
}
}
6 changes: 5 additions & 1 deletion packages/api/src/routes/medical/internal-patient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ const completeEnhancedCoverageSchema = z.object({
cxId: uuidSchema,
patientIds: uuidSchema.array(),
cqLinkStatus: cqLinkStatusSchema,
startedAt: z.number().nullish(),
});

/** ---------------------------------------------------------------------------
Expand All @@ -420,13 +421,16 @@ const completeEnhancedCoverageSchema = z.object({
router.post(
"/enhance-coverage/completed",
asyncHandler(async (req: Request, res: Response) => {
const { cxId, patientIds, cqLinkStatus } = completeEnhancedCoverageSchema.parse(req.body);
const { cxId, patientIds, cqLinkStatus, startedAt } = completeEnhancedCoverageSchema.parse(
req.body
);

// intentionally async, no need to wait for it
completeEnhancedCoverage({
cxId,
patientIds,
cqLinkStatus,
startedAt: startedAt ?? undefined,
}).catch(error => {
console.log(
`Failed to set cqLinkStatus for patients ${patientIds.join(", ")} - ${errorToString(error)}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { PatientUpdater } from "./patient-updater";

dayjs.extend(duration);

const UPDATE_TIMEOUT = dayjs.duration({ minutes: 2 });
const UPDATE_TIMEOUT = dayjs.duration({ minutes: 3 });

/**
* Implementation of the PatientUpdater that calls the Metriport API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ export class CoverageEnhancerCloud extends CoverageEnhancer {
});
}
} finally {
await this.sendEnhancedCoverageDone(cxId, patientIds);
await this.sendEnhancedCoverageDone(cxId, patientIds, startedAt);

const duration = Date.now() - startedAt;
const durationMin = dayjs.duration(duration).asMinutes();
Expand All @@ -72,11 +72,12 @@ export class CoverageEnhancerCloud extends CoverageEnhancer {
await this.sendMessageToQueue(params.cxId, payload);
}

private async sendEnhancedCoverageDone(cxId: string, patientIds: string[]) {
private async sendEnhancedCoverageDone(cxId: string, patientIds: string[], startedAt: number) {
const payload: Input = {
cxId,
patientIds,
done: true,
startedAt,
};
await this.sendMessageToQueue(cxId, payload);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PatientLoader } from "../../../domain/patient/patient-loader";
import { CQOrgHydrated, getOrgChunksFromPos, getOrgsByPrio, OrgPrio } from "./get-orgs";

// Try to keep it even to make testing easier
export const defaultMaxOrgsToProcess = 350;
export const defaultMaxOrgsToProcess = 2500;

export type CoverageEnhancementParams = {
cxId: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ export type ChunkProgress = { chunkIndex?: number | undefined; chunkTotal?: numb

export type Input =
| (LinkPatientsCommand & { done: false } & ChunkProgress)
| (Pick<LinkPatientsCommand, "cxId" | "patientIds"> & { done: true } & ChunkProgress);
| (Pick<LinkPatientsCommand, "cxId" | "patientIds"> & {
done: true;
startedAt?: number; // when the process started
} & ChunkProgress);
7 changes: 7 additions & 0 deletions packages/core/src/external/commonwell/management/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ export class CommonWellManagementAPI {
oid,
careQualityOrgIds,
timeout = DEFAULT_TIMEOUT_INCLUDE_LIST.asMilliseconds(),
dryRun = false,
log = console.log,
debug = emptyFunction,
}: {
oid: string;
careQualityOrgIds: string[];
timeout?: number;
dryRun?: boolean | undefined;
log?: typeof console.log | undefined;
debug?: typeof console.log | undefined;
}): Promise<void> {
Expand All @@ -139,6 +141,11 @@ export class CommonWellManagementAPI {
return;
}

if (dryRun) {
log(`[DRY RUN] Would be posting to /IncludeList and updating cookies, skipping...`);
return;
}

log(`Posting to /IncludeList...`);
const before = Date.now();
const resp = await axios.post(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ export type LinkPatientsCommand = {
cxOrgOID: string;
patientIds: string[];
cqOrgIds: string[];
/**
* Indicates whether to make changes to internal and external services or not, used to validate
* the overall setup/infra.
*/
dryRun?: boolean | undefined;
log?: typeof console.log;
};

Expand All @@ -25,22 +30,32 @@ export class LinkPatients {
private readonly patientsUpdater: PatientUpdater
) {}

/**
* Links Patients to CQ orgs using CW's CQ bridge.
* It updates the include list for the cx @ CW and then issues an update on all provided
* patient IDs so they get linked to those orgs @ CW's CQ bridge.
*
* @param dryRun indicates whether to make changes to internal and external services or not,
* used to validate the overall setup/infra
*/
async linkPatientsToOrgs({
cxId,
cxOrgOID,
patientIds,
cqOrgIds,
dryRun,
log,
}: LinkPatientsCommand): Promise<void> {
await this.cwManagementApi.updateIncludeList({
oid: cxOrgOID,
careQualityOrgIds: cqOrgIds,
dryRun,
log,
});

// Give some time for the cache - if any, on CW's side to catch up
await sleep(TIME_BETWEEN_INCLUDE_LIST_AND_UPDATE_ALL.asMilliseconds());

await this.patientsUpdater.updateAll(cxId, patientIds);
if (!dryRun) await this.patientsUpdater.updateAll(cxId, patientIds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ function createLinkPatientsLambda(

provideAccessToQueue({ accessType: "both", queue: inputQueue, resource: lambda });
cookieStore.grantRead(lambda);
cookieStore.grantWrite(lambda);

return lambda;
}
Expand Down
27 changes: 19 additions & 8 deletions packages/lambdas/src/cw-enhanced-coverage-link-patients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { PatientUpdaterMetriportAPI } from "@metriport/core/domain/patient/patie
import { Input } from "@metriport/core/external/commonwell/cq-bridge/cq-link-patients";
import { CommonWellManagementAPI } from "@metriport/core/external/commonwell/management/api";
import { LinkPatients } from "@metriport/core/external/commonwell/management/link-patients";
import { errorToString } from "@metriport/core/util/error/index";
import { MetriportError } from "@metriport/core/util/error/metriport-error";
import { sleep } from "@metriport/core/util/sleep";
import * as Sentry from "@sentry/serverless";
Expand Down Expand Up @@ -72,29 +73,36 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
console.log(`Giving some time to increase chances of cache being updated @ CW...`);
await sleep(TIME_BETWEEN_CQ_LINK_AND_DOC_QUERY.asMilliseconds());
console.log(`Notifying our API EC is completed for this batch...`);
await completeEnhancedCoverage(entry.cxId, entry.patientIds);
await completeEnhancedCoverage(entry.cxId, entry.patientIds, entry.startedAt);
return;
}

if (!isProduction()) {
console.log(`--> skipping actual linking b/c we're not on prod`);
return;
}
await linkPatients.linkPatientsToOrgs(entry);
const dryRun = !isProduction();
await linkPatients.linkPatientsToOrgs({ ...entry, dryRun });

console.log(`Done.`);
} catch (error) {
const msg = `Error running EC Lambda`;
console.log(`${msg} - ${errorToString(error)}`);
capture.message(msg, { extra: { error, event }, level: "error" });
throw error;
} finally {
const duration = Date.now() - startedAt;
const durationMin = dayjs.duration(duration).asMinutes();
console.log(`Total time: ${duration} ms / ${durationMin} min`);
}
});

async function completeEnhancedCoverage(cxId: string, patientIds: string[]): Promise<void> {
async function completeEnhancedCoverage(
cxId: string,
patientIds: string[],
startedAt?: number
): Promise<void> {
await axios.post(`${metriportBaseUrl}/internal/patient/enhance-coverage/completed`, {
cxId,
patientIds,
cqLinkStatus: "linked",
startedAt,
});
}

Expand All @@ -121,7 +129,10 @@ function recordToEntry(record: SQSRecord): Input {
const done = body.done;
if (done == undefined) throw new Error(`Missing 'done' in body`);

const basePayload = { cxId, cxOrgOID, patientIds, done, chunkIndex, chunkTotal };
const startedAtRaw = body.startedAt;
const startedAt = startedAtRaw ? Number(startedAtRaw) : undefined;

const basePayload = { cxId, cxOrgOID, patientIds, done, chunkIndex, chunkTotal, startedAt };
if (done) return basePayload;
return { ...basePayload, cqOrgIds };
}

0 comments on commit ace58b7

Please sign in to comment.