Skip to content

Commit

Permalink
Avoid fetching work-chunk data (#4622)
Browse files Browse the repository at this point in the history
* Replace work-chunk maintenance query with projection to avoid fetching clob data.
* Add processing logging
  • Loading branch information
michaelabuckley committed Mar 4, 2023
1 parent 98b0dc3 commit d5ebd1f
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
type: perf
issue: 4622
title: "The batch system now reads less data during the maintenance pass. This avoids slowdowns on large systems."
Original file line number Diff line number Diff line change
Expand Up @@ -322,12 +322,24 @@ public List<WorkChunk> fetchWorkChunksWithoutData(String theInstanceId, int theP
}

/**
 * Fetch one page of work chunks for the given instance and hand each to the consumer.
 *
 * @param theInstanceId the job instance whose chunks are read
 * @param theIncludeData when true, load the serialized chunk data; when false, use the
 *                       projection query so the (possibly large) data column is never
 *                       fetched from the database
 * @param thePageSize   number of chunks per page
 * @param thePageIndex  zero-based page index
 * @param theConsumer   receives each converted {@link WorkChunk}
 */
private void fetchChunks(String theInstanceId, boolean theIncludeData, int thePageSize, int thePageIndex, Consumer<WorkChunk> theConsumer) {
	myTxTemplate.executeWithoutResult(tx -> {
		// Choose the query up front; both branches otherwise share the same conversion loop.
		// fetchChunksNoData() is a constructor-projection that skips the data CLOB entirely.
		List<Batch2WorkChunkEntity> chunks = theIncludeData
			? myWorkChunkRepository.fetchChunks(PageRequest.of(thePageIndex, thePageSize), theInstanceId)
			: myWorkChunkRepository.fetchChunksNoData(PageRequest.of(thePageIndex, thePageSize), theInstanceId);
		for (Batch2WorkChunkEntity chunk : chunks) {
			theConsumer.accept(toChunk(chunk, theIncludeData));
		}
	});
}

@Override
Expand Down Expand Up @@ -355,6 +367,7 @@ private void fetchChunksForStep(String theInstanceId, String theStepId, int theP
*/
@Override
public Iterator<WorkChunk> fetchAllWorkChunksIterator(String theInstanceId, boolean theWithData) {
// Lazily pages through the instance's chunks via fetchChunks(); when theWithData is
// false the underlying query avoids fetching the serialized chunk data.
return new PagingIterator<>((thePageIndex, theBatchSize, theConsumer) -> fetchChunks(theInstanceId, theWithData, theBatchSize, thePageIndex, theConsumer));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ public interface IBatch2WorkChunkRepository extends JpaRepository<Batch2WorkChun
/**
 * Fetch a page of work chunks for an instance, ordered by sequence.
 * Loads the full entity, including the serialized data column.
 */
@Query("SELECT e FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunks(Pageable thePageRequest, @Param("instanceId") String theInstanceId);

/**
 * A projection query to avoid fetching the CLOB over the wire.
 * Otherwise, the same as fetchChunks.
 * <p>
 * The constructor-expression argument order must stay in sync with the matching
 * projection constructor on {@code Batch2WorkChunkEntity}.
 * The class name is fully qualified because the JPA spec requires a
 * fully-qualified name in a {@code SELECT NEW} expression (an unqualified name
 * relies on Hibernate's non-portable entity-name import).
 */
@Query("SELECT new ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity(" +
"e.myId, e.mySequence, e.myJobDefinitionId, e.myJobDefinitionVersion, e.myInstanceId, e.myTargetStepId, e.myStatus," +
"e.myCreateTime, e.myStartTime, e.myUpdateTime, e.myEndTime," +
"e.myErrorMessage, e.myErrorCount, e.myRecordsProcessed" +
") FROM Batch2WorkChunkEntity e WHERE e.myInstanceId = :instanceId ORDER BY e.mySequence ASC")
List<Batch2WorkChunkEntity> fetchChunksNoData(Pageable thePageRequest, @Param("instanceId") String theInstanceId);

@Query("SELECT DISTINCT e.myStatus from Batch2WorkChunkEntity e where e.myInstanceId = :instanceId AND e.myTargetStepId = :stepId")
List<StatusEnum> getDistinctStatusesForStep(@Param("instanceId") String theInstanceId, @Param("stepId") String theStepId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,36 @@ public class Batch2WorkChunkEntity implements Serializable {
@Column(name = "ERROR_COUNT", nullable = false)
private int myErrorCount;


/**
 * Default no-argument constructor required by Hibernate for entity instantiation.
 */
public Batch2WorkChunkEntity() {
}

/**
 * Projection constructor for the no-data path.
 * Deliberately omits the serialized data field so projection queries using this
 * constructor never fetch the CLOB.
 * NOTE(review): the parameter order must stay in sync with the JPQL constructor
 * expression in IBatch2WorkChunkRepository.fetchChunksNoData() — verify both when
 * adding or reordering fields.
 */
public Batch2WorkChunkEntity(String theId, int theSequence, String theJobDefinitionId, int theJobDefinitionVersion,
String theInstanceId, String theTargetStepId, StatusEnum theStatus,
Date theCreateTime, Date theStartTime, Date theUpdateTime, Date theEndTime,
String theErrorMessage, int theErrorCount, Integer theRecordsProcessed) {
myId = theId;
mySequence = theSequence;
myJobDefinitionId = theJobDefinitionId;
myJobDefinitionVersion = theJobDefinitionVersion;
myInstanceId = theInstanceId;
myTargetStepId = theTargetStepId;
myStatus = theStatus;
myCreateTime = theCreateTime;
myStartTime = theStartTime;
myUpdateTime = theUpdateTime;
myEndTime = theEndTime;
myErrorMessage = theErrorMessage;
myErrorCount = theErrorCount;
myRecordsProcessed = theRecordsProcessed;
}

/**
 * @return the number of times this chunk has errored (ERROR_COUNT column)
 */
public int getErrorCount() {
return myErrorCount;
}
Expand Down Expand Up @@ -242,4 +272,5 @@ public String toString() {
.append("errorMessage", myErrorMessage)
.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,31 @@
import ca.uhn.fhir.batch2.model.FetchJobInstancesRequest;
import ca.uhn.fhir.batch2.model.JobInstance;
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.jpa.dao.data.IBatch2JobInstanceRepository;
import ca.uhn.fhir.jpa.dao.data.IBatch2WorkChunkRepository;
import ca.uhn.fhir.jpa.entity.Batch2JobInstanceEntity;
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.util.JobInstanceUtil;
import ca.uhn.fhir.model.api.PagingIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -192,66 +183,6 @@ public void updateInstance_invalidId_throwsIllegalArgumentException() {
}
}

@Test
public void fetchAllWorkChunksIterator_withValidIdAndBoolToSayToIncludeData_returnsPagingIterator() {
// setup - two chunk entities carrying serialized data, returned for any page request
String instanceId = "instanceId";
String jobDefinition = "definitionId";
int version = 1;
String targetStep = "step";

List<Batch2WorkChunkEntity> workChunkEntityList = new ArrayList<>();
Batch2WorkChunkEntity chunk1 = new Batch2WorkChunkEntity();
chunk1.setId("id1");
chunk1.setJobDefinitionVersion(version);
chunk1.setJobDefinitionId(jobDefinition);
chunk1.setSerializedData("serialized data 1");
chunk1.setTargetStepId(targetStep);
workChunkEntityList.add(chunk1);
Batch2WorkChunkEntity chunk2 = new Batch2WorkChunkEntity();
chunk2.setId("id2");
chunk2.setSerializedData("serialized data 2");
chunk2.setJobDefinitionId(jobDefinition);
chunk2.setJobDefinitionVersion(version);
chunk2.setTargetStepId(targetStep);
workChunkEntityList.add(chunk2);

// NOTE(review): only fetchChunks() is stubbed below. If the includeData=false path
// of the persistence layer is routed to a separate no-data query, that leg of this
// loop will no longer hit the stub - confirm against the production code.
for (boolean includeData : new boolean[] { true , false }) {
// when
when(myWorkChunkRepository.fetchChunks(any(PageRequest.class), eq(instanceId)))
.thenReturn(workChunkEntityList);

// test
Iterator<WorkChunk> chunkIterator = mySvc.fetchAllWorkChunksIterator(instanceId, includeData);

// verify - the iterator is lazy: no repository call until next() is invoked
assertTrue(chunkIterator instanceof PagingIterator);
verify(myWorkChunkRepository, never())
.fetchChunks(any(PageRequest.class), anyString());

// now try the iterator out...
WorkChunk chunk = chunkIterator.next();
assertEquals(chunk1.getId(), chunk.getId());
if (includeData) {
assertEquals(chunk1.getSerializedData(), chunk.getData());
} else {
assertNull(chunk.getData());
}
chunk = chunkIterator.next();
assertEquals(chunk2.getId(), chunk.getId());
if (includeData) {
assertEquals(chunk2.getSerializedData(), chunk.getData());
} else {
assertNull(chunk.getData());
}

verify(myWorkChunkRepository)
.fetchChunks(any(PageRequest.class), eq(instanceId));

// clear invocation counts so the next loop iteration verifies independently
reset(myWorkChunkRepository);
}
}

@Test
public void fetchInstances_validRequest_returnsFoundInstances() {
// setup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import ca.uhn.fhir.jpa.entity.Batch2WorkChunkEntity;
import ca.uhn.fhir.jpa.test.BaseJpaR4Test;
import ca.uhn.fhir.util.JsonUtil;
import com.google.common.collect.Iterators;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
Expand Down Expand Up @@ -358,6 +359,76 @@ public void testStoreAndFetchWorkChunk_NoData() {
assertNull(chunk.getData());
}

/**
 * Verifies that iterating an instance's chunks with theWithData=false returns every
 * chunk's metadata (status, timestamps, error info, counts) while the serialized
 * data is always null.
 */
@Test
void testStoreAndFetchChunksForInstance_NoData() {
	// given - three chunks: one left QUEUED, one driven to ERRORED, one to COMPLETED
	JobInstance instance = createInstance();
	String instanceId = mySvc.storeNewInstance(instance);

	String queuedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 0, "some data");
	String erroredId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 1, "some more data");
	String completedId = storeWorkChunk(JOB_DEFINITION_ID, TARGET_STEP_ID, instanceId, 2, "some more data");

	mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(erroredId);
	MarkWorkChunkAsErrorRequest parameters = new MarkWorkChunkAsErrorRequest();
	parameters.setChunkId(erroredId);
	parameters.setErrorMsg("Our error message");
	mySvc.markWorkChunkAsErroredAndIncrementErrorCount(parameters);

	mySvc.fetchWorkChunkSetStartTimeAndMarkInProgress(completedId);
	mySvc.markWorkChunkAsCompletedAndClearData(instanceId, completedId, 11);

	// when - iterate without requesting the serialized data
	Iterator<WorkChunk> workChunks = mySvc.fetchAllWorkChunksIterator(instanceId, false);

	// then
	ArrayList<WorkChunk> chunks = new ArrayList<>();
	Iterators.addAll(chunks, workChunks);
	assertEquals(3, chunks.size());

	{
		WorkChunk workChunk = chunks.get(0);
		assertNull(workChunk.getData(), "we skip the data");
		assertEquals(queuedId, workChunk.getId());
		assertEquals(JOB_DEFINITION_ID, workChunk.getJobDefinitionId());
		assertEquals(JOB_DEF_VER, workChunk.getJobDefinitionVersion());
		assertEquals(instanceId, workChunk.getInstanceId());
		assertEquals(TARGET_STEP_ID, workChunk.getTargetStepId());
		assertEquals(0, workChunk.getSequence());
		assertEquals(StatusEnum.QUEUED, workChunk.getStatus());

		assertNotNull(workChunk.getCreateTime());
		assertNotNull(workChunk.getStartTime());
		assertNotNull(workChunk.getUpdateTime());
		assertNull(workChunk.getEndTime());
		assertNull(workChunk.getErrorMessage());
		assertEquals(0, workChunk.getErrorCount());
		// assertNull instead of assertEquals(null, ...) - unambiguous and idiomatic
		assertNull(workChunk.getRecordsProcessed());
	}

	{
		WorkChunk workChunk1 = chunks.get(1);
		assertEquals(StatusEnum.ERRORED, workChunk1.getStatus());
		assertEquals("Our error message", workChunk1.getErrorMessage());
		assertEquals(1, workChunk1.getErrorCount());
		assertNull(workChunk1.getRecordsProcessed());
		assertNotNull(workChunk1.getEndTime());
	}

	{
		WorkChunk workChunk2 = chunks.get(2);
		assertEquals(StatusEnum.COMPLETED, workChunk2.getStatus());
		assertNotNull(workChunk2.getEndTime());
		assertEquals(11, workChunk2.getRecordsProcessed());
		assertNull(workChunk2.getErrorMessage());
		assertEquals(0, workChunk2.getErrorCount());
	}
}


@Test
public void testStoreAndFetchWorkChunk_WithData() {
JobInstance instance = createInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import ca.uhn.fhir.batch2.progress.JobInstanceStatusUpdater;
import ca.uhn.fhir.model.api.IModelJson;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.apache.commons.lang3.time.DateUtils;
import org.slf4j.Logger;

Expand Down Expand Up @@ -68,6 +69,9 @@ public class JobInstanceProcessor {
}

public void process() {
ourLog.debug("Starting job processing: {}", myInstanceId);
StopWatch stopWatch = new StopWatch();

JobInstance theInstance = myJobPersistence.fetchInstance(myInstanceId).orElse(null);
if (theInstance == null) {
return;
Expand All @@ -76,6 +80,8 @@ public void process() {
handleCancellation(theInstance);
cleanupInstance(theInstance);
triggerGatedExecutions(theInstance);

ourLog.debug("Finished job processing: {} - {}", myInstanceId, stopWatch);
}

private void handleCancellation(JobInstance theInstance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import ca.uhn.fhir.batch2.model.StatusEnum;
import ca.uhn.fhir.batch2.model.WorkChunk;
import ca.uhn.fhir.util.Logs;
import ca.uhn.fhir.util.StopWatch;
import org.slf4j.Logger;

import javax.annotation.Nonnull;
Expand All @@ -46,6 +47,9 @@ public JobInstanceProgressCalculator(IJobPersistence theJobPersistence, JobChunk

public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
String instanceId = theInstance.getInstanceId();
StopWatch stopWatch = new StopWatch();
ourLog.trace("calculating progress: {}", instanceId);


InstanceProgress instanceProgress = calculateInstanceProgress(instanceId);

Expand Down Expand Up @@ -76,11 +80,13 @@ public void calculateAndStoreInstanceProgress(JobInstance theInstance) {
}

}
ourLog.trace("calculating progress: {} - complete in {}", instanceId, stopWatch);
}

@Nonnull
private InstanceProgress calculateInstanceProgress(String instanceId) {
InstanceProgress instanceProgress = new InstanceProgress();
// wipmb mb here
Iterator<WorkChunk> workChunkIterator = myJobPersistence.fetchAllWorkChunksIterator(instanceId, false);

while (workChunkIterator.hasNext()) {
Expand Down

0 comments on commit d5ebd1f

Please sign in to comment.