diff --git a/docs/apis/scala_shell.md b/docs/apis/scala_shell.md
index 1d6ac262dcf49..8705b31714d0b 100644
--- a/docs/apis/scala_shell.md
+++ b/docs/apis/scala_shell.md
@@ -72,6 +72,18 @@ Scala-Flink> env.execute("MyProgram")
 
 The Flink Shell comes with command history and autocompletion.
 
+## Scala Shell with Flink on YARN
+
+The Scala shell can connect to a Flink cluster on YARN. To connect to a cluster that is already deployed on YARN, use the following command:
+
+~~~bash
+bin/start-scala-shell.sh yarn
+~~~
+
+The shell reads the connection information of the deployed Flink cluster from the `.yarn-properties` file, which is created in the configured `yarn.properties-file.location` directory or in the temporary directory. If no Flink cluster is deployed on YARN, the shell prints an error message.
+
+Alternatively, the shell can deploy a new Flink cluster to YARN for its exclusive use. The number of YARN containers is controlled by the parameter `-n <arg>`. The shell deploys the new cluster on YARN and connects to it. You can also specify options for the YARN cluster, such as the JobManager memory or the name of the YARN application.
+
 ## Adding external dependencies
 
 It is possible to add external classpaths to the Scala shell. These will be sent to the JobManager automatically alongside your shell program when calling execute.
@@ -79,5 +91,5 @@ It is possible to add external classpaths to the Scala shell.
 Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.
 
 ~~~bash
-bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
+bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
 ~~~
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 3f9014d129564..929e1b17c54fc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -86,8 +86,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -174,11 +172,7 @@ public CliFrontend(String configDir) throws Exception {
     this.config = GlobalConfiguration.getConfiguration();
 
     // load the YARN properties
-    String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
-    String currentUser = System.getProperty("user.name");
-    String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
-
-    File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
+    File propertiesFile = new File(getYarnPropertiesLocation(config));
     if (propertiesFile.exists()) {
 
       logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
@@ -213,7 +207,7 @@ public CliFrontend(String configDir) throws Exception {
       InetSocketAddress jobManagerAddress;
       if (address != null) {
         try {
-          jobManagerAddress = parseHostPortAddress(address);
+          jobManagerAddress = ClientUtils.parseHostPortAddress(address);
           // store address in config from where it is retrieved by the retrieval service
           writeJobManagerAddressToConfig(jobManagerAddress);
         }
@@ -945,7 +939,7 @@ protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
    */
   protected void updateConfig(CommandLineOptions options) {
     if(options.getJobManagerAddress() != null){
-      InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+      InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
       writeJobManagerAddressToConfig(jobManagerAddress);
     }
   }
@@ -1080,7 +1074,7 @@ protected Client getClient(
     }
     else {
       if(options.getJobManagerAddress() != null) {
-        jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
+        jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
         writeJobManagerAddressToConfig(jobManagerAddress);
       }
     }
@@ -1250,28 +1244,6 @@ public static void main(String[] args) {
   //  Miscellaneous Utilities
   // --------------------------------------------------------------------------------------------
 
-  /**
-   * Parses a given host port address of the format URL:PORT and returns an {@link InetSocketAddress}
-   *
-   * @param hostAndPort host port string to be parsed
-   * @return InetSocketAddress object containing the parsed host port information
-   */
-  private static InetSocketAddress parseHostPortAddress(String hostAndPort) {
-    // code taken from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
-    URI uri;
-    try {
-      uri = new URI("my://" + hostAndPort);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Malformed address " + hostAndPort, e);
-    }
-    String host = uri.getHost();
-    int port = uri.getPort();
-    if (host == null || port == -1) {
-      throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
-    }
-    return new InetSocketAddress(host, port);
-  }
-
   public static String getConfigurationDirectoryFromEnv() {
     String location = System.getenv(ENV_CONFIG_DIRECTORY);
 
@@ -1319,4 +1291,12 @@ public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) {
       return Collections.emptyMap();
     }
   }
+
+  public static String getYarnPropertiesLocation(Configuration conf) {
+    String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
+    String currentUser = System.getProperty("user.name");
+    String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
+
+    return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser;
+  }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
new file mode 100644
index 0000000000000..f1ed93e3add40
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * A class that provides some utility methods.
+ */
+public class ClientUtils {
+
+  /**
+   * Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
+   * The returned InetSocketAddress may be unresolved!
+   *
+   * @param hostport The "host:port" string.
+   * @return The converted InetSocketAddress.
+   */
+  public static InetSocketAddress parseHostPortAddress(String hostport) {
+    // from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
+    URI uri;
+    try {
+      uri = new URI("my://" + hostport);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
+    }
+    String host = uri.getHost();
+    int port = uri.getPort();
+    if (host == null || port == -1) {
+      throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
+    }
+    return new InetSocketAddress(host, port);
+  }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 14dc2893fde6b..a4686682a0728 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -24,7 +24,6 @@
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
@@ -430,13 +429,9 @@ public int run(String[] args) {
       String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
       System.out.println("Flink JobManager is now running on " + jobManagerAddress);
       System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
-      // file that we write into the conf/ dir containing the jobManager address and the dop.
-
-      String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
-      String currentUser = System.getProperty("user.name");
-      String propertiesFileLocation = yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
-
-      File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser);
+      // file that we write into the conf/ dir containing the jobManager address and the dop.
+      File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));
 
       Properties yarnProps = new Properties();
       yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index bc0d220d0c3c2..ab70453b33779 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -19,8 +19,6 @@
 package org.apache.flink.client;
 
 import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.Collections;
 import java.util.List;
@@ -75,7 +73,7 @@ public RemoteExecutor(String hostname, int port, URL jarFile) {
   }
 
   public RemoteExecutor(String hostport, URL jarFile) {
-    this(getInetFromHostport(hostport), new Configuration(), Collections.singletonList(jarFile),
+    this(ClientUtils.parseHostPortAddress(hostport), new Configuration(), Collections.singletonList(jarFile),
         Collections.<URL>emptyList());
   }
 
@@ -95,7 +93,7 @@ public RemoteExecutor(String hostname, int port, Configuration clientConfiguration) {
   }
 
   public RemoteExecutor(String hostport, Configuration clientConfiguration, URL jarFile) {
-    this(getInetFromHostport(hostport), clientConfiguration,
+    this(ClientUtils.parseHostPortAddress(hostport), clientConfiguration,
         Collections.singletonList(jarFile), Collections.<URL>emptyList());
   }
 
@@ -256,31 +254,4 @@ public void endSession(JobID jobID) throws Exception {
       }
     }
   }
-
-  // --------------------------------------------------------------------------------------------
-  //  Utilities
-  // --------------------------------------------------------------------------------------------
-
-  /**
-   * Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
-   * The returned InetSocketAddress may be unresolved!
-   *
-   * @param hostport The "host:port" string.
-   * @return The converted InetSocketAddress.
-   */
-  private static InetSocketAddress getInetFromHostport(String hostport) {
-    // from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
-    URI uri;
-    try {
-      uri = new URI("my://" + hostport);
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
-    }
-    String host = uri.getHost();
-    int port = uri.getPort();
-    if (host == null || port == -1) {
-      throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
-    }
-    return new InetSocketAddress(host, port);
-  }
 }
diff --git a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
index a3369574f4958..52cbfe64750e9 100644
--- a/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
+++ b/flink-scala-shell/src/main/java/org/apache/flink/api/java/ScalaShellRemoteEnvironment.java
@@ -48,9 +48,13 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
    * @param host The host name or address of the master (JobManager), where the program should be executed.
    * @param port The port of the master (JobManager), where the program should be executed.
   * @param flinkILoop The FlinkILoop instance from which the ScalaShellRemoteEnvironment is called.
+   * @param clientConfig The configuration used by the client that connects to the cluster.
+   * @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
+   *                 user-defined functions, user-defined input formats, or any libraries, those must be
+   *                 provided in the JAR files.
    */
-  public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
-    super(host, port, null, jarFiles, null);
+  public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) {
+    super(host, port, clientConfig, jarFiles, null);
     this.flinkILoop = flinkILoop;
   }
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
index bcc9ef39df060..07d30ac24cd7d 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkILoop.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala
 import java.io.{BufferedReader, File, FileOutputStream}
 
 import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.util.AbstractID
 
 import scala.tools.nsc.interpreter._
@@ -29,25 +30,37 @@ import scala.tools.nsc.interpreter._
 class FlinkILoop(
     val host: String,
     val port: Int,
+    val clientConfig: Configuration,
     val externalJars: Option[Array[String]],
     in0: Option[BufferedReader],
     out0: JPrintWriter)
   extends ILoopCompat(in0, out0) {
 
-  def this(host: String,
-           port: Int,
-           externalJars: Option[Array[String]],
-           in0: BufferedReader,
-           out: JPrintWriter){
-    this(host: String, port: Int, externalJars, Some(in0), out)
+  def this(
+    host: String,
+    port: Int,
+    clientConfig: Configuration,
+    externalJars: Option[Array[String]],
+    in0: BufferedReader,
+    out: JPrintWriter) {
+    this(host, port, clientConfig, externalJars, Some(in0), out)
   }
 
-  def this(host: String, port: Int, externalJars: Option[Array[String]]){
-    this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
+  def this(
+    host: String,
+    port: Int,
+    clientConfig: Configuration,
+    externalJars: Option[Array[String]]) {
+    this(host, port, clientConfig, externalJars, None, new JPrintWriter(Console.out, true))
   }
 
-  def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
-    this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
+  def this(
+    host: String,
+    port: Int,
+    clientConfig: Configuration,
+    in0: BufferedReader,
+    out: JPrintWriter) {
+    this(host, port, clientConfig, None, in0, out)
   }
 
   // remote environment
@@ -56,7 +69,7 @@ class FlinkILoop(
     ScalaShellRemoteEnvironment.resetContextEnvironments()
 
     // create our environment that submits against the cluster (local or remote)
-    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
+    val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this, clientConfig)
 
     // prevent further instantiation of environments
     ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index eb7f816ef3810..6937e1bc9370a 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -18,142 +18,269 @@
 
 package org.apache.flink.api.scala
 
-import java.io.{StringWriter, BufferedReader}
+import java.io._
+import java.util.Properties
 
-import org.apache.flink.api.common.ExecutionMode
-
-import org.apache.flink.configuration.{ConfigConstants, Configuration}
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
+import org.apache.flink.client.{CliFrontend, ClientUtils, FlinkYarnSessionCli}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
+import org.apache.flink.runtime.minicluster.{FlinkMiniCluster, LocalFlinkMiniCluster}
+import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster
+import org.apache.hadoop.fs.Path
 
 import scala.tools.nsc.Settings
-
 import scala.tools.nsc.interpreter._
 
-
 object FlinkShell {
 
   object ExecutionMode extends Enumeration {
-    val UNDEFINED, LOCAL, REMOTE = Value
+    val UNDEFINED, LOCAL, REMOTE, YARN = Value
   }
 
-  var bufferedReader: Option[BufferedReader] = None
+  /** Configuration object */
+  case class Config(
+    host: Option[String] = None,
+    port: Option[Int] = None,
+    externalJars: Option[Array[String]] = None,
+    executionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED,
+    yarnConfig: Option[YarnConfig] = None
+  )
 
-  def main(args: Array[String]) {
+  /** YARN configuration object */
+  case class YarnConfig(
+    containers: Option[Int] = None,
+    jobManagerMemory: Option[Int] = None,
+    name: Option[String] = None,
+    queue: Option[String] = None,
+    slots: Option[Int] = None,
+    taskManagerMemory: Option[Int] = None
+  )
 
-    // scopt, command line arguments
-    case class Config(
-        port: Int = -1,
-        host: String = "none",
-        externalJars: Option[Array[String]] = None,
-        flinkShellExecutionMode: ExecutionMode.Value = ExecutionMode.UNDEFINED)
+  /** Buffered reader to substitute input in test */
+  var bufferedReader: Option[BufferedReader] = None
 
+  def main(args: Array[String]) {
     val parser = new scopt.OptionParser[Config]("start-scala-shell.sh") {
-      head ("Flink Scala Shell")
+      head("Flink Scala Shell")
 
       cmd("local") action {
-        (_, c) => c.copy(host = "none", port = -1, flinkShellExecutionMode = ExecutionMode.LOCAL)
+        (_, c) => c.copy(executionMode = ExecutionMode.LOCAL)
-      } text("starts Flink scala shell with a local Flink cluster\n") children(
+      } text("Starts Flink scala shell with a local Flink cluster") children(
         opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar.jar>") action {
           case (x, c) =>
             val xArray = x.split(":")
            c.copy(externalJars = Option(xArray))
-        } text("specifies additional jars to be used in Flink\n")
+        } text("Specifies additional jars to be used in Flink")
       )
 
       cmd("remote") action { (_, c) =>
-        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
-      } text("starts Flink scala shell connecting to a remote cluster\n") children(
+        c.copy(executionMode = ExecutionMode.REMOTE)
+      } text("Starts Flink scala shell connecting to a remote cluster") children(
         arg[String]("<host>") action { (h, c) =>
-          c.copy(host = h) }
-          text("remote host name as string"),
+          c.copy(host = Some(h)) }
+          text("Remote host name as string"),
         arg[Int]("<port>") action { (p, c) =>
-          c.copy(port = p) }
-          text("remote port as integer\n"),
+          c.copy(port = Some(p)) }
+          text("Remote port as integer\n"),
         opt[(String)]("addclasspath") abbr("a") valueName("<path/to/jar.jar>") action {
           case (x, c) =>
             val xArray = x.split(":")
             c.copy(externalJars = Option(xArray))
-        } text("specifies additional jars to be used in Flink")
+        } text ("Specifies additional jars to be used in Flink")
+      )
+
+      cmd("yarn") action {
+        (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
+      } text ("Starts Flink scala shell connecting to a YARN cluster") children(
+        opt[Int]("container") abbr ("n") valueName ("<arg>") action {
+          (x, c) =>
+            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
+        } text ("Number of YARN containers to allocate (= number of TaskManagers)"),
+        opt[Int]("jobManagerMemory") abbr ("jm") valueName ("<arg>") action {
+          (x, c) =>
+            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
+        } text ("Memory for JobManager container [in MB]"),
+        opt[String]("name") abbr ("nm") action {
+          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
+        } text ("Set a custom name for the application on YARN"),
+        opt[String]("queue") abbr ("qu") valueName ("<arg>") action {
+          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
+        } text ("Specifies YARN queue"),
+        opt[Int]("slots") abbr ("s") valueName ("<arg>") action {
+          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
+        } text ("Number of slots per TaskManager"),
+        opt[Int]("taskManagerMemory") abbr ("tm") valueName ("<arg>") action {
+          (x, c) =>
+            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
+        } text ("Memory per TaskManager container [in MB]"),
+        opt[(String)] ("addclasspath") abbr("a") valueName("<path/to/jar.jar>") action {
+          case (x, c) =>
+            val xArray = x.split(":")
+            c.copy(externalJars = Option(xArray))
+        } text("Specifies additional jars to be used in Flink")
       )
-      help("help") abbr("h") text("prints this usage text\n")
+
+      help("help") abbr ("h") text ("Prints this usage text")
     }
 
     // parse arguments
-    parser.parse (args, Config()) match {
-      case Some(config) =>
-        startShell(config.host,
-          config.port,
-          config.flinkShellExecutionMode,
-          config.externalJars)
-
-      case _ => System.out.println("Could not parse program arguments")
+    parser.parse(args, Config()) match {
+      case Some(config) => startShell(config)
+      case _ => println("Could not parse program arguments")
+    }
+  }
+
+  def fetchConnectionInfo(
+    config: Config
+  ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
+    config.executionMode match {
+      case ExecutionMode.LOCAL => // Local mode
+        val config = new Configuration()
+        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
+
+        val miniCluster = new LocalFlinkMiniCluster(config, false)
+        miniCluster.start()
+
+        println("\nStarting local Flink cluster (host: localhost, " +
+          s"port: ${miniCluster.getLeaderRPCPort}).\n")
+        ("localhost", miniCluster.getLeaderRPCPort, Some(Left(miniCluster)))
+
+      case ExecutionMode.REMOTE => // Remote mode
+        if (config.host.isEmpty || config.port.isEmpty) {
+          throw new IllegalArgumentException("<host> or <port> is not specified!")
+        }
+        (config.host.get, config.port.get, None)
+
+      case ExecutionMode.YARN => // YARN mode
+        config.yarnConfig match {
+          case Some(yarnConfig) => // there is information for a new cluster, so deploy one
+            deployNewYarnCluster(yarnConfig)
+          case None => // no information for a new cluster, so use the YARN properties file
+            fetchDeployedYarnClusterInfo()
+        }
+
+      case ExecutionMode.UNDEFINED => // Wrong input
+        throw new IllegalArgumentException("please specify execution mode:\n" +
+          "[local | remote <host> <port> | yarn]")
     }
   }
 
+  def startShell(config: Config): Unit = {
+    println("Starting Flink Shell:")
 
-  def startShell(
-      userHost: String,
-      userPort: Int,
-      executionMode: ExecutionMode.Value,
-      externalJars: Option[Array[String]] = None): Unit ={
-
-    System.out.println("Starting Flink Shell:")
-
-    // either port or userhost not specified by user, create new minicluster
-    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
-      executionMode match {
-        case ExecutionMode.LOCAL =>
-          val config = new Configuration()
-          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
-          val miniCluster = new LocalFlinkMiniCluster(config, false)
-          miniCluster.start()
-          val port = miniCluster.getLeaderRPCPort
-          System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
-          ("localhost", port, Some(miniCluster))
-
-        case ExecutionMode.REMOTE =>
-          if (userHost == "none" || userPort == -1) {
-            System.out.println("Error: <host> or <port> not specified!")
-            return
-          } else {
-            System.out.println(
-              s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
-            (userHost, userPort, None)
-          }
-
-        case ExecutionMode.UNDEFINED =>
-          System.out.println("Error: please specify execution mode:")
-          System.out.println("[local | remote <host> <port>]")
-          return
+    val (repl, cluster) = try {
+      val (host, port, cluster) = fetchConnectionInfo(config)
+      val conf = cluster match {
+        case Some(Left(miniCluster)) => miniCluster.userConfiguration
+        case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
+        case None => GlobalConfiguration.getConfiguration
       }
 
-    var repl: Option[FlinkILoop] = None
+      println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
+      val repl = bufferedReader match {
+        case Some(reader) =>
+          val out = new StringWriter()
+          new FlinkILoop(host, port, conf, config.externalJars, reader, new JPrintWriter(out))
+        case None =>
+          new FlinkILoop(host, port, conf, config.externalJars)
+      }
+
+      (repl, cluster)
+    } catch {
+      case e: IllegalArgumentException =>
+        println(s"Error: ${e.getMessage}")
+        sys.exit()
+    }
+
+    val settings = new Settings()
+    settings.usejavacp.value = true
+    settings.Yreplsync.value = true
 
     try {
-      // custom shell
-      repl = Some(
-        bufferedReader match {
+      repl.process(settings)
+    } finally {
+      repl.closeInterpreter()
+      cluster match {
+        case Some(Left(miniCluster)) => miniCluster.stop()
+        case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
+        case _ =>
+      }
+    }
 
-          case Some(br) =>
-            val out = new StringWriter()
-            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
+    println(" good bye ..")
+  }
 
-          case None =>
-            new FlinkILoop(host, port, externalJars)
-        })
+  def deployNewYarnCluster(yarnConfig: YarnConfig) = {
+    val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
 
-      val settings = new Settings()
+    // use flink-dist.jar for scala shell
+    val jarPath = new Path("file://" +
+      s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
+    yarnClient.setLocalJarPath(jarPath)
 
-      settings.usejavacp.value = true
-      settings.Yreplsync.value = true
+    // load configuration
+    val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
+    val flinkConfiguration = GlobalConfiguration.getConfiguration
+    val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
+    val confPath = new Path(confFile.getAbsolutePath)
+    GlobalConfiguration.loadConfiguration(confDirPath)
+    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+    yarnClient.setConfigurationDirectory(confDirPath)
+    yarnClient.setConfigurationFilePath(confPath)
 
-      // start scala interpreter shell
-      repl.foreach(_.process(settings))
-    } finally {
-      repl.foreach(_.closeInterpreter())
-      cluster.foreach(_.stop())
+    // the number of TaskManagers is required
+    yarnConfig.containers match {
+      case Some(containers) => yarnClient.setTaskManagerCount(containers)
+      case None =>
+        throw new IllegalArgumentException("Number of TaskManagers must be specified.")
+    }
+
+    // set configuration from user input
+    yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
+    yarnConfig.name.foreach(yarnClient.setName)
+    yarnConfig.queue.foreach(yarnClient.setQueue)
+    yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
+    yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
+
+    // deploy
+    val cluster = yarnClient.deploy()
+    val address = cluster.getJobManagerAddress.getAddress.getHostAddress
+    val port = cluster.getJobManagerAddress.getPort
+    cluster.connectToCluster()
+
+    (address, port, Some(Right(cluster)))
+  }
+
+  def fetchDeployedYarnClusterInfo() = {
+    // load configuration
+    val globalConfig = GlobalConfiguration.getConfiguration
+    val propertiesLocation = CliFrontend.getYarnPropertiesLocation(globalConfig)
+    val propertiesFile = new File(propertiesLocation)
+
+    // read properties
+    val properties = if (propertiesFile.exists()) {
+      println("Found YARN properties file " + propertiesFile.getAbsolutePath)
+      val properties = new Properties()
+      val inputStream = new FileInputStream(propertiesFile)
+
+      try {
+        properties.load(inputStream)
+      } finally {
+        inputStream.close()
+      }
+
+      properties
+    } else {
+      throw new IllegalArgumentException("Scala Shell cannot fetch YARN properties.")
     }
 
-    System.out.println(" good bye ..")
+    val addressInStr = properties.getProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY)
+    val address = ClientUtils.parseHostPortAddress(addressInStr)
+
+    (address.getHostString, address.getPort, None)
+  }
+
+  def ensureYarnConfig(config: Config) = config.yarnConfig match {
+    case Some(yarnConfig) => yarnConfig
+    case None => YarnConfig()
   }
 }
diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index 1bf8cfe92e7cb..683ec63aae435 100644
--- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -21,6 +21,7 @@ package org.apache.flink.api.scala
 import java.io._
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.configuration.GlobalConfiguration
 import org.apache.flink.test.util.{ForkableFlinkMiniCluster, TestBaseUtils}
 import org.apache.flink.util.TestLogger
 import org.junit.{AfterClass, BeforeClass, Test, Assert}
@@ -297,11 +298,13 @@ object ScalaShellITCase {
     val repl = externalJars match {
       case Some(ej) => new FlinkILoop(
         host, port,
+        GlobalConfiguration.getConfiguration,
         Option(Array(ej)),
         in, new PrintWriter(out))
 
       case None => new FlinkILoop(
         host, port,
+        GlobalConfiguration.getConfiguration,
         in, new PrintWriter(out))
     }
diff --git a/flink-scala-shell/start-script/start-scala-shell.sh b/flink-scala-shell/start-script/start-scala-shell.sh
index fd85897524e1d..033d505032229 100644
--- a/flink-scala-shell/start-script/start-scala-shell.sh
+++ b/flink-scala-shell/start-script/start-scala-shell.sh
@@ -75,11 +75,20 @@ do
     fi
 done
 
+log_setting=""
+
+if [[ $1 = "yarn" ]]
+then
+    FLINK_CLASSPATH=$FLINK_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR
+    log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-scala-shell-yarn-$HOSTNAME.log
+    log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
+fi
+
 if ${EXTERNAL_LIB_FOUND}
 then
-    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
+    java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@ --addclasspath "$EXT_CLASSPATH"
 else
-    java -Dscala.color -cp "$FLINK_CLASSPATH" org.apache.flink.api.scala.FlinkShell $@
+    java -Dscala.color -cp "$FLINK_CLASSPATH" $log_setting org.apache.flink.api.scala.FlinkShell $@
 fi
 
 #restore echo
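
The YARN-properties lookup that `CliFrontend.getYarnPropertiesLocation` and `fetchDeployedYarnClusterInfo` implement above is easy to sanity-check in isolation. Below is a minimal, self-contained sketch of the same two steps; the `.yarn-properties-<user>` file name and the `jobManager` property key are assumptions read off this patch, and the helper names are illustrative, not Flink API:

~~~scala
import java.io.{File, FileInputStream}
import java.net.{InetSocketAddress, URI}
import java.util.Properties

object YarnPropertiesSketch {
  // Mirrors getYarnPropertiesLocation: the configured directory wins,
  // otherwise fall back to java.io.tmpdir, then append the per-user file name.
  def yarnPropertiesFile(configuredLocation: Option[String]): File = {
    val dir = configuredLocation.getOrElse(System.getProperty("java.io.tmpdir"))
    val user = System.getProperty("user.name")
    new File(dir, ".yarn-properties-" + user) // assumed file name
  }

  // Mirrors fetchDeployedYarnClusterInfo: load the file, read the JobManager
  // "host:port" entry, and parse it with the same URI trick as ClientUtils.
  def jobManagerAddress(file: File): InetSocketAddress = {
    val props = new Properties()
    val in = new FileInputStream(file)
    try props.load(in) finally in.close()
    val hostPort = props.getProperty("jobManager") // assumed property key
    val uri = new URI("my://" + hostPort)
    new InetSocketAddress(uri.getHost, uri.getPort)
  }
}
~~~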
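The behavior of the new `ClientUtils.parseHostPortAddress` follows directly from its body above: a well-formed `host:port` string yields an `InetSocketAddress` (which may be unresolved if the lookup fails), while a missing host or port fails fast with a `RuntimeException` instead of returning null. A small usage sketch, assuming `flink-clients` is on the classpath:

~~~scala
import org.apache.flink.client.ClientUtils

object ParseAddressExample extends App {
  // Well-formed input parses into host and port.
  val addr = ClientUtils.parseHostPortAddress("jobmanager.example.com:6123")
  println(addr.getHostString + ":" + addr.getPort) // jobmanager.example.com:6123

  // A bare hostname has no port (URI.getPort returns -1), so parsing fails.
  try ClientUtils.parseHostPortAddress("jobmanager.example.com")
  catch { case e: RuntimeException => println(e.getMessage) }
}
~~~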
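The scopt handlers above thread a single immutable `Config` through every parsed option, and `ensureYarnConfig` materializes the `Option[YarnConfig]` on first use so each YARN flag can update exactly one field. The same pattern as a self-contained sketch, with simplified case classes rather than the shell's actual ones:

~~~scala
case class YarnConfig(containers: Option[Int] = None, slots: Option[Int] = None)
case class Config(yarnConfig: Option[YarnConfig] = None)

object EnsurePatternSketch extends App {
  def ensureYarnConfig(c: Config): YarnConfig = c.yarnConfig.getOrElse(YarnConfig())

  // Each handler copies the Config with one YarnConfig field updated, so
  // options compose: "-n 2 -s 4" sets containers first, then slots.
  val afterN = Config().copy(
    yarnConfig = Some(ensureYarnConfig(Config()).copy(containers = Some(2))))
  val afterS = afterN.copy(
    yarnConfig = Some(ensureYarnConfig(afterN).copy(slots = Some(4))))

  println(afterS) // Config(Some(YarnConfig(Some(2),Some(4))))
}
~~~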
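Every `FlinkILoop` constructor now takes the client `Configuration` that used to be passed as `null`, which is how `ScalaShellITCase` above drives the shell programmatically. A construction sketch along the same lines; the `localhost:6123` endpoint is an assumption, and the interpreter run itself is omitted:

~~~scala
import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}

import org.apache.flink.api.scala.FlinkILoop
import org.apache.flink.configuration.GlobalConfiguration

object ILoopConstructionSketch {
  def main(args: Array[String]): Unit = {
    // Scripted input and captured output, the same trick the test uses.
    val in = new BufferedReader(new StringReader("val x = 1\n" + ":quit\n"))
    val out = new StringWriter()

    val repl = new FlinkILoop(
      "localhost", 6123,                    // assumed JobManager endpoint
      GlobalConfiguration.getConfiguration, // client config now threaded through
      None,                                 // no external jars
      in, new PrintWriter(out))

    // To run it, configure scala.tools.nsc.Settings (usejavacp, Yreplsync)
    // and call repl.process(settings), as startShell does above.
  }
}
~~~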