From ede4da5b364d657096f593e5c4e99393d5137dc1 Mon Sep 17 00:00:00 2001 From: Shengkai <33114724+fsk119@users.noreply.github.com> Date: Fri, 24 Jul 2020 14:13:55 +0800 Subject: [PATCH] [FLINK-18616][table] Add SHOW CURRENT DDLs This closes #12934 --- docs/dev/table/hive/hive_dialect.md | 23 ++++--- docs/dev/table/hive/hive_dialect.zh.md | 22 ++++--- docs/dev/table/sql/show.md | 40 ++++++++++++- docs/dev/table/sql/show.zh.md | 40 ++++++++++++- .../connectors/hive/HiveDialectITCase.java | 4 ++ .../flink/table/client/cli/CliClient.java | 30 ++++++++++ .../table/client/cli/SqlCommandParser.java | 12 ++++ .../flink/table/client/cli/CliClientTest.java | 42 +++++++++---- .../client/cli/SqlCommandParserTest.java | 6 ++ .../client/cli/TestingExecutorBuilder.java | 2 + .../gateway/local/LocalExecutorITCase.java | 6 ++ .../src/main/codegen/data/Parser.tdd | 3 + .../src/main/codegen/includes/parserImpls.ftl | 17 ++++++ .../hive/FlinkHiveSqlParserImplTest.java | 10 ++++ .../src/main/codegen/data/Parser.tdd | 3 + .../src/main/codegen/includes/parserImpls.ftl | 15 +++++ .../sql/parser/dql/SqlShowCurrentCatalog.java | 60 +++++++++++++++++++ .../parser/dql/SqlShowCurrentDatabase.java | 60 +++++++++++++++++++ .../sql/parser/FlinkSqlParserImplTest.java | 10 ++++ .../api/internal/TableEnvironmentImpl.java | 6 ++ .../ShowCurrentCatalogOperation.java | 29 +++++++++ .../ShowCurrentDatabaseOperation.java | 29 +++++++++ .../operations/SqlToOperationConverter.java | 18 ++++++ .../planner/calcite/FlinkPlannerImpl.scala | 4 +- .../planner/catalog/CatalogTableITCase.scala | 9 ++- .../sqlexec/SqlToOperationConverter.java | 18 ++++++ .../table/api/internal/TableEnvImpl.scala | 4 ++ .../table/calcite/FlinkPlannerImpl.scala | 4 +- .../api/batch/BatchTableEnvironmentTest.scala | 9 ++- .../table/catalog/CatalogTableITCase.scala | 8 ++- 30 files changed, 505 insertions(+), 38 deletions(-) create mode 100644 
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentCatalog.java create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentDatabase.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java diff --git a/docs/dev/table/hive/hive_dialect.md b/docs/dev/table/hive/hive_dialect.md index c5c039ca226cb..b6d58e4d80501 100644 --- a/docs/dev/table/hive/hive_dialect.md +++ b/docs/dev/table/hive/hive_dialect.md @@ -51,7 +51,7 @@ execution: configuration: table.sql-dialect: hive - + {% endhighlight %} You can also set the dialect after the SQL Client has launched. @@ -87,12 +87,21 @@ This section lists the supported DDLs with the Hive dialect. We'll mainly focus here. You can refer to [Hive doc](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL) for the semantics of each DDL statement. 
+### CATALOG + +#### Show + +{% highlight sql %} +SHOW CURRENT CATALOG; +{% endhighlight %} + ### DATABASE #### Show {% highlight sql %} SHOW DATABASES; +SHOW CURRENT DATABASE; {% endhighlight %} #### Create @@ -157,13 +166,13 @@ CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name ] [LOCATION fs_path] [TBLPROPERTIES (property_name=property_value, ...)] - + row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] - + file_format: : SEQUENCEFILE | TEXTFILE @@ -172,10 +181,10 @@ file_format: | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname - + column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] - + table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] {% endhighlight %} @@ -216,9 +225,9 @@ present, the operation will be applied to the corresponding partition instead of {% highlight sql %} ALTER TABLE table_name [PARTITION partition_spec] SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties]; - + ALTER TABLE table_name [PARTITION partition_spec] SET SERDEPROPERTIES serde_properties; - + serde_properties: : (property_name = property_value, property_name = property_value, ... ) {% endhighlight %} diff --git a/docs/dev/table/hive/hive_dialect.zh.md b/docs/dev/table/hive/hive_dialect.zh.md index 1236ad8324fef..5ea0d4151168a 100644 --- a/docs/dev/table/hive/hive_dialect.zh.md +++ b/docs/dev/table/hive/hive_dialect.zh.md @@ -51,7 +51,7 @@ execution: configuration: table.sql-dialect: hive - + {% endhighlight %} You can also set the dialect after the SQL Client has launched. @@ -87,6 +87,14 @@ This section lists the supported DDLs with the Hive dialect. We'll mainly focus here. 
You can refer to [Hive doc](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL) for the semantics of each DDL statement. +### CATALOG + +#### Show + +{% highlight sql %} +SHOW CURRENT CATALOG; +{% endhighlight %} + ### DATABASE #### Show @@ -157,13 +165,13 @@ CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name ] [LOCATION fs_path] [TBLPROPERTIES (property_name=property_value, ...)] - + row_format: : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char] [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char] [NULL DEFINED AS char] | SERDE serde_name [WITH SERDEPROPERTIES (property_name=property_value, ...)] - + file_format: : SEQUENCEFILE | TEXTFILE @@ -172,10 +180,10 @@ file_format: | PARQUET | AVRO | INPUTFORMAT input_format_classname OUTPUTFORMAT output_format_classname - + column_constraint: : NOT NULL [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] - + table_constraint: : [CONSTRAINT constraint_name] PRIMARY KEY (col_name, ...) [[ENABLE|DISABLE] [VALIDATE|NOVALIDATE] [RELY|NORELY]] {% endhighlight %} @@ -216,9 +224,9 @@ present, the operation will be applied to the corresponding partition instead of {% highlight sql %} ALTER TABLE table_name [PARTITION partition_spec] SET SERDE serde_class_name [WITH SERDEPROPERTIES serde_properties]; - + ALTER TABLE table_name [PARTITION partition_spec] SET SERDEPROPERTIES serde_properties; - + serde_properties: : (property_name = property_value, property_name = property_value, ... ) {% endhighlight %} diff --git a/docs/dev/table/sql/show.md b/docs/dev/table/sql/show.md index 4c9e2419e907c..3ce960f3aa667 100644 --- a/docs/dev/table/sql/show.md +++ b/docs/dev/table/sql/show.md @@ -25,12 +25,14 @@ under the License. 
* This will be replaced by the TOC {:toc} -SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or list all functions including temp system functions, system functions, temp catalog functions and catalog functions in the current catalog and the current database. +SHOW statements are used to list all catalogs, or list all databases in the current catalog, or list all tables/views in the current catalog and the current database, or show current catalog and database, or list all functions including temp system functions, system functions, temp catalog functions and catalog functions in the current catalog and the current database. Flink SQL supports the following SHOW statements for now: - SHOW CATALOGS +- SHOW CURRENT CATALOG - SHOW DATABASES -- SHOW TABLES +- SHOW CURRENT DATABASE +- SHOW TABLES - SHOW VIEWS - SHOW FUNCTIONS @@ -55,6 +57,14 @@ tEnv.executeSql("SHOW CATALOGS").print(); // | default_catalog | // +-----------------+ +// show current catalog +tEnv.executeSql("SHOW CURRENT CATALOG").print(); +// +----------------------+ +// | current catalog name | +// +----------------------+ +// | default_catalog | +// +----------------------+ + // show databases tEnv.executeSql("SHOW DATABASES").print(); // +------------------+ @@ -63,6 +73,14 @@ tEnv.executeSql("SHOW DATABASES").print(); // | default_database | // +------------------+ +// show current database +tEnv.executeSql("SHOW CURRENT DATABASE").print(); +// +-----------------------+ +// | current database name | +// +-----------------------+ +// | default_database | +// +-----------------------+ + // create a table tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)"); // show tables @@ -244,6 +262,14 @@ SHOW CATALOGS Show all catalogs. +## SHOW CURRENT CATALOG + +{% highlight sql %} +SHOW CURRENT CATALOG +{% endhighlight %} + +Show current catalog. 
+ ## SHOW DATABASES {% highlight sql %} @@ -252,6 +278,14 @@ SHOW DATABASES Show all databases in the current catalog. +## SHOW CURRENT DATABASE + +{% highlight sql %} +SHOW CURRENT DATABASE +{% endhighlight %} + +Show current database. + ## SHOW TABLES {% highlight sql %} @@ -274,4 +308,4 @@ Show all views in the current catalog and the current database. SHOW FUNCTIONS {% endhighlight %} -Show all functions including temp system functions, system functions, temp catalog functions and catalog functions in the current catalog and current database. \ No newline at end of file +Show all functions including temp system functions, system functions, temp catalog functions and catalog functions in the current catalog and current database. diff --git a/docs/dev/table/sql/show.zh.md b/docs/dev/table/sql/show.zh.md index 9f6c648330fd5..1da5e649ed896 100644 --- a/docs/dev/table/sql/show.zh.md +++ b/docs/dev/table/sql/show.zh.md @@ -25,12 +25,14 @@ under the License. * This will be replaced by the TOC {:toc} -SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出所有的 function,包括:临时系统 function,系统 function,临时 catalog function,当前 catalog 和 database 中的 catalog function。 +SHOW 语句用于列出所有的 catalog,或者列出当前 catalog 中所有的 database,或者列出当前 catalog 和当前 database 的所有表或视图,或者列出当前正在使用的 catalog 和 database, 或者列出所有的 function,包括:临时系统 function,系统 function,临时 catalog function,当前 catalog 和 database 中的 catalog function。 目前 Flink SQL 支持下列 SHOW 语句: - SHOW CATALOGS +- SHOW CURRENT CATALOG - SHOW DATABASES -- SHOW TABLES +- SHOW CURRENT DATABASE +- SHOW TABLES - SHOW VIEWS - SHOW FUNCTIONS @@ -55,6 +57,14 @@ tEnv.executeSql("SHOW CATALOGS").print(); // | default_catalog | // +-----------------+ +// show current catalog +tEnv.executeSql("SHOW CURRENT CATALOG").print(); +// +----------------------+ +// | current catalog name | +// +----------------------+ +// | default_catalog | +// +----------------------+ + // show databases tEnv.executeSql("SHOW DATABASES").print(); // 
+------------------+ @@ -63,6 +73,14 @@ tEnv.executeSql("SHOW DATABASES").print(); // | default_database | // +------------------+ +// show current database +tEnv.executeSql("SHOW CURRENT DATABASE").print(); +// +-----------------------+ +// | current database name | +// +-----------------------+ +// | default_database | +// +-----------------------+ + // create a table tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)"); // show tables @@ -244,6 +262,14 @@ SHOW CATALOGS 展示所有的 catalog。 +## SHOW CURRENT CATALOG + +{% highlight sql %} +SHOW CURRENT CATALOG +{% endhighlight %} + +显示当前正在使用的 catalog。 + ## SHOW DATABASES {% highlight sql %} @@ -252,6 +278,14 @@ SHOW DATABASES 展示当前 catalog 中所有的 database。 +## SHOW CURRENT DATABASE + +{% highlight sql %} +SHOW CURRENT DATABASE +{% endhighlight %} + +显示当前正在使用的 database。 + ## SHOW TABLES {% highlight sql %} @@ -274,4 +308,4 @@ SHOW VIEWS SHOW FUNCTIONS {% endhighlight %} -展示所有的 function,包括:临时系统 function, 系统 function, 临时 catalog function,当前 catalog 和 database 中的 catalog function。 \ No newline at end of file +展示所有的 function,包括:临时系统 function, 系统 function, 临时 catalog function,当前 catalog 和 database 中的 catalog function。 diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index ded35fb053396..733e0164a73a1 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -436,6 +436,10 @@ public void testCatalog() { List databases = Lists.newArrayList(tableEnv.executeSql("show databases").collect()); assertEquals(1, databases.size()); assertEquals(DEFAULT_BUILTIN_DATABASE, databases.get(0).toString()); + String catalogName = tableEnv.executeSql("show current 
catalog").collect().next().toString(); + assertEquals(DEFAULT_BUILTIN_CATALOG, catalogName); + String databaseName = tableEnv.executeSql("show current database").collect().next().toString(); + assertEquals(DEFAULT_BUILTIN_DATABASE, databaseName); } @Test diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index c03c5979d8435..a7fd110d6517c 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -286,9 +286,15 @@ private void callCommand(SqlCommandCall cmdCall) { case SHOW_CATALOGS: callShowCatalogs(); break; + case SHOW_CURRENT_CATALOG: + callShowCurrentCatalog(); + break; case SHOW_DATABASES: callShowDatabases(); break; + case SHOW_CURRENT_DATABASE: + callShowCurrentDatabase(); + break; case SHOW_TABLES: callShowTables(); break; @@ -445,6 +451,18 @@ private void callShowCatalogs() { terminal.flush(); } + private void callShowCurrentCatalog() { + String currentCatalog; + try { + currentCatalog = executor.executeSql(sessionId, "SHOW CURRENT CATALOG").collect().next().toString(); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } + terminal.writer().println(currentCatalog); + terminal.flush(); + } + private void callShowDatabases() { final List dbs; try { @@ -461,6 +479,18 @@ private void callShowDatabases() { terminal.flush(); } + private void callShowCurrentDatabase() { + String currentDatabase; + try { + currentDatabase = executor.executeSql(sessionId, "SHOW CURRENT DATABASE").collect().next().toString(); + } catch (SqlExecutionException e) { + printExecutionException(e); + return; + } + terminal.writer().println(currentDatabase); + terminal.flush(); + } + private void callShowTables() { final List tables; try { diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java index 63ee8448c4dc7..8c5671ece4a67 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java @@ -26,6 +26,8 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCurrentCatalogOperation; +import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowTablesOperation; @@ -140,9 +142,15 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) { } else if (operation instanceof ShowCatalogsOperation) { cmd = SqlCommand.SHOW_CATALOGS; operands = new String[0]; + } else if (operation instanceof ShowCurrentCatalogOperation) { + cmd = SqlCommand.SHOW_CURRENT_CATALOG; + operands = new String[0]; } else if (operation instanceof ShowDatabasesOperation) { cmd = SqlCommand.SHOW_DATABASES; operands = new String[0]; + } else if (operation instanceof ShowCurrentDatabaseOperation) { + cmd = SqlCommand.SHOW_CURRENT_DATABASE; + operands = new String[0]; } else if (operation instanceof ShowTablesOperation) { cmd = SqlCommand.SHOW_TABLES; operands = new String[0]; @@ -225,8 +233,12 @@ enum SqlCommand { SHOW_CATALOGS, + SHOW_CURRENT_CATALOG, + SHOW_DATABASES, + SHOW_CURRENT_DATABASE, + SHOW_TABLES, SHOW_FUNCTIONS, diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java index ed068253236ed..5dd55707d683a 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java @@ -20,7 +20,10 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ResultKind; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.cli.utils.SqlParserHelper; import org.apache.flink.table.client.cli.utils.TerminalUtils; import org.apache.flink.table.client.cli.utils.TerminalUtils.MockOutputStream; @@ -33,6 +36,7 @@ import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.TestLogger; import org.jline.reader.Candidate; @@ -73,6 +77,7 @@ public class CliClientTest extends TestLogger { private static final String INSERT_INTO_STATEMENT = "INSERT INTO MyTable SELECT * FROM MyOtherTable"; private static final String INSERT_OVERWRITE_STATEMENT = "INSERT OVERWRITE MyTable SELECT * FROM MyOtherTable"; private static final String SELECT_STATEMENT = "SELECT * FROM MyOtherTable"; + private static final Row SHOW_ROW = new Row(1); @Test public void testUpdateSubmission() throws Exception { @@ -178,36 +183,53 @@ public void testCreateTableWithInvalidDdl() throws Exception { } @Test - public void testUseCatalog() throws Exception { + public void testUseCatalogAndShowCurrentCatalog() throws Exception { TestingExecutor executor = new TestingExecutorBuilder() .setExecuteSqlConsumer((ignored1, sql) -> { - if (!sql.toLowerCase().equals("use catalog cat")) { - throw new 
SqlExecutionException("unexpected catalog name: cat"); - } else { + if (sql.toLowerCase().equals("use catalog cat")) { return TestTableResult.TABLE_RESULT_OK; - } - }) + } else if (sql.toLowerCase().equals("show current catalog")){ + SHOW_ROW.setField(0, "cat"); + return new TestTableResult(ResultKind.SUCCESS_WITH_CONTENT, + TableSchema.builder().field("current catalog name", DataTypes.STRING()).build(), + CloseableIterator.ofElement(SHOW_ROW, ele -> {})); + } else { + throw new SqlExecutionException("unexpected sql statement: " + sql); + }}) .build(); String output = testExecuteSql(executor, "use catalog cat;"); assertThat(executor.getNumExecuteSqlCalls(), is(1)); assertFalse(output.contains("unexpected catalog name")); + + output = testExecuteSql(executor, "show current catalog;"); + assertThat(executor.getNumExecuteSqlCalls(), is(2)); + assertTrue(output.contains("cat")); } @Test - public void testUseDatabase() throws Exception { + public void testUseDatabaseAndShowCurrentDatabase() throws Exception { TestingExecutor executor = new TestingExecutorBuilder() .setExecuteSqlConsumer((ignored1, sql) -> { - if (!sql.toLowerCase().equals("use db")) { - throw new SqlExecutionException("unexpected database name: db"); - } else { + if (sql.toLowerCase().equals("use db")) { return TestTableResult.TABLE_RESULT_OK; + } else if (sql.toLowerCase().equals("show current database")) { + SHOW_ROW.setField(0, "db"); + return new TestTableResult(ResultKind.SUCCESS_WITH_CONTENT, + TableSchema.builder().field("current database name", DataTypes.STRING()).build(), + CloseableIterator.ofElement(SHOW_ROW, ele -> {})); + } else { + throw new SqlExecutionException("unexpected database name: db"); } }) .build(); String output = testExecuteSql(executor, "use db;"); assertThat(executor.getNumExecuteSqlCalls(), is(1)); assertFalse(output.contains("unexpected database name")); + + output = testExecuteSql(executor, "show current database;"); + assertThat(executor.getNumExecuteSqlCalls(), is(2)); 
+ assertTrue(output.contains("db")); } @Test diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java index 879d6914f3358..ba51dcc8ea353 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java @@ -213,9 +213,15 @@ public void testCommands() throws Exception { // show catalogs TestItem.validSql("SHOW CATALOGS;", SqlCommand.SHOW_CATALOGS), TestItem.validSql(" SHOW CATALOGS ;", SqlCommand.SHOW_CATALOGS), + // show current catalog + TestItem.validSql("show current catalog", SqlCommand.SHOW_CURRENT_CATALOG), + TestItem.validSql("show current catalog", SqlCommand.SHOW_CURRENT_CATALOG), // show databases TestItem.validSql("SHOW DATABASES;", SqlCommand.SHOW_DATABASES), TestItem.validSql(" SHOW DATABASES ;", SqlCommand.SHOW_DATABASES), + // show current database + TestItem.validSql("show current database", SqlCommand.SHOW_CURRENT_DATABASE), + TestItem.validSql("show current database", SqlCommand.SHOW_CURRENT_DATABASE), // show tables TestItem.validSql("SHOW TABLES;", SqlCommand.SHOW_TABLES), TestItem.validSql(" SHOW TABLES ;", SqlCommand.SHOW_TABLES), diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java index 9ba2b1a7ce3d9..7339fa2386588 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutorBuilder.java @@ -36,6 +36,8 @@ */ class TestingExecutorBuilder { + private String defaultCurrentCatalogName = "default_catalog"; + 
private String defaultCurrentDatabaseName = "default_database"; private List>>, SqlExecutionException>> resultChangesSupplier = Collections.emptyList(); private List, SqlExecutionException>> snapshotResultsSupplier = Collections.emptyList(); private List, SqlExecutionException>> resultPagesSupplier = Collections.emptyList(); diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index de0dbe7fa15ac..6895c2df6e3fc 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -898,6 +898,8 @@ public void testUseCatalogAndUseDatabase() throws Exception { executor.executeSql(sessionId, "use catalog hivecatalog"); + assertShowResult(executor.executeSql(sessionId, "SHOW CURRENT CATALOG"), Collections.singletonList("hivecatalog")); + assertShowResult( executor.executeSql(sessionId, "SHOW DATABASES"), Arrays.asList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE, HiveCatalog.DEFAULT_DB) @@ -914,6 +916,10 @@ public void testUseCatalogAndUseDatabase() throws Exception { executor.executeSql(sessionId, "SHOW TABLES"), Collections.singletonList(DependencyTest.TestHiveCatalogFactory.TEST_TABLE) ); + + assertShowResult(executor.executeSql(sessionId, "SHOW CURRENT DATABASE"), + Collections.singletonList(DependencyTest.TestHiveCatalogFactory.ADDITIONAL_TEST_DATABASE)); + } finally { executor.closeSession(sessionId); } diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd index e000d34dd61fc..16bba9a8baff6 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd +++ 
b/flink-table/flink-sql-parser-hive/src/main/codegen/data/Parser.tdd @@ -79,7 +79,9 @@ "org.apache.flink.sql.parser.dql.SqlDescribeCatalog" "org.apache.flink.sql.parser.dql.SqlDescribeDatabase" "org.apache.flink.sql.parser.dql.SqlShowCatalogs" + "org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog" "org.apache.flink.sql.parser.dql.SqlShowDatabases" + "org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase" "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" @@ -507,6 +509,7 @@ statementParserMethods: [ "RichSqlInsert()" "SqlShowCatalogs()" + "SqlShowCurrentCatalogOrDatabase()" "SqlDescribeCatalog()" "SqlUseCatalog()" "SqlShowDatabases()" diff --git a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl index b2f2b503d16d7..8a3af91c16117 100644 --- a/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser-hive/src/main/codegen/includes/parserImpls.ftl @@ -1048,6 +1048,23 @@ SqlShowCatalogs SqlShowCatalogs() : } } +SqlCall SqlShowCurrentCatalogOrDatabase() : +{ +} +{ + ( + + { + return new SqlShowCurrentCatalog(getPos()); + } + | + + { + return new SqlShowCurrentDatabase(getPos()); + } + ) +} + SqlDescribeCatalog SqlDescribeCatalog() : { SqlIdentifier catalogName; diff --git a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java index c7cf54a0f7e5d..71bfb173a4654 100644 --- a/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java +++ b/flink-table/flink-sql-parser-hive/src/test/java/org/apache/flink/sql/parser/hive/FlinkHiveSqlParserImplTest.java @@ -52,6 +52,11 @@ public void 
testShowDatabases() { sql("show databases").ok("SHOW DATABASES"); } + @Test + public void testShowCurrentDatabase() { + sql("show current database").ok("SHOW CURRENT DATABASE"); + } + @Test public void testUseDatabase() { // use database @@ -259,6 +264,11 @@ public void testShowCatalogs() { sql("show catalogs").ok("SHOW CATALOGS"); } + @Test + public void testShowCurrentCatalog() { + sql("show current catalog").ok("SHOW CURRENT CATALOG"); + } + @Test public void testUseCatalog() { sql("use catalog cat").ok("USE CATALOG `CAT`"); diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 0ed629eb08f0a..18a80788813d6 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -58,7 +58,9 @@ "org.apache.flink.sql.parser.dql.SqlDescribeCatalog" "org.apache.flink.sql.parser.dql.SqlDescribeDatabase" "org.apache.flink.sql.parser.dql.SqlShowCatalogs" + "org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog" "org.apache.flink.sql.parser.dql.SqlShowDatabases" + "org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase" "org.apache.flink.sql.parser.dql.SqlShowFunctions" "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlShowViews" @@ -444,6 +446,7 @@ statementParserMethods: [ "RichSqlInsert()" "SqlShowCatalogs()" + "SqlShowCurrentCatalogOrDatabase()" "SqlDescribeCatalog()" "SqlUseCatalog()" "SqlShowDatabases()" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 249ba136b10bd..105bdc82252e7 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -58,6 +58,21 @@ SqlShowCatalogs SqlShowCatalogs() : } } +SqlCall SqlShowCurrentCatalogOrDatabase() : +{ +} +{ + ( + { + 
return new SqlShowCurrentCatalog(getPos()); + } + | + { + return new SqlShowCurrentDatabase(getPos()); + } + ) +} + SqlDescribeCatalog SqlDescribeCatalog() : { SqlIdentifier catalogName; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentCatalog.java new file mode 100644 index 0000000000000..6ef7afd73febb --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentCatalog.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** + * SHOW CURRENT CATALOG sql call. 
+ */ +public class SqlShowCurrentCatalog extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW CURRENT CATALOG", SqlKind.OTHER); + + public SqlShowCurrentCatalog(SqlParserPos pos) { + super(pos); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.emptyList(); + } + + @Override + public void unparse( + SqlWriter writer, + int leftPrec, + int rightPrec) { + writer.keyword("SHOW CURRENT CATALOG"); + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentDatabase.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentDatabase.java new file mode 100644 index 0000000000000..942c4f51ae6ce --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCurrentDatabase.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** +* Show current database. + */ +public class SqlShowCurrentDatabase extends SqlCall { + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW CURRENT DATABASE", SqlKind.OTHER); + + public SqlShowCurrentDatabase(SqlParserPos pos) { + super(pos); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.EMPTY_LIST; + } + + @Override + public void unparse( + SqlWriter writer, + int leftPrec, + int rightPrec) { + writer.keyword("SHOW CURRENT DATABASE"); + } +} diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 7b01d4362f31c..0c05dba0d3c9e 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -47,6 +47,11 @@ public void testShowCatalogs() { sql("show catalogs").ok("SHOW CATALOGS"); } + @Test + public void testShowCurrentCatalog() { + sql("show current catalog").ok("SHOW CURRENT CATALOG"); + } + @Test public void testDescribeCatalog() { sql("describe catalog a").ok("DESCRIBE CATALOG `A`"); @@ -88,6 +93,11 @@ public void testShowDataBases() { sql("show databases").ok("SHOW DATABASES"); } + @Test + public void testShowCurrentDatabase() { + sql("show current database").ok("SHOW CURRENT DATABASE"); + } + @Test public 
void testUseDataBase() { sql("use default_db").ok("USE `DEFAULT_DB`"); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index e853d28d7657b..0724e9f832ab3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -88,6 +88,8 @@ import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.SelectSinkOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCurrentCatalogOperation; +import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowTablesOperation; @@ -1007,8 +1009,12 @@ private TableResult executeOperation(Operation operation) { return TableResultImpl.TABLE_RESULT_OK; } else if (operation instanceof ShowCatalogsOperation) { return buildShowResult("catalog name", listCatalogs()); + } else if (operation instanceof ShowCurrentCatalogOperation){ + return buildShowResult("current catalog name", new String[]{catalogManager.getCurrentCatalog()}); } else if (operation instanceof ShowDatabasesOperation) { return buildShowResult("database name", listDatabases()); + } else if (operation instanceof ShowCurrentDatabaseOperation) { + return buildShowResult("current database name", new String[]{catalogManager.getCurrentDatabase()}); } else if (operation instanceof ShowTablesOperation) { return buildShowResult("table name", listTables()); } else if (operation instanceof ShowFunctionsOperation) { diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java new file mode 100644 index 0000000000000..4e4b70ab29dc6 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentCatalogOperation.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +/** + * Operation to describe a SHOW CURRENT CATALOG statement. 
+ */ +public class ShowCurrentCatalogOperation implements ShowOperation { + @Override + public String asSummaryString() { + return "SHOW CURRENT CATALOG"; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java new file mode 100644 index 0000000000000..e733f5d98c045 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCurrentDatabaseOperation.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +/** + * Operation to describe a SHOW CURRENT DATABASE statement.
+ */ +public class ShowCurrentDatabaseOperation implements ShowOperation{ + @Override + public String asSummaryString() { + return "SHOW CURRENT DATABASE"; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index fa045392076fa..2538ba5a610b4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -50,6 +50,8 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; +import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; +import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase; import org.apache.flink.sql.parser.dql.SqlShowDatabases; import org.apache.flink.sql.parser.dql.SqlShowFunctions; import org.apache.flink.sql.parser.dql.SqlShowTables; @@ -79,6 +81,8 @@ import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCurrentCatalogOperation; +import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowTablesOperation; @@ -216,8 +220,12 @@ public static Optional convert( return Optional.of(converter.convertDropCatalog((SqlDropCatalog) validated)); } else if (validated instanceof SqlShowCatalogs) { return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated)); + 
} else if (validated instanceof SqlShowCurrentCatalog){ + return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated)); } else if (validated instanceof SqlShowDatabases) { return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated)); + } else if (validated instanceof SqlShowCurrentDatabase) { + return Optional.of(converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated)); } else if (validated instanceof SqlShowTables) { return Optional.of(converter.convertShowTables((SqlShowTables) validated)); } else if (validated instanceof SqlShowFunctions) { @@ -638,11 +646,21 @@ private Operation convertShowCatalogs(SqlShowCatalogs sqlShowCatalogs) { return new ShowCatalogsOperation(); } + /** Convert SHOW CURRENT CATALOG statement. */ + private Operation convertShowCurrentCatalog(SqlShowCurrentCatalog sqlShowCurrentCatalog) { + return new ShowCurrentCatalogOperation(); + } + /** Convert SHOW DATABASES statement. */ private Operation convertShowDatabases(SqlShowDatabases sqlShowDatabases) { return new ShowDatabasesOperation(); } + /** Convert SHOW CURRENT DATABASE statement. */ + private Operation convertShowCurrentDatabase(SqlShowCurrentDatabase sqlShowCurrentDatabase) { + return new ShowCurrentDatabaseOperation(); + } + /** Convert SHOW TABLES statement. 
*/ private Operation convertShowTables(SqlShowTables sqlShowTables) { return new ShowTablesOperation(); diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala index 395dce337711d..980a50a4fe2a9 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.calcite import org.apache.flink.sql.parser.ExtendedSqlNode -import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} +import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader @@ -125,7 +125,9 @@ class FlinkPlannerImpl( || sqlNode.getKind == SqlKind.DROP_FUNCTION || sqlNode.getKind == SqlKind.OTHER_DDL || sqlNode.isInstanceOf[SqlShowCatalogs] + || sqlNode.isInstanceOf[SqlShowCurrentCatalog] || sqlNode.isInstanceOf[SqlShowDatabases] + || sqlNode.isInstanceOf[SqlShowCurrentDatabase] || sqlNode.isInstanceOf[SqlShowTables] || sqlNode.isInstanceOf[SqlShowFunctions] || sqlNode.isInstanceOf[SqlShowViews] diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index ae721a9ac060e..e1968edf5573d 100644 --- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -990,17 +990,18 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { } @Test - def testUseCatalog(): Unit = { + def testUseCatalogAndShowCurrentCatalog(): Unit = { tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) tableEnv.executeSql("use catalog cat1") assertEquals("cat1", tableEnv.getCurrentCatalog) tableEnv.executeSql("use catalog cat2") assertEquals("cat2", tableEnv.getCurrentCatalog) + assertEquals("cat2", tableEnv.executeSql("show current catalog").collect().next().toString) } @Test - def testUseDatabase(): Unit = { + def testUseDatabaseAndShowCurrentDatabase(): Unit = { val catalog = new GenericInMemoryCatalog("cat1") tableEnv.registerCatalog("cat1", catalog) val catalogDB1 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1") @@ -1009,8 +1010,12 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase { catalog.createDatabase("db2", catalogDB2, true) tableEnv.executeSql("use cat1.db1") assertEquals("db1", tableEnv.getCurrentDatabase) + var currentDatabase = tableEnv.executeSql("show current database").collect().next().toString + assertEquals("db1", currentDatabase) tableEnv.executeSql("use db2") assertEquals("db2", tableEnv.getCurrentDatabase) + currentDatabase = tableEnv.executeSql("show current database").collect().next().toString + assertEquals("db2", currentDatabase) } @Test diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index a0239c495a0e5..9d3ab4dfcbd99 100644 --- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -38,6 +38,8 @@ import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlShowCatalogs; +import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; +import org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase; import org.apache.flink.sql.parser.dql.SqlShowDatabases; import org.apache.flink.sql.parser.dql.SqlShowFunctions; import org.apache.flink.sql.parser.dql.SqlShowTables; @@ -67,6 +69,8 @@ import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; +import org.apache.flink.table.operations.ShowCurrentCatalogOperation; +import org.apache.flink.table.operations.ShowCurrentDatabaseOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowTablesOperation; @@ -181,8 +185,12 @@ public static Optional convert( return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated)); } else if (validated instanceof SqlShowCatalogs) { return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated)); + } else if (validated instanceof SqlShowCurrentCatalog) { + return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated)); } else if (validated instanceof SqlShowDatabases) { return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated)); + } else if (validated instanceof SqlShowCurrentDatabase) { + return Optional.of(converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated)); } else if (validated instanceof SqlShowTables) { 
return Optional.of(converter.convertShowTables((SqlShowTables) validated)); } else if (validated instanceof SqlShowFunctions) { @@ -491,11 +499,21 @@ private Operation convertShowCatalogs(SqlShowCatalogs sqlShowCatalogs) { return new ShowCatalogsOperation(); } + /** Convert SHOW CURRENT CATALOG statement. */ + private Operation convertShowCurrentCatalog(SqlShowCurrentCatalog sqlShowCurrentCatalog) { + return new ShowCurrentCatalogOperation(); + } + /** Convert SHOW DATABASES statement. */ private Operation convertShowDatabases(SqlShowDatabases sqlShowDatabases) { return new ShowDatabasesOperation(); } + /** Convert SHOW CURRENT DATABASE statement. */ + private Operation convertShowCurrentDatabase(SqlShowCurrentDatabase sqlShowCurrentDatabase) { + return new ShowCurrentDatabaseOperation(); + } + /** Convert SHOW TABLES statement. */ private Operation convertShowTables(SqlShowTables sqlShowTables) { return new ShowTablesOperation(); diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index d12bf55ab489f..226b6202ec03d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -743,8 +743,12 @@ abstract class TableEnvImpl( TableResultImpl.TABLE_RESULT_OK case _: ShowCatalogsOperation => buildShowResult("catalog name", listCatalogs()) + case _: ShowCurrentCatalogOperation => + buildShowResult("current catalog name", Array(catalogManager.getCurrentCatalog)) case _: ShowDatabasesOperation => buildShowResult("database name", listDatabases()) + case _: ShowCurrentDatabaseOperation => + buildShowResult("current database name", Array(catalogManager.getCurrentDatabase)) case _: ShowTablesOperation => buildShowResult("table name", listTables()) case 
_: ShowFunctionsOperation => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index 4ac4d0905135d..62b79781beb79 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.calcite import org.apache.flink.sql.parser.ExtendedSqlNode -import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} +import org.apache.flink.sql.parser.dql.{SqlRichDescribeTable, SqlShowCatalogs, SqlShowCurrentCatalog, SqlShowCurrentDatabase, SqlShowDatabases, SqlShowFunctions, SqlShowTables, SqlShowViews} import org.apache.flink.table.api.{TableException, ValidationException} import org.apache.flink.table.catalog.CatalogReader @@ -123,7 +123,9 @@ class FlinkPlannerImpl( || sqlNode.getKind == SqlKind.DROP_FUNCTION || sqlNode.getKind == SqlKind.OTHER_DDL || sqlNode.isInstanceOf[SqlShowCatalogs] + || sqlNode.isInstanceOf[SqlShowCurrentCatalog] || sqlNode.isInstanceOf[SqlShowDatabases] + || sqlNode.isInstanceOf[SqlShowCurrentDatabase] || sqlNode.isInstanceOf[SqlShowTables] || sqlNode.isInstanceOf[SqlShowFunctions] || sqlNode.isInstanceOf[SqlShowViews] diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala index 2c31b9ac73a79..9afe02eb46e8a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/BatchTableEnvironmentTest.scala @@ -187,17 +187,19 @@ class BatchTableEnvironmentTest extends TableTestBase { } @Test - def testExecuteSqlWithUseCatalog(): Unit = { + def testExecuteSqlWithUseCatalogAndShowCurrentCatalog(): Unit = { val util = batchTestUtil() util.tableEnv.registerCatalog("my_catalog", new GenericInMemoryCatalog("my_catalog")) assertEquals("default_catalog", util.tableEnv.getCurrentCatalog) val tableResult2 = util.tableEnv.executeSql("USE CATALOG my_catalog") assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) assertEquals("my_catalog", util.tableEnv.getCurrentCatalog) + val tableResult3 = util.tableEnv.executeSql("SHOW CURRENT CATALOG") + assertEquals("my_catalog", tableResult3.collect().next().toString) } @Test - def testExecuteSqlWithUseDatabase(): Unit = { + def testExecuteSqlWithUseDatabaseAndShowCurrentDatabase(): Unit = { val util = batchTestUtil() val tableResult1 = util.tableEnv.executeSql("CREATE DATABASE db1 COMMENT 'db1_comment'") assertEquals(ResultKind.SUCCESS, tableResult1.getResultKind) @@ -208,6 +210,9 @@ class BatchTableEnvironmentTest extends TableTestBase { val tableResult2 = util.tableEnv.executeSql("USE db1") assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) assertEquals("db1", util.tableEnv.getCurrentDatabase) + + val tableResult3 = util.tableEnv.executeSql("SHOW CURRENT DATABASE") + assertEquals("db1", tableResult3.collect().next().toString) } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala index ce50b3301fe67..d78d04fd34c12 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ 
-634,17 +634,19 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase { } @Test - def testUseCatalog(): Unit = { + def testUseCatalogAndShowCurrentCatalog(): Unit = { tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("cat1")) tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("cat2")) tableEnv.sqlUpdate("use catalog cat1") assertEquals("cat1", tableEnv.getCurrentCatalog) + assertEquals("cat1", tableEnv.executeSql("show current catalog").collect().next().toString) tableEnv.sqlUpdate("use catalog cat2") assertEquals("cat2", tableEnv.getCurrentCatalog) + assertEquals("cat2", tableEnv.executeSql("show current catalog").collect().next().toString) } @Test - def testUseDatabase(): Unit = { + def testUseDatabaseAndShowCurrentDatabase(): Unit = { val catalog = new GenericInMemoryCatalog("cat1") tableEnv.registerCatalog("cat1", catalog) val catalogDB1 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1") @@ -653,8 +655,10 @@ class CatalogTableITCase(isStreaming: Boolean) extends AbstractTestBase { catalog.createDatabase("db2", catalogDB2, true) tableEnv.sqlUpdate("use cat1.db1") assertEquals("db1", tableEnv.getCurrentDatabase) + assertEquals("db1", tableEnv.executeSql("show current database").collect().next().toString) tableEnv.sqlUpdate("use db2") assertEquals("db2", tableEnv.getCurrentDatabase) + assertEquals("db2", tableEnv.executeSql("show current database").collect().next().toString) } @Test