
[FLINK-12541][container][python] Add support for Python jobs in build script #8609

Closed
wants to merge 3 commits

Conversation

@dianfu (Contributor) commented Jun 4, 2019

What is the purpose of the change

This pull request adds support for building a job-specific Docker image for Python Table API jobs.

Brief change log

  • Improves the build script to support building a Python job-specific Docker image

Verifying this change

This change has been verified manually, as sketched below.
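
For illustration, a minimal sketch of such a manual check, assuming the build script lives under flink-container/docker as in the existing setup; the --from-local-dist and --image-name flags and the job-cluster container argument are assumptions, and my_job.py is a made-up artifact:

# Build a Python-job-specific image from a locally built Flink distribution
# (only --job-artifacts is confirmed by this PR; the other flags are assumed).
cd flink-container/docker
./build.sh \
  --from-local-dist \
  --job-artifacts /path/to/my_job.py \
  --image-name flink-python-job
# Start a job cluster container from the freshly built image.
docker run flink-python-job job-cluster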

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot (Collaborator) commented Jun 4, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@dianfu dianfu force-pushed the FLINK-12541-docker branch 3 times, most recently from bfefeb7 to 900f728, on June 4, 2019 at 08:56
@sunjincheng121 (Member) commented:

@flinkbot attention @tillrohrmann

@tillrohrmann (Contributor) left a comment

Thanks for opening this PR @dianfu. I think this PR does not properly separate concerns. It mixes the concern of Python programs with the StandaloneJobClusterEntrypoint, StandaloneJobClusterConfiguration and the ClassPathJobGraphRetriever. I suspect that there is a better separation of concerns to be had by delegating the responsibility of parsing Python-specific job options to the PythonDriver. Otherwise I fear that we are going to add special-case logic for every language binding which we might support in the future.

I would also suggest adding appropriate test cases for your changes. Moreover, your first commit contains unrelated changes to the WordCount example which are attributed to FLINK-12541. Please revert them.

this(jobId, savepointRestoreSettings, programArguments, jobClassName, JarsOnClassPath.INSTANCE);
@Nullable String jobEntryPointName,
@Nullable String jobPythonArtifacts) {
this(jobId, savepointRestoreSettings, programArguments, jobEntryPointName, jobPythonArtifacts, JarsOnClassPath.INSTANCE);
@tillrohrmann commented on this change:

Why do we need to touch the ClassPathJobGraphRetriever at all? This class should not need to know about Python. Otherwise we need to add special casing for all supported languages in the future. This does not seem right.

throw new FlinkException("Could not load the provided entrypoint class.", e);
} catch (ClassNotFoundException e) {
if (!isPythonProgram && jobPythonArtifacts != null) {
return createPackagedProgram("org.apache.flink.python.client.PythonDriver", true);
@tillrohrmann commented on this change:

This is overly complicated and error-prone. Why not start the cluster entrypoint with o.a.f.python.client.PythonDriver as the jobClassName?
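
To make the suggestion concrete, a hypothetical launch of a standalone job cluster with PythonDriver passed as an ordinary job class; standalone-job.sh and --job-classname exist in Flink's distribution, while the trailing PythonDriver arguments (pym, pyfs) are assumptions about its command line:

# Hypothetical: no Python special-casing in the entrypoint; PythonDriver
# parses the Python-specific options from the program arguments itself.
standalone-job.sh start-foreground \
  --job-classname org.apache.flink.python.client.PythonDriver \
  pym my_job pyfs /opt/artifacts/my_job.py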

args[2] = "pyfs";
args[3] = jobPythonArtifacts;
System.arraycopy(programArguments, 0, args, 4, programArguments.length);
return args;
@tillrohrmann commented on this change:

Why do we need to do this command line argument magic? Just because PythonDriver expects the arguments to be passed in a special order? Why not make the PythonDriver more flexible?
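
For context, a rough sketch of what the argument rewriting above produces; args[0] and args[1] are not shown in the excerpt, and the concrete values are hypothetical:

# Given programArguments = [--input, in.txt] and jobPythonArtifacts = /opt/job.py,
# the rewritten argument array looks roughly like:
#   [<entrypoint option>, <entrypoint name>, pyfs, /opt/job.py, --input, in.txt]
# i.e. the Python-specific options are prepended in the fixed order that
# PythonDriver expects.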

}

@Nullable
String getJobPythonArtifacts() {
@tillrohrmann commented on this change:

Why is this needed?

.desc("Entrypoint name of the job to run.")
.build();

private static final Option JOB_PYTHON_ARTIFACTS_OPTION = Option.builder("ja")
@tillrohrmann commented on this change:

Why does this need to be handled by the StandaloneJobClusterConfiguration instead of being parsed by the PythonDriver program?

private final String jobEntryPointName;

@Nullable
private final String jobPythonArtifacts;
@tillrohrmann commented on this change:

I think it is not a good idea to add special case logic into the StandaloneJobClusterEntrypoint. Why do you think this is needed?

@tillrohrmann (Contributor) commented:

Why does this PR have the same Flink issue assigned as #8532? Every PR should have its own JIRA issue, as stated in the contribution guidelines.

@sunjincheng121 (Member) left a comment

Thanks for the PR @dianfu!
The suggestion from @tillrohrmann makes sense to me: we should give this PR its own JIRA.
Furthermore, the improvement of pyfs can also go into a new PR (with a new JIRA).
BTW: please rebase the code, and I will have another review :)
What do you think?

@dianfu (Contributor, Author) commented Jun 10, 2019

@tillrohrmann Thanks a lot for your review.
Your suggestion makes much sense to me. I have created a dedicated JIRA FLINK-12788 for this PR. Regarding the changes to the StandaloneJobClusterEntrypoint, I agree that there should not be special logic for Python. I will revert that part of the changes.

@sunjincheng121 Thanks a lot for your review. I have created a separate JIRA FLINK-12787 for the PythonDriver improvements, so that we can focus this PR on the build script changes for Python job support.

Will update the PR later today.

@dianfu (Contributor, Author) commented Jun 11, 2019

@tillrohrmann @sunjincheng121 I have updated the PR; the changes now only concern the build script support for Python jobs. Looking forward to your feedback.

@sunjincheng121 (Member) commented Jun 11, 2019

Thanks for the update @dianfu!

Currently, the PR looks pretty clean from my side. The only point I am not entirely sure about is the build command option --opt-jars. With this option the opt jars do not need to be uploaded separately, but without it the user can simply build a fat jar that includes them. So this change is an improvement (not a necessary one), and we may need an opinion from @tillrohrmann!
Otherwise the PR LGTM. I'll merge it when @tillrohrmann says ok!

Best,
Jincheng
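
For context, a sketch of the two alternatives being weighed; --opt-jars was the option proposed in this PR at the time, and the jar paths are hypothetical:

# Alternative 1: let the build script copy selected opt/ jars into the image.
./build.sh --job-artifacts /path/to/my_job.py --opt-jars opt/flink-table_2.11.jar
# Alternative 2: drop --opt-jars and bundle those dependencies into a fat
# job jar before building the image.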

@sunjincheng121 (Member) commented:

I think we can remove --opt-jars, because @StephanEwen has already brought up a discussion about putting the Table API jars into lib by default. Details can be found here: https://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Putting-Table-API-jars-in-lib-by-default-td29492.html
What do you think?

@dianfu (Contributor, Author) commented Jun 12, 2019

@sunjincheng121 Makes sense to me, as --opt-jars is not a must-have option. Considering that the table jar will be put into lib by default in the future, removing this option is fine with me, since the table jar was the motivation for it in the first place. I have updated the PR. Looking forward to your response.

@tillrohrmann (Contributor) left a comment

The changes look good now @dianfu. I had two final comments which we should address before merging this PR.

@@ -29,7 +29,7 @@ In non HA mode, you should first start the job cluster service:

In order to deploy the job cluster entrypoint run:

`FLINK_IMAGE_NAME=<IMAGE_NAME> FLINK_JOB=<JOB_NAME> FLINK_JOB_PARALLELISM=<PARALLELISM> envsubst < job-cluster-job.yaml.template | kubectl create -f -`
@tillrohrmann commented on this change:

Why did we remove FLINK_JOB here?

@dianfu (Author) replied:

I found that FLINK_JOB had already been removed in commit 753e0c6, so I just corrected the documentation here. What do you think?

@tillrohrmann replied:

Good catch. Now this makes sense.

@@ -13,26 +13,28 @@ Install the most recent stable version of [Docker](https://docs.docker.com/insta
Images are based on the official Java Alpine (OpenJDK 8) image.

Before building the image, one needs to build the user code jars for the job.
-Assume that the job jar is stored under `<PATH_TO_JOB_JAR>`
+Assume that the job jar is stored under `<COMMA_SEPARATED_PATH_TO_JOB_ARTIFACTS>`
@tillrohrmann commented on this change:

Seems not to fit. I guess the sentence needs to be changed to something like: "A Flink job can consist of multiple jars. In order to specify the required jars, they need to be passed to `--job-artifacts` of the build script. The individual paths are comma separated."
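
For illustration, a hypothetical invocation matching the suggested wording, with made-up comma-separated artifact paths (--image-name is an assumption):

./build.sh \
  --job-artifacts /path/to/my_job.jar,/path/to/udf-lib.jar \
  --image-name flink-job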

@dianfu (Author) replied:

+1. Only one concern: what about changing "jars" to "artifacts"?

@dianfu (Contributor, Author) commented Jun 12, 2019

@tillrohrmann Thanks a lot for your comments. I have updated the PR and addressed one of your concerns. Regarding FLINK_JOB: that option had already been removed, but the documentation had not been updated, so I just corrected the documentation here. Feel free to let me know if you think that change doesn't make sense or you would prefer to address it in another PR.

@tillrohrmann (Contributor) left a comment

Thanks for addressing my comments @dianfu. LGTM. The failing test seems to be unrelated. Merging now.


@knaufk (Contributor) commented Jun 19, 2019

FYI: This change broke `test_docker_embedded_job.sh` and `test_kubernetes_embedded_job.sh`, as they were still using the `--job-jar` argument. I have included a [hotfix] in #8741.
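
For context, the shape of that hotfix in the affected test scripts; the variable name is hypothetical, while the rename from --job-jar to --job-artifacts is the one introduced by this PR:

# Before (broken after this PR):
#   ./build.sh --job-jar "${FLINK_JOB_JAR}" ...
# After (hotfix):
./build.sh --job-artifacts "${FLINK_JOB_JAR}" ...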

@dianfu (Contributor, Author) commented Jun 19, 2019

@knaufk Thanks a lot for the hotfix. The fix makes sense to me.

sjwiesman pushed a commit to sjwiesman/flink that referenced this pull request Jun 26, 2019