Skip to content

Commit

Permalink
[FLINK-13850] refactor part file configurations into a single method
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe authored and Gyula Fora committed Nov 15, 2019
1 parent cc02399 commit 22b2a88
Show file tree
Hide file tree
Showing 14 changed files with 192 additions and 131 deletions.
14 changes: 14 additions & 0 deletions docs/dev/connectors/streamfile_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,20 @@ New buckets are created as dictated by the bucketing policy, and this doesn't af

Old buckets can still receive new records as the bucketing policy is evaluated on a per-record basis.

### Part file configuration

The filenames of the part files can be configured using `OutputFileConfig`. This configuration contains a part prefix and a part suffix,
which are combined with the parallel subtask index of the sink and a rolling counter to form the final file name.
For example, for a prefix "prefix" and a suffix ".ext", the sink creates the following files:

```
└── 2019-08-25--12
├── prefix-0-0.ext
├── prefix-0-1.ext.inprogress.bd053eb0-5ecf-4c85-8433-9eff486ac334
├── prefix-1-0.ext
└── prefix-1-1.ext.inprogress.bc279efe-b16f-47d8-b828-00ef6e2fbd11
```

## File Formats

The `StreamingFileSink` supports both row-wise and bulk encoding formats, such as [Apache Parquet](https://parquet.apache.org).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class Bucket<IN, BucketID> {

private final NavigableMap<Long, List<CommitRecoverable>> pendingPartsPerCheckpoint;

private final PartFileConfig partFileConfig;
private final OutputFileConfig outputFileConfig;

private long partCounter;

Expand All @@ -90,7 +90,7 @@ private Bucket(
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final PartFileConfig partFileConfig) {
final OutputFileConfig outputFileConfig) {
this.fsWriter = checkNotNull(fsWriter);
this.subtaskIndex = subtaskIndex;
this.bucketId = checkNotNull(bucketId);
Expand All @@ -103,7 +103,7 @@ private Bucket(
this.pendingPartsPerCheckpoint = new TreeMap<>();
this.resumablesPerCheckpoint = new TreeMap<>();

this.partFileConfig = checkNotNull(partFileConfig);
this.outputFileConfig = checkNotNull(outputFileConfig);
}

/**
Expand All @@ -116,7 +116,7 @@ private Bucket(
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final PartFileConfig partFileConfig) throws IOException {
final OutputFileConfig outputFileConfig) throws IOException {

this(
fsWriter,
Expand All @@ -126,7 +126,7 @@ private Bucket(
initialPartCounter,
partFileFactory,
rollingPolicy,
partFileConfig);
outputFileConfig);

restoreInProgressFile(bucketState);
commitRecoveredPendingFiles(bucketState);
Expand Down Expand Up @@ -230,7 +230,7 @@ private void rollPartFile(final long currentTime) throws IOException {
}

private Path assembleNewPartPath() {
return new Path(bucketPath, partFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + partFileConfig.getPartSuffix());
return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}

private CommitRecoverable closePartFile() throws IOException {
Expand Down Expand Up @@ -369,7 +369,7 @@ List<CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
* @param partFileFactory the {@link PartFileWriter.PartFileFactory} the factory creating part file writers.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @param partFileConfig the part file configuration.
* @param outputFileConfig the part file configuration.
* @return The new Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> getNew(
Expand All @@ -380,8 +380,8 @@ static <IN, BucketID> Bucket<IN, BucketID> getNew(
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final PartFileConfig partFileConfig) {
return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, partFileConfig);
final OutputFileConfig outputFileConfig) {
return new Bucket<>(fsWriter, subtaskIndex, bucketId, bucketPath, initialPartCounter, partFileFactory, rollingPolicy, outputFileConfig);
}

/**
Expand All @@ -393,7 +393,7 @@ static <IN, BucketID> Bucket<IN, BucketID> getNew(
* @param bucketState the initial state of the restored bucket.
* @param <IN> the type of input elements to the sink.
* @param <BucketID> the type of the identifier of the bucket, as returned by the {@link BucketAssigner}
* @param partFileConfig the part file configuration.
* @param outputFileConfig the part file configuration.
* @return The restored Bucket.
*/
static <IN, BucketID> Bucket<IN, BucketID> restore(
Expand All @@ -403,7 +403,7 @@ static <IN, BucketID> Bucket<IN, BucketID> restore(
final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final PartFileConfig partFileConfig) throws IOException {
return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, partFileConfig);
final OutputFileConfig outputFileConfig) throws IOException {
return new Bucket<>(fsWriter, subtaskIndex, initialPartCounter, partFileFactory, rollingPolicy, bucketState, outputFileConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Bucket<IN, BucketID> getNewBucket(
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final PartFileConfig partFileConfig) throws IOException;
final OutputFileConfig outputFileConfig) throws IOException;

Bucket<IN, BucketID> restoreBucket(
final RecoverableWriter fsWriter,
Expand All @@ -48,5 +48,5 @@ Bucket<IN, BucketID> restoreBucket(
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final PartFileConfig partFileConfig) throws IOException;
final OutputFileConfig outputFileConfig) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class Buckets<IN, BucketID> {

private final RecoverableWriter fsWriter;

private final PartFileConfig partFileConfig;
private final OutputFileConfig outputFileConfig;

// --------------------------- State Related Fields -----------------------------

Expand All @@ -99,7 +99,7 @@ public class Buckets<IN, BucketID> {
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final int subtaskIndex,
final PartFileConfig partFileConfig) throws IOException {
final OutputFileConfig outputFileConfig) throws IOException {

this.basePath = Preconditions.checkNotNull(basePath);
this.bucketAssigner = Preconditions.checkNotNull(bucketAssigner);
Expand All @@ -108,7 +108,7 @@ public class Buckets<IN, BucketID> {
this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
this.subtaskIndex = subtaskIndex;

this.partFileConfig = Preconditions.checkNotNull(partFileConfig);
this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);

this.activeBuckets = new HashMap<>();
this.bucketerContext = new Buckets.BucketerContext();
Expand Down Expand Up @@ -186,7 +186,7 @@ private void handleRestoredBucketState(final BucketState<BucketID> recoveredStat
partFileWriterFactory,
rollingPolicy,
recoveredState,
partFileConfig
outputFileConfig
);

updateActiveBucketId(bucketId, restoredBucket);
Expand Down Expand Up @@ -293,7 +293,7 @@ private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketI
maxPartCounter,
partFileWriterFactory,
rollingPolicy,
partFileConfig);
outputFileConfig);
activeBuckets.put(bucketId, bucket);
}
return bucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public Bucket<IN, BucketID> getNewBucket(
final long initialPartCounter,
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final PartFileConfig partFileConfig) {
final OutputFileConfig outputFileConfig) {

return Bucket.getNew(
fsWriter,
Expand All @@ -51,7 +51,7 @@ public Bucket<IN, BucketID> getNewBucket(
initialPartCounter,
partFileWriterFactory,
rollingPolicy,
partFileConfig);
outputFileConfig);
}

@Override
Expand All @@ -62,7 +62,7 @@ public Bucket<IN, BucketID> restoreBucket(
final PartFileWriter.PartFileFactory<IN, BucketID> partFileWriterFactory,
final RollingPolicy<IN, BucketID> rollingPolicy,
final BucketState<BucketID> bucketState,
final PartFileConfig partFileConfig) throws IOException {
final OutputFileConfig outputFileConfig) throws IOException {

return Bucket.restore(
fsWriter,
Expand All @@ -71,6 +71,6 @@ public Bucket<IN, BucketID> restoreBucket(
partFileWriterFactory,
rollingPolicy,
bucketState,
partFileConfig);
outputFileConfig);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
 *     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.sink.filesystem;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

/**
* Part file name configuration.
* This allow to define a prefix and a suffix to the part file name.
*/
public class OutputFileConfig implements Serializable {

private final String partPrefix;

private final String partSuffix;

/**
* Initiates the {@code PartFileConfig} with values passed as parameters.
*
* @param partPrefix - the beginning of part file name
* @param partSuffix - the ending of part file name
*/
public OutputFileConfig(final String partPrefix, final String partSuffix) {
this.partPrefix = Preconditions.checkNotNull(partPrefix);
this.partSuffix = Preconditions.checkNotNull(partSuffix);
}

/**
* The prefix for the part name.
*/
String getPartPrefix() {
return partPrefix;
}

/**
* The suffix for the part name.
*/
String getPartSuffix() {
return partSuffix;
}

public static OutputFileConfigBuilder builder() {
return new OutputFileConfigBuilder();
}

/**
* A builder to create the part file configuration.
*/
@PublicEvolving
public static class OutputFileConfigBuilder {

private static final String DEFAULT_PART_PREFIX = "part";

private static final String DEFAULT_PART_SUFFIX = "";

private String partPrefix;

private String partSuffix;

private OutputFileConfigBuilder() {
this.partPrefix = DEFAULT_PART_PREFIX;
this.partSuffix = DEFAULT_PART_SUFFIX;
}

public OutputFileConfigBuilder withPartPrefix(String prefix) {
this.partPrefix = prefix;
return this;
}

public OutputFileConfigBuilder withPartSuffix(String suffix) {
this.partSuffix = suffix;
return this;
}

public OutputFileConfig build() {
return new OutputFileConfig(partPrefix, partSuffix);
}
}
}

This file was deleted.

Loading

0 comments on commit 22b2a88

Please sign in to comment.