Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10020] [kinesis] Support recoverable exceptions in listShards. #6482

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";

/** The maximum number of listShards attempts if we get a recoverable exception. */
public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";

/** The base backoff time between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

Expand All @@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
/** The maximum number of getRecords attempts if we get a recoverable exception. */
public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";

/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
Expand Down Expand Up @@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {

public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;

public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;

public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the list shards operation. */
private final double listShardsExpConstant;

/** Maximum retry attempts for the list shards operation. */
private final int listShardsMaxRetries;

// ------------------------------------------------------------------------
// getRecords() related performance settings
// ------------------------------------------------------------------------
Expand All @@ -104,8 +107,8 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the get records operation. */
private final double getRecordsExpConstant;

/** Maximum attempts for the get records operation. */
private final int getRecordsMaxAttempts;
/** Maximum retry attempts for the get records operation. */
private final int getRecordsMaxRetries;

// ------------------------------------------------------------------------
// getShardIterator() related performance settings
Expand All @@ -120,8 +123,8 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the get shard iterator operation. */
private final double getShardIteratorExpConstant;

/** Maximum attempts for the get shard iterator operation. */
private final int getShardIteratorMaxAttempts;
/** Maximum retry attempts for the get shard iterator operation. */
private final int getShardIteratorMaxRetries;

/**
* Create a new KinesisProxy based on the supplied configuration properties.
Expand All @@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.listShardsMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));

this.getRecordsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
Expand All @@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsMaxAttempts = Integer.valueOf(
this.getRecordsMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
Expand All @@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getShardIteratorMaxAttempts = Integer.valueOf(
this.getShardIteratorMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
Expand Down Expand Up @@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th

GetRecordsResult getRecordsResult = null;

int attempt = 0;
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
int retryCount = 0;
while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
LOG.warn("Got recoverable SdkClientException. Backing off for "
+ backoffMillis + " millis (" + ex.getMessage() + ")");
Thread.sleep(backoffMillis);
Expand All @@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
}

if (getRecordsResult == null) {
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries +
" retry attempts returned ProvisionedThroughputExceededException.");
}

Expand Down Expand Up @@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard, String shardIteratorType
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;

int attempt = 0;
while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
int retryCount = 0;
while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) {
try {
getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
Expand All @@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
}

if (getShardIteratorResult == null) {
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxRetries +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
Expand Down Expand Up @@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
ListShardsResult listShardsResults = null;

// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
int retryCount = 0;
// List Shards returns just the first 1000 shard entries. Make sure that all entries
// are taken up.
while (listShardsResults == null) { // retry until we get a result
while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result
try {

listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+ ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
Expand All @@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
} catch (ExpiredNextTokenException expiredToken) {
LOG.warn("List Shards has an expired token. Reusing the previous state.");
break;
} catch (SdkClientException ex) {
if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
streamName, backoffMillis);
Thread.sleep(backoffMillis);
} else {
// propagate if retries exceeded or not recoverable
// (otherwise would return null result and keep trying forever)
throw ex;
}
}
}
// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.AmazonKinesisException;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang3.mutable.MutableInt;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
Expand All @@ -54,6 +62,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() {
assertFalse(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testGetRecordsRetry() throws Exception {
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

final GetRecordsResult expectedResult = new GetRecordsResult();
MutableInt retries = new MutableInt();
final Throwable[] retriableExceptions = new Throwable[] {
new AmazonKinesisException("mock"),
};

AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
Mockito.when(mockClient.getRecords(any())).thenAnswer(new Answer<GetRecordsResult>() {
@Override
public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable{
if (retries.intValue() < retriableExceptions.length) {
retries.increment();
throw retriableExceptions[retries.intValue() - 1];
}
return expectedResult;
}
});

KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);

GetRecordsResult result = kinesisProxy.getRecords("fakeShardIterator", 1);
assertEquals(retriableExceptions.length, retries.intValue());
assertEquals(expectedResult, result);
}

@Test
public void testGetShardList() throws Exception {
List<String> shardIds =
Expand Down Expand Up @@ -151,6 +191,60 @@ public void testGetShardList() throws Exception {
expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
}

@Test
public void testGetShardListRetry() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to also have a test where we exceed the number of configured retries? In that case we should not get any result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, expanded the test to cover this.

Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

Shard shard = new Shard();
shard.setShardId("fake-shard-000000000000");
final ListShardsResult expectedResult = new ListShardsResult();
expectedResult.withShards(shard);

MutableInt exceptionCount = new MutableInt();
final Throwable[] retriableExceptions = new Throwable[]{
new AmazonKinesisException("attempt1"),
new AmazonKinesisException("attempt2"),
};

AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
Mockito.when(mockClient.listShards(any())).thenAnswer(new Answer<ListShardsResult>() {

@Override
public ListShardsResult answer(InvocationOnMock invocation) throws Throwable {
if (exceptionCount.intValue() < retriableExceptions.length) {
exceptionCount.increment();
throw retriableExceptions[exceptionCount.intValue() - 1];
}
return expectedResult;
}
});

KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);

HashMap<String, String> streamNames = new HashMap();
streamNames.put("fake-stream", null);
GetShardListResult result = kinesisProxy.getShardList(streamNames);
assertEquals(retriableExceptions.length, exceptionCount.intValue());
assertEquals(true, result.hasRetrievedShards());
assertEquals(shard.getShardId(), result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());

// test max attempt count exceeded
int maxRetries = 1;
exceptionCount.setValue(0);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, String.valueOf(maxRetries));
kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
try {
kinesisProxy.getShardList(streamNames);
Assert.fail("exception expected");
} catch (SdkClientException ex) {
assertEquals(retriableExceptions[maxRetries], ex);
}
assertEquals(maxRetries + 1, exceptionCount.intValue());
}

@Test
public void testCustomConfigurationOverride() {
Properties configProps = new Properties();
Expand Down