Skip to content

Commit

Permalink
[hotfix] Decouple ScalaShellRemoteEnvironment from RemoteEnvironment
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Dec 3, 2019
1 parent 146c579 commit 2843bd4
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@
public class RemoteEnvironment extends ExecutionEnvironment {

/** The hostname of the JobManager. */
protected final String host;
private final String host;

/** The port of the JobManager main actor system. */
protected final int port;
private final int port;

/** The jar files that need to be attached to each job. */
protected final List<URL> jarFiles;
private final List<URL> jarFiles;

/** The configuration used by the client that connects to the cluster. */
protected Configuration clientConfiguration;
private Configuration clientConfiguration;

/** The classpaths that need to be attached to each job. */
protected final List<URL> globalClasspaths;
private final List<URL> globalClasspaths;

/**
* Creates a new RemoteEnvironment that points to the master (JobManager) described by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,42 @@
* limitations under the License.
*/

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Special version of {@link org.apache.flink.api.java.RemoteEnvironment} that has a reference
* Special version of {@link org.apache.flink.api.java.ExecutionEnvironment} that has a reference
* to a {@link org.apache.flink.api.scala.FlinkILoop}. When execute is called this will
* use the reference of the ILoop to write the compiled classes of the current session to
* a Jar file and submit these with the program.
*/
public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
public class ScalaShellRemoteEnvironment extends ExecutionEnvironment {

// reference to Scala Shell, for access to virtual directory
private FlinkILoop flinkILoop;
/** The hostname of the JobManager. */
private final String host;

/** The port of the JobManager main actor system. */
private final int port;

/** The jar files that need to be attached to each job. */
private final List<URL> jarFiles;

/** The configuration used by the client that connects to the cluster. */
private Configuration clientConfiguration;

/** reference to Scala Shell, for access to virtual directory. */
private final FlinkILoop flinkILoop;

/**
* Creates new ScalaShellRemoteEnvironment that has a reference to the FlinkILoop.
Expand All @@ -52,8 +67,36 @@ public class ScalaShellRemoteEnvironment extends RemoteEnvironment {
* user-defined functions, user-defined input formats, or any libraries, those must be
* provided in the JAR files.
*/
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) throws Exception {
super(host, port, clientConfig, jarFiles, null);
public ScalaShellRemoteEnvironment(String host, int port, FlinkILoop flinkILoop, Configuration clientConfig, String... jarFiles) {
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be instantiated when running in a pre-defined context " +
"(such as Command Line Client, Scala Shell, or TestEnvironment)");
}
if (host == null) {
throw new NullPointerException("Host must not be null.");
}
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}

this.host = host;
this.port = port;
this.clientConfiguration = clientConfig == null ? new Configuration() : clientConfig;
if (jarFiles != null) {
this.jarFiles = new ArrayList<>(jarFiles.length);
for (String jarFile : jarFiles) {
try {
this.jarFiles.add(new File(jarFile).getAbsoluteFile().toURI().toURL());
} catch (MalformedURLException e) {
throw new IllegalArgumentException("JAR file path invalid", e);
}
}
}
else {
this.jarFiles = Collections.emptyList();
}

this.flinkILoop = flinkILoop;
}

Expand All @@ -63,7 +106,7 @@ public JobExecutionResult execute(String jobName) throws Exception {
final List<URL> allJarFiles = getUpdatedJarFiles();

final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
lastJobExecutionResult = executor.executePlan(p, allJarFiles, globalClasspaths);
lastJobExecutionResult = executor.executePlan(p, allJarFiles, Collections.emptyList());
return lastJobExecutionResult;
}

Expand Down

0 comments on commit 2843bd4

Please sign in to comment.