Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Attempt backport oldest first #4776

Open
wants to merge 12 commits into
base: ja_2030417_backport_subscription_opt_to_6_4
Choose a base branch
from
11 changes: 11 additions & 0 deletions hapi-fhir-base/src/main/java/ca/uhn/fhir/context/FhirContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -1141,12 +1141,23 @@ public String toString() {
}

// TODO KHS add the other primitive types
/**
 * Creates a new "boolean" primitive instance carrying the given value.
 *
 * @param theValue the value to assign to the new instance
 * @return a new boolean primitive
 * @deprecated Use {@link #newPrimitiveBoolean(Boolean)} instead; this method simply delegates to it.
 */
@Deprecated(since = "6.6.0", forRemoval = true)
public IPrimitiveType<Boolean> getPrimitiveBoolean(Boolean theValue) {
return newPrimitiveBoolean(theValue);
}

/**
 * Creates a new "boolean" primitive instance and assigns the given value to it.
 *
 * @param theValue the value to carry in the new primitive (may be null)
 * @return a freshly constructed boolean primitive holding theValue
 */
public IPrimitiveType<Boolean> newPrimitiveBoolean(Boolean theValue) {
IPrimitiveType<Boolean> instance = (IPrimitiveType<Boolean>) getElementDefinition("boolean").newInstance();
instance.setValue(theValue);
return instance;
}

/**
 * Creates a new "string" primitive instance and assigns the given value to it.
 *
 * @param theValue the value to carry in the new primitive (may be null)
 * @return a freshly constructed string primitive holding theValue
 */
public IPrimitiveType<String> newPrimitiveString(String theValue) {
IPrimitiveType<String> retval = (IPrimitiveType<String>) getElementDefinition("string").newInstance();
retval.setValue(theValue);
return retval;
}

