Skip to content

Commit

Permalink
Merge pull request #2319 from metriport/develop
Browse files Browse the repository at this point in the history
RELEASE 429s, Stream IHE Results, Epic Chunk Sizes
  • Loading branch information
jonahkaye committed Jun 21, 2024
2 parents 4a624e7 + 3ed64a6 commit 59fb165
Show file tree
Hide file tree
Showing 42 changed files with 2,010 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,62 @@ describe("outboundDocumentRetrievalRequest", () => {
expect(res[0].documentReference.length).toEqual(2);
});

it("returns 1 req with 6 doc refs when we have an epic gw", async () => {
const outboundDocumentQueryResps: OutboundDocumentQueryResp[] = [
makeOutboundDocumentQueryResp({
gateway: makeXcaGateway({ homeCommunityId: "1.2.840.114350.1.13" }),
documentReference: [
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
],
}),
];
const res: OutboundDocumentRetrievalReq[] = createOutboundDocumentRetrievalReqs({
patient,
requestId,
initiator,
outboundDocumentQueryResults: outboundDocumentQueryResps,
});
expect(res).toBeTruthy();
expect(res.length).toEqual(1);
expect(res[0].documentReference.length).toEqual(6);
});

it("returns 2 req with 11 doc refs when we have an epic gw", async () => {
const outboundDocumentQueryResps: OutboundDocumentQueryResp[] = [
makeOutboundDocumentQueryResp({
gateway: makeXcaGateway({ homeCommunityId: "1.2.840.114350.1.13" }),
documentReference: [
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
makeDocumentReferenceWithMetriportId(),
],
}),
];
const res: OutboundDocumentRetrievalReq[] = createOutboundDocumentRetrievalReqs({
patient,
requestId,
initiator,
outboundDocumentQueryResults: outboundDocumentQueryResps,
});
expect(res).toBeTruthy();
expect(res.length).toEqual(2);
expect(res[0].documentReference.length).toEqual(10);
expect(res[1].documentReference.length).toEqual(1);
});

