Skip to content

Commit

Permalink
feat(ihe): stream responses to response endpoints + epic bigger doc r…
Browse files Browse the repository at this point in the history
…ef chunks

Refs: #1667
Signed-off-by: Jonah Kaye <[email protected]>
  • Loading branch information
jonahkaye committed Jun 19, 2024
1 parent ce53590 commit 858a904
Show file tree
Hide file tree
Showing 11 changed files with 184 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import {
OutboundDocumentQueryResp,
OutboundDocumentRetrievalReq,
} from "@metriport/ihe-gateway-sdk";
import {
isEpicGateway,
maxDocRefsPerEpicDocRetrievalRequest,
} from "@metriport/core/external/carequality/ihe-gateway-v2/gateways";

import dayjs from "dayjs";
import { chunk } from "lodash";
import { HieInitiator } from "../../hie/get-hie-initiator";
Expand Down Expand Up @@ -60,7 +65,10 @@ export function createOutboundDocumentRetrievalReqs({
},
};

const docRefChunks = chunk(documentReference, maxDocRefsPerDocRetrievalRequest);
const docRefsPerRequest = isEpicGateway(gateway)
? maxDocRefsPerEpicDocRetrievalRequest
: maxDocRefsPerDocRetrievalRequest;
const docRefChunks = chunk(documentReference, docRefsPerRequest);
const request: OutboundDocumentRetrievalReq[] = docRefChunks.map(chunk => {
return {
...baseRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ const gatewaysThatAcceptOneDocRefPerRequest = [
surescriptsOid,
];

const epicOidPrefix = "1.2.840.114350.1.13";
export const maxDocRefsPerEpicDocRetrievalRequest = 10;

/*
* These gateways require a urn:uuid prefix before document Unique ids formatted as lowercase uuids
*/
Expand Down Expand Up @@ -82,3 +85,7 @@ export function requiresUrnInSoapBody(gateway: XCPDGateway): boolean {
export function requiresOnlyOneDocRefPerRequest(gateway: XCAGateway): boolean {
return gatewaysThatAcceptOneDocRefPerRequest.includes(gateway.homeCommunityId);
}

export function isEpicGateway(gateway: XCAGateway): boolean {
return gateway.homeCommunityId.startsWith(epicOidPrefix);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dayjs.extend(duration);
const SLEEP_IN_BETWEEN_DOCUMENT_RETRIEVAL_REQUESTS = dayjs.duration({ seconds: 1 });
const MAX_GATEWAYS_BEFORE_CHUNK = 1000;
const MAX_DOCUMENT_QUERY_REQUESTS_PER_INVOCATION = 20;
const MAX_DOCUMENT_RETRIEVAL_REQUESTS_PER_INVOCATION = 20;
const MAX_DOCUMENT_RETRIEVAL_REQUESTS_PER_INVOCATION = 10;

const iheGatewayV2OutboundPatientDiscoveryLambdaName = "IHEGatewayV2OutboundPatientDiscoveryLambda";
const iheGatewayV2OutboundDocumentQueryLambdaName = "IHEGatewayV2OutboundDocumentQueryLambda";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,53 @@ import { createAndSignBulkDQRequests, SignedDqRequest } from "./outbound/xca/cre
import { createAndSignBulkDRRequests, SignedDrRequest } from "./outbound/xca/create/iti39-envelope";
import { processDqResponse } from "./outbound/xca/process/dq-response";
import { processDrResponse } from "./outbound/xca/process/dr-response";
import { isRetryable } from "./outbound/xca/process/error";
import { isRetryable as isRetryableXca } from "./outbound/xca/process/error";
import { isRetryable as isRetryableXcpd } from "./outbound/xcpd/process/error";
import { sendSignedDqRequest } from "./outbound/xca/send/dq-requests";
import { sendSignedDrRequest } from "./outbound/xca/send/dr-requests";
import { createAndSignBulkXCPDRequests } from "./outbound/xcpd/create/iti55-envelope";
import {
createAndSignBulkXCPDRequests,
SignedXcpdRequest,
} from "./outbound/xcpd/create/iti55-envelope";
import { processXCPDResponse } from "./outbound/xcpd/process/xcpd-response";
import { sendSignedXCPDRequests } from "./outbound/xcpd/send/xcpd-requests";
import { sendSignedXcpdRequest } from "./outbound/xcpd/send/xcpd-requests";
import { SamlCertsAndKeys } from "./saml/security/types";

export async function sendProcessRetryXcpdRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
}: {
signedRequest: SignedXcpdRequest;
samlCertsAndKeys: SamlCertsAndKeys;
patientId: string;
cxId: string;
index: number;
}): Promise<OutboundPatientDiscoveryResp> {
async function sendProcessXcpdRequest() {
const response = await sendSignedXcpdRequest({
request: signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
return processXCPDResponse({
xcpdResponse: response,
patientId,
cxId,
});
}

return await executeWithRetries(sendProcessXcpdRequest, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryableXcpd,
});
}

export async function sendProcessRetryDqRequest({
signedRequest,
samlCertsAndKeys,
Expand Down Expand Up @@ -50,7 +89,7 @@ export async function sendProcessRetryDqRequest({
return await executeWithRetries(sendProcessDqRequest, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
shouldRetry: isRetryableXca,
});
}

Expand Down Expand Up @@ -83,7 +122,7 @@ export async function sendProcessRetryDrRequest({
return await executeWithRetries(sendProcessDrRequest, {
initialDelay: 3000,
maxAttempts: 3,
shouldRetry: isRetryable,
shouldRetry: isRetryableXca,
});
}

Expand All @@ -101,20 +140,14 @@ export async function createSignSendProcessXCPDRequest({
cxId: string;
}): Promise<void> {
const signedRequests = createAndSignBulkXCPDRequests(xcpdRequest, samlCertsAndKeys);
const responses = await sendSignedXCPDRequests({
signedRequests,
samlCertsAndKeys,
patientId,
cxId,
});
const results: OutboundPatientDiscoveryResp[] = responses.map(response => {
return processXCPDResponse({
xcpdResponse: response,
const resultPromises = signedRequests.map(async (signedRequest, index) => {
const result = await sendProcessRetryXcpdRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});
for (const result of results) {
try {
await executeWithNetworkRetries(async () => axios.post(pdResponseUrl, result));
} catch (error) {
Expand All @@ -127,7 +160,9 @@ export async function createSignSendProcessXCPDRequest({
},
});
}
}
});

await Promise.allSettled(resultPromises);
}

export async function createSignSendProcessDqRequests({
Expand All @@ -149,23 +184,13 @@ export async function createSignSendProcessDqRequests({
});

const resultPromises = signedRequests.map(async (signedRequest, index) => {
return sendProcessRetryDqRequest({
const result = await sendProcessRetryDqRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});

const results = await Promise.allSettled(resultPromises);
const successfulResults = results
.filter(
(result): result is PromiseFulfilledResult<OutboundDocumentQueryResp> =>
result.status === "fulfilled"
)
.map(result => result.value);
for (const result of successfulResults) {
try {
await executeWithNetworkRetries(async () => axios.post(dqResponseUrl, result));
} catch (error) {
Expand All @@ -178,7 +203,9 @@ export async function createSignSendProcessDqRequests({
},
});
}
}
});

await Promise.allSettled(resultPromises);
}

export async function createSignSendProcessDrRequests({
Expand All @@ -200,24 +227,13 @@ export async function createSignSendProcessDrRequests({
});

const resultPromises = signedRequests.map(async (signedRequest, index) => {
return sendProcessRetryDrRequest({
const result = await sendProcessRetryDrRequest({
signedRequest,
samlCertsAndKeys,
patientId,
cxId,
index,
});
});

const results = await Promise.allSettled(resultPromises);
const successfulResults = results
.filter(
(result): result is PromiseFulfilledResult<OutboundDocumentRetrievalResp> =>
result.status === "fulfilled"
)
.map(result => result.value);

for (const result of successfulResults) {
try {
await executeWithNetworkRetries(async () => axios.post(drResponseUrl, result));
} catch (error) {
Expand All @@ -230,5 +246,7 @@ export async function createSignSendProcessDrRequests({
},
});
}
}
});

await Promise.allSettled(resultPromises);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import { capture } from "../../../../../../util/notifications";
import { RegistryError, RegistryErrorList } from "./schema";

const { log } = out("XCA Error Handling");
const knownNonRetryableErrors = ["No active consent for patient id"];
const knownNonRetryableErrors = [
"No active consent for patient id",
"Failed to find document with unique ID",
];

export function processRegistryErrorList(
registryErrorList: RegistryErrorList,
Expand Down Expand Up @@ -193,13 +196,16 @@ export function handleSchemaErrorResponse({
* Retries if the response has an error that is not in the known non-retryable errors list
* Will not retry if the response is successful and is not an error.
*/
export function isRetryable(outboundResponse: OutboundDocumentRetrievalResp | undefined): boolean {
export function isRetryable(
outboundResponse: OutboundDocumentRetrievalResp | OutboundDocumentQueryResp | undefined
): boolean {
if (!outboundResponse) return false;
return (
outboundResponse.operationOutcome?.issue.some(
issue =>
issue.severity === "error" &&
issue.code !== "http-error" &&
issue.code !== "schema-error" &&
!knownNonRetryableErrors.some(
nonRetryableError =>
"text" in issue.details && issue.details.text.includes(nonRetryableError)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { requiresUrnInSoapBody, getHomeCommunityId } from "../../../gateways";

const DATE_DASHES_REGEX = /-/g;
const action = "urn:hl7-org:v3:PRPA_IN201305UV02:CrossGatewayPatientDiscovery";
export type BulkSignedXCPD = {
export type SignedXcpdRequest = {
gateway: XCPDGateway;
signedRequest: string;
outboundRequest: OutboundPatientDiscoveryReq;
Expand Down Expand Up @@ -340,8 +340,8 @@ export function createITI5SoapEnvelope({
export function createAndSignBulkXCPDRequests(
bulkBodyData: OutboundPatientDiscoveryReq,
samlCertsAndKeys: SamlCertsAndKeys
): BulkSignedXCPD[] {
const signedRequests: BulkSignedXCPD[] = [];
): SignedXcpdRequest[] {
const signedRequests: SignedXcpdRequest[] = [];

for (const gateway of bulkBodyData.gateways) {
const bodyData: OutboundPatientDiscoveryReq = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,17 @@ export function handleSchemaErrorResponse({
};
return response;
}

/**
* Retries if the response has an error that is not in the known non-retryable errors list
* Will not retry if the response is successful and is not an error.
*/
export function isRetryable(outboundResponse: OutboundPatientDiscoveryResp | undefined): boolean {
if (!outboundResponse) return false;
return (
outboundResponse.operationOutcome?.issue.some(
issue =>
issue.severity === "error" && issue.code !== "http-error" && issue.code !== "schema-error"
) ?? false
);
}
Loading

0 comments on commit 858a904

Please sign in to comment.