Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector #3078

Closed
wants to merge 5 commits into from

Conversation

skidder
Copy link
Contributor

@skidder skidder commented Jan 6, 2017

My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like:

com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 500; Error Code: InternalFailure; Request ID: dc1b7a1a-1b97-1a32-8cd5-79a896a55223)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1545)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1183)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:964)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:676)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:650)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:633)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$300(AmazonHttpClient.java:601)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:583)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:447)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1747)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1723)
	at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:858)
	at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:193)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:268)
	at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:176)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue.

I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception.

@tzulitai
Copy link
Contributor

tzulitai commented Jan 8, 2017

Thank you for opening a pull request for this @skidder !

I think AmazonServiceException will also be thrown for client errors (ex., wrong user provided parameters for API calls, such as invalid access key). Whether an exception is caused by client or server is indicated by AmazonServiceException#getErrorType() which returns either ErrorType.Client, ErrorType.Server, or ErrorType.Unknown

So, instead of simply catching a AmazonServiceException, I wonder if we should only perform exponential backoff for service exceptions with Server and Unknown error types. We need to also see what type ProvisionedThroughputExceededException is, and see if it is included as a Server or Unknown; if not, handle that also.

What do you think?

@skidder
Copy link
Contributor Author

skidder commented Jan 9, 2017

Thanks @tzulitai , great feedback!

The ProvisionedThroughputExceededException exception will be reported with an HTTP 400 response status code:
http:https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_Errors

The AWS SDK will assign the ErrorType.Client type to all exceptions with an HTTP status code less than 500 (AWS SDK source).

Perhaps we can perform exponential-backoff for exceptions where:

  • Client error of type ProvisionedThroughputExceededException
  • All Service errors (e.g. HTTP 500, 503)
  • All Unknown errors (appear to be limited to errors unmarshalling the Kinesis service response)

All other exceptions can be thrown up.

What are your thoughts?

Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @skidder,
Really sorry for the late response, this PR somehow slipped out of my attention (please feel free to ping me if I'm apparently not responding in the future :))

Thank you for the detailed explanation! The proposal seems very reasonable to me.

I have some last minor comments on the code style consistency, once those are addressed the code is ready to be merged :)

* exponential-backoff
*
* @param ex
* Exception to inspect
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the Javadoc formatting here is inconsistent with the other methods (line change).

return false;
}

switch (ex.getErrorType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation for the cases here seem to be missing.


switch (ex.getErrorType()) {
case Client:
if (ex instanceof ProvisionedThroughputExceededException) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll probably be cleaner to just do ex instanceof ProvisionedThroughputExceededException


/**
* Test for methods in the KinesisProxy class.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra empty line

import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

/**
* Test for methods in the KinesisProxy class.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should link the KinesisProxy referencing, like other Javadocs.


package org.apache.flink.streaming.connectors.kinesis.proxy;

import static org.junit.Assert.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Flink we generally try to avoid asterisk imports.
The style check doesn't actually check the test scope, but it'll be good to try to be consistent with the style rules in tests also.

@skidder
Copy link
Contributor Author

skidder commented Jan 19, 2017

Thank you @tzulitai for the feedback on the styling! I've pushed a commit that addresses your comments.

@tzulitai
Copy link
Contributor

tzulitai commented Jan 20, 2017

Thank you @skidder for addressing the comments and the contribution! Merging ..

@asfgit asfgit closed this in b380bd3 Jan 20, 2017
asfgit pushed a commit that referenced this pull request Jan 20, 2017
alpinegizmo pushed a commit to alpinegizmo/flink that referenced this pull request Jan 20, 2017
joseprupi pushed a commit to joseprupi/flink that referenced this pull request Feb 12, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants