Skip to content

Commit

Permalink
[FLINK-22844][docs][table] Add documentation to introduce ExplainDeta…
Browse files Browse the repository at this point in the history
…ils for EXPLAIN sytnax (#16051)
  • Loading branch information
chaozwn committed Aug 17, 2021
1 parent 45d8736 commit 6ce650c
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 45 deletions.
151 changes: 128 additions & 23 deletions docs/content.zh/docs/dev/table/sql/explain.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// register a table named "Orders"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)");
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");

// explain SELECT statement through TableEnvironment.explainSql()
String explanation = tEnv.explainSql(
Expand All @@ -87,6 +87,13 @@ TableResult tableResult = tEnv.executeSql(
"SELECT `count`, word FROM MyTable2");
tableResult.print();

TableResult tableResult2 = tEnv.executeSql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2");
tableResult2.print();

```
{{< /tab >}}
{{< tab "Scala" >}}
Expand All @@ -95,8 +102,8 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()
val tEnv = StreamTableEnvironment.create(env)

// register a table named "Orders"
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)")
tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

// explain SELECT statement through TableEnvironment.explainSql()
val explanation = tEnv.explainSql(
Expand All @@ -113,14 +120,22 @@ val tableResult = tEnv.executeSql(
"SELECT `count`, word FROM MyTable2")
tableResult.print()

val tableResult2 = tEnv.executeSql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
"UNION ALL " +
"SELECT `count`, word FROM MyTable2")
tableResult2.print()

```
{{< /tab >}}
{{< tab "Python" >}}
```python
table_env = StreamTableEnvironment.create(...)
settings = EnvironmentSettings.new_instance()...
table_env = StreamTableEnvironment.create(env, settings)

t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)")
t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

# explain SELECT statement through TableEnvironment.explain_sql()
explanation1 = t_env.explain_sql(
Expand All @@ -137,52 +152,142 @@ table_result = t_env.execute_sql(
"SELECT `count`, word FROM MyTable2")
table_result.print()

table_result2 = t_env.execute_sql(
"EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN "
"SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
"UNION ALL "
"SELECT `count`, word FROM MyTable2")
table_result2.print()

```
{{< /tab >}}
{{< tab "SQL CLI" >}}

```sql
Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256);
Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256);
Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
[INFO] Table has been created.

Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2;


Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT `count`, word FROM MyTable1
> WHERE word LIKE 'F%'
> UNION ALL
> SELECT `count`, word FROM MyTable2;
```
{{< /tab >}}
{{< /tabs >}}

The `EXPLAIN` result is:

{{< tabs "explain result" >}}

{{< tab "EXPLAIN PLAN" >}}

```text
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalProject(count=[$0], word=[$1])
: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Optimized Execution Plan ==
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
```

{{< /tab >}}

{{< tab "EXPLAIN PLAN WITH DETAILS" >}}

```text
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
LogicalTableScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
LogicalTableScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
:- LogicalProject(count=[$0], word=[$1])
: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
+- LogicalProject(count=[$0], word=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
== Optimized Physical Plan ==
Union(all=[true], union all=[count, word])
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
== Optimized Execution Plan ==
Union(all=[true], union all=[count, word])
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
Union(all=[true], union=[count, word])
:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 37,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
"parallelism" : 1
}, {
"id" : 38,
"type" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
"pact" : "Operator",
"contents" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 37,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 39,
"type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
"pact" : "Data Source",
"contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
"parallelism" : 1
} ]
```

{{< /tab >}}

{{< /tabs >}}

{{< top >}}

## ExplainDetails
```text
Print the plan for the statement with specified ExplainDetails.
ESTIMATED_COST: generates cost information on physical node estimated by optimizer,
e.g. TableSourceScan(..., cumulative cost ={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
CHANGELOG_MODE:generates changelog mode for every physical rel node.
e.g. GroupAggregate(..., changelogMode=[I,UA,D])
JSON_EXECUTION_PLAN: generates the execution plan in json format of the program.
```

## Syntax

```sql
EXPLAIN PLAN FOR <query_statement_or_insert_statement>
EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement>
```

For query syntax, please refer to [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}) page.

For query syntax, please refer to [Queries]({{< ref "docs/dev/table/sql/queries" >}}#supported-syntax) page.
For INSERT, please refer to [INSERT]({{< ref "docs/dev/table/sql/insert" >}}) page.
Loading

0 comments on commit 6ce650c

Please sign in to comment.