Skip to content

Commit

Permalink
feat: enable unit testing subscription criteria (medplum#4581)
Browse files Browse the repository at this point in the history
* feat: enable unit testing subscription criteria

* Organized imports

---------

Co-authored-by: Cody Ebberson <[email protected]>
  • Loading branch information
dillonstreator and codyebberson committed May 27, 2024
1 parent 41005df commit 9f3bb56
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 99 deletions.
60 changes: 58 additions & 2 deletions packages/core/src/subscriptions/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { Bundle, Communication, Parameters, SubscriptionStatus } from '@medplum/fhirtypes';
import { Bundle, Communication, Parameters, Subscription, SubscriptionStatus } from '@medplum/fhirtypes';
import WS from 'jest-websocket-mock';
import { RobustWebSocket, SubscriptionEmitter, SubscriptionEventMap, SubscriptionManager } from '.';
import {
RobustWebSocket,
SubscriptionEmitter,
SubscriptionEventMap,
SubscriptionManager,
resourceMatchesSubscriptionCriteria,
} from '.';
import { MockMedplumClient } from '../client-test-utils';
import { generateId } from '../crypto';
import { OperationOutcomeError } from '../outcomes';
Expand Down Expand Up @@ -749,3 +755,53 @@ describe('SubscriptionManager', () => {
});
});
});

describe('resourceMatchesSubscriptionCriteria', () => {
it('should return true for a resource that matches the criteria', async () => {
const subscription: Subscription = {
resourceType: 'Subscription',
status: 'active',
reason: 'test subscription',
criteria: 'Communication',
channel: {
type: 'rest-hook',
endpoint: 'Bot/123',
},
extension: [
{
url: 'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression',
valueString: '%previous.status = "in-progress" and %current.status = "completed"',
},
{
url: 'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction',
valueCode: 'update',
},
],
};

const result1 = await resourceMatchesSubscriptionCriteria({
resource: {
resourceType: 'Communication',
status: 'in-progress',
},
subscription,
context: { interaction: 'create' },
getPreviousResource: async () => undefined,
});
expect(result1).toBe(false);

const result2 = await resourceMatchesSubscriptionCriteria({
resource: {
resourceType: 'Communication',
status: 'completed',
},
subscription,
context: { interaction: 'update' },
getPreviousResource: async () => ({
resourceType: 'Communication',
status: 'in-progress',
}),
});
expect(result2).toBe(true);
});
});
122 changes: 120 additions & 2 deletions packages/core/src/subscriptions/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { Bundle, Parameters, Subscription, SubscriptionStatus } from '@medplum/fhirtypes';
import { Bundle, Parameters, Subscription, SubscriptionStatus, Resource } from '@medplum/fhirtypes';
import { MedplumClient } from '../client';
import { TypedEventTarget } from '../eventtarget';
import { OperationOutcomeError, serverError, validationError } from '../outcomes';
import { ProfileResource, getReferenceString, resolveId } from '../utils';
import { ProfileResource, getExtension, getReferenceString, resolveId } from '../utils';
import { Logger } from '../logger';
import { matchesSearchRequest } from '../search/match';
import { toTypedValue } from '../fhirpath/utils';
import { evalFhirPathTyped } from '../fhirpath/parse';
import { parseSearchRequest } from '../search/search';

export type SubscriptionEventMap = {
connect: { type: 'connect'; payload: { subscriptionId: string } };
Expand Down Expand Up @@ -355,3 +360,116 @@ export class SubscriptionManager {
return this.masterSubEmitter;
}
}

export type BackgroundJobInteraction = 'create' | 'update' | 'delete';

export interface BackgroundJobContext {
interaction: BackgroundJobInteraction;
}

type ResourceMatchesSubscriptionCriteria = {
resource: Resource;
subscription: Subscription;
context: BackgroundJobContext;
logger?: Logger;
getPreviousResource: (currentResource: Resource) => Promise<Resource | undefined>;
};

