Skip to content

Commit

Permalink
[FLINK-17788][scala-shell] Fix yarn session support in scala shell
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jun 11, 2020
1 parent 8ee8f29 commit 6744033
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,14 @@ object FlinkShell {
case None => fetchDeployedYarnClusterInfo(config, clusterConfig, "default")
}

println("Configuration: " + effectiveConfig)

(effectiveConfig, clusterClient)
}

private def deployNewYarnCluster(config: Config, flinkConfig: Configuration) = {
val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, "yarn-cluster")
val args = parseArgList(config, "yarn-cluster")

val configurationDirectory = getConfigDir(config)

Expand All @@ -253,6 +255,7 @@ object FlinkShell {
.deploySessionCluster(clusterSpecification)
.getClusterClient
} finally {
executorConfig.set(DeploymentOptions.TARGET, "yarn-session")
clusterDescriptor.close()
}

Expand All @@ -265,7 +268,7 @@ object FlinkShell {
mode: String) = {

val effectiveConfig = new Configuration(flinkConfig)
val args = parseYarnArgList(config, mode)
val args = parseArgList(config, mode)

val configurationDirectory = getConfigDir(config)

Expand All @@ -284,7 +287,7 @@ object FlinkShell {
(executorConfig, None)
}

def parseYarnArgList(config: Config, mode: String): Array[String] = {
def parseArgList(config: Config, mode: String): Array[String] = {
val args = if (mode == "default") {
ArrayBuffer[String]()
} else {
Expand Down

0 comments on commit 6744033

Please sign in to comment.