From 3f06f97505ff57c9f21ae6c0b8544cc7432f2825 Mon Sep 17 00:00:00 2001 From: Ahmed Hamdy Date: Fri, 25 Feb 2022 15:30:32 +0000 Subject: [PATCH] [FLINK-26373][connector/kinesis] Rename KinesisDataStreams module and classes into KinesisStreams --- .../docs/connectors/datastream/kinesis.md | 18 +++++------ .../docs/connectors/table/kinesis.md | 8 ++--- .../docs/connectors/datastream/kinesis.md | 20 ++++++------ docs/content/docs/connectors/table/kinesis.md | 8 ++--- .../pom.xml | 2 +- flink-architecture-tests/pom.xml | 2 +- .../pom.xml | 2 +- .../table/KinesisFirehoseDynamicSink.java | 15 ++++----- .../KinesisFirehoseDynamicTableFactory.java | 4 +-- ...inesisFirehoseDynamicTableFactoryTest.java | 7 ++-- .../75596a92-3816-4a44-85ac-7c96e72f443a} | 0 .../7e2560a3-23eb-40cc-8669-e7943e393b88 | 0 .../84abeb9c-8355-4165-96aa-dda65b04e5e7 | 4 +-- .../archunit-violations/stored.rules | 0 .../pom.xml | 4 +-- .../sink/KinesisStreamsConfigConstants.java} | 4 +-- .../sink/KinesisStreamsException.java} | 16 +++++----- .../kinesis/sink/KinesisStreamsSink.java} | 22 ++++++------- .../sink/KinesisStreamsSinkBuilder.java} | 32 +++++++++---------- .../KinesisStreamsSinkElementConverter.java} | 14 ++++---- .../sink/KinesisStreamsSinkWriter.java} | 27 +++++++--------- .../sink/KinesisStreamsStateSerializer.java} | 4 +-- .../kinesis/sink/PartitionKeyGenerator.java | 0 .../FixedKinesisPartitionKeyGenerator.java | 0 .../table/KinesisConnectorOptions.java | 0 .../kinesis/table/KinesisDynamicSink.java | 10 +++--- .../table/KinesisDynamicTableSinkFactory.java | 12 +++---- .../KinesisPartitionKeyGeneratorFactory.java | 0 .../RandomKinesisPartitionKeyGenerator.java | 0 ...ataFieldsKinesisPartitionKeyGenerator.java | 0 .../KinesisStreamsConnectorOptionsUtils.java} | 4 +-- .../org.apache.flink.table.factories.Factory | 0 .../src/main/resources/log4j2.properties | 0 .../TestCodeArchitectureTest.java | 0 .../sink/KinesisStreamsSinkBuilderTest.java} | 20 ++++++------ .../sink/KinesisStreamsSinkITCase.java} | 12 +++---- .../KinesisStreamsStateSerializerTest.java} | 8 ++--- .../sink/examples/SinkIntoKinesis.java | 9 +++--- .../KinesisDynamicTableSinkFactoryTest.java | 10 +++--- ...ieldsKinesisPartitionKeyGeneratorTest.java | 0 .../KinesisProducerOptionsMapperTest.java | 4 +-- .../testutils/KinesaliteContainer.java | 0 .../src/test/resources/archunit.properties | 0 .../src/test/resources/log4j2-test.properties | 0 .../src/test/resources/profile | 0 .../flink-connector-kinesis/pom.xml | 8 ++--- .../kinesis/FlinkKinesisProducer.java | 8 ++--- .../kinesis/proxy/KinesisProxyV2Factory.java | 10 +++--- .../connectors/kinesis/util/AWSUtil.java | 4 +-- .../pom.xml | 2 +- .../pom.xml | 10 +++--- .../src/main/resources/META-INF/NOTICE | 2 +- flink-connectors/pom.xml | 4 +-- .../pom.xml | 6 ++-- .../table/test/KinesisStreamsTableApiIT.java} | 9 +++--- .../src/test/resources/log4j2-test.properties | 0 .../src/test/resources/send-orders.sql | 0 .../pom.xml | 2 +- .../flink-streaming-kinesis-test/pom.xml | 2 +- flink-end-to-end-tests/pom.xml | 2 +- pom.xml | 2 +- tools/ci/stage.sh | 2 +- 62 files changed, 184 insertions(+), 191 deletions(-) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 => flink-connector-aws-kinesis-streams/archunit-violations/75596a92-3816-4a44-85ac-7c96e72f443a} (100%) create mode 100644 flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 (86%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/archunit-violations/stored.rules (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/pom.xml (97%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java} (91%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java} (71%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java} (89%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java} (83%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java} (88%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java} (90%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java} (95%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java (96%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java (92%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java => flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java} (99%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/main/resources/log4j2.properties (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java => flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java} (85%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java => flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java} (98%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java => flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java} (89%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java (92%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java (98%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java (94%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/resources/archunit.properties (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/resources/log4j2-test.properties (100%) rename flink-connectors/{flink-connector-aws-kinesis-data-streams => flink-connector-aws-kinesis-streams}/src/test/resources/profile (100%) rename flink-connectors/{flink-sql-connector-aws-kinesis-data-streams => flink-sql-connector-aws-kinesis-streams}/pom.xml (90%) rename flink-connectors/{flink-sql-connector-aws-kinesis-data-streams => flink-sql-connector-aws-kinesis-streams}/src/main/resources/META-INF/NOTICE (97%) rename flink-end-to-end-tests/{flink-end-to-end-tests-aws-kinesis-data-streams => flink-end-to-end-tests-aws-kinesis-streams}/pom.xml (94%) rename flink-end-to-end-tests/{flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java => flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java} (96%) rename flink-end-to-end-tests/{flink-end-to-end-tests-aws-kinesis-data-streams => flink-end-to-end-tests-aws-kinesis-streams}/src/test/resources/log4j2-test.properties (100%) rename flink-end-to-end-tests/{flink-end-to-end-tests-aws-kinesis-data-streams => flink-end-to-end-tests-aws-kinesis-streams}/src/test/resources/send-orders.sql (100%) diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md b/docs/content.zh/docs/connectors/datastream/kinesis.md index 500db81c38dc3..cc0661e77349b 100644 --- a/docs/content.zh/docs/connectors/datastream/kinesis.md +++ b/docs/content.zh/docs/connectors/datastream/kinesis.md @@ -566,9 +566,9 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured. Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys. -## Kinesis Data Streams Sink +## Kinesis Streams Sink -The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream. +The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream. To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Stream console. @@ -585,8 +585,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() +KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() .setKinesisClientProperties(sinkProperties) // Required .setSerializationSchema(new SimpleStringSchema()) // Required .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required @@ -614,7 +614,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1") sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") -val kdsSink = KinesisDataStreamsSink.builder() +val kdsSink = KinesisStreamsSink.builder() .setKinesisClientProperties(sinkProperties) // Required .setSerializationSchema(new SimpleStringSchema()) // Required .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required @@ -636,7 +636,7 @@ simpleStringStream.sinkTo(kdsSink) The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). -You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. +You will always need to specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry. @@ -658,8 +658,8 @@ found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/serv You generally reduce backpressure by increasing the size of the internal queue: ``` -KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() +KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() ... .setMaxBufferedRequests(10_000) ... @@ -668,7 +668,7 @@ KinesisDataStreamsSink kdsSink = ## Kinesis Producer {{< hint warning >}} -The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{}}) instead. +The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{}}) instead. {{< /hint >}} The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation). diff --git a/docs/content.zh/docs/connectors/table/kinesis.md b/docs/content.zh/docs/connectors/table/kinesis.md index b216acda4c17b..93d765719ca17 100644 --- a/docs/content.zh/docs/connectors/table/kinesis.md +++ b/docs/content.zh/docs/connectors/table/kinesis.md @@ -637,7 +637,7 @@ Connector Options Deprecated options previously used by the legacy connector. - Options with equivalant alternatives in KinesisDataStreamsSink are matched + Options with equivalant alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged out to user as warnings. @@ -809,11 +809,11 @@ Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" > # Updates in 1.15 -Kinesis table API connector sink data stream depends on FlinkKinesisProducer till 1.14, with the introduction of KinesisDataStreamsSink in 1.15 kinesis table API sink connector has been migrated to the new KinesisDataStreamsSink. Authentication options have been migrated identically while sink configuration options are now compatible with KinesisDataStreamsSink. +Kinesis table API connector sink data stream depends on FlinkKinesisProducer till 1.14, with the introduction of KinesisStreamsSink in 1.15 kinesis table API sink connector has been migrated to the new KinesisStreamsSink. Authentication options have been migrated identically while sink configuration options are now compatible with KinesisStreamsSink. -Options configuring FlinkKinesisProducer are now deprecated with fallback support for common configuration options with KinesisDataStreamsSink. +Options configuring FlinkKinesisProducer are now deprecated with fallback support for common configuration options with KinesisStreamsSink. -KinesisDataStreamsSink uses KinesisAsyncClient to send records to kinesis, +KinesisStreamsSink uses KinesisAsyncClient to send records to kinesis, which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated FlinkKinesisProducer are now deprecated and will be ignored, this includes sink.producer.aggregation-enabled and sink.producer.aggregation-count. diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md index f91821b9e400d..fbc515de74133 100644 --- a/docs/content/docs/connectors/datastream/kinesis.md +++ b/docs/content/docs/connectors/datastream/kinesis.md @@ -45,7 +45,7 @@ To use this connector, add one or more of the following dependencies to your pro Sink - {{< artifact flink-connector-aws-kinesis-data-streams >}} + {{< artifact flink-connector-aws-kinesis-streams >}} @@ -578,9 +578,9 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured. Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys. -## Kinesis Data Streams Sink +## Kinesis Streams Sink -The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream. +The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream. To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Stream console. @@ -597,8 +597,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() +KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() .setKinesisClientProperties(sinkProperties) // Required .setSerializationSchema(new SimpleStringSchema()) // Required .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required @@ -626,7 +626,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1") sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") -val kdsSink = KinesisDataStreamsSink.builder() +val kdsSink = KinesisStreamsSink.builder() .setKinesisClientProperties(sinkProperties) // Required .setSerializationSchema(new SimpleStringSchema()) // Required .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required @@ -648,7 +648,7 @@ simpleStringStream.sinkTo(kdsSink) The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html). -You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. +You will always need to specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry. @@ -670,8 +670,8 @@ found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/serv You generally reduce backpressure by increasing the size of the internal queue: ``` -KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() +KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() ... .setMaxBufferedRequests(10_000) ... @@ -680,7 +680,7 @@ KinesisDataStreamsSink kdsSink = ## Kinesis Producer {{< hint warning >}} -The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{}}) instead. +The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{}}) instead. {{< /hint >}} The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation). diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md index cce4caa29f23a..6a69b897ea761 100644 --- a/docs/content/docs/connectors/table/kinesis.md +++ b/docs/content/docs/connectors/table/kinesis.md @@ -709,7 +709,7 @@ Connector Options Deprecated options previously used by the legacy connector. - Options with equivalant alternatives in KinesisDataStreamsSink are matched + Options with equivalant alternatives in KinesisStreamsSink are matched to their respective properties. Unsupported options are logged out to user as warnings. @@ -881,11 +881,11 @@ Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" > # Updates in 1.15 -Kinesis table API connector sink data stream depends on FlinkKinesisProducer till 1.14, with the introduction of KinesisDataStreamsSink in 1.15 kinesis table API sink connector has been migrated to the new KinesisDataStreamsSink. Authentication options have been migrated identically while sink configuration options are now compatible with KinesisDataStreamsSink. +Kinesis table API connector sink data stream depends on FlinkKinesisProducer till 1.14, with the introduction of KinesisStreamsSink in 1.15 kinesis table API sink connector has been migrated to the new KinesisStreamsSink. Authentication options have been migrated identically while sink configuration options are now compatible with KinesisStreamsSink. -Options configuring FlinkKinesisProducer are now deprecated with fallback support for common configuration options with KinesisDataStreamsSink. +Options configuring FlinkKinesisProducer are now deprecated with fallback support for common configuration options with KinesisStreamsSink. -KinesisDataStreamsSink uses KinesisAsyncClient to send records to kinesis, +KinesisStreamsSink uses KinesisAsyncClient to send records to kinesis, which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated FlinkKinesisProducer are now deprecated and will be ignored, this includes sink.producer.aggregation-enabled and sink.producer.aggregation-count. diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml index 751a0560ce6a1..4752e9b6f872e 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml +++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml @@ -174,7 +174,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml index d154a508fe5b3..788b158493cfb 100644 --- a/flink-architecture-tests/pom.xml +++ b/flink-architecture-tests/pom.xml @@ -219,7 +219,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} test diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml index bf0a25a663bf9..74639990bf8d1 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/pom.xml @@ -31,7 +31,7 @@ under the License. flink-connector-aws-kinesis-firehose - Flink : Connectors : AWS Kinesis Data Firehose + Flink : Connectors : Amazon Kinesis Data Firehose 2.17.52 diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java index 5b6358bda23f0..149dbaf8ade01 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicSink.java @@ -129,8 +129,8 @@ public String asSummaryString() { /** Builder class for {@link KinesisFirehoseDynamicSink}. */ @Internal - public static class KinesisDataFirehoseDynamicSinkBuilder - extends AsyncDynamicTableSinkBuilder { + public static class KinesisFirehoseDynamicSinkBuilder + extends AsyncDynamicTableSinkBuilder { private DataType consumedDataType = null; private String deliveryStream = null; @@ -138,30 +138,29 @@ public static class KinesisDataFirehoseDynamicSinkBuilder private EncodingFormat> encodingFormat = null; private Boolean failOnError = null; - public KinesisDataFirehoseDynamicSinkBuilder setConsumedDataType( - DataType consumedDataType) { + public KinesisFirehoseDynamicSinkBuilder setConsumedDataType(DataType consumedDataType) { this.consumedDataType = consumedDataType; return this; } - public KinesisDataFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) { + public KinesisFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) { this.deliveryStream = deliveryStream; return this; } - public KinesisDataFirehoseDynamicSinkBuilder setFirehoseClientProperties( + public KinesisFirehoseDynamicSinkBuilder setFirehoseClientProperties( Properties firehoseClientProperties) { this.firehoseClientProperties = firehoseClientProperties; return this; } - public KinesisDataFirehoseDynamicSinkBuilder setEncodingFormat( + public KinesisFirehoseDynamicSinkBuilder setEncodingFormat( EncodingFormat> encodingFormat) { this.encodingFormat = encodingFormat; return this; } - public KinesisDataFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) { + public KinesisFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) { this.failOnError = failOnError; return this; } diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java index f9a8e9155605c..a7ca38efdce37 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactory.java @@ -45,8 +45,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context); - KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder builder = - new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder(); + KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder = + new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder(); KinesisFirehoseConnectorOptionUtils optionsUtils = new KinesisFirehoseConnectorOptionUtils( diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java index 8c458833f02aa..ac5292e01596a 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/table/KinesisFirehoseDynamicTableFactoryTest.java @@ -57,7 +57,7 @@ public void testGoodTableSink() { // Construct expected DynamicTableSink using factory under test KinesisFirehoseDynamicSink expectedSink = - new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder() + new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder() .setConsumedDataType(sinkSchema.toPhysicalRowDataType()) .setDeliveryStream(DELIVERY_STREAM_NAME) .setFirehoseClientProperties(defaultSinkProperties()) @@ -134,9 +134,8 @@ private TableOptionsBuilder defaultTableOptions() { .withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true"); } - private KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder - getDefaultSinkBuilder() { - return new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder() + private KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder getDefaultSinkBuilder() { + return new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder() .setFailOnError(true) .setMaxBatchSize(100) .setMaxInFlightRequests(100) diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/75596a92-3816-4a44-85ac-7c96e72f443a similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 rename to flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/75596a92-3816-4a44-85ac-7c96e72f443a diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/7e2560a3-23eb-40cc-8669-e7943e393b88 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 similarity index 86% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 rename to flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 index a370e4c6cfc4f..202fdf191cafc 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 +++ b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/84abeb9c-8355-4165-96aa-dda65b04e5e7 @@ -1,6 +1,6 @@ -org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkITCase does not satisfy: only one of the following predicates match:\ +org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkITCase does not satisfy: only one of the following predicates match:\ * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/stored.rules b/flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/stored.rules similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/archunit-violations/stored.rules rename to flink-connectors/flink-connector-aws-kinesis-streams/archunit-violations/stored.rules diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-connector-aws-kinesis-streams/pom.xml similarity index 97% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml rename to flink-connectors/flink-connector-aws-kinesis-streams/pom.xml index 8e4b99b8270b9..7c864efd5e5ab 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/pom.xml +++ b/flink-connectors/flink-connector-aws-kinesis-streams/pom.xml @@ -30,8 +30,8 @@ under the License. .. - flink-connector-aws-kinesis-data-streams - Flink : Connectors : AWS Kinesis Data Streams + flink-connector-aws-kinesis-streams + Flink : Connectors : Amazon Kinesis Data Streams 2.17.52 diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java similarity index 91% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java index a5a602094d595..338e52a1aeb16 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsConfigConstants.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsConfigConstants.java @@ -19,9 +19,9 @@ import org.apache.flink.annotation.PublicEvolving; -/** Defaults for {@link KinesisDataStreamsSinkWriter}. */ +/** Defaults for {@link KinesisStreamsSinkWriter}. */ @PublicEvolving -public class KinesisDataStreamsConfigConstants { +public class KinesisStreamsConfigConstants { public static final String BASE_KINESIS_USER_AGENT_PREFIX_FORMAT = "Apache Flink %s (%s) Kinesis Connector"; diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java similarity index 71% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java index 3ab30e1ef09da..696cbd8619385 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsException.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsException.java @@ -21,30 +21,30 @@ * A {@link RuntimeException} wrapper indicating the exception was thrown from the Kinesis Data * Streams Sink. */ -class KinesisDataStreamsException extends RuntimeException { +class KinesisStreamsException extends RuntimeException { - public KinesisDataStreamsException(final String message) { + public KinesisStreamsException(final String message) { super(message); } - public KinesisDataStreamsException(final String message, final Throwable cause) { + public KinesisStreamsException(final String message, final Throwable cause) { super(message, cause); } /** - * When the flag {@code failOnError} is set in {@link KinesisDataStreamsSinkWriter}, this - * exception is raised as soon as any exception occurs when KDS is written to. + * When the flag {@code failOnError} is set in {@link KinesisStreamsSinkWriter}, this exception + * is raised as soon as any exception occurs when KDS is written to. */ - static class KinesisDataStreamsFailFastException extends KinesisDataStreamsException { + static class KinesisStreamsFailFastException extends KinesisStreamsException { private static final String ERROR_MESSAGE = "Encountered an exception while persisting records, not retrying due to {failOnError} being set."; - public KinesisDataStreamsFailFastException() { + public KinesisStreamsFailFastException() { super(ERROR_MESSAGE); } - public KinesisDataStreamsFailFastException(final Throwable cause) { + public KinesisStreamsFailFastException(final Throwable cause) { super(ERROR_MESSAGE, cause); } } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java similarity index 89% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java index d8288f797c157..b6cef5a08dc8d 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSink.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSink.java @@ -59,18 +59,18 @@ * Streams, the job will fail immediately if failOnError is set * * - *

Please see the writer implementation in {@link KinesisDataStreamsSinkWriter} + *

Please see the writer implementation in {@link KinesisStreamsSinkWriter} * * @param Type of the elements handled by this sink */ @PublicEvolving -public class KinesisDataStreamsSink extends AsyncSinkBase { +public class KinesisStreamsSink extends AsyncSinkBase { private final boolean failOnError; private final String streamName; private final Properties kinesisClientProperties; - KinesisDataStreamsSink( + KinesisStreamsSink( ElementConverter elementConverter, Integer maxBatchSize, Integer maxInFlightRequests, @@ -101,21 +101,21 @@ public class KinesisDataStreamsSink extends AsyncSinkBase type of incoming records - * @return {@link KinesisDataStreamsSinkBuilder} + * @return {@link KinesisStreamsSinkBuilder} */ - public static KinesisDataStreamsSinkBuilder builder() { - return new KinesisDataStreamsSinkBuilder<>(); + public static KinesisStreamsSinkBuilder builder() { + return new KinesisStreamsSinkBuilder<>(); } @Internal @Override public StatefulSinkWriter> createWriter( InitContext context) throws IOException { - return new KinesisDataStreamsSinkWriter<>( + return new KinesisStreamsSinkWriter<>( getElementConverter(), context, getMaxBatchSize(), @@ -134,7 +134,7 @@ public StatefulSinkWriter> @Override public SimpleVersionedSerializer> getWriterStateSerializer() { - return new KinesisDataStreamsStateSerializer(); + return new KinesisStreamsStateSerializer(); } @Internal @@ -143,7 +143,7 @@ public StatefulSinkWriter> InitContext context, Collection> recoveredState) throws IOException { - return new KinesisDataStreamsSinkWriter<>( + return new KinesisStreamsSinkWriter<>( getElementConverter(), context, getMaxBatchSize(), diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java similarity index 83% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java index 20b960f4994b0..3e6f7eccb365c 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilder.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilder.java @@ -27,14 +27,14 @@ import java.util.Properties; /** - * Builder to construct {@link KinesisDataStreamsSink}. + * Builder to construct {@link KinesisStreamsSink}. * - *

The following example shows the minimum setup to create a {@link KinesisDataStreamsSink} that + *

The following example shows the minimum setup to create a {@link KinesisStreamsSink} that * writes String values to a Kinesis Data Streams stream named your_stream_here. * *

{@code
- * KinesisDataStreamsSink kdsSink =
- *                 KinesisDataStreamsSink.builder()
+ * KinesisStreamsSink kdsSink =
+ *                 KinesisStreamsSink.builder()
  *                         .setElementConverter(elementConverter)
  *                         .setStreamName("your_stream_name")
  *                         .setSerializationSchema(new SimpleStringSchema())
@@ -57,9 +57,9 @@
  * @param  type of elements that should be persisted in the destination
  */
 @PublicEvolving
-public class KinesisDataStreamsSinkBuilder
+public class KinesisStreamsSinkBuilder
         extends AsyncSinkBaseBuilder<
-                InputT, PutRecordsRequestEntry, KinesisDataStreamsSinkBuilder> {
+                InputT, PutRecordsRequestEntry, KinesisStreamsSinkBuilder> {
 
     private static final int DEFAULT_MAX_BATCH_SIZE = 500;
     private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
@@ -75,7 +75,7 @@ public class KinesisDataStreamsSinkBuilder
     private SerializationSchema serializationSchema;
     private PartitionKeyGenerator partitionKeyGenerator;
 
-    KinesisDataStreamsSinkBuilder() {}
+    KinesisStreamsSinkBuilder() {}
 
     /**
      * Sets the name of the KDS stream that the sink will connect to. There is no default for this
@@ -83,40 +83,40 @@ public class KinesisDataStreamsSinkBuilder
      * fail.
      *
      * @param streamName the name of the stream
-     * @return {@link KinesisDataStreamsSinkBuilder} itself
+     * @return {@link KinesisStreamsSinkBuilder} itself
      */
-    public KinesisDataStreamsSinkBuilder setStreamName(String streamName) {
+    public KinesisStreamsSinkBuilder setStreamName(String streamName) {
         this.streamName = streamName;
         return this;
     }
 
-    public KinesisDataStreamsSinkBuilder setSerializationSchema(
+    public KinesisStreamsSinkBuilder setSerializationSchema(
             SerializationSchema serializationSchema) {
         this.serializationSchema = serializationSchema;
         return this;
     }
 
-    public KinesisDataStreamsSinkBuilder setPartitionKeyGenerator(
+    public KinesisStreamsSinkBuilder setPartitionKeyGenerator(
             PartitionKeyGenerator partitionKeyGenerator) {
         this.partitionKeyGenerator = partitionKeyGenerator;
         return this;
     }
 
-    public KinesisDataStreamsSinkBuilder setFailOnError(boolean failOnError) {
+    public KinesisStreamsSinkBuilder setFailOnError(boolean failOnError) {
         this.failOnError = failOnError;
         return this;
     }
 
-    public KinesisDataStreamsSinkBuilder setKinesisClientProperties(
+    public KinesisStreamsSinkBuilder setKinesisClientProperties(
             Properties kinesisClientProperties) {
         this.kinesisClientProperties = kinesisClientProperties;
         return this;
     }
 
     @Override
-    public KinesisDataStreamsSink build() {
-        return new KinesisDataStreamsSink<>(
-                new KinesisDataStreamsSinkElementConverter.Builder()
+    public KinesisStreamsSink build() {
+        return new KinesisStreamsSink<>(
+                new KinesisStreamsSinkElementConverter.Builder()
                         .setSerializationSchema(serializationSchema)
                         .setPartitionKeyGenerator(partitionKeyGenerator)
                         .build(),
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
similarity index 88%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
index 287360393f2be..bca74421ee5fb 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkElementConverter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkElementConverter.java
@@ -32,7 +32,7 @@
  * PartitionKeyGenerator} lambda to transform the input element into a String.
  */
 @Internal
-public class KinesisDataStreamsSinkElementConverter
+public class KinesisStreamsSinkElementConverter
         implements ElementConverter {
 
     /** A serialization schema to specify how the input element should be serialized. */
@@ -43,7 +43,7 @@ public class KinesisDataStreamsSinkElementConverter
      */
     private final PartitionKeyGenerator partitionKeyGenerator;
 
-    private KinesisDataStreamsSinkElementConverter(
+    private KinesisStreamsSinkElementConverter(
             SerializationSchema serializationSchema,
             PartitionKeyGenerator partitionKeyGenerator) {
         this.serializationSchema = serializationSchema;
@@ -62,7 +62,7 @@ public static  Builder builder() {
         return new Builder<>();
     }
 
-    /** A builder for the KinesisDataStreamsSinkElementConverter. */
+    /** A builder for the KinesisStreamsSinkElementConverter. */
     public static class Builder {
 
         private SerializationSchema serializationSchema;
@@ -80,16 +80,16 @@ public Builder setPartitionKeyGenerator(
             return this;
         }
 
-        public KinesisDataStreamsSinkElementConverter build() {
+        public KinesisStreamsSinkElementConverter build() {
             Preconditions.checkNotNull(
                     serializationSchema,
                     "No SerializationSchema was supplied to the "
-                            + "KinesisDataStreamsSinkElementConverter builder.");
+                            + "KinesisStreamsSinkElementConverter builder.");
             Preconditions.checkNotNull(
                     partitionKeyGenerator,
                     "No PartitionKeyGenerator lambda was supplied to the "
-                            + "KinesisDataStreamsSinkElementConverter builder.");
-            return new KinesisDataStreamsSinkElementConverter<>(
+                            + "KinesisStreamsSinkElementConverter builder.");
+            return new KinesisStreamsSinkElementConverter<>(
                     serializationSchema, partitionKeyGenerator);
         }
     }
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
similarity index 90%
rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index c04504167798c..c5fc21285be19 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -50,23 +50,22 @@
 import static org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier;
 
 /**
- * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More
- * details on the operation of this sink writer may be found in the doc for {@link
- * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link
- * AsyncSinkWriter}.
+ * Sink writer created by {@link KinesisStreamsSink} to write to Kinesis Data Streams. More details
+ * on the operation of this sink writer may be found in the doc for {@link KinesisStreamsSink}. More
+ * details on the internals of this sink writer may be found in {@link AsyncSinkWriter}.
  *
  * 

The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code * AWS_SECRET_ACCESS_KEY} through environment variables etc. */ -class KinesisDataStreamsSinkWriter extends AsyncSinkWriter { - private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class); +class KinesisStreamsSinkWriter extends AsyncSinkWriter { + private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSinkWriter.class); private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.withRootCauseOfType( ResourceNotFoundException.class, err -> - new KinesisDataStreamsException( + new KinesisStreamsException( "Encountered non-recoverable exception relating to not being able to find the specified resources", err)); @@ -95,7 +94,7 @@ class KinesisDataStreamsSinkWriter extends AsyncSinkWriter elementConverter, Sink.InitContext context, int maxBatchSize, @@ -122,7 +121,7 @@ class KinesisDataStreamsSinkWriter extends AsyncSinkWriter elementConverter, Sink.InitContext context, int maxBatchSize, @@ -161,8 +160,8 @@ private KinesisAsyncClient buildClient( kinesisClientProperties, httpClient, KinesisAsyncClient.builder(), - KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, - KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); } @Override @@ -220,7 +219,7 @@ private void handlePartiallyFailedRequest( if (failOnError) { getFatalExceptionCons() - .accept(new KinesisDataStreamsException.KinesisDataStreamsFailFastException()); + .accept(new KinesisStreamsException.KinesisStreamsFailFastException()); return; } List failedRequestEntries = @@ -243,9 +242,7 @@ private boolean isRetryable(Throwable err) { } if (failOnError) { getFatalExceptionCons() - .accept( - new KinesisDataStreamsException.KinesisDataStreamsFailFastException( - err)); + .accept(new KinesisStreamsException.KinesisStreamsFailFastException(err)); return false; } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java similarity index 95% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java index f1986efa0ac51..ad1bd8fa2f727 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializer.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java @@ -31,7 +31,7 @@ /** Kinesis Streams implementation {@link AsyncSinkWriterStateSerializer}. */ @Internal -public class KinesisDataStreamsStateSerializer +public class KinesisStreamsStateSerializer extends AsyncSinkWriterStateSerializer { @Override protected void serializeRequestToStream(PutRecordsRequestEntry request, DataOutputStream out) @@ -51,7 +51,7 @@ protected void validateExplicitHashKey(PutRecordsRequestEntry request) { if (request.explicitHashKey() != null) { throw new IllegalStateException( String.format( - "KinesisDataStreamsStateSerializer is incompatible with ElementConverter." + "KinesisStreamsStateSerializer is incompatible with ElementConverter." + "Serializer version %d does not support explicit hash key.", getVersion())); } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/PartitionKeyGenerator.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/FixedKinesisPartitionKeyGenerator.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisConnectorOptions.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java similarity index 96% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java index 361db54d35371..c5ae78bd090ec 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicSink.java @@ -22,8 +22,8 @@ import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink; import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSinkBuilder; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSinkBuilder; import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.format.EncodingFormat; @@ -105,8 +105,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { SerializationSchema serializationSchema = encodingFormat.createRuntimeEncoder(context, consumedDataType); - KinesisDataStreamsSinkBuilder builder = - KinesisDataStreamsSink.builder() + KinesisStreamsSinkBuilder builder = + KinesisStreamsSink.builder() .setSerializationSchema(serializationSchema) .setPartitionKeyGenerator(partitioner) .setKinesisClientProperties(kinesisClientProperties) @@ -114,7 +114,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { Optional.ofNullable(failOnError).ifPresent(builder::setFailOnError); addAsyncOptionsToSinkBuilder(builder); - KinesisDataStreamsSink kdsSink = builder.build(); + KinesisStreamsSink kdsSink = builder.build(); return SinkV2Provider.of(kdsSink); } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java similarity index 92% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java index 27a8501b13083..5aa0931d6163f 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactory.java @@ -23,7 +23,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.table.AsyncDynamicTableSinkFactory; import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator; -import org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils; +import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.data.RowData; @@ -39,7 +39,7 @@ import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER; import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.SINK_PARTITIONER_FIELD_DELIMITER; import static org.apache.flink.connector.kinesis.table.KinesisConnectorOptions.STREAM; -import static org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY; +import static org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KINESIS_CLIENT_PROPERTIES_KEY; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; /** Factory for creating {@link KinesisDynamicSink}. */ @@ -52,8 +52,8 @@ public DynamicTableSink createDynamicTableSink(Context context) { AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context); - KinesisDataStreamsConnectorOptionsUtils optionsUtils = - new KinesisDataStreamsConnectorOptionsUtils( + KinesisStreamsConnectorOptionsUtils optionsUtils = + new KinesisStreamsConnectorOptionsUtils( factoryContext.getResolvedOptions(), factoryContext.getTableOptions(), (RowType) factoryContext.getPhysicalDataType().getLogicalType(), @@ -105,8 +105,8 @@ public Set> optionalOptions() { options.add(SINK_PARTITIONER); options.add(SINK_PARTITIONER_FIELD_DELIMITER); options.add(SINK_FAIL_ON_ERROR); - return KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper - .addDeprecatedKeys(options); + return KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper.addDeprecatedKeys( + options); } private static void validateKinesisPartitioner( diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/KinesisPartitionKeyGeneratorFactory.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RandomKinesisPartitionKeyGenerator.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGenerator.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java similarity index 99% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java index 4981236b04211..4b30fe0524c57 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisDataStreamsConnectorOptionsUtils.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/table/util/KinesisStreamsConnectorOptionsUtils.java @@ -58,7 +58,7 @@ * for handling each specified set of options. */ @Internal -public class KinesisDataStreamsConnectorOptionsUtils { +public class KinesisStreamsConnectorOptionsUtils { /** Key for accessing kinesisAsyncClient properties. */ public static final String KINESIS_CLIENT_PROPERTIES_KEY = "sink.client.properties"; @@ -79,7 +79,7 @@ public class KinesisDataStreamsConnectorOptionsUtils { KinesisProducerOptionsMapper.KINESIS_PRODUCER_PREFIX }; - public KinesisDataStreamsConnectorOptionsUtils( + public KinesisStreamsConnectorOptionsUtils( Map options, ReadableConfig tableOptions, RowType physicalType, diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/log4j2.properties similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/resources/log4j2.properties rename to flink-connectors/flink-connector-aws-kinesis-streams/src/main/resources/log4j2.properties diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/architecture/TestCodeArchitectureTest.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java similarity index 85% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java index c886fd747049d..a6827621e2bc0 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkBuilderTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkBuilderTest.java @@ -23,8 +23,8 @@ import org.assertj.core.api.Assertions; import org.junit.Test; -/** Covers construction, defaults and sanity checking of KinesisDataStreamsSinkBuilder. */ -public class KinesisDataStreamsSinkBuilderTest { +/** Covers construction, defaults and sanity checking of KinesisStreamsSinkBuilder. */ +public class KinesisStreamsSinkBuilderTest { private static final SerializationSchema SERIALIZATION_SCHEMA = new SimpleStringSchema(); private static final PartitionKeyGenerator PARTITION_KEY_GENERATOR = @@ -33,9 +33,9 @@ public class KinesisDataStreamsSinkBuilderTest { @Test public void elementConverterOfSinkMustBeSetWhenBuilt() { Assertions.assertThatExceptionOfType(NullPointerException.class) - .isThrownBy(() -> KinesisDataStreamsSink.builder().setStreamName("stream").build()) + .isThrownBy(() -> KinesisStreamsSink.builder().setStreamName("stream").build()) .withMessageContaining( - "No SerializationSchema was supplied to the KinesisDataStreamsSinkElementConverter builder."); + "No SerializationSchema was supplied to the KinesisStreamsSinkElementConverter builder."); } @Test @@ -43,7 +43,7 @@ public void streamNameOfSinkMustBeSetWhenBuilt() { Assertions.assertThatExceptionOfType(NullPointerException.class) .isThrownBy( () -> - KinesisDataStreamsSink.builder() + KinesisStreamsSink.builder() .setPartitionKeyGenerator(PARTITION_KEY_GENERATOR) .setSerializationSchema(SERIALIZATION_SCHEMA) .build()) @@ -56,7 +56,7 @@ public void streamNameOfSinkMustBeSetToNonEmptyWhenBuilt() { Assertions.assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy( () -> - KinesisDataStreamsSink.builder() + KinesisStreamsSink.builder() .setStreamName("") .setPartitionKeyGenerator(PARTITION_KEY_GENERATOR) .setSerializationSchema(SERIALIZATION_SCHEMA) @@ -70,12 +70,12 @@ public void serializationSchemaMustBeSetWhenSinkIsBuilt() { Assertions.assertThatExceptionOfType(NullPointerException.class) .isThrownBy( () -> - KinesisDataStreamsSink.builder() + KinesisStreamsSink.builder() .setStreamName("stream") .setPartitionKeyGenerator(PARTITION_KEY_GENERATOR) .build()) .withMessageContaining( - "No SerializationSchema was supplied to the KinesisDataStreamsSinkElementConverter builder."); + "No SerializationSchema was supplied to the KinesisStreamsSinkElementConverter builder."); } @Test @@ -83,11 +83,11 @@ public void partitionKeyGeneratorMustBeSetWhenSinkIsBuilt() { Assertions.assertThatExceptionOfType(NullPointerException.class) .isThrownBy( () -> - KinesisDataStreamsSink.builder() + KinesisStreamsSink.builder() .setStreamName("stream") .setSerializationSchema(SERIALIZATION_SCHEMA) .build()) .withMessageContaining( - "No PartitionKeyGenerator lambda was supplied to the KinesisDataStreamsSinkElementConverter builder."); + "No PartitionKeyGenerator lambda was supplied to the KinesisStreamsSinkElementConverter builder."); } } diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java similarity index 98% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java index 3e23e871fdd30..d93d899e6c424 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkITCase.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkITCase.java @@ -63,7 +63,7 @@ import static org.apache.flink.connector.aws.config.AWSConfigConstants.TRUST_ALL_CERTIFICATES; /** IT cases for using Kinesis Data Streams Sink based on Kinesalite. */ -public class KinesisDataStreamsSinkITCase extends TestLogger { +public class KinesisStreamsSinkITCase extends TestLogger { private static final String DEFAULT_FIRST_SHARD_NAME = "shardId-000000000000"; @@ -409,10 +409,10 @@ private class Scenario { private String kinesaliteStreamName = null; private String sinkConnectionStreamName; private SerializationSchema serializationSchema = - KinesisDataStreamsSinkITCase.this.serializationSchema; + KinesisStreamsSinkITCase.this.serializationSchema; private PartitionKeyGenerator partitionKeyGenerator = - KinesisDataStreamsSinkITCase.this.partitionKeyGenerator; - private Properties properties = KinesisDataStreamsSinkITCase.this.getDefaultProperties(); + KinesisStreamsSinkITCase.this.partitionKeyGenerator; + private Properties properties = KinesisStreamsSinkITCase.this.getDefaultProperties(); public void runScenario() throws Exception { if (kinesaliteStreamName != null) { @@ -430,8 +430,8 @@ public void runScenario() throws Exception { (long) numberOfElementsToSend)) .returns(String.class); - KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() + KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() .setSerializationSchema(serializationSchema) .setPartitionKeyGenerator(partitionKeyGenerator) .setMaxTimeInBufferMS(bufferMaxTimeMS) diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java similarity index 89% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java index 01a94156720ba..ce1bde5cd15c8 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsStateSerializerTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializerTest.java @@ -30,11 +30,11 @@ import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.assertThatBufferStatesAreEqual; import static org.apache.flink.connector.base.sink.writer.AsyncSinkWriterTestUtils.getTestState; -/** Test class for {@link KinesisDataStreamsStateSerializer}. */ -public class KinesisDataStreamsStateSerializerTest { +/** Test class for {@link KinesisStreamsStateSerializer}. */ +public class KinesisStreamsStateSerializerTest { private static final ElementConverter ELEMENT_CONVERTER = - KinesisDataStreamsSinkElementConverter.builder() + KinesisStreamsSinkElementConverter.builder() .setSerializationSchema(new SimpleStringSchema()) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); @@ -44,7 +44,7 @@ public void testSerializeAndDeserialize() throws IOException { BufferedRequestState expectedState = getTestState(ELEMENT_CONVERTER, this::getRequestSize); - KinesisDataStreamsStateSerializer serializer = new KinesisDataStreamsStateSerializer(); + KinesisStreamsStateSerializer serializer = new KinesisStreamsStateSerializer(); BufferedRequestState actualState = serializer.deserialize(1, serializer.serialize(expectedState)); assertThatBufferStatesAreEqual(actualState, expectedState); diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java similarity index 92% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java index e17afa2b69a9d..362e25fda777a 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/sink/examples/SinkIntoKinesis.java @@ -19,7 +19,7 @@ import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.aws.config.AWSConfigConstants; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,8 +31,7 @@ import java.util.Properties; /** - * An example application demonstrating how to use the {@link KinesisDataStreamsSink} to sink into - * KDS. + * An example application demonstrating how to use the {@link KinesisStreamsSink} to sink into KDS. * *

The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK * 2.x. e.g. the provision of {@code AWS_ACCESS_KEY_ID} and {@code AWS_SECRET_ACCESS_KEY} through @@ -54,8 +53,8 @@ public static void main(String[] args) throws Exception { Properties sinkProperties = new Properties(); sinkProperties.put(AWSConfigConstants.AWS_REGION, "your-region-here"); - KinesisDataStreamsSink kdsSink = - KinesisDataStreamsSink.builder() + KinesisStreamsSink kdsSink = + KinesisStreamsSink.builder() .setSerializationSchema(new SimpleStringSchema()) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .setStreamName("your-stream-name") diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java similarity index 98% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java index dfe829e60a241..e50acdbfde6e2 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/KinesisDynamicTableSinkFactoryTest.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.table; import org.apache.flink.api.connector.sink2.Sink; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; @@ -85,7 +85,7 @@ public void testGoodTableSinkForPartitionedTable() { DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); - Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class); + Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); } @Test @@ -142,7 +142,7 @@ public void testGoodTableSinkForNonPartitionedTable() { DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); - Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class); + Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); } @Test @@ -171,7 +171,7 @@ public void testGoodTableSinkForNonPartitionedTableWithSinkOptions() { DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); - Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class); + Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); } @Test @@ -205,7 +205,7 @@ public void testGoodTableSinkForNonPartitionedTableWithProducerOptions() { DynamicTableSink.SinkRuntimeProvider sinkFunctionProvider = actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false)); Sink sinkFunction = ((SinkV2Provider) sinkFunctionProvider).createSink(); - Assertions.assertThat(sinkFunction).isInstanceOf(KinesisDataStreamsSink.class); + Assertions.assertThat(sinkFunction).isInstanceOf(KinesisStreamsSink.class); } @Test diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/RowDataFieldsKinesisPartitionKeyGeneratorTest.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java similarity index 94% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java index 4ed5a5b42d8cb..f4a295b33bf26 100644 --- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/util/KinesisProducerOptionsMapperTest.java @@ -19,7 +19,7 @@ package org.apache.flink.connector.kinesis.table.util; import org.apache.flink.connector.aws.config.AWSConfigConstants; -import org.apache.flink.connector.kinesis.table.util.KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper; +import org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper; import org.apache.flink.util.TestLogger; import org.assertj.core.api.Assertions; @@ -39,7 +39,7 @@ public void testProducerVerifyCertificateOptionsMapping() { expectedOptions.put(AWSConfigConstants.TRUST_ALL_CERTIFICATES, "true"); KinesisProducerOptionsMapper producerOptionsMapper = - new KinesisDataStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper( + new KinesisStreamsConnectorOptionsUtils.KinesisProducerOptionsMapper( deprecatedOptions); Map actualMappedProperties = producerOptionsMapper.mapDeprecatedClientOptions(); diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/java/org/apache/flink/connectors/kinesis/testutils/KinesaliteContainer.java diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/archunit.properties b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/archunit.properties similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/archunit.properties rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/archunit.properties diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/log4j2-test.properties similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/log4j2-test.properties diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile b/flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile similarity index 100% rename from flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile rename to flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index 0bb26bc782f91..e9cb2a5441e5d 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -142,7 +142,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} @@ -206,7 +206,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} test-jar test @@ -324,7 +324,7 @@ under the License. org.apache.flink:flink-connector-aws-base:* - org.apache.flink:flink-connector-aws-kinesis-data-streams:* + org.apache.flink:flink-connector-aws-kinesis-streams:* com.amazonaws:* com.google.protobuf:* org.apache.httpcomponents:* @@ -396,7 +396,7 @@ under the License. - org.apache.flink:flink-connector-aws-kinesis-data-streams:* + org.apache.flink:flink-connector-aws-kinesis-streams:* profile diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java index f3e0bd881eca0..338cd2877118b 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; @@ -62,10 +63,9 @@ * * @param Data type to produce into Kinesis Streams * @deprecated This producer based on the Kinesis Producer Library KPL has been superseded. The new - * sink can be found in the module {@code - * flink-connectors/flink-connector-aws-kinesis-data-streams} and package {@link - * org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink}. It is based on the AWS SDK - * for Java 2.x. The work to replace this sink was carried out in FLINK-24227. + * sink can be found in the module {@code flink-connectors/flink-connector-aws-kinesis-streams} + * and package {@link KinesisStreamsSink}. It is based on the AWS SDK for Java 2.x. The work to + * replace this sink was carried out in FLINK-24227. */ @Deprecated @PublicEvolving diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java index 62f25db3ba1f2..f8cf637151949 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyV2Factory.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; import org.apache.flink.connector.aws.util.AWSGeneralUtil; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisherConfiguration; import org.apache.flink.streaming.connectors.kinesis.util.AwsV2Util; import org.apache.flink.util.Preconditions; @@ -63,17 +63,17 @@ public static KinesisProxyV2Interface createKinesisProxyV2(final Properties conf Properties legacyConfigProps = new Properties(configProps); legacyConfigProps.setProperty( - KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX, AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( - KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); final KinesisAsyncClient client = AWSAsyncSinkUtil.createAwsAsyncClient( legacyConfigProps, httpClient, KinesisAsyncClient.builder(), - KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, - KinesisDataStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT, + KinesisStreamsConfigConstants.KINESIS_CLIENT_USER_AGENT_PREFIX); return new KinesisProxyV2(client, httpClient, configuration, BACKOFF); } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java index 684234c50e0bc..c79e8df5fc898 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java @@ -20,7 +20,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.connector.aws.config.AWSConfigConstants.CredentialProvider; import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil; -import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsConfigConstants; +import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants; import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants; import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; @@ -84,7 +84,7 @@ public static AmazonKinesis createKinesisClient( // set a Flink-specific user agent awsClientConfig.setUserAgentPrefix( AWSAsyncSinkUtil.formatFlinkUserAgentPrefix( - KinesisDataStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); + KinesisStreamsConfigConstants.BASE_KINESIS_USER_AGENT_PREFIX_FORMAT)); // utilize automatic refreshment of credentials by directly passing the // AWSCredentialsProvider diff --git a/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml b/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml index 69b932a55a443..8b4a2df0d4db2 100644 --- a/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml +++ b/flink-connectors/flink-sql-connector-aws-kinesis-firehose/pom.xml @@ -29,7 +29,7 @@ 4.0.0 flink-sql-connector-aws-kinesis-firehose - Flink : Connectors : SQL : AWS Kinesis Data Firehose + Flink : Connectors : SQL : Amazon Kinesis Data Firehose diff --git a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml b/flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml similarity index 90% rename from flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml rename to flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml index c3bbc8e8e24eb..0804a3a48f0be 100644 --- a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/pom.xml +++ b/flink-connectors/flink-sql-connector-aws-kinesis-streams/pom.xml @@ -28,13 +28,13 @@ 4.0.0 - flink-sql-connector-aws-kinesis-data-streams - Flink : Connectors : SQL : AWS Kinesis Data Streams + flink-sql-connector-aws-kinesis-streams + Flink : Connectors : SQL : Amazon Kinesis Data Streams org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} @@ -57,7 +57,7 @@ org.apache.flink:flink-connector-base org.apache.flink:flink-connector-aws-base - org.apache.flink:flink-connector-aws-kinesis-data-streams + org.apache.flink:flink-connector-aws-kinesis-streams software.amazon.awssdk:* org.reactivestreams:* com.typesafe.netty:* @@ -90,7 +90,7 @@ - org.apache.flink:flink-connector-aws-kinesis-data-streams:* + org.apache.flink:flink-connector-aws-kinesis-streams:* profile diff --git a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE b/flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE similarity index 97% rename from flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE rename to flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE index 51d9fa06abeb0..8b334b044573a 100644 --- a/flink-connectors/flink-sql-connector-aws-kinesis-data-streams/src/main/resources/META-INF/NOTICE +++ b/flink-connectors/flink-sql-connector-aws-kinesis-streams/src/main/resources/META-INF/NOTICE @@ -1,4 +1,4 @@ -flink-sql-connector-aws-kinesis-data-streams +flink-sql-connector-aws-kinesis-streams Copyright 2014-2021 The Apache Software Foundation diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 0eb18439d0da6..a1229f38cc1b1 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -53,7 +53,7 @@ under the License. flink-connector-gcp-pubsub flink-connector-aws-base flink-connector-kinesis - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams flink-connector-aws-kinesis-firehose flink-connector-base flink-file-sink-common @@ -104,7 +104,7 @@ under the License. flink-sql-connector-hive-2.3.6 flink-sql-connector-hive-3.1.2 flink-sql-connector-kafka - flink-sql-connector-aws-kinesis-data-streams + flink-sql-connector-aws-kinesis-streams flink-sql-connector-aws-kinesis-firehose flink-sql-connector-kinesis flink-sql-connector-pulsar diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml similarity index 94% rename from flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml rename to flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml index f6968783f000c..423afd529b88f 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/pom.xml +++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/pom.xml @@ -28,7 +28,7 @@ 4.0.0 - flink-end-to-end-tests-aws-kinesis-data-streams + flink-end-to-end-tests-aws-kinesis-streams Flink : E2E Tests : Kinesis SQL tests jar @@ -49,7 +49,7 @@ org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} test test-jar @@ -85,7 +85,7 @@ org.apache.flink - flink-sql-connector-aws-kinesis-data-streams + flink-sql-connector-aws-kinesis-streams ${project.version} sql-kinesis-streams.jar jar diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java similarity index 96% rename from flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java rename to flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java index 2c9c2f728720f..fcb3b5b0b6b4e 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisDataStreamsTableApiIT.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/java/org/apache/flink/connector/kinesis/table/test/KinesisStreamsTableApiIT.java @@ -68,11 +68,10 @@ import static java.util.concurrent.TimeUnit.SECONDS; -/** End-to-end test for Kinesis DataStream Table API Sink using Kinesalite. */ -public class KinesisDataStreamsTableApiIT { +/** End-to-end test for Kinesis Streams Table API Sink using Kinesalite. */ +public class KinesisStreamsTableApiIT { - private static final Logger LOGGER = - LoggerFactory.getLogger(KinesisDataStreamsTableApiIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(KinesisStreamsTableApiIT.class); private static final String ORDERS_STREAM = "orders"; private static final String INTER_CONTAINER_KINESALITE_ALIAS = "kinesalite"; @@ -214,7 +213,7 @@ private List readMessagesFromStream(Function deserialiser) thr GetShardIteratorRequest.builder() .shardId(DEFAULT_FIRST_SHARD_NAME) .shardIteratorType(ShardIteratorType.TRIM_HORIZON) - .streamName(KinesisDataStreamsTableApiIT.ORDERS_STREAM) + .streamName(KinesisStreamsTableApiIT.ORDERS_STREAM) .build()) .get() .shardIterator(); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/log4j2-test.properties similarity index 100% rename from flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/log4j2-test.properties rename to flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/log4j2-test.properties diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/send-orders.sql b/flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/send-orders.sql similarity index 100% rename from flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-data-streams/src/test/resources/send-orders.sql rename to flink-end-to-end-tests/flink-end-to-end-tests-aws-kinesis-streams/src/test/resources/send-orders.sql diff --git a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml index 81f01e0db9648..19121084be082 100644 --- a/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml +++ b/flink-end-to-end-tests/flink-glue-schema-registry-json-test/pom.xml @@ -114,7 +114,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} test-jar diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml index d5db55600dd5c..fafa40790462e 100644 --- a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml +++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml @@ -83,7 +83,7 @@ under the License. org.apache.flink - flink-connector-aws-kinesis-data-streams + flink-connector-aws-kinesis-streams ${project.version} test-jar diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index c39f79803fa92..6a28b30d7240f 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -80,7 +80,7 @@ under the License. flink-glue-schema-registry-avro-test flink-glue-schema-registry-json-test flink-end-to-end-tests-scala - flink-end-to-end-tests-aws-kinesis-data-streams + flink-end-to-end-tests-aws-kinesis-streams flink-end-to-end-tests-aws-kinesis-firehose diff --git a/pom.xml b/pom.xml index d471a8ed48ef2..ab7dead796325 100644 --- a/pom.xml +++ b/pom.xml @@ -1481,7 +1481,7 @@ under the License. flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/query/* flink-connectors/flink-connector-aws-base/src/test/resources/profile flink-connectors/flink-connector-kinesis/src/test/resources/profile - flink-connectors/flink-connector-aws-kinesis-data-streams/src/test/resources/profile + flink-connectors/flink-connector-aws-kinesis-streams/src/test/resources/profile flink-table/flink-table-code-splitter/src/test/resources/** flink-connectors/flink-connector-pulsar/src/test/resources/** diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh index 9ad4ad15e7c22..78562d8dfff9e 100755 --- a/tools/ci/stage.sh +++ b/tools/ci/stage.sh @@ -107,7 +107,7 @@ flink-connectors/flink-connector-elasticsearch-base,\ flink-connectors/flink-connector-nifi,\ flink-connectors/flink-connector-rabbitmq,\ flink-connectors/flink-connector-kinesis,\ -flink-connectors/flink-connector-aws-kinesis-data-streams,\ +flink-connectors/flink-connector-aws-kinesis-streams,\ flink-connectors/flink-connector-aws-kinesis-firehose,\ flink-metrics/flink-metrics-dropwizard,\ flink-metrics/flink-metrics-graphite,\