Commit: address comments
lirui-apache committed Mar 15, 2021
1 parent 4a831d6; commit e971a1d
Showing 20 changed files with 75 additions and 136 deletions.
----------------------------------------------------------------------
@@ -41,7 +41,6 @@
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
-import org.apache.flink.table.catalog.FunctionCatalog.InlineCatalogFunction;
import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
@@ -51,6 +50,7 @@
import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
+import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.Operation;
@@ -72,16 +72,25 @@
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemInlineFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
import org.apache.flink.table.operations.ddl.DropPartitionsOperation;
import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
import org.apache.flink.table.operations.ddl.DropViewOperation;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.NotNullConstraint;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.PrimaryKey;
+import org.apache.flink.table.planner.delegation.hive.desc.DropPartitionDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterDatabaseDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropDatabaseDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropFunctionDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserShowTablesDesc;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserBaseSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserStorageFormat;
import org.apache.flink.table.planner.utils.OperationConverterUtils;
@@ -142,10 +151,12 @@
/** A converter to generate DDL operations. */
public class DDLOperationConverter {

+private final Parser parser;
private final CatalogManager catalogManager;
private final HiveFunctionDefinitionFactory funcDefFactory;

-public DDLOperationConverter(CatalogManager catalogManager, HiveShim hiveShim) {
+public DDLOperationConverter(Parser parser, CatalogManager catalogManager, HiveShim hiveShim) {
+this.parser = parser;
this.catalogManager = catalogManager;
this.funcDefFactory = new HiveFunctionDefinitionFactory(hiveShim);
}
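
Note: the converter now takes the dialect's Parser as a third collaborator, so compound names can be resolved by the SQL parser instead of by hand (see the parseObjectIdentifier hunk further down). A minimal wiring sketch, assuming the three objects are already in scope; HiveParser later passes itself, since it extends ParserImpl and therefore implements Parser:

    // Hypothetical call site; `parser`, `catalogManager` and `hiveShim` are assumptions.
    DDLOperationConverter converter =
            new DDLOperationConverter(parser, catalogManager, hiveShim);
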
@@ -240,8 +251,8 @@ private Operation convertCreateFunction(CreateFunctionDesc desc) {
funcDefFactory.createFunctionDefinition(
desc.getFunctionName(),
new CatalogFunctionImpl(desc.getClassName(), FunctionLanguage.JAVA));
-return new CreateTempSystemInlineFunctionOperation(
-desc.getFunctionName(), false, new InlineCatalogFunction(funcDefinition));
+return new CreateTempSystemFunctionOperation(
+desc.getFunctionName(), false, funcDefinition);
} else {
ObjectIdentifier identifier = parseObjectIdentifier(desc.getFunctionName());
CatalogFunction catalogFunction =
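
Note: the dedicated CreateTempSystemInlineFunctionOperation type is deleted by this commit; an eagerly resolved FunctionDefinition now travels in the plain CreateTempSystemFunctionOperation. A hedged sketch of the consolidated path; `name` and `className` are illustrative stand-ins:

    // Resolve the Hive UDF up front, then wrap it in the common operation type.
    FunctionDefinition funcDefinition =
            funcDefFactory.createFunctionDefinition(
                    name, new CatalogFunctionImpl(className, FunctionLanguage.JAVA));
    Operation op = new CreateTempSystemFunctionOperation(name, false, funcDefinition);
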
@@ -318,7 +329,7 @@ private Operation convertDropPartitions(DropPartitionDesc desc) {
UnresolvedIdentifier.of(desc.getDbName(), desc.getTableName()));
CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
if (catalogBaseTable instanceof CatalogView) {
throw new ValidationException("ALTER TABLE for a view is not allowed");
throw new ValidationException("DROP PARTITION for a view is not supported");
}
List<CatalogPartitionSpec> specs =
desc.getSpecs().stream()
@@ -335,7 +346,7 @@ private Operation convertAddPartitions(AddPartitionDesc desc) {
UnresolvedIdentifier.of(desc.getDbName(), desc.getTableName()));
CatalogBaseTable catalogBaseTable = getCatalogBaseTable(tableIdentifier);
if (catalogBaseTable instanceof CatalogView) {
throw new ValidationException("ALTER TABLE for a view is not allowed");
throw new ValidationException("ADD PARTITION for a view is not supported");
}
List<CatalogPartitionSpec> specs = new ArrayList<>();
List<CatalogPartition> partitions = new ArrayList<>();
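
Note: both partition hunks swap a generic message for one naming the statement the user actually issued. A test-style illustration of the visible effect; `tableEnv` and the view name are assumptions, not code from this commit:

    try {
        tableEnv.executeSql("alter table some_view drop partition (p='1')");
    } catch (ValidationException e) {
        // message now reads "DROP PARTITION for a view is not supported"
        // rather than the old "ALTER TABLE for a view is not allowed"
    }
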
@@ -746,8 +757,7 @@ private CatalogPartition getPartition(
}

private ObjectIdentifier parseObjectIdentifier(String compoundName) {
-UnresolvedIdentifier unresolvedIdentifier =
-UnresolvedIdentifier.of(compoundName.split("\\."));
+UnresolvedIdentifier unresolvedIdentifier = parser.parseIdentifier(compoundName);
return catalogManager.qualifyIdentifier(unresolvedIdentifier);
}
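
Note: delegating to Parser.parseIdentifier is the substantive fix in this hunk. A regex split cannot handle backtick-quoted identifier parts, which the SQL parser tokenizes and unquotes correctly; the testAlterTable change below exercises exactly this. A sketch of the difference, assuming the injected `parser`:

    String compound = "`default`.tbl1";
    String[] naive = compound.split("\\.");
    // naive == ["`default`", "tbl1"]: the backticks survive, so the database
    // would be looked up literally as "`default`" and never match
    UnresolvedIdentifier parsed = parser.parseIdentifier(compound);
    // parsed: database "default", table "tbl1", quoting stripped
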

----------------------------------------------------------------------
@@ -33,6 +33,8 @@
import org.apache.flink.table.planner.calcite.SqlExprToRexConverter;
import org.apache.flink.table.planner.delegation.ParserImpl;
import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParseException;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParseUtils;
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
@@ -200,8 +202,6 @@ private List<Operation> processCmd(
final HiveParserContext context = new HiveParserContext(hiveConf);
// parse statement to get AST
final ASTNode node = HiveASTParseUtils.parse(cmd, context);
-// generate Calcite plan
-Operation operation;
if (DDL_NODES.contains(node.getType())) {
HiveParserQueryState queryState = new HiveParserQueryState(hiveConf);
HiveParserDDLSemanticAnalyzer ddlAnalyzer =
@@ -212,7 +212,7 @@
getCatalogManager().getCurrentDatabase());
Serializable work = ddlAnalyzer.analyzeInternal(node);
DDLOperationConverter ddlConverter =
-new DDLOperationConverter(getCatalogManager(), hiveShim);
+new DDLOperationConverter(this, getCatalogManager(), hiveShim);
if (work instanceof HiveParserCreateViewDesc) {
return super.parse(cmd);
} else if (work instanceof CreateTableASDesc) {
----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.hadoop.hive.ql.parse.ASTNode;

----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import java.io.Serializable;
import java.util.List;
----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.hadoop.hive.ql.plan.PrincipalDesc;

----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.flink.table.planner.delegation.hive.parse.HiveParserBaseSemanticAnalyzer;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserStorageFormat;
----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.parse.ASTNode;
----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import java.io.Serializable;

----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import org.apache.hadoop.hive.ql.plan.DropFunctionDesc;

----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import java.io.Serializable;

----------------------------------------------------------------------
@@ -16,7 +16,7 @@
* limitations under the License.
*/

-package org.apache.flink.table.planner.delegation.hive;
+package org.apache.flink.table.planner.delegation.hive.desc;

import java.io.Serializable;

----------------------------------------------------------------------
@@ -20,9 +20,9 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.NotNullConstraint;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.PrimaryKey;
import org.apache.flink.table.planner.delegation.hive.HiveParserUtils;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey;

import org.antlr.runtime.tree.Tree;
import org.apache.calcite.util.ImmutableBitSet;
----------------------------------------------------------------------
@@ -24,22 +24,22 @@
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
-import org.apache.flink.table.planner.delegation.hive.CreateTableASDesc;
-import org.apache.flink.table.planner.delegation.hive.DropPartitionDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserAlterDatabaseDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserAlterTableDesc;
import org.apache.flink.table.planner.delegation.hive.HiveParserAuthorizationParseUtils;
import org.apache.flink.table.planner.delegation.hive.HiveParserConstants;
import org.apache.flink.table.planner.delegation.hive.HiveParserContext;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.NotNullConstraint;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateTableDesc.PrimaryKey;
-import org.apache.flink.table.planner.delegation.hive.HiveParserCreateViewDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserDropDatabaseDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserDropFunctionDesc;
-import org.apache.flink.table.planner.delegation.hive.HiveParserDropTableDesc;
import org.apache.flink.table.planner.delegation.hive.HiveParserQueryState;
-import org.apache.flink.table.planner.delegation.hive.HiveParserShowTablesDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.CreateTableASDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.DropPartitionDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterDatabaseDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserAlterTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.NotNullConstraint;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateTableDesc.PrimaryKey;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserCreateViewDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropDatabaseDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropFunctionDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserDropTableDesc;
+import org.apache.flink.table.planner.delegation.hive.desc.HiveParserShowTablesDesc;

import org.antlr.runtime.tree.CommonTree;
import org.apache.hadoop.fs.Path;
----------------------------------------------------------------------
@@ -66,6 +66,7 @@

import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -300,7 +301,7 @@ public void testAlterTable() throws Exception {
ObjectPath tablePath = new ObjectPath("default", "tbl1");

// change properties
tableEnv.executeSql("alter table tbl1 set tblproperties ('k2'='v2')");
tableEnv.executeSql("alter table `default`.tbl1 set tblproperties ('k2'='v2')");
Table hiveTable = hiveCatalog.getHiveTable(tablePath);
assertEquals("v1", hiveTable.getParameters().get("k1"));
assertEquals("v2", hiveTable.getParameters().get("k2"));
@@ -546,9 +547,8 @@ public void testTemporaryFunction() throws Exception {
String.format(
"create temporary function temp_abs as '%s'",
GenericUDFAbs.class.getName()));
-List<Row> functions =
-CollectionUtil.iteratorToList(tableEnv.executeSql("show functions").collect());
-assertTrue(functions.toString().contains("temp_abs"));
+String[] functions = tableEnv.listUserDefinedFunctions();
+assertArrayEquals(new String[] {"temp_abs"}, functions);
// call the function
tableEnv.executeSql("create table src(x int)");
tableEnv.executeSql("insert into src values (1),(-1)").await();
@@ -563,8 +563,8 @@
queryResult(tableEnv.sqlQuery("select temp_abs(x) from `default`.src")).toString());
// drop the function
tableEnv.executeSql("drop temporary function temp_abs");
-functions = CollectionUtil.iteratorToList(tableEnv.executeSql("show functions").collect());
-assertFalse(functions.toString().contains("temp_abs"));
+functions = tableEnv.listUserDefinedFunctions();
+assertEquals(0, functions.length);
tableEnv.executeSql("drop temporary function if exists foo");
}
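
Note: switching from scraping SHOW FUNCTIONS output to listUserDefinedFunctions() tightens both assertions. The old substring check ran against a list that also contains every built-in function, so it could only test containment, never exact contents. The pattern now used, as in the test above:

    String[] functions = tableEnv.listUserDefinedFunctions();
    assertArrayEquals(new String[] {"temp_abs"}, functions); // exactly one UDF
    // ... and after DROP TEMPORARY FUNCTION:
    assertEquals(0, tableEnv.listUserDefinedFunctions().length);
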

----------------------------------------------------------------------
@@ -48,7 +48,6 @@
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemInlineFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropCatalogOperation;
@@ -170,8 +169,7 @@ private static SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) {
} else if (operation instanceof ShowPartitionsOperation) {
cmd = SqlCommand.SHOW_PARTITIONS;
} else if (operation instanceof CreateCatalogFunctionOperation
-|| operation instanceof CreateTempSystemFunctionOperation
-|| operation instanceof CreateTempSystemInlineFunctionOperation) {
+|| operation instanceof CreateTempSystemFunctionOperation) {
cmd = SqlCommand.CREATE_FUNCTION;
} else if (operation instanceof DropCatalogFunctionOperation
|| operation instanceof DropTempSystemFunctionOperation) {
----------------------------------------------------------------------
@@ -125,7 +125,6 @@
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemInlineFunctionOperation;
import org.apache.flink.table.operations.ddl.CreateViewOperation;
import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.DropCatalogOperation;
@@ -1056,8 +1055,6 @@ private TableResult executeOperation(Operation operation) {
}
} else if (operation instanceof CreateCatalogFunctionOperation) {
return createCatalogFunction((CreateCatalogFunctionOperation) operation);
-} else if (operation instanceof CreateTempSystemInlineFunctionOperation) {
-return createSystemInlineFunction((CreateTempSystemInlineFunctionOperation) operation);
} else if (operation instanceof CreateTempSystemFunctionOperation) {
return createSystemFunction((CreateTempSystemFunctionOperation) operation);
} else if (operation instanceof DropCatalogFunctionOperation) {
@@ -1639,29 +1636,12 @@ private TableResult dropCatalogFunction(
}
}

-private TableResult createSystemInlineFunction(
-CreateTempSystemInlineFunctionOperation operation) {
-String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
-try {
-functionCatalog.registerTemporarySystemFunction(
-operation.getFunctionName(),
-operation.getCatalogFunction(),
-operation.ifNotExists());
-return TableResultImpl.TABLE_RESULT_OK;
-} catch (ValidationException e) {
-throw e;
-} catch (Exception e) {
-throw new TableException(exMsg, e);
-}
-}

private TableResult createSystemFunction(CreateTempSystemFunctionOperation operation) {
String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
try {
functionCatalog.registerTemporarySystemFunction(
operation.getFunctionName(),
-operation.getFunctionClass(),
-operation.getFunctionLanguage(),
+operation.getCatalogFunction(),
operation.isIgnoreIfExists());
return TableResultImpl.TABLE_RESULT_OK;
} catch (ValidationException e) {
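
Note: with the inline variant gone, TableEnvironmentImpl keeps a single code path for temporary system functions, and registerTemporarySystemFunction receives the whole CatalogFunction rather than a class name plus FunctionLanguage. A reconstruction of the surviving method from the lines visible in this hunk (a sketch, not authoritative):

    private TableResult createSystemFunction(CreateTempSystemFunctionOperation operation) {
        String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());
        try {
            functionCatalog.registerTemporarySystemFunction(
                    operation.getFunctionName(),
                    operation.getCatalogFunction(), // class-based or inline definition
                    operation.isIgnoreIfExists());
            return TableResultImpl.TABLE_RESULT_OK;
        } catch (ValidationException e) {
            throw e;
        } catch (Exception e) {
            throw new TableException(exMsg, e);
        }
    }
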