[FLINK-8944] [Kinesis Connector] Use listShards instead of DescribeStream for shard discovery as it offers higher rate limits

This closes apache#5992
kailashhd authored and tzulitai committed Jun 22, 2018
1 parent ae8cef3 commit ef9e837
Showing 5 changed files with 295 additions and 71 deletions.
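For context, the new list-shards backoff settings introduced below can be set directly on the consumer properties. The fragment that follows is illustrative only and not part of this commit: the LIST_SHARDS_BACKOFF_* keys come from the constants added in ConsumerConfigConstants, while the region key, stream name, schema, and the surrounding StreamExecutionEnvironment (env) are assumed placeholders.

// Illustrative only, not part of this diff: opting into the new listShards backoff settings.
Properties consumerConfig = new Properties();
consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // assumes the standard aws.region key
consumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE, "1000");
consumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX, "5000");
consumerConfig.setProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT, "1.5");

DataStream<String> records = env.addSource(
    new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), consumerConfig));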
ConsumerConfigConstants.java
@@ -65,15 +65,42 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The date format of initial timestamp to start reading Kinesis stream from (when AT_TIMESTAMP is set for STREAM_INITIAL_POSITION). */
public static final String STREAM_TIMESTAMP_DATE_FORMAT = "flink.stream.initpos.timestamp.format";

/**
* Deprecated key.
*
* @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_BASE} instead
**/
@Deprecated
/** The base backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_BASE = "flink.stream.describe.backoff.base";

/**
* Deprecated key.
*
* @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_MAX} instead
**/
@Deprecated
/** The maximum backoff time between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_MAX = "flink.stream.describe.backoff.max";

/**
* Deprecated key.
*
* @deprecated Use {@link ConsumerConfigConstants#LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT} instead
**/
@Deprecated
/** The power constant for exponential backoff between each describeStream attempt. */
public static final String STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = "flink.stream.describe.backoff.expconst";

/** The base backoff time between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_BASE = "flink.list.shards.backoff.base";

/** The maximum backoff time between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_MAX = "flink.list.shards.backoff.max";

/** The power constant for exponential backoff between each listShards attempt. */
public static final String LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.list.shards.backoff.expconst";

/** The maximum number of records to try to get each time we fetch records from a AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";

@@ -115,12 +142,21 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {

public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

@Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;

@Deprecated
public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;

@Deprecated
public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final long DEFAULT_LIST_SHARDS_BACKOFF_BASE = 1000L;

public static final long DEFAULT_LIST_SHARDS_BACKOFF_MAX = 5000L;

public static final double DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

public static final int DEFAULT_SHARD_GETRECORDS_MAX = 10000;

public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
KinesisProxy.java
@@ -21,24 +21,27 @@
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ExpiredNextTokenException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ResourceInUseException;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -76,17 +79,17 @@ public class KinesisProxy implements KinesisProxyInterface {
private static final Random seed = new Random();

// ------------------------------------------------------------------------
// describeStream() related performance settings
// listShards() related performance settings
// ------------------------------------------------------------------------

/** Base backoff millis for the describe stream operation. */
private final long describeStreamBaseBackoffMillis;
/** Base backoff millis for the list shards operation. */
private final long listShardsBaseBackoffMillis;

/** Maximum backoff millis for the describe stream operation. */
private final long describeStreamMaxBackoffMillis;
/** Maximum backoff millis for the list shards operation. */
private final long listShardsMaxBackoffMillis;

/** Exponential backoff power constant for the describe stream operation. */
private final double describeStreamExpConstant;
/** Exponential backoff power constant for the list shards operation. */
private final double listShardsExpConstant;

// ------------------------------------------------------------------------
// getRecords() related performance settings
@@ -127,21 +130,22 @@ public class KinesisProxy implements KinesisProxyInterface {
*/
protected KinesisProxy(Properties configProps) {
checkNotNull(configProps);
KinesisConfigUtil.replaceDeprecatedConsumerKeys(configProps);

this.kinesisClient = createKinesisClient(configProps);

this.describeStreamBaseBackoffMillis = Long.valueOf(
this.listShardsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE)));
this.describeStreamMaxBackoffMillis = Long.valueOf(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_BASE)));
this.listShardsMaxBackoffMillis = Long.valueOf(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX)));
this.describeStreamExpConstant = Double.valueOf(
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
Long.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_MAX)));
this.listShardsExpConstant = Double.valueOf(
configProps.getProperty(
ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT)));
ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
Double.toString(ConsumerConfigConstants.DEFAULT_LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT)));

this.getRecordsBaseBackoffMillis = Long.valueOf(
configProps.getProperty(
@@ -353,19 +357,24 @@ protected static boolean isRecoverableException(AmazonServiceException ex) {
private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable String lastSeenShardId) throws InterruptedException {
List<StreamShardHandle> shardsOfStream = new ArrayList<>();

DescribeStreamResult describeStreamResult;
// List Shards returns just the first 1000 shard entries. In order to read the entire stream,
// we need to use the returned nextToken to get additional shards.
ListShardsResult listShardsResult;
String startShardToken = null;
do {
describeStreamResult = describeStream(streamName, lastSeenShardId);

List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
listShardsResult = listShards(streamName, lastSeenShardId, startShardToken);
if (listShardsResult == null) {
// If an exception occurred while retrieving the shards, make sure an incomplete shard list is not returned.
// Hence the incomplete shard list is cleared before returning.
shardsOfStream.clear();
return shardsOfStream;
}
List<Shard> shards = listShardsResult.getShards();
for (Shard shard : shards) {
shardsOfStream.add(new StreamShardHandle(streamName, shard));
}

if (shards.size() != 0) {
lastSeenShardId = shards.get(shards.size() - 1).getShardId();
}
} while (describeStreamResult.getStreamDescription().isHasMoreShards());
startShardToken = listShardsResult.getNextToken();
} while (startShardToken != null);

return shardsOfStream;
}
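To make the paging contract concrete, here is a minimal, self-contained sketch of the same nextToken loop against the raw AWS SDK (illustrative only, not part of this diff; the client construction and stream name are assumptions):

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;

public class ListShardsPagingSketch {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient(); // assumes default credentials and region
        List<Shard> allShards = new ArrayList<>();
        String nextToken = null;
        do {
            ListShardsRequest request = new ListShardsRequest();
            if (nextToken == null) {
                request.setStreamName("example-stream"); // placeholder stream name
            } else {
                // A request carrying a nextToken must not also set the stream name.
                request.setNextToken(nextToken);
            }
            ListShardsResult result = kinesis.listShards(request);
            allShards.addAll(result.getShards());
            // The token is null once all shards have been returned; note it expires after roughly 300 seconds.
            nextToken = result.getNextToken();
        } while (nextToken != null);
        System.out.println("Discovered " + allShards.size() + " shards");
    }
}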
@@ -382,50 +391,63 @@ private List<StreamShardHandle> getShardsOfStream(String streamName, @Nullable S
* @param startShardId which shard to start with for this list shards operation (earlier shards' info will not appear in the result)
* @param startNextToken the nextToken returned by a previous listShards call, used to continue listing shards (null to start from startShardId)
* @return the result of the list shards operation
*/
private DescribeStreamResult describeStream(String streamName, @Nullable String startShardId) throws InterruptedException {
final DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName(streamName);
describeStreamRequest.setExclusiveStartShardId(startShardId);
private ListShardsResult listShards(String streamName, @Nullable String startShardId,
@Nullable String startNextToken)
throws InterruptedException {
final ListShardsRequest listShardsRequest = new ListShardsRequest();
if (startNextToken == null) {
listShardsRequest.setExclusiveStartShardId(startShardId);
listShardsRequest.setStreamName(streamName);
} else {
// Note the nextToken returned by AWS expires within 300 sec.
listShardsRequest.setNextToken(startNextToken);
}

DescribeStreamResult describeStreamResult = null;
ListShardsResult listShardsResults = null;

// Call DescribeStream, with full-jitter backoff (if we get LimitExceededException).
// Call ListShards, with full-jitter backoff (if we get LimitExceededException).
int attemptCount = 0;
while (describeStreamResult == null) { // retry until we get a result
// List Shards returns at most 1000 shard entries per call. Make sure that all entries
// are eventually retrieved by paging with the returned nextToken.
while (listShardsResults == null) { // retry until we get a result
try {
describeStreamResult = kinesisClient.describeStream(describeStreamRequest);

listShardsResults = kinesisClient.listShards(listShardsRequest);
} catch (LimitExceededException le) {
long backoffMillis = fullJitterBackoff(
describeStreamBaseBackoffMillis, describeStreamMaxBackoffMillis, describeStreamExpConstant, attemptCount++);
LOG.warn("Got LimitExceededException when describing stream " + streamName + ". Backing off for "
+ backoffMillis + " millis.");
listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
LOG.warn("Got LimitExceededException when listing shards from stream " + streamName
+ ". Backing off for " + backoffMillis + " millis.");
Thread.sleep(backoffMillis);
} catch (ResourceNotFoundException re) {
throw new RuntimeException("Error while getting stream details", re);
}
}

String streamStatus = describeStreamResult.getStreamDescription().getStreamStatus();
if (!(streamStatus.equals(StreamStatus.ACTIVE.toString()) || streamStatus.equals(StreamStatus.UPDATING.toString()))) {
if (LOG.isWarnEnabled()) {
LOG.warn("The status of stream " + streamName + " is " + streamStatus + "; result of the current " +
"describeStream operation will not contain any shard information.");
} catch (ResourceInUseException reInUse) {
if (LOG.isWarnEnabled()) {
// List Shards will throw an exception if the stream is not in the active state. Return and reuse the previously available state.
LOG.info("The stream is currently not in active state. Reusing the older state "
+ "for the time being");
break;
}
} catch (ResourceNotFoundException reNotFound) {
throw new RuntimeException("Stream not found. Error while getting shard list.", reNotFound);
} catch (InvalidArgumentException inArg) {
throw new RuntimeException("Invalid Arguments to listShards.", inArg);
} catch (ExpiredNextTokenException expiredToken) {
LOG.warn("List Shards has an expired token. Reusing the previous state.");
break;
}
}

// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before the exclusive
// start shard id in the returned shards list; check if we need to remove these erroneously returned shards
if (startShardId != null) {
List<Shard> shards = describeStreamResult.getStreamDescription().getShards();
// Kinesalite (mock implementation of Kinesis) does not correctly exclude shards before
// the exclusive start shard id in the returned shards list; check if we need to remove
// these erroneously returned shards.
if (startShardId != null && listShardsResults != null) {
List<Shard> shards = listShardsResults.getShards();
Iterator<Shard> shardItr = shards.iterator();
while (shardItr.hasNext()) {
if (StreamShardHandle.compareShardIds(shardItr.next().getShardId(), startShardId) <= 0) {
shardItr.remove();
}
}
}

return describeStreamResult;
return listShardsResults;
}

protected static long fullJitterBackoff(long base, long max, double power, int attempt) {
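The body of fullJitterBackoff is not part of this diff, so the following is only a hedged sketch of the full-jitter technique the backoff settings above feed into: pick a uniformly random delay between 0 and min(max, base * power^attempt).

import java.util.Random;

// Hedged sketch, not the committed implementation: fullJitterBackoff's body is not shown in this diff.
public class FullJitterBackoffSketch {
    private static final Random RANDOM = new Random();

    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        // Exponentially growing ceiling, capped at maxMillis ...
        double ceiling = Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        // ... then a uniformly random sleep in [0, ceiling), i.e. "full jitter".
        return (long) (RANDOM.nextDouble() * ceiling);
    }

    public static void main(String[] args) {
        // With the new defaults (base 1000 ms, max 5000 ms, exponential constant 1.5),
        // successive retries back off within roughly [0, 1000), [0, 1500), [0, 2250), ... capped at [0, 5000) ms.
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println("attempt " + attempt + " -> sleep " + fullJitterBackoff(1000L, 5000L, 1.5, attempt) + " ms");
        }
    }
}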
KinesisConfigUtil.java
@@ -28,9 +28,13 @@

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -73,6 +77,8 @@ public class KinesisConfigUtil {
/** Default values for ThreadPoolSize. **/
protected static final int DEFAULT_THREAD_POOL_SIZE = 10;

private static final Logger LOG = LoggerFactory.getLogger(KinesisConfigUtil.class);

/**
* Validate configuration properties for {@link FlinkKinesisConsumer}.
*/
@@ -142,14 +148,14 @@ public static void validateConsumerConfiguration(Properties config) {
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
"Invalid value given for shard discovery sleep interval in milliseconds. Must be a valid non-negative long value.");

validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE,
"Invalid value given for describe stream operation base backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE,
"Invalid value given for list shards operation base backoff milliseconds. Must be a valid non-negative long value.");

validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX,
"Invalid value given for describe stream operation max backoff milliseconds. Must be a valid non-negative long value.");
validateOptionalPositiveLongProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX,
"Invalid value given for list shards operation max backoff milliseconds. Must be a valid non-negative long value.");

validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT,
"Invalid value given for describe stream operation backoff exponential constant. Must be a valid non-negative double value.");
validateOptionalPositiveDoubleProperty(config, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT,
"Invalid value given for list shards operation backoff exponential constant. Must be a valid non-negative double value.");

if (config.containsKey(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS)) {
checkArgument(
@@ -181,6 +187,23 @@ public static Properties replaceDeprecatedProducerKeys(Properties configProps) {
return configProps;
}

public static Properties replaceDeprecatedConsumerKeys(Properties configProps) {
HashMap<String, String> deprecatedOldKeyToNewKeys = new HashMap<>();
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE);
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_MAX, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_MAX);
deprecatedOldKeyToNewKeys.put(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT, ConsumerConfigConstants.LIST_SHARDS_BACKOFF_EXPONENTIAL_CONSTANT);
for (Map.Entry<String, String> entry : deprecatedOldKeyToNewKeys.entrySet()) {
String deprecatedOldKey = entry.getKey();
String newKey = entry.getValue();
if (configProps.containsKey(deprecatedOldKey)) {
LOG.warn("Please note {} property has been deprecated. Please use the {} new property key", deprecatedOldKey, newKey);
configProps.setProperty(newKey, configProps.getProperty(deprecatedOldKey));
configProps.remove(deprecatedOldKey);
}
}
return configProps;
}
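A quick illustration of how the key replacement behaves (assumed call site, not part of this diff): a config that still carries a deprecated describeStream key is rewritten in place to the corresponding listShards key, and the old key is removed.

// Illustrative usage of replaceDeprecatedConsumerKeys; the property value is a placeholder.
Properties config = new Properties();
config.setProperty(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE, "2000");

KinesisConfigUtil.replaceDeprecatedConsumerKeys(config);

// The deprecated key is gone and its value has been carried over to the new key.
assert !config.containsKey(ConsumerConfigConstants.STREAM_DESCRIBE_BACKOFF_BASE);
assert "2000".equals(config.getProperty(ConsumerConfigConstants.LIST_SHARDS_BACKOFF_BASE));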

/**
* Validate configuration properties for {@link FlinkKinesisProducer},
* and return a constructed KinesisProducerConfiguration.