Skip to content

Commit

Permalink
[FLINK-2935] [scala-shell] Allow Scala shell to connect Flink cluster…
Browse files Browse the repository at this point in the history
… on YARN

  - Remove duplicated parseHostPortAddress method (Move it to ClientUtils class)
  - Refactor FlinkShell class

This closes apache#1500.
  • Loading branch information
chiwanpark committed Mar 24, 2016
1 parent c77e5ec commit 5108f68
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 175 deletions.
14 changes: 13 additions & 1 deletion docs/apis/scala_shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,24 @@ Scala-Flink> env.execute("MyProgram")

The Flink Shell comes with command history and autocompletion.

## Scala Shell with Flink on YARN

The Scala shell can connect Flink cluster on YARN. To connect deployed Flink cluster on YARN, use following command:

~~~bash
bin/start-scala-shell.sh yarn
~~~

The shell reads the connection information of the deployed Flink cluster from the `.yarn-properties` file, which is created in the configured `yarn.properties-file.location` directory or the temporary directory. If there is no deployed Flink cluster on YARN, the shell prints an error message.

The shell can deploy a Flink cluster to YARN, which is used exclusively by the shell. The number of YARN containers can be controlled by the parameter `-n <arg>`. The shell deploys a new Flink cluster on YARN and connects the cluster. You can also specify options for YARN cluster such as memory for JobManager, name of YARN application, etc..

## Adding external dependencies

It is possible to add external classpaths to the Scala-shell. These will be sent to the Jobmanager automatically alongside your shell program, when calling execute.

Use the parameter `-a <path/to/jar.jar>` or `--addclasspath <path/to/jar.jar>` to load additional classes.

~~~bash
bin/start-scala-shell.sh [local | remote <host> <port>] --addclasspath <path/to/jar.jar>
bin/start-scala-shell.sh [local | remote <host> <port> | yarn] --addclasspath <path/to/jar.jar>
~~~
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
Expand Down Expand Up @@ -174,11 +172,7 @@ public CliFrontend(String configDir) throws Exception {
this.config = GlobalConfiguration.getConfiguration();

// load the YARN properties
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);

