[FLINK-21808][hive] Support DQL/DML in HiveParser
This closes apache#15253
lirui-apache authored and wuchong committed Apr 1, 2021
1 parent 730ebe5 · commit 04bbf03
Showing 65 changed files with 16,780 additions and 127 deletions.
---------- HiveShim.java ----------
@@ -40,6 +40,8 @@
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
@@ -207,4 +209,30 @@ void createTableWithConstraints(
/** Create orc {@link BulkWriter.Factory} for different hive versions. */
BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
Configuration conf, String schema, LogicalType[] fieldTypes);

    /** Checks whether a Hive table is a materialized view. */
default boolean isMaterializedView(org.apache.hadoop.hive.ql.metadata.Table table) {
return false;
}

    /** Gets Hive's INTERVAL YEAR MONTH type info; the type only exists in Hive 1.2.0 and later. */
    default PrimitiveTypeInfo getIntervalYearMonthTypeInfo() {
throw new UnsupportedOperationException(
"INTERVAL YEAR MONTH type not supported until 1.2.0");
}

    /** Gets Hive's INTERVAL DAY TIME type info; the type only exists in Hive 1.2.0 and later. */
    default PrimitiveTypeInfo getIntervalDayTimeTypeInfo() {
throw new UnsupportedOperationException("INTERVAL DAY TIME type not supported until 1.2.0");
}

    /** Checks whether a primitive category is the INTERVAL YEAR MONTH type. */
    default boolean isIntervalYearMonthType(
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory) {
return false;
}

    /** Checks whether a primitive category is the INTERVAL DAY TIME type. */
    default boolean isIntervalDayTimeType(
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory) {
return false;
}

    /** Registers a temporary function with the given name and implementation class. */
    void registerTemporaryFunction(String funcName, Class funcClass);
}
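The interval-type hooks above fail fast on shims older than 1.2.0. A minimal caller-side sketch of how that fallback can be handled; the helper class below is illustrative, not part of this commit:

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;

class IntervalTypeLookup {
    // Returns the shim's INTERVAL YEAR MONTH type info, or null when the
    // underlying Hive version (< 1.2.0) does not define the type.
    static PrimitiveTypeInfo intervalYearMonthOrNull(HiveShim shim) {
        try {
            return shim.getIntervalYearMonthTypeInfo();
        } catch (UnsupportedOperationException e) {
            return null;
        }
    }
}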
---------- HiveShimV100.java ----------
@@ -23,6 +23,7 @@
import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
@@ -93,6 +94,12 @@
/** Shim for Hive version 1.0.0. */
public class HiveShimV100 implements HiveShim {

    // Hive 1.x's FunctionRegistry.registerTemporaryFunction(String, Class), resolved
    // reflectively because later Hive versions replace this signature (see HiveShimV120).
    private static final Method registerTemporaryFunction =
HiveReflectionUtils.tryGetMethod(
FunctionRegistry.class,
"registerTemporaryFunction",
new Class[] {String.class, Class.class});

@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
try {
@@ -409,6 +416,15 @@ public BulkWriter.Factory<RowData> createOrcBulkWriterFactory(
return new OrcNoHiveBulkWriterFactory(conf, schema, fieldTypes);
}

@Override
public void registerTemporaryFunction(String funcName, Class funcClass) {
try {
registerTemporaryFunction.invoke(null, funcName, funcClass);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new FlinkHiveException("Failed to register temp function", e);
}
}

boolean isBuiltInFunctionInfo(FunctionInfo info) {
return info.isNative();
}
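HiveShimV100 binds the Hive 1.x overload of FunctionRegistry.registerTemporaryFunction once, reflectively, since later Hive versions drop that signature in favor of registerTemporaryUDF (see HiveShimV120 below). A standalone sketch of the same lookup, assuming a Hive 1.x FunctionRegistry on the classpath:

import java.lang.reflect.Method;

class RegisterTemporaryFunctionLookup {
    // Resolves the Hive 1.x overload reflectively; returns null when a newer
    // Hive, which dropped this signature, is on the classpath instead.
    static Method tryLookup() {
        try {
            Class<?> registry =
                    Class.forName("org.apache.hadoop.hive.ql.exec.FunctionRegistry");
            return registry.getMethod(
                    "registerTemporaryFunction", String.class, Class.class);
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return null;
        }
    }
}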
---------- HiveShimV120.java ----------
@@ -18,6 +18,7 @@

package org.apache.flink.table.catalog.hive.client;

import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.Date;
@@ -31,9 +32,14 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.thrift.TException;

import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Set;
@@ -42,6 +48,43 @@
/** Shim for Hive version 1.2.0. */
public class HiveShimV120 extends HiveShimV111 {

    // Hive members introduced in 1.2.0, resolved lazily via reflection in init().
    private static PrimitiveTypeInfo intervalYearMonthTypeInfo;
private static PrimitiveTypeInfo intervalDayTimeTypeInfo;
private static Class funcResourceClz;
private static Method registerTemporaryUDF;

private static boolean inited = false;

private static void init() {
if (!inited) {
synchronized (HiveShimV120.class) {
if (!inited) {
try {
Field field =
TypeInfoFactory.class.getDeclaredField("intervalYearMonthTypeInfo");
intervalYearMonthTypeInfo = (PrimitiveTypeInfo) field.get(null);
field = TypeInfoFactory.class.getDeclaredField("intervalDayTimeTypeInfo");
intervalDayTimeTypeInfo = (PrimitiveTypeInfo) field.get(null);
funcResourceClz =
Thread.currentThread()
.getContextClassLoader()
.loadClass(
"org.apache.hadoop.hive.ql.exec.FunctionInfo$FunctionResource");
registerTemporaryUDF =
FunctionRegistry.class.getDeclaredMethod(
"registerTemporaryUDF",
String.class,
Class.class,
Array.newInstance(funcResourceClz, 0).getClass());
inited = true;
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
}
}
}

@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
try {
@@ -187,4 +230,39 @@ boolean isBuiltInFunctionInfo(FunctionInfo info) {
throw new CatalogException("Failed to invoke FunctionInfo.isBuiltIn()", ex);
}
}

@Override
public PrimitiveTypeInfo getIntervalYearMonthTypeInfo() {
init();
return intervalYearMonthTypeInfo;
}

@Override
public PrimitiveTypeInfo getIntervalDayTimeTypeInfo() {
init();
return intervalDayTimeTypeInfo;
}

@Override
public boolean isIntervalYearMonthType(
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory) {
return getIntervalYearMonthTypeInfo().getPrimitiveCategory() == primitiveCategory;
}

@Override
public boolean isIntervalDayTimeType(
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory) {
return getIntervalDayTimeTypeInfo().getPrimitiveCategory() == primitiveCategory;
}

@Override
public void registerTemporaryFunction(String funcName, Class funcClass) {
init();
try {
registerTemporaryUDF.invoke(
null, funcName, funcClass, Array.newInstance(funcResourceClz, 0));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new FlinkHiveException("Failed to register temp function", e);
}
}
}
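HiveShimV120 resolves its version-specific Hive members lazily with double-checked locking. A reduced standalone sketch of the same idiom; note that the sketch marks the cached field volatile, the textbook form of the pattern for safe publication:

import java.lang.reflect.Field;

import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

class LazyIntervalTypes {
    private static volatile PrimitiveTypeInfo intervalYearMonth;

    // The first caller pays the reflection cost; later callers take the fast path.
    static PrimitiveTypeInfo intervalYearMonth() {
        if (intervalYearMonth == null) {
            synchronized (LazyIntervalTypes.class) {
                if (intervalYearMonth == null) {
                    try {
                        Field field = TypeInfoFactory.class.getDeclaredField(
                                "intervalYearMonthTypeInfo");
                        intervalYearMonth = (PrimitiveTypeInfo) field.get(null);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
        return intervalYearMonth;
    }
}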
---------- HiveShimV230.java ----------
@@ -18,6 +18,7 @@

package org.apache.flink.table.catalog.hive.client;

import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.table.catalog.exceptions.CatalogException;

import org.apache.hadoop.hive.conf.HiveConf;
@@ -40,6 +41,27 @@
/** Shim for Hive version 2.3.0. */
public class HiveShimV230 extends HiveShimV220 {

    // Table#isMaterializedView() only exists in Hive 2.3.0+, hence the reflective lookup.
    private static Method isMaterializedView;

private static boolean inited = false;

private static void init() {
if (!inited) {
synchronized (HiveShimV230.class) {
if (!inited) {
try {
isMaterializedView =
org.apache.hadoop.hive.ql.metadata.Table.class.getDeclaredMethod(
"isMaterializedView");
inited = true;
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
}
}
}

@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
try {
@@ -99,4 +121,14 @@ public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(
throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
}
}

@Override
public boolean isMaterializedView(org.apache.hadoop.hive.ql.metadata.Table table) {
init();
try {
return (boolean) isMaterializedView.invoke(table);
} catch (Exception e) {
throw new FlinkHiveException(e);
}
}
}
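Because the default implementation in HiveShim returns false, callers can filter materialized views uniformly across Hive versions. A hedged usage sketch; the helper class is illustrative, not part of this commit:

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.hadoop.hive.ql.metadata.Table;

class MaterializedViewCheck {
    // True when the table can be handled as an ordinary table; on shims older
    // than HiveShimV230, isMaterializedView always reports false.
    static boolean isPlainTable(HiveShim shim, Table table) {
        return !shim.isMaterializedView(table);
    }
}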
----------------------------------------
@@ -33,6 +33,7 @@
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.NullType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.SmallIntType;
import org.apache.flink.table.types.logical.TimestampType;
@@ -359,6 +360,11 @@ public TypeInfo visit(RowType rowType) {
return TypeInfoFactory.getStructTypeInfo(names, typeInfos);
}

@Override
public TypeInfo visit(NullType nullType) {
return TypeInfoFactory.voidTypeInfo;
}

@Override
protected TypeInfo defaultMethod(LogicalType logicalType) {
throw new UnsupportedOperationException(
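The new visitor case gives Flink's NullType a Hive counterpart, the VOID type, so NULL-typed expressions survive type conversion. A trivial sketch of the mapping it establishes:

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

class NullTypeMapping {
    // Mirrors what the visit(NullType) case above returns.
    static TypeInfo hiveTypeForNullLiteral() {
        return TypeInfoFactory.voidTypeInfo;
    }
}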
----------------------------------------
@@ -117,7 +117,7 @@ private void init() throws HiveException {
initialized = true;
}

private GenericUDAFEvaluator createEvaluator(ObjectInspector[] inputInspectors)
public GenericUDAFEvaluator createEvaluator(ObjectInspector[] inputInspectors)
throws SemanticException {
GenericUDAFResolver2 resolver;

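createEvaluator is widened from private to public so code outside this class, presumably the new HiveParser machinery, can obtain a Hive evaluator directly. At its core the method asks a resolver for an evaluator given the argument types; a reduced standalone sketch against Hive's built-in sum resolver:

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFSum;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

class EvaluatorResolutionSketch {
    // Resolves an evaluator for SUM(BIGINT) through Hive's resolver API.
    static GenericUDAFEvaluator sumOfLongEvaluator() throws SemanticException {
        return new GenericUDAFSum()
                .getEvaluator(new TypeInfo[] {TypeInfoFactory.longTypeInfo});
    }
}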
----------------------------------------
@@ -152,7 +152,8 @@ public static HiveObjectConversion getConversion(
|| inspector instanceof LongObjectInspector
|| inspector instanceof FloatObjectInspector
|| inspector instanceof DoubleObjectInspector
|| inspector instanceof BinaryObjectInspector) {
|| inspector instanceof BinaryObjectInspector
|| inspector instanceof VoidObjectInspector) {
conversion = IdentityConversion.INSTANCE;
} else if (inspector instanceof DateObjectInspector) {
conversion = hiveShim::toHiveDate;
@@ -467,7 +468,7 @@ private static ObjectInspector getObjectInspectorForPrimitiveConstant(
}
}

private static ObjectInspector getObjectInspector(TypeInfo type) {
public static ObjectInspector getObjectInspector(TypeInfo type) {
switch (type.getCategory()) {
case PRIMITIVE:
PrimitiveTypeInfo primitiveType = (PrimitiveTypeInfo) type;
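Adding VoidObjectInspector to the identity branch means NULL values are handed to Hive unconverted, since there is nothing to wrap. A reduced sketch of the branch logic; the real method checks many more inspector types:

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.VoidObjectInspector;

class IdentityConversionCheck {
    // True when a value can be passed to Hive as-is (two of the many
    // identity-eligible inspectors from getConversion are shown here).
    static boolean isIdentity(ObjectInspector inspector) {
        return inspector instanceof LongObjectInspector
                || inspector instanceof VoidObjectInspector;
    }
}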
---------- HiveModule.java ----------
@@ -24,6 +24,7 @@
import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
import org.apache.flink.util.StringUtils;

import org.apache.hadoop.hive.ql.exec.FunctionInfo;
@@ -46,20 +47,23 @@ public class HiveModule implements Module {
new HashSet<>(
Arrays.asList(
"count",
"cume_dist",
"current_date",
"current_timestamp",
"dense_rank",
"first_value",
"lag",
"last_value",
"lead",
"ntile",
"rank",
"row_number",
"hop",
"hop_end",
"hop_proctime",
"hop_rowtime",
"hop_start",
"percent_rank",
"session",
"session_end",
"session_proctime",
@@ -96,6 +100,7 @@ public Set<String> listFunctions() {
if (functionNames.isEmpty()) {
functionNames = hiveShim.listBuiltInFunctions();
functionNames.removeAll(BUILT_IN_FUNC_BLACKLIST);
functionNames.add("grouping");
}
return functionNames;
}
@@ -105,6 +110,13 @@ public Optional<FunctionDefinition> getFunctionDefinition(String name) {
if (BUILT_IN_FUNC_BLACKLIST.contains(name)) {
return Optional.empty();
}
// We override Hive's grouping function. Refer to the implementation for more details.
if (name.equalsIgnoreCase("grouping")) {
return Optional.of(
factory.createFunctionDefinitionFromHiveFunction(
name, HiveGenericUDFGrouping.class.getName()));
}

Optional<FunctionInfo> info = hiveShim.getBuiltInFunctionInfo(name);

return info.map(
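With the Hive module loaded, "grouping" now resolves to Flink's HiveGenericUDFGrouping, while names on BUILT_IN_FUNC_BLACKLIST are never served by the module at all. A usage sketch; the Hive version string is illustrative and the environment setup is minimal:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.module.hive.HiveModule;

public class HiveModuleGroupingExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // "2.3.4" is an illustrative Hive version; use the one on your classpath.
        tEnv.loadModule("hive", new HiveModule("2.3.4"));
        // "grouping" is now listed and served by HiveGenericUDFGrouping.
        System.out.println(String.join(", ", tEnv.listFunctions()));
    }
}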
(Diffs for the remaining 57 of the 65 changed files are not shown.)
