From 50d076ab6ad325907690a2c115ee2cb1c45775c9 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Thu, 2 Aug 2018 17:47:34 -0700 Subject: [PATCH] [FLINK-10020] [kinesis] Support recoverable exceptions in listShards. This closes #6482. --- .../config/ConsumerConfigConstants.java | 7 +- .../kinesis/proxy/KinesisProxy.java | 53 +++++++---- .../kinesis/proxy/KinesisProxyTest.java | 94 +++++++++++++++++++ 3 files changed, 136 insertions(+), 18 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 48a0b3c955937..443b19ec382f7 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -92,6 +92,9 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The power constant for exponential backoff between each describeStream attempt. */ public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst"; + /** The maximum number of listShards attempts if we get a recoverable exception. */ + public static final String LIST_SHARDS_RETRIES = "flink.list.shards.maxretries"; + /** The base backoff time between each listShards attempt. */ public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base"; @@ -104,7 +107,7 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { /** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */ public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount"; - /** The maximum number of getRecords attempts if we get ProvisionedThroughputExceededException. */ + /** The maximum number of getRecords attempts if we get a recoverable exception. */ public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries"; /** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */ @@ -161,6 +164,8 @@ public SentinelSequenceNumber toSentinelSequenceNumber() { public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5; + public static final int DEFAULT_LIST_SHARDS_RETRIES = 10; + public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000; public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 7e6a36044146c..262181ae3bcb2 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -91,6 +91,9 @@ public class KinesisProxy implements KinesisProxyInterface { /** Exponential backoff power constant for the list shards operation. */ private final double listShardsExpConstant; + /** Maximum retry attempts for the list shards operation. */ + private final int listShardsMaxRetries; + // ------------------------------------------------------------------------ // getRecords() related performance settings // ------------------------------------------------------------------------ @@ -104,8 +107,8 @@ public class KinesisProxy implements KinesisProxyInterface { /** Exponential backoff power constant for the get records operation. */ private final double getRecordsExpConstant; - /** Maximum attempts for the get records operation. */ - private final int getRecordsMaxAttempts; + /** Maximum retry attempts for the get records operation. */ + private final int getRecordsMaxRetries; // ------------------------------------------------------------------------ // getShardIterator() related performance settings @@ -120,8 +123,8 @@ public class KinesisProxy implements KinesisProxyInterface { /** Exponential backoff power constant for the get shard iterator operation. */ private final double getShardIteratorExpConstant; - /** Maximum attempts for the get shard iterator operation. */ - private final int getShardIteratorMaxAttempts; + /** Maximum retry attempts for the get shard iterator operation. */ + private final int getShardIteratorMaxRetries; /** * Create a new KinesisProxy based on the supplied configuration properties. @@ -146,6 +149,10 @@ protected KinesisProxy(Properties configProps) { configProps.getProperty( ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT))); + this.listShardsMaxRetries = Integer.valueOf( + configProps.getProperty( + ConsumerConfigConstants.LIST_SHARDS_RETRIES, + Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_RETRIES))); this.getRecordsBaseBackoffMillis = Long.valueOf( configProps.getProperty( @@ -159,7 +166,7 @@ protected KinesisProxy(Properties configProps) { configProps.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT, Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT))); - this.getRecordsMaxAttempts = Integer.valueOf( + this.getRecordsMaxRetries = Integer.valueOf( configProps.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_RETRIES, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_RETRIES))); @@ -176,7 +183,7 @@ protected KinesisProxy(Properties configProps) { configProps.getProperty( ConsumerConfigConstants.SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT, Double.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT))); - this.getShardIteratorMaxAttempts = Integer.valueOf( + this.getShardIteratorMaxRetries = Integer.valueOf( configProps.getProperty( ConsumerConfigConstants.SHARD_GETITERATOR_RETRIES, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETITERATOR_RETRIES))); @@ -217,14 +224,14 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th GetRecordsResult getRecordsResult = null; - int attempt = 0; - while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { + int retryCount = 0; + while (retryCount <= getRecordsMaxRetries && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); } catch (SdkClientException ex) { if (isRecoverableSdkClientException(ex)) { long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, retryCount++); LOG.warn("Got recoverable SdkClientException. Backing off for " + backoffMillis + " millis (" + ex.getMessage() + ")"); Thread.sleep(backoffMillis); @@ -235,7 +242,7 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th } if (getRecordsResult == null) { - throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxAttempts + + throw new RuntimeException("Rate Exceeded for getRecords operation - all " + getRecordsMaxRetries + " retry attempts returned ProvisionedThroughputExceededException."); } @@ -292,14 +299,14 @@ public String getShardIterator(StreamShardHandle shard, String shardIteratorType private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) throws InterruptedException { GetShardIteratorResult getShardIteratorResult = null; - int attempt = 0; - while (attempt <= getShardIteratorMaxAttempts && getShardIteratorResult == null) { + int retryCount = 0; + while (retryCount <= getShardIteratorMaxRetries && getShardIteratorResult == null) { try { getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest); } catch (AmazonServiceException ex) { if (isRecoverableException(ex)) { long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); + getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, retryCount++); LOG.warn("Got recoverable AmazonServiceException. Backing off for " + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); Thread.sleep(backoffMillis); @@ -310,7 +317,7 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest) } if (getShardIteratorResult == null) { - throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxAttempts + + throw new RuntimeException("Rate Exceeded for getShardIterator operation - all " + getShardIteratorMaxRetries + " retry attempts returned ProvisionedThroughputExceededException."); } return getShardIteratorResult.getShardIterator(); @@ -406,16 +413,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha ListShardsResult listShardsResults = null; // Call ListShards, with full-jitter backoff (if we get LimitExceededException). - int attemptCount = 0; + int retryCount = 0; // List Shards returns just the first 1000 shard entries. Make sure that all entries // are taken up. - while (listShardsResults == null) { // retry until we get a result + while (retryCount <= listShardsMaxRetries && listShardsResults == null) { // retry until we get a result try { listShardsResults = kinesisClient.listShards(listShardsRequest); } catch (LimitExceededException le) { long backoffMillis = fullJitterBackoff( - listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++); LOG.warn("Got LimitExceededException when listing shards from stream " + streamName + ". Backing off for " + backoffMillis + " millis."); Thread.sleep(backoffMillis); @@ -433,6 +440,18 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha } catch (ExpiredNextTokenException expiredToken) { LOG.warn("List Shards has an expired token. Reusing the previous state."); break; + } catch (SdkClientException ex) { + if (retryCount < listShardsMaxRetries && isRecoverableSdkClientException(ex)) { + long backoffMillis = fullJitterBackoff( + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, retryCount++); + LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.", + streamName, backoffMillis); + Thread.sleep(backoffMillis); + } else { + // propagate if retries exceeded or not recoverable + // (otherwise would return null result and keep trying forever) + throw ex; + } } } // Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java index 775ae4b335278..edf6ceb0d5741 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java @@ -27,16 +27,24 @@ import com.amazonaws.AmazonServiceException.ErrorType; import com.amazonaws.ClientConfiguration; import com.amazonaws.ClientConfigurationFactory; +import com.amazonaws.SdkClientException; import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.AmazonKinesisException; import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ListShardsRequest; import com.amazonaws.services.kinesis.model.ListShardsResult; import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.Shard; +import org.apache.commons.lang3.mutable.MutableInt; import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.powermock.reflect.Whitebox; import java.util.ArrayList; @@ -54,6 +62,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -91,6 +100,37 @@ public void testIsRecoverableExceptionWithNullErrorType() { assertFalse(KinesisProxy.isRecoverableException(ex)); } + @Test + public void testGetRecordsRetry() throws Exception { + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + + final GetRecordsResult expectedResult = new GetRecordsResult(); + MutableInt retries = new MutableInt(); + final Throwable[] retriableExceptions = new Throwable[] { + new AmazonKinesisException("mock"), + }; + + AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class); + Mockito.when(mockClient.getRecords(any())).thenAnswer(new Answer() { + @Override + public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable{ + if (retries.intValue() < retriableExceptions.length) { + retries.increment(); + throw retriableExceptions[retries.intValue() - 1]; + } + return expectedResult; + } + }); + + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient); + + GetRecordsResult result = kinesisProxy.getRecords("fakeShardIterator", 1); + assertEquals(retriableExceptions.length, retries.intValue()); + assertEquals(expectedResult, result); + } + @Test public void testGetShardList() throws Exception { List shardIds = @@ -151,6 +191,60 @@ public void testGetShardList() throws Exception { expectedStreamShard.toArray(new StreamShardHandle[actualShardList.size()]))); } + @Test + public void testGetShardListRetry() throws Exception { + Properties kinesisConsumerConfig = new Properties(); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); + + Shard shard = new Shard(); + shard.setShardId("fake-shard-000000000000"); + final ListShardsResult expectedResult = new ListShardsResult(); + expectedResult.withShards(shard); + + MutableInt exceptionCount = new MutableInt(); + final Throwable[] retriableExceptions = new Throwable[]{ + new AmazonKinesisException("attempt1"), + new AmazonKinesisException("attempt2"), + }; + + AmazonKinesisClient mockClient = mock(AmazonKinesisClient.class); + Mockito.when(mockClient.listShards(any())).thenAnswer(new Answer() { + + @Override + public ListShardsResult answer(InvocationOnMock invocation) throws Throwable { + if (exceptionCount.intValue() < retriableExceptions.length) { + exceptionCount.increment(); + throw retriableExceptions[exceptionCount.intValue() - 1]; + } + return expectedResult; + } + }); + + KinesisProxy kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient); + + HashMap streamNames = new HashMap(); + streamNames.put("fake-stream", null); + GetShardListResult result = kinesisProxy.getShardList(streamNames); + assertEquals(retriableExceptions.length, exceptionCount.intValue()); + assertEquals(true, result.hasRetrievedShards()); + assertEquals(shard.getShardId(), result.getLastSeenShardOfStream("fake-stream").getShard().getShardId()); + + // test max attempt count exceeded + int maxRetries = 1; + exceptionCount.setValue(0); + kinesisConsumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_RETRIES, String.valueOf(maxRetries)); + kinesisProxy = new KinesisProxy(kinesisConsumerConfig); + Whitebox.getField(KinesisProxy.class, "kinesisClient").set(kinesisProxy, mockClient); + try { + kinesisProxy.getShardList(streamNames); + Assert.fail("exception expected"); + } catch (SdkClientException ex) { + assertEquals(retriableExceptions[maxRetries], ex); + } + assertEquals(maxRetries + 1, exceptionCount.intValue()); + } + @Test public void testCustomConfigurationOverride() { Properties configProps = new Properties();