[FLINK-26373][connector/kinesis] Rename KinesisDataStreams module and classes into KinesisStreams
vahmed-hamdy authored and dannycranmer committed Mar 1, 2022
1 parent dcd2156 commit 3f06f97
Showing 62 changed files with 184 additions and 191 deletions.
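For users the rename is mechanical: the Maven artifact and the sink class simply drop the "Data" infix, as the file-by-file diff below shows. A minimal migration sketch (the `org.apache.flink.connector.kinesis.sink` package path is assumed from the 1.15 connector layout and does not appear in this diff):

```java
// Old artifact: flink-connector-aws-kinesis-data-streams
// New artifact: flink-connector-aws-kinesis-streams

// Old import (before this commit):
// import org.apache.flink.connector.kinesis.sink.KinesisDataStreamsSink;

// New import; package path assumed, only the class name is confirmed by the diff:
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;

class MigrationSketch {
    // The builder API is unchanged apart from the class name.
    Object builder = KinesisStreamsSink.<String>builder();
}
```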
18 changes: 9 additions & 9 deletions docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -566,9 +566,9 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.

## Kinesis Data Streams Sink
## Kinesis Streams Sink

The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.
The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.

To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Streams console.

@@ -585,8 +585,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new SimpleStringSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
@@ -614,7 +614,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val kdsSink = KinesisDataStreamsSink.<String>builder()
val kdsSink = KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new SimpleStringSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
@@ -636,7 +636,7 @@ simpleStringStream.sinkTo(kdsSink)

The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.
You will always need to specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.

Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is enabled, a runtime exception is raised; otherwise, those records are requeued in the buffer for retry.

@@ -658,8 +658,8 @@ found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/serv

You generally reduce backpressure by increasing the size of the internal queue:
```
KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
...
.setMaxBufferedRequests(10_000)
...
@@ -668,7 +668,7 @@ KinesisDataStreamsSink<String> kdsSink =
## Kinesis Producer

{{< hint warning >}}
The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
{{< /hint >}}

The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
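Assembled from the renamed snippets in this file, a self-contained Java version of the documented example might look as follows. The `setStreamName` call, the placeholder stream name, and the import paths are assumptions based on the 1.15 connector layout rather than part of this diff:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.aws.config.AWSConfigConstants;    // assumed package path
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink;  // assumed package path
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KinesisStreamsSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Credentials and region, exactly as in the documented snippet.
        Properties sinkProperties = new Properties();
        sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
        sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
        sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

        KinesisStreamsSink<String> kdsSink =
                KinesisStreamsSink.<String>builder()
                        .setKinesisClientProperties(sinkProperties)                              // Required
                        .setSerializationSchema(new SimpleStringSchema())                        // Required
                        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
                        .setStreamName("your-stream")                                            // Assumed setter; placeholder stream
                        .build();

        DataStream<String> simpleStringStream = env.fromElements("a", "b", "c");
        simpleStringStream.sinkTo(kdsSink);
        env.execute("KinesisStreamsSinkExample");
    }
}
```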
8 changes: 4 additions & 4 deletions docs/content.zh/docs/connectors/table/kinesis.md
@@ -637,7 +637,7 @@ Connector Options
<td></td>
<td>
Deprecated options previously used by the legacy connector.
Options with equivalent alternatives in <code>KinesisDataStreamsSink</code> are matched
Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched
to their respective properties. Unsupported options are logged as warnings to the user.
</td>
</tr>
@@ -809,11 +809,11 @@ Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >

# Updates in 1.15

Until 1.14, the Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code>. With the introduction of <code>KinesisDataStreamsSink</code> in 1.15, the Kinesis table API sink connector has been migrated to the new <code>KinesisDataStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisDataStreamsSink</code>.
Until 1.14, the Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code>. With the introduction of <code>KinesisStreamsSink</code> in 1.15, the Kinesis table API sink connector has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisStreamsSink</code>.

Options configuring <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for common configuration options shared with <code>KinesisDataStreamsSink</code>.
Options configuring <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for common configuration options shared with <code>KinesisStreamsSink</code>.

<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
which doesn't support aggregation. As a consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored; this includes <code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.
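At the SQL level the migration described above is transparent; only the deprecated `sink.producer.*` aggregation options change behavior. A hedged sketch of a table sink that would be backed by the renamed `KinesisStreamsSink` in 1.15 (table name, schema, and option values are hypothetical):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KinesisTableSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'connector' = 'kinesis' routes writes through the new sink in 1.15;
        // the stream name, region, and schema below are placeholders.
        tEnv.executeSql(
                "CREATE TABLE kinesis_sink ("
                        + "  id BIGINT,"
                        + "  payload STRING"
                        + ") WITH ("
                        + "  'connector' = 'kinesis',"
                        + "  'stream' = 'your-stream',"
                        + "  'aws.region' = 'us-east-1',"
                        + "  'format' = 'json'"
                        + ")");

        tEnv.executeSql("INSERT INTO kinesis_sink VALUES (1, 'hello')");
    }
}
```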
20 changes: 10 additions & 10 deletions docs/content/docs/connectors/datastream/kinesis.md
@@ -45,7 +45,7 @@ To use this connector, add one or more of the following dependencies to your pro
</tr>
<tr>
<td>Sink</td>
<td>{{< artifact flink-connector-aws-kinesis-data-streams >}}</td>
<td>{{< artifact flink-connector-aws-kinesis-streams >}}</td>
</tr>
</tbody>
</table>
@@ -578,9 +578,9 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.

## Kinesis Data Streams Sink
## Kinesis Streams Sink

The Kinesis Data Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.
The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.

To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Streams console.

@@ -597,8 +597,8 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new SimpleStringSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
@@ -626,7 +626,7 @@ sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val kdsSink = KinesisDataStreamsSink.<String>builder()
val kdsSink = KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new SimpleStringSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
@@ -648,7 +648,7 @@ simpleStringStream.sinkTo(kdsSink)

The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).

You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.
You will always need to specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.

Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is enabled, a runtime exception is raised; otherwise, those records are requeued in the buffer for retry.

@@ -670,8 +670,8 @@ found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/serv

You generally reduce backpressure by increasing the size of the internal queue:
```
KinesisDataStreamsSink<String> kdsSink =
KinesisDataStreamsSink.<String>builder()
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
...
.setMaxBufferedRequests(10_000)
...
@@ -680,7 +680,7 @@ KinesisDataStreamsSink<String> kdsSink =
## Kinesis Producer

{{< hint warning >}}
The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink, please use [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
{{< /hint >}}

The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
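Combining the failure-handling and backpressure knobs documented in this file, a hedged sketch of a tuned sink (`setStreamName` and the placeholder stream name are assumptions; `setFailOnError` and `setMaxBufferedRequests` correspond to the `failOnError` flag and queue-size guidance in the snippets above):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink; // assumed package path

public class TunedKinesisSinkSketch {
    static KinesisStreamsSink<String> tunedSink(Properties sinkProperties) {
        return KinesisStreamsSink.<String>builder()
                .setKinesisClientProperties(sinkProperties)
                .setSerializationSchema(new SimpleStringSchema())
                .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
                .setStreamName("your-stream")   // assumed setter; placeholder stream
                .setFailOnError(false)          // requeue failed records instead of failing the job
                .setMaxBufferedRequests(10_000) // larger internal queue reduces backpressure
                .build();
    }
}
```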
8 changes: 4 additions & 4 deletions docs/content/docs/connectors/table/kinesis.md
@@ -709,7 +709,7 @@ Connector Options
<td></td>
<td>
Deprecated options previously used by the legacy connector.
Options with equivalent alternatives in <code>KinesisDataStreamsSink</code> are matched
Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched
to their respective properties. Unsupported options are logged as warnings to the user.
</td>
</tr>
@@ -881,11 +881,11 @@ Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >

# Updates in 1.15

Until 1.14, the Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code>. With the introduction of <code>KinesisDataStreamsSink</code> in 1.15, the Kinesis table API sink connector has been migrated to the new <code>KinesisDataStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisDataStreamsSink</code>.
Until 1.14, the Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code>. With the introduction of <code>KinesisStreamsSink</code> in 1.15, the Kinesis table API sink connector has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisStreamsSink</code>.

Options configuring <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for common configuration options shared with <code>KinesisDataStreamsSink</code>.
Options configuring <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for common configuration options shared with <code>KinesisStreamsSink</code>.

<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
which doesn't support aggregation. As a consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored; this includes <code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.
@@ -174,7 +174,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
</dependency>

<dependency>
2 changes: 1 addition & 1 deletion flink-architecture-tests/pom.xml
@@ -219,7 +219,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-aws-kinesis-data-streams</artifactId>
<artifactId>flink-connector-aws-kinesis-streams</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
@@ -31,7 +31,7 @@ under the License.
</parent>

<artifactId>flink-connector-aws-kinesis-firehose</artifactId>
<name>Flink : Connectors : AWS Kinesis Data Firehose</name>
<name>Flink : Connectors : Amazon Kinesis Data Firehose</name>
<properties>
<aws.sdk.version>2.17.52</aws.sdk.version>
</properties>
@@ -129,39 +129,38 @@ public String asSummaryString() {

/** Builder class for {@link KinesisFirehoseDynamicSink}. */
@Internal
public static class KinesisDataFirehoseDynamicSinkBuilder
extends AsyncDynamicTableSinkBuilder<Record, KinesisDataFirehoseDynamicSinkBuilder> {
public static class KinesisFirehoseDynamicSinkBuilder
extends AsyncDynamicTableSinkBuilder<Record, KinesisFirehoseDynamicSinkBuilder> {

private DataType consumedDataType = null;
private String deliveryStream = null;
private Properties firehoseClientProperties = null;
private EncodingFormat<SerializationSchema<RowData>> encodingFormat = null;
private Boolean failOnError = null;

public KinesisDataFirehoseDynamicSinkBuilder setConsumedDataType(
DataType consumedDataType) {
public KinesisFirehoseDynamicSinkBuilder setConsumedDataType(DataType consumedDataType) {
this.consumedDataType = consumedDataType;
return this;
}

public KinesisDataFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) {
public KinesisFirehoseDynamicSinkBuilder setDeliveryStream(String deliveryStream) {
this.deliveryStream = deliveryStream;
return this;
}

public KinesisDataFirehoseDynamicSinkBuilder setFirehoseClientProperties(
public KinesisFirehoseDynamicSinkBuilder setFirehoseClientProperties(
Properties firehoseClientProperties) {
this.firehoseClientProperties = firehoseClientProperties;
return this;
}

public KinesisDataFirehoseDynamicSinkBuilder setEncodingFormat(
public KinesisFirehoseDynamicSinkBuilder setEncodingFormat(
EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
this.encodingFormat = encodingFormat;
return this;
}

public KinesisDataFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) {
public KinesisFirehoseDynamicSinkBuilder setFailOnError(Boolean failOnError) {
this.failOnError = failOnError;
return this;
}
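The renamed Firehose builder is used exactly as before; a minimal sketch based only on the setters visible in this hunk (the package path is assumed, and since the class is marked `@Internal` this is illustrative of the rename rather than user-facing API):

```java
import java.util.Properties;

import org.apache.flink.connector.firehose.table.KinesisFirehoseDynamicSink; // assumed package path

class FirehoseBuilderSketch {
    KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder =
            new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
                    .setDeliveryStream("your-delivery-stream")     // placeholder
                    .setFirehoseClientProperties(new Properties()) // empty properties for illustration
                    .setFailOnError(true);
}
```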
@@ -45,8 +45,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {

AsyncDynamicSinkContext factoryContext = new AsyncDynamicSinkContext(this, context);

KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder builder =
new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder();
KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder builder =
new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder();

KinesisFirehoseConnectorOptionUtils optionsUtils =
new KinesisFirehoseConnectorOptionUtils(
@@ -57,7 +57,7 @@ public void testGoodTableSink() {

// Construct expected DynamicTableSink using factory under test
KinesisFirehoseDynamicSink expectedSink =
new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder()
new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
.setConsumedDataType(sinkSchema.toPhysicalRowDataType())
.setDeliveryStream(DELIVERY_STREAM_NAME)
.setFirehoseClientProperties(defaultSinkProperties())
@@ -134,9 +134,8 @@ private TableOptionsBuilder defaultTableOptions() {
.withFormatOption(TestFormatFactory.FAIL_ON_MISSING, "true");
}

private KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder
getDefaultSinkBuilder() {
return new KinesisFirehoseDynamicSink.KinesisDataFirehoseDynamicSinkBuilder()
private KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder getDefaultSinkBuilder() {
return new KinesisFirehoseDynamicSink.KinesisFirehoseDynamicSinkBuilder()
.setFailOnError(true)
.setMaxBatchSize(100)
.setMaxInFlightRequests(100)