Skip to content

Commit

Permalink
Merge pull request #2307 from metriport/1827-logs-to-executewithretries
Browse files Browse the repository at this point in the history
1827 Add request aware log to execute with retries functions
  • Loading branch information
leite08 committed Jun 20, 2024
2 parents 0704bcb + 1a3bc08 commit 554cdd0
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 21 deletions.
2 changes: 2 additions & 0 deletions packages/api/src/external/commonwell/patient-external-data.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Patient } from "@metriport/core/domain/patient";
import { DiscoveryParams } from "@metriport/core/domain/patient-discovery";
import { out } from "@metriport/core/util/log";
import { executeWithRetriesSafe, MetriportError } from "@metriport/shared";
import dayjs from "dayjs";
import duration from "dayjs/plugin/duration";
Expand Down Expand Up @@ -51,6 +52,7 @@ export async function getPatientWithCWData(
return executeWithRetriesSafe(() => _getPatientWithCWData(patient), {
maxAttempts: maxAttemptsToGetPatientCWData,
initialDelay: waitTimeBetweenAttemptsToGetPatientCWData.asMilliseconds(),
log: out("getPatientWithCWData").log,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export async function documentUploaderHandler(
await executeWithRetries(() => s3Utils.s3.copyObject(params).promise(), {
maxAttempts: 3,
initialDelay: 500,
log,
});
log(`Successfully copied the uploaded file to ${destinationBucket} with key ${destinationKey}`);
} catch (error) {
Expand Down Expand Up @@ -124,7 +125,7 @@ async function forwardCallToServer(
const url = `${apiServerURL}?cxId=${cxId}`;
const encodedUrl = encodeURI(url);

const resp = await executeWithNetworkRetries(() => api.post(encodedUrl, fileData));
const resp = await executeWithNetworkRetries(() => api.post(encodedUrl, fileData), { log });
log(`Server response - status: ${resp.status}`);
log(`Server response - body: ${JSON.stringify(resp.data)}`);
return resp.data;
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/external/aws/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import dayjs from "dayjs";
import duration from "dayjs/plugin/duration";
import * as stream from "stream";
import * as util from "util";
import { out } from "../../util/log";
import { capture } from "../../util/notifications";

dayjs.extend(duration);
Expand All @@ -26,7 +27,8 @@ async function executeWithRetriesS3<T>(
fn: () => Promise<T>,
options?: ExecuteWithRetriesOptions<T>
): Promise<T> {
return await executeWithRetries(fn, { ...defaultS3RetriesConfig, ...options });
const log = options?.log ?? out("executeWithRetriesS3").log;
return await executeWithRetries(fn, { ...defaultS3RetriesConfig, ...options, log });
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
} from "@metriport/ihe-gateway-sdk";
import { errorToString, executeWithNetworkRetries, executeWithRetries } from "@metriport/shared";
import axios from "axios";
import { log as getLog } from "../../../util/log";
import { log as getLog, out } from "../../../util/log";
import { capture } from "../../../util/notifications";
import { createAndSignBulkDQRequests, SignedDqRequest } from "./outbound/xca/create/iti38-envelope";
import { createAndSignBulkDRRequests, SignedDrRequest } from "./outbound/xca/create/iti39-envelope";
Expand Down Expand Up @@ -52,6 +52,7 @@ export async function sendProcessRetryDqRequest({
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
log: out("sendProcessRetryDqRequest").log,
});
}

Expand Down Expand Up @@ -85,6 +86,7 @@ export async function sendProcessRetryDrRequest({
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
log: out("sendProcessRetryDrRequest").log,
});
}

Expand All @@ -101,6 +103,7 @@ export async function createSignSendProcessXCPDRequest({
patientId: string;
cxId: string;
}): Promise<void> {
const log = getLog("createSignSendProcessXCPDRequest");
const signedRequests = createAndSignBulkXCPDRequests(xcpdRequest, samlCertsAndKeys);
const responses = await sendSignedXCPDRequests({
signedRequests,
Expand All @@ -118,11 +121,10 @@ export async function createSignSendProcessXCPDRequest({
for (const result of results) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(pdResponseUrl, result));
await executeWithNetworkRetries(async () => axios.post(pdResponseUrl, result), { log });
} catch (error) {
const msg = "Failed to send PD response to internal CQ endpoint";
const extra = { cxId, patientId, result };
const log = getLog("createSignSendProcessXCPDRequest");
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
Expand All @@ -142,6 +144,8 @@ export async function createSignSendProcessDqRequests({
patientId: string;
cxId: string;
}): Promise<void> {
const log = getLog("createSignSendProcessDqRequests");

const signedRequests = createAndSignBulkDQRequests({
bulkBodyData: dqRequestsGatewayV2,
samlCertsAndKeys,
Expand All @@ -167,11 +171,10 @@ export async function createSignSendProcessDqRequests({
for (const result of successfulResults) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(dqResponseUrl, result));
await executeWithNetworkRetries(async () => axios.post(dqResponseUrl, result), { log });
} catch (error) {
const msg = "Failed to send DQ response to internal CQ endpoint";
const extra = { cxId, patientId, result };
const log = getLog("createSignSendProcessDQRequests");
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
Expand All @@ -191,6 +194,7 @@ export async function createSignSendProcessDrRequests({
patientId: string;
cxId: string;
}): Promise<void> {
const log = getLog("createSignSendProcessDrRequests");
const signedRequests = createAndSignBulkDRRequests({
bulkBodyData: drRequestsGatewayV2,
samlCertsAndKeys,
Expand All @@ -217,11 +221,10 @@ export async function createSignSendProcessDrRequests({
for (const result of successfulResults) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(drResponseUrl, result));
await executeWithNetworkRetries(async () => axios.post(drResponseUrl, result), { log });
} catch (error) {
const msg = "Failed to send DR response to internal CQ endpoint";
const extra = { cxId, patientId, result };
const log = getLog("createSignSendProcessDRRequests");
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
Expand Down
19 changes: 14 additions & 5 deletions packages/lambdas/src/sqs-to-converter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,12 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
fhirConverter.post(converterUrl, payloadClean, {
params: converterParams,
headers: { "Content-Type": "text/plain" },
})
// No retries on timeout b/c we want to re-enqueue instead of trying within the same lambda run,
// it could lead to timing out the lambda execution.
}),
{
// No retries on timeout b/c we want to re-enqueue instead of trying within the same lambda run,
// it could lead to timing out the lambda execution.
log,
}
);
const conversionResult = res.data.fhirResource as FHIRBundle;
metrics.conversion = {
Expand All @@ -241,7 +244,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
ContentType: "application/fhir+json",
})
.promise(),
defaultS3RetriesConfig
{
...defaultS3RetriesConfig,
log,
}
);
} catch (error) {
console.log(`Error uploading pre-processed file: ${error}`);
Expand Down Expand Up @@ -410,7 +416,10 @@ async function sendConversionResult(
ContentType: "application/fhir+json",
})
.promise(),
defaultS3RetriesConfig
{
...defaultS3RetriesConfig,
log,
}
);

log(`Sending result info to queue`);
Expand Down
7 changes: 5 additions & 2 deletions packages/lambdas/src/sqs-to-fhir.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,10 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
const downloadStart = Date.now();
const payloadRaw = await executeWithRetries(
() => s3Utils.getFileContentsAsString(s3BucketName, s3FileName),
defaultS3RetriesConfig
{
...defaultS3RetriesConfig,
log,
}
);
metrics.download = {
duration: Date.now() - downloadStart,
Expand Down Expand Up @@ -163,7 +166,7 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
// This retry logic is for application level errors, not network errors
while (retry) {
count++;
response = await executeWithNetworkRetries(() => fhirApi.executeBatch(payload));
response = await executeWithNetworkRetries(() => fhirApi.executeBatch(payload), { log });
const errors = getErrorsFromReponse(response);
if (errors.length <= 0) break;
retry = count < maxRetries;
Expand Down
1 change: 1 addition & 0 deletions packages/lambdas/src/sqs-to-opensearch-xml.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ export const handler = Sentry.AWSLambda.wrapHandler(async (event: SQSEvent) => {
await executeWithRetries(async () => openSearch.ingest(params), {
initialDelay: initialDelayBetweenRetries.asMilliseconds(),
maxAttempts: maxAttemptsToIngest,
log,
});
metrics.ingestion = {
duration: Date.now() - ingestionStart,
Expand Down
10 changes: 5 additions & 5 deletions packages/shared/src/net/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ import axios, { AxiosError } from "axios";
import { errorToString } from "../error/shared";

// https://nodejs.org/docs/latest-v18.x/api/errors.html#common-system-errors
export const nodeConnRefusedErrorCodes = ["ECONNREFUSED", "ECONNRESET"];
export const nodeConnRefusedErrorCodes = ["ECONNREFUSED", "ECONNRESET"] as const;
export type NodeConnRefusedNetworkError = (typeof nodeConnRefusedErrorCodes)[number];

export const nodeTimeoutErrorCodes = ["ETIMEDOUT"];
export const nodeTimeoutErrorCodes = ["ETIMEDOUT"] as const;
export type NodeTimeoutNetworkError = (typeof nodeTimeoutErrorCodes)[number];

export type NodeNetworkError = NodeTimeoutNetworkError | NodeConnRefusedNetworkError;
export type NodeNetworkError = NodeTimeoutNetworkError | NodeConnRefusedNetworkError | "ENOTFOUND";

// Axios error codes that are timeout errors

export const axiosTimeoutErrorCodes = [AxiosError.ECONNABORTED, AxiosError.ETIMEDOUT];
export const axiosTimeoutErrorCodes = [AxiosError.ECONNABORTED, AxiosError.ETIMEDOUT] as const;
export type AxiosTimeoutError = (typeof axiosTimeoutErrorCodes)[number];

export const axiosResponseErrorCodes = [AxiosError.ERR_BAD_RESPONSE];
export const axiosResponseErrorCodes = [AxiosError.ERR_BAD_RESPONSE] as const;
export type AxiosResponseError = (typeof axiosResponseErrorCodes)[number];

export type AxiosNetworkError = AxiosTimeoutError | AxiosResponseError;
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/net/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const defaultOptions: ExecuteWithNetworkRetriesOptions = {
// https://nodejs.org/docs/latest-v18.x/api/errors.html#common-system-errors
"ECONNREFUSED", // (Connection refused): No connection could be made because the target machine actively refused it. This usually results from trying to connect to a service that is inactive on the foreign host.
"ECONNRESET", // (Connection reset by peer): A connection was forcibly closed by a peer. This normally results from a loss of the connection on the remote socket due to a timeout or reboot. Commonly encountered via the http and net modules.
"ENOTFOUND", // (DNS lookup failed): Indicates a DNS failure of either EAI_NODATA or EAI_NONAME. This is not a standard POSIX error.
],
httpStatusCodesToRetry: [429], // 429 Too Many Requests
retryOnTimeout: false,
Expand Down

0 comments on commit 554cdd0

Please sign in to comment.