Skip to content

Commit

Permalink
[FLINK-15504] Allow output to stdout/stderr during execution of Packa…
Browse files Browse the repository at this point in the history
…gedProgram

We suppress the output to stdout/stderr during plan extraction via
PackagedProgram. This has unintended consequences for users who are looking into
debugging their Flink programs during JobGraph creation.

This change removes the suppression of output when we run the JARs. The plan
preview still suppresses the output to avoid spaming the logs during plan
preview.
  • Loading branch information
mxm committed Jan 10, 2020
1 parent f0f9343 commit 4bb4a50
Show file tree
Hide file tree
Showing 16 changed files with 329 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ protected void info(String[] args) throws CliArgsException, FileNotFoundExceptio

LOG.info("Creating program plan dump");

Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism);
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(program, parallelism, true);
String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);

if (jsonPlan != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
Expand All @@ -46,15 +49,30 @@ public JobClient executeAsync(String jobName) throws Exception {
throw new ProgramAbortException();
}

public Pipeline getPipeline(PackagedProgram prog) throws ProgramInvocationException {

// temporarily write syserr and sysout to a byte array.
PrintStream originalOut = System.out;
PrintStream originalErr = System.err;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
System.setOut(new PrintStream(baos));
ByteArrayOutputStream baes = new ByteArrayOutputStream();
System.setErr(new PrintStream(baes));
/**
* Retrieves the JobGraph from a PackagedProgram.
* @param prog The program to run
* @param suppressOutput Whether to suppress stdout/stderr. Output is always printed on errors.
* @return The Flink batch or streaming plan
* @throws ProgramInvocationException in case of errors.
*/
public Pipeline getPipeline(PackagedProgram prog, boolean suppressOutput) throws ProgramInvocationException {

final PrintStream originalOut = System.out;
final PrintStream originalErr = System.err;
final ByteArrayOutputStream stdOutBuffer;
final ByteArrayOutputStream stdErrBuffer;

if (suppressOutput) {
// temporarily write syserr and sysout to a byte array.
stdOutBuffer = new ByteArrayOutputStream();
System.setOut(new PrintStream(stdOutBuffer));
stdErrBuffer = new ByteArrayOutputStream();
System.setErr(new PrintStream(stdErrBuffer));
} else {
stdOutBuffer = null;
stdErrBuffer = null;
}

setAsContext();
try {
Expand All @@ -68,23 +86,20 @@ public Pipeline getPipeline(PackagedProgram prog) throws ProgramInvocationExcept
if (pipeline != null) {
return pipeline;
} else {
throw new ProgramInvocationException("The program caused an error: ", t);
throw generateException(prog, "The program caused an error: ", t, stdOutBuffer, stdErrBuffer);
}
}
finally {
unsetAsContext();
System.setOut(originalOut);
System.setErr(originalErr);
if (suppressOutput) {
System.setOut(originalOut);
System.setErr(originalErr);
}
}

String stdout = baos.toString();
String stderr = baes.toString();

throw new ProgramInvocationException(
"The program plan could not be fetched - the program aborted pre-maturely."
+ "\n\nSystem.err: " + (stderr.length() == 0 ? "(none)" : stderr)
+ "\n\nSystem.out: " + (stdout.length() == 0 ? "(none)" : stdout));
throw generateException(prog, "The program plan could not be fetched - the program aborted pre-maturely.", stdOutBuffer, stdErrBuffer);
}

// ------------------------------------------------------------------------

private void setAsContext() {
Expand All @@ -108,6 +123,37 @@ public void setPipeline(Pipeline pipeline){
this.pipeline = pipeline;
}

private static ProgramInvocationException generateException(
PackagedProgram prog,
String msg,
@Nullable ByteArrayOutputStream stdout,
@Nullable ByteArrayOutputStream stderr) {
return generateException(prog, msg, null, stdout, stderr);
}

private static ProgramInvocationException generateException(
PackagedProgram prog,
String msg,
@Nullable Throwable cause,
@Nullable ByteArrayOutputStream stdoutBuffer,
@Nullable ByteArrayOutputStream stderrBuffer) {
Preconditions.checkState((stdoutBuffer != null) == (stderrBuffer != null),
"Stderr/Stdout should either both be set or both be null.");
String stdout = "";
String stderr = "";
if (stdoutBuffer != null) {
stdout = stdoutBuffer.toString();
stderr = stderrBuffer.toString();
}
return new ProgramInvocationException(
String.format("%s\n\nClasspath: %s\n\nSystem.out: %s\n\nSystem.err: %s",
msg,
prog.getJobJarAndDependencies(),
stdout.length() == 0 ? "(none)" : stdout,
stderr.length() == 0 ? "(none)" : stderr),
cause);
}

/**
* A special exception used to abort programs when the caller is only interested in the
* program plan, rather than in the full execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class PackagedProgramUtils {
private static final String PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver";

private static final String PYTHON_GATEWAY_CLASS_NAME = "org.apache.flink.client.python.PythonGatewayServer";

/**
* Creates a {@link JobGraph} with a specified {@link JobID}
* from the given {@link PackagedProgram}.
Expand All @@ -50,8 +51,9 @@ public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism,
@Nullable JobID jobID) throws ProgramInvocationException {
final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism);
@Nullable JobID jobID,
boolean suppressOutput) throws ProgramInvocationException {
final Pipeline pipeline = getPipelineFromProgram(packagedProgram, defaultParallelism, suppressOutput);
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline, configuration, defaultParallelism);

if (jobID != null) {
Expand All @@ -71,19 +73,22 @@ public static JobGraph createJobGraph(
* @param packagedProgram to extract the JobGraph from
* @param configuration to use for the optimizer and job graph generator
* @param defaultParallelism for the JobGraph
* @param suppressOutput Whether to suppress stdout/stderr during interactive JobGraph creation.
* @return JobGraph extracted from the PackagedProgram
* @throws ProgramInvocationException if the JobGraph generation failed
*/
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
int defaultParallelism) throws ProgramInvocationException {
return createJobGraph(packagedProgram, configuration, defaultParallelism, null);
int defaultParallelism,
boolean suppressOutput) throws ProgramInvocationException {
return createJobGraph(packagedProgram, configuration, defaultParallelism, null, suppressOutput);
}

public static Pipeline getPipelineFromProgram(
PackagedProgram prog,
int parallelism) throws CompilerException, ProgramInvocationException {
int parallelism,
boolean suppressOutput) throws CompilerException, ProgramInvocationException {
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
Expand All @@ -93,7 +98,7 @@ public static Pipeline getPipelineFromProgram(
if (parallelism > 0) {
env.setParallelism(parallelism);
}
return env.getPipeline(prog);
return env.getPipeline(prog, suppressOutput);
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ public Class<?> loadClass(String name) throws ClassNotFoundException {
Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), c);

// we expect this to fail with a "ClassNotFoundException"
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666);
Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, 666, true);
FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline);
fail("Should have failed with a ClassNotFoundException");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void testGetExecutionPlan() throws ProgramInvocationException {
.build();

Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, 1, true);
OptimizedPlan op = optimizer.compile(plan);
assertNotNull(op);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testGetExecutionPlan() {
config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort());

Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1);
Plan plan = (Plan) PackagedProgramUtils.getPipelineFromProgram(prg, -1, true);
OptimizedPlan op = optimizer.compile(plan);
assertNotNull(op);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.client.program;

