From 02a8c41d83e3ff500f4cd2f474d8efe063222a71 Mon Sep 17 00:00:00 2001 From: zjuwangg Date: Thu, 21 Nov 2019 17:03:59 +0800 Subject: [PATCH] [FLINK-14691][table] Support create/drop/alter/use database in TableEnvironment This closes #10296 --- .../flink/table/catalog/hive/HiveCatalog.java | 6 +- .../client/HiveMetastoreClientWrapper.java | 5 + .../src/main/codegen/includes/parserImpls.ftl | 8 +- .../sql/parser/ddl/SqlAlterDatabase.java | 2 +- .../sql/parser/ddl/SqlCreateDatabase.java | 2 +- .../flink/sql/parser/ddl/SqlDropDatabase.java | 26 ++--- .../flink/sql/parser/ddl/SqlUseDatabase.java | 3 +- .../api/internal/TableEnvironmentImpl.java | 75 ++++++++++++- .../table/catalog/GenericInMemoryCatalog.java | 23 +++- .../operations/UseDatabaseOperation.java | 46 ++++++++ .../ddl/AlterDatabaseOperation.java | 71 ++++++++++++ .../table/operations/ddl/AlterOperation.java | 32 ++++++ .../ddl/CreateDatabaseOperation.java | 78 +++++++++++++ .../operations/ddl/DropDatabaseOperation.java | 61 +++++++++++ .../catalog/GenericInMemoryCatalogTest.java | 4 +- .../apache/flink/table/catalog/Catalog.java | 22 +++- .../flink/table/catalog/CatalogTest.java | 14 +-- .../operations/SqlToOperationConverter.java | 94 ++++++++++++++++ .../SqlToOperationConverterTest.java | 103 +++++++++++++++++- .../planner/catalog/CatalogTableITCase.scala | 103 +++++++++++++++++- .../sqlexec/SqlToOperationConverter.java | 95 ++++++++++++++++ .../table/api/internal/TableEnvImpl.scala | 72 ++++++++++-- .../sqlexec/SqlToOperationConverterTest.java | 103 +++++++++++++++++- .../table/catalog/CatalogTableITCase.scala | 103 +++++++++++++++++- 24 files changed, 1089 insertions(+), 62 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java create mode 100644 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 1c5fd0c496f7e..101d0cdee79d0 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -302,10 +302,10 @@ public boolean databaseExists(String databaseName) throws CatalogException { } @Override - public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, - DatabaseNotEmptyException, CatalogException { + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { try { - client.dropDatabase(name, true, ignoreIfNotExists); + client.dropDatabase(name, true, ignoreIfNotExists, cascade); } catch (NoSuchObjectException e) { if (!ignoreIfNotExists) { throw new DatabaseNotExistException(getName(), name); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java index 1e4bfca81d166..7bca36ec87d70 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java @@ -149,6 +149,11 @@ public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExi client.dropDatabase(name, deleteData, ignoreIfNotExists); } + public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExists, boolean cascade) + throws NoSuchObjectException, InvalidOperationException, MetaException, TException { + client.dropDatabase(name, deleteData, ignoreIfNotExists, cascade); + } + public void alterDatabase(String name, Database database) throws NoSuchObjectException, MetaException, TException { client.alterDatabase(name, database); } diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index 5180c8cfdfe90..7b01efc61ef86 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -138,7 +138,7 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) : { SqlIdentifier databaseName = null; boolean ifExists = false; - boolean isRestrict = true; + boolean cascade = false; } { @@ -151,13 +151,13 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) : databaseName = CompoundIdentifier() [ - { isRestrict = true; } + { cascade = false; } | - { isRestrict = false; } + { cascade = true; } ] { - return new SqlDropDatabase(s.pos(), databaseName, ifExists, isRestrict); + return new SqlDropDatabase(s.pos(), databaseName, ifExists, cascade); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDatabase.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDatabase.java index 6ed0dcea7e5e7..bf74eaa814e02 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDatabase.java +++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterDatabase.java @@ -38,7 +38,7 @@ */ public class SqlAlterDatabase extends SqlCall { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER DATABASE", SqlKind.OTHER); + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER DATABASE", SqlKind.OTHER_DDL); private final SqlIdentifier databaseName; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateDatabase.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateDatabase.java index 87a0686b3b459..9c423bd0a9ea3 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateDatabase.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateDatabase.java @@ -45,7 +45,7 @@ */ public class SqlCreateDatabase extends SqlCreate implements ExtendedSqlNode { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE DATABASE", SqlKind.OTHER); + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE DATABASE", SqlKind.OTHER_DDL); private final SqlIdentifier databaseName; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropDatabase.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropDatabase.java index d9a079283d4d9..703215bbf541e 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropDatabase.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropDatabase.java @@ -37,20 +37,20 @@ */ public class SqlDropDatabase extends SqlDrop implements ExtendedSqlNode { private static final SqlOperator OPERATOR = - new SqlSpecialOperator("DROP DATABASE", SqlKind.OTHER); + new SqlSpecialOperator("DROP DATABASE", SqlKind.OTHER_DDL); - private SqlIdentifier 
databaseName; - private boolean ifExists; - private boolean isRestrict = true; + private final SqlIdentifier databaseName; + private final boolean ifExists; + private final boolean isCascade; public SqlDropDatabase(SqlParserPos pos, SqlIdentifier databaseName, boolean ifExists, - boolean isRestrict) { + boolean isCascade) { super(OPERATOR, pos, ifExists); this.databaseName = databaseName; this.ifExists = ifExists; - this.isRestrict = isRestrict; + this.isCascade = isCascade; } @Override @@ -62,16 +62,12 @@ public SqlIdentifier getDatabaseName() { return databaseName; } - public void setDatabaseName(SqlIdentifier viewName) { - this.databaseName = viewName; - } - public boolean getIfExists() { return this.ifExists; } - public void setIfExists(boolean ifExists) { - this.ifExists = ifExists; + public boolean isCascade() { + return isCascade; } @Override @@ -82,10 +78,10 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("IF EXISTS"); } databaseName.unparse(writer, leftPrec, rightPrec); - if (isRestrict) { - writer.keyword("RESTRICT"); - } else { + if (isCascade) { writer.keyword("CASCADE"); + } else { + writer.keyword("RESTRICT"); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseDatabase.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseDatabase.java index 724b619c2682a..7f8c8e23089f6 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseDatabase.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlUseDatabase.java @@ -35,13 +35,12 @@ */ public class SqlUseDatabase extends SqlCall { - public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE DATABASE", SqlKind.OTHER); + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE DATABASE", SqlKind.OTHER_DDL); private final SqlIdentifier databaseName; public SqlUseDatabase(SqlParserPos 
pos, SqlIdentifier databaseName) { super(pos); this.databaseName = databaseName; - } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index cd736f324f538..9ee27a3ff378e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -39,6 +39,9 @@ import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.ExecutorFactory; import org.apache.flink.table.delegation.Parser; @@ -59,7 +62,11 @@ import org.apache.flink.table.operations.QueryOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; import org.apache.flink.table.operations.UseCatalogOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.operations.utils.OperationTreeBuilder; import org.apache.flink.table.sinks.TableSink; @@ -94,6 +101,10 @@ public class TableEnvironmentImpl implements 
TableEnvironment { protected final FunctionCatalog functionCatalog; protected final Planner planner; protected final Parser parser; + private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG = + "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " + + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE"; /** * Provides necessary methods for {@link ConnectTableDescriptor}. @@ -449,9 +460,7 @@ public void sqlUpdate(String stmt) { List operations = parser.parse(stmt); if (operations.size() != 1) { - throw new TableException( - "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"); + throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG); } Operation operation = operations.get(0); @@ -469,21 +478,75 @@ public void sqlUpdate(String stmt) { createTableOperation.getCatalogTable(), createTableOperation.getTableIdentifier(), createTableOperation.isIgnoreIfExists()); + } else if (operation instanceof CreateDatabaseOperation) { + CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation; + Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName()); + String exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString()); + try { + catalog.createDatabase( + createDatabaseOperation.getDatabaseName(), + createDatabaseOperation.getCatalogDatabase(), + createDatabaseOperation.isIgnoreIfExists()); + } catch (DatabaseAlreadyExistException e) { + throw new ValidationException(exMsg, e); + } catch (Exception e) { + throw new TableException(exMsg, e); + } } else if (operation instanceof DropTableOperation) { DropTableOperation dropTableOperation = (DropTableOperation) operation; catalogManager.dropTable( dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists()); + } else if (operation instanceof 
DropDatabaseOperation) { + DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation; + Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName()); + String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString()); + try { + catalog.dropDatabase( + dropDatabaseOperation.getDatabaseName(), + dropDatabaseOperation.isIfExists(), + dropDatabaseOperation.isCascade()); + } catch (DatabaseNotExistException | DatabaseNotEmptyException e) { + throw new ValidationException(exMsg, e); + } catch (Exception e) { + throw new TableException(exMsg, e); + } + } else if (operation instanceof AlterDatabaseOperation) { + AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation; + Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName()); + String exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString()); + try { + catalog.alterDatabase( + alterDatabaseOperation.getDatabaseName(), + alterDatabaseOperation.getCatalogDatabase(), + false); + } catch (DatabaseNotExistException e) { + throw new ValidationException(exMsg, e); + } catch (Exception e) { + throw new TableException(exMsg, e); + } } else if (operation instanceof UseCatalogOperation) { UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation; catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName()); + } else if (operation instanceof UseDatabaseOperation) { + UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation; + catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName()); + catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName()); } else { - throw new TableException( - "Unsupported SQL query! 
sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG"; + throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG); } } + /** Get catalog from catalogName or throw a ValidationException if the catalog does not exist. */ + private Catalog getCatalogOrThrowException(String catalogName) { + return getCatalog(catalogName) + .orElseThrow(() -> new ValidationException(String.format("Catalog %s does not exist.", catalogName))); + } + + private String getDDLOpExecuteErrorMsg(String action) { + return String.format("Could not execute %s ", action); + } + @Override public String getCurrentCatalog() { return catalogManager.getCurrentCatalog(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 82cd997c5c13e..29e87a5bad4d3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -107,7 +107,7 @@ public void createDatabase(String databaseName, CatalogDatabase db, boolean igno } @Override - public void dropDatabase(String databaseName, boolean ignoreIfNotExists) + public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException { checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName)); @@ -116,6 +116,27 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists) // Make sure the database is empty if (isDatabaseEmpty(databaseName)) { databases.remove(databaseName); + } else if (cascade) { + // delete all tables in this database and then delete the database. 
+ List deleteTablePaths = tables.keySet().stream() + .filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList()); + deleteTablePaths.forEach(objectPath -> { + try { + dropTable(objectPath, true); + } catch (TableNotExistException e) { + //ignore + } + }); + List deleteFunctionPaths = functions.keySet().stream() + .filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList()); + deleteFunctionPaths.forEach(objectPath -> { + try { + dropFunction(objectPath, true); + } catch (FunctionNotExistException e) { + //ignore + } + }); + databases.remove(databaseName); } else { throw new DatabaseNotEmptyException(getName(), databaseName); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java new file mode 100644 index 0000000000000..cbf08ccb507e4 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/UseDatabaseOperation.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations; + +/** + * Operation to describe a USE [catalogName.]dataBaseName statement. + */ +public class UseDatabaseOperation implements UseOperation { + + private final String catalogName; + private final String databaseName; + + public UseDatabaseOperation(String catalogName, String databaseName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + } + + public String getCatalogName() { + return catalogName; + } + + public String getDatabaseName() { + return databaseName; + } + + @Override + public String asSummaryString() { + return String.format("USE %s.%s", catalogName, databaseName); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java new file mode 100644 index 0000000000000..081f95b6bc6e1 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterDatabaseOperation.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Operation to describe a ALTER DATABASE statement. + */ +public class AlterDatabaseOperation implements AlterOperation { + private final String catalogName; + private final String databaseName; + private final CatalogDatabase catalogDatabase; + + public AlterDatabaseOperation( + String catalogName, + String databaseName, + CatalogDatabase catalogDatabase) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.catalogDatabase = catalogDatabase; + } + + public String getCatalogName() { + return catalogName; + } + + public String getDatabaseName() { + return databaseName; + } + + public CatalogDatabase getCatalogDatabase() { + return catalogDatabase; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("alterDatabase", catalogDatabase.getProperties()); + params.put("catalogName", catalogName); + params.put("databaseName", databaseName); + + return OperationUtils.formatWithChildren( + "ALTER DATABASE", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterOperation.java new file mode 100644 index 0000000000000..ec498384d3a55 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/AlterOperation.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.operations.Operation; + +/** + * A {@link Operation} that describes the DDL statements, e.g. ALTER TABLE or ALTER DATABASE. + * + *

 Different sub operations can have their special target name. For example, an alter table + * operation may have a target table name and a flag to describe if it exists. + */ +@Internal +public interface AlterOperation extends Operation { +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java new file mode 100644 index 0000000000000..ff50d5a5ddce3 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateDatabaseOperation.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Operation to describe a CREATE DATABASE statement. 
+ */ +public class CreateDatabaseOperation implements CreateOperation { + private final String catalogName; + private final String databaseName; + private final CatalogDatabase catalogDatabase; + private final boolean ignoreIfExists; + + public CreateDatabaseOperation( + String catalogName, + String databaseName, + CatalogDatabase catalogDatabase, boolean ignoreIfExists) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.catalogDatabase = catalogDatabase; + this.ignoreIfExists = ignoreIfExists; + } + + public String getCatalogName() { + return catalogName; + } + + public String getDatabaseName() { + return databaseName; + } + + public CatalogDatabase getCatalogDatabase() { + return catalogDatabase; + } + + public boolean isIgnoreIfExists() { + return ignoreIfExists; + } + + @Override + public String asSummaryString() { + Map params = new LinkedHashMap<>(); + params.put("catalogDatabase", catalogDatabase.getProperties()); + params.put("catalogName", catalogName); + params.put("databaseName", databaseName); + params.put("ignoreIfExists", ignoreIfExists); + + return OperationUtils.formatWithChildren( + "CREATE DATABASE", + params, + Collections.emptyList(), + Operation::asSummaryString); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java new file mode 100644 index 0000000000000..dfa4e659bf2c4 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropDatabaseOperation.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.ddl; + +/** + * Operation to describe a DROP DATABASE statement. + */ +public class DropDatabaseOperation implements DropOperation { + private final String catalogName; + private final String databaseName; + private final boolean ifExists; + private final boolean cascade; + + public DropDatabaseOperation(String catalogName, String databaseName, boolean ifExists, boolean cascade) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.ifExists = ifExists; + this.cascade = cascade; + } + + public String getCatalogName() { + return catalogName; + } + + public String getDatabaseName() { + return databaseName; + } + + public boolean isCascade() { + return cascade; + } + + public boolean isIfExists() { + return ifExists; + } + + @Override + public String asSummaryString() { + StringBuilder summaryString = new StringBuilder("DROP DATABASE"); + summaryString.append(ifExists ? " IF EXISTS " : ""); + summaryString.append(" " + catalogName + "." + databaseName); + summaryString.append(cascade ? 
" CASCADE" : " RESTRICT"); + return summaryString.toString(); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java index ac14039e9a044..aa3baab5f1832 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/GenericInMemoryCatalogTest.java @@ -125,9 +125,9 @@ public void testStatistics() throws Exception { // Clean up catalog.dropTable(path1, false); - catalog.dropDatabase(db1, false); + catalog.dropDatabase(db1, false, false); catalog.dropTable(path2, false); - catalog.dropDatabase(db2, false); + catalog.dropDatabase(db2, false, false); } // ------ utilities ------ diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index a43d09105b219..a02f2af3ee468 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -144,7 +144,27 @@ void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExist * @throws DatabaseNotExistException if the given database does not exist * @throws CatalogException in case of any runtime exception */ - void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, + default void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException, + DatabaseNotEmptyException, CatalogException{ + dropDatabase(name, ignoreIfNotExists, false); + } + + /** + * Drop a database. + * + * @param name Name of the database to be dropped. 
+ @param ignoreIfNotExists Flag to specify behavior when the database does not exist: + if set to false, throw an exception, + if set to true, do nothing. + @param cascade Flag to specify behavior when the database contains tables or functions: + if set to true, delete all tables and functions in the database and then delete the + database, + if set to false, throw an exception. + @throws DatabaseNotExistException if the given database does not exist + @throws DatabaseNotEmptyException if the given database is not empty and cascade is false + @throws CatalogException in case of any runtime exception + */ + void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException; /** diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 5ca0c113ed3e1..64cb2cecc3a72 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -101,10 +101,10 @@ public void cleanup() throws Exception { catalog.dropFunction(path1, true); } if (catalog.databaseExists(db1)) { - catalog.dropDatabase(db1, true); + catalog.dropDatabase(db1, true, false); } if (catalog.databaseExists(db2)) { - catalog.dropDatabase(db2, true); + catalog.dropDatabase(db2, true, false); } } @@ -167,7 +167,7 @@ public void testDropDb() throws Exception { assertTrue(catalog.databaseExists(db1)); - catalog.dropDatabase(db1, false); + catalog.dropDatabase(db1, false, true); assertFalse(catalog.databaseExists(db1)); } @@ -176,12 +176,12 @@ public void testDropDb() throws Exception { public void testDropDb_DatabaseNotExistException() throws Exception { exception.expect(DatabaseNotExistException.class); 
exception.expectMessage("Database db1 does not exist in Catalog"); - catalog.dropDatabase(db1, false); + catalog.dropDatabase(db1, false, false); } @Test public void testDropDb_DatabaseNotExist_Ignore() throws Exception { - catalog.dropDatabase(db1, true); + catalog.dropDatabase(db1, true, false); } @Test @@ -191,7 +191,7 @@ public void testDropDb_DatabaseNotEmptyException() throws Exception { exception.expect(DatabaseNotEmptyException.class); exception.expectMessage("Database db1 in catalog test-catalog is not empty"); - catalog.dropDatabase(db1, true); + catalog.dropDatabase(db1, true, false); } @Test @@ -729,7 +729,7 @@ public void testDropFunction_FunctionNotExistException() throws Exception { public void testDropFunction_FunctionNotExist_ignored() throws Exception { catalog.createDatabase(db1, createDb(), false); catalog.dropFunction(nonExistObjectPath, true); - catalog.dropDatabase(db1, false); + catalog.dropDatabase(db1, false, false); } // ------ partitions ------ diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java index dd7701d7bd508..5a002eea0d1c5 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java @@ -18,23 +18,35 @@ package org.apache.flink.table.planner.operations; +import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; +import org.apache.flink.sql.parser.ddl.SqlCreateDatabase; import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.ddl.SqlDropDatabase; import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import 
org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.SqlUseCatalog; +import org.apache.flink.sql.parser.ddl.SqlUseDatabase; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.UseCatalogOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; @@ -104,6 +116,14 @@ public static Optional convert( return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); } else if (validated instanceof SqlUseCatalog) { return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); + } else if (validated instanceof SqlUseDatabase) { + return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated)); + } else if (validated instanceof SqlCreateDatabase) { + return 
Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated)); + } else if (validated instanceof SqlDropDatabase) { + return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated)); + } else if (validated instanceof SqlAlterDatabase) { + return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -186,6 +206,80 @@ private Operation convertUseCatalog(SqlUseCatalog useCatalog) { return new UseCatalogOperation(useCatalog.getCatalogName()); } + /** Convert use database statement. */ + private Operation convertUseDatabase(SqlUseDatabase useDatabase) { + String[] fullDatabaseName = useDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("use database identifier format error"); + } + String catalogName = fullDatabaseName.length == 2 ? fullDatabaseName[0] : catalogManager.getCurrentCatalog(); + String databaseName = fullDatabaseName.length == 2 ? fullDatabaseName[1] : fullDatabaseName[0]; + return new UseDatabaseOperation(catalogName, databaseName); + } + + /** Convert CREATE DATABASE statement. */ + private Operation convertCreateDatabase(SqlCreateDatabase sqlCreateDatabase) { + String[] fullDatabaseName = sqlCreateDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("create database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? 
fullDatabaseName[0] : fullDatabaseName[1]; + boolean ignoreIfExists = sqlCreateDatabase.isIfNotExists(); + String databaseComment = sqlCreateDatabase.getComment() + .map(comment -> comment.getNlsString().getValue()).orElse(null); + // set with properties + Map properties = new HashMap<>(); + sqlCreateDatabase.getPropertyList().getList().forEach(p -> + properties.put(((SqlTableOption) p).getKeyString().toLowerCase(), + ((SqlTableOption) p).getValueString())); + CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, databaseComment); + return new CreateDatabaseOperation(catalogName, databaseName, catalogDatabase, ignoreIfExists); + } + + /** Convert DROP DATABASE statement. */ + private Operation convertDropDatabase(SqlDropDatabase sqlDropDatabase) { + String[] fullDatabaseName = sqlDropDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("drop database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? fullDatabaseName[0] : fullDatabaseName[1]; + return new DropDatabaseOperation( + catalogName, + databaseName, + sqlDropDatabase.getIfExists(), + sqlDropDatabase.isCascade()); + } + + /** Convert ALTER DATABASE statement. */ + private Operation convertAlterDatabase(SqlAlterDatabase sqlAlterDatabase) { + String[] fullDatabaseName = sqlAlterDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("alter database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? 
fullDatabaseName[0] : fullDatabaseName[1]; + Map properties = new HashMap<>(); + CatalogDatabase originCatalogDatabase; + Optional catalog = catalogManager.getCatalog(catalogName); + if (catalog.isPresent()) { + try { + originCatalogDatabase = catalog.get().getDatabase(databaseName); + properties.putAll(originCatalogDatabase.getProperties()); + } catch (DatabaseNotExistException e) { + throw new SqlConversionException(String.format("Database %s not exists", databaseName), e); + } + } else { + throw new SqlConversionException(String.format("Catalog %s not exists", catalogName)); + } + // set with properties + sqlAlterDatabase.getPropertyList().getList().forEach(p -> + properties.put(((SqlTableOption) p).getKeyString().toLowerCase(), ((SqlTableOption) p).getValueString())); + CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, originCatalogDatabase.getComment()); + return new AlterDatabaseOperation(catalogName, databaseName, catalogDatabase); + } + /** Fallback method for sql query. 
*/ private Operation convertSqlQuery(SqlNode node) { return toQueryOperation(flinkPlanner, node); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java index 597a8a8edbde8..ea2e966d45860 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java @@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogFunction; import org.apache.flink.table.catalog.CatalogFunctionImpl; import org.apache.flink.table.catalog.CatalogManager; @@ -42,7 +43,11 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.UseCatalogOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.planner.calcite.CalciteParser; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema; @@ -118,13 +123,98 @@ public void after() throws TableNotExistException { @Test public void testUseCatalog() { final String sql = "USE CATALOG cat1"; - FlinkPlannerImpl planner = 
getPlannerBySqlDialect(SqlDialect.DEFAULT); - final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT); - Operation operation = parse(sql, planner, parser); + Operation operation = parse(sql, SqlDialect.DEFAULT); assert operation instanceof UseCatalogOperation; assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); } + @Test + public void testUseDatabase() { + final String sql1 = "USE db1"; + Operation operation1 = parse(sql1, SqlDialect.DEFAULT); + assert operation1 instanceof UseDatabaseOperation; + assertEquals("builtin", ((UseDatabaseOperation) operation1).getCatalogName()); + assertEquals("db1", ((UseDatabaseOperation) operation1).getDatabaseName()); + + final String sql2 = "USE cat1.db1"; + Operation operation2 = parse(sql2, SqlDialect.DEFAULT); + assert operation2 instanceof UseDatabaseOperation; + assertEquals("cat1", ((UseDatabaseOperation) operation2).getCatalogName()); + assertEquals("db1", ((UseDatabaseOperation) operation2).getDatabaseName()); + } + + @Test(expected = SqlConversionException.class) + public void testUseDatabaseWithException() { + final String sql = "USE cat1.db1.tbl1"; + Operation operation = parse(sql, SqlDialect.DEFAULT); + } + + @Test + public void testCreateDatabase() { + final String[] createDatabaseSqls = new String[] { + "create database db1", + "create database if not exists cat1.db1", + "create database cat1.db1 comment 'db1_comment'", + "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'k2' = 'v2')" + }; + final String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"}; + final String expectedDatabase = "db1"; + final String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"}; + final boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false}; + final int[] expectedPropertySize = new int[] {0, 0, 0, 2}; + + for (int i = 0; i < createDatabaseSqls.length; i++) { + Operation operation = parse(createDatabaseSqls[i], 
SqlDialect.DEFAULT); + assert operation instanceof CreateDatabaseOperation; + final CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation; + assertEquals(expectedCatalogs[i], createDatabaseOperation.getCatalogName()); + assertEquals(expectedDatabase, createDatabaseOperation.getDatabaseName()); + assertEquals(expectedComments[i], createDatabaseOperation.getCatalogDatabase().getComment()); + assertEquals(expectedIgnoreIfExists[i], createDatabaseOperation.isIgnoreIfExists()); + assertEquals(expectedPropertySize[i], createDatabaseOperation.getCatalogDatabase().getProperties().size()); + } + } + + @Test + public void testDropDatabase() { + final String[] dropDatabaseSqls = new String[] { + "drop database db1", + "drop database if exists db1", + "drop database if exists cat1.db1 CASCADE", + "drop database if exists cat1.db1 RESTRICT" + }; + final String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"}; + final String expectedDatabase = "db1"; + final boolean[] expectedIfExists = new boolean[] {false, true, true, true}; + final boolean[] expectedIsCascades = new boolean[] {false, false, true, false}; + + for (int i = 0; i < dropDatabaseSqls.length; i++) { + Operation operation = parse(dropDatabaseSqls[i], SqlDialect.DEFAULT); + assert operation instanceof DropDatabaseOperation; + final DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation; + assertEquals(expectedCatalogs[i], dropDatabaseOperation.getCatalogName()); + assertEquals(expectedDatabase, dropDatabaseOperation.getDatabaseName()); + assertEquals(expectedIfExists[i], dropDatabaseOperation.isIfExists()); + assertEquals(expectedIsCascades[i], dropDatabaseOperation.isCascade()); + } + } + + @Test + public void testAlterDatabase() throws Exception { + catalogManager.registerCatalog("cat1", + new GenericInMemoryCatalog("default", "default")); + catalogManager.getCatalog("cat1").get() + .createDatabase("db1", + new CatalogDatabaseImpl(new 
HashMap<>(), "db1_comment"), + true); + final String sql = "alter database cat1.db1 set ('k1'='a')"; + Operation operation = parse(sql, SqlDialect.DEFAULT); + assert operation instanceof AlterDatabaseOperation; + assertEquals("db1", ((AlterDatabaseOperation) operation).getDatabaseName()); + assertEquals("cat1", ((AlterDatabaseOperation) operation).getCatalogName()); + assertEquals("db1_comment", ((AlterDatabaseOperation) operation).getCatalogDatabase().getComment()); + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + @@ -472,6 +562,13 @@ private Operation parse(String sql, FlinkPlannerImpl planner, CalciteParser pars return SqlToOperationConverter.convert(planner, catalogManager, node).get(); } + private Operation parse(String sql, SqlDialect sqlDialect) { + FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect); + final CalciteParser parser = getParserBySqlDialect(sqlDialect); + SqlNode node = parser.parse(sql); + return SqlToOperationConverter.convert(planner, catalogManager, node).get(); + } + private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) { tableConfig.setSqlDialect(sqlDialect); return plannerContext.createFlinkPlanner(catalogManager.getCurrentCatalog(), diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala index b6750be560e11..257daedaaa629 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala @@ -21,12 +21,12 @@ package org.apache.flink.table.planner.catalog import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.api.internal.TableEnvironmentImpl import 
org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, ValidationException} -import org.apache.flink.table.catalog.{CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath} +import org.apache.flink.table.catalog.{CatalogDatabaseImpl, CatalogFunctionImpl, GenericInMemoryCatalog, ObjectPath} import org.apache.flink.table.planner.expressions.utils.Func0 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0 import org.apache.flink.types.Row -import org.junit.Assert.assertEquals +import org.junit.Assert.{assertEquals, fail} import org.junit.rules.ExpectedException import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -835,6 +835,105 @@ class CatalogTableITCase(isStreamingMode: Boolean) { tableEnv.sqlUpdate("use catalog cat2") assertEquals("cat2", tableEnv.getCurrentCatalog) } + + @Test + def testUseDatabase(): Unit = { + val catalog = new GenericInMemoryCatalog("cat1") + tableEnv.registerCatalog("cat1", catalog) + val catalogDB1 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1") + val catalogDB2 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db2") + catalog.createDatabase("db1", catalogDB1, true) + catalog.createDatabase("db2", catalogDB2, true) + tableEnv.sqlUpdate("use cat1.db1") + assertEquals("db1", tableEnv.getCurrentDatabase) + tableEnv.sqlUpdate("use db2") + assertEquals("db2", tableEnv.getCurrentDatabase) + } + + @Test + def testCreateDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1 ") + tableEnv.sqlUpdate("create database if not exists db1 ") + try { + tableEnv.sqlUpdate("create database db1 ") + fail("ValidationException expected") + } catch { + case _: ValidationException => 
//ignore + } + tableEnv.sqlUpdate("create database cat2.db1 comment 'test_comment'" + + " with ('k1' = 'v1', 'k2' = 'v2')") + val database = tableEnv.getCatalog("cat2").get().getDatabase("db1") + assertEquals("test_comment", database.getComment) + assertEquals(2, database.getProperties.size()) + val expectedProperty = new util.HashMap[String, String]() + expectedProperty.put("k1", "v1") + expectedProperty.put("k2", "v2") + assertEquals(expectedProperty, database.getProperties) + } + + @Test + def testDropDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1") + tableEnv.sqlUpdate("drop database db1") + tableEnv.sqlUpdate("drop database if exists db1") + try { + tableEnv.sqlUpdate("drop database db1") + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + tableEnv.sqlUpdate("create database db1") + tableEnv.sqlUpdate("use db1") + val ddl1 = + """ + |create table t1( + | a bigint, + | b bigint, + | c varchar + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + tableEnv.sqlUpdate(ddl1) + val ddl2 = + """ + |create table t2( + | a bigint, + | b bigint, + | c varchar + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + tableEnv.sqlUpdate(ddl2) + try { + tableEnv.sqlUpdate("drop database db1") + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + tableEnv.sqlUpdate("drop database db1 cascade") + } + + @Test + def testAlterDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1 comment 'db1_comment' with ('k1' = 'v1')") + tableEnv.sqlUpdate("alter database db1 set ('k1' = 'a', 'k2' = 'b')") + val database = tableEnv.getCatalog("cat1").get().getDatabase("db1") + assertEquals("db1_comment", database.getComment) + 
assertEquals(2, database.getProperties.size()) + val expectedProperty = new util.HashMap[String, String]() + expectedProperty.put("k1", "a") + expectedProperty.put("k2", "b") + assertEquals(expectedProperty, database.getProperties) + } } object CatalogTableITCase { diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java index 00051e109e8d5..9801f38c65511 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java @@ -18,27 +18,39 @@ package org.apache.flink.table.sqlexec; +import org.apache.flink.sql.parser.ddl.SqlAlterDatabase; +import org.apache.flink.sql.parser.ddl.SqlCreateDatabase; import org.apache.flink.sql.parser.ddl.SqlCreateTable; +import org.apache.flink.sql.parser.ddl.SqlDropDatabase; import org.apache.flink.sql.parser.ddl.SqlDropTable; import org.apache.flink.sql.parser.ddl.SqlTableColumn; import org.apache.flink.sql.parser.ddl.SqlTableOption; import org.apache.flink.sql.parser.ddl.SqlUseCatalog; +import org.apache.flink.sql.parser.ddl.SqlUseDatabase; import org.apache.flink.sql.parser.dml.RichSqlInsert; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.calcite.FlinkTypeFactory; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import 
org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.PlannerQueryOperation; import org.apache.flink.table.operations.UseCatalogOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.operations.ddl.DropTableOperation; import org.apache.flink.table.types.utils.TypeConversions; @@ -106,6 +118,14 @@ public static Optional convert( return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated)); } else if (validated instanceof SqlUseCatalog) { return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated)); + } else if (validated instanceof SqlUseDatabase) { + return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated)); + } else if (validated instanceof SqlCreateDatabase) { + return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated)); + } else if (validated instanceof SqlDropDatabase) { + return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated)); + } else if (validated instanceof SqlAlterDatabase) { + return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated)); } else if (validated.getKind().belongsTo(SqlKind.QUERY)) { return Optional.of(converter.convertSqlQuery(validated)); } else { @@ -198,6 +218,81 @@ private Operation convertUseCatalog(SqlUseCatalog useCatalog) { return new UseCatalogOperation(useCatalog.getCatalogName()); } + /** Convert use database 
statement. */ + private Operation convertUseDatabase(SqlUseDatabase useDatabase) { + String[] fullDatabaseName = useDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("use database identifier format error"); + } + String catalogName = fullDatabaseName.length == 2 ? fullDatabaseName[0] : catalogManager.getCurrentCatalog(); + String databaseName = fullDatabaseName.length == 2 ? fullDatabaseName[1] : fullDatabaseName[0]; + return new UseDatabaseOperation(catalogName, databaseName); + } + + /** Convert CREATE DATABASE statement. */ + private Operation convertCreateDatabase(SqlCreateDatabase sqlCreateDatabase) { + String[] fullDatabaseName = sqlCreateDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("create database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? fullDatabaseName[0] : fullDatabaseName[1]; + boolean ignoreIfExists = sqlCreateDatabase.isIfNotExists(); + String databaseComment = sqlCreateDatabase.getComment() + .map(comment -> comment.getNlsString().getValue()).orElse(null); + // set with properties + Map properties = new HashMap<>(); + sqlCreateDatabase.getPropertyList().getList().forEach(p -> + properties.put(((SqlTableOption) p).getKeyString().toLowerCase(), + ((SqlTableOption) p).getValueString())); + CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, databaseComment); + return new CreateDatabaseOperation(catalogName, databaseName, catalogDatabase, ignoreIfExists); + } + + /** Convert DROP DATABASE statement. 
*/ + private Operation convertDropDatabase(SqlDropDatabase sqlDropDatabase) { + String[] fullDatabaseName = sqlDropDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("drop database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? fullDatabaseName[0] : fullDatabaseName[1]; + return new DropDatabaseOperation( + catalogName, + databaseName, + sqlDropDatabase.getIfExists(), + sqlDropDatabase.isCascade()); + } + + /** Convert ALTER DATABASE statement. */ + private Operation convertAlterDatabase(SqlAlterDatabase sqlAlterDatabase) { + String[] fullDatabaseName = sqlAlterDatabase.fullDatabaseName(); + if (fullDatabaseName.length > 2) { + throw new SqlConversionException("alter database identifier format error"); + } + String catalogName = (fullDatabaseName.length == 1) ? catalogManager.getCurrentCatalog() : fullDatabaseName[0]; + String databaseName = (fullDatabaseName.length == 1) ? 
fullDatabaseName[0] : fullDatabaseName[1]; + Map properties = new HashMap<>(); + CatalogDatabase originCatalogDatabase; + Optional catalog = catalogManager.getCatalog(catalogName); + if (catalog.isPresent()) { + try { + originCatalogDatabase = catalog.get().getDatabase(databaseName); + properties.putAll(originCatalogDatabase.getProperties()); + } catch (DatabaseNotExistException e) { + throw new SqlConversionException(String.format("Database %s not exists", databaseName), e); + } + } else { + throw new SqlConversionException(String.format("Catalog %s not exists", catalogName)); + } + // set with properties + sqlAlterDatabase.getPropertyList().getList().forEach(p -> + properties.put(((SqlTableOption) p).getKeyString().toLowerCase(), + ((SqlTableOption) p).getValueString())); + CatalogDatabase catalogDatabase = new CatalogDatabaseImpl(properties, originCatalogDatabase.getComment()); + return new AlterDatabaseOperation(catalogName, databaseName, catalogDatabase); + } + //~ Tools ------------------------------------------------------------------ /** diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala index 8798e092daae2..090806e9cbbe2 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala @@ -30,20 +30,21 @@ import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _} import org.apache.flink.table.module.{Module, ModuleManager} import org.apache.flink.table.operations.ddl.{CreateTableOperation, DropTableOperation} +import org.apache.flink.table.operations.ddl._ import 
org.apache.flink.table.operations.utils.OperationTreeBuilder import org.apache.flink.table.operations.{CatalogQueryOperation, TableSourceQueryOperation, _} import org.apache.flink.table.planner.{ParserImpl, PlanningConfigurationBuilder} import org.apache.flink.table.sinks.{OverwritableTableSink, PartitionableTableSink, TableSink, TableSinkUtils} import org.apache.flink.table.sources.TableSource import org.apache.flink.table.util.JavaScalaConversionUtil - import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.tools.FrameworkConfig - import _root_.java.util.function.{Supplier => JSupplier} import _root_.java.util.{Optional, HashMap => JHashMap, Map => JMap} +import org.apache.flink.table.catalog.exceptions.{DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException} + import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.JavaConversions._ import _root_.scala.util.Try @@ -117,6 +118,11 @@ abstract class TableEnvImpl( def getConfig: TableConfig = config + private val UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG = + "Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " + + "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " + + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE" + private def isStreamingMode: Boolean = this match { case _: BatchTableEnvImpl => false case _ => true @@ -466,9 +472,7 @@ abstract class TableEnvImpl( override def sqlUpdate(stmt: String): Unit = { val operations = parser.parse(stmt) - if (operations.size != 1) throw new TableException( - "Unsupported SQL query! 
sqlUpdate() only accepts a single SQL statement of type " + - "INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") + if (operations.size != 1) throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG) operations.get(0) match { case op: CatalogSinkModifyOperation => @@ -481,18 +485,70 @@ abstract class TableEnvImpl( createTableOperation.getCatalogTable, createTableOperation.getTableIdentifier, createTableOperation.isIgnoreIfExists) + case createDatabaseOperation: CreateDatabaseOperation => + val catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName) + val exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString) + try { + catalog.createDatabase( + createDatabaseOperation.getDatabaseName, + createDatabaseOperation.getCatalogDatabase, + createDatabaseOperation.isIgnoreIfExists) + } catch { + case ex: DatabaseAlreadyExistException => throw new ValidationException(exMsg, ex) + case ex: Exception => throw new TableException(exMsg, ex) + } case dropTableOperation: DropTableOperation => catalogManager.dropTable( dropTableOperation.getTableIdentifier, dropTableOperation.isIfExists) + case dropDatabaseOperation: DropDatabaseOperation => + val catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName) + val exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString) + try { + catalog.dropDatabase( + dropDatabaseOperation.getDatabaseName, + dropDatabaseOperation.isIfExists, + dropDatabaseOperation.isCascade) + } catch { + case ex: DatabaseNotEmptyException => throw new ValidationException(exMsg, ex) + case ex: DatabaseNotExistException => throw new ValidationException(exMsg, ex) + case ex: Exception => throw new TableException(exMsg, ex) + } + case alterDatabaseOperation: AlterDatabaseOperation => + val catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName) + val exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString) + try { + catalog.alterDatabase( + 
alterDatabaseOperation.getDatabaseName, + alterDatabaseOperation.getCatalogDatabase, + false) + } catch { + case ex: DatabaseNotExistException => throw new ValidationException(exMsg, ex) + case ex: Exception => throw new TableException(exMsg, ex) + } case useCatalogOperation: UseCatalogOperation => catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName) - case _ => throw new TableException( - "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " + - "type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG") + case useDatabaseOperation: UseDatabaseOperation => + catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName) + catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName) + case _ => throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG) } } + /** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */ + private def getCatalogOrThrowException(catalogName: String): Catalog = { + getCatalog(catalogName) + .orElseThrow( + new JSupplier[Throwable] { + override def get() = new ValidationException( + String.format("Catalog %s does not exist", catalogName)) + }) + } + + private def getDDLOpExecuteErrorMsg(action: String):String = { + String.format("Could not execute %s", action) + } + protected def createTable(tableOperation: QueryOperation): TableImpl = { TableImpl.createTable( this, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java index da0e68ec74b13..b50bd3eabe069 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.calcite.CalciteParser; import 
org.apache.flink.table.calcite.FlinkPlannerImpl; import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogManagerCalciteSchema; import org.apache.flink.table.catalog.CatalogTable; @@ -45,7 +46,11 @@ import org.apache.flink.table.operations.CatalogSinkModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.UseCatalogOperation; +import org.apache.flink.table.operations.UseDatabaseOperation; +import org.apache.flink.table.operations.ddl.AlterDatabaseOperation; +import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; +import org.apache.flink.table.operations.ddl.DropDatabaseOperation; import org.apache.flink.table.planner.PlanningConfigurationBuilder; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; @@ -117,13 +122,98 @@ public void after() throws TableNotExistException { @Test public void testUseCatalog() { final String sql = "USE CATALOG cat1"; - FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT); - SqlNode node = getParserBySqlDialect(SqlDialect.DEFAULT).parse(sql); - Operation operation = SqlToOperationConverter.convert(planner, catalogManager, node).get(); + Operation operation = parse(sql, SqlDialect.DEFAULT); assert operation instanceof UseCatalogOperation; assertEquals("cat1", ((UseCatalogOperation) operation).getCatalogName()); } + @Test + public void testUseDatabase() { + final String sql1 = "USE db1"; + Operation operation1 = parse(sql1, SqlDialect.DEFAULT); + assert operation1 instanceof UseDatabaseOperation; + assertEquals("builtin", ((UseDatabaseOperation) operation1).getCatalogName()); + assertEquals("db1", ((UseDatabaseOperation) operation1).getDatabaseName()); + + final String sql2 = "USE cat1.db1"; 
+ Operation operation2 = parse(sql2, SqlDialect.DEFAULT); + assert operation2 instanceof UseDatabaseOperation; + assertEquals("cat1", ((UseDatabaseOperation) operation2).getCatalogName()); + assertEquals("db1", ((UseDatabaseOperation) operation2).getDatabaseName()); + } + + @Test(expected = SqlConversionException.class) + public void testUseDatabaseWithException() { + final String sql = "USE cat1.db1.tbl1"; + Operation operation = parse(sql, SqlDialect.DEFAULT); + } + + @Test + public void testCreateDatabase() { + final String[] createDatabaseSqls = new String[] { + "create database db1", + "create database if not exists cat1.db1", + "create database cat1.db1 comment 'db1_comment'", + "create database cat1.db1 comment 'db1_comment' with ('k1' = 'v1', 'k2' = 'v2')" + }; + final String[] expectedCatalogs = new String[] {"builtin", "cat1", "cat1", "cat1"}; + final String expectedDatabase = "db1"; + final String[] expectedComments = new String[] {null, null, "db1_comment", "db1_comment"}; + final boolean[] expectedIgnoreIfExists = new boolean[] {false, true, false, false}; + final int[] expectedPropertySize = new int[] {0, 0, 0, 2}; + + for (int i = 0; i < createDatabaseSqls.length; i++) { + Operation operation = parse(createDatabaseSqls[i], SqlDialect.DEFAULT); + assert operation instanceof CreateDatabaseOperation; + final CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation; + assertEquals(expectedCatalogs[i], createDatabaseOperation.getCatalogName()); + assertEquals(expectedDatabase, createDatabaseOperation.getDatabaseName()); + assertEquals(expectedComments[i], createDatabaseOperation.getCatalogDatabase().getComment()); + assertEquals(expectedIgnoreIfExists[i], createDatabaseOperation.isIgnoreIfExists()); + assertEquals(expectedPropertySize[i], createDatabaseOperation.getCatalogDatabase().getProperties().size()); + } + } + + @Test + public void testAlterDatabase() throws Exception { + catalogManager.registerCatalog("cat1", + new 
GenericInMemoryCatalog("default", "default")); + catalogManager.getCatalog("cat1").get() + .createDatabase("db1", + new CatalogDatabaseImpl(new HashMap<>(), "db1_comment"), + true); + final String sql = "alter database cat1.db1 set ('k1'='a')"; + Operation operation = parse(sql, SqlDialect.DEFAULT); + assert operation instanceof AlterDatabaseOperation; + assertEquals("db1", ((AlterDatabaseOperation) operation).getDatabaseName()); + assertEquals("cat1", ((AlterDatabaseOperation) operation).getCatalogName()); + assertEquals("db1_comment", ((AlterDatabaseOperation) operation).getCatalogDatabase().getComment()); + } + + @Test + public void testDropDatabase() { + final String[] dropDatabaseSqls = new String[] { + "drop database db1", + "drop database if exists db1", + "drop database if exists cat1.db1 CASCADE", + "drop database if exists cat1.db1 RESTRICT" + }; + final String[] expectedCatalogs = new String[] {"builtin", "builtin", "cat1", "cat1"}; + final String expectedDatabase = "db1"; + final boolean[] expectedIfExists = new boolean[] {false, true, true, true}; + final boolean[] expectedIsCascades = new boolean[] {false, false, true, false}; + + for (int i = 0; i < dropDatabaseSqls.length; i++) { + Operation operation = parse(dropDatabaseSqls[i], SqlDialect.DEFAULT); + assert operation instanceof DropDatabaseOperation; + final DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation; + assertEquals(expectedCatalogs[i], dropDatabaseOperation.getCatalogName()); + assertEquals(expectedDatabase, dropDatabaseOperation.getDatabaseName()); + assertEquals(expectedIfExists[i], dropDatabaseOperation.isIfExists()); + assertEquals(expectedIsCascades[i], dropDatabaseOperation.isCascade()); + } + } + @Test public void testCreateTable() { final String sql = "CREATE TABLE tbl1 (\n" + @@ -442,6 +532,13 @@ private FlinkPlannerImpl getPlannerBySqlDialect(SqlDialect sqlDialect) { catalogManager.getCurrentDatabase()); } + private Operation parse(String sql, 
SqlDialect sqlDialect) { + FlinkPlannerImpl planner = getPlannerBySqlDialect(sqlDialect); + final CalciteParser parser = getParserBySqlDialect(sqlDialect); + SqlNode node = parser.parse(sql); + return SqlToOperationConverter.convert(planner, catalogManager, node).get(); + } + //~ Inner Classes ---------------------------------------------------------- private static class TestItem { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala index fa5db23c9f823..0c4eb6301047e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala @@ -24,12 +24,10 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvir import org.apache.flink.table.api.{TableEnvironment, ValidationException} import org.apache.flink.table.factories.utils.TestCollectionTableFactory import org.apache.flink.types.Row - -import org.junit.Assert.assertEquals +import org.junit.Assert.{assertEquals, fail} import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Ignore, Test} - import java.util import scala.collection.JavaConversions._ @@ -544,6 +542,105 @@ class CatalogTableITCase(isStreaming: Boolean) { tableEnv.sqlUpdate("use catalog cat2") assertEquals("cat2", tableEnv.getCurrentCatalog) } + + @Test + def testUseDatabase(): Unit = { + val catalog = new GenericInMemoryCatalog("cat1") + tableEnv.registerCatalog("cat1", catalog) + val catalogDB1 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db1") + val catalogDB2 = new CatalogDatabaseImpl(new util.HashMap[String, String](), "db2") + catalog.createDatabase("db1", catalogDB1, true) + catalog.createDatabase("db2", catalogDB2, true) + 
tableEnv.sqlUpdate("use cat1.db1") + assertEquals("db1", tableEnv.getCurrentDatabase) + tableEnv.sqlUpdate("use db2") + assertEquals("db2", tableEnv.getCurrentDatabase) + } + + @Test + def testCreateDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.registerCatalog("cat2", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1 ") + tableEnv.sqlUpdate("create database if not exists db1 ") + try { + tableEnv.sqlUpdate("create database db1 ") + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + tableEnv.sqlUpdate("create database cat2.db1 comment 'test_comment'" + + " with ('k1' = 'v1', 'k2' = 'v2')") + val database = tableEnv.getCatalog("cat2").get().getDatabase("db1") + assertEquals("test_comment", database.getComment) + assertEquals(2, database.getProperties.size()) + val expectedProperty = new util.HashMap[String, String]() + expectedProperty.put("k1", "v1") + expectedProperty.put("k2", "v2") + assertEquals(expectedProperty, database.getProperties) + } + + @Test + def testDropDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1") + tableEnv.sqlUpdate("drop database db1") + tableEnv.sqlUpdate("drop database if exists db1") + try { + tableEnv.sqlUpdate("drop database db1") + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + tableEnv.sqlUpdate("create database db1") + tableEnv.sqlUpdate("use db1") + val ddl1 = + """ + |create table t1( + | a bigint, + | b bigint, + | c varchar + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + tableEnv.sqlUpdate(ddl1) + val ddl2 = + """ + |create table t2( + | a bigint, + | b bigint, + | c varchar + |) with ( + | 'connector' = 'COLLECTION' + |) + """.stripMargin + 
tableEnv.sqlUpdate(ddl2) + try { + tableEnv.sqlUpdate("drop database db1") + fail("ValidationException expected") + } catch { + case _: ValidationException => //ignore + } + tableEnv.sqlUpdate("drop database db1 cascade") + } + + @Test + def testAlterDatabase: Unit = { + tableEnv.registerCatalog("cat1", new GenericInMemoryCatalog("default")) + tableEnv.sqlUpdate("use catalog cat1") + tableEnv.sqlUpdate("create database db1 comment 'db1_comment' with ('k1' = 'v1')") + tableEnv.sqlUpdate("alter database db1 set ('k1' = 'a', 'k2' = 'b')") + val database = tableEnv.getCatalog("cat1").get().getDatabase("db1") + assertEquals("db1_comment", database.getComment) + assertEquals(2, database.getProperties.size()) + val expectedProperty = new util.HashMap[String, String]() + expectedProperty.put("k1", "a") + expectedProperty.put("k2", "b") + assertEquals(expectedProperty, database.getProperties) + } } object CatalogTableITCase {