-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12541][REST] Support to submit Python Table API jobs via REST API #8532
base: master
Are you sure you want to change the base?
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
Thanks for your PR. @dianfu! |
@sunjincheng121 Great, I have rebased the PR. |
@flinkbot attention @tillrohrmann @zentol |
8d05d07
to
569010b
Compare
Since you are changing part of Flink's public API, how did you check that we did not break backwards compatibility @dianfu, @sunjincheng121? It should still be possible to submit jobs with a |
That's a good point, @tillrohrmann! We use the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to rework this PR a bit.
To ensure better backwards compatibility I would suggest to not touch existing APIs/handlers at all, and have the new APIs/handlers be completely separate from the old ones. This implies reverting the renaming of files and any modifications done, specifically to the JarRequestBody
which actually breaks backwards-compatibilty. All classes from the old API can be deprecated.
@tillrohrmann Thanks a lot for your reply. As mentioned by @sunjincheng121 that the test case RestAPIStabilityTest can make sure the backwards compatibility. Besides, I have also manually checked that the old REST API still works by the following way:
|
@zentol I think your suggestions make sense to me. Will updated the PR accordingly. Thanks a lot. |
@zentol The manual test does not include the third commit. Definitely agree that separating the new API with the old API will be more safer for backwards compatibility. :) |
@zentol @tillrohrmann I have updated the PR and separated the newly added API with the old ones per @zentol 's suggestion. Could you help to take a look at? Appreciated! |
|
@tillrohrmann Thanks a lot for the remind and have addressed the test failure. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updated @dianfu!
I have left a few suggestions, and two question as follows:
- At present, there are some
ArtifactXXX
andJarXXX
content is almost 90% identical. The reason we directly copy one is to facilitate the next release deletion? such asArtifactListInfo and JarListInfo
andArtifactRequestBody and JarRequestBody, (only add new property dependentArtifactId)
etc. - For the test case, we just rename the
JarxxxTest
toArtifactxxxTest
and test theArtifactxxx
logic. But how do we deal with the logic test forJarxxx
? if we want to do some bugfix forJarxxx
?
Please let me know what do you think?
Best, Jincheng
|
||
@Override | ||
public String getTargetRestEndpointURL() { | ||
return "/artifacts/:" + ArtifactIdPathParameter.KEY + "/plan"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a constant for artifacts
due to there are many places using this string? such as ArtifactDeleteHeaders
, ArtifactListHeaders
etc.
I know the old one AbstractJarPlanHeaders
also using the jars
string directly, but I
think we can do some improvement if it makes sense to us. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense to me. +1
|
||
@Nullable | ||
@JsonIgnore | ||
public String getDependentArtifactId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why we should add this new method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This parameter allows to specify a separate jar file as the dependency. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I know we should have a way to let the user upload their fat JAR for Python Job. Furthermore, we also need a way to let the user upload the resources of pythons files, so we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?
return handler.getMessageHeaders().getUnresolvedMessageParameters(); | ||
} | ||
|
||
@Override | ||
JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) { | ||
final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters(); | ||
ArtifactPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getJarMessageParameters
-> getArtifactMessageParameters
?
*/ | ||
public class JarPlanHandlerParameterTest extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> { | ||
private static JarPlanHandler handler; | ||
public class ArtifactPlanHandlerParameterTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace all jar
with Artifact
? such as getWrongJarMessageParameters
-> getWrongArtifactMessageParameters
?
@@ -94,27 +95,28 @@ JarPlanMessageParameters getWrongJarMessageParameters(ProgramArgsParType program | |||
} | |||
|
|||
@Override | |||
JarPlanRequestBody getDefaultJarRequestBody() { | |||
return new JarPlanRequestBody(); | |||
ArtifactPlanRequestBody getDefaultJarRequestBody() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getDefaultJarRequestBody
-> getDefaultArtifactRequestBody
@@ -154,7 +154,7 @@ export class SubmitComponent implements OnInit, OnDestroy { | |||
} | |||
|
|||
constructor( | |||
private jarService: JarService, | |||
private artifactService: ArtifactService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 152: trackJarBy
-> trackArtifactBy
?
<tr (click)="expandJar(jar)" class="clickable"> | ||
<td>{{jar.name}}</td> | ||
<td>{{jar.uploaded | date:'yyyy-MM-dd, HH:mm:ss'}}</td> | ||
<ng-container *ngFor="let artifact of listOfJar; trackBy:trackJarBy;"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trackJarBy
-> trackArtifactBy
*/ | ||
deleteJar(jar: JarFilesItemInterface) { | ||
this.jarService.deleteJar(jar.id).subscribe(() => { | ||
deleteArtifact(artifact: JarFilesItemInterface) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JarFilesItemInterface
-> ArtifactFilesItemInterface
?
@@ -23,7 +23,7 @@ import { Router } from '@angular/router'; | |||
import { JarFilesItemInterface } from 'interfaces'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we rename the JarFilesItemInterface
to ArtifactFilesItemInterface
in jar.js
?
loadJarList() { | ||
return this.httpClient.get<JarListInterface>(`${BASE_URL}/jars`).pipe( | ||
loadArtifactList() { | ||
return this.httpClient.get<JarListInterface>(`${BASE_URL}/artifacts`).pipe( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename the JarListInterface
to ArtifactListInterface
in jar.js
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. +1
@sunjincheng121 Thanks a lot for the review. Regarding to question 1, it's to ensure better backwards compatibility per @zentol 's suggestion. Will update the PR later on. |
@sunjincheng121 Have updated the PR according to your comments. Looking forwarding to your feedback. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the quick update @dianfu!
I think one important thing we should fix in this JIRA is how to upload the user JAR and python files. I left one comment about this functionality. i.e. we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?
Best,
Jincheng
|
||
@Nullable | ||
@JsonIgnore | ||
public String getDependentArtifactId() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I know we should have a way to let the user upload their fat JAR for Python Job. Furthermore, we also need a way to let the user upload the resources of pythons files, so we need a ZIP folder structure, such as :
zip:
/lib ---> for JAR and python files ZIPs.
/xx/wordcount.py ---> user code
What do you think?
@sunjincheng121 I think packaging the dependencies into the zip file makes much sense to me. I have updated the PR accordingly. |
I found the time out error of CI, so I have restarted the CI. |
Before diving into the review of this rather large PR, I wanted to ask whether the web based submission is a crucial feature for Flink 1.9 or not? I'm wondering whether the client based submission of Python programs wouldn't be enough as a first step. |
@tillrohrmann I agree that client based submission can meet most requirements and I'm not sure if there are users which only use REST API to submit jobs. Anyway, It's cool if we can support the submission of Python jobs for all kinds of users in 1.9. |
@tillrohrmann this feature not the crucial feature for Flink 1.9, it's not must-have for Flink 1.9. but I also suggest adding this change to Flink 1.9, due to the following reasons:
What do you think? |
@tillrohrmann do you still think we should not merge this change into 1.9? just double check and confirm. :) |
How is it going? I want to submit a python job inside application, and it would be great if REST API supports it. |
@elanv The REST API is still not supported as we want to add support of client based submission as the first step. However, it makes sense to consider the REST API support now as more and more functionalities have already been supported for PyFlink and more and more users are trying it out. As 1.12 is already feature freeze and so I think we could not make it in the 1.12. I will revisit this work and hope we could support it in 1.13. |
Is there still plans to support pyflink job submission via REST api? |
What is the purpose of the change
This pull request supports to submit Python Table API jobs via REST API.
To support submit resource files other than jar, we need to update the REST API a bit as specified in FLIP38
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation