diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java index d08bc2ac0c382..7b35e50283065 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Buckets.java @@ -254,7 +254,7 @@ private void snapshotActiveBuckets( } } - void onElement(final IN value, final SinkFunction.Context context) throws Exception { + Bucket onElement(final IN value, final SinkFunction.Context context) throws Exception { final long currentProcessingTime = context.currentProcessingTime(); // setting the values in the bucketer context @@ -272,6 +272,7 @@ void onElement(final IN value, final SinkFunction.Context context) throws Except // another part file for the bucket, if we start from 0 we may overwrite previous parts. this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter()); + return bucket; } private Bucket getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException { @@ -304,7 +305,11 @@ void close() { } private Path assembleBucketPath(BucketID bucketId) { - return new Path(basePath, bucketId.toString()); + final String child = bucketId.toString(); + if ("".equals(child)) { + return basePath; + } + return new Path(basePath, child); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java new file mode 100644 index 0000000000000..5ae57ce43bc21 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketAssignerITCases.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.functions.sink.filesystem; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; + +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; + +/** + * Integration tests for {@link BucketAssigner bucket assigners}. + */ +public class BucketAssignerITCases { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + @Test + public void testAssembleBucketPath() throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + final Path basePath = new Path(outDir.toURI()); + final long time = 1000L; + + final RollingPolicy rollingPolicy = + DefaultRollingPolicy + .create() + .withMaxPartSize(7L) + .build(); + + final Buckets buckets = new Buckets<>( + basePath, + new BasePathBucketAssigner<>(), + new DefaultBucketFactoryImpl<>(), + new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()), + rollingPolicy, + 0 + ); + + Bucket bucket = + buckets.onElement("abc", new TestUtils.MockSinkContext(time, time, time)); + Assert.assertEquals(new Path(basePath.toUri()), bucket.getBucketPath()); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java index aee362178a76e..8369b90ac9665 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.java @@ -188,7 +188,7 @@ public void testOnProcessingTime() throws Exception { // it takes the current processing time of the context for the creation time, // and for the last modification time. - buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L)); + buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L)); // now it should roll buckets.onProcessingTime(7L); @@ -214,13 +214,13 @@ public void testBucketIsRemovedWhenNotActive() throws Exception { final Path path = new Path(outDir.toURI()); final OnProcessingTimePolicy rollOnProcessingTimeCountingPolicy = - new OnProcessingTimePolicy<>(2L); + new OnProcessingTimePolicy<>(2L); final Buckets buckets = createBuckets(path, rollOnProcessingTimeCountingPolicy, 0); // it takes the current processing time of the context for the creation time, and for the last modification time. - buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L)); + buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L)); // now it should roll buckets.onProcessingTime(7L); @@ -244,7 +244,7 @@ public void testPartCounterAfterBucketResurrection() throws Exception { createBuckets(path, rollOnProcessingTimeCountingPolicy, 0); // it takes the current processing time of the context for the creation time, and for the last modification time. - buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L)); + buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L)); Assert.assertEquals(1L, buckets.getActiveBuckets().get("test").getPartCounter()); // now it should roll @@ -257,7 +257,7 @@ public void testPartCounterAfterBucketResurrection() throws Exception { Assert.assertTrue(buckets.getActiveBuckets().isEmpty()); - buckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L , 4L)); + buckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L, 4L)); Assert.assertEquals(2L, buckets.getActiveBuckets().get("test").getPartCounter()); }