File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser);
File propertiesFile = new File(getYarnPropertiesLocation(config));
if (propertiesFile.exists()) {

logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
Expand Down Expand Up @@ -213,7 +207,7 @@ public CliFrontend(String configDir) throws Exception {
InetSocketAddress jobManagerAddress;
if (address != null) {
try {
jobManagerAddress = parseHostPortAddress(address);
jobManagerAddress = ClientUtils.parseHostPortAddress(address);
// store address in config from where it is retrieved by the retrieval service
writeJobManagerAddressToConfig(jobManagerAddress);
}
Expand Down Expand Up @@ -945,7 +939,7 @@ protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
*/
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
InetSocketAddress jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
}
}
Expand Down Expand Up @@ -1080,7 +1074,7 @@ protected Client getClient(
}
else {
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
jobManagerAddress = ClientUtils.parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
}
}
Expand Down Expand Up @@ -1250,28 +1244,6 @@ public static void main(String[] args) {
// Miscellaneous Utilities
// --------------------------------------------------------------------------------------------

/**
* Parses a given host port address of the format URL:PORT and returns an {@link InetSocketAddress}
*
* @param hostAndPort host port string to be parsed
* @return InetSocketAddress object containing the parsed host port information
*/
private static InetSocketAddress parseHostPortAddress(String hostAndPort) {
// code taken from http:https://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my:https://" + hostAndPort);
} catch (URISyntaxException e) {
throw new RuntimeException("Malformed address " + hostAndPort, e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Address is missing hostname or port " + hostAndPort);
}
return new InetSocketAddress(host, port);
}

public static String getConfigurationDirectoryFromEnv() {
String location = System.getenv(ENV_CONFIG_DIRECTORY);

Expand Down Expand Up @@ -1319,4 +1291,12 @@ public static Map<String, String> getDynamicProperties(String dynamicPropertiesE
return Collections.emptyMap();
}
}

public static String getYarnPropertiesLocation(Configuration conf) {
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation = conf.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);

return propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;

/**
* A class that provides some utility methods
*/
public class ClientUtils {
/**
* Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
* The returned InetSocketAddress may be unresolved!
*
* @param hostport The "host:port" string.
* @return The converted InetSocketAddress.
*/
public static InetSocketAddress parseHostPortAddress(String hostport) {
// from http:https://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my:https://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
}
return new InetSocketAddress(host, port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
Expand Down Expand Up @@ -430,13 +429,9 @@ public int run(String[] args) {
String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// file that we write into the conf/ dir containing the jobManager address and the dop.

String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation = yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);

File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser);
// file that we write into the conf/ dir containing the jobManager address and the dop.
File yarnPropertiesFile = new File(CliFrontend.getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()));

Properties yarnProps = new Properties();
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.flink.client;

import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -75,7 +73,7 @@ public RemoteExecutor(String hostname, int port, URL jarFile) {
}

public RemoteExecutor(String hostport, URL jarFile) {
this(getInetFromHostport(hostport), new Configuration(), Collections.singletonList(jarFile),
this(ClientUtils.parseHostPortAddress(hostport), new Configuration(), Collections.singletonList(jarFile),
Collections.<URL>emptyList());
}

Expand All @@ -95,7 +93,7 @@ public RemoteExecutor(String hostname, int port, Configuration clientConfigurati
}

public RemoteExecutor(String hostport, Configuration clientConfiguration, URL jarFile) {
this(getInetFromHostport(hostport), clientConfiguration,
this(ClientUtils.parseHostPortAddress(hostport), clientConfiguration,
Collections.singletonList(jarFile), Collections.<URL>emptyList());
}

Expand Down Expand Up @@ -256,31 +254,4 @@ public void endSession(JobID jobID) throws Exception {
}
}
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------

/**
* Utility method that converts a string of the form "host:port" into an {@link InetSocketAddress}.
* The returned InetSocketAddress may be unresolved!
*
* @param hostport The "host:port" string.
* @return The converted InetSocketAddress.
*/
private static InetSocketAddress getInetFromHostport(String hostport) {
// from http:https://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my:https://" + hostport);
} catch (URISyntaxException e) {
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.", e);
}
String host = uri.getHost();
int port = uri.getPort();
if (host == null || port == -1) {
throw new RuntimeException("Could not identify hostname and port in '" + hostport + "'.");
}
return new InetSocketAddress(host, port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
* @param host The host name or address of the master (JobManager), where the program should be executed.
* @param port The port of the master (JobManager), where the program should be executed.
* @param flinkILoop The flink Iloop instance from which the ScalaShellRemoteEnvironment is called.
* @param clientConfig The configuration used by the client that connects to the cluster.
* @param jarFiles The JAR files with code that needs to be shipped to the cluster. If the program uses
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
*/
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, String... jarFiles) {
super(host, port, null, jarFiles, null);
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) {
super(host, port, clientConfig, jarFiles, null);
this.flinkILoop = flinkILoop;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.flink.api.scala
import java.io.{BufferedReader, File, FileOutputStream}

import org.apache.flink.api.java.{JarHelper, ScalaShellRemoteEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.AbstractID

import scala.tools.nsc.interpreter._
Expand All @@ -29,25 +30,37 @@ import scala.tools.nsc.interpreter._
class FlinkILoop(
val host: String,
val port: Int,
val clientConfig: Configuration,
val externalJars: Option[Array[String]],
in0: Option[BufferedReader],
out0: JPrintWriter)
extends ILoopCompat(in0, out0) {

def this(host: String,
port: Int,
externalJars: Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter){
this(host: String, port: Int, externalJars, Some(in0), out)
def this(
host: String,
port: Int,
clientConfig: Configuration,
externalJars: Option[Array[String]],
in0: BufferedReader,
out: JPrintWriter) {
this(host, port, clientConfig, externalJars, Some(in0), out)
}

def this(host: String, port: Int, externalJars: Option[Array[String]]){
this(host: String, port: Int, externalJars, None, new JPrintWriter(Console.out, true))
def this(
host: String,
port: Int,
clientConfig: Configuration,
externalJars: Option[Array[String]]) {
this(host, port, clientConfig, externalJars, None, new JPrintWriter(Console.out, true))
}

def this(host: String, port: Int, in0: BufferedReader, out: JPrintWriter){
this(host: String, port: Int, None, in0: BufferedReader, out: JPrintWriter)
def this(
host: String,
port: Int,
clientConfig: Configuration,
in0: BufferedReader,
out: JPrintWriter){
this(host, port, clientConfig, None, in0, out)
}

// remote environment
Expand All @@ -56,7 +69,7 @@ class FlinkILoop(
ScalaShellRemoteEnvironment.resetContextEnvironments()

// create our environment that submits against the cluster (local or remote)
val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this)
val remoteEnv = new ScalaShellRemoteEnvironment(host, port, this, clientConfig)

// prevent further instantiation of environments
ScalaShellRemoteEnvironment.disableAllContextAndOtherEnvironments()
Expand Down
Loading

0 comments on commit 5108f68

Please sign in to comment.