[FLINK-16363][table] Correct the execution behavior of TableEnvironment and StreamTableEnvironment

Note: In previous versions, TableEnvironment.execute() and StreamExecutionEnvironment.execute() could both trigger table and DataStream programs. Since 1.11.0, table programs can only be triggered by TableEnvironment.execute(). Once a table program is converted into a DataStream program (through the toAppendStream() or toRetractStream() method), it can only be triggered by StreamExecutionEnvironment.execute().
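For illustration, a minimal sketch of the new contract (Blink planner in streaming mode; the `MySource` and `MySink` tables are hypothetical and assumed to be registered beforehand):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ExecuteBehaviorExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // ... register MySource and MySink (omitted) ...

        // pure table program: the INSERT is buffered by the TableEnvironment
        // and only triggered by TableEnvironment.execute()
        tEnv.sqlUpdate("INSERT INTO MySink SELECT * FROM MySource");
        tEnv.execute("table job");

        // once a Table is converted into a DataStream, the resulting program
        // is only triggered by StreamExecutionEnvironment.execute()
        Table result = tEnv.sqlQuery("SELECT * FROM MySource");
        DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
        stream.print();
        env.execute("datastream job");
    }
}
```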

This closes apache#11296
godfreyhe authored and wuchong committed Mar 11, 2020
1 parent ffc8e42 commit 1276fa6
Showing 52 changed files with 1,234 additions and 283 deletions.
23 changes: 11 additions & 12 deletions docs/dev/table/common.md
@@ -835,13 +835,18 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
1. Optimization of the logical plan
2. Translation into a DataStream or DataSet program

A Table API or SQL query is translated when:
For streaming, a Table API or SQL query is translated when:

* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. Each sink will be optimized independently. The execution graph contains multiple independent sub-DAGs.
* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.

For batch, a Table API or SQL query is translated when:

* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).
* a `Table` is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
Once translated, a Table API or SQL query is handled like a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.

</div>
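As an illustration of the batch path described in the hunk above, a minimal sketch using the old (flink) planner's `BatchTableEnvironment` (the `MySource`/`MySink` tables are assumed to be registered elsewhere):

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class BatchTranslationExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
        // ... register MySource and MySink (omitted) ...

        // the update query is translated eagerly into a DataSet program when sqlUpdate() is called
        tEnv.sqlUpdate("INSERT INTO MySink SELECT * FROM MySource");

        // the translated DataSet program is executed by ExecutionEnvironment.execute()
        env.execute("batch table job");
    }
}
```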

@@ -851,17 +856,11 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/de
1. Optimization of the logical plan,
2. Translation into a DataStream program.

The behavior of translating a query is different for `TableEnvironment` and `StreamTableEnvironment`.

For `TableEnvironment`, a Table API or SQL query is translated when `TableEnvironment.execute()` is called, because `TableEnvironment` will optimize multiple-sinks into one DAG.
A Table API or SQL query is translated when:

While for `StreamTableEnvironment`, a Table API or SQL query is translated when:

* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream`.
* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. All sinks will be optimized into one DAG.
* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.

Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when `TableEnvironment.execute()` or `StreamExecutionEnvironment.execute()` is called.

</div>
</div>
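And a sketch of the multi-sink buffering described in the Blink-planner section above (names are illustrative; registration of `MySource`, `MySink1`, and `MySink2` is omitted):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class MultiSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // ... register MySource, MySink1 and MySink2 (omitted) ...

        // both statements are buffered in the TableEnvironment; nothing is executed yet
        tEnv.sqlUpdate("INSERT INTO MySink1 SELECT * FROM MySource WHERE a > 100");
        tEnv.sqlUpdate("INSERT INTO MySink2 SELECT * FROM MySource WHERE a <= 100");

        // all buffered sinks are optimized into one DAG and submitted as a single job
        tEnv.execute("multi-sink job");
    }
}
```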
24 changes: 11 additions & 13 deletions docs/dev/table/common.zh.md
@@ -822,13 +822,18 @@ Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/zh/dev/d
1. Optimization of the logical plan
2. Translation into a DataStream or DataSet program

A Table API or SQL query is translated when:
For streaming, a Table API or SQL query is translated when:

* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. Each sink will be optimized independently. The execution graph contains multiple independent sub-DAGs.
* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.

For batch, a Table API or SQL query is translated when:

* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream` or `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).
* a `Table` is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).

Once translated, a Table API or SQL query is handled like a regular DataStream or DataSet program and is executed when `StreamExecutionEnvironment.execute()` or `ExecutionEnvironment.execute()` is called.
Once translated, a Table API or SQL query is handled like a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.

</div>

@@ -838,17 +843,10 @@ A Table API or SQL query is translated when:
1. Optimization of the logical plan
2. Translation into a DataStream program

The behavior of translating a query is different for `TableEnvironment` and `StreamTableEnvironment`.

For `TableEnvironment`, a Table API or SQL query is translated when `TableEnvironment.execute()` is called, because `TableEnvironment` optimizes multiple sinks into one DAG.

For `StreamTableEnvironment`, a Table API or SQL query is translated when:

* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
* a `Table` is converted into a `DataStream`.
A Table API or SQL query is translated when:

Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when `TableEnvironment.execute()` or `StreamExecutionEnvironment.execute()` is called.
* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. All sinks will be optimized into one DAG.
* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.

</div>
</div>
@@ -471,7 +471,7 @@ public void testCassandraTableSink() throws Exception {

tEnv.sqlQuery("select * from testFlinkTable").insertInto("cassandraTable");

env.execute();
tEnv.execute("job name");
ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY));

// validate that all input was correctly written to Cassandra
@@ -155,6 +155,7 @@ public void testBlinkUdf() throws Exception {
}

private void testUdf(boolean batch) throws Exception {
StreamExecutionEnvironment env = null;
TableEnvironment tEnv;
EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance().useBlinkPlanner();
if (batch) {
@@ -165,8 +166,8 @@ private void testUdf(boolean batch) throws Exception {
if (batch) {
tEnv = TableEnvironment.create(envBuilder.build());
} else {
tEnv = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment(), envBuilder.build());
env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env, envBuilder.build());
}

BatchTestBase.configForMiniCluster(tEnv.getConfig());
@@ -237,7 +238,7 @@ private void testUdf(boolean batch) throws Exception {
streamTEnv.toRetractStream(tEnv.sqlQuery(selectSql), Row.class)
.map(new JavaToScala())
.addSink((SinkFunction) sink);
streamTEnv.execute("");
env.execute("");
results = JavaScalaConversionUtil.toJava(sink.getRetractResults());
}

@@ -149,7 +149,7 @@ public void testKafkaSourceSink() throws Exception {
result.addSink(sink).setParallelism(1);

try {
tEnv.execute("Job_2");
env.execute("Job_2");
} catch (Throwable e) {
// we have to use a specific exception to indicate the job is finished,
// because the registered Kafka source is infinite.
@@ -133,7 +133,7 @@ public void testReal() throws Exception {
")");

tEnv.sqlUpdate("INSERT INTO upsertSink SELECT CAST(1.0 as FLOAT)");
env.execute();
tEnv.execute("job name");
check(new Row[] {Row.of(1.0f)}, DB_URL, "REAL_TABLE", new String[]{"real_data"});
}

@@ -172,7 +172,7 @@ public long extractAscendingTimestamp(Tuple4<Integer, Long, String, Timestamp> e
" GROUP BY len, cTag\n" +
")\n" +
"GROUP BY cnt, cTag");
env.execute();
tEnv.execute("job name");
check(new Row[] {
Row.of(1, 5, 1, Timestamp.valueOf("1970-01-01 00:00:00.006")),
Row.of(7, 1, 1, Timestamp.valueOf("1970-01-01 00:00:00.021")),
@@ -203,7 +203,7 @@ public void testAppend() throws Exception {
")");

tEnv.sqlUpdate("INSERT INTO upsertSink SELECT id, num, ts FROM T WHERE id IN (2, 10, 20)");
env.execute();
tEnv.execute("job name");
check(new Row[] {
Row.of(2, 2, Timestamp.valueOf("1970-01-01 00:00:00.002")),
Row.of(10, 4, Timestamp.valueOf("1970-01-01 00:00:00.01")),
@@ -89,6 +89,8 @@ public static void main(String[] args) throws Exception {

tEnv.toAppendStream(result, Order.class).print();

// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job.
env.execute();
}

@@ -87,8 +87,9 @@ public static void main(String[] args) throws Exception {
Table result = tEnv.sqlQuery(query);
tEnv.toAppendStream(result, Row.class).print();

// submit the job
tEnv.execute("Streaming Window SQL Job");
// after the table program is converted to a DataStream program,
// we must use `env.execute()` to submit the job.
env.execute("Streaming Window SQL Job");

// should output:
// 2019-12-12 00:00:00.000,3,10,3
@@ -19,6 +19,8 @@
import tempfile
import json

import unittest

from pyflink.common import ExecutionConfig, RestartStrategies
from pyflink.datastream import (StreamExecutionEnvironment, CheckpointConfig,
CheckpointingMode, MemoryStateBackend, TimeCharacteristic)
@@ -172,6 +174,7 @@ def test_get_set_stream_time_characteristic(self):

self.assertEqual(time_characteristic, TimeCharacteristic.EventTime)

@unittest.skip("Python API does not support DataStream now. refactor this test later")
def test_get_execution_plan(self):
tmp_dir = tempfile.gettempdir()
source_path = os.path.join(tmp_dir + '/streaming.csv')
@@ -208,8 +208,8 @@ def test_explain_with_multi_sinks(self):
t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)

with self.assertRaises(TableException):
t_env.explain(extended=True)
actual = t_env.explain(extended=True)
assert isinstance(actual, str)

def test_sql_query(self):
t_env = self.t_env
@@ -84,7 +84,6 @@
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.planner.delegation.ExecutorBase;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.FlinkException;
@@ -273,26 +272,13 @@ public ExecutionConfig getExecutionConfig() {

public Pipeline createPipeline(String name) {
if (streamExecEnv != null) {
// special case for Blink planner to apply batch optimizations
// note: it also modifies the ExecutionConfig!
if (isBlinkPlanner(executor.getClass())) {
return ((ExecutorBase) executor).getStreamGraph(name);
}
return streamExecEnv.getStreamGraph(name);
StreamTableEnvironmentImpl streamTableEnv = (StreamTableEnvironmentImpl) tableEnv;
return streamTableEnv.getPipeline(name);
} else {
return execEnv.createProgramPlan(name);
}
}

private boolean isBlinkPlanner(Class<? extends Executor> executorClass) {
try {
return ExecutorBase.class.isAssignableFrom(executorClass);
} catch (NoClassDefFoundError ignore) {
// blink planner might not be on the class path
return false;
}
}

/** Returns a builder for this {@link ExecutionContext}. */
public static Builder builder(
Environment defaultEnv,
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -357,6 +358,13 @@ public StreamExecutionEnvironment execEnv() {
return executionEnvironment;
}

/**
* This method is used by the SQL Client to submit a job.
*/
public Pipeline getPipeline(String jobName) {
return execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
}

private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));

@@ -372,17 +380,6 @@ protected void validateTableSource(TableSource<?> tableSource) {
validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
}

@Override
protected boolean isEagerOperationTranslation() {
return true;
}

@Override
public String explain(boolean extended) {
// throw exception directly, because the operations to explain are always empty
throw new TableException("'explain' method without any tables is unsupported in StreamTableEnvironment.");
}

private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
try {
return TypeExtractor.createTypeInfo(clazz);
@@ -22,6 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlParserException;
@@ -427,11 +428,7 @@ private void insertIntoInternal(UnresolvedIdentifier unresolvedIdentifier, Table
objectIdentifier,
table.getQueryOperation()));

if (isEagerOperationTranslation()) {
translate(modifyOperations);
} else {
buffer(modifyOperations);
}
buffer(modifyOperations);
}

private Optional<CatalogQueryOperation> scanInternal(UnresolvedIdentifier identifier) {
@@ -566,12 +563,7 @@ public void sqlUpdate(String stmt) {
Operation operation = operations.get(0);

if (operation instanceof ModifyOperation) {
List<ModifyOperation> modifyOperations = Collections.singletonList((ModifyOperation) operation);
if (isEagerOperationTranslation()) {
translate(modifyOperations);
} else {
buffer(modifyOperations);
}
buffer(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
catalogManager.createTable(
@@ -723,9 +715,8 @@ public TableConfig getConfig() {

@Override
public JobExecutionResult execute(String jobName) throws Exception {
translate(bufferedModifyOperations);
bufferedModifyOperations.clear();
return execEnv.execute(jobName);
Pipeline pipeline = execEnv.createPipeline(translateAndClearBuffer(), tableConfig, jobName);
return execEnv.execute(pipeline);
}

/**
@@ -738,22 +729,6 @@ protected QueryOperation qualifyQueryOperation(ObjectIdentifier identifier, Quer
return queryOperation;
}

/**
* Defines the behavior of this {@link TableEnvironment}. If true the queries will
* be translated immediately. If false the {@link ModifyOperation}s will be buffered
* and translated only when {@link #execute(String)} is called.
*
* <p>If the {@link TableEnvironment} works in a lazy manner it is undefined what
* configurations values will be used. It depends on the characteristic of the particular
* parameter. Some might used values current to the time of query construction (e.g. the currentCatalog)
* and some use values from the time when {@link #execute(String)} is called (e.g. timeZone).
*
* @return true if the queries should be translated immediately.
*/
protected boolean isEagerOperationTranslation() {
return false;
}

/**
* Subclasses can override this method to add additional checks.
*
Expand All @@ -763,10 +738,25 @@ protected void validateTableSource(TableSource<?> tableSource) {
TableSourceValidation.validateTableSource(tableSource, tableSource.getTableSchema());
}

private void translate(List<ModifyOperation> modifyOperations) {
List<Transformation<?>> transformations = planner.translate(modifyOperations);
/**
* Translate the buffered operations to Transformations, and clear the buffer.
*
* <p>The buffer will be cleared even if `translate` fails. In most cases,
* the failure is not retryable (e.g. a type mismatch, or no physical plan can be generated).
* If the buffer is not cleared after a failure, the following `translate` calls will also fail.
*/
protected List<Transformation<?>> translateAndClearBuffer() {
List<Transformation<?>> transformations;
try {
transformations = translate(bufferedModifyOperations);
} finally {
bufferedModifyOperations.clear();
}
return transformations;
}

execEnv.apply(transformations);
private List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
return planner.translate(modifyOperations);
}

private void buffer(List<ModifyOperation> modifyOperations) {
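As a usage sketch of the buffering that `buffer()` and `translateAndClearBuffer()` implement above (hypothetical `MySource`/`MySink` tables, assumed to be registered on a Blink-planner `TableEnvironment`; registration is omitted):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BufferClearingExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // ... register MySource, MySink1 and MySink2 (omitted) ...

        // both INSERTs are buffered; translateAndClearBuffer() runs inside execute()
        tEnv.sqlUpdate("INSERT INTO MySink1 SELECT * FROM MySource");
        tEnv.sqlUpdate("INSERT INTO MySink2 SELECT * FROM MySource");
        tEnv.execute("job 1"); // translates both buffered operations, submits one job, clears the buffer

        // the buffer is now empty (it is cleared even if translation fails),
        // so the next job contains only statements buffered after "job 1"
        tEnv.sqlUpdate("INSERT INTO MySink1 SELECT * FROM MySource WHERE a > 0");
        tEnv.execute("job 2");
    }
}
```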