Commit

Batch2 workchunk states hapi (#5851)
* step 1

* updated batch 2 framework with READY state

* spotless

* remove entity manager

* spotless

* fixing up more tests for batch2

* updating documentation

* cleanup

* removing checkstyle violation

* code review points

* review points continued

* review points finished

* updating tests

* updates

* spotless

* updated

* step 1

* updated

* sketch out test cases

* basic state transition shell work

* typos

* spotless

* adding spy override

* fixing tests

* spotless

* changing comment to complete build

* fixing some tests and adding a view

* adding different paging mechanism

* spotless

* waiting step 1

* commit changes

* remove text

* review fixes

* spotless

* some tweaks

* updating documentation and adding change log

* spotless

* added documentation

* review comments 1

* more review fixes

* spotless

* fixing bug

* fixing path

* spotless

* update state diagram

* review points round 1

* revert

* updating diag

* review fixes round 2

* spotless

* - Implemented GATE_WAITING state for the batch2 state machine.
- This will be the initial status for all workchunks of a gated job.
- made compatible with the equivalent "fake QUEUED" state in the Old batch2 implementation.
- Updated corresponding docs.
- added corresponding tests and changelog

* Revert "- Implemented GATE_WAITING state for the batch2 state machine."

This reverts commit 32a00f4.

* - Implemented GATE_WAITING state for the batch2 state machine.
- This will be the initial status for all workchunks of a gated job.
- made compatible with the equivalent "fake QUEUED" state in the Old batch2 implementation.
- Updated corresponding docs.
- added corresponding tests and changelog

* fixing a bug

* spotless

* fixing

* - fix merges conflicts
- set first chunk to be always created in READY

* - have only one path through the enqueueReady method
- fixed tests

* - hid the over-powered transition function behind a proper state action

* spotless

* resolved review comments

* fixing tests

* resolved review comments

* resolved review comments

* resolved review comments

* resolved review comments

* resolved review comments

* updating migration script number

* fixed bugs

* spotless

* fix test high concurrency

* fixing a test

* code fix

* fixing tests in bulkexportit

* fixing tests

* fixing tests

* cleanup

* completed instance will not be sent to the reduction step service

* Revert "completed instance will not be sent to the reduction step service"

This reverts commit aa149b6.

* Revert "Revert "completed instance will not be sent to the reduction step service""

This reverts commit e18f579.

* removing dead code

* changed db query for step advance to take statuses as parameter instead

* test fixes

* spotless

* test fix

* spotless

* fixing tests

* migration fix

* fixing test

* testing pipeline with `testGroupBulkExportNotInGroup_DoesNotShowUp` disabled

* fixing some tests

* Add new race test for simultaneous queue/dequeue

* re-enabling `testGroupBulkExportNotInGroup_DoesNotShowUp`

* cascade tag deletes

* test fixes

* some logging

* a test case

* adding job id

* more test code

* marking purge checks

* test fix

* testing

* pausing schedulers on cleanup

* adding a wait

* max thread count guarantee

* fixing the tests again

* removing dead code

* spotless

* checking

* msg codes:

* Fixing a test

* review points

* spotless

* required pom values

* step 1 of reduction ready

* update

* reduction ready

* another test

* spotless

* cleanup

* cleanup

* simplifying check in reduction step

* review fixes

* updating version

* using 7.3.1

* adding check

* test finessing

---------

Co-authored-by: leif stawnyczy <[email protected]>
Co-authored-by: Michael Buckley <[email protected]>
Co-authored-by: tyner <[email protected]>
4 people committed May 7, 2024
1 parent b555498 commit ae67e7b
Showing 163 changed files with 3,959 additions and 760 deletions.
2 changes: 1 addition & 1 deletion hapi-deployable-pom/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-android/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-base/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@@ -26,13 +26,16 @@
import java.util.NoSuchElementException;
import java.util.function.Consumer;

/**
* This paging iterator only works with already ordered queries
*/
public class PagingIterator<T> implements Iterator<T> {

public interface PageFetcher<T> {
void fetchNextPage(int thePageIndex, int theBatchSize, Consumer<T> theConsumer);
}

static final int PAGE_SIZE = 100;
static final int DEFAULT_PAGE_SIZE = 100;

private int myPage;

@@ -42,8 +45,16 @@ public interface PageFetcher<T> {

private final PageFetcher<T> myFetcher;

private final int myPageSize;

public PagingIterator(PageFetcher<T> theFetcher) {
this(DEFAULT_PAGE_SIZE, theFetcher);
}

public PagingIterator(int thePageSize, PageFetcher<T> theFetcher) {
assert thePageSize > 0 : "Page size must be a positive value";
myFetcher = theFetcher;
myPageSize = thePageSize;
}

@Override
@@ -66,9 +77,9 @@ public T next() {

private void fetchNextBatch() {
if (!myIsFinished && myCurrentBatch.isEmpty()) {
myFetcher.fetchNextPage(myPage, PAGE_SIZE, myCurrentBatch::add);
myFetcher.fetchNextPage(myPage, myPageSize, myCurrentBatch::add);
myPage++;
myIsFinished = myCurrentBatch.size() < PAGE_SIZE;
myIsFinished = myCurrentBatch.size() < myPageSize;
}
}
}
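The hunk above adds a page-size-aware constructor to PagingIterator. A minimal usage sketch (not part of this commit) that only exercises the API visible in the diff might look like the following; the import path for PagingIterator is assumed, so adjust it to the class's actual package.

import ca.uhn.fhir.util.PagingIterator; // assumed package; adjust to the class's actual location

import java.util.List;

public class PagingIteratorUsageSketch {

	public static void main(String[] args) {
		List<String> source = List.of("a", "b", "c", "d", "e", "f", "g");

		// The fetcher hands back one page per call; the iterator stops once a page
		// comes back smaller than the requested page size.
		PagingIterator.PageFetcher<String> fetcher = (pageIndex, batchSize, consumer) -> {
			int from = pageIndex * batchSize;
			int to = Math.min(from + batchSize, source.size());
			for (int i = from; i < to; i++) {
				consumer.accept(source.get(i));
			}
		};

		// New in this change: a caller-supplied page size instead of the fixed DEFAULT_PAGE_SIZE (100).
		PagingIterator<String> it = new PagingIterator<>(3, fetcher);
		while (it.hasNext()) {
			System.out.println(it.next());
		}
	}
}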
@@ -62,7 +62,7 @@ public void next_whenNextIsAvailable_fetches() {
public void next_fetchTest_fetchesAndReturns() {
// 3 cases to make sure we get the edge cases
for (int adj : new int[] { -1, 0, 1 }) {
int size = PagingIterator.PAGE_SIZE + adj;
int size = PagingIterator.DEFAULT_PAGE_SIZE + adj;

myPagingIterator = createPagingIterator(size);

4 changes: 2 additions & 2 deletions hapi-fhir-bom/pom.xml
@@ -4,15 +4,15 @@
<modelVersion>4.0.0</modelVersion>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-bom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<packaging>pom</packaging>
<name>HAPI FHIR BOM</name>

<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-checkstyle/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-cli/hapi-fhir-cli-api/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-cli/hapi-fhir-cli-app/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir-cli</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-cli/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-client-okhttp/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-client/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-converter/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-dist/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-fhir</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-docs/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@@ -0,0 +1,10 @@
---
type: add
issue: 5745
title: "Added another state to the Batch2 work chunk state machine: `READY`.
This work chunk state will be the initial state on creation.
Once queued for delivery, they will transition to `QUEUED`.
The exception is for ReductionStep chunks (because reduction steps
are not read off of the queue, but executed by the maintenance job
inline.
"
@@ -0,0 +1,9 @@
---
type: add
issue: 5767
title: "Added new `POLL_WAITING` state for WorkChunks in batch jobs.
Also added RetryChunkLaterException for jobs that have steps that
need to be retried at a later time (can be provided optionally to exception).
If a step throws this new exception, it will be set with the new
`POLL_WAITING` status and retried at a later time.
"
@@ -0,0 +1,7 @@
---
type: add
issue: 5818
title: "Added another state to the Batch2 work chunk state machine: `GATE_WAITING`.
This work chunk state will be the initial state on creation for gated jobs.
Once all chunks are completed for the previous step, they will transition to `READY`.
"
@@ -47,24 +47,35 @@ stateDiagram-v2
title: Batch2 Job Work Chunk state transitions
---
stateDiagram-v2
state GATE_WAITING
state READY
state REDUCTION_READY
state QUEUED
state on_receive <<choice>>
state IN_PROGRESS
state ERROR
state POLL_WAITING
state execute <<choice>>
state FAILED
state COMPLETED
direction LR
[*] --> QUEUED : on create
[*] --> READY : on create - normal or gated jobs first chunks
[*] --> GATE_WAITING : on create - gated jobs for all but the first chunks of the first step
GATE_WAITING --> READY : on gate release - gated
GATE_WAITING --> REDUCTION_READY : on gate release for the final reduction step (all reduction jobs are gated)
QUEUED --> READY : on gate release - gated (for compatibility with the legacy QUEUED state up to HAPI FHIR version 7.1)
READY --> QUEUED : placed on kafka (maint.)
POLL_WAITING --> READY : after a poll delay on a POLL_WAITING work chunk has elapsed
%% worker processing states
QUEUED --> on_receive : on dequeue by worker
on_receive --> IN_PROGRESS : start execution
IN_PROGRESS --> execute: execute
execute --> ERROR : on re-triable error
execute --> COMPLETED : success\n maybe trigger instance first_step_finished
execute --> FAILED : on unrecoverable \n or too many errors
execute --> POLL_WAITING : job step has thrown a RetryChunkLaterException and must be tried again after the provided poll delay
%% temporary error state until retry
ERROR --> on_receive : exception rollback\n triggers redelivery
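As a reading aid for the transitions shown in this hunk, here is a small self-contained sketch. It is not HAPI FHIR's real WorkChunkStatusEnum and only encodes the edges visible above (the rest of the diagram is truncated by the diff).

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative only: mirrors the edges visible in the hunk above, nothing more.
enum ChunkState {
	GATE_WAITING, READY, REDUCTION_READY, QUEUED, IN_PROGRESS, ERROR, POLL_WAITING, FAILED, COMPLETED;

	private static final Map<ChunkState, Set<ChunkState>> LEGAL_TRANSITIONS = Map.of(
			GATE_WAITING, EnumSet.of(READY, REDUCTION_READY), // on gate release
			READY, EnumSet.of(QUEUED),                        // placed on the queue by maintenance
			QUEUED, EnumSet.of(IN_PROGRESS, READY),           // READY only for legacy pre-7.1 gated chunks
			POLL_WAITING, EnumSet.of(READY),                  // poll delay elapsed
			IN_PROGRESS, EnumSet.of(COMPLETED, FAILED, ERROR, POLL_WAITING),
			ERROR, EnumSet.of(IN_PROGRESS));                  // redelivered after rollback

	boolean canTransitionTo(ChunkState theNext) {
		return LEGAL_TRANSITIONS
				.getOrDefault(this, EnumSet.noneOf(ChunkState.class))
				.contains(theNext);
	}
}

For example, ChunkState.READY.canTransitionTo(ChunkState.QUEUED) is true, while ChunkState.READY.canTransitionTo(ChunkState.IN_PROGRESS) is not.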
@@ -19,36 +19,54 @@ A HAPI-FHIR batch job definition consists of a job name, version, parameter json
After a job has been defined, *instances* of that job can be submitted for batch processing by populating a `JobInstanceStartRequest` with the job name and job parameters json and then submitting that request to the Batch Job Coordinator.

The Batch Job Coordinator will then store two records in the database:
- Job Instance with status QUEUED: that is the parent record for all data concerning this job
- Batch Work Chunk with status QUEUED: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.
- Job Instance with status `QUEUED`: that is the parent record for all data concerning this job
- Batch Work Chunk with status `READY`: this describes the first "chunk" of work required for this job. The first Batch Work Chunk contains no data.

Lastly the Batch Job Coordinator publishes a message to the Batch Notification Message Channel (named `batch2-work-notification`) to inform worker threads that this first chunk of work is now ready for processing.
### The Maintenance Job

### Job Processing - First Step
A Scheduled Job runs periodically (once a minute). For each Job Instance in the database, it:

HAPI-FHIR Batch Jobs run based on job notification messages. The process is kicked off by the first chunk of work. When this notification message arrives, the message handler makes a single call to the first step defined in the job definition, passing in the job parameters as input.
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any left over work chunks still in the database.
1. Moves all `POLL_WAITING` work chunks to `READY` if their `nextPollTime` has expired.
1. Calculates job progress (% of work chunks in `COMPLETE` status). If the job is finished, purges any leftover work chunks still in the database.
1. Cleans up any complete, failed, or cancelled jobs that need to be removed.
1. When the current step is complete, moves any gated jobs onto their next step and updates all chunks in `GATE_WAITING` to `READY`. If the job is being moved to its final reduction step, chunks are moved from `GATE_WAITING` to `REDUCTION_READY`.
1. If the final step of a gated job is a reduction step, a reduction step execution will be triggered. All work chunks for the job in `REDUCTION_READY` will be consumed at this point.
1. Moves all `READY` work chunks into the `QUEUED` state and publishes a message to the Batch Notification Message Channel to inform worker threads that a work chunk is now ready for processing. \*

The handler then does the following:
1. Change the work chunk status from QUEUED to IN_PROGRESS
2. Change the Job Instance status from QUEUED to IN_PROGRESS
3. If the Job Instance is cancelled, change the status to CANCELLED and abort processing.
4. The first step of the job definition is executed with the job parameters
5. This step creates new work chunks. For each work chunk it creates, it json serializes the work chunk data, stores it in the database, and publishes a new message to the Batch Notification Message Channel to notify worker threads that there are new work chunks waiting to be processed.
6. If the step succeeded, the work chunk status is changed from IN_PROGRESS to COMPLETED, and the data it contained is deleted.
7. If the step failed, the work chunk status is changed from IN_PROGRESS to either ERRORED or FAILED depending on the severity of the error.
\* The exception is the final reduction step: its work chunks are not published to the Batch Notification Message Channel,
but are instead processed inline.
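To make the pass described above concrete, here is a compressed, illustrative sketch of how one maintenance pass might move a single job's chunks. It is not the actual maintenance service; Chunk and WorkNotificationChannel are invented, and ChunkState reuses the enum from the earlier sketch.

import java.time.Instant;
import java.util.List;

// Illustrative sketch of one maintenance pass, following the numbered list above.
class MaintenancePassSketch {

	void runOnePass(List<Chunk> theChunks, WorkNotificationChannel theChannel, boolean theGateReleased) {
		for (Chunk chunk : theChunks) {
			// Expired poll delays go back to READY.
			if (chunk.state == ChunkState.POLL_WAITING && chunk.nextPollTime.isBefore(Instant.now())) {
				chunk.state = ChunkState.READY;
			}
			// When the previous step's gate releases, gated chunks become READY,
			// or REDUCTION_READY when the next step is the final reduction step.
			if (theGateReleased && chunk.state == ChunkState.GATE_WAITING) {
				chunk.state = chunk.nextStepIsReduction ? ChunkState.REDUCTION_READY : ChunkState.READY;
			}
			// READY chunks are queued and announced; REDUCTION_READY chunks are not
			// published but are consumed inline by the reduction step (see the footnote above).
			if (chunk.state == ChunkState.READY) {
				chunk.state = ChunkState.QUEUED;
				theChannel.publish(chunk.id);
			}
		}
	}

	static class Chunk {
		String id;
		ChunkState state;
		Instant nextPollTime;
		boolean nextStepIsReduction;
	}

	interface WorkNotificationChannel {
		void publish(String theChunkId);
	}
}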

### Job Processing - Middle steps
### Batch Notification Message Handler

Middle Steps in the job definition are executed in the same way, except instead of only using the Job Parameters as input, they use both the Job Parameters and the Work Chunk data produced from the previous step.
HAPI-FHIR Batch Jobs run based on job notification messages of the Batch Notification Message Channel (named `batch2-work-notification`).

### Job Processing - Final Step
When a notification message arrives, the handler does the following:

1. Change the work chunk status from `QUEUED` to `IN_PROGRESS`
1. Change the Job Instance status from `QUEUED` to `IN_PROGRESS`
1. If the Job Instance is cancelled, change the status to `CANCELLED` and abort processing
1. If the step creates new work chunks, each work chunk will be created in either the `GATE_WAITING` state (for gated jobs) or `READY` state (for non-gated jobs) and will be handled in the next maintenance job pass.
1. If the step succeeds, the work chunk status is changed from `IN_PROGRESS` to `COMPLETED`, and the data it contained is deleted.
1. If the step throws a `RetryChunkLaterException`, the work chunk status is changed from `IN_PROGRESS` to `POLL_WAITING`, and a `nextPollTime` value will be set.
1. If the step fails, the work chunk status is changed from `IN_PROGRESS` to either `ERRORED` or `FAILED`, depending on the severity of the error.
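The outcome handling in the last three items can be summarized as a small function. This is illustrative only; StepOutcome is invented, and ChunkState reuses the enum from the earlier sketch.

// Illustrative summary of the handler outcomes listed above; not HAPI FHIR internals.
class HandlerOutcomeSketch {

	enum StepOutcome { SUCCESS, RETRY_LATER, RECOVERABLE_ERROR, UNRECOVERABLE_ERROR }

	static ChunkState nextState(StepOutcome theOutcome) {
		switch (theOutcome) {
			case SUCCESS:
				return ChunkState.COMPLETED;     // chunk data is deleted
			case RETRY_LATER:
				return ChunkState.POLL_WAITING;  // nextPollTime is set
			case RECOVERABLE_ERROR:
				return ChunkState.ERROR;         // `ERRORED` in the prose above; will be retried
			default:
				return ChunkState.FAILED;        // unrecoverable
		}
	}
}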

### First Step

The first step in a job definition is executed with just the job parameters.

### Middle steps

Middle Steps in the job definition are executed using the initial Job Parameters and the Work Chunk data produced from the previous step.

### Final Step

The final step operates the same way as the middle steps, except it does not produce any new work chunks.

### Gated Execution

If a Job Definition is set to having Gated Execution, then all work chunks for one step must be COMPLETED before any work chunks for the next step may begin.
If a Job Definition is set to having Gated Execution, then all work chunks for a step must be `COMPLETED` before any work chunks for the next step may begin.

### Job Instance Completion

A Batch Job Maintenance Service runs every minute to monitor the status of all Job Instances and the Job Instance is transitioned to either COMPLETED, ERRORED or FAILED according to the status of all outstanding work chunks for that job instance. If the job instance is still IN_PROGRESS this maintenance service also estimates the time remaining to complete the job.
A Batch Job Maintenance Service runs every minute to monitor the status of all Job Instances and the Job Instance is transitioned to either `COMPLETED`, `ERRORED` or `FAILED` according to the status of all outstanding work chunks for that job instance. If the job instance is still `IN_PROGRESS` this maintenance service also estimates the time remaining to complete the job.
2 changes: 1 addition & 1 deletion hapi-fhir-jacoco/pom.xml
@@ -11,7 +11,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-jaxrsserver-base/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
2 changes: 1 addition & 1 deletion hapi-fhir-jpa/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>ca.uhn.hapi.fhir</groupId>
<artifactId>hapi-deployable-pom</artifactId>
<version>7.3.0-SNAPSHOT</version>
<version>7.3.1-SNAPSHOT</version>

<relativePath>../hapi-deployable-pom/pom.xml</relativePath>
</parent>
@@ -37,6 +37,17 @@ public interface IHapiScheduler {

void logStatusForUnitTest();

/**
* Pauses this scheduler (and thus all scheduled jobs).
* To restart call {@link #unpause()}
*/
void pause();

/**
* Restarts this scheduler after {@link #pause()}
*/
void unpause();

void scheduleJob(long theIntervalMillis, ScheduledJobDefinition theJobDefinition);

Set<JobKey> getJobKeysForUnitTest() throws SchedulerException;
@@ -32,6 +32,20 @@ public interface ISchedulerService {

void logStatusForUnitTest();

/**
* Pauses the scheduler so no new jobs will run.
* Useful in tests when cleanup needs to happen but scheduled jobs may
* be running
*/
@VisibleForTesting
void pause();

/**
* Restarts the scheduler after a previous call to {@link #pause()}.
*/
@VisibleForTesting
void unpause();

/**
* This task will execute locally (and should execute on all nodes of the cluster if there is a cluster)
* @param theIntervalMillis How many milliseconds between passes should this job run
@@ -52,6 +66,9 @@ public interface ISchedulerService {
@VisibleForTesting
Set<JobKey> getClusteredJobKeysForUnitTest() throws SchedulerException;

@VisibleForTesting
boolean isSchedulingDisabled();

boolean isStopping();

/**
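A hedged sketch of how a test might use the new pause()/unpause() hooks around cleanup. The import path and the Spring wiring are assumptions, not part of this diff; the interface methods themselves are those shown in the hunks above.

import ca.uhn.fhir.jpa.model.sched.ISchedulerService; // assumed package for the interface above

import org.springframework.beans.factory.annotation.Autowired;

class SchedulerPauseDuringCleanupSketch {

	@Autowired
	private ISchedulerService mySchedulerService;

	void cleanUpBetweenTests(Runnable theCleanupTask) {
		// Keep scheduled jobs (e.g. the batch2 maintenance pass) from racing the cleanup...
		mySchedulerService.pause();
		try {
			theCleanupTask.run();
		} finally {
			// ...then let the schedulers run again for the next test.
			mySchedulerService.unpause();
		}
	}
}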