[FLINK-12541][container][python] Add support for Python jobs in build script #8609
Conversation
Force-pushed bfefeb7 to 900f728
@flinkbot attention @tillrohrmann
Thanks for opening this PR @dianfu. I think this PR does not properly separate concerns. It mixes the concern of Python programs with the `StandaloneJobClusterEntrypoint`, the `StandaloneJobClusterConfiguration` and the `ClassPathJobGraphRetriever`. I suspect that there is a better separation of concerns by delegating the responsibility of parsing Python-specific job options to the `PythonDriver`. Otherwise I fear that we are going to add special-case logic for every language binding which we might support in the future.
I would also suggest adding appropriate test cases for your changes. Moreover, your first commit contains unrelated changes to the WordCount example which are attributed to FLINK-12541. Please revert them.
this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE);
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts) {
this(jobId, savepointRestoreSettings, programArguments, jobEntryPointName, jobPythonArtifacts, JarsOnClassPath.INSTANCE);
Why do we need to touch the `ClassPathJobGraphRetriever` at all? This class should not need to know about Python. Otherwise we need to add special casing for all supported languages in the future. This does not seem right.
throw new FlinkException("Could not load the provided entrypoint class.", e);
} catch (ClassNotFoundException e) {
if (!isPythonProgram && jobPythonArtifacts != null) {
return createPackagedProgram("org.apache.flink.python.client.PythonDriver", true);
This is overly complicated and error-prone. Why not start the cluster entrypoint with `o.a.f.python.client.PythonDriver` as the `jobClassName`?
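The reviewer's suggestion amounts to a launch command rather than new entrypoint code. A minimal sketch of what that invocation could look like, assuming the entrypoint's `--job-classname` option; the script name, the `-pyfs` flag and the artifact path are illustrative assumptions, and the command is only assembled and printed here, not executed:

```shell
# Sketch: pass PythonDriver directly as the job class when starting the job
# cluster, instead of special-casing Python inside the entrypoint itself.
# Script name, flags, and paths are illustrative assumptions.
cmd=(standalone-job.sh start-foreground
     --job-classname org.apache.flink.python.client.PythonDriver
     -pyfs /opt/artifacts/word_count.py)
printf '%s\n' "${cmd[*]}"
```

With this approach the entrypoint stays language-agnostic: Python support becomes a property of the job class it is given, not of the entrypoint's own option parsing.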
args[2] = "pyfs";
args[3] = jobPythonArtifacts;
System.arraycopy(programArguments, 0, args, 4, programArguments.length);
return args;
Why do we need to do this command-line argument magic? Just because `PythonDriver` expects the arguments to be passed in a special order? Why not make the `PythonDriver` more flexible?
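A more flexible driver would accept named options in any order, so callers would not need to splice values into fixed array positions as the diff above does. A minimal sketch of order-independent option parsing, assuming hypothetical `--py`/`--pyfs` option names that mirror the discussion but are not the PythonDriver's real interface:

```shell
# Order-independent option parsing: the caller no longer needs to place
# "pyfs" and its value at fixed positions in the argument array.
py="" pyfs="" rest=()
parse_args() {
  while [ "$#" -gt 0 ]; do
    case "$1" in
      --py)   py="$2";   shift 2 ;;   # main Python program
      --pyfs) pyfs="$2"; shift 2 ;;   # comma-separated Python dependencies
      *)      rest+=("$1"); shift ;;  # everything else passes through
    esac
  done
}
parse_args --pyfs dep1.py,dep2.py --input in.txt --py word_count.py
echo "py=$py pyfs=$pyfs rest=${rest[*]}"
```

The same idea in the actual Java driver would typically use a CLI library (e.g. Commons CLI, which the PR already uses for `Option.builder`) rather than positional indexing.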
}

@Nullable
String getJobPythonArtifacts() {
Why is this needed?
.desc("Entrypoint name of the job to run.")
.build();

private static final Option JOB_PYTHON_ARTIFACTS_OPTION = Option.builder("ja")
Why does this need to be handled by the `StandaloneJobClusterConfiguration` and cannot be parsed by the `PythonDriver` program?
private final String jobEntryPointName;

@Nullable
private final String jobPythonArtifacts;
I think it is not a good idea to add special-case logic into the `StandaloneJobClusterEntrypoint`. Why do you think this is needed?
Why does this PR have the same Flink issue assigned as #8532? Every PR should have its own JIRA issue as stated in the contribution guidelines.
Thanks for the PR @dianfu.
I think the suggestion from @tillrohrmann makes sense. We should let the PR have its own JIRA. Furthermore, the improvement of `pyfs` can also go in a new PR (with a new JIRA).
BTW: please rebase the code, and I will have another review :)
What do you think?
@tillrohrmann @sunjincheng121 Thanks a lot for your reviews. I have created a separate JIRA, FLINK-12787, for improving the PythonDriver. Then we can focus this PR on the build script changes for Python jobs support. Will update the PR later today.
Force-pushed 900f728 to 904d9d7
@tillrohrmann @sunjincheng121 I have updated the PR and the changes are only related to the build script for Python jobs support. Looking forward to your feedback.
Thanks for the update @dianfu! Currently, the PR looks pretty clean from my side. The only improvement I am not quite sure about is the build command option. Best,
I think we can remove the
@sunjincheng121 Makes sense to me as
The changes look good now @dianfu. I had two last comments which we should address before merging this PR.
@@ -29,7 +29,7 @@ In non HA mode, you should first start the job cluster service:

In order to deploy the job cluster entrypoint run:

`FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -`
Why did we remove `FLINK_JOB` here?
I found that FLINK_JOB was removed in commit 753e0c6 and just corrected the documentation here. What do you think?
Good catch. Now this makes sense.
flink-container/docker/README.md (Outdated)
@@ -13,26 +13,28 @@ Install the most recent stable version of [Docker](https://docs.docker.com/insta

Images are based on the official Java Alpine (OpenJDK 8) image.

Before building the image, one needs to build the user code jars for the job.
Assume that the job jar is stored under `<PATH_TO_JOB_JAR>`
Assume that the job jar is stored under `<COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS>`
Seems not to fit. I guess the sentence needs to be changed to something like: A Flink job can consist of multiple jars. In order to specify the required jars, they need to be passed to `--job-artifacts` of the build script. The individual paths are comma separated.
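A comma-separated `--job-artifacts` value implies the build script splits it into individual paths before copying each artifact into the image. A minimal sketch of that splitting, assuming bash; the variable name and paths are illustrative, not the script's actual code:

```shell
# Split a comma-separated --job-artifacts value into individual paths,
# as a build script would before copying each artifact into the image.
# Variable name and paths are illustrative assumptions.
job_artifacts="/opt/job.jar,/opt/udfs.py,/opt/deps.zip"
IFS=',' read -r -a artifact_paths <<< "$job_artifacts"
for path in "${artifact_paths[@]}"; do
  echo "adding artifact: $path"
done
```

Using `read -a` with a one-shot `IFS` avoids mutating the global field separator, which matters in a longer build script.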
+1. Only one concern: what about changing `jars` to `artifacts`?
@tillrohrmann Thanks a lot for your comments. I have updated the PR and addressed one of your concerns. Regarding FLINK_JOB, this option had been removed but the documentation had not been updated; I just corrected the documentation here. Feel free to let me know if you feel that change doesn't make sense or you prefer to address it in another PR.
Thanks for addressing my comments @dianfu. LGTM. Failing test seems to be unrelated. Merging now.
FYI: This change broke
@knaufk Thanks a lot for the hotfix. The fix makes sense to me.
What is the purpose of the change
This pull request adds support for building a job-specific Docker image for Python Table API jobs.
Brief change log
Verifying this change
Verify this change manually.
Does this pull request potentially affect one of the following parts:
`@Public(Evolving)`: (no)
Documentation