Skip to content

Commit

Permalink
[FLINK-31505][table] Move execution logic of DropOperation out from T…
Browse files Browse the repository at this point in the history
…ableEnvironmentImpl (apache#22209)
  • Loading branch information
snuyanzin committed Mar 19, 2023
1 parent de258f3 commit 06be368
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.flink.table.api.internal;

import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExecutableOperation;

/** A simple implementation of {@link ExecutableOperation.Context}. */
public class ExecutableOperationContextImpl implements ExecutableOperation.Context {

private final CatalogManager catalogManager;
private final FunctionCatalog functionCatalog;
private final ModuleManager moduleManager;

public ExecutableOperationContextImpl(
CatalogManager catalogManager, ModuleManager moduleManager) {
CatalogManager catalogManager,
FunctionCatalog functionCatalog,
ModuleManager moduleManager) {
this.catalogManager = catalogManager;
this.functionCatalog = functionCatalog;
this.moduleManager = moduleManager;
}

Expand All @@ -39,6 +44,11 @@ public CatalogManager getCatalogManager() {
return catalogManager;
}

@Override
public FunctionCatalog getFunctionCatalog() {
return functionCatalog;
}

@Override
public ModuleManager getModuleManager() {
return moduleManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
Expand Down Expand Up @@ -115,7 +114,6 @@
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UnloadModuleOperation;
import org.apache.flink.table.operations.UseModulesOperation;
import org.apache.flink.table.operations.command.AddJarOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
Expand All @@ -140,13 +138,7 @@
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropCatalogOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
Expand Down Expand Up @@ -262,7 +254,8 @@ protected TableEnvironmentImpl(
isStreamingMode);
catalogManager.initSchemaResolver(
isStreamingMode, operationTreeBuilder.getResolverBuilder());
this.operationCtx = new ExecutableOperationContextImpl(catalogManager, moduleManager);
this.operationCtx =
new ExecutableOperationContextImpl(catalogManager, functionCatalog, moduleManager);
}

public static TableEnvironmentImpl create(Configuration configuration) {
Expand Down Expand Up @@ -1010,16 +1003,6 @@ public TableResultInternal executeInternal(Operation operation) {
createTableOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof AlterTableOperation) {
AlterTableOperation alterTableOperation = (AlterTableOperation) operation;
Catalog catalog =
Expand Down Expand Up @@ -1111,16 +1094,6 @@ public TableResultInternal executeInternal(Operation operation) {
createViewOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropViewOperation) {
DropViewOperation dropViewOperation = (DropViewOperation) operation;
if (dropViewOperation.isTemporary()) {
catalogManager.dropTemporaryView(
dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());
} else {
catalogManager.dropView(
dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof AlterViewOperation) {
AlterViewOperation alterViewOperation = (AlterViewOperation) operation;
Catalog catalog =
Expand Down Expand Up @@ -1171,21 +1144,6 @@ public TableResultInternal executeInternal(Operation operation) {
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof DropDatabaseOperation) {
DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName());
String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString());
try {
catalog.dropDatabase(
dropDatabaseOperation.getDatabaseName(),
dropDatabaseOperation.isIfExists(),
dropDatabaseOperation.isCascade());
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
throw new ValidationException(exMsg, e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
} else if (operation instanceof AlterDatabaseOperation) {
AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;
Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName());
Expand All @@ -1205,10 +1163,6 @@ public TableResultInternal executeInternal(Operation operation) {
return createCatalogFunction((CreateCatalogFunctionOperation) operation);
} else if (operation instanceof CreateTempSystemFunctionOperation) {
return createSystemFunction((CreateTempSystemFunctionOperation) operation);
} else if (operation instanceof DropCatalogFunctionOperation) {
return dropCatalogFunction((DropCatalogFunctionOperation) operation);
} else if (operation instanceof DropTempSystemFunctionOperation) {
return dropSystemFunction((DropTempSystemFunctionOperation) operation);
} else if (operation instanceof AlterCatalogFunctionOperation) {
return alterCatalogFunction((AlterCatalogFunctionOperation) operation);
} else if (operation instanceof AddJarOperation) {
Expand All @@ -1217,16 +1171,6 @@ public TableResultInternal executeInternal(Operation operation) {
return buildShowResult("jars", listJars());
} else if (operation instanceof CreateCatalogOperation) {
return createCatalog((CreateCatalogOperation) operation);
} else if (operation instanceof DropCatalogOperation) {
DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;
String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());
try {
catalogManager.unregisterCatalog(
dropCatalogOperation.getCatalogName(), dropCatalogOperation.isIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(exMsg, e);
}
} else if (operation instanceof LoadModuleOperation) {
return loadModule((LoadModuleOperation) operation);
} else if (operation instanceof UnloadModuleOperation) {
Expand Down Expand Up @@ -1500,16 +1444,6 @@ private TableResultInternal unloadModule(UnloadModuleOperation operation) {
}
}

private TableResultInternal useModules(UseModulesOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
moduleManager.useModules(operation.getModuleNames().toArray(new String[0]));
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw new ValidationException(String.format("%s. %s", exMsg, e.getMessage()), e);
}
}

private TableResultInternal buildShowResult(String columnName, String[] objects) {
return buildResult(
new String[] {columnName},
Expand Down Expand Up @@ -1941,35 +1875,6 @@ private TableResultInternal alterCatalogFunction(
}
}

private TableResultInternal dropCatalogFunction(
DropCatalogFunctionOperation dropCatalogFunctionOperation) {
String exMsg = getDDLOpExecuteErrorMsg(dropCatalogFunctionOperation.asSummaryString());
try {
if (dropCatalogFunctionOperation.isTemporary()) {
functionCatalog.dropTempCatalogFunction(
dropCatalogFunctionOperation.getFunctionIdentifier(),
dropCatalogFunctionOperation.isIfExists());
} else {
Catalog catalog =
getCatalogOrThrowException(
dropCatalogFunctionOperation
.getFunctionIdentifier()
.getCatalogName());

catalog.dropFunction(
dropCatalogFunctionOperation.getFunctionIdentifier().toObjectPath(),
dropCatalogFunctionOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw e;
} catch (FunctionNotExistException e) {
throw new ValidationException(e.getMessage(), e);
} catch (Exception e) {
throw new TableException(exMsg, e);
}
}

private TableResultInternal createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());

Expand All @@ -1986,18 +1891,6 @@ private TableResultInternal createSystemFunction(CreateTempSystemFunctionOperati
}
}

private TableResultInternal dropSystemFunction(DropTempSystemFunctionOperation operation) {
try {
functionCatalog.dropTemporarySystemFunction(
operation.getFunctionName(), operation.isIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw e;
} catch (Exception e) {
throw new TableException(getDDLOpExecuteErrorMsg(operation.asSummaryString()), e);
}
}

@VisibleForTesting
public TableImpl createTable(QueryOperation tableOperation) {
return TableImpl.createTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@ public Optional<Catalog> getCatalog(String catalogName) {
return Optional.ofNullable(catalogs.get(catalogName));
}

public Catalog getCatalogOrThrowException(String catalogName) {
return getCatalog(catalogName)
.orElseThrow(
() ->
new ValidationException(
String.format("Catalog %s does not exist", catalogName)));
}

/**
* Gets the current catalog that will be used when resolving table path.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.module.ModuleManager;

/**
Expand Down Expand Up @@ -53,6 +54,8 @@ interface Context {

CatalogManager getCatalogManager();

FunctionCatalog getFunctionCatalog();

ModuleManager getModuleManager();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

Expand Down Expand Up @@ -68,4 +74,28 @@ public boolean isTemporary() {
public String getFunctionName() {
return this.functionIdentifier.getObjectName();
}

@Override
public TableResultInternal execute(Context ctx) {
try {
if (isTemporary()) {
ctx.getFunctionCatalog()
.dropTempCatalogFunction(getFunctionIdentifier(), isIfExists());
} else {
Catalog catalog =
ctx.getCatalogManager()
.getCatalogOrThrowException(
getFunctionIdentifier().getCatalogName());

catalog.dropFunction(getFunctionIdentifier().toObjectPath(), isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
throw e;
} catch (FunctionNotExistException e) {
throw new ValidationException(e.getMessage(), e);
} catch (Exception e) {
throw new TableException(String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.OperationUtils;

Expand Down Expand Up @@ -51,4 +55,15 @@ public String asSummaryString() {
return OperationUtils.formatWithChildren(
"DROP CATALOG", params, Collections.emptyList(), Operation::asSummaryString);
}

@Override
public TableResultInternal execute(Context ctx) {
try {
ctx.getCatalogManager().unregisterCatalog(getCatalogName(), isIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (CatalogException e) {
throw new ValidationException(
String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;

/** Operation to describe a DROP DATABASE statement. */
public class DropDatabaseOperation implements DropOperation {
private final String catalogName;
Expand Down Expand Up @@ -57,4 +65,18 @@ public String asSummaryString() {
summaryString.append(cascade ? " CASCADE" : " RESTRICT");
return summaryString.toString();
}

@Override
public TableResultInternal execute(Context ctx) {
Catalog catalog = ctx.getCatalogManager().getCatalogOrThrowException(getCatalogName());
try {
catalog.dropDatabase(getDatabaseName(), isIfExists(), isCascade());
return TableResultImpl.TABLE_RESULT_OK;
} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {
throw new ValidationException(
String.format("Could not execute %s", asSummaryString()), e);
} catch (Exception e) {
throw new TableException(String.format("Could not execute %s", asSummaryString()), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.operations.ddl;

import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.Operation;

/**
Expand All @@ -26,4 +27,4 @@
* <p>Different sub operations can have their special target name. For example, a drop table
* operation may have a target table name and a flag to describe if is exists.
*/
public interface DropOperation extends Operation {}
public interface DropOperation extends Operation, ExecutableOperation {}
Loading

0 comments on commit 06be368

Please sign in to comment.