private static boolean tryToInitParser(Runnable run) {
boolean retVal;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,33 @@ public enum Pointcut implements IPointcut {
"ca.uhn.fhir.jpa.util.SqlQueryList"
),

/**
 * <b> Binary Blob Prefix Assigning Hook:</b>
 * <p>
 * Immediately before a binary blob is stored to its eventual data sink, this hook is called.
 * This hook allows implementers to provide a prefix to the binary blob's ID.
 * This is helpful in cases where you want to identify this blob for later retrieval outside of HAPI-FHIR. Note that allowable characters will depend on the specific storage sink being used.
 * <ul>
 * <li>
 * ca.uhn.fhir.rest.api.server.RequestDetails - A bean containing details about the request that is about to be processed, including details such as the
 * resource type and logical ID (if any) and other FHIR-specific aspects of the request which have been
 * pulled out of the servlet request. Note that the bean
 * properties are not all guaranteed to be populated.
 * </li>
 * <li>
 * org.hl7.fhir.instance.model.api.IBaseResource - The binary resource that is about to be stored.
 * </li>
 * </ul>
 * <p>
 * Hooks should return <code>String</code>, which represents the full prefix to be applied to the blob.
 * </p>
 */
STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX(String.class,
"ca.uhn.fhir.rest.api.server.RequestDetails",
"org.hl7.fhir.instance.model.api.IBaseResource"
),


/**
* This pointcut is used only for unit tests. Do not use in production code as it may be changed or
* removed at any time.
Expand Down
67 changes: 67 additions & 0 deletions hapi-fhir-base/src/main/java/ca/uhn/fhir/util/ProxyUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ca.uhn.fhir.util;

/*-
* #%L
* HAPI FHIR - Core Library
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import org.apache.commons.lang3.Validate;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class ProxyUtil {
private ProxyUtil() {}

/**
* Wrap theInstance in a Proxy that synchronizes every method.
*
* @param theClass the target interface
* @param theInstance the instance to wrap
* @return a Proxy implementing theClass interface that syncronizes every call on theInstance
* @param <T> the interface type
*/
public static <T> T synchronizedProxy(Class<T> theClass, T theInstance) {
Validate.isTrue(theClass.isInterface(), "%s is not an interface", theClass);
InvocationHandler handler = new SynchronizedHandler(theInstance);
Object object = Proxy.newProxyInstance(theClass.getClassLoader(), new Class<?>[] { theClass }, handler);
return theClass.cast(object);
}

/**
* Simple handler that first synchronizes on the delegate
*/
static class SynchronizedHandler implements InvocationHandler {
private final Object theDelegate;

SynchronizedHandler(Object theDelegate) {
this.theDelegate = theDelegate;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
synchronized (theDelegate) {
return method.invoke(theDelegate, args);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: perf
issue: 4622
title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: change
issue: 4621
title: "Batch2 work-chunk processing now aligns transaction boundaries with event transitions."
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 4643
title: "There was a transaction boundary issue in the Batch2 storage layer which resulted in the
framework needing more open database connections than necessary. This has been corrected."
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: fix
issue: 4647
title: "Batch job state transitions are now transactionally safe."
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: add
issue: 4774
title: "Bulk Export now supports a new `_exportId` parameter. If provided, any Binary resources generated by this export will have an extension in their `binary.meta` field which identifies this export. This can be used to correlate exported resources with the export job that generated them.
In addition, the `binary.meta` field of Bulk Export-generated binaries will also contain the job ID of the export job that generated them, as well as the resource type of the data contained within the binary."
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: add
issue: 4774
title: "A new Pointcut called `STORAGE_BINARY_ASSIGN_BLOB_ID_PREFIX` has been added. This pointcut is called when a binary blob is about to be stored,
and allows implementers to attach a prefix to the blob ID before it is stored."
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@

```mermaid
---
title: Batch2 Job Instance state transitions
---
stateDiagram-v2
[*] --> QUEUED : on db create and queued on kafka
QUEUED --> IN_PROGRESS : on any work-chunk received by worker
%% and (see ca.uhn.fhir.batch2.progress.InstanceProgress.getNewStatus())
state first_step_finished <<choice>>
IN_PROGRESS --> first_step_finished : When 1st step finishes
first_step_finished --> COMPLETED: if no chunks produced
first_step_finished --> IN_PROGRESS: chunks produced
IN_PROGRESS --> in_progress_poll : on poll \n(count complete/failed/errored chunks)
in_progress_poll --> COMPLETED : 0 failures, errored, or incomplete\n AND at least 1 chunk complete
in_progress_poll --> ERRORED : no failed but errored chunks
in_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
in_progress_poll --> IN_PROGRESS : still work to do
%% ERRORED is just like IN_PROGRESS, but it is a one-way trip from IN_PROGRESS to ERRORED.
%% FIXME We could probably delete/merge this state with IN_PROGRESS, and use the error count in the UI.
note left of ERRORED
Parallel to IN_PROGRESS
end note
state in_progress_poll <<choice>>
state error_progress_poll <<choice>>
ERRORED --> error_progress_poll : on poll \n(count complete/failed/errored chunks)
error_progress_poll --> FAILED : any failed chunks
error_progress_poll --> ERRORED : no failed but errored chunks
error_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
error_progress_poll --> COMPLETED : 0 failures, errored, or incomplete AND at least 1 chunk complete
state do_report <<choice>>
FINALIZE --> do_reduction: poll until worker marks REDUCER chunk yes or no.
do_reduction --> COMPLETED : success
do_reduction --> FAILED : fail
in_progress_poll --> FAILED : any failed chunks
```

```mermaid
---
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
state QUEUED
state on_receive <<choice>>
state IN_PROGRESS
state ERROR
state execute <<choice>>
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on create

%% worker processing states
QUEUED --> on_receive : on deque by worker
on_receive --> IN_PROGRESS : start execution

IN_PROGRESS --> execute: execute
execute --> ERROR : on re-triable error
execute --> COMPLETED : success\n maybe trigger instance first_step_finished
execute --> FAILED : on unrecoverable \n or too many errors

%% temporary error state until retry
ERROR --> on_receive : exception rollback\n triggers redelivery

%% terminal states
COMPLETED --> [*]
FAILED --> [*]
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package ca.uhn.fhir.jpa.util;

/*-
* #%L
* HAPI FHIR JPA Server
Expand All @@ -19,6 +17,7 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
Expand All @@ -27,7 +26,7 @@

import javax.annotation.Nonnull;

public class JobInstanceUtil {
class JobInstanceUtil {

private JobInstanceUtil() {}

Expand Down Expand Up @@ -64,14 +63,44 @@ public static JobInstance fromEntityToInstance(@Nonnull Batch2JobInstanceEntity
return retVal;
}

/**
 * Copies all JobInstance fields to a Batch2JobInstanceEntity
 * @param theJobInstance the job
 * @param theJobInstanceEntity the target entity
 */
public static void fromInstanceToEntity(@Nonnull JobInstance theJobInstance, @Nonnull Batch2JobInstanceEntity theJobInstanceEntity) {
theJobInstanceEntity.setId(theJobInstance.getInstanceId());
theJobInstanceEntity.setDefinitionId(theJobInstance.getJobDefinitionId());
theJobInstanceEntity.setDefinitionVersion(theJobInstance.getJobDefinitionVersion());
theJobInstanceEntity.setStatus(theJobInstance.getStatus());
theJobInstanceEntity.setCancelled(theJobInstance.isCancelled());
theJobInstanceEntity.setFastTracking(theJobInstance.isFastTracking());
theJobInstanceEntity.setStartTime(theJobInstance.getStartTime());
theJobInstanceEntity.setCreateTime(theJobInstance.getCreateTime());
theJobInstanceEntity.setEndTime(theJobInstance.getEndTime());
theJobInstanceEntity.setUpdateTime(theJobInstance.getUpdateTime());
theJobInstanceEntity.setCombinedRecordsProcessed(theJobInstance.getCombinedRecordsProcessed());
theJobInstanceEntity.setCombinedRecordsProcessedPerSecond(theJobInstance.getCombinedRecordsProcessedPerSecond());
theJobInstanceEntity.setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
theJobInstanceEntity.setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
theJobInstanceEntity.setProgress(theJobInstance.getProgress());
theJobInstanceEntity.setErrorMessage(theJobInstance.getErrorMessage());
theJobInstanceEntity.setErrorCount(theJobInstance.getErrorCount());
theJobInstanceEntity.setEstimatedTimeRemaining(theJobInstance.getEstimatedTimeRemaining());
theJobInstanceEntity.setParams(theJobInstance.getParameters());
theJobInstanceEntity.setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
theJobInstanceEntity.setReport(theJobInstance.getReport());
}

/**
* Converts a Batch2WorkChunkEntity into a WorkChunk object
*
* @param theEntity - the entity to convert
* @param theIncludeData - whether or not to include the Data attached to the chunk
* @return - the WorkChunk object
*/
@Nonnull
public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity theEntity, boolean theIncludeData) {
public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity theEntity) {
WorkChunk retVal = new WorkChunk();
retVal.setId(theEntity.getId());
retVal.setSequence(theEntity.getSequence());
Expand All @@ -87,11 +116,8 @@ public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity the
retVal.setErrorMessage(theEntity.getErrorMessage());
retVal.setErrorCount(theEntity.getErrorCount());
retVal.setRecordsProcessed(theEntity.getRecordsProcessed());
if (theIncludeData) {
if (theEntity.getSerializedData() != null) {
retVal.setData(theEntity.getSerializedData());
}
}
// note: may be null out if queried NoData
retVal.setData(theEntity.getSerializedData());
return retVal;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package ca.uhn.fhir.jpa.batch2;

/*-
* #%L
* HAPI FHIR JPA Server
Expand All @@ -19,40 +17,47 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.system.HapiSystemProperties;
import ca.uhn.fhir.util.ProxyUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManager;

@Configuration
@Import({
BulkExportJobConfig.class
})
public class JpaBatch2Config extends BaseBatch2Config {

@Bean
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
}

@Primary
@Bean
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
// Avoid H2 synchronization issues caused by
// https://github.com/h2database/h2database/issues/1808
if (HapiSystemProperties.isUnitTestModeEnabled()) {
retVal = new SynchronizedJobPersistenceWrapper(retVal);
}
// TODO: Update 2023-03-14 - The bug above appears to be fixed. I'm going to try
// disabling this and see if we can get away without it. If so, we can delete
// this entirely
// if (HapiSystemProperties.isUnitTestModeEnabled()) {
// retVal = ProxyUtil.synchronizedProxy(IJobPersistence.class, retVal);
// }
return retVal;
}

Expand Down
Loading
Loading