Skip to content

Commit

Permalink
[FLINK-5542][yarn] Use YarnCluster vcores setting to do MaxVCore vali…
Browse files Browse the repository at this point in the history
…dation.

This closes apache#6775.
  • Loading branch information
leanken-zz authored and GJL committed Oct 9, 2018
1 parent 1e0a779 commit e959e6d
Showing 1 changed file with 18 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,27 @@ private void isReadyForDeployment(ClusterSpecification clusterSpecification) thr
}

// Check if we don't exceed YARN's maximum virtual cores.
// The number of cores can be configured in the config.
// If not configured, it is set to the number of task slots
int numYarnVcores = yarnConfiguration.getInt(YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
// Fetch numYarnMaxVcores from all the RUNNING nodes via yarnClient
final int numYarnMaxVcores;
try {
numYarnMaxVcores = yarnClient.getNodeReports(NodeState.RUNNING)
.stream()
.mapToInt(report -> report.getCapability().getVirtualCores())
.max()
.orElse(0);
} catch (Exception e) {
throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
}

int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnVcores) {
if (configuredVcores > numYarnMaxVcores) {
throw new IllegalConfigurationException(
String.format("The number of virtual cores per node were configured with %d" +
" but Yarn only has %d virtual cores available. Please note that the number" +
" of virtual cores is set to the number of task slots by default unless configured" +
" in the Flink config with '%s.'",
configuredVcores, numYarnVcores, YarnConfigOptions.VCORES.key()));
String.format("The number of requested virtual cores per node %d" +
" exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
" Please note that the number of virtual cores is set to the number of task slots by default" +
" unless configured in the Flink config with '%s.'",
configuredVcores, numYarnMaxVcores, YarnConfigOptions.VCORES.key()));
}

// check if required Hadoop environment variables are set. If not, warn user
Expand Down

0 comments on commit e959e6d

Please sign in to comment.