import org.junit.Assert;
import org.junit.Test;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* Tests for {@link OptimizerPlanEnvironment}.
*/
public class OptimizerPlanEnvironmentTest {

/**
* Test the two modes for handling stdout/stderr of user program.
* (1) Capturing the output and including it only in the exception
* (2) Leaving the output untouched
*/
@Test
public void testStdOutStdErrHandling() throws Exception {
runOutputTest(true, new String[] {"System.out: hello out!", "System.err: hello err!"});
runOutputTest(false, new String[] {"System.out: (none)", "System.err: (none)"});
}

private void runOutputTest(boolean suppressOutput, String[] expectedCapturedOutput) throws ProgramInvocationException {
PackagedProgram packagedProgram = PackagedProgram.newBuilder()
.setEntryPointClassName(getClass().getName())
.build();
OptimizerPlanEnvironment env = new OptimizerPlanEnvironment();
try {
// Flink will throw an error because no job graph will be generated by the main method.
env.getPipeline(packagedProgram, suppressOutput);
Assert.fail("This should have failed to create the Flink Plan.");
} catch (ProgramInvocationException e) {
// Test that that Flink captured the expected stdout/stderr
for (String expected : expectedCapturedOutput) {
assertThat(e.getMessage(), containsString(expected));
}
}
}

/**
* Main method for {@code testEnsureStdoutStdErrIsRestored()}.
* This will not create a valid Flink program. We will just use this program to check whether stdout/stderr is
* captured in a byte buffer or directly printed to the console.
* */
public static void main(String[] args) {
// Print something to stdout/stderr for output suppression test
System.out.println("hello out!");
System.err.println("hello err!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
packagedProgram,
configuration,
defaultParallelism,
jobId);
jobId,
false);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

return jobGraph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected CompletableFuture<JobPlanInfo> handleRequest(
final JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir, log);

return CompletableFuture.supplyAsync(() -> {
final JobGraph jobGraph = context.toJobGraph(configuration);
final JobGraph jobGraph = context.toJobGraph(configuration, true);
return planGenerator.apply(jobGraph);
}, executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private CompletableFuture<JobGraph> getJobGraphAsync(
JarHandlerContext context,
final SavepointRestoreSettings savepointRestoreSettings) {
return CompletableFuture.supplyAsync(() -> {
final JobGraph jobGraph = context.toJobGraph(configuration);
final JobGraph jobGraph = context.toJobGraph(configuration, false);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
return jobGraph;
}, executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static <R extends JarRequestBody> JarHandlerContext fromRequest(
return new JarHandlerContext(jarFile, entryClass, programArgs, parallelism, jobId);
}

public JobGraph toJobGraph(Configuration configuration) {
public JobGraph toJobGraph(Configuration configuration, boolean suppressOutput) {
if (!Files.exists(jarFile)) {
throw new CompletionException(new RestHandlerException(
String.format("Jar file %s does not exist", jarFile), HttpResponseStatus.BAD_REQUEST));
Expand All @@ -125,7 +125,7 @@ public JobGraph toJobGraph(Configuration configuration) {
.setConfiguration(configuration)
.setArguments(programArgs.toArray(new String[0]))
.build();
return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism, jobId);
return PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism, jobId, suppressOutput);
} catch (final ProgramInvocationException e) {
throw new CompletionException(e);
}
Expand Down
Loading

0 comments on commit 4bb4a50

Please sign in to comment.