Skip to content

Commit

Permalink
[FLINK-14881][kinesis] Bump aws-sdk-version to 1.11.754 to support We…
Browse files Browse the repository at this point in the history
…bIdentityToken

This closes apache#12008
  • Loading branch information
aroch authored and rmetzger committed May 14, 2020
1 parent dd5d007 commit c1ea6fc
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 27 deletions.
23 changes: 20 additions & 3 deletions docs/dev/connectors/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,25 @@ Due to the licensing issue, the `flink-connector-kinesis{{ site.scala_version_su

## Using the Amazon Kinesis Streams Service
Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
to setup Kinesis streams.

## Configuring Access to Kinesis with IAM
Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).

Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
By default, the `AUTO` Credentials Provider is used.
If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.

A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.

Supported Credential Providers are:
* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
* `BASIC` - Using access key ID and secret key supplied as configuration.
* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
* `SYS_PROP` - Using Java system properties aws.accessKeyId and aws.secretKey.
* `PROFILE` - Use AWS credentials profile file to create the AWS credentials.
* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.

## Kinesis Consumer

Expand Down Expand Up @@ -91,8 +109,7 @@ The above is a simple example of using the consumer. Configuration for the consu
instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
`ConsumerConfigConstants` (Kinesis consumer parameters). The example
demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is being consumed
from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).

Expand Down
2 changes: 1 addition & 1 deletion flink-connectors/flink-connector-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.
<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
<name>flink-connector-kinesis</name>
<properties>
<aws.sdk.version>1.11.603</aws.sdk.version>
<aws.sdk.version>1.11.754</aws.sdk.version>
<aws.kinesis-kcl.version>1.11.2</aws.kinesis-kcl.version>
<aws.kinesis-kpl.version>0.14.0</aws.kinesis-kpl.version>
<aws.dynamodbstreams-kinesis-adapter.version>1.5.0</aws.dynamodbstreams-kinesis-adapter.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public enum CredentialProvider {
/** Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. **/
ASSUME_ROLE,

/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, PROFILE in the AWS instance metadata. **/
/** Use AWS WebIdentityToken in order to assume a role. A token file and role details can be supplied as configuration or environment variables. **/
WEB_IDENTITY_TOKEN,

/** A credentials provider chain will be used that searches for credentials in this order: ENV_VARS, SYS_PROPS, WEB_IDENTITY_TOKEN, PROFILE in the AWS instance metadata. **/
AUTO,
}

Expand All @@ -70,15 +73,18 @@ public enum CredentialProvider {
/** Optional configuration for profile name if credential provider type is set to be PROFILE. */
public static final String AWS_PROFILE_NAME = profileName(AWS_CREDENTIALS_PROVIDER);

/** The role ARN to use when credential provider type is set to ASSUME_ROLE. */
/** The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. */
public static final String AWS_ROLE_ARN = roleArn(AWS_CREDENTIALS_PROVIDER);

/** The role session name to use when credential provider type is set to ASSUME_ROLE. */
/** The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN. */
public static final String AWS_ROLE_SESSION_NAME = roleSessionName(AWS_CREDENTIALS_PROVIDER);

/** The external ID to use when credential provider type is set to ASSUME_ROLE. */
public static final String AWS_ROLE_EXTERNAL_ID = externalId(AWS_CREDENTIALS_PROVIDER);

/** The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN. */
public static final String AWS_WEB_IDENTITY_TOKEN_FILE = webIdentityTokenFile(AWS_CREDENTIALS_PROVIDER);

/**
* The credentials provider that provides credentials for assuming the role when credential
* provider type is set to ASSUME_ROLE.
Expand Down Expand Up @@ -120,4 +126,8 @@ public static String externalId(String prefix) {
public static String roleCredentialsProvider(String prefix) {
return prefix + ".role.provider";
}

public static String webIdentityTokenFile(String prefix) {
return prefix + ".webIdentityToken.file";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
Expand Down Expand Up @@ -174,6 +175,13 @@ public void refresh() {
.withStsClient(baseCredentials)
.build();

case WEB_IDENTITY_TOKEN:
return WebIdentityTokenCredentialsProvider.builder()
.roleArn(configProps.getProperty(AWSConfigConstants.roleArn(configPrefix), null))
.roleSessionName(configProps.getProperty(AWSConfigConstants.roleSessionName(configPrefix), null))
.webIdentityTokenFile(configProps.getProperty(AWSConfigConstants.webIdentityTokenFile(configPrefix), null))
.build();

default:
case AUTO:
return new DefaultAWSCredentialsProviderChain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ This project bundles the following dependencies under the Apache Software Licens

- com.amazonaws:amazon-kinesis-client:1.11.2
- com.amazonaws:amazon-kinesis-producer:0.14.0
- com.amazonaws:aws-java-sdk-core:1.11.603
- com.amazonaws:aws-java-sdk-dynamodb:1.11.603
- com.amazonaws:aws-java-sdk-kinesis:1.11.603
- com.amazonaws:aws-java-sdk-kms:1.11.603
- com.amazonaws:aws-java-sdk-s3:1.11.603
- com.amazonaws:aws-java-sdk-sts:1.11.603
- com.amazonaws:aws-java-sdk-core:1.11.754
- com.amazonaws:aws-java-sdk-dynamodb:1.11.754
- com.amazonaws:aws-java-sdk-kinesis:1.11.754
- com.amazonaws:aws-java-sdk-kms:1.11.754
- com.amazonaws:aws-java-sdk-s3:1.11.754
- com.amazonaws:aws-java-sdk-sts:1.11.754
- com.amazonaws:dynamodb-streams-kinesis-adapter:1.5.0
- com.amazonaws:jmespath-java:1.11.603
- com.amazonaws:jmespath-java:1.11.754
- org.apache.httpcomponents:httpclient:4.5.9
- org.apache.httpcomponents:httpcore:4.4.6

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kinesis.util;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Properties;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Tests for AWSUtil.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest(AWSUtil.class)
public class AWSUtilTest {
@Rule
private final ExpectedException exception = ExpectedException.none();

@Test
public void testDefaultCredentialsProvider() {
Properties testConfig = new Properties();

AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);

assertTrue(credentialsProvider instanceof DefaultAWSCredentialsProviderChain);
}

@Test
public void testGetCredentialsProvider() {
Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "WEB_IDENTITY_TOKEN");

AWSCredentialsProvider credentialsProvider = AWSUtil.getCredentialsProvider(testConfig);
assertTrue(credentialsProvider instanceof WebIdentityTokenCredentialsProvider);
}

@Test
public void testInvalidCredentialsProvider() {
exception.expect(IllegalArgumentException.class);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "INVALID_PROVIDER");

AWSUtil.getCredentialsProvider(testConfig);
}

@Test
public void testValidRegion() {
assertTrue(AWSUtil.isValidRegion("us-east-1"));
}

@Test
public void testInvalidRegion() {
assertFalse(AWSUtil.isValidRegion("ur-east-1"));
}
}
22 changes: 9 additions & 13 deletions flink-filesystems/flink-s3-fs-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,22 @@ under the License.

<packaging>jar</packaging>

<!-- Override the flink-parent dependencyManagement definition for hadoop-common to ensure
${fs.hadoopshaded.version} is used for this file system -->
<dependencyManagement>
<dependencies>
<!-- Override the flink-parent dependencyManagement definition for hadoop-common to ensure
${fs.hadoopshaded.version} is used for this file system -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${fs.hadoopshaded.version}</version>
</dependency>

<!-- aws-sdk requires httpclient >= 4.5.9 due to api compatibility breakage -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
Expand Down Expand Up @@ -87,17 +94,6 @@ under the License.
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<!-- aws-sdk requires httpclient >= 4.5.9 due to api compatibility breakage -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.9</version>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This project bundles the following dependencies under the Apache Software Licens
- com.amazonaws:aws-java-sdk-dynamodb:1.11.754
- com.amazonaws:aws-java-sdk-kms:1.11.754
- com.amazonaws:aws-java-sdk-s3:1.11.754
- com.amazonaws:aws-java-sdk-sts:1.11.754
- com.amazonaws:jmespath-java:1.11.754
- com.fasterxml.jackson.core:jackson-annotations:2.10.1
- com.fasterxml.jackson.core:jackson-core:2.10.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ This project bundles the following dependencies under the Apache Software Licens
- com.amazonaws:aws-java-sdk-dynamodb:1.11.754
- com.amazonaws:aws-java-sdk-kms:1.11.754
- com.amazonaws:aws-java-sdk-s3:1.11.754
- com.amazonaws:aws-java-sdk-sts:1.11.754
- com.amazonaws:jmespath-java:1.11.754
- com.facebook.presto:presto-hive:0.187
- com.facebook.presto.hadoop:hadoop-apache2:2.7.3-1
Expand Down

0 comments on commit c1ea6fc

Please sign in to comment.