Skip to content

Commit

Permalink
[FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
Browse files Browse the repository at this point in the history
This closes apache#6482.
  • Loading branch information
tweise authored and tzulitai committed Aug 17, 2018
1 parent 0244ac8 commit 50d076a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";

/** The maximum number of listShards attempts if we get a recoverable exception. */
public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries";

/** The base backoff time between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

Expand All @@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

/** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */
/** The maximum number of getRecords attempts if we get a recoverable exception. */
public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";

/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
Expand Down Expand Up @@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {

public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final int DEFAULT_LIST_SHARDS_RETRIES = 10;

public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;

public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the list shards operation. */
private final double listShardsExpConstant;

/** Maximum retry attempts for the list shards operation. */
private final int listShardsMaxRetries;

// ------------------------------------------------------------------------
// getRecords() related performance settings
// ------------------------------------------------------------------------
Expand All @@ -104,8 +107,8 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the get records operation. */
private final double getRecordsExpConstant;

/** Maximum attempts for the get records operation. */
private final int getRecordsMaxAttempts;
/** Maximum retry attempts for the get records operation. */
private final int getRecordsMaxRetries;

// ------------------------------------------------------------------------
// getShardIterator() related performance settings
Expand All @@ -120,8 +123,8 @@ public class KinesisProxy implements KinesisProxyInterface {
/** Exponential backoff power constant for the get shard iterator operation. */
private final double getShardIteratorExpConstant;

/** Maximum attempts for the get shard iterator operation. */
private final int getShardIteratorMaxAttempts;
/** Maximum retry attempts for the get shard iterator operation. */
private final int getShardIteratorMaxRetries;

/**
* Create a new KinesisProxy based on the supplied configuration properties.
Expand All @@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.listShardsMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.LIST_SHARDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES)));

this.getRecordsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
Expand All @@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getRecordsMaxAttempts = Integer.valueOf(
this.getRecordsMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES)));
Expand All @@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) {
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT)));
this.getShardIteratorMaxAttempts = Integer.valueOf(
this.getShardIteratorMaxRetries = Integer.valueOf(
configProps.getProperty(
ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES)));
Expand Down Expand Up @@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th

GetRecordsResult getRecordsResult = null;

int attempt = 0;
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
int retryCount = 0;
while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++);
LOG.warn("Got recoverable SdkClientException. Backing off for "
+ backoffMillis + " millis (" + ex.getMessage() + ")");
Thread.sleep(backoffMillis);
Expand All @@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
}

if (getRecordsResult == null) {
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts +
throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries +
" retry attempts returned ProvisionedThroughputExceededException.");
}

Expand Down Expand Up @@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard, String shardIteratorType
private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException {
GetShardIteratorResult getShardIteratorResult = null;

int attempt = 0;
while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) {
int retryCount = 0;
while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) {
try {
getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
Expand All @@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
}

if (getShardIteratorResult == null) {
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts +
throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxRetries +
" retry attempts returned ProvisionedThroughputExceededException.");
}
return getShardIteratorResult.getShardIterator();
Expand Down Expand Up @@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
ListShardsResult listShardsResults = null;

// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
int retryCount = 0;
// List Shards returns just the first 1000 shard entries. Make sure that all entries
// are taken up.
while (listShardsResults == null) { // retry until we get a result
while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result
try {

listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+ ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
Expand All @@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
} catch (ExpiredNextTokenException expiredToken) {
LOG.warn("List Shards has an expired token. Reusing the previous state.");
break;
} catch (SdkClientException ex) {
if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++);
LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
streamName, backoffMillis);
Thread.sleep(backoffMillis);
} else {
// propagate if retries exceeded or not recoverable
// (otherwise would return null result and keep trying forever)
throw ex;
}
}
}
// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.AmazonKinesisException;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import org.apache.commons.lang3.mutable.MutableInt;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.reflect.Whitebox;

import java.util.ArrayList;
Expand All @@ -54,6 +62,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() {
assertFalse(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testGetRecordsRetry() throws Exception {
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

final GetRecordsResult expectedResult = new GetRecordsResult();
MutableInt retries = new MutableInt();
final Throwable[] retriableExceptions = new Throwable[] {
new AmazonKinesisException("mock"),
};

AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
Mockito.when(mockClient.getRecords(any())).thenAnswer(new Answer<GetRecordsResult>() {
@Override
public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable{
if (retries.intValue() < retriableExceptions.length) {
retries.increment();
throw retriableExceptions[retries.intValue() - 1];
}
return expectedResult;
}
});

KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);

GetRecordsResult result = kinesisProxy.getRecords("fakeShardIterator", 1);
assertEquals(retriableExceptions.length, retries.intValue());
assertEquals(expectedResult, result);
}

@Test
public void testGetShardList() throws Exception {
List<String> shardIds =
Expand Down Expand Up @@ -151,6 +191,60 @@ public void testGetShardList() throws Exception {
expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()])));
}

@Test
public void testGetShardListRetry() throws Exception {
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

Shard shard = new Shard();
shard.setShardId("fake-shard-000000000000");
final ListShardsResult expectedResult = new ListShardsResult();
expectedResult.withShards(shard);

MutableInt exceptionCount = new MutableInt();
final Throwable[] retriableExceptions = new Throwable[]{
new AmazonKinesisException("attempt1"),
new AmazonKinesisException("attempt2"),
};

AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class);
Mockito.when(mockClient.listShards(any())).thenAnswer(new Answer<ListShardsResult>() {

@Override
public ListShardsResult answer(InvocationOnMock invocation) throws Throwable {
if (exceptionCount.intValue() < retriableExceptions.length) {
exceptionCount.increment();
throw retriableExceptions[exceptionCount.intValue() - 1];
}
return expectedResult;
}
});

KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);

HashMap<String, String> streamNames = new HashMap();
streamNames.put("fake-stream", null);
GetShardListResult result = kinesisProxy.getShardList(streamNames);
assertEquals(retriableExceptions.length, exceptionCount.intValue());
assertEquals(true, result.hasRetrievedShards());
assertEquals(shard.getShardId(), result.getLastSeenShardOfStream("fake-stream").getShard().getShardId());

// test max attempt count exceeded
int maxRetries = 1;
exceptionCount.setValue(0);
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, String.valueOf(maxRetries));
kinesisProxy = new KinesisProxy(kinesisConsumerConfig);
Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient);
try {
kinesisProxy.getShardList(streamNames);
Assert.fail("exception expected");
} catch (SdkClientException ex) {
assertEquals(retriableExceptions[maxRetries], ex);
}
assertEquals(maxRetries + 1, exceptionCount.intValue());
}

@Test
public void testCustomConfigurationOverride() {
Properties configProps = new Properties();
Expand Down

0 comments on commit 50d076a

Please sign in to comment.