Skip to content

Commit

Permalink
[FLINK-18545][configuration] Introduce pipeline.name to allow users…
Browse files Browse the repository at this point in the history
… to specify job name by configuration
  • Loading branch information
godfreyhe committed Nov 27, 2020
1 parent 144a9b2 commit 78d6ef6
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<td>Integer</td>
<td>The program-wide maximum parallelism used for operators which haven't specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.</td>
</tr>
<tr>
<td><h5>pipeline.name</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The job name used for printing and logging.</td>
</tr>
<tr>
<td><h5>pipeline.object-reuse</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@
*/
@PublicEvolving
public class PipelineOptions {

/**
* The job name used for printing and logging.
*/
public static final ConfigOption<String> NAME =
key("pipeline.name")
.stringType()
.noDefaultValue()
.withDescription("The job name used for printing and logging.");

/**
* A list of jar files that contain the user-defined function (UDF) classes and all classes used from within the UDFs.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(
sortInputs -> this.getConfiguration().set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs)
);
configuration.getOptional(PipelineOptions.NAME).ifPresent(
jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName)
);
config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
}
Expand Down Expand Up @@ -1798,7 +1801,7 @@ public <OUT> DataStreamSource<OUT> fromSource(
* @throws Exception which occurs during job execution.
*/
public JobExecutionResult execute() throws Exception {
return execute(DEFAULT_JOB_NAME);
return execute(getJobName());
}

/**
Expand Down Expand Up @@ -1891,7 +1894,7 @@ public void clearJobListeners() {
*/
@PublicEvolving
public final JobClient executeAsync() throws Exception {
return executeAsync(DEFAULT_JOB_NAME);
return executeAsync(getJobName());
}

/**
Expand Down Expand Up @@ -1958,7 +1961,7 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
*/
@Internal
public StreamGraph getStreamGraph() {
return getStreamGraph(DEFAULT_JOB_NAME);
return getStreamGraph(getJobName());
}

/**
Expand Down Expand Up @@ -2018,7 +2021,7 @@ private StreamGraphGenerator getStreamGraphGenerator() {
* @return The execution plan of the program, as a JSON String.
*/
public String getExecutionPlan() {
return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
return getStreamGraph(getJobName(), false).getStreamingPlanAsJSON();
}

/**
Expand Down Expand Up @@ -2350,4 +2353,8 @@ private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
}
return (T) resolvedTypeInfo;
}

private String getJobName() {
return configuration.getString(PipelineOptions.NAME, DEFAULT_JOB_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
Expand Down Expand Up @@ -269,6 +271,37 @@ public void testGetStreamGraph() {
}
}

@Test
public void testDefaultJobName() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
testJobName(StreamExecutionEnvironment.DEFAULT_JOB_NAME, env);
}

@Test
public void testUserDefinedJobName() {
String jobName = "MyTestJob";
Configuration config = new Configuration();
config.set(PipelineOptions.NAME, jobName);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
testJobName(jobName, env);
}

@Test
public void testUserDefinedJobNameWithConfigure() {
String jobName = "MyTestJob";
Configuration config = new Configuration();
config.set(PipelineOptions.NAME, jobName);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config, this.getClass().getClassLoader());
testJobName(jobName, env);
}

private void testJobName(String expectedJobName, StreamExecutionEnvironment env) {
env.fromElements(1, 2, 3).print();
StreamGraph streamGraph = env.getStreamGraph();
assertEquals(expectedJobName, streamGraph.getJobName());
}

@Test
public void testAddSourceWithUserDefinedTypeInfo() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand Down

0 comments on commit 78d6ef6

Please sign in to comment.