Skip to content

Commit

Permalink
[hotfix] Decouple ScalaShellRemoteStreamEnvironment from RemoteStream…
Browse files Browse the repository at this point in the history
…Environment
  • Loading branch information
kl0u committed Dec 3, 2019
1 parent 2843bd4 commit 814a9fa
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,44 @@

package org.apache.flink.api.java;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.JarUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* A {@link RemoteStreamEnvironment} for the Scala shell.
* A {@link StreamExecutionEnvironment} for the Scala shell.
*/
public class ScalaShellRemoteStreamEnvironment extends RemoteStreamEnvironment {
private static final Logger LOG = LoggerFactory.getLogger(ScalaShellRemoteStreamEnvironment.class);
public class ScalaShellRemoteStreamEnvironment extends StreamExecutionEnvironment {

/** The hostname of the JobManager. */
private final String host;

/** The port of the JobManager main actor system. */
private final int port;

/** The configuration used to parametrize the client that connects to the remote cluster. */
private final Configuration clientConfiguration;

/** The jar files that need to be attached to each job. */
private final List<URL> jarFiles;

// reference to Scala Shell, for access to virtual directory
private FlinkILoop flinkILoop;
/** Reference to Scala Shell, for access to virtual directory. */
private final FlinkILoop flinkILoop;

/**
* Creates a new RemoteStreamEnvironment that points to the master
Expand All @@ -64,7 +77,34 @@ public ScalaShellRemoteStreamEnvironment(
Configuration configuration,
String... jarFiles) {

super(host, port, configuration, jarFiles);
if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
throw new InvalidProgramException(
"The RemoteEnvironment cannot be used when submitting a program through a client, " +
"or running in a TestEnvironment context.");
}

if (host == null) {
throw new NullPointerException("Host must not be null.");
}
if (port < 1 || port >= 0xffff) {
throw new IllegalArgumentException("Port out of range");
}

this.host = host;
this.port = port;
this.clientConfiguration = configuration == null ? new Configuration() : configuration;
this.jarFiles = new ArrayList<>(jarFiles.length);
for (String jarFile : jarFiles) {
try {
URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
this.jarFiles.add(jarFileUrl);
JarUtils.checkJarFile(jarFileUrl);
} catch (MalformedURLException e) {
throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
} catch (IOException e) {
throw new RuntimeException("Problem with jar file " + jarFile, e);
}
}
this.flinkILoop = flinkILoop;
}

Expand All @@ -73,25 +113,38 @@ public ScalaShellRemoteStreamEnvironment(
*
* @param streamGraph
* Stream Graph to execute
* @param jarFiles
* List of jar file URLs to ship to the cluster
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL> jarFiles) throws ProgramInvocationException {
URL jarUrl;
public JobExecutionResult execute(StreamGraph streamGraph) throws ProgramInvocationException {
try {
jarUrl = flinkILoop.writeFilesToDisk().getAbsoluteFile().toURI().toURL();
} catch (MalformedURLException e) {
final List<URL> allJarFiles = getUpdatedJars(jarFiles);
final PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, clientConfiguration);
return executor.executePlan(streamGraph, allJarFiles, Collections.emptyList()).getJobExecutionResult();
}
catch (ProgramInvocationException e) {
throw e;
}
catch (MalformedURLException e) {
throw new ProgramInvocationException("Could not write the user code classes to disk.",
streamGraph.getJobGraph().getJobID(), e);
streamGraph.getJobGraph().getJobID(), e);
}
catch (Exception e) {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
}
}

List<URL> allJarFiles = new ArrayList<>(jarFiles.size() + 1);
/**
 * Builds the list of jar URLs to hand to the executor: every entry of {@code jarFiles},
 * followed by the URL of the file returned by {@code flinkILoop.writeFilesToDisk()}.
 *
 * <p>The input list is not modified; a new list is returned on every call.
 *
 * @param jarFiles the jar URLs already attached to this environment
 * @return a new list containing {@code jarFiles} plus the URL of the shell-written file as last element
 * @throws MalformedURLException if the shell-written file cannot be converted to a URL
 */
private List<URL> getUpdatedJars(List<URL> jarFiles) throws MalformedURLException {
	// Resolve the shell output first, so a failure here happens before the result list is built.
	final File shellOutput = flinkILoop.writeFilesToDisk().getAbsoluteFile();
	final URL shellOutputUrl = shellOutput.toURI().toURL();

	final List<URL> combined = new ArrayList<>(jarFiles.size() + 1);
	combined.addAll(jarFiles);
	combined.add(shellOutputUrl);
	return combined;
}

return super.executeRemotely(streamGraph, allJarFiles);
/**
 * Returns the configuration used to parametrize the client that connects to the remote cluster.
 *
 * @return the value of the {@code clientConfiguration} field
 */
public Configuration getClientConfiguration() {
	return this.clientConfiguration;
}

public void setAsContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.scala.FlinkILoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -106,9 +105,9 @@ public void testConfigurationForwardingStreamEnvironment() {

StreamExecutionEnvironment streamEnv = flinkILoop.scalaSenv().getJavaEnv();

assertTrue(streamEnv instanceof RemoteStreamEnvironment);
assertTrue(streamEnv instanceof ScalaShellRemoteStreamEnvironment);

RemoteStreamEnvironment remoteStreamEnv = (RemoteStreamEnvironment) streamEnv;
ScalaShellRemoteStreamEnvironment remoteStreamEnv = (ScalaShellRemoteStreamEnvironment) streamEnv;

Configuration forwardedConfiguration = remoteStreamEnv.getClientConfiguration();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ public int getPort() {
return port;
}

/**
 * Returns the client configuration held by this environment.
 *
 * @return the value of the {@code clientConfiguration} field
 * @deprecated This method is going to be removed in the next releases.
 */
@Deprecated
public Configuration getClientConfiguration() {
	return this.clientConfiguration;
}
Expand Down

0 comments on commit 814a9fa

Please sign in to comment.