Skip to content

Commit

Permalink
[FLINK-10316] [kinesis] bug was preventing FlinkKinesisProducer to co…
Browse files Browse the repository at this point in the history
…nnect to Kinesalite

This closes apache#6808.
  • Loading branch information
Diego Carvallo authored and tzulitai committed Oct 10, 2018
1 parent f071cd5 commit 9b0bf69
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public static void validateConsumerConfiguration(Properties config) {

validateAwsConfiguration(config);

if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT));
}

if (config.containsKey(ConsumerConfigConstants.STREAM_INITIAL_POSITION)) {
String initPosType = config.getProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION);

Expand Down Expand Up @@ -213,6 +219,11 @@ public static KinesisProducerConfiguration getValidatedProducerConfiguration(Pro

validateAwsConfiguration(config);

if (!config.containsKey(AWSConfigConstants.AWS_REGION)) {
// per requirement in Amazon Kinesis Producer Library
throw new IllegalArgumentException(String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.", AWSConfigConstants.AWS_REGION));
}

KinesisProducerConfiguration kpc = KinesisProducerConfiguration.fromProperties(config);
kpc.setRegion(config.getProperty(AWSConfigConstants.AWS_REGION));

Expand Down Expand Up @@ -266,12 +277,6 @@ public static void validateAwsConfiguration(Properties config) {
}
}

if (!(config.containsKey(AWSConfigConstants.AWS_REGION) ^ config.containsKey(ConsumerConfigConstants.AWS_ENDPOINT))) {
// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION));
}

if (config.containsKey(AWSConfigConstants.AWS_REGION)) {
// specified AWS Region name must be recognizable
if (!AWSUtil.isValidRegion(config.getProperty(AWSConfigConstants.AWS_REGION))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,24 +125,24 @@ public void testCorrectlySetRegionInProducerConfiguration() {
assertEquals("incorrect region", region, kpc.getRegion());
}

// ----------------------------------------------------------------------
// validateAwsConfiguration() tests
// ----------------------------------------------------------------------

@Test
public void testMissingAwsRegionInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
public void testMissingAwsRegionInProducerConfig() {
String expectedMessage = String.format("For FlinkKinesisProducer AWS region ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

KinesisConfigUtil.validateAwsConfiguration(testConfig);
KinesisConfigUtil.getValidatedProducerConfiguration(testConfig);
}

// ----------------------------------------------------------------------
// validateAwsConfiguration() tests
// ----------------------------------------------------------------------

@Test
public void testUnrecognizableAwsRegionInConfig() {
exception.expect(IllegalArgumentException.class);
Expand All @@ -156,22 +156,6 @@ public void testUnrecognizableAwsRegionInConfig() {
KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testAwsRegionOrEndpointInConfig() {
String expectedMessage = String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east");
testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

KinesisConfigUtil.validateAwsConfiguration(testConfig);
}

@Test
public void testCredentialProviderTypeSetToBasicButNoCredentialSetInConfig() {
exception.expect(IllegalArgumentException.class);
Expand Down Expand Up @@ -200,6 +184,22 @@ public void testUnrecognizableCredentialProviderTypeInConfig() {
// validateConsumerConfiguration() tests
// ----------------------------------------------------------------------

@Test
public void testAwsRegionOrEndpointInConsumerConfig() {
String expectedMessage = String.format("For FlinkKinesisConsumer either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_ENDPOINT);
exception.expect(IllegalArgumentException.class);
exception.expectMessage(expectedMessage);

Properties testConfig = new Properties();
testConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");
testConfig.setProperty(AWSConfigConstants.AWS_ENDPOINT, "fake");
testConfig.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, "accessKey");
testConfig.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "secretKey");

KinesisConfigUtil.validateConsumerConfiguration(testConfig);
}

@Test
public void testUnrecognizableStreamInitPositionTypeInConfig() {
exception.expect(IllegalArgumentException.class);
Expand Down

0 comments on commit 9b0bf69

Please sign in to comment.