Skip to content

Commit

Permalink
[FLINK-4197] Allow Kinesis endpoint to be overridden via config
Browse files Browse the repository at this point in the history
This closes apache#2227
  • Loading branch information
skidder authored and rmetzger committed Jul 13, 2016
1 parent 565f941 commit bc3a96f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
40 changes: 40 additions & 0 deletions docs/apis/streaming/connectors/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,3 +302,43 @@ Otherwise, the returned stream name is used.
Other optional configuration keys can be found in `KinesisConfigConstants`.


### Using Non-AWS Kinesis Endpoints for Testing

It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as kinesalite; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overriden via a configuration property.

To override the AWS endpoint, set the `KinesisConfigConstants.CONFIG_AWS_ENDPOINT` property in the Flink configuration, in addition to the `KinesisConfigConstants.CONFIG_AWS_REGION` required by Flink. Although the region is required, it will not be used to determine the AWS endpoint URL.

The following example shows how one might supply the `KinesisConfigConstants.CONFIG_AWS_ENDPOINT` configuration property:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"aws_access_key_id_here");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"aws_secret_key_here");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_ENDPOINT,
"http:https://localhost:4567");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kinesisProducerConfig = new Properties();
kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
"aws_access_key_id_here");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
"aws_secret_key_here");
kinesisProducerConfig.put(
KinesisConfigConstants.CONFIG_AWS_ENDPOINT,
"http:https://localhost:4567");
{% endhighlight %}
</div>
</div>
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public class KinesisConfigConstants {
/** The AWS region of the Kinesis streams to be pulled ("us-east-1" is used if not set) */
public static final String CONFIG_AWS_REGION = "aws.region";

/** The AWS endpoint for Kinesis (derived from the AWS region setting if not set) */
public static final String CONFIG_AWS_ENDPOINT = "aws.endpoint";

/** Maximum number of items to pack into an PutRecords request. **/
public static final String CONFIG_PRODUCER_COLLECTION_MAX_COUNT = "aws.producer.collectionMaxCount";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public static AmazonKinesisClient createKinesisClient(Properties configProps) {
AmazonKinesisClient client =
new AmazonKinesisClient(AWSUtil.getCredentialsProvider(configProps).getCredentials(), awsClientConfig);
client.setRegion(Region.getRegion(Regions.fromName(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION))));
if (configProps.containsKey(KinesisConfigConstants.CONFIG_AWS_ENDPOINT)) {
client.setEndpoint(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_ENDPOINT));
}
return client;
}

Expand Down

0 comments on commit bc3a96f

Please sign in to comment.