diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index b5dfbe5160745..6d972bc5ab426 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -1044,6 +1044,9 @@ protected Client getClient( jobManagerAddress = yarnCluster.getJobManagerAddress(); writeJobManagerAddressToConfig(jobManagerAddress); + + // overwrite the yarn client config (because the client parses the dynamic properties) + this.config.addAll(flinkYarnClient.getFlinkConfiguration()); logAndSysout("YARN cluster started"); logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL()); @@ -1180,8 +1183,9 @@ public Integer run() throws Exception { catch (Exception e) { return handleError(e); } + } else { + return run(params); } - return run(params); case ACTION_LIST: return list(params); case ACTION_INFO: diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 94de5c4592e24..91f8df2970cdc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -142,7 +142,7 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); GlobalConfiguration.loadConfiguration(confDirPath); Configuration flinkConfiguration = GlobalConfiguration.getConfiguration(); - flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration); + flinkYarnClient.setFlinkConfiguration(flinkConfiguration); flinkYarnClient.setConfigurationDirectory(confDirPath); File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME); if (!confFile.exists()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java index dbab41cef1862..1be879cf5fc88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java @@ -44,6 +44,7 @@ public StandaloneLeaderRetrievalService(String jobManagerAddress) { this.jobManagerAddress = jobManagerAddress; } + @Override public void start(LeaderRetrievalListener listener) { Preconditions.checkNotNull(listener, "Listener must not be null."); Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " + @@ -55,5 +56,6 @@ public void start(LeaderRetrievalListener listener) { leaderListener.notifyLeaderAddress(jobManagerAddress, null); } + @Override public void stop() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java index 83a976d6588d6..c1498c56890d8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java @@ -17,6 +17,7 @@ */ package org.apache.flink.runtime.yarn; +import org.apache.flink.configuration.Configuration; import org.apache.hadoop.fs.Path; import java.io.File; import java.util.List; @@ -43,7 +44,9 @@ public abstract class AbstractFlinkYarnClient { /** * Flink configuration */ - public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf); + public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf); + + public abstract Configuration getFlinkConfiguration(); /** * diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala index 6937e1bc9370a..2c2fbb3dbf261 100644 --- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala +++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala @@ -223,7 +223,7 @@ object FlinkShell { val confFile = new File(confDirPath + File.separator + "flink-conf.yaml") val confPath = new Path(confFile.getAbsolutePath) GlobalConfiguration.loadConfiguration(confDirPath) - yarnClient.setFlinkConfigurationObject(flinkConfiguration) + yarnClient.setFlinkConfiguration(flinkConfiguration) yarnClient.setConfigurationDirectory(confDirPath) yarnClient.setConfigurationFilePath(confPath) diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java index f68b14153bc79..a93abf06bd75c 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java @@ -111,7 +111,7 @@ public void testMultipleAMKill() throws Exception { String fsStateHandlePath = tmp.getRoot().getPath(); - flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" + diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java index d713b7360d1a2..cb402a3f55291 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java @@ -227,7 +227,7 @@ public void testJavaAPI() { flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles())); String confDirPath = System.getenv("FLINK_CONF_DIR"); flinkYarnClient.setConfigurationDirectory(confDirPath); - flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration()); + flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration()); flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml")); // deploy diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index ef02be3c4d5bb..6f81d09d46595 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -124,7 +124,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { private String dynamicPropertiesEncoded; - private List shipFiles = new ArrayList(); + private List shipFiles = new ArrayList<>(); private org.apache.flink.configuration.Configuration flinkConfiguration; private boolean detached; @@ -174,10 +174,15 @@ public void setTaskManagerMemory(int memoryMb) { } @Override - public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) { + public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) { this.flinkConfiguration = conf; } + @Override + public org.apache.flink.configuration.Configuration getFlinkConfiguration() { + return flinkConfiguration; + } + @Override public void setTaskManagerSlots(int slots) { if(slots <= 0) { @@ -209,6 +214,7 @@ public void setConfigurationFilePath(Path confPath) { flinkConfigurationPath = confPath; } + @Override public void setConfigurationDirectory(String configurationDirectory) { this.configurationDirectory = configurationDirectory; } @@ -247,6 +253,7 @@ public void setShipFiles(List shipFiles) { } } + @Override public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) { this.dynamicPropertiesEncoded = dynamicPropertiesEncoded; } @@ -303,6 +310,7 @@ public boolean isDetached() { return detached; } + @Override public AbstractFlinkYarnCluster deploy() throws Exception { UserGroupInformation.setConfiguration(conf); @@ -542,7 +550,7 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception { LocalResource flinkConf = Records.newRecord(LocalResource.class); Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory()); Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory()); - Map localResources = new HashMap(2); + Map localResources = new HashMap<>(2); localResources.put("flink.jar", appMasterJar); localResources.put("flink-conf.yaml", flinkConf); @@ -578,7 +586,7 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception { fs.close(); // Setup CLASSPATH for ApplicationMaster - Map appMasterEnv = new HashMap(); + Map appMasterEnv = new HashMap<>(); // set user specified app master environment variables appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration)); // set classpath from YARN configuration @@ -728,6 +736,7 @@ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yar return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree); } + @Override public String getClusterDescription() throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); @@ -763,6 +772,7 @@ public String getClusterDescription() throws Exception { return baos.toString(); } + @Override public String getSessionFilesDir() { return sessionFilesDir.toString(); }