Skip to content

Commit

Permalink
[FLINK-11140][fs-connector] Fix empty child path check in Buckets.
Browse files Browse the repository at this point in the history
This closes apache#7287.
  • Loading branch information
Matrix42 authored and kl0u committed Jan 9, 2019
1 parent 6a4bc90 commit 3702029
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ private void snapshotActiveBuckets(
}
}

void onElement(final IN value, final SinkFunction.Context context) throws Exception {
Bucket<IN, BucketID> onElement(final IN value, final SinkFunction.Context context) throws Exception {
final long currentProcessingTime = context.currentProcessingTime();

// setting the values in the bucketer context
Expand All @@ -272,6 +272,7 @@ void onElement(final IN value, final SinkFunction.Context context) throws Except
// another part file for the bucket, if we start from 0 we may overwrite previous parts.

this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
return bucket;
}

private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
Expand Down Expand Up @@ -304,7 +305,11 @@ void close() {
}

private Path assembleBucketPath(BucketID bucketId) {
return new Path(basePath, bucketId.toString());
final String child = bucketId.toString();
if ("".equals(child)) {
return basePath;
}
return new Path(basePath, child);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;

/**
 * Integration tests covering how {@link BucketAssigner bucket assigners} interact with
 * the bucket paths produced by {@link Buckets}.
 */
public class BucketAssignerITCases {

	/** Single value passed for all three time arguments of the mock sink context. */
	private static final long MOCK_TIME = 1000L;

	@ClassRule
	public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

	/**
	 * Sends one element through a {@link Buckets} instance configured with a
	 * {@link BasePathBucketAssigner} and checks that the bucket which received the
	 * element reports the base path itself as its bucket path.
	 */
	@Test
	public void testAssembleBucketPath() throws Exception {
		final File tempDir = TEMP_FOLDER.newFolder();
		final Path basePath = new Path(tempDir.toURI());

		final Buckets<String, String> buckets = createBuckets(basePath);

		final Bucket<String, String> targetBucket = buckets.onElement(
			"abc",
			new TestUtils.MockSinkContext(MOCK_TIME, MOCK_TIME, MOCK_TIME));

		final Path expectedBucketPath = new Path(basePath.toUri());
		Assert.assertEquals(expectedBucketPath, targetBucket.getBucketPath());
	}

	/**
	 * Builds the {@link Buckets} under test: rooted at {@code basePath}, using a
	 * {@link BasePathBucketAssigner}, a row-wise string writer and a default rolling
	 * policy with a maximum part size of 7.
	 */
	private static Buckets<String, String> createBuckets(final Path basePath) throws Exception {
		final RollingPolicy<String, String> sizeBasedPolicy =
			DefaultRollingPolicy
				.create()
				.withMaxPartSize(7L)
				.build();

		return new Buckets<>(
			basePath,
			new BasePathBucketAssigner<>(),
			new DefaultBucketFactoryImpl<>(),
			new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
			sizeBasedPolicy,
			0
		);
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void testOnProcessingTime() throws Exception {

// it takes the current processing time of the context for the creation time,
// and for the last modification time.
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));

// now it should roll
buckets.onProcessingTime(7L);
Expand All @@ -214,13 +214,13 @@ public void testBucketIsRemovedWhenNotActive() throws Exception {
final Path path = new Path(outDir.toURI());

final OnProcessingTimePolicy<String, String> rollOnProcessingTimeCountingPolicy =
new OnProcessingTimePolicy<>(2L);
new OnProcessingTimePolicy<>(2L);

final Buckets<String, String> buckets =
createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);

// it takes the current processing time of the context for the creation time, and for the last modification time.
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));

// now it should roll
buckets.onProcessingTime(7L);
Expand All @@ -244,7 +244,7 @@ public void testPartCounterAfterBucketResurrection() throws Exception {
createBuckets(path, rollOnProcessingTimeCountingPolicy, 0);

// it takes the current processing time of the context for the creation time, and for the last modification time.
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L , 3L));
buckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));
Assert.assertEquals(1L, buckets.getActiveBuckets().get("test").getPartCounter());

// now it should roll
Expand All @@ -257,7 +257,7 @@ public void testPartCounterAfterBucketResurrection() throws Exception {

Assert.assertTrue(buckets.getActiveBuckets().isEmpty());

buckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L , 4L));
buckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L, 4L));
Assert.assertEquals(2L, buckets.getActiveBuckets().get("test").getPartCounter());
}

Expand Down

0 comments on commit 3702029

Please sign in to comment.