Skip to content

Commit

Permalink
[FLINK-21985][table][sql-client] Support EXPLAIN syntax in SQL Client…
Browse files Browse the repository at this point in the history
… and Table API

This closes apache#15402
  • Loading branch information
chaozwn authored and wuchong committed Mar 30, 2021
1 parent 8bbb4e3 commit d84c216
Show file tree
Hide file tree
Showing 17 changed files with 391 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.command.ClearOperation;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
Expand Down Expand Up @@ -350,6 +352,9 @@ private void callOperation(Operation operation, ExecutionMode mode) {
} else if (operation instanceof QueryOperation) {
// SELECT
callSelect((QueryOperation) operation);
} else if (operation instanceof ExplainOperation) {
// EXPLAIN
callExplain((ExplainOperation) operation);
} else {
// fallback to default implementation
executeOperation(operation);
Expand Down Expand Up @@ -452,6 +457,21 @@ private void callInsert(CatalogSinkModifyOperation operation) {
terminal.flush();
}

public void callExplain(ExplainOperation operation) {
final String explanation;
try {
TableResult tableResult = executor.executeOperation(sessionId, operation);
// show raw content instead of tableau style
explanation =
Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
} catch (SqlExecutionException | NullPointerException e) {
printExecutionException(e);
return;
}
terminal.writer().println(explanation);
terminal.flush();
}

private void executeOperation(Operation operation) {
TableResult result = executor.executeOperation(sessionId, operation);
if (TABLE_RESULT_OK == result) {
Expand Down
116 changes: 116 additions & 0 deletions flink-table/flink-sql-client/src/test/resources/sql/table.q
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,119 @@ drop table `mod`;
show tables;
Empty set
!ok
# ==========================================================================
# test explain
# ==========================================================================
CREATE TABLE IF NOT EXISTS orders (
`user` BIGINT NOT NULl,
product VARCHAR(32),
amount INT,
ts TIMESTAMP(3),
ptime AS PROCTIME(),
PRIMARY KEY(`user`) NOT ENFORCED,
WATERMARK FOR ts AS ts - INTERVAL '1' SECONDS
) with (
'connector' = 'datagen'
);
[INFO] Execute statement succeed.
!info
CREATE TABLE IF NOT EXISTS orders2 (
`user` BIGINT NOT NULl,
product VARCHAR(32),
amount INT,
ts TIMESTAMP(3),
PRIMARY KEY(`user`) NOT ENFORCED
) with (
'connector' = 'blackhole'
);
[INFO] Execute statement succeed.
!info
# test explain plan for select
explain plan for select `user`, product from orders;
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])
== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok
# test explain plan for insert
explain plan for insert into orders2 select `user`, product, amount, ts from orders;
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok
# test explain select
explain select `user`, product from orders;
== Abstract Syntax Tree ==
LogicalProject(user=[$0], product=[$1])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])
== Optimized Physical Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
== Optimized Execution Plan ==
Calc(select=[user, product])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- Calc(select=[user, product, ts])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok
# test explain insert
explain insert into orders2 select `user`, product, amount, ts from orders;
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[-($3, 1000:INTERVAL SECOND)])
+- LogicalProject(user=[$0], product=[$1], amount=[$2], ts=[$3], ptime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, orders]])
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.orders2], fields=[user, product, amount, ts])
+- WatermarkAssigner(rowtime=[ts], watermark=[(ts - 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, orders]], fields=[user, product, amount, ts])
!ok
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowPartitions"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
Expand Down Expand Up @@ -537,6 +538,7 @@
"SqlShowPartitions()"
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
]

# List of methods for parsing custom literals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1605,3 +1605,17 @@ SqlShowModules SqlShowModules() :
return new SqlShowModules(startPos.plus(getPos()), requireFull);
}
}

/**
* Parses a explain module statement.
*/
SqlNode SqlRichExplain() :
{
SqlNode stmt;
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
stmt = SqlQueryOrDml() {
return new SqlRichExplain(getPos(),stmt);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,4 +482,54 @@ public void testShowModules() {

sql("show full modules").ok("SHOW FULL MODULES");
}

@Test
public void testExplain() {
String sql = "explain plan for select * from emps";
String expected = "EXPLAIN SELECT *\n" + "FROM `EMPS`";
this.sql(sql).ok(expected);
}

@Test
public void testExplainJsonFormat() {
// Unsupported feature. Escape the test.
}

@Test
public void testExplainWithImpl() {
// Unsupported feature. Escape the test.
}

@Test
public void testExplainWithoutImpl() {
// Unsupported feature. Escape the test.
}

@Test
public void testExplainWithType() {
// Unsupported feature. Escape the test.
}

@Test
public void testExplainAsXml() {
// Unsupported feature. Escape the test.
}

@Test
public void testExplainAsJson() {
// TODO: FLINK-20562
}

@Test
public void testExplainInsert() {
String expected = "EXPLAIN INSERT INTO `EMPS1`\n" + "(SELECT *\n" + "FROM `EMPS2`)";
this.sql("explain plan for insert into emps1 select * from emps2").ok(expected);
}

@Test
public void testExplainUpsert() {
String sql = "explain plan for upsert into emps1 values (1, 2)";
String expected = "EXPLAIN UPSERT INTO `EMPS1`\n" + "VALUES (ROW(1, 2))";
this.sql(sql).ok(expected);
}
}
2 changes: 2 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"org.apache.flink.sql.parser.dml.SqlEndStatementSet"
"org.apache.flink.sql.parser.dql.SqlDescribeCatalog"
"org.apache.flink.sql.parser.dql.SqlDescribeDatabase"
"org.apache.flink.sql.parser.dql.SqlRichExplain"
"org.apache.flink.sql.parser.dql.SqlLoadModule"
"org.apache.flink.sql.parser.dql.SqlShowCatalogs"
"org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog"
Expand Down Expand Up @@ -483,6 +484,7 @@
"SqlShowViews()"
"SqlUnloadModule()"
"SqlUseModules()"
"SqlRichExplain()"
]

# List of methods for parsing custom literals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,3 +1569,17 @@ SqlEndStatementSet SqlEndStatementSet() :
return new SqlEndStatementSet(getPos());
}
}

/**
* Parses a explain module statement.
*/
SqlNode SqlRichExplain() :
{
SqlNode stmt;
}
{
<EXPLAIN> [ <PLAN> <FOR> ]
stmt = SqlQueryOrDml() {
return new SqlRichExplain(getPos(),stmt);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/** EXPLAIN (PLAN FOR)* STATEMENT sql call. */
public class SqlRichExplain extends SqlCall {

public static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("EXPLAIN", SqlKind.EXPLAIN);

private SqlNode statement;

public SqlRichExplain(SqlParserPos pos, SqlNode statement) {
super(pos);
this.statement = statement;
}

public SqlNode getStatement() {
return statement;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return Collections.singletonList(statement);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("EXPLAIN");
statement.unparse(writer, leftPrec, rightPrec);
}

@Override
public void setOperand(int i, SqlNode operand) {
if (i == 0) {
statement = operand;
} else {
throw new UnsupportedOperationException(
"SqlExplain SqlNode only support index equals 1");
}
}
}
Loading

0 comments on commit d84c216

Please sign in to comment.