Skip to content

Commit

Permalink
[FLINK-14787][configuration] Add configure method to StreamExecutionE…
Browse files Browse the repository at this point in the history
…nvironment
  • Loading branch information
dawidwys committed Nov 22, 2019
1 parent ccf322c commit 3ec3079
Show file tree
Hide file tree
Showing 10 changed files with 799 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -79,6 +80,28 @@ public DistributedCacheEntry(String filePath, Boolean isExecutable, byte[] blobK
this(filePath, isExecutable, blobKey, false);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DistributedCacheEntry that = (DistributedCacheEntry) o;
return isZipped == that.isZipped &&
Objects.equals(filePath, that.filePath) &&
Objects.equals(isExecutable, that.isExecutable) &&
Arrays.equals(blobKey, that.blobKey);
}

@Override
public int hashCode() {
int result = Objects.hash(filePath, isExecutable, isZipped);
result = 31 * result + Arrays.hashCode(blobKey);
return result;
}

@Override
public String toString() {
return "DistributedCacheEntry{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.TextElement;

import java.time.Duration;

/**
* {@link ConfigOption}s specific for a single execution of a user program.
Expand All @@ -35,4 +39,20 @@ public class ExecutionOptions {
.booleanType()
.defaultValue(false)
.withDescription("Tells if we should use compression for the state snapshot data or not");

public static final ConfigOption<Duration> BUFFER_TIMEOUT =
ConfigOptions.key("execution.buffer-timeout")
.durationType()
.defaultValue(Duration.ofMillis(100))
.withDescription(Description.builder()
.text("The maximum time frequency (milliseconds) for the flushing of the output buffers. By default " +
"the output buffers flush frequently to provide low latency and to aid smooth developer " +
"experience. Setting the parameter can result in three logical modes:")
.list(
TextElement.text("A positive value triggers flushing periodically by that interval"),
TextElement.text("0 triggers flushing after every record thus minimizing latency"),
TextElement.text("-1 ms triggers flushing only when the output buffer is full thus maximizing " +
"throughput")
)
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,22 @@ public class PipelineOptions {
" sure that only tags are written.")
.build());

public static final ConfigOption<Boolean> OPERATOR_CHAINING =
key("pipeline.operator-chaining")
.booleanType()
.defaultValue(true)
.withDescription("Operator chaining allows non-shuffle operations to be co-located in the same thread " +
"fully avoiding serialization and de-serialization.");

public static final ConfigOption<List<String>> CACHED_FILES =
key("pipeline.cached-files")
.stringType()
.asList()
.noDefaultValue()
.withDescription(Description.builder()
.text("Files to be registered at the distributed cache under the given name. The files will be " +
"accessible from any user-defined function in the (distributed) runtime under a local path. " +
"Files may be local files (which will be distributed via BlobServer), or files in a distributed " +
"file system. The runtime will copy the files temporarily to a local cache, if needed.")
.build());
}
Loading

0 comments on commit 3ec3079

Please sign in to comment.