Skip to content

Commit

Permalink
[FLINK-3712] Make all dynamic properties available to the CLI frontend
Browse files Browse the repository at this point in the history
This closes apache#1863
  • Loading branch information
rmetzger committed Apr 11, 2016
1 parent a234719 commit b368cb2
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1044,6 +1044,9 @@ protected Client getClient(

jobManagerAddress = yarnCluster.getJobManagerAddress();
writeJobManagerAddressToConfig(jobManagerAddress);

// overwrite the yarn client config (because the client parses the dynamic properties)
this.config.addAll(flinkYarnClient.getFlinkConfiguration());

logAndSysout("YARN cluster started");
logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
Expand Down Expand Up @@ -1180,8 +1183,9 @@ public Integer run() throws Exception {
catch (Exception e) {
return handleError(e);
}
} else {
return run(params);
}
return run(params);
case ACTION_LIST:
return list(params);
case ACTION_INFO:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
GlobalConfiguration.loadConfiguration(confDirPath);
Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
flinkYarnClient.setConfigurationDirectory(confDirPath);
File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
if (!confFile.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public StandaloneLeaderRetrievalService(String jobManagerAddress) {
this.jobManagerAddress = jobManagerAddress;
}

@Override
public void start(LeaderRetrievalListener listener) {
Preconditions.checkNotNull(listener, "Listener must not be null.");
Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can " +
Expand All @@ -55,5 +56,6 @@ public void start(LeaderRetrievalListener listener) {
leaderListener.notifyLeaderAddress(jobManagerAddress, null);
}

@Override
public void stop() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.flink.runtime.yarn;

import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.fs.Path;
import java.io.File;
import java.util.List;
Expand All @@ -43,7 +44,9 @@ public abstract class AbstractFlinkYarnClient {
/**
* Flink configuration
*/
public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf);
public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf);

public abstract Configuration getFlinkConfiguration();

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ object FlinkShell {
val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
val confPath = new Path(confFile.getAbsolutePath)
GlobalConfiguration.loadConfiguration(confDirPath)
yarnClient.setFlinkConfigurationObject(flinkConfiguration)
yarnClient.setFlinkConfiguration(flinkConfiguration)
yarnClient.setConfigurationDirectory(confDirPath)
yarnClient.setConfigurationFilePath(confPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public void testMultipleAMKill() throws Exception {

String fsStateHandlePath = tmp.getRoot().getPath();

flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public void testJavaAPI() {
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.setConfigurationDirectory(confDirPath);
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));

// deploy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {

private String dynamicPropertiesEncoded;

private List<File> shipFiles = new ArrayList<File>();
private List<File> shipFiles = new ArrayList<>();
private org.apache.flink.configuration.Configuration flinkConfiguration;

private boolean detached;
Expand Down Expand Up @@ -174,10 +174,15 @@ public void setTaskManagerMemory(int memoryMb) {
}

@Override
public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf) {
public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
this.flinkConfiguration = conf;
}

@Override
public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
return flinkConfiguration;
}

@Override
public void setTaskManagerSlots(int slots) {
if(slots <= 0) {
Expand Down Expand Up @@ -209,6 +214,7 @@ public void setConfigurationFilePath(Path confPath) {
flinkConfigurationPath = confPath;
}

@Override
public void setConfigurationDirectory(String configurationDirectory) {
this.configurationDirectory = configurationDirectory;
}
Expand Down Expand Up @@ -247,6 +253,7 @@ public void setShipFiles(List<File> shipFiles) {
}
}

@Override
public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
}
Expand Down Expand Up @@ -303,6 +310,7 @@ public boolean isDetached() {
return detached;
}

@Override
public AbstractFlinkYarnCluster deploy() throws Exception {

UserGroupInformation.setConfiguration(conf);
Expand Down Expand Up @@ -542,7 +550,7 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
LocalResource flinkConf = Records.newRecord(LocalResource.class);
Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar, fs.getHomeDirectory());
Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath, flinkConf, fs.getHomeDirectory());
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
Map<String, LocalResource> localResources = new HashMap<>(2);
localResources.put("flink.jar", appMasterJar);
localResources.put("flink-conf.yaml", flinkConf);

Expand Down Expand Up @@ -578,7 +586,7 @@ protected AbstractFlinkYarnCluster deployInternal() throws Exception {
fs.close();

// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX, flinkConfiguration));
// set classpath from YARN configuration
Expand Down Expand Up @@ -728,6 +736,7 @@ private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yar
return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
}

@Override
public String getClusterDescription() throws Exception {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand Down Expand Up @@ -763,6 +772,7 @@ public String getClusterDescription() throws Exception {
return baos.toString();
}

@Override
public String getSessionFilesDir() {
return sessionFilesDir.toString();
}
Expand Down

0 comments on commit b368cb2

Please sign in to comment.