Skip to content

Commit

Permalink
[FLINK-9124] [kinesis] Allow customization of KinesisProxy.getRecords…
Browse files Browse the repository at this point in the history
… read timeout and retry.

This closes apache#5803.
  • Loading branch information
tweise authored and tzulitai committed Apr 18, 2018
1 parent d5ec911 commit 9fec5ca
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
Expand Down Expand Up @@ -125,7 +128,7 @@ public class KinesisProxy implements KinesisProxyInterface {
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);

this.kinesisClient = AWSUtil.createKinesisClient(configProps);
this.kinesisClient = createKinesisClient(configProps);

this.describeStreamBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
Expand Down Expand Up @@ -176,6 +179,16 @@ protected KinesisProxy(Properties configProps) {

}

/**
* Create the Kinesis client, using the provided configuration properties and default {@link ClientConfiguration}.
* Derived classes can override this method to customize the client configuration.
* @param configProps
* @return
*/
protected AmazonKinesis createKinesisClient(Properties configProps) {
return AWSUtil.createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
}

/**
* Creates a Kinesis proxy.
*
Expand All @@ -201,12 +214,12 @@ public GetRecordsResult getRecords(String shardIterator, int maxRecordsToGet) th
while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) {
try {
getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
} catch (AmazonServiceException ex) {
if (isRecoverableException(ex)) {
} catch (SdkClientException ex) {
if (isRecoverableSdkClientException(ex)) {
long backoffMillis = fullJitterBackoff(
getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
LOG.warn("Got recoverable AmazonServiceException. Backing off for "
+ backoffMillis + " millis (" + ex.getErrorMessage() + ")");
LOG.warn("Got recoverable SdkClientException. Backing off for "
+ backoffMillis + " millis (" + ex.getMessage() + ")");
Thread.sleep(backoffMillis);
} else {
throw ex;
Expand Down Expand Up @@ -296,6 +309,21 @@ private String getShardIterator(GetShardIteratorRequest getShardIteratorRequest)
return getShardIteratorResult.getShardIterator();
}

/**
* Determines whether the exception is recoverable using exponential-backoff.
*
* @param ex Exception to inspect
* @return <code>true</code> if the exception can be recovered from, else
* <code>false</code>
*/
protected boolean isRecoverableSdkClientException(SdkClientException ex) {
if (ex instanceof AmazonServiceException) {
return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
}
// customizations may decide to retry other errors, such as read timeouts
return false;
}

/**
* Determines whether the exception is recoverable using exponential-backoff.
*
Expand Down Expand Up @@ -397,7 +425,7 @@ private DescribeStreamResult describeStream(String streamName, @Nullable String
return describeStreamResult;
}

private static long fullJitterBackoff(long base, long max, double power, int attempt) {
protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
long exponentialBackoff = (long) Math.min(max, base * Math.pow(power, attempt));
return (long) (seed.nextDouble() * exponentialBackoff); // random jitter between 0 and the exponential backoff
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,20 @@ public class AWSUtil {
* @return a new AmazonKinesis client
*/
public static AmazonKinesis createKinesisClient(Properties configProps) {
return createKinesisClient(configProps, new ClientConfigurationFactory().getConfig());
}

/**
* Creates an Amazon Kinesis Client.
* @param configProps configuration properties containing the access key, secret key, and region
* @param awsClientConfig preconfigured AWS SDK client configuration
* @return a new Amazon Kinesis Client
*/
public static AmazonKinesis createKinesisClient(Properties configProps, ClientConfiguration awsClientConfig) {
// set a Flink-specific user agent
ClientConfiguration awsClientConfig = new ClientConfigurationFactory().getConfig()
.withUserAgentPrefix(String.format(USER_AGENT_FORMAT,
EnvironmentInformation.getVersion(),
EnvironmentInformation.getRevisionInformation().commitId));
awsClientConfig.setUserAgentPrefix(String.format(USER_AGENT_FORMAT,
EnvironmentInformation.getVersion(),
EnvironmentInformation.getRevisionInformation().commitId));

// utilize automatic refreshment of credentials by directly passing the AWSCredentialsProvider
AmazonKinesisClientBuilder builder = AmazonKinesisClientBuilder.standard()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@

package org.apache.flink.streaming.connectors.kinesis.proxy;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonServiceException.ErrorType;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import org.junit.Test;
import org.powermock.reflect.Whitebox;

import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -59,4 +69,21 @@ public void testIsRecoverableExceptionWithNullErrorType() {
assertFalse(KinesisProxy.isRecoverableException(ex));
}

@Test
public void testCustomConfigurationOverride() {
Properties configProps = new Properties();
configProps.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
KinesisProxy proxy = new KinesisProxy(configProps) {
@Override
protected AmazonKinesis createKinesisClient(Properties configProps) {
ClientConfiguration clientConfig = new ClientConfigurationFactory().getConfig();
clientConfig.setSocketTimeout(10000);
return AWSUtil.createKinesisClient(configProps, clientConfig);
}
};
AmazonKinesis kinesisClient = Whitebox.getInternalState(proxy, "kinesisClient");
ClientConfiguration clientConfiguration = Whitebox.getInternalState(kinesisClient, "clientConfiguration");
assertEquals(10000, clientConfiguration.getSocketTimeout());
}

}

0 comments on commit 9fec5ca

Please sign in to comment.