Skip to content

Commit

Permalink
[FLINK-17590][fs-connector] Support bucket lifecycle listener in streaming file sink buckets
Browse files Browse the repository at this point in the history


This closes apache#12053
  • Loading branch information
gaoyunhaii authored May 16, 2020
1 parent de5e631 commit 3f487c9
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ private void commitRecoveredPendingFiles(final BucketState<BucketID> state) thro
}
}

BucketID getBucketId() {
public BucketID getBucketId() {
return bucketId;
}

Path getBucketPath() {
public Path getBucketPath() {
return bucketPath;
}

long getPartCounter() {
public long getPartCounter() {
return partCounter;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* Listener for lifecycle events of a {@link Bucket}: the creation of a bucket and its
* transition to the inactive state.
*
* <p>The listener extends {@link Serializable}, so implementations must be serializable.
*
* @param <IN> The element type of the observed {@link Bucket}.
* @param <BucketID> The type of the identifier of the observed {@link Bucket}.
*/
@Internal
public interface BucketLifeCycleListener<IN, BucketID> extends Serializable {

/**
* Notifies the listener that a new bucket has been created.
*
* @param bucket The newly created bucket.
*/
void bucketCreated(Bucket<IN, BucketID> bucket);

/**
* Notifies the listener that a bucket has become inactive. A bucket becomes inactive after all
* the records received so far have been committed.
*
* @param bucket The bucket that has become inactive.
*/
void bucketInactive(Bucket<IN, BucketID> bucket);

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class Buckets<IN, BucketID> {

private final RollingPolicy<IN, BucketID> rollingPolicy;

@Nullable
private final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;

// --------------------------- runtime fields -----------------------------

private final int subtaskIndex;
Expand Down Expand Up @@ -98,6 +101,7 @@ public class Buckets<IN, BucketID> {
final BucketFactory<IN, BucketID> bucketFactory,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
@Nullable final BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener,
final int subtaskIndex,
final OutputFileConfig outputFileConfig) throws IOException {

Expand All @@ -106,6 +110,7 @@ public class Buckets<IN, BucketID> {
this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
this.partFileWriterFactory = Preconditions.checkNotNull(partFileWriterFactory);
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
this.bucketLifeCycleListener = bucketLifeCycleListener;
this.subtaskIndex = subtaskIndex;

this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
Expand Down Expand Up @@ -219,6 +224,10 @@ void commitUpToCheckpoint(final long checkpointId) throws IOException {
// We've dealt with all the pending files and the writer for this bucket is not currently open.
// Therefore this bucket is currently inactive and we can remove it from our state.
activeBucketIt.remove();

if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketInactive(bucket);
}
}
}
}
Expand Down Expand Up @@ -295,6 +304,10 @@ private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketI
rollingPolicy,
outputFileConfig);
activeBuckets.put(bucketId, bucket);

if (bucketLifeCycleListener != null) {
bucketLifeCycleListener.bucketCreated(bucket);
}
}
return bucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.BulkWriter;
Expand Down Expand Up @@ -222,6 +223,8 @@ public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN

private OutputFileConfig outputFileConfig;

private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;

protected RowFormatBuilder(Path basePath, Encoder<IN> encoder, BucketAssigner<IN, BucketID> bucketAssigner) {
this(basePath, encoder, bucketAssigner, DefaultRollingPolicy.builder().build(), DEFAULT_BUCKET_CHECK_INTERVAL, new DefaultBucketFactoryImpl<>(), OutputFileConfig.builder().build());
}
Expand Down Expand Up @@ -262,6 +265,12 @@ public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
return self();
}

/**
 * Registers the {@link BucketLifeCycleListener} that this builder passes on to the
 * {@link Buckets} it creates.
 *
 * @param listener the listener to register; validated with {@code Preconditions.checkNotNull}
 * @return the builder obtained from {@code self()}, to allow call chaining
 */
@Internal
public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
    // Validate first, then store, so a rejected argument never replaces the current value.
    final BucketLifeCycleListener<IN, BucketID> validatedListener = Preconditions.checkNotNull(listener);
    this.bucketLifeCycleListener = validatedListener;
    return self();
}

public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
this.outputFileConfig = outputFileConfig;
return self();
Expand Down Expand Up @@ -291,6 +300,7 @@ Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
bucketFactory,
new RowWisePartWriter.Factory<>(encoder),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
Expand Down Expand Up @@ -326,6 +336,8 @@ public static class BulkFormatBuilder<IN, BucketID, T extends BulkFormatBuilder<

private CheckpointRollingPolicy<IN, BucketID> rollingPolicy;

private BucketLifeCycleListener<IN, BucketID> bucketLifeCycleListener;

private BucketFactory<IN, BucketID> bucketFactory;

private OutputFileConfig outputFileConfig;
Expand Down Expand Up @@ -371,6 +383,12 @@ public T withRollingPolicy(CheckpointRollingPolicy<IN, BucketID> rollingPolicy)
return self();
}

