Skip to content

Commit

Permalink
[FLINK-15067][table-api] Add utility method to TableConfig that adds …
Browse files Browse the repository at this point in the history
…a new job parameter
  • Loading branch information
dawidwys committed Dec 9, 2019
1 parent b0785ba commit bb6fd82
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def java_class(cls):
@classmethod
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'getPlannerConfig', 'setPlannerConfig'}
return {'getPlannerConfig', 'setPlannerConfig', 'addJobParameter'}

@classmethod
def java_method_name(cls, python_method_name):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

package org.apache.flink.table.api;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.util.Preconditions;

import java.math.MathContext;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;

/**
* Configuration for the current {@link TableEnvironment} session to adjust Table & SQL API programs.
Expand Down Expand Up @@ -262,6 +266,32 @@ public long getMaxIdleStateRetentionTime() {
return maxIdleStateRetentionTime;
}

/**
* Sets a custom user parameter that can be accessed via
* {@link org.apache.flink.table.functions.FunctionContext#getJobParameter(String, String)}.
*
* <p>This will add an entry to the current value of {@link PipelineOptions#GLOBAL_JOB_PARAMETERS}.
*
* <p>It is also possible to set multiple parameters at once, which will override any previously set
* parameters:
* <pre>
* {@code
* Map<String, String> params = ...
* TableConfig config = tEnv.getConfig;
* config.getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
* }
* </pre>
*/
@Experimental
public void addJobParameter(String key, String value) {
Map<String, String> params = getConfiguration()
.getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
.map(HashMap::new)
.orElseGet(HashMap::new);
params.put(key, value);
getConfiguration().set(PipelineOptions.GLOBAL_JOB_PARAMETERS, params);
}

public static TableConfig getDefault() {
return new TableConfig();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,8 +552,7 @@ class ExpressionReductionRulesTest extends TableTestBase {
util.addFunction("MyUdf", new RichFunc1)
util.tableEnv
.getConfig
.getConfiguration
.set(PipelineOptions.GLOBAL_JOB_PARAMETERS, Map("int.value" -> "10").asJava)
.addJobParameter("int.value", "10")

val expected = unaryNode(
"DataStreamCalc",
Expand Down

0 comments on commit bb6fd82

Please sign in to comment.