[FLINK-10020] [kinesis] Support recoverable exceptions in listShards. #6482
Conversation
		listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
	LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
		streamName, backoffMillis);
	Thread.sleep(backoffMillis);
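For context, the full-jitter backoff helper the snippet calls could look roughly like this. This is a minimal sketch only: the method and parameter names are taken from the snippet above, but the body is an assumption based on the standard full-jitter pattern, not the connector's actual implementation.

```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    // Sketch of an exponential backoff with full jitter: the raw delay grows as
    // base * power^attempt, is capped at maxMillis, and the actual sleep is a
    // uniformly random value in [0, cappedDelay].
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long capped = Math.min(maxMillis, (long) (baseMillis * Math.pow(power, attempt)));
        return (long) (ThreadLocalRandom.current().nextDouble() * capped);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++) {
            long backoffMillis = fullJitterBackoff(1000L, 5000L, 1.5, attempt);
            System.out.println("attempt " + attempt + " -> sleep " + backoffMillis + " ms");
        }
    }
}
```

The jitter spreads out retries from many parallel subtasks so they do not hammer the ListShards API in lockstep after a shared transient failure.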
I'm wondering what kinds of SdkClientExceptions there are. Do we really need a backoff here before retrying?
Please see the JIRA for an example of such an exception. These are really the same type of exceptions that we don't want getRecords to fail on, and I believe we should be consistent with the backoff. Since listShards isn't latency sensitive, it won't hurt to err on the conservative side.
@@ -409,7 +416,7 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 	int attemptCount = 0;
 	// List Shards returns just the first 1000 shard entries. Make sure that all entries
 	// are taken up.
-	while (listShardsResults == null) { // retry until we get a result
+	while (attemptCount <= listShardsMaxAttempts && listShardsResults == null) { // retry until we get a result
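The bounded-retry shape under discussion can be sketched in isolation. All names here (`callWithRetry`, `fetch`) are illustrative stand-ins, not the connector's API; the point is the loop condition that now caps attempts instead of retrying forever.

```java
import java.util.function.Supplier;

public class BoundedRetry {
    // Retry a flaky call until it yields a result or the attempt budget is
    // exhausted, backing off between attempts. Mirrors the PR's loop shape:
    // while (attemptCount <= maxAttempts && result == null).
    static String callWithRetry(Supplier<String> fetch, int maxAttempts, long backoffMillis)
            throws InterruptedException {
        String result = null;
        int attemptCount = 0;
        while (attemptCount <= maxAttempts && result == null) {
            try {
                result = fetch.get();
            } catch (RuntimeException e) {
                // Treat as recoverable: count the attempt and back off before retrying.
                attemptCount++;
                Thread.sleep(backoffMillis);
            }
        }
        if (result == null) {
            throw new RuntimeException("Exceeded max retries (" + maxAttempts + ") without a result.");
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fails twice, then succeeds on the third call.
        String r = callWithRetry(() -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("transient");
            }
            return "shards";
        }, 5, 1L);
        System.out.println(r); // prints "shards"
    }
}
```

With `maxAttempts` set very high this behaves like the previous unbounded loop, which is why the review below notes that the old contract can still be approximated via configuration.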
The earlier contract was to wait until we get a result. https://issues.apache.org/jira/browse/FLINK-10020 does not talk about breaking this contract. I personally believe a maxAttemptCount is better, since listShards runs in a periodic thread and we are bound to try again after 'X' seconds anyway. Just wanted to point this out. I like this approach better.
I too think that this is better, since it gives the user more flexibility. Setting the retry count to the maximum practically achieves the previous behavior. Perhaps we should raise the default retry count?
@@ -151,6 +190,45 @@ public void testGetShardList() throws Exception {
 			expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
 	}

+	@Test
+	public void testGetShardListRetry() throws Exception {
Would it make sense to also have a test where we exceed the number of configured retries? In that case we should not get any result.
Good point, expanded the test to cover this.
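The retry-exhaustion case that the expanded test covers can be sketched like this. All names below are illustrative, not the PR's actual test code; the sketch just demonstrates the expected behavior: with a mocked client that always fails transiently, the loop gives up after the configured attempts and yields no result.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ListShardsRetryTestSketch {
    static final int MAX_ATTEMPTS = 3;

    // Stand-in for a Kinesis client whose listShards always fails transiently.
    static String flakyListShards(AtomicInteger callCount) {
        callCount.incrementAndGet();
        throw new RuntimeException("simulated SdkClientException");
    }

    // Bounded retry loop: stops once attemptCount exceeds MAX_ATTEMPTS.
    static String listShardsWithRetry(AtomicInteger callCount) {
        String result = null;
        int attemptCount = 0;
        while (attemptCount <= MAX_ATTEMPTS && result == null) {
            try {
                result = flakyListShards(callCount);
            } catch (RuntimeException e) {
                attemptCount++;
            }
        }
        return result; // null means retries were exhausted without a result
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = listShardsWithRetry(calls);
        // The call is attempted MAX_ATTEMPTS + 1 times, then the loop gives up.
        System.out.println("attempts=" + calls.get() + ", result=" + result);
    }
}
```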
@tzulitai PTAL
Force-pushed from bf2e212 to e8fd071.
👍
Changes LGTM, +1, thanks @tweise. |
This change fixes the retry behavior of listShards to match what getRecords already supports. Importantly, this prevents the subtask from failing on transient listShards errors that we can identify based on well-known exceptions. These are recoverable and should not lead to unnecessary recovery cycles that cause downtime.
R: @glaksh100 @jgrier @tzulitai