[FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators #12037
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit ab6e6c9 (Fri May 08 11:14:48 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
cc @becketqin @StephanEwen @godfreyhe please help with the review, thanks a lot~
Thanks for the contribution @tsreaper. I left a few comments.
    assertThat(future, futureFailedWith(IllegalArgumentException.class));
}
Add a test for the scenario "operator id not found".
        ((CoordinationResponser) coordinator).handleCoordinationRequest(request));
} else if (coordinator != null) {
    return FutureUtils.completedExceptionally(
        new IllegalArgumentException("Coordinator of operator " + operator + " cannot handle client event"));
I think we should throw FlinkException here; this situation is the same as the case where the operator does not exist.
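For illustration, the dispatch being discussed could be sketched roughly as below. This is only a sketch under stated assumptions, not the PR's code: `OperatorCoordinator`, `CoordinationResponser` and `FlinkException` are minimal stand-ins declared locally, requests and responses are plain strings, and `CoordinationDispatchSketch`, `EchoCoordinator`, `register` and `deliver` are hypothetical names. Both failure cases ("does not exist" and "cannot handle client event") complete the future with the same exception type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in for Flink's OperatorCoordinator (assumption: reduced to a marker interface).
interface OperatorCoordinator {}

// Stand-in for the interface under review; requests and responses are plain strings here.
interface CoordinationResponser {
    CompletableFuture<String> handleCoordinationRequest(String request);
}

// Stand-in for org.apache.flink.util.FlinkException.
class FlinkException extends Exception {
    FlinkException(String message) {
        super(message);
    }
}

// Hypothetical coordinator that can answer client requests.
class EchoCoordinator implements OperatorCoordinator, CoordinationResponser {
    @Override
    public CompletableFuture<String> handleCoordinationRequest(String request) {
        return CompletableFuture.completedFuture("echo: " + request);
    }
}

final class CoordinationDispatchSketch {
    private final Map<String, OperatorCoordinator> coordinators = new HashMap<>();

    void register(String operatorId, OperatorCoordinator coordinator) {
        coordinators.put(operatorId, coordinator);
    }

    // Delivers the request if the coordinator can handle it; otherwise fails the future.
    CompletableFuture<String> deliver(String operatorId, String request) {
        OperatorCoordinator coordinator = coordinators.get(operatorId);
        if (coordinator instanceof CoordinationResponser) {
            return ((CoordinationResponser) coordinator).handleCoordinationRequest(request);
        }
        // Same exception type for "unknown operator" and "coordinator cannot respond".
        String reason = coordinator == null
                ? "Coordinator of operator " + operatorId + " does not exist"
                : "Coordinator of operator " + operatorId + " cannot handle client event";
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new FlinkException(reason));
        return failed;
    }
}
```

With this shape, callers only need to handle one exception type regardless of why the request could not be delivered.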
        .getSerializedCoordinationResponse()
        .deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
    throw new RuntimeException("Failed to deserialize coordination response", e);
Throw CompletionException, like the getAccumulators method.
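A rough sketch of what this suggestion amounts to, assuming the deserialization happens inside a `thenApply` stage. `CompletionExceptionSketch`, `deserialize` and `decode` are hypothetical names, and the byte-array "deserializer" is only a stand-in for `SerializedValue#deserializeValue`:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CompletionExceptionSketch {

    // Hypothetical deserializer standing in for SerializedValue#deserializeValue.
    static String deserialize(byte[] payload) throws IOException {
        if (payload.length == 0) {
            throw new IOException("empty payload");
        }
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Wraps the checked exception in a CompletionException so the returned
    // future completes exceptionally with the original cause attached.
    static CompletableFuture<String> decode(CompletableFuture<byte[]> responseFuture) {
        return responseFuture.thenApply(payload -> {
            try {
                return deserialize(payload);
            } catch (IOException e) {
                throw new CompletionException("Failed to deserialize coordination response", e);
            }
        });
    }
}
```

A caller that joins on the returned future then sees a `CompletionException` whose cause is the original `IOException`, rather than a bare `RuntimeException`.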
 * Coordinator interface which can handle {@link CoordinationRequest}s
 * and response with {@link CoordinationResponse}s to the client.
 */
public interface CoordinationResponser {
Rename to CoordinationHandler? Similar to OperatorEventHandler.
CoordinationRequestHandler seems to be better.
 * Client interface which sends out a {@link CoordinationRequest} and
 * expects for a {@link CoordinationResponse} from a {@link OperatorCoordinator}.
 */
public interface CoordinationRequester {
Rename to CoordinationRequestGateway? Similar to OperatorEventGateway.
 * Client interface which sends out a {@link CoordinationRequest} and
 * expects for a {@link CoordinationResponse} from a {@link OperatorCoordinator}.
 */
public interface CoordinationRequestGateway {
I'm not sure this interface is necessary; it seems we could add the method to JobClient. Almost every class that implements JobClient also implements this interface.
Coordinators are used internally, so we do not want to expose this to the users. As JobClient is a public interface, we prefer not to modify it.
try {
    return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
} catch (IOException e) {
    throw new RuntimeException("Failed to construct response body", e);
Throw RestHandlerException instead? Will the RuntimeException crash the REST server?
It won't. See JobAccumulatorsHandler for similar behavior.
Throwing RuntimeExceptions is an implementation error. REST handlers should always throw a RestHandlerException, as only then a proper error is returned to the user, instead of a generic 500 Internal Server Error.
@flinkbot run azure
 * if the task is not running, or no operator/coordinator exists for the given ID,
 * or the coordinator cannot handle client events.
 */
CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
I think this should be added to JobMasterOperatorEventGateway.
No. JobMasterOperatorEventGateway is for OperatorEvents. CoordinationRequest and CoordinationResponse are different from OperatorEvent.
Thanks a lot for creating this PR @tsreaper. Nice work! The changes look good to me. I had a couple of minor comments which would be good to be addressed before we merge this PR. Once this is done and AZP gives green light, we can merge it.
return responseFuture.thenApply(
    coordinationResponse -> {
        try {
            return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
Why are we wrapping coordinationResponse into a SerializedValue here? If it is meant to support user code classes, it will not work, because ClientCoordinationHandler should not have access to the user code class loader.
The wrapping would have to happen on a component which has access to the user code classes.
Because CoordinationResponse is an object and has to be serialized in order to be passed back to the client through the REST API. Is there a more appropriate way to achieve this?
Makes sense. I guess we will use another layer of SerializedValue if the user requests a user code object from the OperatorCoordinator which is then initialized on the JM.
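To make the class loader point in this thread concrete, here is a minimal sketch of a serialize/deserialize round trip in which the reader has to supply a class loader that can see the response class. It uses plain Java serialization as a stand-in and is not Flink's `SerializedValue` implementation; `SerializedResponseSketch`, `CountResponse` and all method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class SerializedResponseSketch {

    // Hypothetical response type; in practice this might be a user code class.
    static final class CountResponse implements Serializable {
        private static final long serialVersionUID = 1L;
        final long count;

        CountResponse(long count) {
            this.count = count;
        }
    }

    // Sender side: turn the response into bytes that can travel in a REST body.
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Receiver side: classes are resolved against the given class loader first,
    // so the caller must pass one that has access to the response class.
    static Object deserialize(byte[] payload, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    return Class.forName(desc.getName(), false, classLoader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        }
    }

    // Convenience wrapper that rethrows checked failures as unchecked ones.
    static Object roundTrip(Serializable value, ClassLoader classLoader) {
        try {
            return deserialize(serialize(value), classLoader);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```

The component in the middle only ever handles the byte array, which is why it does not need access to the class itself; only the two ends do.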
@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
    assertThat(result, futureFailedWith(TestException.class));
}

@Test
@SuppressWarnings("unchecked")
public void testDeliveringClientRequestToResponser() throws Exception {
Suggested change:
- public void testDeliveringClientRequestToResponser() throws Exception {
+ public void testDeliveringClientRequestToResponder() throws Exception {
This should be testDeliveringClientRequestToRequestHandler now, as the name of the interface has changed to CoordinationRequestHandler.
}

@Test(expected = FlinkException.class)
public void testDeliveringClientRequestToNonResponser() throws Exception {
Suggested change:
- public void testDeliveringClientRequestToNonResponser() throws Exception {
+ public void testDeliveringClientRequestToNonResponder() throws Exception {
This should be testDeliveringClientRequestToNonRequestHandler now, as the name of the interface has changed to CoordinationRequestHandler.
} catch (FlinkException e) {
    Assert.assertTrue(e.getMessage().contains("cannot handle client event"));
    throw e;
}
I think we could use CommonTestUtils.assertThrows here.
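As a sketch of the pattern being suggested: a single helper call checks both the exception type and the message, replacing the try/catch/rethrow combined with `@Test(expected = ...)`. The helper below is a hypothetical local stand-in written for this example; the real `CommonTestUtils.assertThrows` signature may differ, and `FlinkException` here is a locally declared stand-in as well.

```java
import java.util.concurrent.Callable;

final class AssertThrowsSketch {

    // Stand-in for org.apache.flink.util.FlinkException.
    static class FlinkException extends Exception {
        FlinkException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: passes only if `code` throws an exception of the
    // expected type whose message contains the expected fragment.
    static void assertThrows(
            String expectedMessagePart,
            Class<? extends Exception> expectedType,
            Callable<?> code) {
        try {
            code.call();
        } catch (Exception e) {
            if (!expectedType.isInstance(e)) {
                throw new AssertionError("Unexpected exception type: " + e.getClass().getName(), e);
            }
            if (e.getMessage() == null || !e.getMessage().contains(expectedMessagePart)) {
                throw new AssertionError("Unexpected message: " + e.getMessage(), e);
            }
            return;
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

A test body then shrinks to one call, for example `assertThrows("cannot handle client event", FlinkException.class, () -> ...)`, with the code under test in the lambda.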
    Assert.assertTrue(e.getMessage().contains("does not exist"));
    throw e;
}
same here with CommonTestUtils.assertThrows
Are we intentionally not regenerating the REST documentation?
Yes. As coordinators are used only internally, we're not going to expose this to the users so no user documentation is updated. I've updated
I like the idea to have an internal REST API which we don't expose to the user because it is not intended for public use.
@flinkbot run azure
True. Thanks for the information.
I'm fine with having internal REST APIs, so if #12141 is merged beforehand and the headers are marked accordingly, then I'm fine with it.
…PI between the clients and the operator coordinators
…Future in case of costly handling
@tillrohrmann @zentol I've rebased onto master and marked
Thanks @tsreaper. Merging this PR once AZP gives green light.
AZP passed, I'm merging this then I can start to review another PR based on this.
What is the purpose of the change
This PR is part of FLINK-14807, which is going to introduce a collecting method for tables. See here for the whole design document.
To allow the clients to receive results from the coordinator, which acts as a proxy between the clients and the sinks, a communication protocol should be introduced between the clients and the coordinators. This PR introduces this new communication protocol through the REST API.
Note that as coordinators are used only internally, we're not going to expose this REST API to the user, thus no user documentation is provided.
Brief change log
Verifying this change
The changes can be verified by running the added unit tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation