Skip to content

Commit

Permalink
[FLINK-15090][api] Reverse the dependency from flink-streaming-java t…
Browse files Browse the repository at this point in the history
…o flink-clients

This closes apache#10526 .
  • Loading branch information
tisonkun authored and aljoscha committed Mar 12, 2020
1 parent 9504252 commit 0523ef6
Show file tree
Hide file tree
Showing 17 changed files with 243 additions and 298 deletions.
6 changes: 6 additions & 0 deletions flink-clients/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ under the License.
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<!-- More information on this:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StreamContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
Expand Down Expand Up @@ -128,16 +128,21 @@ public static void executeProgram(

LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
executorServiceLoader,
configuration,
userCodeClassLoader);
ContextEnvironment.setAsContext(factory);
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader);

StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader);

try {
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetContext();
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,45 +54,19 @@ public static String translateToJSONExecutionPlan(Pipeline pipeline) {
}

private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) {
PlanTranslator planToJobGraphTransmogrifier = new PlanTranslator();
PlanTranslator planTranslator = new PlanTranslator();

if (planToJobGraphTransmogrifier.canTranslate(pipeline)) {
return planToJobGraphTransmogrifier;
if (planTranslator.canTranslate(pipeline)) {
return planTranslator;
}

FlinkPipelineTranslator streamGraphTranslator = reflectStreamGraphTranslator();
StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator();

if (!streamGraphTranslator.canTranslate(pipeline)) {
throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
+ "the given pipeline " + pipeline + ".");
if (streamGraphTranslator.canTranslate(pipeline)) {
return streamGraphTranslator;
}
return streamGraphTranslator;
}

private static FlinkPipelineTranslator reflectStreamGraphTranslator() {
// Try our luck with StreamGraph translation. We have to load a StreamGraphTranslator
// via reflection because the dependencies of flink-streaming-java are inverted compared
// to flink-java. For flink-java does not depend on runtime, clients or optimizer and
// we have the translation code in clients/optimizer. On the other hand,
// flink-streaming-java depends on runtime and clients.

Class<?> streamGraphTranslatorClass;
try {
streamGraphTranslatorClass = Class.forName(
"org.apache.flink.streaming.api.graph.StreamGraphTranslator",
true,
FlinkPipelineTranslationUtil.class.getClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException("Could not load StreamGraphTranslator.", e);
}

FlinkPipelineTranslator streamGraphTranslator;
try {
streamGraphTranslator =
(FlinkPipelineTranslator) streamGraphTranslatorClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Could not instantiate StreamGraphTranslator.", e);
}
return streamGraphTranslator;
throw new RuntimeException("Translator " + streamGraphTranslator + " cannot translate "
+ "the given pipeline " + pipeline + ".");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*
*/

package org.apache.flink.streaming.api.graph;
package org.apache.flink.client;

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.FlinkPipelineTranslator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
Expand All @@ -41,7 +42,7 @@ public class ContextEnvironment extends ExecutionEnvironment {

private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

ContextEnvironment(
public ContextEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
Expand Down Expand Up @@ -95,11 +96,18 @@ public String toString() {

// --------------------------------------------------------------------------------------------

public static void setAsContext(ContextEnvironmentFactory factory) {
public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader) {
ExecutionEnvironmentFactory factory = () -> new ContextEnvironment(
executorServiceLoader,
configuration,
userCodeClassLoader);
initializeContextEnvironment(factory);
}

public static void unsetContext() {
public static void unsetAsContext() {
resetContextEnvironment();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,143 +22,39 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;

/**
* An {@link ExecutionEnvironment} that never executes a job but only extracts the {@link
* org.apache.flink.api.dag.Pipeline}.
* An {@link ExecutionEnvironment} that never executes a job but only extracts the {@link Pipeline}.
*/
public class OptimizerPlanEnvironment extends ExecutionEnvironment {

private Pipeline pipeline;

// ------------------------------------------------------------------------
// Execution Environment methods
// ------------------------------------------------------------------------

@Override
public JobClient executeAsync(String jobName) throws Exception {
this.pipeline = createProgramPlan();

// do not go on with anything now!
throw new ProgramAbortException();
public Pipeline getPipeline() {
return pipeline;
}

/**
* Retrieves the JobGraph from a PackagedProgram.
* @param prog The program to run
* @param suppressOutput Whether to suppress stdout/stderr. Output is always printed on errors.
* @return The Flink batch or streaming plan
* @throws ProgramInvocationException in case of errors.
*/
public Pipeline getPipeline(PackagedProgram prog, boolean suppressOutput) throws ProgramInvocationException {

final PrintStream originalOut = System.out;
final PrintStream originalErr = System.err;
final ByteArrayOutputStream stdOutBuffer;
final ByteArrayOutputStream stdErrBuffer;

if (suppressOutput) {
// temporarily write syserr and sysout to a byte array.
stdOutBuffer = new ByteArrayOutputStream();
System.setOut(new PrintStream(stdOutBuffer));
stdErrBuffer = new ByteArrayOutputStream();
System.setErr(new PrintStream(stdErrBuffer));
} else {
stdOutBuffer = null;
stdErrBuffer = null;
}

setAsContext();
try {
prog.invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
throw e;
}
catch (Throwable t) {
// the invocation gets aborted with the preview plan
if (pipeline != null) {
return pipeline;
} else {
throw generateException(prog, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
}
}
finally {
unsetAsContext();
if (suppressOutput) {
System.setOut(originalOut);
System.setErr(originalErr);
}
public OptimizerPlanEnvironment(int parallelism) {
if (parallelism > 0) {
setParallelism(parallelism);
}

throw generateException(prog, "The program plan could not be fetched - the program aborted pre-maturely.", stdOutBuffer, stdErrBuffer);
}

// ------------------------------------------------------------------------
@Override
public JobClient executeAsync(String jobName) {
pipeline = createProgramPlan();

private void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
// do not go on with anything now!
throw new ProgramAbortException();
}

@Override
public ExecutionEnvironment createExecutionEnvironment() {
return OptimizerPlanEnvironment.this;
}
};
public void setAsContext() {
ExecutionEnvironmentFactory factory = () -> this;
initializeContextEnvironment(factory);
}

private void unsetAsContext() {
public void unsetAsContext() {
resetContextEnvironment();
}

// ------------------------------------------------------------------------

public void setPipeline(Pipeline pipeline){
this.pipeline = pipeline;
}

private static ProgramInvocationException generateException(
PackagedProgram prog,
String msg,
@Nullable ByteArrayOutputStream stdout,
@Nullable ByteArrayOutputStream stderr) {
return generateException(prog, msg, null, stdout, stderr);
}

private static ProgramInvocationException generateException(
PackagedProgram prog,
String msg,
@Nullable Throwable cause,
@Nullable ByteArrayOutputStream stdoutBuffer,
@Nullable ByteArrayOutputStream stderrBuffer) {
Preconditions.checkState((stdoutBuffer != null) == (stderrBuffer != null),
"Stderr/Stdout should either both be set or both be null.");
String stdout = "";
String stderr = "";
if (stdoutBuffer != null) {
stdout = stdoutBuffer.toString();
stderr = stderrBuffer.toString();
}
return new ProgramInvocationException(
String.format("%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s",
msg,
prog.getJobJarAndDependencies(),
stdout.length() == 0 ? "(none)" : stdout,
stderr.length() == 0 ? "(none)" : stderr),
cause);
}

/**
* A special exception used to abort programs when the caller is only interested in the
* program plan, rather than in the full execution.
*/
public static final class ProgramAbortException extends Error {
private static final long serialVersionUID = 1L;
}
}
Loading

0 comments on commit 0523ef6

Please sign in to comment.