[FLINK-18840][table-api] Add StreamStatementSet.attachAsDataStream()
This closes apache#16816.
twalthr committed Aug 16, 2021
1 parent fc0d35b commit 7ff4cbd
Showing 15 changed files with 634 additions and 11 deletions.
130 changes: 130 additions & 0 deletions docs/content.zh/docs/dev/table/data_stream_api.md
@@ -2149,6 +2149,136 @@ watermarks in a subsequent `ProcessFunction` in DataStream API.

{{< top >}}

Adding Table API Pipelines to DataStream API
--------------------------------------------

A single Flink job can consist of multiple disconnected pipelines that run next to each other.

Source-to-sink pipelines defined in Table API can be attached *as a whole* to the `StreamExecutionEnvironment`
and will be submitted when calling one of the `execute` methods in the DataStream API.

However, a source does not necessarily have to be a table source; it can also be another DataStream
pipeline that was converted to the Table API beforehand. Thus, it is possible to use table sinks for
DataStream API programs.

The functionality is available through a specialized `StreamStatementSet` instance created with
`StreamTableEnvironment.createStatementSet()`. By using a statement set, the planner can optimize all
added statements together and come up with one or more end-to-end pipelines that are added to the
`StreamExecutionEnvironment` when calling `StreamStatementSet.attachAsDataStream()`.

The following example shows how to add table programs to a DataStream API program within one job.

{{< tabs "de4cd538-4345-49ee-b86e-b308f002e069" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

StreamStatementSet statementSet = tableEnv.createStatementSet();

// create some source
TableDescriptor sourceDescriptor =
    TableDescriptor.forConnector("datagen")
        .option("number-of-rows", "3")
        .schema(
            Schema.newBuilder()
                .column("myCol", DataTypes.INT())
                .column("myOtherCol", DataTypes.BOOLEAN())
                .build())
        .build();

// create some sink
TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();

// add a pure Table API pipeline
Table tableFromSource = tableEnv.from(sourceDescriptor);
statementSet.addInsert(sinkDescriptor, tableFromSource);

// use table sinks for the DataStream API pipeline
DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
Table tableFromStream = tableEnv.fromDataStream(dataStream);
statementSet.addInsert(sinkDescriptor, tableFromStream);

// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
statementSet.attachAsDataStream();

// define other DataStream API parts
env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());

// use DataStream API to submit the pipelines
env.execute();

// prints similar to:
// +I[1618440447, false]
// +I[1259693645, true]
// +I[158588930, false]
// +I[1]
// +I[2]
// +I[3]
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val statementSet = tableEnv.createStatementSet()

// create some source
val sourceDescriptor = TableDescriptor.forConnector("datagen")
  .option("number-of-rows", "3")
  .schema(
    Schema.newBuilder
      .column("myCol", DataTypes.INT)
      .column("myOtherCol", DataTypes.BOOLEAN)
      .build)
  .build

// create some sink
val sinkDescriptor = TableDescriptor.forConnector("print").build

// add a pure Table API pipeline
val tableFromSource = tableEnv.from(sourceDescriptor)
statementSet.addInsert(sinkDescriptor, tableFromSource)

// use table sinks for the DataStream API pipeline
val dataStream = env.fromElements(1, 2, 3)
val tableFromStream = tableEnv.fromDataStream(dataStream)
statementSet.addInsert(sinkDescriptor, tableFromStream)

// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
statementSet.attachAsDataStream()

// define other DataStream API parts
env.fromElements(4, 5, 6).addSink(new DiscardingSink[Int]())

// use DataStream API to submit the pipelines
env.execute()

// prints similar to:
// +I[1618440447, false]
// +I[1259693645, true]
// +I[158588930, false]
// +I[1]
// +I[2]
// +I[3]
```
{{< /tab >}}
{{< /tabs >}}
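
Statements can also be added in SQL form via `addInsertSql()`. The following is a minimal sketch of
that variant; the table names `GeneratedTable` and `PrintTable` are illustrative assumptions and must
first be registered, e.g. with `createTemporaryTable()`:

```java
// register (hypothetical) tables so that SQL statements can refer to them
tableEnv.createTemporaryTable("GeneratedTable", sourceDescriptor);
tableEnv.createTemporaryTable(
    "PrintTable",
    TableDescriptor.forConnector("print")
        .schema(Schema.newBuilder().column("myCol", DataTypes.INT()).build())
        .build());

// add a SQL-defined pipeline to the same statement set
statementSet.addInsertSql("INSERT INTO PrintTable SELECT myCol FROM GeneratedTable");

// attach it to the StreamExecutionEnvironment as before
statementSet.attachAsDataStream();
env.execute();
```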

{{< top >}}

Implicit Conversions in Scala
-----------------------------

131 changes: 130 additions & 1 deletion docs/content/docs/dev/table/data_stream_api.md
@@ -2148,6 +2148,136 @@ watermarks in a subsequent `ProcessFunction` in DataStream API.

{{< top >}}

Adding Table API Pipelines to DataStream API
--------------------------------------------

A single Flink job can consist of multiple disconnected pipelines that run next to each other.

Source-to-sink pipelines defined in Table API can be attached *as a whole* to the `StreamExecutionEnvironment`
and will be submitted when calling one of the `execute` methods in the DataStream API.

However, a source does not necessarily have to be a table source; it can also be another DataStream
pipeline that was converted to the Table API beforehand. Thus, it is possible to use table sinks for
DataStream API programs.

The functionality is available through a specialized `StreamStatementSet` instance created with
`StreamTableEnvironment.createStatementSet()`. By using a statement set, the planner can optimize all
added statements together and come up with one or more end-to-end pipelines that are added to the
`StreamExecutionEnvironment` when calling `StreamStatementSet.attachAsDataStream()`.

The following example shows how to add table programs to a DataStream API program within one job.

{{< tabs "de4cd538-4345-49ee-b86e-b308f002e069" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.*;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

StreamStatementSet statementSet = tableEnv.createStatementSet();

// create some source
TableDescriptor sourceDescriptor =
    TableDescriptor.forConnector("datagen")
        .option("number-of-rows", "3")
        .schema(
            Schema.newBuilder()
                .column("myCol", DataTypes.INT())
                .column("myOtherCol", DataTypes.BOOLEAN())
                .build())
        .build();

// create some sink
TableDescriptor sinkDescriptor = TableDescriptor.forConnector("print").build();

// add a pure Table API pipeline
Table tableFromSource = tableEnv.from(sourceDescriptor);
statementSet.addInsert(sinkDescriptor, tableFromSource);

// use table sinks for the DataStream API pipeline
DataStream<Integer> dataStream = env.fromElements(1, 2, 3);
Table tableFromStream = tableEnv.fromDataStream(dataStream);
statementSet.addInsert(sinkDescriptor, tableFromStream);

// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
statementSet.attachAsDataStream();

// define other DataStream API parts
env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());

// use DataStream API to submit the pipelines
env.execute();

// prints similar to:
// +I[1618440447, false]
// +I[1259693645, true]
// +I[158588930, false]
// +I[1]
// +I[2]
// +I[3]
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

val statementSet = tableEnv.createStatementSet()

// create some source
val sourceDescriptor = TableDescriptor.forConnector("datagen")
  .option("number-of-rows", "3")
  .schema(
    Schema.newBuilder
      .column("myCol", DataTypes.INT)
      .column("myOtherCol", DataTypes.BOOLEAN)
      .build)
  .build

// create some sink
val sinkDescriptor = TableDescriptor.forConnector("print").build

// add a pure Table API pipeline
val tableFromSource = tableEnv.from(sourceDescriptor)
statementSet.addInsert(sinkDescriptor, tableFromSource)

// use table sinks for the DataStream API pipeline
val dataStream = env.fromElements(1, 2, 3)
val tableFromStream = tableEnv.fromDataStream(dataStream)
statementSet.addInsert(sinkDescriptor, tableFromStream)

// attach both pipelines to StreamExecutionEnvironment
// (the statement set will be cleared after calling this method)
statementSet.attachAsDataStream()

// define other DataStream API parts
env.fromElements(4, 5, 6).addSink(new DiscardingSink[Int]())

// use DataStream API to submit the pipelines
env.execute()

// prints similar to:
// +I[1618440447, false]
// +I[1259693645, true]
// +I[158588930, false]
// +I[1]
// +I[2]
// +I[3]
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}

Implicit Conversions in Scala
-----------------------------

@@ -2188,7 +2318,6 @@ val dataStreamAgain1: DataStream[Row] = table
// call toChangelogStream() implicitly on the Table object
val dataStreamAgain2: DataStream[Row] = table.toChangelogStream
```

{{< top >}}

Mapping between TypeInformation and DataType
org/apache/flink/table/api/bridge/java/StreamStatementSet.java (new file)
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.bridge.java;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;

/**
* A {@link StatementSet} that integrates with the Java-specific {@link DataStream} API.
*
* <p>It accepts pipelines defined by DML statements or {@link Table} objects. The planner can
* optimize all added statements together and then either submit them as one job or attach them to
* the underlying {@link StreamExecutionEnvironment}.
*
* <p>The added statements will be cleared when calling the {@link #execute()} or {@link
* #attachAsDataStream()} method.
*/
@PublicEvolving
public interface StreamStatementSet extends StatementSet {

@Override
StreamStatementSet addInsertSql(String statement);

@Override
StreamStatementSet addInsert(String targetPath, Table table);

@Override
StreamStatementSet addInsert(String targetPath, Table table, boolean overwrite);

@Override
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table);

@Override
StreamStatementSet addInsert(TableDescriptor targetDescriptor, Table table, boolean overwrite);

/**
* Optimizes all statements as one entity and adds them as transformations to the underlying
* {@link StreamExecutionEnvironment}.
*
* <p>Use {@link StreamExecutionEnvironment#execute()} to execute them.
*
* <p>The added statements will be cleared after calling this method.
*/
void attachAsDataStream();
}
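
For illustration, a minimal usage sketch of the interface above; the SQL statement and table names
are assumptions, and `SourceTable` and `SinkTable` are expected to be registered beforehand:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

StreamStatementSet statementSet = tableEnv.createStatementSet();

// hypothetical pipeline; "SourceTable" and "SinkTable" must already exist
statementSet.addInsertSql("INSERT INTO SinkTable SELECT * FROM SourceTable");

// either submit the optimized statements directly as one Table API job ...
// statementSet.execute();

// ... or attach them as transformations and submit via the DataStream API
statementSet.attachAsDataStream();
env.execute();
```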
org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -30,6 +30,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
@@ -721,6 +722,17 @@ Table fromChangelogStream(
DataStream<Row> toChangelogStream(
Table table, Schema targetSchema, ChangelogMode changelogMode);

/**
* Returns a {@link StatementSet} that integrates with the Java-specific {@link DataStream} API.
*
* <p>It accepts pipelines defined by DML statements or {@link Table} objects. The planner can
* optimize all added statements together and then either submit them as one job or attach them
* to the underlying {@link StreamExecutionEnvironment}.
*
* @return statement set builder for the Java-specific {@link DataStream} API
*/
StreamStatementSet createStatementSet();

/**
* Converts the given {@link DataStream} into a {@link Table} with specified field names.
*