/**
 * Registers the {@link BucketLifeCycleListener} that this builder passes on to the
 * {@link Buckets} it creates.
 *
 * @param listener the listener to register; validated with {@code Preconditions.checkNotNull}
 * @return the builder obtained from {@code self()}, to allow call chaining
 */
@Internal
public T withBucketLifeCycleListener(final BucketLifeCycleListener<IN, BucketID> listener) {
    // Validate first, then store, so a rejected argument never replaces the current value.
    final BucketLifeCycleListener<IN, BucketID> validatedListener = Preconditions.checkNotNull(listener);
    this.bucketLifeCycleListener = validatedListener;
    return self();
}

@VisibleForTesting
T withBucketFactory(final BucketFactory<IN, BucketID> factory) {
this.bucketFactory = Preconditions.checkNotNull(factory);
Expand Down Expand Up @@ -401,6 +419,7 @@ Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
bucketFactory,
new BulkPartWriter.Factory<>(writerFactory),
rollingPolicy,
bucketLifeCycleListener,
subtaskIndex,
outputFileConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void testAssembleBucketPath() throws Exception {
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicy,
null,
0,
OutputFileConfig.builder().build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils.MockListState;
Expand All @@ -34,8 +35,13 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -317,6 +323,7 @@ private void testCorrectTimestampPassingInContext(Long timestamp, long watermark
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
DefaultRollingPolicy.builder().build(),
null,
2,
OutputFileConfig.builder().build()
);
Expand Down Expand Up @@ -367,6 +374,66 @@ public SimpleVersionedSerializer<String> getSerializer() {
}
}

/**
 * Checks that a registered {@link BucketLifeCycleListener} is told about every bucket creation
 * and about every bucket turning inactive, and that the notifications arrive in the expected order.
 */
@Test
public void testBucketLifeCycleListenerOnCreatingAndInactive() throws Exception {
    final File outputFolder = TEMP_FOLDER.newFolder();
    final Path basePath = new Path(outputFolder.toURI());
    final OnProcessingTimePolicy<String, String> processingTimePolicy = new OnProcessingTimePolicy<>(2L);
    final RecordBucketLifeCycleListener recordingListener = new RecordBucketLifeCycleListener();
    final OutputFileConfig defaultFileConfig = OutputFileConfig.builder().build();

    final Buckets<String, String> buckets =
        createBuckets(basePath, processingTimePolicy, recordingListener, 0, defaultFileConfig);

    final ListState<byte[]> bucketStates = new MockListState<>();
    final ListState<Long> partCounters = new MockListState<>();

    // Two elements, each routed to its own bucket, so two creation events are expected.
    buckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
    buckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));

    // The timer at 4 is expected to close only the part file writer of bucket "test1",
    // so committing checkpoint 0 should turn just that bucket inactive.
    buckets.onProcessingTime(4);
    buckets.snapshotState(0, bucketStates, partCounters);
    buckets.commitUpToCheckpoint(0);

    // The timer at 6 is expected to close the part file writer of bucket "test2" as well,
    // so committing checkpoint 1 should turn that bucket inactive too.
    buckets.onProcessingTime(6);
    buckets.snapshotState(1, bucketStates, partCounters);
    buckets.commitUpToCheckpoint(1);

    final RecordBucketLifeCycleListener.EventType created = RecordBucketLifeCycleListener.EventType.CREATED;
    final RecordBucketLifeCycleListener.EventType inactive = RecordBucketLifeCycleListener.EventType.INACTIVE;
    final List<Tuple2<RecordBucketLifeCycleListener.EventType, String>> expectedEvents = Arrays.asList(
        new Tuple2<>(created, "test1"),
        new Tuple2<>(created, "test2"),
        new Tuple2<>(inactive, "test1"),
        new Tuple2<>(inactive, "test2"));

    Assert.assertEquals(expectedEvents, recordingListener.getEvents());
}

