Skip to content

Commit

Permalink
added batch job related attributes to OpenTelemetry span (#5972)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrdnctrk authored Jun 6, 2024
1 parent 00a6591 commit 48c387e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.annotation.Nonnull;
Expand Down Expand Up @@ -65,6 +66,13 @@
public abstract class BaseInterceptorService<POINTCUT extends Enum<POINTCUT> & IPointcut>
implements IBaseInterceptorService<POINTCUT>, IBaseInterceptorBroadcaster<POINTCUT> {
private static final Logger ourLog = LoggerFactory.getLogger(BaseInterceptorService.class);
private static final AttributeKey<String> OTEL_INTERCEPTOR_POINTCUT_NAME_ATT_KEY =
AttributeKey.stringKey("hapifhir.interceptor.pointcut_name");
private static final AttributeKey<String> OTEL_INTERCEPTOR_CLASS_NAME_ATT_KEY =
AttributeKey.stringKey("hapifhir.interceptor.class_name");
private static final AttributeKey<String> OTEL_INTERCEPTOR_METHOD_NAME_ATT_KEY =
AttributeKey.stringKey("hapifhir.interceptor.method_name");

private final List<Object> myInterceptors = new ArrayList<>();
private final ListMultimap<POINTCUT, BaseInvoker> myGlobalInvokers = ArrayListMultimap.create();
private final ListMultimap<POINTCUT, BaseInvoker> myAnonymousInvokers = ArrayListMultimap.create();
Expand Down Expand Up @@ -573,11 +581,11 @@ Object invoke(HookParams theParams) {
private Object invokeMethod(Object[] args) throws InvocationTargetException, IllegalAccessException {
// Add attributes to the opentelemetry span
Span currentSpan = Span.current();
currentSpan.setAttribute("hapifhir.interceptor.pointcut_name", myPointcut.name());
currentSpan.setAttribute(OTEL_INTERCEPTOR_POINTCUT_NAME_ATT_KEY, myPointcut.name());
currentSpan.setAttribute(
"hapifhir.interceptor.class_name",
OTEL_INTERCEPTOR_CLASS_NAME_ATT_KEY,
myMethod.getDeclaringClass().getName());
currentSpan.setAttribute("hapifhir.interceptor.method_name", myMethod.getName());
currentSpan.setAttribute(OTEL_INTERCEPTOR_METHOD_NAME_ATT_KEY, myMethod.getName());

return myMethod.invoke(getInterceptor(), args);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
type: add
issue: 5980
title: "When processing a batch job, OpenTelemetry spans named `hapifhir.batch_job.execute` are now generated by the
worker threads. These spans have the following span attributes related to the batch job: `hapifhir.batch_job.definition_id`,
`hapifhir.batch_job.definition_version`, `hapifhir.batch_job.instance_id`, `hapifhir.batch_job.step_id`, `hapifhir.batch_job.chunk_id`."
6 changes: 6 additions & 0 deletions hapi-fhir-storage-batch2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@
<artifactId>jakarta.el</artifactId>
</dependency>

<!-- OpenTelemetry -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>

<!-- test -->
<dependency>
<groupId>org.awaitility</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.annotation.Nonnull;
import org.slf4j.Logger;

import java.util.Date;

import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME;

public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson> {
private static final Logger ourLog = Logs.getBatchTroubleshootingLog();

Expand Down Expand Up @@ -67,7 +71,16 @@ public class JobStepExecutor<PT extends IModelJson, IT extends IModelJson, OT ex
myJobInstanceStatusUpdater = new JobInstanceStatusUpdater(theJobDefinitionRegistry);
}

@WithSpan(JOB_STEP_EXECUTION_SPAN_NAME)
public void executeStep() {

BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan(
myInstance.getJobDefinitionId(),
myInstance.getJobDefinitionVersion(),
myInstance.getInstanceId(),
myCursor.getCurrentStepId(),
myWorkChunk == null ? null : myWorkChunk.getId());

JobStepExecutorOutput<PT, IT, OT> stepExecutorOutput =
myJobExecutorSvc.doExecution(myCursor, myInstance, myWorkChunk);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.batch2.model.WorkChunkStatusEnum;
import ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.jpa.model.sched.HapiJob;
import ca.uhn.fhir.jpa.model.sched.IHasScheduledJobs;
Expand All @@ -41,6 +42,7 @@
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import jakarta.annotation.Nonnull;
import org.apache.commons.lang3.time.DateUtils;
import org.quartz.JobExecutionContext;
Expand Down Expand Up @@ -70,6 +72,7 @@
import static ca.uhn.fhir.batch2.model.StatusEnum.ERRORED;
import static ca.uhn.fhir.batch2.model.StatusEnum.FINALIZE;
import static ca.uhn.fhir.batch2.model.StatusEnum.IN_PROGRESS;
import static ca.uhn.fhir.batch2.util.BatchJobOpenTelemetryUtils.JOB_STEP_EXECUTION_SPAN_NAME;

public class ReductionStepExecutorServiceImpl implements IReductionStepExecutorService, IHasScheduledJobs {
public static final String SCHEDULED_JOB_ID = ReductionStepExecutorScheduledJob.class.getName();
Expand Down Expand Up @@ -159,10 +162,18 @@ public void reducerPass() {
}

@VisibleForTesting
@WithSpan(JOB_STEP_EXECUTION_SPAN_NAME)
<PT extends IModelJson, IT extends IModelJson, OT extends IModelJson>
ReductionStepChunkProcessingResponse executeReductionStep(
String theInstanceId, JobWorkCursor<PT, IT, OT> theJobWorkCursor) {

BatchJobOpenTelemetryUtils.addAttributesToCurrentSpan(
theJobWorkCursor.getJobDefinition().getJobDefinitionId(),
theJobWorkCursor.getJobDefinition().getJobDefinitionVersion(),
theInstanceId,
theJobWorkCursor.getCurrentStepId(),
null);

JobDefinitionStep<PT, IT, OT> step = theJobWorkCursor.getCurrentStep();

// wipmb For 6.8 - this runs four tx. That's at least 2 too many
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*-
* #%L
* HAPI FHIR JPA Server - Batch2 Task Processor
* %%
* Copyright (C) 2014 - 2024 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.batch2.util;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import jakarta.annotation.Nullable;

public class BatchJobOpenTelemetryUtils {

public static final String JOB_STEP_EXECUTION_SPAN_NAME = "hapifhir.batch_job.execute";
private static final AttributeKey<String> OTEL_JOB_DEF_ID_ATT_KEY =
AttributeKey.stringKey("hapifhir.batch_job.definition_id");
private static final AttributeKey<String> OTEL_JOB_DEF_VER_ATT_KEY =
AttributeKey.stringKey("hapifhir.batch_job.definition_version");
private static final AttributeKey<String> OTEL_JOB_STEP_ID_ATT_KEY =
AttributeKey.stringKey("hapifhir.batch_job.step_id");
private static final AttributeKey<String> OTEL_JOB_INSTANCE_ID_ATT_KEY =
AttributeKey.stringKey("hapifhir.batch_job.instance_id");
private static final AttributeKey<String> OTEL_JOB_CHUNK_ID_ATT_KEY =
AttributeKey.stringKey("hapifhir.batch_job.chunk_id");

private BatchJobOpenTelemetryUtils() {}

public static void addAttributesToCurrentSpan(
String theJobDefinitionId,
int theJobDefinitionVersion,
String theJobInstanceId,
String theStepId,
@Nullable String theChunkId) {

Span currentSpan = Span.current();
AttributesBuilder attBuilder = Attributes.builder()
.put(OTEL_JOB_DEF_ID_ATT_KEY, theJobDefinitionId)
.put(OTEL_JOB_DEF_VER_ATT_KEY, Integer.toString(theJobDefinitionVersion))
.put(OTEL_JOB_STEP_ID_ATT_KEY, theStepId)
.put(OTEL_JOB_INSTANCE_ID_ATT_KEY, theJobInstanceId)
// it is ok to put null values, builder ignores them
.put(OTEL_JOB_CHUNK_ID_ATT_KEY, theChunkId);

currentSpan.setAllAttributes(attBuilder.build());
}
}

0 comments on commit 48c387e

Please sign in to comment.