[FLINK-14807][runtime][client] Introduce communication through REST API between the clients and the operator coordinators #12037

Merged
merged 7 commits into from
May 14, 2020

tsreaper
Contributor

@tsreaper tsreaper commented May 8, 2020

What is the purpose of the change

This PR is part of FLINK-14807, which will introduce a collecting method for tables. See here for the full design document.

To allow clients to receive results from the coordinator, which acts as a proxy between the clients and the sinks, a communication protocol is needed between the clients and the coordinators. This PR introduces that protocol through the REST API.

Note that as coordinators are used only internally, we're not going to expose this REST API to users, so no user documentation is provided.

Brief change log

  • Introduce a REST API to allow the communication between clients and coordinators.

Verifying this change

The changes can be verified by running the added unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: yes (JobManager)
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? not documented

@flinkbot
Collaborator

flinkbot commented May 8, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit ab6e6c9 (Fri May 08 11:14:48 UTC 2020)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented May 8, 2020

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

@tsreaper
Contributor Author

tsreaper commented May 9, 2020

cc @becketqin @StephanEwen @godfreyhe please help with the review, thanks a lot~

Contributor

@godfreyhe godfreyhe left a comment

Thanks for the contribution @tsreaper. I left a few comments.


assertThat(future, futureFailedWith(IllegalArgumentException.class));
}

Contributor

add a test for scenario: "operator id not found"

((CoordinationResponser) coordinator).handleCoordinationRequest(request));
} else if (coordinator != null) {
return FutureUtils.completedExceptionally(
new IllegalArgumentException("Coordinator of operator " + operator + " cannot handle client event"));
Contributor

I think we should throw a FlinkException here; this situation is the same as when the operator does not exist.
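
For readers following the thread, the dispatch behaviour under discussion can be sketched roughly as below. This is a minimal, self-contained illustration and not the code from the PR: every type here (`CoordinationDispatchSketch`, the nested interfaces, the `FlinkException` stand-in, `AnsweringCoordinator`) is a hypothetical stand-in named after the Flink types mentioned in the review, and string operator IDs are an assumption made for brevity. It shows both failure cases ("does not exist" and "cannot handle client event") being reported through the same exception type, as the comment above suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-ins for the Flink types named in the review; not the real classes.
class CoordinationDispatchSketch {

    interface CoordinationRequest {}

    interface CoordinationResponse {}

    /** Marker for any coordinator. */
    interface OperatorCoordinator {}

    /** Coordinators that can answer client requests additionally implement this. */
    interface CoordinationRequestHandler {
        CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request);
    }

    /** Stand-in for FlinkException (assumed to be a checked exception). */
    static class FlinkException extends Exception {
        FlinkException(String message) {
            super(message);
        }
    }

    /** A coordinator that answers every request with one fixed response. */
    static class AnsweringCoordinator implements OperatorCoordinator, CoordinationRequestHandler {
        final CoordinationResponse response = new CoordinationResponse() {};

        @Override
        public CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
            return CompletableFuture.completedFuture(response);
        }
    }

    private final Map<String, OperatorCoordinator> coordinators = new HashMap<>();

    void register(String operatorId, OperatorCoordinator coordinator) {
        coordinators.put(operatorId, coordinator);
    }

    /** Forwards the request if possible; otherwise returns a future failed with FlinkException. */
    CompletableFuture<CoordinationResponse> deliver(String operatorId, CoordinationRequest request) {
        OperatorCoordinator coordinator = coordinators.get(operatorId);
        if (coordinator instanceof CoordinationRequestHandler) {
            return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(request);
        }
        String reason = coordinator == null
                ? "Coordinator of operator " + operatorId + " does not exist"
                : "Coordinator of operator " + operatorId + " cannot handle client event";
        CompletableFuture<CoordinationResponse> failed = new CompletableFuture<>();
        failed.completeExceptionally(new FlinkException(reason));
        return failed;
    }

    /** Returns the failure message of a future, or null if it completed normally. */
    static String failureMessage(CompletableFuture<?> future) {
        try {
            future.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }
}
```

Using one exception type for both cases means a caller only needs a single failure path and can tell the cases apart by message if it has to.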

.getSerializedCoordinationResponse()
.deserializeValue(getClass().getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Failed to deserialize coordination response", e);
Contributor

Throw a CompletionException here, like the getAccumulators method does.

* Coordinator interface which can handle {@link CoordinationRequest}s
* and response with {@link CoordinationResponse}s to the client.
*/
public interface CoordinationResponser {
Contributor

Rename to CoordinationHandler? That would be similar to OperatorEventHandler.

Contributor Author

CoordinationRequestHandler seems to be better

* Client interface which sends out a {@link CoordinationRequest} and
* expects for a {@link CoordinationResponse} from a {@link OperatorCoordinator}.
*/
public interface CoordinationRequester {
Contributor

Rename to CoordinationRequestGateway? That would be similar to OperatorEventGateway.

* Client interface which sends out a {@link CoordinationRequest} and
* expects for a {@link CoordinationResponse} from a {@link OperatorCoordinator}.
*/
public interface CoordinationRequestGateway {
Contributor

I'm not sure this interface is necessary; it seems we could add the method to JobClient instead. Almost every class that implements JobClient also implements this interface.

Contributor Author

Coordinators are used internally so we do not want to expose this to the users. As JobClient is a public interface, we prefer not to modify it.

try {
return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
} catch (IOException e) {
throw new RuntimeException("Failed to construct response body", e);
Contributor

throw RestHandlerException instead? Will the RuntimeException crash the rest server?

Contributor Author

@tsreaper tsreaper May 12, 2020

It won't. See JobAccumulatorsHandler for similar behaviors.

Contributor

Throwing RuntimeExceptions is an implementation error. REST handlers should always throw a RestHandlerException, as only then a proper error is returned to the user, instead of a generic 500 Internal Server Error.
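
A rough sketch of the pattern this comment asks for, under stated assumptions: the `RestHandlerException` below is a hypothetical stand-in (the real Flink class has its own constructor and status type), the `int` status field and the helper names are made up for illustration, and plain Java serialization stands in for `SerializedValue`. The point is only that the handler wraps the low-level `IOException` in a dedicated, status-carrying exception and fails the future with it, instead of letting a bare `RuntimeException` escape.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RestHandlerErrorSketch {

    /** Hypothetical stand-in for Flink's RestHandlerException; carries an HTTP status code. */
    static class RestHandlerException extends Exception {
        final int httpStatus;

        RestHandlerException(String message, int httpStatus, Throwable cause) {
            super(message, cause);
            this.httpStatus = httpStatus;
        }
    }

    /** Serializes a value with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Turns a response future into a body future, reporting failures as RestHandlerException. */
    static CompletableFuture<byte[]> toResponseBody(CompletableFuture<?> responseFuture) {
        return responseFuture.thenApply(response -> {
            try {
                return serialize(response);
            } catch (IOException e) {
                // Wrap in CompletionException so the checked exception can leave the lambda;
                // callers see the RestHandlerException as the cause.
                throw new CompletionException(
                        new RestHandlerException("Failed to serialize coordination response", 500, e));
            }
        });
    }
}
```

With this shape the REST layer can inspect the cause and build a specific error message for the client from it.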

@tsreaper
Contributor Author

@flinkbot run azure

* if the task is not running, or no operator/coordinator exists for the given ID,
* or the coordinator cannot handle client events.
*/
CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
Contributor

I think this should be added to JobMasterOperatorEventGateway.

Contributor Author

No. JobMasterOperatorEventGateway is for OperatorEvents. CoordinationRequest and CoordinationResponse are different from OperatorEvent.

@tillrohrmann tillrohrmann self-assigned this May 13, 2020
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for creating this PR @tsreaper. Nice work! The changes look good to me. I had a couple of minor comments which would be good to address before we merge this PR. Once this is done and AZP gives green light, we can merge it.

return responseFuture.thenApply(
coordinationResponse -> {
try {
return new ClientCoordinationResponseBody(new SerializedValue<>(coordinationResponse));
Contributor

Why are we wrapping coordinationResponse into a SerializedValue here? If it is in order to support user code classes, then it should not work because ClientCoordinationHandler should not have access to the user code class loader.

Contributor

The wrapping would have to happen on a component which has access to the user code classes.

Contributor Author

Because CoordinationResponse is an object and has to be serialized in order to be passed back to the client through the REST API. Is there a more appropriate way to achieve this?

Contributor

Makes sense. I guess we will use another layer of SerializedValue if the user requests a user code object from the OperatorCoordinator which is then initialized on the JM.
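
The class loader concern in this thread can be illustrated with a small sketch. `SerializedBlob` is a hypothetical stand-in for `SerializedValue`, not Flink's implementation; it only mirrors the idea that serialization needs no class loader, while deserialization resolves classes through whichever loader the caller supplies. That is why unwrapping user code objects has to happen on a component that can see the user code classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializedValueSketch {

    /** Hypothetical stand-in for SerializedValue: holds the bytes of a serialized object. */
    static final class SerializedBlob implements Serializable {
        private final byte[] bytes;

        SerializedBlob(Object value) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            this.bytes = buffer.toByteArray();
        }

        /** Deserializes the value, resolving classes through the given class loader. */
        Object deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
                @Override
                protected Class<?> resolveClass(ObjectStreamClass desc)
                        throws IOException, ClassNotFoundException {
                    try {
                        return Class.forName(desc.getName(), false, loader);
                    } catch (ClassNotFoundException e) {
                        // Fall back to the default lookup (covers primitive types, for example).
                        return super.resolveClass(desc);
                    }
                }
            }) {
                return in.readObject();
            }
        }
    }

    /** Convenience round trip that converts checked exceptions into unchecked ones. */
    static Object roundTrip(Object value, ClassLoader loader) {
        try {
            return new SerializedBlob(value).deserializeValue(loader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In this picture, the REST handler would only ever hold the opaque bytes; a second, inner wrapper for user code objects would be unwrapped on the client with the user code class loader.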

@@ -163,6 +165,56 @@ public void taskTaskManagerFailuresAreReportedBack() throws Exception {
assertThat(result, futureFailedWith(TestException.class));
}

@Test
@SuppressWarnings("unchecked")
public void testDeliveringClientRequestToResponser() throws Exception {
Contributor

Suggested change
- public void testDeliveringClientRequestToResponser() throws Exception {
+ public void testDeliveringClientRequestToResponder() throws Exception {

Contributor Author

@tsreaper tsreaper May 14, 2020

This should be testDeliveringClientRequestToRequestHandler now as the name of the interface is changed to CoordinationRequestHandler.

}

@Test(expected = FlinkException.class)
public void testDeliveringClientRequestToNonResponser() throws Exception {
Contributor

Suggested change
- public void testDeliveringClientRequestToNonResponser() throws Exception {
+ public void testDeliveringClientRequestToNonResponder() throws Exception {

Contributor Author

@tsreaper tsreaper May 14, 2020

This should be testDeliveringClientRequestToNonRequestHandler now as the name of the interface is changed to CoordinationRequestHandler.

Comment on lines 195 to 198
} catch (FlinkException e) {
Assert.assertTrue(e.getMessage().contains("cannot handle client event"));
throw e;
}
Contributor

I think we could use CommonTestUtils.assertThrows here.

Comment on lines 213 to 215
Assert.assertTrue(e.getMessage().contains("does not exist"));
throw e;
}
Contributor

same here with CommonTestUtils.assertThrows
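
To make the suggestion concrete, here is a minimal sketch of an `assertThrows`-style helper. It is not the actual `CommonTestUtils.assertThrows` from Flink; the class name, the parameter order, and the message check are assumptions chosen to mirror the catch-assert-rethrow pattern in the quoted test code. It replaces the `@Test(expected = ...)` plus `try`/`catch`/`throw e` combination with a single call that checks both the exception type and a fragment of its message.

```java
import java.util.concurrent.Callable;

class AssertThrowsSketch {

    /**
     * Runs the code and verifies that it throws an exception of the expected type
     * whose message contains the expected fragment. Returns the caught exception.
     */
    static <E extends Throwable> E assertThrows(
            String expectedMessagePart, Class<E> expectedType, Callable<?> code) {
        try {
            code.call();
        } catch (Throwable t) {
            if (!expectedType.isInstance(t)) {
                throw new AssertionError(
                        "Expected " + expectedType.getName() + " but got " + t.getClass().getName(), t);
            }
            if (t.getMessage() == null || !t.getMessage().contains(expectedMessagePart)) {
                throw new AssertionError(
                        "Expected message containing '" + expectedMessagePart + "' but was: " + t.getMessage(), t);
            }
            return expectedType.cast(t);
        }
        throw new AssertionError(
                "Expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

A test would then read, for example, `assertThrows("does not exist", IllegalStateException.class, () -> callUnderTest())`, where `callUnderTest` is a placeholder for the code being verified.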

Contributor

@zentol zentol left a comment

Are we intentionally not regenerating the REST documentation?

@tsreaper
Contributor Author

tsreaper commented May 14, 2020

> Are we intentionally not regenerating the REST documentation?

Yes. As coordinators are used only internally, we're not going to expose this to the users so no user documentation is updated.

I've updated RestAPIDocGenerator in FLINK-17680 and #12141; please take a look.

@tillrohrmann
Contributor

I like the idea to have an internal REST API which we don't expose to the user because it is not intended for public use.

@tillrohrmann
Contributor

@flinkbot run azure

@tsreaper
Contributor Author

tsreaper commented May 14, 2020

@flinkbot run azure

It has already finished (see here); all tests have passed except the e2e tests, which were cancelled.

@tillrohrmann
Contributor

True. Thanks for the information.

Contributor

@zentol zentol left a comment

I'm fine with having internal REST APIs, so if #12141 is merged beforehand and the headers are marked accordingly, then I'm fine with it.

@tillrohrmann
Contributor

@tsreaper I have merged #12141. You could now rebase this PR on the current master and add the @ExcludeFromDocumentation annotation.

@tsreaper
Contributor Author

@tillrohrmann @zentol I've rebased onto master and marked ClientCoordinationHeaders with @ExcludeFromDocumentation, please take another look. Thanks.

@tillrohrmann
Contributor

Thanks @tsreaper. Merging this PR once AZP gives green light.

@KurtYoung
Contributor

AZP passed. I'm merging this so that I can start reviewing another PR based on it.

@KurtYoung KurtYoung merged commit 9fe920f into apache:master May 14, 2020