Skip to content

Commit

Permalink
[FLINK-18490][python] Refactor PythonFunctionRunner to make it more g…
Browse files Browse the repository at this point in the history
…eneral

This closes apache#12841.
  • Loading branch information
HuangXingBo authored and dianfu committed Jul 9, 2020
1 parent 584dca1 commit 03934b3
Show file tree
Hide file tree
Showing 57 changed files with 1,241 additions and 2,729 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def test_param_validation(self):

def test_set_working_directory(self):
JProcessPythonEnvironmentManager = \
get_gateway().jvm.org.apache.flink.python.env.ProcessPythonEnvironmentManager
get_gateway().jvm.org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager

output_file = os.path.join(self.tmp_dir, "output.txt")
pyflink_dir = os.path.join(self.tmp_dir, "pyflink")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,45 @@
package org.apache.flink.python;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;

/**
* The base interface of runner which is responsible for the execution of Python functions.
*
* @param <IN> Type of the input elements.
*/
@Internal
public interface PythonFunctionRunner<IN> {
public interface PythonFunctionRunner {

/**
* Prepares the Python function runner, such as preparing the Python execution environment, etc.
*/
void open() throws Exception;
void open(PythonConfig config) throws Exception;

/**
* Tear-down the Python function runner.
*/
void close() throws Exception;

/**
* Prepares to process the next bundle of elements.
* Executes the Python function with the input byte array.
*
* @param data the byte array data.
*/
void process(byte[] data) throws Exception;

/**
* Retrieves the Python function result.
*
* @return the head of the Python function result buffer, or {@code null} if the result buffer is empty.
* f0 means the byte array buffer which stores the Python function result.
* f1 means the length of the Python function result byte array.
*/
void startBundle() throws Exception;
Tuple2<byte[], Integer> pollResult() throws Exception;

/**
* Forces to finish the processing of the current bundle of elements. It will flush
* the data cached in the data buffer for processing and retrieves the state mutations (if exists)
* made by the Python function. The call blocks until all of the outputs produced by this
* bundle have been received.
*/
void finishBundle() throws Exception;

/**
* Executes the Python function with the input element. It's not required to execute
* the Python function immediately. The runner may choose to buffer the input element and
* execute them in batch for better performance.
*/
void processElement(IN element) throws Exception;
void flush() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.python.env;

import org.apache.flink.annotation.Internal;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/**
 * A {@link PythonEnvironment} for executing UDFs in Process.
 *
 * <p>An instance is an immutable pair of the command to run and the environment variables to run
 * it with. The environment variables are snapshotted at construction time, so later changes to
 * the map passed in by the caller are not reflected here, and the map handed out by
 * {@link #getEnv()} cannot be modified.
 */
@Internal
public class ProcessPythonEnvironment implements PythonEnvironment {
	private final String command;
	private final Map<String, String> env;

	/**
	 * Creates a process-based Python environment description.
	 *
	 * @param command the command to run; must not be {@code null}.
	 * @param env the environment variables to run the command with; must not be {@code null}.
	 *            The entries are copied, the map itself is not retained.
	 * @throws NullPointerException if {@code command} or {@code env} is {@code null}.
	 */
	public ProcessPythonEnvironment(String command, Map<String, String> env) {
		this.command = Objects.requireNonNull(command, "command must not be null");
		Objects.requireNonNull(env, "env must not be null");
		// Copy into an insertion-ordered map so the caller's iteration order is kept, then wrap it
		// so that neither the caller nor consumers of getEnv() can alter this object's state.
		this.env = Collections.unmodifiableMap(new LinkedHashMap<>(env));
	}

	/**
	 * Returns the environment variables of this environment.
	 *
	 * @return an unmodifiable map of environment variable names to values; never {@code null}.
	 */
	public Map<String, String> getEnv() {
		return env;
	}

	/**
	 * Returns the command of this environment.
	 *
	 * @return the command passed at construction; never {@code null}.
	 */
	public String getCommand() {
		return command;
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.python.env;

import org.apache.flink.annotation.Internal;

/**
 * The base interface of python environment for executing UDFs.
 *
 * <p>This interface declares no methods; it only serves as the common type of the concrete
 * environment descriptions. Code that needs the details of an environment has to work with
 * the concrete implementation type.
 */
@Internal
public interface PythonEnvironment {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.flink.python.env;

import org.apache.flink.annotation.Internal;
import org.apache.flink.python.PythonFunctionRunner;

import org.apache.beam.model.pipeline.v1.RunnerApi;

/**
* The base interface of python environment manager which is used to create the Environment object and the
* RetrievalToken of Beam Fn API.
* The base interface of python environment manager which is used to create the PythonEnvironment object and the
* RetrievalToken.
*/
@Internal
public interface PythonEnvironmentManager extends AutoCloseable {
Expand All @@ -35,16 +35,16 @@ public interface PythonEnvironmentManager extends AutoCloseable {
void open() throws Exception;

/**
* Creates the Environment object used in Apache Beam Fn API.
* Creates the PythonEnvironment object used in {@link PythonFunctionRunner}.
*
* @return The Environment object which represents the environment(process, docker, etc) the python worker would run
* @return The PythonEnvironment object which represents the environment(process, docker, etc) the python worker would run
* in.
*/
RunnerApi.Environment createEnvironment() throws Exception;
PythonEnvironment createEnvironment() throws Exception;

/**
* Creates the RetrievalToken used in Apache Beam Fn API. It contains a list of files which need to transmit through
* ArtifactService provided by Apache Beam.
* Creates the RetrievalToken used in {@link PythonFunctionRunner}. It contains a list of files
* which need to transmit through ArtifactService provided by {@link PythonFunctionRunner}.
*
* @return The path of the RetrievalToken file.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
* limitations under the License.
*/

package org.apache.flink.python.env;
package org.apache.flink.python.env.beam;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.python.env.ProcessPythonEnvironment;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.PythonEnvironment;
import org.apache.flink.python.env.PythonEnvironmentManager;
import org.apache.flink.python.util.PythonEnvironmentManagerUtils;
import org.apache.flink.python.util.ZipUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.ShutdownHookUtil;

import org.apache.flink.shaded.guava18.com.google.common.base.Strings;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.codehaus.commons.nullanalysis.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,7 +54,7 @@

/**
* The ProcessPythonEnvironmentManager is used to prepare the working dir of python UDF worker and create
* ProcessEnvironment object of Beam Fn API. It's used when the python function runner is configured to run python UDF
* ProcessPythonEnvironment object of Beam Fn API. It's used when the python function runner is configured to run python UDF
* in process mode.
*/
@Internal
Expand Down Expand Up @@ -164,7 +166,7 @@ public void close() throws Exception {
}

@Override
public RunnerApi.Environment createEnvironment() throws IOException {
public PythonEnvironment createEnvironment() throws IOException {
Map<String, String> env = constructEnvironmentVariables();

if (dependencyInfo.getRequirementsFilePath().isPresent()) {
Expand All @@ -178,11 +180,7 @@ public RunnerApi.Environment createEnvironment() throws IOException {
}
String runnerScript = PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(dependencyInfo.getPythonExec(), env);

return Environments.createProcessEnvironment(
"",
"",
runnerScript,
env);
return new ProcessPythonEnvironment(runnerScript, env);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.flink.python.env.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;
import static org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.PYTHON_WORKING_DIR;

/**
* Utils used to prepare the python environment.
Expand Down
Loading

0 comments on commit 03934b3

Please sign in to comment.