diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index 39acc298b1b4f..f9363487f3ad0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -187,6 +187,7 @@ private void handleRestoredBucketState(final BucketState recoveredStat private void updateActiveBucketId(final BucketID bucketId, final Bucket restoredBucket) throws IOException { if (!restoredBucket.isActive()) { + notifyBucketInactive(restoredBucket); return; } @@ -212,10 +213,7 @@ public void commitUpToCheckpoint(final long checkpointId) throws IOException { // We've dealt with all the pending files and the writer for this bucket is not currently open. // Therefore this bucket is currently inactive and we can remove it from our state. 
activeBucketIt.remove(); - - if (bucketLifeCycleListener != null) { - bucketLifeCycleListener.bucketInactive(bucket); - } + notifyBucketInactive(bucket); } } } @@ -304,10 +302,7 @@ private Bucket getOrCreateBucketForBucketId(final BucketID bucketI rollingPolicy, outputFileConfig); activeBuckets.put(bucketId, bucket); - - if (bucketLifeCycleListener != null) { - bucketLifeCycleListener.bucketCreated(bucket); - } + notifyBucketCreate(bucket); } return bucket; } @@ -332,6 +327,18 @@ private Path assembleBucketPath(BucketID bucketId) { return new Path(basePath, child); } + private void notifyBucketCreate(Bucket bucket) { + if (bucketLifeCycleListener != null) { + bucketLifeCycleListener.bucketCreated(bucket); + } + } + + private void notifyBucketInactive(Bucket bucket) { + if (bucketLifeCycleListener != null) { + bucketLifeCycleListener.bucketInactive(bucket); + } + } + /** * The {@link BucketAssigner.Context} exposed to the * {@link BucketAssigner#getBucketId(Object, BucketAssigner.Context)} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java index 9707ec7e1fbca..f982ced5d73cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -40,6 +40,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -409,6 +411,48 @@ public void testBucketLifeCycleListenerOnCreatingAndInactive() throws Exception Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents()); } + @Test + public void testBucketLifeCycleListenerOnRestoring() throws Exception { + File outDir = 
TEMP_FOLDER.newFolder(); + Path path = new Path(outDir.toURI()); + OnProcessingTimePolicy rollOnProcessingTimeCountingPolicy = + new OnProcessingTimePolicy<>(2L); + RecordBucketLifeCycleListener bucketLifeCycleListener = new RecordBucketLifeCycleListener(); + Buckets buckets = createBuckets( + path, + rollOnProcessingTimeCountingPolicy, + bucketLifeCycleListener, + 0, + OutputFileConfig.builder().build()); + ListState bucketStateContainer = new MockListState<>(); + ListState partCounterContainer = new MockListState<>(); + + buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L)); + buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L)); + + // Will close the part file writer of the bucket "test1". Now bucket "test1" has only + // one pending file while bucket "test2" has an in-progress file still being written. + buckets.onProcessingTime(4); + buckets.snapshotState(0, bucketStateContainer, partCounterContainer); + + // On restoring, the bucket "test1" will commit its pending file and become inactive. 
+ buckets = restoreBuckets( + path, + rollOnProcessingTimeCountingPolicy, + bucketLifeCycleListener, + 0, + bucketStateContainer, + partCounterContainer, + OutputFileConfig.builder().build()); + + Assert.assertEquals(new HashSet<>(Collections.singletonList("test2")), buckets.getActiveBuckets().keySet()); + List> expectedEvents = Arrays.asList( + new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test1"), + new Tuple2<>(RecordBucketLifeCycleListener.EventType.CREATED, "test2"), + new Tuple2<>(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1")); + Assert.assertEquals(expectedEvents, bucketLifeCycleListener.getEvents()); + } + private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener { public enum EventType { CREATED, @@ -477,6 +521,7 @@ private static Buckets restoreBuckets( return restoreBuckets( basePath, rollingPolicy, + null, subtaskIdx, bucketState, partCounterState, @@ -486,6 +531,7 @@ private static Buckets restoreBuckets( private static Buckets restoreBuckets( final Path basePath, final RollingPolicy rollingPolicy, + final BucketLifeCycleListener bucketLifeCycleListener, final int subtaskIdx, final ListState bucketState, final ListState partCounterState, @@ -493,7 +539,7 @@ private static Buckets restoreBuckets( final Buckets restoredBuckets = createBuckets( basePath, rollingPolicy, - null, + bucketLifeCycleListener, subtaskIdx, outputFileConfig); restoredBuckets.initializeState(bucketState, partCounterState);