Skip to content

Commit

Permalink
[FLINK-15616][python] Move boot error messages from python-udf-boot.l…
Browse files Browse the repository at this point in the history
…og to taskmanager's log file

Previously, the boot error messages were printed to a separate log file (python-udf-boot.log), which was very hard
to locate. This commit prints the error messages into the taskmanager log file to make them more user-friendly.

This closes apache#10870.
  • Loading branch information
hequn8128 committed Jan 18, 2020
1 parent 5420f29 commit 4c63ec4
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 36 deletions.
11 changes: 2 additions & 9 deletions flink-python/bin/pyflink-udf-runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,5 @@ if [[ "$_PYTHON_WORKING_DIR" != "" ]]; then
fi
fi

if [[ "$FLINK_LOG_DIR" != "" ]]; then
if [[ "$FLINK_IDENT_STRING" = "" ]]; then
FLINK_IDENT_STRING="$USER"
fi
log="$FLINK_LOG_DIR/flink-$USER-python-udf-boot-$HOSTNAME.log"
${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee -a ${log}
else
${python} -m pyflink.fn_execution.boot $@
fi
# Boot output (stdout and stderr) is appended to the boot log under BOOT_LOG_DIR
# and, via tee, still echoed to this script's stdout.
log="$BOOT_LOG_DIR/flink-python-udf-boot.log"
# "$@" and the log path are quoted so that arguments or directories containing
# whitespace or glob characters are passed through unchanged.
${python} -m pyflink.fn_execution.boot "$@" 2>&1 | tee -a "${log}"
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def start_test_provision_server():
self.env = dict(os.environ)
self.env["python"] = sys.executable
self.env["FLINK_BOOT_TESTING"] = "1"
self.env["FLINK_LOG_DIR"] = os.path.join(self.env["FLINK_HOME"], "log")
self.env["BOOT_LOG_DIR"] = os.path.join(self.env["FLINK_HOME"], "log")

self.tmp_dir = tempfile.mkdtemp(str(time.time()), dir=self.tempdir)
# assume that this file is in flink-python source code directory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,21 @@ public void open() throws Exception {
Struct pipelineOptions = PipelineOptionsTranslation.toProto(portableOptions);

jobBundleFactory = createJobBundleFactory(pipelineOptions);
stageBundleFactory = jobBundleFactory.forStage(createExecutableStage());
stageBundleFactory = createStageBundleFactory();
progressHandler = BundleProgressHandler.ignored();
}

/**
 * Creates the {@link StageBundleFactory} for the executable stage.
 *
 * <p>To make the error messages more user friendly, any failure is rethrown as a
 * {@link RuntimeException} whose message is the boot log reported by the environment
 * manager and whose cause is the original failure.
 *
 * @return the stage bundle factory created by the job bundle factory
 * @throws Exception declared for callers; failures surface as a {@link RuntimeException}
 */
private StageBundleFactory createStageBundleFactory() throws Exception {
	try {
		return jobBundleFactory.forStage(createExecutableStage());
	} catch (Throwable e) {
		String bootLog;
		try {
			bootLog = environmentManager.getBootLog();
		} catch (Throwable bootLogFailure) {
			// Reading the boot log is best-effort: it must never hide the original failure,
			// so keep the root cause and record the secondary problem as suppressed.
			e.addSuppressed(bootLogFailure);
			bootLog = "Failed to create stage bundle factory!";
		}
		throw new RuntimeException(bootLog, e);
	}
}

@Override
public void close() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -100,16 +99,13 @@ public final class ProcessPythonEnvironmentManager implements PythonEnvironmentM
@NotNull private final PythonDependencyInfo dependencyInfo;
@NotNull private final Map<String, String> systemEnv;
@NotNull private final String[] tmpDirectories;
@Nullable private final String logDirectory;

public ProcessPythonEnvironmentManager(
@NotNull PythonDependencyInfo dependencyInfo,
@NotNull String[] tmpDirectories,
@Nullable String logDirectory,
@NotNull Map<String, String> systemEnv) {
this.dependencyInfo = Objects.requireNonNull(dependencyInfo);
this.tmpDirectories = Objects.requireNonNull(tmpDirectories);
this.logDirectory = logDirectory;
this.systemEnv = Objects.requireNonNull(systemEnv);
}

Expand Down Expand Up @@ -193,10 +189,8 @@ Map<String, String> constructEnvironmentVariables()

constructRequirementsDirectory(env);

// set FLINK_LOG_DIR if the log directory exists
if (!Strings.isNullOrEmpty(logDirectory)) {
env.put("FLINK_LOG_DIR", logDirectory);
}
// set BOOT_LOG_DIR.
env.put("BOOT_LOG_DIR", baseDirectory);

