From b380bd313e036f2bc66941367f014c12bf8baa6d Mon Sep 17 00:00:00 2001 From: Scott Kidder Date: Fri, 16 Dec 2016 08:46:54 -0800 Subject: [PATCH] [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector This closes #3078. --- .../kinesis/proxy/KinesisProxy.java | 58 +++++++++++++---- .../kinesis/proxy/KinesisProxyTest.java | 63 +++++++++++++++++++ 2 files changed, 108 insertions(+), 13 deletions(-) create mode 100644 flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 9ffc8e64deeea..0b0fccf77de31 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,14 +17,15 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; @@ -193,12 +194,16 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -237,12 +242,16 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -253,6 +262,29 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp return getShardIteratorResult.getShardIterator(); } + /** + * Determines whether the exception is recoverable using exponential-backoff. + * + * @param ex Exception to inspect + * @return true if the exception can be recovered from, else + * false + */ + protected static boolean isRecoverableException(AmazonServiceException ex) { + if (ex.getErrorType() == null) { + return false; + } + + switch (ex.getErrorType()) { + case Client: + return ex instanceof ProvisionedThroughputExceededException; + case Service: + case Unknown: + return true; + default: + return false; + } + } + private List getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException { List shardsOfStream = new ArrayList<>(); diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java new file mode 100644 index 0000000000000..86202c5edc348 --- /dev/null +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kinesis.proxy; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; + +import org.junit.Test; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.AmazonServiceException.ErrorType; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; + +/** + * Test for methods in the {@link KinesisProxy} class. + */ +public class KinesisProxyTest { + + @Test + public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() { + final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf"); + ex.setErrorType(ErrorType.Client); + assertTrue(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithServiceException() { + final AmazonServiceException ex = new AmazonServiceException("asdf"); + ex.setErrorType(ErrorType.Service); + assertTrue(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithExpiredIteratorException() { + final ExpiredIteratorException ex = new ExpiredIteratorException("asdf"); + ex.setErrorType(ErrorType.Client); + assertFalse(KinesisProxy.isRecoverableException(ex)); + } + + @Test + public void testIsRecoverableExceptionWithNullErrorType() { + final AmazonServiceException ex = new AmazonServiceException("asdf"); + ex.setErrorType(null); + assertFalse(KinesisProxy.isRecoverableException(ex)); + } + +}