Skip to content

Commit

Permalink
[FLINK-14691][table] Support create/drop/alter/use database in TableE…
Browse files Browse the repository at this point in the history
…nvironment

This closes apache#10296
  • Loading branch information
zjuwangg authored and KurtYoung committed Dec 4, 2019
1 parent 2bd9cec commit 02a8c41
Show file tree
Hide file tree
Showing 24 changed files with 1,089 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ public boolean databaseExists(String databaseName) throws CatalogException {
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists) throws DatabaseNotExistException,
DatabaseNotEmptyException, CatalogException {
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
try {
client.dropDatabase(name, true, ignoreIfNotExists);
client.dropDatabase(name, true, ignoreIfNotExists, cascade);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getName(), name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExi
client.dropDatabase(name, deleteData, ignoreIfNotExists);
}

public void dropDatabase(String name, boolean deleteData, boolean ignoreIfNotExists, boolean cascade)
throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
client.dropDatabase(name, deleteData, ignoreIfNotExists, cascade);
}

public void alterDatabase(String name, Database database) throws NoSuchObjectException, MetaException, TException {
client.alterDatabase(name, database);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
{
SqlIdentifier databaseName = null;
boolean ifExists = false;
boolean isRestrict = true;
boolean cascade = false;
}
{
<DATABASE>
Expand All @@ -151,13 +151,13 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :

databaseName = CompoundIdentifier()
[
<RESTRICT> { isRestrict = true; }
<RESTRICT> { cascade = false; }
|
<CASCADE> { isRestrict = false; }
<CASCADE> { cascade = true; }
]

{
return new SqlDropDatabase(s.pos(), databaseName, ifExists, isRestrict);
return new SqlDropDatabase(s.pos(), databaseName, ifExists, cascade);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public class SqlAlterDatabase extends SqlCall {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER DATABASE", SqlKind.OTHER);
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ALTER DATABASE", SqlKind.OTHER_DDL);

private final SqlIdentifier databaseName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
*/
public class SqlCreateDatabase extends SqlCreate implements ExtendedSqlNode {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE DATABASE", SqlKind.OTHER);
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE DATABASE", SqlKind.OTHER_DDL);

private final SqlIdentifier databaseName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,20 @@
*/
public class SqlDropDatabase extends SqlDrop implements ExtendedSqlNode {
private static final SqlOperator OPERATOR =
new SqlSpecialOperator("DROP DATABASE", SqlKind.OTHER);
new SqlSpecialOperator("DROP DATABASE", SqlKind.OTHER_DDL);

private SqlIdentifier databaseName;
private boolean ifExists;
private boolean isRestrict = true;
private final SqlIdentifier databaseName;
private final boolean ifExists;
private final boolean isCascade;

public SqlDropDatabase(SqlParserPos pos,
SqlIdentifier databaseName,
boolean ifExists,
boolean isRestrict) {
boolean isCascade) {
super(OPERATOR, pos, ifExists);
this.databaseName = databaseName;
this.ifExists = ifExists;
this.isRestrict = isRestrict;
this.isCascade = isCascade;
}

@Override
Expand All @@ -62,16 +62,12 @@ public SqlIdentifier getDatabaseName() {
return databaseName;
}

public void setDatabaseName(SqlIdentifier viewName) {
this.databaseName = viewName;
}

public boolean getIfExists() {
return this.ifExists;
}

public void setIfExists(boolean ifExists) {
this.ifExists = ifExists;
public boolean isCascade() {
return isCascade;
}

@Override
Expand All @@ -82,10 +78,10 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("IF EXISTS");
}
databaseName.unparse(writer, leftPrec, rightPrec);
if (isRestrict) {
writer.keyword("RESTRICT");
} else {
if (isCascade) {
writer.keyword("CASCADE");
} else {
writer.keyword("RESTRICT");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
*/
public class SqlUseDatabase extends SqlCall {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE DATABASE", SqlKind.OTHER);
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("USE DATABASE", SqlKind.OTHER_DDL);
private final SqlIdentifier databaseName;

public SqlUseDatabase(SqlParserPos pos, SqlIdentifier databaseName) {
super(pos);
this.databaseName = databaseName;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Parser;
Expand All @@ -59,7 +62,11 @@
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
Expand Down Expand Up @@ -94,6 +101,10 @@ public class TableEnvironmentImpl implements TableEnvironment {
protected final FunctionCatalog functionCatalog;
protected final Planner planner;
protected final Parser parser;
private static final String UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG =
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, USE CATALOG, USE [CATALOG.]DATABASE, " +
"CREATE DATABASE, DROP DATABASE, ALTER DATABASE";

/**
* Provides necessary methods for {@link ConnectTableDescriptor}.
Expand Down Expand Up @@ -449,9 +460,7 @@ public void sqlUpdate(String stmt) {
List<Operation> operations = parser.parse(stmt);

if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
"INSERT, CREATE TABLE, DROP TABLE, USE CATALOG");
throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
}

Operation operation = operations.get(0);
Expand All @@ -469,21 +478,75 @@ public void sqlUpdate(String stmt) {
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else if (operation instanceof CreateDatabaseOperation) {
CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString());
try {
catalog.createDatabase(
createDatabaseOperation.getDatabaseName(),
createDatabaseOperation.getCatalogDatabase(),
createDatabaseOperation.isIgnoreIfExists());
} catch (DatabaseAlreadyExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(),
dropTableOperation.isIfExists());
} else if (operation instanceof DropDatabaseOperation) {
DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString());
try {
catalog.dropDatabase(
dropDatabaseOperation.getDatabaseName(),
dropDatabaseOperation.isIfExists(),
dropDatabaseOperation.isCascade());
} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof AlterDatabaseOperation) {
AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString());
try {
catalog.alterDatabase(
alterDatabaseOperation.getDatabaseName(),
alterDatabaseOperation.getCatalogDatabase(),
false);
} catch (DatabaseNotExistException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof UseCatalogOperation) {
UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
} else if (operation instanceof UseDatabaseOperation) {
UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation;
catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName());
catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName());
} else {
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " +
"type INSERT, CREATE TABLE, DROP TABLE, USE CATALOG");
throw new TableException(UNSUPPORTED_QUERY_IN_SQL_UPDATE_MSG);
}
}

/** Get catalog from catalogName or throw a ValidationException if the catalog not exists. */
private Catalog getCatalogOrThrowException(String catalogName) {
return getCatalog(catalogName)
.orElseThrow(() -> new ValidationException(String.format("Catalog %s does not exist.", catalogName)));
}

private String getDDLOpExecuteErrorMsg(String action) {
return String.format("Could not execute %s ", action);
}

@Override
public String getCurrentCatalog() {
return catalogManager.getCurrentCatalog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void createDatabase(String databaseName, CatalogDatabase db, boolean igno
}

@Override
public void dropDatabase(String databaseName, boolean ignoreIfNotExists)
public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));

Expand All @@ -116,6 +116,27 @@ public void dropDatabase(String databaseName, boolean ignoreIfNotExists)
// Make sure the database is empty
if (isDatabaseEmpty(databaseName)) {
databases.remove(databaseName);
} else if (cascade) {
// delete all tables in this database and then delete the database.
List<ObjectPath> deleteTablePaths = tables.keySet().stream()
.filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList());
deleteTablePaths.forEach(objectPath -> {
try {
dropTable(objectPath, true);
} catch (TableNotExistException e) {
//ignore
}
});
List<ObjectPath> deleteFunctionPaths = functions.keySet().stream()
.filter(op -> op.getDatabaseName().equals(databaseName)).collect(Collectors.toList());
deleteFunctionPaths.forEach(objectPath -> {
try {
dropFunction(objectPath, true);
} catch (FunctionNotExistException e) {
//ignore
}
});
databases.remove(databaseName);
} else {
throw new DatabaseNotEmptyException(getName(), databaseName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

/**
* Operation to describe a USE [catalogName.]dataBaseName statement.
*/
public class UseDatabaseOperation implements UseOperation {

private final String catalogName;
private final String databaseName;

public UseDatabaseOperation(String catalogName, String databaseName) {
this.catalogName = catalogName;
this.databaseName = databaseName;
}

public String getCatalogName() {
return catalogName;
}

public String getDatabaseName() {
return databaseName;
}

@Override
public String asSummaryString() {
return String.format("USE %s.%s", catalogName, databaseName);
}
}
Loading

0 comments on commit 02a8c41

Please sign in to comment.