// set the path of python interpreter, it will be used to execute the udf worker.
if (dependencyInfo.getPythonExec().isPresent()) {
Expand Down Expand Up @@ -305,6 +299,17 @@ String getBaseDirectory() {
return baseDirectory;
}

/**
 * Builds the failure message for a Python environment that could not be started.
 *
 * <p>The message always starts with a fixed failure notice. If the file
 * {@code flink-python-udf-boot.log} exists in the base directory, its content, decoded with
 * the platform default charset, is appended after a single space.
 *
 * @return the failure notice, followed by the boot log content when the file exists
 * @throws Exception if the boot log file exists but cannot be read
 */
@Override
public String getBootLog() throws Exception {
	final String failureNotice = "Failed to create stage bundle factory!";
	final File logFile =
		new File(String.join(File.separator, baseDirectory, "flink-python-udf-boot.log"));

	// Nothing was written by the boot script: report only the fixed notice.
	if (!logFile.exists()) {
		return failureNotice;
	}

	final byte[] rawLog = Files.readAllBytes(logFile.toPath());
	final String logText = new String(rawLog, Charset.defaultCharset());
	return failureNotice + " " + logText;
}

private static void appendToPythonPath(Map<String, String> env, List<String> pythonDependencies) {
if (pythonDependencies.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ public interface PythonEnvironmentManager extends AutoCloseable {
* @return The path of the RetrievalToken file.
*/
String createRetrievalToken() throws Exception;

/**
 * Returns the boot log of the Python Environment.
 *
 * <p>NOTE(review): in this change the result is used as the message of the exception thrown
 * when creating the stage bundle factory fails, so implementations presumably should return
 * text that is readable as an error message -- confirm with the implementations.
 *
 * @return the boot log as a string
 * @throws Exception presumably if the boot log cannot be retrieved; the exact conditions are
 *     implementation-specific
 */
String getBootLog() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemoryType;
Expand All @@ -40,7 +39,6 @@
import org.apache.flink.table.functions.python.PythonEnv;
import org.apache.flink.util.Preconditions;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -342,16 +340,9 @@ protected PythonEnvironmentManager createPythonEnvironmentManager() throws IOExc
config, getRuntimeContext().getDistributedCache());
PythonEnv pythonEnv = getPythonEnv();
if (pythonEnv.getExecType() == PythonEnv.ExecType.PROCESS) {
String taskManagerLogFile = getContainingTask()
.getEnvironment()
.getTaskManagerInfo()
.getConfiguration()
.getString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, System.getProperty("log.file"));
String logDirectory = taskManagerLogFile == null ? null : new File(taskManagerLogFile).getParent();
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
logDirectory,
System.getenv());
} else {
throw new UnsupportedOperationException(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testCreateRetrievalToken() throws Exception {
sysEnv.put("FLINK_HOME", "/flink");

try (ProcessPythonEnvironmentManager environmentManager =
new ProcessPythonEnvironmentManager(dependencyInfo, new String[] {tmpDir}, null, sysEnv)) {
new ProcessPythonEnvironmentManager(dependencyInfo, new String[] {tmpDir}, sysEnv)) {
environmentManager.open();
String retrievalToken = environmentManager.createRetrievalToken();

Expand All @@ -310,11 +310,11 @@ public void testSetLogDirectory() throws Exception {
null);

try (ProcessPythonEnvironmentManager environmentManager = new ProcessPythonEnvironmentManager(
dependencyInfo, new String[] {tmpDir}, "/tmp/log", new HashMap<>())) {
dependencyInfo, new String[] {tmpDir}, new HashMap<>())) {
environmentManager.open();
Map<String, String> env = environmentManager.constructEnvironmentVariables();
Map<String, String> expected = getBasicExpectedEnv(environmentManager);
expected.put("FLINK_LOG_DIR", "/tmp/log");
expected.put("BOOT_LOG_DIR", environmentManager.getBaseDirectory());
assertEquals(expected, env);
}
}
Expand Down Expand Up @@ -387,12 +387,13 @@ private static Map<String, String> getBasicExpectedEnv(ProcessPythonEnvironmentM
String.join(File.separator, tmpBase, "pyflink.zip"),
String.join(File.separator, tmpBase, "py4j-0.10.8.1-src.zip"),
String.join(File.separator, tmpBase, "cloudpickle-1.2.2-src.zip")));
map.put("BOOT_LOG_DIR", tmpBase);
return map;
}

private static ProcessPythonEnvironmentManager createBasicPythonEnvironmentManager(
PythonDependencyInfo dependencyInfo) {
return new ProcessPythonEnvironmentManager(
dependencyInfo, new String[] {tmpDir}, null, new HashMap<>());
dependencyInfo, new String[] {tmpDir}, new HashMap<>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ public AbstractPythonScalarFunctionRunner<BaseRow, BaseRow> createPythonScalarFu
new ProcessPythonEnvironmentManager(
new PythonDependencyInfo(new HashMap<>(), null, null, new HashMap<>(), null),
new String[] {System.getProperty("java.io.tmpdir")},
null,
new HashMap<>());

return new BaseRowPythonScalarFunctionRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ public AbstractPythonScalarFunctionRunner<Row, Row> createPythonScalarFunctionRu
new ProcessPythonEnvironmentManager(
new PythonDependencyInfo(new HashMap<>(), null, null, new HashMap<>(), null),
new String[] {System.getProperty("java.io.tmpdir")},
null,
new HashMap<>());

return new PythonScalarFunctionRunner(
Expand All @@ -257,7 +256,6 @@ private AbstractPythonScalarFunctionRunner<Row, Row> createUDFRunner(
new ProcessPythonEnvironmentManager(
new PythonDependencyInfo(new HashMap<>(), null, null, new HashMap<>(), null),
new String[] {System.getProperty("java.io.tmpdir")},
null,
new HashMap<>());

return new PythonScalarFunctionRunnerTestHarness(
Expand Down

0 comments on commit 4c63ec4

Please sign in to comment.