/**
 * A {@link BucketLifeCycleListener} for tests that records every notification it receives, in
 * arrival order, as a pair of event type and bucket id.
 */
private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener<String, String> {

    // The listener interface is Serializable, so pin the serial version explicitly.
    private static final long serialVersionUID = 1L;

    /** The kinds of notification this listener can record. */
    public enum EventType {
        CREATED,
        INACTIVE
    }

    // Never reassigned; only appended to by the callbacks below.
    private final List<Tuple2<EventType, String>> events = new ArrayList<>();

    /**
     * Records a {@link EventType#CREATED} event for the given bucket.
     *
     * @param bucket the bucket reported as created
     */
    @Override
    public void bucketCreated(Bucket<String, String> bucket) {
        events.add(new Tuple2<>(EventType.CREATED, bucket.getBucketId()));
    }

    /**
     * Records a {@link EventType#INACTIVE} event for the given bucket.
     *
     * @param bucket the bucket reported as inactive
     */
    @Override
    public void bucketInactive(Bucket<String, String> bucket) {
        events.add(new Tuple2<>(EventType.INACTIVE, bucket.getBucketId()));
    }

    /**
     * Returns the recorded events in the order they were received.
     *
     * @return the live list of recorded events (not a copy)
     */
    public List<Tuple2<EventType, String>> getEvents() {
        return events;
    }
}

// ------------------------------- Utility Methods --------------------------------

private static Buckets<String, String> createBuckets(
Expand All @@ -376,13 +443,15 @@ private static Buckets<String, String> createBuckets(
return createBuckets(
basePath,
rollingPolicy,
null,
subtaskIdx,
OutputFileConfig.builder().build());
}

private static Buckets<String, String> createBuckets(
final Path basePath,
final RollingPolicy<String, String> rollingPolicy,
@Nullable final BucketLifeCycleListener<String, String> bucketLifeCycleListener,
final int subtaskIdx,
final OutputFileConfig outputFileConfig) throws IOException {
return new Buckets<>(
Expand All @@ -391,6 +460,7 @@ private static Buckets<String, String> createBuckets(
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicy,
bucketLifeCycleListener,
subtaskIdx,
outputFileConfig
);
Expand Down Expand Up @@ -418,7 +488,12 @@ private static Buckets<String, String> restoreBuckets(
final ListState<byte[]> bucketState,
final ListState<Long> partCounterState,
final OutputFileConfig outputFileConfig) throws Exception {
final Buckets<String, String> restoredBuckets = createBuckets(basePath, rollingPolicy, subtaskIdx, outputFileConfig);
final Buckets<String, String> restoredBuckets = createBuckets(
basePath,
rollingPolicy,
null,
subtaskIdx,
outputFileConfig);
restoredBuckets.initializeState(bucketState, partCounterState);
return restoredBuckets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ private static Buckets<String, String> createBuckets(
new DefaultBucketFactoryImpl<>(),
new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
rollingPolicyToTest,
null,
0,
OutputFileConfig.builder().build()
);
Expand Down

0 comments on commit 3f487c9

Please sign in to comment.