Skip to content

Commit

Permalink
[hotfix][table] Improve signature of Executor
Browse files Browse the repository at this point in the history
  • Loading branch information
twalthr committed Aug 13, 2021
1 parent d9830ca commit 6353f62
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableEnvironment;

import javax.annotation.Nullable;

import java.util.List;

/**
Expand All @@ -50,14 +52,14 @@ public interface Executor {
* Translates the given transformations to a {@link Pipeline}.
*
* @param transformations list of transformations
* @param configuration configuration options
* @param tableConfiguration table-specific configuration options
* @param defaultJobName default job name if not specified via {@link PipelineOptions#NAME}
* @return The pipeline representing the transformations.
*/
Pipeline createPipeline(
List<Transformation<?>> transformations,
ReadableConfig configuration,
String defaultJobName);
ReadableConfig tableConfiguration,
@Nullable String defaultJobName);

/**
* Executes the given pipeline.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.delegation.Executor;

import javax.annotation.Nullable;

import java.util.List;

/** Mocking {@link Executor} for tests. */
Expand All @@ -38,8 +40,8 @@ public ReadableConfig getConfiguration() {
@Override
public Pipeline createPipeline(
List<Transformation<?>> transformations,
ReadableConfig configuration,
String defaultJobName) {
ReadableConfig tableConfiguration,
@Nullable String defaultJobName) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.flink.table.planner.utils.ExecutorUtils;
import org.apache.flink.util.StringUtils;

import javax.annotation.Nullable;

import java.util.List;

/** Default implementation of {@link Executor}. */
Expand All @@ -60,18 +62,18 @@ public ReadableConfig getConfiguration() {
@Override
public Pipeline createPipeline(
List<Transformation<?>> transformations,
ReadableConfig configuration,
String defaultJobName) {
ReadableConfig tableConfiguration,
@Nullable String defaultJobName) {

// reconfigure before a stream graph is generated
executionEnvironment.configure(configuration);
executionEnvironment.configure(tableConfiguration);

// create stream graph
final RuntimeExecutionMode mode = configuration.get(ExecutionOptions.RUNTIME_MODE);
final RuntimeExecutionMode mode = tableConfiguration.get(ExecutionOptions.RUNTIME_MODE);
final StreamGraph graph;
switch (mode) {
case BATCH:
graph = createBatchGraph(transformations, configuration);
graph = createBatchGraph(transformations, tableConfiguration);
break;
case STREAMING:
graph = createStreamingGraph(transformations);
Expand All @@ -95,19 +97,19 @@ public JobClient executeAsync(Pipeline pipeline) throws Exception {
}

private StreamGraph createBatchGraph(
List<Transformation<?>> transformations, ReadableConfig configuration) {
List<Transformation<?>> transformations, ReadableConfig tableConfiguration) {
ExecutorUtils.setBatchProperties(executionEnvironment);
StreamGraph graph =
ExecutorUtils.generateStreamGraph(executionEnvironment, transformations);
ExecutorUtils.setBatchProperties(graph, configuration);
ExecutorUtils.setBatchProperties(graph, tableConfiguration);
return graph;
}

private StreamGraph createStreamingGraph(List<Transformation<?>> transformations) {
return ExecutorUtils.generateStreamGraph(executionEnvironment, transformations);
}

private void setJobName(StreamGraph streamGraph, String defaultJobName) {
private void setJobName(StreamGraph streamGraph, @Nullable String defaultJobName) {
final String adjustedDefaultJobName =
StringUtils.isNullOrWhitespaceOnly(defaultJobName)
? DEFAULT_JOB_NAME
Expand Down

0 comments on commit 6353f62

Please sign in to comment.