Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-12541][REST] Support to submit Python Table API jobs via REST API #8532

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

dianfu
Copy link
Contributor

@dianfu dianfu commented May 24, 2019

What is the purpose of the change

This pull request supports to submit Python Table API jobs via REST API.
To support submit resource files other than jar, we need to update the REST API a bit as specified in FLIP38

Current REST API New REST API Remarks
GET /jars GET /artifacts Now it lists both jar files and python package files.
POST /jars/upload POST /artifacts/upload Allow users to upload both jar files and python package files. Use "Content-Type" in the request header to mark the file type.
DELETE /jars/:jarid DELETE /artifacts/:artifactid Delete files according to “artifactid”.
GET /jars/:jarid/plan GET /artifacts/:artifactid/plan We will introduce a new optional parameter:pythonEntryModule.It allows specifying a python entry point module other than main.py.
POST /jars/:jarid/run POST /artifacts/:artifactid/run We also introduce the optional pythonEntryModule parameter here, as same as above.

Brief change log

  • 2c4d575: Changes the REST API prefix from /jars to /artifacts as we will not only support upload jar files, but also Python resource files. This commit also refactor the class name prefix from JarXXX to ArtifactXXX
  • c286149: Updates the web to use the new /artifacts REST API instead of the /jars REST API
  • b516468 Supports to submit Python Table API jobs. It supports to submit Python Table API jobs via resource files such as .py, .zip and .egg.

Verifying this change

This change added tests and can be verified as follows:

  • Added tests ArtifactHandlerUtilsTest.testExtractContainedLibraries, ArtifactXXXTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Copy link
Collaborator

flinkbot commented May 24, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❗ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@sunjincheng121
Copy link
Member

Thanks for your PR. @dianfu!
I have merged the https://issues.apache.org/jira/browse/FLINK-12327, so please rebase the code :) then I'll review this PR ASAP.
Best,
Jincheng

@dianfu
Copy link
Contributor Author

dianfu commented May 28, 2019

@sunjincheng121 Great, I have rebased the PR.

@sunjincheng121
Copy link
Member

@flinkbot attention @tillrohrmann @zentol

@tillrohrmann
Copy link
Contributor

tillrohrmann commented Jun 4, 2019

Since you are changing part of Flink's public API, how did you check that we did not break backwards compatibility @dianfu, @sunjincheng121? It should still be possible to submit jobs with a 1.8 client against a 1.9 cluster.

@sunjincheng121
Copy link
Member

sunjincheng121 commented Jun 4, 2019

That's a good point, @tillrohrmann! We use the org.apache.flink.runtime.rest.compatibility.RestAPIStabilityTest to ensure compatibility. and in current PR should keep the old APIs there but mark Deprecated for the old APIs. Do you think this is enough? Or we still need to do other extra work ?

Copy link
Contributor

@zentol zentol left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to rework this PR a bit.
To ensure better backwards compatibility I would suggest to not touch existing APIs/handlers at all, and have the new APIs/handlers be completely separate from the old ones. This implies reverting the renaming of files and any modifications done, specifically to the JarRequestBody which actually breaks backwards-compatibilty. All classes from the old API can be deprecated.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 4, 2019

@tillrohrmann Thanks a lot for your reply. As mentioned by @sunjincheng121 that the test case RestAPIStabilityTest can make sure the backwards compatibility. Besides, I have also manually checked that the old REST API still works by the following way:

  1. Only apply the first commit which adds REST API /artifacts/ and deprecates the old REST API /jars/
  2. Build the project, start up a local cluster and check that the functionalities such as uploading jars, getting plans, submitting jobs works well.
  3. As only the first commit is applied, the frontend still uses the old REST API. This makes sure that the old REST API still works.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 4, 2019

@zentol I think your suggestions make sense to me. Will updated the PR accordingly. Thanks a lot.

@zentol
Copy link
Contributor

zentol commented Jun 4, 2019

@dianfu In your manual test, did you not include commit #3? That one modifies the API as well, and from what I can tell at a glance should also impact the legacy /jar endpoints.
Now that we separate things though this should no longer matter.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 4, 2019

@zentol The manual test does not include the third commit. Definitely agree that separating the new API with the old API will be more safer for backwards compatibility. :)

@dianfu
Copy link
Contributor Author

dianfu commented Jun 4, 2019

@zentol @tillrohrmann I have updated the PR and separated the newly added API with the old ones per @zentol 's suggestion. Could you help to take a look at? Appreciated!

@tillrohrmann
Copy link
Contributor

RestAPIStabilityTest.testDispatcherRestAPIStability is failing the build @dianfu.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 6, 2019

@tillrohrmann Thanks a lot for the remind and have addressed the test failure.

Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updated @dianfu!
I have left a few suggestions, and two question as follows:

  • At present, there are some ArtifactXXX and JarXXX content is almost 90% identical. The reason we directly copy one is to facilitate the next release deletion? such as ArtifactListInfo and JarListInfo and ArtifactRequestBody and JarRequestBody, (only add new property dependentArtifactId) etc.
  • For the test case, we just rename the JarxxxTest to ArtifactxxxTest and test the Artifactxxx logic. But how do we deal with the logic test for Jarxxx? if we want to do some bugfix for Jarxxx?
    Please let me know what do you think?
    Best, Jincheng


@Override
public String getTargetRestEndpointURL() {
return "/artifacts/:" + ArtifactIdPathParameter.KEY + "/plan";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a constant for artifacts due to there are many places using this string? such as ArtifactDeleteHeaders, ArtifactListHeadersetc.
I know the old one AbstractJarPlanHeaders also using the jars string directly, but I
think we can do some improvement if it makes sense to us. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me. +1


@Nullable
@JsonIgnore
public String getDependentArtifactId() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we should add this new method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter allows to specify a separate jar file as the dependency. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know we should have a way to let the user upload their fat JAR for Python Job. Furthermore, we also need a way to let the user upload the resources of pythons files, so we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?

return handler.getMessageHeaders().getUnresolvedMessageParameters();
}

@Override
JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters();
ArtifactPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getJarMessageParameters -> getArtifactMessageParameters?

