Skip to content

Commit

Permalink
[FLINK-18545][configuration] Allow users to specify job name by confi…
Browse files Browse the repository at this point in the history
…guration in ExecutionEnvironment
  • Loading branch information
godfreyhe committed Nov 27, 2020
1 parent 78d6ef6 commit eaad167
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@

package org.apache.flink.client;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import java.io.Serializable;

import static junit.framework.TestCase.fail;
import static org.junit.Assert.assertTrue;


/**
Expand Down Expand Up @@ -63,4 +67,35 @@ public void testExecuteAfterGetExecutionPlanContextEnvironment() {
fail("Consecutive #getExecutionPlan calls caused an exception.");
}
}

@Test
public void testDefaultJobName() {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
testJobName("Flink Java Job at", env);
}

@Test
public void testUserDefinedJobName() {
String jobName = "MyTestJob";
Configuration config = new Configuration();
config.set(PipelineOptions.NAME, jobName);
ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
testJobName(jobName, env);
}

@Test
public void testUserDefinedJobNameWithConfigure() {
String jobName = "MyTestJob";
Configuration config = new Configuration();
config.set(PipelineOptions.NAME, jobName);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.configure(config, this.getClass().getClassLoader());
testJobName(jobName, env);
}

private void testJobName(String prefixOfExpectedJobName, ExecutionEnvironment env) {
env.fromElements(1, 2, 3).writeAsText("/dev/null");
Plan plan = env.createProgramPlan();
assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,8 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
this.cacheFile.clear();
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
});
configuration.getOptional(PipelineOptions.NAME)
.ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName));
config.configure(configuration, classLoader);
}

Expand Down Expand Up @@ -870,7 +872,7 @@ public DataSource<Long> generateSequence(long from, long to) {
* @throws Exception Thrown, if the program executions fails.
*/
public JobExecutionResult execute() throws Exception {
return execute(getDefaultName());
return execute(getJobName());
}

/**
Expand Down Expand Up @@ -945,7 +947,7 @@ public void clearJobListeners() {
*/
@PublicEvolving
public final JobClient executeAsync() throws Exception {
return executeAsync(getDefaultName());
return executeAsync(getJobName());
}

/**
Expand Down Expand Up @@ -998,7 +1000,7 @@ public JobClient executeAsync(String jobName) throws Exception {
* @throws Exception Thrown, if the compiler could not be instantiated.
*/
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan(getDefaultName(), false);
Plan p = createProgramPlan(getJobName(), false);
return ExecutionPlanUtil.getExecutionPlanAsJSON(p);
}

Expand Down Expand Up @@ -1051,7 +1053,7 @@ public void registerCachedFile(String filePath, String name, boolean executable)
*/
@Internal
public Plan createProgramPlan() {
return createProgramPlan(getDefaultName());
return createProgramPlan(getJobName());
}

/**
Expand Down Expand Up @@ -1122,12 +1124,13 @@ void registerDataSink(DataSink<?> sink) {
}

/**
* Gets a default job name, based on the timestamp when this method is invoked.
* Gets the job name. If user defined job name is not found in the configuration,
* the default name based on the timestamp when this method is invoked will return.
*
* @return A default job name.
* @return A job name.
*/
private static String getDefaultName() {
return "Flink Java Job at " + Calendar.getInstance().getTime();
private String getJobName() {
return configuration.getString(PipelineOptions.NAME, "Flink Java Job at " + Calendar.getInstance().getTime());
}

// --------------------------------------------------------------------------------------------
Expand Down

0 comments on commit eaad167

Please sign in to comment.