export async function resourceMatchesSubscriptionCriteria({
resource,
subscription,
context,
getPreviousResource,
logger,
}: ResourceMatchesSubscriptionCriteria): Promise<boolean> {
if (subscription.meta?.account && resource.meta?.account?.reference !== subscription.meta.account.reference) {
logger?.debug('Ignore resource in different account compartment');
return false;
}

if (!matchesChannelType(subscription, logger)) {
logger?.debug(`Ignore subscription without recognized channel type`);
return false;
}

const subscriptionCriteria = subscription.criteria;
if (!subscriptionCriteria) {
logger?.debug(`Ignore rest hook missing criteria`);
return false;
}

const searchRequest = parseSearchRequest(subscriptionCriteria);
if (resource.resourceType !== searchRequest.resourceType) {
logger?.debug(
`Ignore rest hook for different resourceType (wanted "${searchRequest.resourceType}", received "${resource.resourceType}")`
);
return false;
}

const fhirPathCriteria = await isFhirCriteriaMet(subscription, resource, getPreviousResource);
if (!fhirPathCriteria) {
logger?.debug(`Ignore rest hook for criteria returning false`);
return false;
}

const supportedInteractionExtension = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction'
);
if (supportedInteractionExtension && supportedInteractionExtension.valueCode !== context.interaction) {
logger?.debug(
`Ignore rest hook for different interaction (wanted "${supportedInteractionExtension.valueCode}", received "${context.interaction}")`
);
return false;
}

return matchesSearchRequest(resource, searchRequest);
}

/**
* Returns true if the subscription channel type is ok to execute.
* @param subscription - The subscription resource.
* @param logger - The logger.
* @returns True if the subscription channel type is ok to execute.
*/
function matchesChannelType(subscription: Subscription, logger?: Logger): boolean {
const channelType = subscription.channel?.type;

if (channelType === 'rest-hook') {
const url = subscription.channel?.endpoint;
if (!url) {
logger?.debug(`Ignore rest-hook missing URL`);
return false;
}

return true;
}

if (channelType === 'websocket') {
return true;
}

return false;
}

