diff --git a/docs/content.zh/docs/dev/table/sql/explain.md b/docs/content.zh/docs/dev/table/sql/explain.md index b0c33cdc6c50b..05c7d59f8deda 100644 --- a/docs/content.zh/docs/dev/table/sql/explain.md +++ b/docs/content.zh/docs/dev/table/sql/explain.md @@ -69,8 +69,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // register a table named "Orders" -tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)"); -tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)"); +tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')"); +tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')"); // explain SELECT statement through TableEnvironment.explainSql() String explanation = tEnv.explainSql( @@ -87,6 +87,13 @@ TableResult tableResult = tEnv.executeSql( "SELECT `count`, word FROM MyTable2"); tableResult.print(); +TableResult tableResult2 = tEnv.executeSql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + + "UNION ALL " + + "SELECT `count`, word FROM MyTable2"); +tableResult2.print(); + ``` {{< /tab >}} {{< tab "Scala" >}} @@ -95,8 +102,8 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val tEnv = StreamTableEnvironment.create(env) // register a table named "Orders" -tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)") -tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)") +tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") +tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") // explain SELECT statement through TableEnvironment.explainSql() val explanation = tEnv.explainSql( @@ -113,14 +120,22 @@ val tableResult = tEnv.executeSql( "SELECT `count`, word FROM MyTable2") tableResult.print() +val tableResult2 = tEnv.executeSql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + + "UNION ALL " + + "SELECT `count`, word FROM MyTable2") +tableResult2.print() + ``` {{< /tab >}} {{< tab "Python" >}} ```python -table_env = StreamTableEnvironment.create(...) +settings = EnvironmentSettings.new_instance()... +table_env = StreamTableEnvironment.create(env, settings) -t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)") -t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)") +t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") +t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") # explain SELECT statement through TableEnvironment.explain_sql() explanation1 = t_env.explain_sql( @@ -137,52 +152,142 @@ table_result = t_env.execute_sql( "SELECT `count`, word FROM MyTable2") table_result.print() +table_result2 = t_env.execute_sql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + "UNION ALL " + "SELECT `count`, word FROM MyTable2") +table_result2.print() + ``` {{< /tab >}} {{< tab "SQL CLI" >}} + ```sql -Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256); +Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen'); [INFO] Table has been created. -Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256); +Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen'); [INFO] Table has been created. Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' > UNION ALL > SELECT `count`, word FROM MyTable2; - + +Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT `count`, word FROM MyTable1 +> WHERE word LIKE 'F%' +> UNION ALL +> SELECT `count`, word FROM MyTable2; ``` {{< /tab >}} {{< /tabs >}} The `EXPLAIN` result is: + +{{< tabs "explain result" >}} + +{{< tab "EXPLAIN PLAN" >}} + +```text +== Abstract Syntax Tree == +LogicalUnion(all=[true]) +:- LogicalProject(count=[$0], word=[$1]) +: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ++- LogicalProject(count=[$0], word=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +== Optimized Physical Plan == +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) + +== Optimized Execution Plan == +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +``` + +{{< /tab >}} + +{{< tab "EXPLAIN PLAN WITH DETAILS" >}} + ```text == Abstract Syntax Tree == LogicalUnion(all=[true]) - LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) - LogicalTableScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - LogicalTableScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +:- LogicalProject(count=[$0], word=[$1]) +: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ++- LogicalProject(count=[$0], word=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) == Optimized Physical Plan == -Union(all=[true], union all=[count, word]) - Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) - TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory} +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} == Optimized Execution Plan == -Union(all=[true], union all=[count, word]) - Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) - TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : 37, + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])", + "parallelism" : 1 + }, { + "id" : 38, + "type" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])", + "pact" : "Operator", + "contents" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : 37, + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : 39, + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])", + "parallelism" : 1 + } ] ``` +{{< /tab >}} + +{{< /tabs >}} + {{< top >}} +## ExplainDetails +```text +Print the plan for the statement with specified ExplainDetails. + +ESTIMATED_COST: generates cost information on physical node estimated by optimizer, +e.g. TableSourceScan(..., cumulative cost ={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}) + +CHANGELOG_MODE:generates changelog mode for every physical rel node. +e.g. GroupAggregate(..., changelogMode=[I,UA,D]) + +JSON_EXECUTION_PLAN: generates the execution plan in json format of the program. +``` + ## Syntax ```sql -EXPLAIN PLAN FOR +EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] ``` -For query syntax, please refer to [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}) page. + +For query syntax, please refer to [Queries]({{< ref "docs/dev/table/sql/queries" >}}#supported-syntax) page. For INSERT, please refer to [INSERT]({{< ref "docs/dev/table/sql/insert" >}}) page. diff --git a/docs/content/docs/dev/table/sql/explain.md b/docs/content/docs/dev/table/sql/explain.md index e49df1beaf8db..1c554bf33380f 100644 --- a/docs/content/docs/dev/table/sql/explain.md +++ b/docs/content/docs/dev/table/sql/explain.md @@ -30,7 +30,6 @@ under the License. EXPLAIN statements are used to explain the logical and optimized query plans of a query or an INSERT statement. - ## Run an EXPLAIN statement {{< tabs "explain" >}} @@ -70,8 +69,8 @@ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironm StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); // register a table named "Orders" -tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)"); -tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)"); +tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')"); +tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')"); // explain SELECT statement through TableEnvironment.explainSql() String explanation = tEnv.explainSql( @@ -88,6 +87,13 @@ TableResult tableResult = tEnv.executeSql( "SELECT `count`, word FROM MyTable2"); tableResult.print(); +TableResult tableResult2 = tEnv.executeSql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + + "UNION ALL " + + "SELECT `count`, word FROM MyTable2"); +tableResult2.print(); + ``` {{< /tab >}} {{< tab "Scala" >}} @@ -96,8 +102,8 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment() val tEnv = StreamTableEnvironment.create(env) // register a table named "Orders" -tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)") -tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)") +tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") +tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") // explain SELECT statement through TableEnvironment.explainSql() val explanation = tEnv.explainSql( @@ -114,14 +120,21 @@ val tableResult = tEnv.executeSql( "SELECT `count`, word FROM MyTable2") tableResult.print() +val tableResult2 = tEnv.executeSql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + + "UNION ALL " + + "SELECT `count`, word FROM MyTable2") +tableResult2.print() + ``` {{< /tab >}} {{< tab "Python" >}} ```python table_env = StreamTableEnvironment.create(...) -t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256) WITH (...)") -t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256) WITH (...)") +t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") +t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')") # explain SELECT statement through TableEnvironment.explain_sql() explanation1 = t_env.explain_sql( @@ -138,51 +151,140 @@ table_result = t_env.execute_sql( "SELECT `count`, word FROM MyTable2") table_result.print() +table_result2 = t_env.execute_sql( + "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " + "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " + "UNION ALL " + "SELECT `count`, word FROM MyTable2") +table_result2.print() + ``` {{< /tab >}} {{< tab "SQL CLI" >}} + ```sql -Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256); +Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen'); [INFO] Table has been created. -Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256); +Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen'); [INFO] Table has been created. Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' > UNION ALL > SELECT `count`, word FROM MyTable2; - + +Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT `count`, word FROM MyTable1 +> WHERE word LIKE 'F%' +> UNION ALL +> SELECT `count`, word FROM MyTable2; ``` {{< /tab >}} {{< /tabs >}} The `EXPLAIN` result is: + +{{< tabs "explain result" >}} + +{{< tab "EXPLAIN PLAN" >}} + +```text +== Abstract Syntax Tree == +LogicalUnion(all=[true]) +:- LogicalProject(count=[$0], word=[$1]) +: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ++- LogicalProject(count=[$0], word=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +== Optimized Physical Plan == +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) + +== Optimized Execution Plan == +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +``` + +{{< /tab >}} + +{{< tab "EXPLAIN PLAN WITH DETAILS" >}} + ```text == Abstract Syntax Tree == LogicalUnion(all=[true]) - LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) - LogicalTableScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - LogicalTableScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +:- LogicalProject(count=[$0], word=[$1]) +: +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ++- LogicalProject(count=[$0], word=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) == Optimized Physical Plan == -Union(all=[true], union all=[count, word]) - Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) - TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory} +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory} == Optimized Execution Plan == -Union(all=[true], union all=[count, word]) - Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) - TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) - TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) +Union(all=[true], union=[count, word]) +:- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word]) ++- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word]) + +== Physical Execution Plan == +{ + "nodes" : [ { + "id" : 37, + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])", + "parallelism" : 1 + }, { + "id" : 38, + "type" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])", + "pact" : "Operator", + "contents" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])", + "parallelism" : 1, + "predecessors" : [ { + "id" : 37, + "ship_strategy" : "FORWARD", + "side" : "second" + } ] + }, { + "id" : 39, + "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])", + "pact" : "Data Source", + "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])", + "parallelism" : 1 + } ] ``` +{{< /tab >}} + +{{< /tabs >}} + {{< top >}} +## ExplainDetails +```text +Print the plan for the statement with specified ExplainDetails. + +ESTIMATED_COST: generates cost information on physical node estimated by optimizer, +e.g. TableSourceScan(..., cumulative cost ={1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory}) + +CHANGELOG_MODE:generates changelog mode for every physical rel node. +e.g. GroupAggregate(..., changelogMode=[I,UA,D]) + +JSON_EXECUTION_PLAN: generates the execution plan in json format of the program. +``` + ## Syntax ```sql -EXPLAIN PLAN FOR +EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] ``` For query syntax, please refer to [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}) page.