Skip to content

Commit

Permalink
fix-3885 propagate traceId through asynchronous jobs (medplum#3886)
Browse files Browse the repository at this point in the history
* support `Sentry-Trace` header

* update doc

* traceparent parser allowing some divergence from the spec and maintain full traceparent in logs

* allow some grace on flags format

* feedback

* fix-3885 propagate traceId through asynchronous jobs

* propagate traceId to vmcontext and lambda bot executions

* fix tests

---------

Co-authored-by: Cody Ebberson <[email protected]>
  • Loading branch information
dillonstreator and codyebberson committed Feb 8, 2024
1 parent 1182202 commit d89db56
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 261 deletions.
4 changes: 2 additions & 2 deletions packages/server/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ export class AuthenticatedRequestContext extends RequestContext {
this.repo.close();
}

static system(): AuthenticatedRequestContext {
static system(ctx?: { requestId?: string; traceId?: string }): AuthenticatedRequestContext {
const systemLogger = new Logger(write, undefined, LogLevel.ERROR);
return new AuthenticatedRequestContext(
new RequestContext('', ''),
new RequestContext(ctx?.requestId ?? '', ctx?.traceId ?? ''),
{} as unknown as Login,
{} as unknown as Project,
{} as unknown as ProjectMembership,
Expand Down
7 changes: 5 additions & 2 deletions packages/server/src/fhir/operations/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface BotExecutionRequest {
readonly remoteAddress?: string;
readonly forwardedFor?: string;
readonly requestTime?: string;
readonly traceId?: string;
}

export interface BotExecutionResult {
Expand Down Expand Up @@ -269,7 +270,7 @@ async function writeBotInputToStorage(request: BotExecutionRequest): Promise<voi
* @returns The bot execution result.
*/
async function runInLambda(request: BotExecutionRequest): Promise<BotExecutionResult> {
const { bot, runAs, input, contentType } = request;
const { bot, runAs, input, contentType, traceId } = request;
const config = getConfig();
const accessToken = await getBotAccessToken(runAs);
const secrets = await getBotSecrets(bot);
Expand All @@ -282,6 +283,7 @@ async function runInLambda(request: BotExecutionRequest): Promise<BotExecutionRe
input: input instanceof Hl7Message ? input.toString() : input,
contentType,
secrets,
traceId,
};

// Build the command
Expand Down Expand Up @@ -372,7 +374,7 @@ function parseLambdaLog(logResult: string): string {
* @returns The bot execution result.
*/
async function runInVmContext(request: BotExecutionRequest): Promise<BotExecutionResult> {
const { bot, runAs, input, contentType } = request;
const { bot, runAs, input, contentType, traceId } = request;

const config = getConfig();
if (!config.vmContextBotsEnabled) {
Expand Down Expand Up @@ -409,6 +411,7 @@ async function runInVmContext(request: BotExecutionRequest): Promise<BotExecutio
input: input instanceof Hl7Message ? input.toString() : input,
contentType,
secrets,
traceId,
},
};

Expand Down
4 changes: 2 additions & 2 deletions packages/server/src/test.setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,6 @@ export async function waitForAsyncJob(contentLocation: string, app: Express, acc
throw new Error('Async Job did not complete');
}

export function withTestContext<T>(fn: () => T): T {
return requestContextStore.run(AuthenticatedRequestContext.system(), fn);
export function withTestContext<T>(fn: () => T, ctx?: { requestId?: string; traceId?: string }): T {
return requestContextStore.run(AuthenticatedRequestContext.system(ctx), fn);
}
10 changes: 5 additions & 5 deletions packages/server/src/traceparent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,23 @@ describe('parseTraceparent', () => {
expect(parseTraceparent(`${tp.version}-${tp.traceId}-${tp.parentId}-${tp.flags}`)).toEqual(tp);
});

it('allow missing version', () => {
it('allows missing version', () => {
expect(parseTraceparent(`${tp.traceId}-${tp.parentId}-${tp.flags}`)).toEqual({ ...tp, version: undefined });
});

it('allow missing flags', () => {
it('allows missing flags', () => {
expect(parseTraceparent(`${tp.version}-${tp.traceId}-${tp.parentId}`)).toEqual({ ...tp, flags: undefined });
});

it('allow missing version and flags', () => {
it('allows missing version and flags', () => {
expect(parseTraceparent(`${tp.traceId}-${tp.parentId}`)).toEqual({ ...tp, version: undefined, flags: undefined });
});

it('allow 1 character for flags', () => {
it('allows 1 character for flags', () => {
expect(parseTraceparent(`${tp.traceId}-${tp.parentId}-1`)).toEqual({ ...tp, version: undefined, flags: '1' });
});

it('no more than 2 characters for flags', () => {
it('returns null for more than 2 characters for flags', () => {
expect(parseTraceparent(`${tp.traceId}-${tp.parentId}-001`)).toEqual(null);
});

Expand Down
80 changes: 44 additions & 36 deletions packages/server/src/workers/download.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,53 @@ describe('Download Worker', () => {
});

test('Download external URL', () =>
withTestContext(async () => {
const url = 'https://example.com/download';

const queue = getDownloadQueue() as any;
queue.add.mockClear();

const media = await repo.createResource<Media>({
resourceType: 'Media',
status: 'completed',
content: {
contentType: ContentType.TEXT,
url,
},
});
expect(media).toBeDefined();
expect(queue.add).toHaveBeenCalled();

const body = new Readable();
body.push('foo');
body.push(null);

(fetch as unknown as jest.Mock).mockImplementation(() => ({
status: 200,
headers: {
get(name: string): string | undefined {
return {
'content-disposition': 'attachment; filename=download',
'content-type': ContentType.TEXT,
}[name];
withTestContext(
async () => {
const url = 'https://example.com/download';

const queue = getDownloadQueue() as any;
queue.add.mockClear();

const media = await repo.createResource<Media>({
resourceType: 'Media',
status: 'completed',
content: {
contentType: ContentType.TEXT,
url,
},
},
body,
}));
});
expect(media).toBeDefined();
expect(queue.add).toHaveBeenCalled();

const body = new Readable();
body.push('foo');
body.push(null);

(fetch as unknown as jest.Mock).mockImplementation(() => ({
status: 200,
headers: {
get(name: string): string | undefined {
return {
'content-disposition': 'attachment; filename=download',
'content-type': ContentType.TEXT,
}[name];
},
},
body,
}));

const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job;
await execDownloadJob(job);
const job = { id: 1, data: queue.add.mock.calls[0][1] } as unknown as Job;
await execDownloadJob(job);

expect(fetch).toHaveBeenCalledWith(url);
}));
expect(fetch).toHaveBeenCalledWith(url, {
headers: {
'x-trace-id': '00-12345678901234567890123456789012-3456789012345678-01',
traceparent: '00-12345678901234567890123456789012-3456789012345678-01',
},
});
},
{ traceId: '00-12345678901234567890123456789012-3456789012345678-01' }
));

test('Ignore media missing URL', () =>
withTestContext(async () => {
Expand Down
32 changes: 26 additions & 6 deletions packages/server/src/workers/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import { Job, Queue, QueueBaseOptions, Worker } from 'bullmq';
import fetch from 'node-fetch';
import { Readable } from 'stream';
import { getConfig, MedplumServerConfig } from '../config';
import { getRequestContext } from '../context';
import { getRequestContext, RequestContext, requestContextStore } from '../context';
import { getSystemRepo } from '../fhir/repo';
import { getBinaryStorage } from '../fhir/storage';
import { globalLogger } from '../logger';
import { parseTraceparent } from '../traceparent';

/*
* The download worker inspects resources,
Expand All @@ -24,6 +25,8 @@ export interface DownloadJobData {
readonly resourceType: string;
readonly id: string;
readonly url: string;
readonly requestId: string;
readonly traceId: string;
}

const queueName = 'DownloadQueue';
Expand Down Expand Up @@ -53,10 +56,15 @@ export function initDownloadWorker(config: MedplumServerConfig): void {
},
});

worker = new Worker<DownloadJobData>(queueName, execDownloadJob, {
...defaultOptions,
...config.bullmq,
});
worker = new Worker<DownloadJobData>(
queueName,
(job) =>
requestContextStore.run(new RequestContext(job.data.requestId, job.data.traceId), () => execDownloadJob(job)),
{
...defaultOptions,
...config.bullmq,
}
);
worker.on('completed', (job) => globalLogger.info(`Completed job ${job.id} successfully`));
worker.on('failed', (job, err) => globalLogger.info(`Failed job ${job?.id} with ${err}`));
}
Expand Down Expand Up @@ -102,12 +110,15 @@ export function getDownloadQueue(): Queue<DownloadJobData> | undefined {
* @param resource - The resource that was created or updated.
*/
export async function addDownloadJobs(resource: Resource): Promise<void> {
const ctx = getRequestContext();
for (const attachment of getAttachments(resource)) {
if (isExternalUrl(attachment.url)) {
await addDownloadJobData({
resourceType: resource.resourceType,
id: resource.id as string,
url: attachment.url,
requestId: ctx.requestId,
traceId: ctx.traceId,
});
}
}
Expand Down Expand Up @@ -172,9 +183,18 @@ export async function execDownloadJob(job: Job<DownloadJobData>): Promise<void>
return;
}

const headers: HeadersInit = {};
const traceId = job.data.traceId;
headers['x-trace-id'] = traceId;
if (parseTraceparent(traceId)) {
headers['traceparent'] = traceId;
}

try {
ctx.logger.info('Requesting content at: ' + url);
const response = await fetch(url);
const response = await fetch(url, {
headers,
});

ctx.logger.info('Received status: ' + response.status);
if (response.status >= 400) {
Expand Down
Loading

0 comments on commit d89db56

Please sign in to comment.