*/
public class JarPlanHandlerParameterTest extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> {
private static JarPlanHandler handler;
public class ArtifactPlanHandlerParameterTest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replace all jar with Artifact? such as getWrongJarMessageParameters -> getWrongArtifactMessageParameters?

@@ -94,27 +95,28 @@ JarPlanMessageParameters getWrongJarMessageParameters(ProgramArgsParType program
}

@Override
JarPlanRequestBody getDefaultJarRequestBody() {
return new JarPlanRequestBody();
ArtifactPlanRequestBody getDefaultJarRequestBody() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDefaultJarRequestBody -> getDefaultArtifactRequestBody

@@ -154,7 +154,7 @@ export class SubmitComponent implements OnInit, OnDestroy {
}

constructor(
private jarService: JarService,
private artifactService: ArtifactService,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 152: trackJarBy -> trackArtifactBy?

<tr (click)="expandJar(jar)" class="clickable">
<td>{{jar.name}}</td>
<td>{{jar.uploaded | date:'yyyy-MM-dd, HH:mm:ss'}}</td>
<ng-container *ngFor="let artifact of listOfJar; trackBy:trackJarBy;">
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trackJarBy -> trackArtifactBy

*/
deleteJar(jar: JarFilesItemInterface) {
this.jarService.deleteJar(jar.id).subscribe(() => {
deleteArtifact(artifact: JarFilesItemInterface) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JarFilesItemInterface -> ArtifactFilesItemInterface?

@@ -23,7 +23,7 @@ import { Router } from '@angular/router';
import { JarFilesItemInterface } from 'interfaces';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we rename the JarFilesItemInterface to ArtifactFilesItemInterface in jar.js?

loadJarList() {
return this.httpClient.get<JarListInterface>(`${BASE_URL}/jars`).pipe(
loadArtifactList() {
return this.httpClient.get<JarListInterface>(`${BASE_URL}/artifacts`).pipe(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename the JarListInterface to ArtifactListInterface in jar.js?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. +1

@dianfu
Copy link
Contributor Author

dianfu commented Jun 10, 2019

@sunjincheng121 Thanks a lot for the review.

Regarding to question 1, it's to ensure better backwards compatibility per @zentol 's suggestion.
Regarding to question 2, adding separate test cases for ArtifactXXX seems reasonable for me. We can remove the JarXXX and the corresponding test cases in the next releases together.

Will update the PR later on.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 11, 2019

@sunjincheng121 Have updated the PR according to your comments. Looking forwarding to your feedback.

Copy link
Member

@sunjincheng121 sunjincheng121 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the quick update @dianfu!
I think one important thing we should fix in this JIRA is how to upload the user JAR and python files. I left one comment about this functionality. i.e. we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?
Best,
Jincheng


@Nullable
@JsonIgnore
public String getDependentArtifactId() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I know we should have a way to let the user upload their fat JAR for Python Job. Furthermore, we also need a way to let the user upload the resources of pythons files, so we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?

@dianfu
Copy link
Contributor Author

dianfu commented Jun 11, 2019

@sunjincheng121 I think packaging the dependencies into the zip file makes much sense to me. I have updated the PR accordingly.

@sunjincheng121
Copy link
Member

I found the time out error of CI, so I have restarted the CI.

@tillrohrmann
Copy link
Contributor

Before diving into the review of this rather large PR, I wanted to ask whether the web based submission is a crucial feature for Flink 1.9 or not? I'm wondering whether the client based submission of Python programs wouldn't be enough as a first step.

@dianfu
Copy link
Contributor Author

dianfu commented Jun 12, 2019

@tillrohrmann I agree that client based submission can meet most requirements and I'm not sure if there are users which only use REST API to submit jobs. Anyway, It's cool if we can support the submission of Python jobs for all kinds of users in 1.9.
Regarding to the large PR, actually most changes are copying the JarXXX to ArtifactXXX which are all in the first commit. The changes of the last two commits are pretty small.

@sunjincheng121
Copy link
Member

@tillrohrmann this feature not the crucial feature for Flink 1.9, it's not must-have for Flink 1.9. but I also suggest adding this change to Flink 1.9, due to the following reasons:

  • API refactoring: this PR is pretty big like you said, but most of the change is refactoring, i.e.
    the change is: jars -> artifacts. And I think if we ensure that the change of jars -> artifacts is the right thing, I suggest do this change in Flink 1.9. :)
  • REST API for python: I think you are right, without REST API we can using the client to submit the Python Job, but we only need to do a small change for the support this feature(only
    300+ line change). I suggest adding this to Flink 1.9. :) But I agree with you that this is nice to have, not must-have.

image

What do you think?

@sunjincheng121
Copy link
Member

@tillrohrmann do you still think we should not merge this change into 1.9? just double check and confirm. :)

@elanv
Copy link

elanv commented Nov 20, 2020

How is it going? I want to submit a python job inside application, and it would be great if REST API supports it.

@flinkbot
Copy link
Collaborator

flinkbot commented Nov 20, 2020

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@dianfu
Copy link
Contributor Author

dianfu commented Nov 23, 2020

@elanv The REST API is still not supported as we want to add support of client based submission as the first step. However, it makes sense to consider the REST API support now as more and more functionalities have already been supported for PyFlink and more and more users are trying it out. As 1.12 is already feature freeze and so I think we could not make it in the 1.12. I will revisit this work and hope we could support it in 1.13.

@tillrohrmann tillrohrmann removed their request for review June 29, 2022 09:38
@ikstewa
Copy link

ikstewa commented Sep 17, 2023

Is there still plans to support pyflink job submission via REST api?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
8 participants