Skip to content

Commit

Permalink
[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Ki…
Browse files Browse the repository at this point in the history
…nesis Streaming Connector

This closes apache#3078.
  • Loading branch information
skidder authored and tzulitai committed Jan 20, 2017
1 parent 988729e commit b380bd3
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.flink.streaming.connectors.kinesis.proxy;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.StreamStatus;
import com.amazonaws.services.kinesis.model.Shard;
Expand Down Expand Up @@ -193,12 +194,16 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (ProvisionedThroughputExceededException ex) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ backoffMillis + " millis.");
Thread.sleep(backoffMillis);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
} else {
throw ex;
}
}
}

Expand Down Expand Up @@ -237,12 +242,16 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
try {
getShardIteratorResult =
kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
} catch (ProvisionedThroughputExceededException ex) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
LOG.warn("Got ProvisionedThroughputExceededException. Backing off for "
+ backoffMillis + " millis.");
Thread.sleep(backoffMillis);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
long backoffMillis = fullJitterBackoff(
getShardIteratorBaseBackoffMillis, getShardIteratorMaxBackoffMillis, getShardIteratorExpConstant, attempt++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
Thread.sleep(backoffMillis);
} else {
throw ex;
}
}
}

Expand All @@ -253,6 +262,29 @@ public String getShardIterator(KinesisStreamShard shard, String shardIteratorTyp
return getShardIteratorResult.getShardIterator();
}

/**
* Determines whether the exception is recoverable using exponential-backoff.
*
* @param ex Exception to inspect
* @return <code>true</code> if the exception can be recovered from, else
* <code>false</code>
*/
protected static boolean isRecoverableException(AmazonServiceException ex) {
if (ex.getErrorType() == null) {
return false;
}

switch (ex.getErrorType()) {
case Client:
return ex instanceof ProvisionedThroughputExceededException;
case Service:
case Unknown:
return true;
default:
return false;
}
}

private List<KinesisStreamShard> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<KinesisStreamShard> shardsOfStream = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.proxy;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

import org.junit.Test;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;

/**
* Test for methods in the {@link KinesisProxy} class.
*/
public class KinesisProxyTest {

@Test
public void testIsRecoverableExceptionWithProvisionedThroughputExceeded() {
final ProvisionedThroughputExceededException ex = new ProvisionedThroughputExceededException("asdf");
ex.setErrorType(ErrorType.Client);
assertTrue(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testIsRecoverableExceptionWithServiceException() {
final AmazonServiceException ex = new AmazonServiceException("asdf");
ex.setErrorType(ErrorType.Service);
assertTrue(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testIsRecoverableExceptionWithExpiredIteratorException() {
final ExpiredIteratorException ex = new ExpiredIteratorException("asdf");
ex.setErrorType(ErrorType.Client);
assertFalse(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testIsRecoverableExceptionWithNullErrorType() {
final AmazonServiceException ex = new AmazonServiceException("asdf");
ex.setErrorType(null);
assertFalse(KinesisProxy.isRecoverableException(ex));
}

}

0 comments on commit b380bd3

Please sign in to comment.