Skip to content

Commit

Permalink
[FLINK-18110][fs-connector] StreamingFileSink notifies for buckets de…
Browse files Browse the repository at this point in the history
…tected to be inactive on restoring


This closes apache#12496
  • Loading branch information
gaoyunhaii committed Jun 8, 2020
1 parent 88871b2 commit 0a1b0c6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ private void handleRestoredBucketState(final BucketState<BucketID> recoveredStat

private void updateActiveBucketId(final BucketID bucketId, final Bucket<IN, BucketID> restoredBucket) throws IOException {
if (!restoredBucket.isActive()) {
notifyBucketInactive(restoredBucket);
return;
}

Expand All @@ -212,10 +213,7 @@ public void commitUpToCheckpoint(final long checkpointId) throws IOException {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();

if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketInactive(bucket);
}
notifyBucketInactive(bucket);
}
}
}
Expand Down Expand Up @@ -304,10 +302,7 @@ private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketI
rollingPolicy,
outputFileConfig);
activeBuckets.put(bucketId, bucket);

if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketCreated(bucket);
}
notifyBucketCreate(bucket);
}
return bucket;
}
Expand All @@ -332,6 +327,18 @@ private Path assembleBucketPath(BucketID bucketId) {
return new Path(basePath, child);
}

private void notifyBucketCreate(Bucket<IN, BucketID> bucket) {
if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketCreated(bucket);
}
}

private void notifyBucketInactive(Bucket<IN, BucketID> bucket) {
if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketInactive(bucket);
}
}

/**
* The {@link BucketAssigner.Context} exposed to the
* {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -409,6 +411,48 @@ public void testBucketLifeCycleListenerOnCreatingAndInactive() throws Exception
Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
}

@Test
public void testBucketLifeCycleListenerOnRestoring() throws Exception {
File outDir = TEMP_FOLDER.newFolder();
Path path = new Path(outDir.toURI());
OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
new OnProcessingTimePolicy<>(2L);
RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener();
Buckets<String, String> buckets = createBuckets(
path,
rollOnProcessingTimeCountingPolicy,
bucketLifeCycleListener,
0,
OutputFileConfig.builder().build());
ListState<byte[]> bucketStateContainer = new MockListState<>();
ListState<Long> partCounterContainer = new MockListState<>();

buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));

// Will close the part file writer of the bucket "test1". Now bucket "test1" have only
// one pending file while bucket "test2" has an on-writing in-progress file.
buckets.onProcessingTime(4);
buckets.snapshotState(0, bucketStateContainer, partCounterContainer);

// On restoring the bucket "test1" will commit its pending file and become inactive.
buckets = restoreBuckets(
path,
rollOnProcessingTimeCountingPolicy,
bucketLifeCycleListener,
0,
bucketStateContainer,
partCounterContainer,
OutputFileConfig.builder().build());

Assert.assertEquals(new HashSet<>(Collections.singletonList("test2")), buckets.getActiveBuckets().keySet());
List<Tuple2<RecordBucketLifeCycleListener.EventType, String>> expectedEvents = Arrays.asList(
new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test1"),
new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test2"),
new Tuple2<>(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1"));
Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents());
}

private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener<String, String> {
public enum EventType {
CREATED,
Expand Down Expand Up @@ -477,6 +521,7 @@ private static Buckets<String, String> restoreBuckets(
return restoreBuckets(
basePath,
rollingPolicy,
null,
subtaskIdx,
bucketState,
partCounterState,
Expand All @@ -486,14 +531,15 @@ private static Buckets<String, String> restoreBuckets(
private static Buckets<String, String> restoreBuckets(
final Path basePath,
final RollingPolicy<String, String> rollingPolicy,
final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
final int subtaskIdx,
final ListState<byte[]> bucketState,
final ListState<Long> partCounterState,
final OutputFileConfig outputFileConfig) throws Exception {
final Buckets<String, String> restoredBuckets = createBuckets(
basePath,
rollingPolicy,
null,
bucketLifeCycleListener,
subtaskIdx,
outputFileConfig);
restoredBuckets.initializeState(bucketState, partCounterState);
Expand Down

0 comments on commit 0a1b0c6

Please sign in to comment.