export async function isFhirCriteriaMet(
subscription: Subscription,
currentResource: Resource,
getPreviousResource: (currentResource: Resource) => Promise<Resource | undefined>
): Promise<boolean> {
const criteria = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression'
);
if (!criteria?.valueString) {
return true;
}
const previous = await getPreviousResource(currentResource);
const evalInput = {
'%current': toTypedValue(currentResource),
'%previous': toTypedValue(previous ?? {}),
};
const evalValue = evalFhirPathTyped(criteria.valueString, [toTypedValue(currentResource)], evalInput);
console.log(evalValue);
return evalValue?.[0]?.value === true;
}
5 changes: 0 additions & 5 deletions packages/server/src/workers/context.ts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/server/src/workers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Resource } from '@medplum/fhirtypes';
import { BackgroundJobContext } from '@medplum/core';
import { MedplumServerConfig } from '../config';
import { globalLogger } from '../logger';
import { BackgroundJobContext } from './context';
import { addCronJobs, closeCronWorker, initCronWorker } from './cron';
import { addDownloadJobs, closeDownloadWorker, initDownloadWorker } from './download';
import { addSubscriptionJobs, closeSubscriptionWorker, initSubscriptionWorker } from './subscription';
Expand Down
80 changes: 10 additions & 70 deletions packages/server/src/workers/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import {
AccessPolicyInteraction,
BackgroundJobContext,
BackgroundJobInteraction,
ContentType,
OperationOutcomeError,
Operator,
Expand All @@ -9,9 +11,8 @@ import {
getReferenceString,
isGone,
isNotFound,
matchesSearchRequest,
normalizeOperationOutcome,
parseSearchRequest,
resourceMatchesSubscriptionCriteria,
satisfiedAccessPolicy,
serverError,
stringify,
Expand All @@ -30,8 +31,7 @@ import { getRedis } from '../redis';
import { createSubEventNotification } from '../subscriptions/websockets';
import { parseTraceparent } from '../traceparent';
import { AuditEventOutcome } from '../util/auditevent';
import { BackgroundJobContext, BackgroundJobInteraction } from './context';
import { createAuditEvent, findProjectMembership, isFhirCriteriaMet, isJobSuccessful } from './utils';
import { createAuditEvent, findProjectMembership, getPreviousResource, isJobSuccessful } from './utils';

/**
* The upper limit on the number of times a job can be retried.
Expand Down Expand Up @@ -270,73 +270,13 @@ async function matchesCriteria(
context: BackgroundJobContext
): Promise<boolean> {
const ctx = getRequestContext();
if (subscription.meta?.account && resource.meta?.account?.reference !== subscription.meta.account.reference) {
ctx.logger.debug('Ignore resource in different account compartment');
return false;
}

if (!matchesChannelType(subscription)) {
ctx.logger.debug(`Ignore subscription without recognized channel type`);
return false;
}

const subscriptionCriteria = subscription.criteria;
if (!subscriptionCriteria) {
ctx.logger.debug(`Ignore rest hook missing criteria`);
return false;
}

const searchRequest = parseSearchRequest(subscriptionCriteria);
if (resource.resourceType !== searchRequest.resourceType) {
ctx.logger.debug(
`Ignore rest hook for different resourceType (wanted "${searchRequest.resourceType}", received "${resource.resourceType}")`
);
return false;
}

const fhirPathCriteria = await isFhirCriteriaMet(subscription, resource);
if (!fhirPathCriteria) {
ctx.logger.debug(`Ignore rest hook for criteria returning false`);
return false;
}

const supportedInteractionExtension = getExtension(
return resourceMatchesSubscriptionCriteria({
resource,
subscription,
'https://medplum.com/fhir/StructureDefinition/subscription-supported-interaction'
);
if (supportedInteractionExtension && supportedInteractionExtension.valueCode !== context.interaction) {
ctx.logger.debug(
`Ignore rest hook for different interaction (wanted "${supportedInteractionExtension.valueCode}", received "${context.interaction}")`
);
return false;
}

return matchesSearchRequest(resource, searchRequest);
}

/**
* Returns true if the subscription channel type is ok to execute.
* @param subscription - The subscription resource.
* @returns True if the subscription channel type is ok to execute.
*/
function matchesChannelType(subscription: Subscription): boolean {
const channelType = subscription.channel?.type;

if (channelType === 'rest-hook') {
const url = subscription.channel?.endpoint;
if (!url) {
getLogger().debug(`Ignore rest-hook missing URL`);
return false;
}

return true;
}

if (channelType === 'websocket') {
return true;
}

return false;
context,
logger: ctx.logger,
getPreviousResource: getPreviousResource,
});
}

/**
Expand Down
21 changes: 2 additions & 19 deletions packages/server/src/workers/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { createReference, evalFhirPathTyped, getExtension, isResource, Operator, toTypedValue } from '@medplum/core';
import { createReference, getExtension, isResource, Operator } from '@medplum/core';
import {
AuditEvent,
AuditEventEntity,
Expand Down Expand Up @@ -109,24 +109,7 @@ export function getAuditEventEntityRole(resource: Resource): Coding {
}
}

export async function isFhirCriteriaMet(subscription: Subscription, currentResource: Resource): Promise<boolean> {
const criteria = getExtension(
subscription,
'https://medplum.com/fhir/StructureDefinition/fhir-path-criteria-expression'
);
if (!criteria?.valueString) {
return true;
}
const previous = await getPreviousResource(currentResource);
const evalInput = {
'%current': toTypedValue(currentResource),
'%previous': toTypedValue(previous ?? {}),
};
const evalValue = evalFhirPathTyped(criteria.valueString, [toTypedValue(currentResource)], evalInput);
return evalValue?.[0]?.value === true;
}

async function getPreviousResource(currentResource: Resource): Promise<Resource | undefined> {
export async function getPreviousResource(currentResource: Resource): Promise<Resource | undefined> {
const systemRepo = getSystemRepo();
const history = await systemRepo.readHistory(currentResource.resourceType, currentResource?.id as string);

Expand Down

0 comments on commit 9f3bb56

Please sign in to comment.