it("returns one req when doc refs within limit", async () => {
const outboundDocumentQueryResps: OutboundDocumentQueryResp[] = [
makeOutboundDocumentQueryResp({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import {
OutboundDocumentQueryResp,
OutboundDocumentRetrievalReq,
} from "@metriport/ihe-gateway-sdk";
import {
isEpicGateway,
maxDocRefsPerEpicDocRetrievalRequest,
} from "@metriport/core/external/carequality/ihe-gateway-v2/gateways";

import dayjs from "dayjs";
import { chunk } from "lodash";
import { HieInitiator } from "../../hie/get-hie-initiator";
Expand Down Expand Up @@ -60,7 +65,10 @@ export function createOutboundDocumentRetrievalReqs({
},
};

const docRefChunks = chunk(documentReference, maxDocRefsPerDocRetrievalRequest);
const docRefsPerRequest = isEpicGateway(gateway)
? maxDocRefsPerEpicDocRetrievalRequest
: maxDocRefsPerDocRetrievalRequest;
const docRefChunks = chunk(documentReference, docRefsPerRequest);
const request: OutboundDocumentRetrievalReq[] = docRefChunks.map(chunk => {
return {
...baseRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ export const namespaces = {
xsiType: "xsd:string",
};

export const expiresIn = 5;
export const expiresIn = 20;
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const pointClickCareOid = "2.16.840.1.113883.3.6448";
const redoxOid = "2.16.840.1.113883.3.6147.458";
const redoxGatewayOid = "2.16.840.1.113883.3.6147.458.2";
const surescriptsOid = "2.16.840.1.113883.3.2054.2.1.1";
const epicOidPrefix = "1.2.840.114350.1.13";

/*
* These gateways only accept a single document reference per request.
Expand All @@ -33,6 +34,8 @@ const gatewaysThatAcceptOneDocRefPerRequest = [
surescriptsOid,
];

export const maxDocRefsPerEpicDocRetrievalRequest = 10;

/*
* These gateways require a urn:uuid prefix before document Unique ids formatted as lowercase uuids
*/
Expand Down Expand Up @@ -82,3 +85,7 @@ export function requiresUrnInSoapBody(gateway: XCPDGateway): boolean {
export function requiresOnlyOneDocRefPerRequest(gateway: XCAGateway): boolean {
return gatewaysThatAcceptOneDocRefPerRequest.includes(gateway.homeCommunityId);
}

export function isEpicGateway(gateway: XCAGateway): boolean {
return gateway.homeCommunityId.startsWith(epicOidPrefix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dayjs.extend(duration);
const SLEEP_IN_BETWEEN_DOCUMENT_RETRIEVAL_REQUESTS = dayjs.duration({ seconds: 1 });
const MAX_GATEWAYS_BEFORE_CHUNK = 1000;
const MAX_DOCUMENT_QUERY_REQUESTS_PER_INVOCATION = 20;
const MAX_DOCUMENT_RETRIEVAL_REQUESTS_PER_INVOCATION = 20;
const MAX_DOCUMENT_RETRIEVAL_REQUESTS_PER_INVOCATION = 10;

const iheGatewayV2OutboundPatientDiscoveryLambdaName = "IHEGatewayV2OutboundPatientDiscoveryLambda";
const iheGatewayV2OutboundDocumentQueryLambdaName = "IHEGatewayV2OutboundDocumentQueryLambda";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,54 @@ import { createAndSignBulkDQRequests, SignedDqRequest } from "./outbound/xca/cre
import { createAndSignBulkDRRequests, SignedDrRequest } from "./outbound/xca/create/iti39-envelope";
import { processDqResponse } from "./outbound/xca/process/dq-response";
import { processDrResponse } from "./outbound/xca/process/dr-response";
import { isRetryable } from "./outbound/xca/process/error";
import { isRetryable as isRetryableXca } from "./outbound/xca/process/error";
import { isRetryable as isRetryableXcpd } from "./outbound/xcpd/process/error";
import { sendSignedDqRequest } from "./outbound/xca/send/dq-requests";
import { sendSignedDrRequest } from "./outbound/xca/send/dr-requests";
import { createAndSignBulkXCPDRequests } from "./outbound/xcpd/create/iti55-envelope";
import {
createAndSignBulkXCPDRequests,
SignedXcpdRequest,
} from "./outbound/xcpd/create/iti55-envelope";
import { processXCPDResponse } from "./outbound/xcpd/process/xcpd-response";
import { sendSignedXCPDRequests } from "./outbound/xcpd/send/xcpd-requests";
import { sendSignedXcpdRequest } from "./outbound/xcpd/send/xcpd-requests";
import { SamlCertsAndKeys } from "./saml/security/types";

export async function sendProcessXcpdRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
}: {
signedRequest: SignedXcpdRequest;
samlCertsAndKeys: SamlCertsAndKeys;
patientId: string;
cxId: string;
index: number;
}): Promise<OutboundPatientDiscoveryResp> {
async function sendAndProcess() {
const response = await sendSignedXcpdRequest({
request: signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
return processXCPDResponse({
xcpdResponse: response,
patientId,
cxId,
});
}

return await executeWithRetries(sendAndProcess, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryableXcpd,
log: out("sendProcessXcpdRequest").log,
});
}

export async function sendProcessRetryDqRequest({
signedRequest,
samlCertsAndKeys,
Expand Down Expand Up @@ -51,7 +91,7 @@ export async function sendProcessRetryDqRequest({
return await executeWithRetries(sendProcessDqRequest, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
shouldRetry: isRetryableXca,
log: out("sendProcessRetryDqRequest").log,
});
}
Expand Down Expand Up @@ -85,7 +125,7 @@ export async function sendProcessRetryDrRequest({
return await executeWithRetries(sendProcessDrRequest, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
shouldRetry: isRetryableXca,
log: out("sendProcessRetryDrRequest").log,
});
}
Expand All @@ -105,30 +145,29 @@ export async function createSignSendProcessXCPDRequest({
}): Promise<void> {
const log = getLog("createSignSendProcessXCPDRequest");
const signedRequests = createAndSignBulkXCPDRequests(xcpdRequest, samlCertsAndKeys);
const responses = await sendSignedXCPDRequests({
signedRequests,
samlCertsAndKeys,
patientId,
cxId,
});
const results: OutboundPatientDiscoveryResp[] = responses.map(response => {
return processXCPDResponse({
xcpdResponse: response,
const resultPromises = signedRequests.map(async (signedRequest, index) => {
const result = await sendProcessXcpdRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});
for (const result of results) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(pdResponseUrl, result), { log });
await executeWithNetworkRetries(async () => axios.post(pdResponseUrl, result), {
httpStatusCodesToRetry: [502, 504],
log,
});
} catch (error) {
const msg = "Failed to send PD response to internal CQ endpoint";
const extra = { cxId, patientId, result };
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
}
});

await Promise.allSettled(resultPromises);
}

export async function createSignSendProcessDqRequests({
Expand All @@ -152,33 +191,28 @@ export async function createSignSendProcessDqRequests({
});

const resultPromises = signedRequests.map(async (signedRequest, index) => {
return sendProcessRetryDqRequest({
const result = await sendProcessRetryDqRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});

const results = await Promise.allSettled(resultPromises);
const successfulResults = results
.filter(
(result): result is PromiseFulfilledResult<OutboundDocumentQueryResp> =>
result.status === "fulfilled"
)
.map(result => result.value);
for (const result of successfulResults) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(dqResponseUrl, result), { log });
await executeWithNetworkRetries(async () => axios.post(dqResponseUrl, result), {
httpStatusCodesToRetry: [502, 504],
log,
});
} catch (error) {
const msg = "Failed to send DQ response to internal CQ endpoint";
const extra = { cxId, patientId, result };
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
}
});

await Promise.allSettled(resultPromises);
}

export async function createSignSendProcessDrRequests({
Expand All @@ -201,32 +235,26 @@ export async function createSignSendProcessDrRequests({
});

const resultPromises = signedRequests.map(async (signedRequest, index) => {
return sendProcessRetryDrRequest({
const result = await sendProcessRetryDrRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});

const results = await Promise.allSettled(resultPromises);
const successfulResults = results
.filter(
(result): result is PromiseFulfilledResult<OutboundDocumentRetrievalResp> =>
result.status === "fulfilled"
)
.map(result => result.value);

for (const result of successfulResults) {
try {
// TODO not sure if we should retry on timeout
await executeWithNetworkRetries(async () => axios.post(drResponseUrl, result), { log });
await executeWithNetworkRetries(async () => axios.post(drResponseUrl, result), {
httpStatusCodesToRetry: [502, 504],
log,
});
} catch (error) {
const msg = "Failed to send DR response to internal CQ endpoint";
const extra = { cxId, patientId, result };
log(`${msg} - ${errorToString(error)} - ${JSON.stringify(extra)}`);
capture.error(msg, { extra: { ...extra, error } });
}
}
});

await Promise.allSettled(resultPromises);
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,16 @@ export function handleSchemaErrorResponse({
* Retries if the response has an error that is not in the known non-retryable errors list
* Will not retry if the response is successful and is not an error.
*/
export function isRetryable(outboundResponse: OutboundDocumentRetrievalResp | undefined): boolean {
export function isRetryable(
outboundResponse: OutboundDocumentRetrievalResp | OutboundDocumentQueryResp | undefined
): boolean {
if (!outboundResponse) return false;
return (
outboundResponse.operationOutcome?.issue.some(
issue =>
issue.severity === "error" &&
issue.code !== "http-error" &&
issue.code !== "schema-error" &&
!knownNonRetryableErrors.some(
nonRetryableError =>
"text" in issue.details && issue.details.text.includes(nonRetryableError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { requiresUrnInSoapBody, getHomeCommunityId } from "../../../gateways";

const DATE_DASHES_REGEX = /-/g;
const action = "urn:hl7-org:v3:PRPA_IN201305UV02:CrossGatewayPatientDiscovery";
export type BulkSignedXCPD = {
export type SignedXcpdRequest = {
gateway: XCPDGateway;
signedRequest: string;
outboundRequest: OutboundPatientDiscoveryReq;
Expand Down Expand Up @@ -340,8 +340,8 @@ export function createITI5SoapEnvelope({
export function createAndSignBulkXCPDRequests(
bulkBodyData: OutboundPatientDiscoveryReq,
samlCertsAndKeys: SamlCertsAndKeys
): BulkSignedXCPD[] {
const signedRequests: BulkSignedXCPD[] = [];
): SignedXcpdRequest[] {
const signedRequests: SignedXcpdRequest[] = [];

for (const gateway of bulkBodyData.gateways) {
const bodyData: OutboundPatientDiscoveryReq = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,11 @@ export function handleSchemaErrorResponse({
};
return response;
}

/**
* For now lets not retry on any error. We have network retries already.
*/
export function isRetryable(outboundResponse: OutboundPatientDiscoveryResp | undefined): boolean {
if (!outboundResponse) return false;
return false;
}
Loading

0 comments on commit 59fb165

Please sign in to comment.