Skip to content

Commit

Permalink
[FLINK-17111][table] Support SHOW VIEWS in Flink SQL
Browse files Browse the repository at this point in the history
This closes apache#11869
  • Loading branch information
docete committed Apr 28, 2020
1 parent eddd91a commit ea51116
Show file tree
Hide file tree
Showing 17 changed files with 318 additions and 16 deletions.
16 changes: 14 additions & 2 deletions flink-python/pyflink/table/table_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,26 @@ def list_databases(self):

def list_tables(self):
    """
    Gets the names of all tables and views in the current database of the current catalog.
    It returns both temporary and permanent tables and views.

    :return: List of table and view names in the current database of the current catalog.
    :rtype: list[str]
    """
    # Bridge call into the JVM TableEnvironment; copy the Java array into a Python list.
    j_table_name_array = self._j_tenv.listTables()
    return [item for item in j_table_name_array]

def list_views(self):
    """
    Gets the names of all views in the current database of the current catalog.
    It returns both temporary and permanent views.

    :return: List of view names in the current database of the current catalog.
    :rtype: list[str]
    """
    # Materialize the Java array returned over the Py4J bridge as a Python list.
    return list(self._j_tenv.listViews())

def list_user_defined_functions(self):
"""
Gets the names of all user defined functions registered in this environment.
Expand Down
3 changes: 3 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"org.apache.flink.sql.parser.dql.SqlShowDatabases"
"org.apache.flink.sql.parser.dql.SqlShowFunctions"
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowViews"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
Expand Down Expand Up @@ -92,6 +93,7 @@
"STRING"
"TABLES"
"USE"
"VIEWS"
"WATERMARK"
]

Expand Down Expand Up @@ -443,6 +445,7 @@
"SqlShowTables()"
"SqlRichDescribeTable()"
"SqlAlterTable()"
"SqlShowViews()"
]

# List of methods for parsing custom literals.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,20 @@ SqlShowFunctions SqlShowFunctions() :
}
}

/**
 * Parse a "Show Views" metadata query command.
 *
 * Grammar: SHOW VIEWS (no operands). The parser position recorded after the
 * tokens match is attached to the resulting SqlShowViews node for error
 * reporting and unparse.
 */
SqlShowViews SqlShowViews() :
{
SqlParserPos pos;
}
{
<SHOW> <VIEWS> { pos = getPos(); }
{
return new SqlShowViews(pos);
}
}

/**
* Parse a "Show Tables" metadata query command.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.dql;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;

import java.util.Collections;
import java.util.List;

/**
 * SHOW VIEWS sql call.
 *
 * <p>The statement carries no operands; its only state is the parser position
 * inherited from {@link SqlCall}.
 */
public class SqlShowViews extends SqlCall {

	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW VIEWS", SqlKind.OTHER);

	public SqlShowViews(SqlParserPos pos) {
		super(pos);
	}

	@Override
	public SqlOperator getOperator() {
		return OPERATOR;
	}

	@Override
	public List<SqlNode> getOperandList() {
		// emptyList() is the type-safe replacement for the raw EMPTY_LIST constant,
		// which produced an unchecked-assignment warning here.
		return Collections.emptyList();
	}

	@Override
	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
		writer.keyword("SHOW VIEWS");
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ public void testDropTemporaryView() {
sql(sql).ok(expected);
}

@Test
public void testShowViews() {
// Lower-case input must parse and unparse to the canonical "SHOW VIEWS" form.
sql("show views").ok("SHOW VIEWS");
}

// Override the test because our ROW field type default is nullable,
// which is different with Calcite.
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,15 @@ default Table fromValues(DataType rowType, Object... values) {
*/
String[] listTables();

/**
 * Gets the names of all views available in the current namespace (the current database of the current catalog).
 * It returns both temporary and permanent views.
 *
 * @return A list of the names of all registered views in the current database of the current catalog.
 * @see #listTables()
 * @see #listTemporaryViews()
 */
String[] listViews();

/**
* Gets the names of all temporary tables and views available in the current namespace (the current
* database of the current catalog).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
Expand Down Expand Up @@ -142,7 +143,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
"Unsupported SQL query! executeSql() only accepts a single SQL statement of type " +
"CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, " +
"CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, " +
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW.";
"SHOW CATALOGS, SHOW DATABASES, SHOW TABLES, SHOW FUNCTIONS, CREATE VIEW, DROP VIEW, SHOW VIEWS.";

/**
* Provides necessary methods for {@link ConnectTableDescriptor}.
Expand Down Expand Up @@ -520,6 +521,14 @@ public String[] listTables() {
.toArray(String[]::new);
}

@Override
public String[] listViews() {
// The catalog manager returns an unordered Set (temporary and permanent views
// merged); sort for deterministic output.
return catalogManager.listViews()
.stream()
.sorted()
.toArray(String[]::new);
}

@Override
public String[] listTemporaryTables() {
return catalogManager.listTemporaryTables()
Expand Down Expand Up @@ -795,6 +804,8 @@ private TableResult executeOperation(Operation operation) {
return buildShowResult(listTables());
} else if (operation instanceof ShowFunctionsOperation) {
return buildShowResult(listFunctions());
} else if (operation instanceof ShowViewsOperation) {
return buildShowResult(listViews());
} else {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,7 @@ public Set<String> listTemporaryTables() {
* @return names of registered temporary views
*/
public Set<String> listTemporaryViews() {
return listTemporaryTablesInternal(getCurrentCatalog(), getCurrentDatabase())
.filter(e -> e.getValue() instanceof CatalogView)
return listTemporaryViewsInternal(getCurrentCatalog(), getCurrentDatabase())
.map(e -> e.getKey().getObjectName())
.collect(Collectors.toSet());
}
Expand All @@ -415,6 +414,43 @@ private Stream<Map.Entry<ObjectIdentifier, CatalogBaseTable>> listTemporaryTable
});
}

/**
 * Returns an array of names of all views (both temporary and permanent) registered in
 * the namespace of the current catalog and database.
 *
 * @return names of all registered views
 */
public Set<String> listViews() {
return listViews(getCurrentCatalog(), getCurrentDatabase());
}

/**
 * Returns an array of names of all views (both temporary and permanent) registered in
 * the namespace of the given catalog and database.
 *
 * @param catalogName catalog to list views in
 * @param databaseName database to list views in
 * @return names of registered views
 */
public Set<String> listViews(String catalogName, String databaseName) {
	// Bug fix: the original looked up catalogs.get(getCurrentCatalog()) and listed
	// getCurrentDatabase(), silently ignoring both parameters. Resolve the
	// requested catalog/database instead.
	Catalog catalog = catalogs.get(catalogName);

	try {
		return Stream.concat(
			catalog.listViews(databaseName).stream(),
			listTemporaryViewsInternal(catalogName, databaseName)
				.map(e -> e.getKey().getObjectName())
		).collect(Collectors.toSet());
	} catch (DatabaseNotExistException e) {
		throw new ValidationException(
			String.format("Database %s does not exist in catalog %s", databaseName, catalogName), e);
	}
}

/** Streams the temporary-table entries of the given catalog/database whose payload is a view. */
private Stream<Map.Entry<ObjectIdentifier, CatalogBaseTable>> listTemporaryViewsInternal(
String catalogName,
String databaseName) {
// A temporary view is any temporary-table entry holding a CatalogView.
return listTemporaryTablesInternal(catalogName, databaseName)
.filter(e -> e.getValue() instanceof CatalogView);
}

/**
* Lists all available schemas in the root of the catalog manager. It is not equivalent to listing all catalogs
* as it includes also different catalog parts of the temporary objects.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

/**
 * Operation to describe a SHOW VIEWS statement.
 *
 * <p>Stateless: the statement has no operands, so the operation only exposes
 * its canonical summary string.
 */
public class ShowViewsOperation implements ShowOperation {

@Override
public String asSummaryString() {
return "SHOW VIEWS";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.sql.parser.dql.SqlShowDatabases;
import org.apache.flink.sql.parser.dql.SqlShowFunctions;
import org.apache.flink.sql.parser.dql.SqlShowTables;
import org.apache.flink.sql.parser.dql.SqlShowViews;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
Expand All @@ -66,6 +67,7 @@
import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
Expand Down Expand Up @@ -191,6 +193,8 @@ public static Optional<Operation> convert(
return Optional.of(converter.convertCreateView((SqlCreateView) validated));
} else if (validated instanceof SqlDropView) {
return Optional.of(converter.convertDropView((SqlDropView) validated));
} else if (validated instanceof SqlShowViews) {
return Optional.of(converter.convertShowViews((SqlShowViews) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
Expand Down Expand Up @@ -567,6 +571,11 @@ private Operation convertDropView(SqlDropView sqlDropView) {
return new DropViewOperation(identifier, sqlDropView.getIfExists(), sqlDropView.isTemporary());
}

/** Convert SHOW VIEWS statement. The SqlShowViews node carries no operands, so none are read. */
private Operation convertShowViews(SqlShowViews sqlShowViews) {
return new ShowViewsOperation();
}

/** Fallback method for sql query. */
private Operation convertSqlQuery(SqlNode node) {
return toQueryOperation(flinkPlanner, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.sql.parser.ExtendedSqlNode
import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables}
import org.apache.flink.sql.parser.dql.{SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews}
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader

import com.google.common.collect.ImmutableList
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan._
Expand All @@ -34,7 +33,6 @@ import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{SqlRexConvertletTable, SqlToRelConverter}
import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}

import java.lang.{Boolean => JBoolean}
import java.util
import java.util.function.{Function => JFunction}
Expand Down Expand Up @@ -127,7 +125,8 @@ class FlinkPlannerImpl(
|| sqlNode.isInstanceOf[SqlShowCatalogs]
|| sqlNode.isInstanceOf[SqlShowDatabases]
|| sqlNode.isInstanceOf[SqlShowTables]
|| sqlNode.isInstanceOf[SqlShowFunctions]) {
|| sqlNode.isInstanceOf[SqlShowFunctions]
|| sqlNode.isInstanceOf[SqlShowViews]) {
return sqlNode
}
validator.validate(sqlNode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,42 @@ class TableEnvironmentTest {
tableEnv.executeSql("DROP TEMPORARY VIEW default_catalog.default_database.T2")
}

@Test
def testExecuteSqlWithShowViews(): Unit = {
// End-to-end check of SHOW VIEWS via executeSql: after creating a base table
// and a permanent view, SHOW VIEWS must list the view; after adding a
// temporary view it must list both.
val createTableStmt =
"""
|CREATE TABLE tbl1 (
| a bigint,
| b int,
| c varchar
|) with (
| 'connector' = 'COLLECTION',
| 'is-bounded' = 'false'
|)
""".stripMargin
val tableResult1 = tableEnv.executeSql(createTableStmt)
assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind)

val tableResult2 = tableEnv.executeSql("CREATE VIEW view1 AS SELECT * FROM tbl1")
assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind)

// Only the permanent view exists at this point.
val tableResult3 = tableEnv.executeSql("SHOW VIEWS")
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult3.getResultKind)
checkData(
util.Arrays.asList(Row.of("view1")).iterator(),
tableResult3.collect())

val tableResult4 = tableEnv.executeSql("CREATE TEMPORARY VIEW view2 AS SELECT * FROM tbl1")
assertEquals(ResultKind.SUCCESS, tableResult4.getResultKind)

// SHOW VIEWS also shows temporary views
val tableResult5 = tableEnv.executeSql("SHOW VIEWS")
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult5.getResultKind)
checkData(
util.Arrays.asList(Row.of("view1"), Row.of("view2")).iterator(),
tableResult5.collect())
}

private def checkData(expected: util.Iterator[Row], actual: util.Iterator[Row]): Unit = {
while (expected.hasNext && actual.hasNext) {
assertEquals(expected.next(), actual.next())
Expand Down
Loading

0 comments on commit ea51116

Please sign in to comment.