Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-10316][kinesis] bug was preventing FlinkKinesisProducer to connect to Kinesalite #6789

Conversation

diego-carvallo-tx
Copy link

@diego-carvallo-tx diego-carvallo-tx commented Oct 3, 2018

What is the purpose of the change

  • Some time ago FLINK-4197 added the ability to connect to a local Kinesis endpoint but also introduced some bugs
  • Some time ago FLINK-9402 added some fixes for those bugs but for FlinkKinesisConsumer only
  • This PR addresses FLINK-10316 to fix the remaining bugs to allow FlinkKinesisProducer also to connect to a local Kinesis endpoint

Brief change log

  • The method KinesisConfigUtil.validateAwsConfiguration(Properties config) is used by both FlinkKinesisConsumer and FlinkKinesisProducer but AWS_REGION/AWS_ENDPOINT validation was performed only for Consumer who needs only one of them to be set. On the other side Producer requires AWS_REGION to be set even if AWS_ENDPOINT is defined to connect to local Kinesis. Since this validation code is not common for Consumer and Producer the change in this PR is moving out of this method the AWS_REGION/AWS_ENDPOINT validation which is now handled by validateConsumerConfiguration(...) and getValidatedProducerConfiguration(...) (per suggestion in comments below by @tweise)
  • Changed the Unit Tests accordingly

Jira Note

There are 2 other tickets that could be closed along with this one since they all refer to the same issue: FLINK-9618 and FLINK-8936

Verifying this change

This change is already covered by existing tests, such as (please describe tests).

  • KinesisConfigUtilTest. testMissingAwsRegionInProducerConfig
  • KinesisConfigUtilTest. testAwsRegionOrEndpointInConsumerConfig

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

// per validation in AwsClientBuilder
throw new IllegalArgumentException(String.format("Either AWS region ('%s') or AWS endpoint ('%s') must be set in the config.",
AWSConfigConstants.AWS_REGION, AWSConfigConstants.AWS_REGION));
if (isProducer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it isn't common, respective code should be moved into validateConsumerConfiguration and getValidatedProducerConfiguration.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I just pushed the code with this change and also updated the PR description.

@diego-carvallo-tx diego-carvallo-tx force-pushed the fix_FlinkKinesisProducer_localstack branch from 4419f4c to 292839d Compare October 4, 2018 23:43
@tzulitai
Copy link
Contributor

Thanks for the PR @diego-carvallo-tx.
Changes look good, merging this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants