Skip to content

Commit

Permalink
Merge 263f62b into e3a1c4e
Browse files Browse the repository at this point in the history
  • Loading branch information
tadgh authored Apr 26, 2023
2 parents e3a1c4e + 263f62b commit 6e03371
Show file tree
Hide file tree
Showing 82 changed files with 3,800 additions and 1,591 deletions.
67 changes: 67 additions & 0 deletions hapi-fhir-base/src/main/java/ca/uhn/fhir/util/ProxyUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ca.uhn.fhir.util;

/*-
* #%L
* HAPI FHIR - Core Library
* %%
* Copyright (C) 2014 - 2023 Smile CDR, Inc.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

import org.apache.commons.lang3.Validate;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class ProxyUtil {
	private ProxyUtil() {
		// static utility class - not meant to be instantiated
	}

	/**
	 * Wrap theInstance in a {@link Proxy} that synchronizes every method call.
	 *
	 * @param theClass the target interface
	 * @param theInstance the instance to wrap
	 * @return a Proxy implementing theClass interface that synchronizes every call on theInstance
	 * @param <T> the interface type
	 */
	public static <T> T synchronizedProxy(Class<T> theClass, T theInstance) {
		Validate.isTrue(theClass.isInterface(), "%s is not an interface", theClass);
		InvocationHandler handler = new SynchronizedHandler(theInstance);
		Object object = Proxy.newProxyInstance(theClass.getClassLoader(), new Class<?>[] { theClass }, handler);
		return theClass.cast(object);
	}

	/**
	 * Simple handler that synchronizes on the delegate before invoking the target
	 * method, so that all calls made through the proxy are serialized.
	 */
	static class SynchronizedHandler implements InvocationHandler {
		// "my" prefix for fields, matching the project convention ("the" is for parameters)
		private final Object myDelegate;

		SynchronizedHandler(Object theDelegate) {
			myDelegate = theDelegate;
		}

		@Override
		public Object invoke(Object theProxy, Method theMethod, Object[] theArgs) throws Throwable {
			synchronized (myDelegate) {
				return theMethod.invoke(myDelegate, theArgs);
			}
		}
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: perf
issue: 4622
title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: change
issue: 4621
title: "Batch2 work-chunk processing now aligns transaction boundaries with event transitions."
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
type: fix
issue: 4643
title: "There was a transaction boundary issue in the Batch2 storage layer which resulted in the
framework needing more open database connections than necessary. This has been corrected."
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: fix
issue: 4647
title: "Batch job state transitions are now transactionally safe."
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@

```mermaid
---
title: Batch2 Job Instance state transitions
---
stateDiagram-v2
[*] --> QUEUED : on db create and queued on Kafka
QUEUED --> IN_PROGRESS : on any work-chunk received by worker
%% and (see ca.uhn.fhir.batch2.progress.InstanceProgress.getNewStatus())
state first_step_finished <<choice>>
IN_PROGRESS --> first_step_finished : When 1st step finishes
first_step_finished --> COMPLETED: if no chunks produced
first_step_finished --> IN_PROGRESS: chunks produced
IN_PROGRESS --> in_progress_poll : on poll \n(count complete/failed/errored chunks)
in_progress_poll --> COMPLETED : 0 failures, errored, or incomplete\n AND at least 1 chunk complete
in_progress_poll --> ERRORED : no failed but errored chunks
in_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
in_progress_poll --> IN_PROGRESS : still work to do
%% ERRORED is just like IN_PROGRESS, but it is a one-way trip from IN_PROGRESS to ERRORED.
%% FIXME We could probably delete/merge this state with IN_PROGRESS, and use the error count in the UI.
note left of ERRORED
Parallel to IN_PROGRESS
end note
state in_progress_poll <<choice>>
state error_progress_poll <<choice>>
ERRORED --> error_progress_poll : on poll \n(count complete/failed/errored chunks)
error_progress_poll --> FAILED : any failed chunks
error_progress_poll --> ERRORED : no failed but errored chunks
error_progress_poll --> FINALIZE : none failed, gated execution\n last step\n queue REDUCER chunk
error_progress_poll --> COMPLETED : 0 failures, errored, or incomplete AND at least 1 chunk complete
state do_reduction <<choice>>
FINALIZE --> do_reduction: poll until worker marks REDUCER chunk yes or no.
do_reduction --> COMPLETED : success
do_reduction --> FAILED : fail
in_progress_poll --> FAILED : any failed chunks
```

```mermaid
---
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
state QUEUED
state on_receive <<choice>>
state IN_PROGRESS
state ERROR
state execute <<choice>>
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on create
%% worker processing states
QUEUED --> on_receive : on deque by worker
on_receive --> IN_PROGRESS : start execution
IN_PROGRESS --> execute: execute
execute --> ERROR : on re-triable error
execute --> COMPLETED : success\n maybe trigger instance first_step_finished
execute --> FAILED : on unrecoverable \n or too many errors
%% temporary error state until retry
ERROR --> on_receive : exception rollback\n triggers redelivery
%% terminal states
COMPLETED --> [*]
FAILED --> [*]
```
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package ca.uhn.fhir.jpa.util;

/*-
* #%L
* HAPI FHIR JPA Server
Expand All @@ -19,6 +17,7 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.WorkChunk;
Expand All @@ -27,7 +26,7 @@

import javax.annotation.Nonnull;

public class JobInstanceUtil {
class JobInstanceUtil {

private JobInstanceUtil() {}

Expand Down Expand Up @@ -64,14 +63,44 @@ public static JobInstance fromEntityToInstance(@Nonnull Batch2JobInstanceEntity
return retVal;
}

/**
 * Copies all JobInstance fields to a Batch2JobInstanceEntity
 * @param theJobInstance the job
 * @param theJobInstanceEntity the target entity
 */
public static void fromInstanceToEntity(@Nonnull JobInstance theJobInstance, @Nonnull Batch2JobInstanceEntity theJobInstanceEntity) {
	theJobInstanceEntity.setId(theJobInstance.getInstanceId());
	theJobInstanceEntity.setDefinitionId(theJobInstance.getJobDefinitionId());
	theJobInstanceEntity.setDefinitionVersion(theJobInstance.getJobDefinitionVersion());
	theJobInstanceEntity.setStatus(theJobInstance.getStatus());
	theJobInstanceEntity.setCancelled(theJobInstance.isCancelled());
	theJobInstanceEntity.setFastTracking(theJobInstance.isFastTracking());
	theJobInstanceEntity.setStartTime(theJobInstance.getStartTime());
	theJobInstanceEntity.setCreateTime(theJobInstance.getCreateTime());
	theJobInstanceEntity.setEndTime(theJobInstance.getEndTime());
	theJobInstanceEntity.setUpdateTime(theJobInstance.getUpdateTime());
	theJobInstanceEntity.setCombinedRecordsProcessed(theJobInstance.getCombinedRecordsProcessed());
	theJobInstanceEntity.setCombinedRecordsProcessedPerSecond(theJobInstance.getCombinedRecordsProcessedPerSecond());
	theJobInstanceEntity.setTotalElapsedMillis(theJobInstance.getTotalElapsedMillis());
	theJobInstanceEntity.setWorkChunksPurged(theJobInstance.isWorkChunksPurged());
	theJobInstanceEntity.setProgress(theJobInstance.getProgress());
	theJobInstanceEntity.setErrorMessage(theJobInstance.getErrorMessage());
	theJobInstanceEntity.setErrorCount(theJobInstance.getErrorCount());
	// note: setEstimatedTimeRemaining was previously called twice; the redundant
	// second call has been removed
	theJobInstanceEntity.setEstimatedTimeRemaining(theJobInstance.getEstimatedTimeRemaining());
	theJobInstanceEntity.setParams(theJobInstance.getParameters());
	theJobInstanceEntity.setCurrentGatedStepId(theJobInstance.getCurrentGatedStepId());
	theJobInstanceEntity.setReport(theJobInstance.getReport());
}

/**
* Converts a Batch2WorkChunkEntity into a WorkChunk object
*
* @param theEntity - the entity to convert
* @param theIncludeData - whether or not to include the Data attached to the chunk
* @return - the WorkChunk object
*/
@Nonnull
public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity theEntity, boolean theIncludeData) {
public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity theEntity) {
WorkChunk retVal = new WorkChunk();
retVal.setId(theEntity.getId());
retVal.setSequence(theEntity.getSequence());
Expand All @@ -87,11 +116,8 @@ public static WorkChunk fromEntityToWorkChunk(@Nonnull Batch2WorkChunkEntity the
retVal.setErrorMessage(theEntity.getErrorMessage());
retVal.setErrorCount(theEntity.getErrorCount());
retVal.setRecordsProcessed(theEntity.getRecordsProcessed());
if (theIncludeData) {
if (theEntity.getSerializedData() != null) {
retVal.setData(theEntity.getSerializedData());
}
}
// note: may be null out if queried NoData
retVal.setData(theEntity.getSerializedData());
return retVal;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
package ca.uhn.fhir.jpa.batch2;

/*-
* #%L
* HAPI FHIR JPA Server
Expand All @@ -19,40 +17,47 @@
* limitations under the License.
* #L%
*/
package ca.uhn.fhir.jpa.batch2;

import ca.uhn.fhir.batch2.api.IJobPersistence;
import ca.uhn.fhir.batch2.config.BaseBatch2Config;
import ca.uhn.fhir.batch2.coordinator.SynchronizedJobPersistenceWrapper;
import ca.uhn.fhir.jpa.bulk.export.job.BulkExportJobConfig;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.dao.tx.IHapiTransactionService;
import ca.uhn.fhir.system.HapiSystemProperties;
import ca.uhn.fhir.util.ProxyUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManager;

@Configuration
@Import({
BulkExportJobConfig.class
})
public class JpaBatch2Config extends BaseBatch2Config {

@Bean
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersister(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
return new JpaJobPersistenceImpl(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
}

@Primary
@Bean
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, PlatformTransactionManager theTransactionManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionManager);
public IJobPersistence batch2JobInstancePersisterWrapper(IBatch2JobInstanceRepository theJobInstanceRepository, IBatch2WorkChunkRepository theWorkChunkRepository, IHapiTransactionService theTransactionService, EntityManager theEntityManager) {
IJobPersistence retVal = batch2JobInstancePersister(theJobInstanceRepository, theWorkChunkRepository, theTransactionService, theEntityManager);
// Avoid H2 synchronization issues caused by
// https://github.com/h2database/h2database/issues/1808
if (HapiSystemProperties.isUnitTestModeEnabled()) {
retVal = new SynchronizedJobPersistenceWrapper(retVal);
}
// TODO: Update 2023-03-14 - The bug above appears to be fixed. I'm going to try
// disabling this and see if we can get away without it. If so, we can delete
// this entirely
// if (HapiSystemProperties.isUnitTestModeEnabled()) {
// retVal = ProxyUtil.synchronizedProxy(IJobPersistence.class, retVal);
// }
return retVal;
}

Expand Down
Loading

0 comments on commit 6e03371

Please sign in to comment.