-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5355] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector #3078
Conversation
Thank you for opening a pull request for this @skidder ! I think So, instead of simply catching a What do you think? |
Thanks @tzulitai , great feedback! The The AWS SDK will assign the Perhaps we can perform exponential-backoff for exceptions where:
All other exceptions can be thrown up. What are your thoughts? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @skidder,
Really sorry for the late response, this PR somehow slipped out of my attention (please feel free to ping me if I'm apparently not responding in the future :))
Thank you for the detailed explanation! The proposal seems very reasonable to me.
I have some last minor comments on the code style consistency, once those are addressed the code is ready to be merged :)
* exponential-backoff | ||
* | ||
* @param ex | ||
* Exception to inspect |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the Javadoc formatting here is inconsistent with the other methods (line change).
return false; | ||
} | ||
|
||
switch (ex.getErrorType()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The indentation for the cases here seem to be missing.
|
||
switch (ex.getErrorType()) { | ||
case Client: | ||
if (ex instanceof ProvisionedThroughputExceededException) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'll probably be cleaner to just do ex instanceof ProvisionedThroughputExceededException
|
||
/** | ||
* Test for methods in the KinesisProxy class. | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra empty line
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; | ||
|
||
/** | ||
* Test for methods in the KinesisProxy class. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should link the KinesisProxy
referencing, like other Javadocs.
|
||
package org.apache.flink.streaming.connectors.kinesis.proxy; | ||
|
||
import static org.junit.Assert.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Flink we generally try to avoid asterisk imports.
The style check doesn't actually check the test scope, but it'll be good to try to be consistent with the style rules in tests also.
Thank you @tzulitai for the feedback on the styling! I've pushed a commit that addresses your comments. |
Thank you @skidder for addressing the comments and the contribution! Merging .. |
…nesis Streaming Connector This closes #3078.
…nesis Streaming Connector This closes apache#3078.
…nesis Streaming Connector This closes apache#3078.
My Flink job that consumes from a Kinesis stream must be restarted at least once daily due to an uncaught AmazonKinesisException when reading from Kinesis. The complete stacktrace looks like:
It's interesting that the Kinesis endpoint returned a 500 status code, but that's outside the scope of this issue.
I think we can handle this exception in the same manner as a ProvisionedThroughputException: performing an exponential backoff and retrying a finite number of times before throwing an exception.