Skip to content

Commit

Permalink
[FLINK-20365][python] The native k8s cluster could not be unregistere…
Browse files Browse the repository at this point in the history
…d when executing Python DataStream jobs in attach mode

This closes 14232.
  • Loading branch information
shuiqiangchen authored and dianfu committed Nov 26, 2020
1 parent e2b34eb commit af1b2cf
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 5 deletions.
3 changes: 3 additions & 0 deletions flink-python/pyflink/util/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ def deco(*a, **kw):
try:
return f(*a, **kw)
except Py4JJavaError as e:
from pyflink.java_gateway import get_gateway
get_gateway().jvm.org.apache.flink.client.python.PythonEnvUtils\
.setPythonException(e.java_exception)
s = e.java_exception.toString()
stack_trace = '\n\t at '.join(map(lambda x: x.toString(),
e.java_exception.getStackTrace()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
* A main class used to launch Python applications. It executes python as a
Expand All @@ -42,7 +41,7 @@
public final class PythonDriver {
private static final Logger LOG = LoggerFactory.getLogger(PythonDriver.class);

public static void main(String[] args) throws ExecutionException, InterruptedException {
public static void main(String[] args) throws Throwable {
// The python job needs at least 2 args.
// e.g. py a.py [user args]
// e.g. pym a.b [user args]
Expand Down Expand Up @@ -106,9 +105,13 @@ public static void main(String[] args) throws ExecutionException, InterruptedExc
} catch (Throwable e) {
LOG.error("Run python process failed", e);

// throw ProgramAbortException if the caller is interested in the program plan,
// there is no harm to throw ProgramAbortException even if it is not the case.
throw new ProgramAbortException();
if (PythonEnvUtils.capturedJavaException != null) {
throw PythonEnvUtils.capturedJavaException;
} else {
// throw ProgramAbortException if the caller is interested in the program plan,
// there is no harm to throw ProgramAbortException even if it is not the case.
throw new ProgramAbortException();
}
} finally {
PythonEnvUtils.setGatewayServer(null);
gatewayServer.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.client.python;

import org.apache.flink.client.deployment.application.UnsuccessfulExecutionException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.OperatingSystem;
Expand Down Expand Up @@ -73,6 +75,8 @@ final class PythonEnvUtils {

static final String PYFLINK_CLIENT_EXECUTABLE = "PYFLINK_CLIENT_EXECUTABLE";

static volatile Throwable capturedJavaException = null;

/**
* Wraps Python exec environment.
*/
Expand Down Expand Up @@ -375,4 +379,10 @@ static Process launchPy4jPythonClient(
// start the python process.
return PythonEnvUtils.startPythonProcess(pythonEnv, commands, redirectToPipe);
}

public static void setPythonException(Throwable pythonException) {
if (ExceptionUtils.findThrowable(pythonException, UnsuccessfulExecutionException.class).isPresent()) {
capturedJavaException = pythonException;
}
}
}

0 comments on commit af1b2cf

Please sign in to comment.