Skip to content

Commit

Permalink
[FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotStat…
Browse files Browse the repository at this point in the history
…e()'

This closes apache#15677
  • Loading branch information
StephanEwen committed Apr 20, 2021
1 parent 13d7e55 commit c4678d8
Show file tree
Hide file tree
Showing 16 changed files with 44 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void addReader(int subtaskId) {
}

@Override
public List<MockSourceSplit> snapshotState() {
public List<MockSourceSplit> snapshotState(long checkpointId) {
return splits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
}

@Override
public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws Exception {
public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId)
throws Exception {
final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
PendingSplitsCheckpoint.fromCollectionSnapshot(
splitAssigner.remainingSplits(), pathsAlreadyProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
}

@Override
public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId) {
return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testDiscoverSplitWhenNoReaderRegistered() throws Exception {
fileEnumerator.addSplits(split);
context.triggerAllActions();

assertThat(enumerator.snapshotState().getSplits(), contains(split));
assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}

@Test
Expand All @@ -77,7 +77,7 @@ public void testDiscoverWhenReaderRegistered() throws Exception {
fileEnumerator.addSplits(split);
context.triggerAllActions();

assertThat(enumerator.snapshotState().getSplits(), empty());
assertThat(enumerator.snapshotState(1L).getSplits(), empty());
assertThat(context.getSplitAssignments().get(2).getAssignedSplits(), contains(split));
}

Expand All @@ -102,7 +102,7 @@ public void testRequestingReaderUnavailableWhenSplitDiscovered() throws Exceptio
context.triggerAllActions();

assertFalse(context.getSplitAssignments().containsKey(2));
assertThat(enumerator.snapshotState().getSplits(), contains(split));
assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void testCheckpointNoSplitRequested() throws Exception {
final FileSourceSplit split = createRandomSplit();
final StaticFileSplitEnumerator enumerator = createEnumerator(context, split);

final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = enumerator.snapshotState();
final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = enumerator.snapshotState(1L);

assertThat(checkpoint.getSplits(), contains(split));
}
Expand All @@ -68,7 +68,7 @@ public void testSplitRequestForRegisteredReader() throws Exception {
enumerator.addReader(3);
enumerator.handleSplitRequest(3, "somehost");

assertThat(enumerator.snapshotState().getSplits(), empty());
assertThat(enumerator.snapshotState(1L).getSplits(), empty());
assertThat(context.getSplitAssignments().get(3).getAssignedSplits(), contains(split));
}

Expand All @@ -82,7 +82,7 @@ public void testSplitRequestForNonRegisteredReader() throws Exception {
enumerator.handleSplitRequest(3, "somehost");

assertFalse(context.getSplitAssignments().containsKey(3));
assertThat(enumerator.snapshotState().getSplits(), contains(split));
assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ public void addReader(int subtaskId) {
}

@Override
public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws Exception {
public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState(long checkpointId)
throws Exception {
Collection<HiveSourceSplit> remainingSplits =
(Collection<HiveSourceSplit>) (Collection<?>) splitAssigner.remainingSplits();
return new ContinuousHivePendingSplitsCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void addReader(int subtaskId) {
}

@Override
public KafkaSourceEnumState snapshotState() throws Exception {
public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
return new KafkaSourceEnumState(assignedPartitions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,15 +333,15 @@ public void testSnapshotState() throws Throwable {
enumerator.start();

// No reader is registered, so the state should be empty
final KafkaSourceEnumState state1 = enumerator.snapshotState();
final KafkaSourceEnumState state1 = enumerator.snapshotState(1L);
assertTrue(state1.assignedPartitions().isEmpty());

registerReader(context, enumerator, READER0);
registerReader(context, enumerator, READER1);
context.runNextOneTimeCallable();

// The state should contain splits assigned to READER0 and READER1
final KafkaSourceEnumState state2 = enumerator.snapshotState();
final KafkaSourceEnumState state2 = enumerator.snapshotState(1L);
verifySplitAssignmentWithPartitions(
getExpectedAssignments(
new HashSet<>(Arrays.asList(READER0, READER1)), PRE_EXISTING_TOPICS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,26 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
void addReader(int subtaskId);

/**
* Checkpoints the state of this split enumerator.
* Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint.
*
* <p>The snapshot should contain the latest state of the enumerator: It should assume that all
* operations that happened before the snapshot have successfully completed. For example all
* splits assigned to readers via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)}
* and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)}) don't need to be included
* in the snapshot anymore.
*
* <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most
* implementations should be able to ignore this parameter, because for the contents of the
* snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be
* interesting for source connectors with external systems where those systems are themselves
* aware of checkpoints; for example in cases where the enumerator notifies that system about a
* specific checkpoint being triggered.
*
* @param checkpointId The ID of the checkpoint for which the snapshot is created.
* @return an object containing the state of the split enumerator.
* @throws Exception when the snapshot cannot be taken.
*/
CheckpointT snapshotState() throws Exception;
CheckpointT snapshotState(long checkpointId) throws Exception;

/**
* Called to close the enumerator, in case it holds on to any resources, like threads or network
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void addSplitsBack(List<SplitT> splits, int subtaskId) {
}

@Override
public Collection<SplitT> snapshotState() throws Exception {
public Collection<SplitT> snapshotState(long checkpointId) throws Exception {
return remainingSplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void addReader(int subtaskId) {
}

@Override
public Set<MockSourceSplit> snapshotState() {
public Set<MockSourceSplit> snapshotState(long checkpointId) {
return unassignedSplits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> r
checkpointId);
try {
context.onCheckpoint(checkpointId);
result.complete(toBytes());
result.complete(toBytes(checkpointId));
} catch (Throwable e) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
result.completeExceptionally(
Expand Down Expand Up @@ -352,8 +352,9 @@ SourceCoordinatorContext<SplitT> getContext() {
* @return A byte array containing the serialized state of the source coordinator.
* @throws Exception When something goes wrong in serialization.
*/
private byte[] toBytes() throws Exception {
return writeCheckpointBytes(enumerator.snapshotState(), enumCheckpointSerializer);
private byte[] toBytes(long checkpointId) throws Exception {
return writeCheckpointBytes(
enumerator.snapshotState(checkpointId), enumCheckpointSerializer);
}

static <EnumChkT> byte[] writeCheckpointBytes(
Expand All @@ -378,7 +379,7 @@ static <EnumChkT> byte[] writeCheckpointBytes(
/**
* Restore the state of this source coordinator from the state bytes.
*
* @param bytes The checkpoint bytes that was returned from {@link #toBytes()}
* @param bytes The checkpoint bytes that was returned from {@link #toBytes(long)}
* @throws Exception When the deserialization failed.
*/
private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testSerdeBackwardCompatibility() throws Exception {
// Build checkpoint data with serde version 0
final TestingSplitEnumerator<MockSourceSplit> enumerator = getEnumerator();
final Set<MockSourceSplit> splits = new HashSet<>();
enumerator.runInEnumThreadAndSync(() -> splits.addAll(enumerator.snapshotState()));
enumerator.runInEnumThreadAndSync(() -> splits.addAll(enumerator.snapshotState(1L)));

final byte[] checkpointDataForV0Serde = createCheckpointDataWithSerdeV0(splits);

Expand Down Expand Up @@ -428,7 +428,7 @@ public void addReader(int subtaskId) {
}

@Override
public Set<MockSourceSplit> snapshotState() throws Exception {
public Set<MockSourceSplit> snapshotState(long checkpointId) throws Exception {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void addReader(int subtaskId) {
}

@Override
public Set<SplitT> snapshotState() {
public Set<SplitT> snapshotState(long checkpointId) {
return new HashSet<>(splits);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
}

@Override
public Collection<SplitT> snapshotState() throws Exception {
public Collection<SplitT> snapshotState(long checkpointId) throws Exception {
// this will be enqueued in the enumerator thread, so it will actually run after this
// method (the snapshot operation) is complete!
context.runInCoordinatorThread(this::fullFillPendingRequests);

return super.snapshotState();
return super.snapshotState(checkpointId);
}

private void fullFillPendingRequests() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ public void notifyCheckpointComplete(long checkpointId) {
}

@Override
public EnumeratorState snapshotState() throws Exception {
public EnumeratorState snapshotState(long checkpointId) throws Exception {
LOG.info("snapshotState {}", state);
return state;
}
Expand Down

0 comments on commit c4678d8

Please sign in to comment.