diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 857affefc2e5a..e20bdebfce7df 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -415,6 +415,10 @@ under the License.
<groupId>org.apache.hive</groupId>
<artifactId>hive-hcatalog-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
@@ -601,9 +605,20 @@ under the License.
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ <version>${hive.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -652,6 +667,14 @@ under the License.
+ <profile>
+ <id>hive-1.1.1</id>
+ <properties>
+ <hive.version>1.1.1</hive.version>
+ <hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
+ <hiverunner.version>3.1.1</hiverunner.version>
+ </properties>
+ </profile>
<profile>
<id>hive-1.2.1</id>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 1e66551d3a1bf..49ff7441c1677 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -1120,6 +1120,10 @@ private boolean isTablePartitioned(Table hiveTable) {
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
try {
Table hiveTable = getHiveTable(tablePath);
+ // the stats we put in table parameters will be overridden by HMS in older Hive versions, so error out
+ if (!isTablePartitioned(hiveTable) && hiveVersion.compareTo("1.2.1") < 0) {
+ throw new CatalogException("Alter table stats is not supported in Hive version " + hiveVersion);
+ }
// Set table stats
if (compareAndUpdateStatisticsProperties(tableStatistics, hiveTable.getParameters())) {
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
@@ -1139,7 +1143,7 @@ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatis
Table hiveTable = getHiveTable(tablePath);
// Set table column stats. This only works for non-partitioned tables.
if (!isTablePartitioned(hiveTable)) {
- client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData()));
+ client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData(), hiveVersion));
} else {
throw new TablePartitionedException(getName(), tablePath);
}
@@ -1204,7 +1208,8 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
Table hiveTable = getHiveTable(tablePath);
String partName = getPartitionName(tablePath, partitionSpec, hiveTable);
- client.updatePartitionColumnStatistics(HiveStatsUtil.createPartitionColumnStats(hivePartition, partName, columnStatistics.getColumnStatisticsData()));
+ client.updatePartitionColumnStatistics(HiveStatsUtil.createPartitionColumnStats(
+ hivePartition, partName, columnStatistics.getColumnStatisticsData(), hiveVersion));
} catch (TableNotExistException | PartitionSpecInvalidException e) {
if (!ignoreIfNotExists) {
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
@@ -1243,7 +1248,7 @@ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) th
if (!isTablePartitioned(hiveTable)) {
List<ColumnStatisticsObj> columnStatisticsObjs = client.getTableColumnStatistics(
hiveTable.getDbName(), hiveTable.getTableName(), getFieldNames(hiveTable.getSd().getCols()));
- return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs));
+ return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs, hiveVersion));
} else {
// TableColumnStats of a partitioned table is unknown; the behavior is the same as Hive's
return CatalogColumnStatistics.UNKNOWN;
@@ -1280,7 +1285,7 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath
getFieldNames(partition.getSd().getCols()));
List<ColumnStatisticsObj> columnStatisticsObjs = partitionColumnStatistics.get(partName);
if (columnStatisticsObjs != null && !columnStatisticsObjs.isEmpty()) {
- return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs));
+ return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs, hiveVersion));
} else {
return CatalogColumnStatistics.UNKNOWN;
}
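
To make the new version gate concrete, here is a minimal sketch of the caller-visible behavior, assuming the four-argument `HiveCatalog` constructor (name, default database, conf dir, version) and a hypothetical non-partitioned table on a Hive 1.1.0 metastore: altering table-level stats now fails fast with a `CatalogException` instead of being silently overridden by HMS.

```java
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;

public class AlterStatsGuardExample {
	public static void main(String[] args) throws Exception {
		// catalog name, conf dir, and table name are hypothetical
		HiveCatalog catalog = new HiveCatalog("myhive", "default", "/etc/hive/conf", "1.1.0");
		catalog.open();
		ObjectPath path = new ObjectPath("default", "non_partitioned_table");
		CatalogTableStatistics stats = new CatalogTableStatistics(100L, 10, 1000L, 2000L);
		try {
			catalog.alterTableStatistics(path, stats, false);
		} catch (CatalogException e) {
			// expected: "Alter table stats is not supported in Hive version 1.1.0"
			System.out.println(e.getMessage());
		}
	}
}
```
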
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 1f39d844ed2a6..2111099afb834 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -18,12 +18,15 @@
package org.apache.flink.table.catalog.hive.client;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -33,16 +36,18 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.thrift.TException;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
* A shim layer to support different versions of Hive.
*/
-public interface HiveShim {
+public interface HiveShim extends Serializable {
/**
* Create a Hive Metastore client based on the given HiveConf object.
@@ -159,4 +164,24 @@ SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params,
*/
void makeSpecFromName(Map<String, String> partSpec, Path currPath);
+ /**
+ * Get ObjectInspector for a constant value.
+ */
+ ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value);
+
+ /**
+ * Generate Hive ColumnStatisticsData from Flink CatalogColumnStatisticsDataDate for DATE columns.
+ */
+ ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats);
+
+ /**
+ * Whether a Hive ColumnStatisticsData is for DATE columns.
+ */
+ boolean isDateStats(ColumnStatisticsData colStatsData);
+
+ /**
+ * Generate Flink CatalogColumnStatisticsDataDate from Hive ColumnStatisticsData for DATE columns.
+ */
+ CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats);
+
}
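
One reason `HiveShim` now extends `Serializable`: the `hiveShim` fields in `HiveGenericUDF` and `HiveGenericUDTF` are no longer `transient` (see the changes further down), so shim instances get serialized along with the functions. A quick sketch verifying a shim survives Java serialization; since the shims are stateless this should hold for all of them:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

public class ShimSerializationCheck {
	public static void main(String[] args) throws Exception {
		HiveShim shim = HiveShimLoader.loadHiveShim("1.1.0");
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
			oos.writeObject(shim); // legal now that HiveShim extends Serializable
		}
		try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
			System.out.println(ois.readObject().getClass().getSimpleName()); // HiveShimV110
		}
	}
}
```
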
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index b3e3903385a74..1645d081a1138 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -32,6 +32,8 @@
*/
public class HiveShimLoader {
+ public static final String HIVE_VERSION_V1_1_0 = "1.1.0";
+ public static final String HIVE_VERSION_V1_1_1 = "1.1.1";
public static final String HIVE_VERSION_V1_2_0 = "1.2.0";
public static final String HIVE_VERSION_V1_2_1 = "1.2.1";
public static final String HIVE_VERSION_V1_2_2 = "1.2.2";
@@ -60,6 +62,12 @@ private HiveShimLoader() {
public static HiveShim loadHiveShim(String version) {
return hiveShims.computeIfAbsent(version, (v) -> {
+ if (v.startsWith(HIVE_VERSION_V1_1_0)) {
+ return new HiveShimV110();
+ }
+ if (v.startsWith(HIVE_VERSION_V1_1_1)) {
+ return new HiveShimV111();
+ }
if (v.startsWith(HIVE_VERSION_V1_2_0)) {
return new HiveShimV120();
}
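
A small sketch of the resulting dispatch; it relies only on the `startsWith` matching and the `computeIfAbsent` cache visible above:

```java
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

public class ShimLoaderExample {
	public static void main(String[] args) {
		HiveShim v110 = HiveShimLoader.loadHiveShim("1.1.0");
		HiveShim v111 = HiveShimLoader.loadHiveShim("1.1.1");
		System.out.println(v110.getClass().getSimpleName()); // HiveShimV110
		System.out.println(v111.getClass().getSimpleName()); // HiveShimV111
		// instances are cached per version string
		System.out.println(v110 == HiveShimLoader.loadHiveShim("1.1.0")); // true
	}
}
```
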
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
new file mode 100644
index 0000000000000..91fd80af9a9cd
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV110.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBooleanObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantDoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveCharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantHiveVarcharObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantLongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantStringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantTimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shim for Hive version 1.1.0.
+ */
+public class HiveShimV110 implements HiveShim {
+
+ @Override
+ public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+ try {
+ return new HiveMetaStoreClient(hiveConf);
+ } catch (MetaException ex) {
+ throw new CatalogException("Failed to create Hive Metastore client", ex);
+ }
+ }
+
+ @Override
+ // 1.x client doesn't support filtering tables by type, so here we need to get all tables and filter by ourselves
+ public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
+ // We don't have to use reflection here because client.getAllTables(String) is supposed to be there for
+ // all versions.
+ List<String> tableNames = client.getAllTables(databaseName);
+ List<String> views = new ArrayList<>();
+ for (String name : tableNames) {
+ Table table = client.getTable(databaseName, name);
+ String viewDef = table.getViewOriginalText();
+ if (viewDef != null && !viewDef.isEmpty()) {
+ views.add(table.getTableName());
+ }
+ }
+ return views;
+ }
+
+ @Override
+ public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
+ try {
+ // hive-1.x doesn't throw NoSuchObjectException if a function doesn't exist; instead it throws a MetaException
+ return client.getFunction(dbName, functionName);
+ } catch (MetaException e) {
+ // need to check the cause and message of this MetaException to decide whether it should actually be a NoSuchObjectException
+ if (e.getCause() instanceof NoSuchObjectException) {
+ throw (NoSuchObjectException) e.getCause();
+ }
+ if (e.getMessage().startsWith(NoSuchObjectException.class.getSimpleName())) {
+ throw new NoSuchObjectException(e.getMessage());
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException {
+ try {
+ Method method = FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, Configuration.class);
+ return (boolean) method.invoke(null, fs, path, conf);
+ } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+ throw new IOException("Failed to move " + path + " to trash", e);
+ }
+ }
+
+ @Override
+ public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException {
+ client.alter_table(databaseName, tableName, table);
+ }
+
+ @Override
+ public void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition)
+ throws InvalidOperationException, MetaException, TException {
+ String errorMsg = "Failed to alter partition for table %s in database %s";
+ try {
+ Method method = client.getClass().getMethod("alter_partition", String.class, String.class, Partition.class);
+ method.invoke(client, databaseName, tableName, partition);
+ } catch (InvocationTargetException ite) {
+ Throwable targetEx = ite.getTargetException();
+ if (targetEx instanceof TException) {
+ throw (TException) targetEx;
+ } else {
+ throw new CatalogException(String.format(errorMsg, tableName, databaseName), targetEx);
+ }
+ } catch (NoSuchMethodException | IllegalAccessException e) {
+ throw new CatalogException(String.format(errorMsg, tableName, databaseName), e);
+ }
+ }
+
+ @Override
+ public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
+ try {
+ Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
+ boolean.class, boolean.class);
+ return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
+ throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+ }
+ }
+
+ @Override
+ public Class<?> getMetaStoreUtilsClass() {
+ try {
+ return Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
+ } catch (ClassNotFoundException e) {
+ throw new CatalogException("Failed to find class MetaStoreUtils", e);
+ }
+ }
+
+ @Override
+ public Class<?> getHiveMetaStoreUtilsClass() {
+ return getMetaStoreUtilsClass();
+ }
+
+ @Override
+ public Class<?> getDateDataTypeClass() {
+ return java.sql.Date.class;
+ }
+
+ @Override
+ public Class<?> getTimestampDataTypeClass() {
+ return java.sql.Timestamp.class;
+ }
+
+ @Override
+ public FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException {
+ try {
+ Method method = HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, Integer.TYPE, FileSystem.class);
+ // getFileStatusRecurse is a static method
+ return (FileStatus[]) method.invoke(null, path, level, fs);
+ } catch (Exception ex) {
+ throw new CatalogException("Failed to invoke HiveStatsUtils.getFileStatusRecurse()", ex);
+ }
+ }
+
+ @Override
+ public void makeSpecFromName(Map<String, String> partSpec, Path currPath) {
+ try {
+ Method method = Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
+ // makeSpecFromName is a static method
+ method.invoke(null, partSpec, currPath);
+ } catch (Exception ex) {
+ throw new CatalogException("Failed to invoke Warehouse.makeSpecFromName()", ex);
+ }
+ }
+
+ @Override
+ public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value) {
+ String className;
+ value = HiveInspectors.hivePrimitiveToWritable(value);
+ // Java constant object inspectors are not available until 1.2.0 -- https://issues.apache.org/jira/browse/HIVE-9766
+ // So we have to use writable constant object inspectors for 1.1.x
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ className = WritableConstantBooleanObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case BYTE:
+ className = WritableConstantByteObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case SHORT:
+ className = WritableConstantShortObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case INT:
+ className = WritableConstantIntObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case LONG:
+ className = WritableConstantLongObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case FLOAT:
+ className = WritableConstantFloatObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DOUBLE:
+ className = WritableConstantDoubleObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case STRING:
+ className = WritableConstantStringObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case CHAR:
+ className = WritableConstantHiveCharObjectInspector.class.getName();
+ try {
+ return (ObjectInspector) Class.forName(className).getDeclaredConstructor(
+ CharTypeInfo.class, value.getClass()).newInstance(primitiveTypeInfo, value);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException("Failed to create writable constant object inspector", e);
+ }
+ case VARCHAR:
+ className = WritableConstantHiveVarcharObjectInspector.class.getName();
+ try {
+ return (ObjectInspector) Class.forName(className).getDeclaredConstructor(
+ VarcharTypeInfo.class, value.getClass()).newInstance(primitiveTypeInfo, value);
+ } catch (Exception e) {
+ throw new FlinkHiveUDFException("Failed to create writable constant object inspector", e);
+ }
+ case DATE:
+ className = WritableConstantDateObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case TIMESTAMP:
+ className = WritableConstantTimestampObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DECIMAL:
+ className = WritableConstantHiveDecimalObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case BINARY:
+ className = WritableConstantBinaryObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case UNKNOWN:
+ case VOID:
+ // if the type is unknown or void, fall back to the writable constant string inspector
+ className = WritableConstantStringObjectInspector.class.getName();
+ return HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
+ default:
+ throw new FlinkHiveUDFException(
+ String.format("Cannot find ConstantObjectInspector for %s", primitiveTypeInfo));
+ }
+ }
+
+ @Override
+ public ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
+ throw new UnsupportedOperationException("DATE column stats are not supported until Hive 1.2.0");
+ }
+
+ @Override
+ public boolean isDateStats(ColumnStatisticsData colStatsData) {
+ return false;
+ }
+
+ @Override
+ public CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
+ throw new UnsupportedOperationException("DATE column stats are not supported until Hive 1.2.0");
+ }
+}
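
A minimal usage sketch for the writable-constant path (the `int` literal is arbitrary): the shim first turns the Java value into an `IntWritable` via `HiveInspectors.hivePrimitiveToWritable`, then reflectively invokes the package-private `WritableConstantIntObjectInspector` constructor.

```java
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ConstantInspectorExample {
	public static void main(String[] args) {
		HiveShim shim = HiveShimLoader.loadHiveShim("1.1.0");
		// 42 becomes an IntWritable wrapped in a WritableConstantIntObjectInspector
		ObjectInspector oi = shim.getObjectInspectorForConstant(TypeInfoFactory.intTypeInfo, 42);
		System.out.println(oi.getTypeName()); // int
	}
}
```
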
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV111.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV111.java
new file mode 100644
index 0000000000000..bafa50a949bc5
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV111.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+/**
+ * Shim for Hive version 1.1.1.
+ */
+public class HiveShimV111 extends HiveShimV110 {
+
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
index d1ed4440f119a..fc929476fd9ee 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
@@ -19,41 +19,30 @@
package org.apache.flink.table.catalog.hive.client;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
+import org.apache.flink.table.catalog.stats.Date;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.UnknownDBException;
-import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.thrift.TException;
-import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
/**
* Shim for Hive version 1.2.0.
*/
-public class HiveShimV120 implements HiveShim {
+public class HiveShimV120 extends HiveShimV111 {
@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
@@ -66,132 +55,117 @@ public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
}
}
- @Override
- // 1.x client doesn't support filtering tables by type, so here we need to get all tables and filter by ourselves
- public List<String> getViews(IMetaStoreClient client, String databaseName) throws UnknownDBException, TException {
- // We don't have to use reflection here because client.getAllTables(String) is supposed to be there for
- // all versions.
- List<String> tableNames = client.getAllTables(databaseName);
- List<String> views = new ArrayList<>();
- for (String name : tableNames) {
- Table table = client.getTable(databaseName, name);
- String viewDef = table.getViewOriginalText();
- if (viewDef != null && !viewDef.isEmpty()) {
- views.add(table.getTableName());
- }
- }
- return views;
- }
-
- @Override
- public Function getFunction(IMetaStoreClient client, String dbName, String functionName) throws NoSuchObjectException, TException {
- try {
- // hive-1.x doesn't throw NoSuchObjectException if function doesn't exist, instead it throws a MetaException
- return client.getFunction(dbName, functionName);
- } catch (MetaException e) {
- // need to check the cause and message of this MetaException to decide whether it should actually be a NoSuchObjectException
- if (e.getCause() instanceof NoSuchObjectException) {
- throw (NoSuchObjectException) e.getCause();
- }
- if (e.getMessage().startsWith(NoSuchObjectException.class.getSimpleName())) {
- throw new NoSuchObjectException(e.getMessage());
- }
- throw e;
- }
- }
-
- @Override
- public boolean moveToTrash(FileSystem fs, Path path, Configuration conf, boolean purge) throws IOException {
- try {
- Method method = FileUtils.class.getDeclaredMethod("moveToTrash", FileSystem.class, Path.class, Configuration.class);
- return (boolean) method.invoke(null, fs, path, conf);
- } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
- throw new IOException("Failed to move " + path + " to trash", e);
- }
- }
-
@Override
public void alterTable(IMetaStoreClient client, String databaseName, String tableName, Table table) throws InvalidOperationException, MetaException, TException {
- // For Hive-1.2.1, we need to tell HMS not to update stats. Otherwise, the stats we put in the table
+ // For Hive-1.2.x, we need to tell HMS not to update stats. Otherwise, the stats we put in the table
// parameters can be overridden. The extra config we add here will be removed by HMS after it's used.
- table.getParameters().put(StatsSetupConst.DO_NOT_UPDATE_STATS, "true");
+ // Don't use StatsSetupConst.DO_NOT_UPDATE_STATS because it wasn't defined in Hive 1.1.x.
+ table.getParameters().put("DO_NOT_UPDATE_STATS", "true");
client.alter_table(databaseName, tableName, table);
}
@Override
- public void alterPartition(IMetaStoreClient client, String databaseName, String tableName, Partition partition)
- throws InvalidOperationException, MetaException, TException {
- String errorMsg = "Failed to alter partition for table %s in database %s";
- try {
- Method method = client.getClass().getMethod("alter_partition", String.class, String.class, Partition.class);
- method.invoke(client, databaseName, tableName, partition);
- } catch (InvocationTargetException ite) {
- Throwable targetEx = ite.getTargetException();
- if (targetEx instanceof TException) {
- throw (TException) targetEx;
- } else {
- throw new CatalogException(String.format(errorMsg, tableName, databaseName), targetEx);
- }
- } catch (NoSuchMethodException | IllegalAccessException e) {
- throw new CatalogException(String.format(errorMsg, tableName, databaseName), e);
- }
- }
-
- @Override
- public SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns) {
- try {
- Constructor constructor = SimpleGenericUDAFParameterInfo.class.getConstructor(ObjectInspector[].class,
- boolean.class, boolean.class);
- return (SimpleGenericUDAFParameterInfo) constructor.newInstance(params, distinct, allColumns);
- } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
- throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e);
+ public ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value) {
+ String className;
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case BYTE:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantByteObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case SHORT:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantShortObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case INT:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case LONG:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case FLOAT:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DOUBLE:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case STRING:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case CHAR:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveCharObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case VARCHAR:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveVarcharObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DATE:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case TIMESTAMP:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case DECIMAL:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveDecimalObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case BINARY:
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ case UNKNOWN:
+ case VOID:
+ // if the type is unknown or void, fall back to the Java constant string inspector
+ className = "org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector";
+ return HiveReflectionUtils.createConstantObjectInspector(className, value.toString());
+ default:
+ throw new FlinkHiveUDFException(
+ String.format("Cannot find ConstantObjectInspector for %s", primitiveTypeInfo));
}
}
@Override
- public Class<?> getMetaStoreUtilsClass() {
+ public ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats) {
try {
- return Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
- } catch (ClassNotFoundException e) {
- throw new CatalogException("Failed to find class MetaStoreUtils", e);
+ Class dateStatsClz = Class.forName("org.apache.hadoop.hive.metastore.api.DateColumnStatsData");
+ Object dateStats = dateStatsClz.getDeclaredConstructor(long.class, long.class)
+ .newInstance(flinkDateColStats.getNullCount(), flinkDateColStats.getNdv());
+ Class hmsDateClz = Class.forName("org.apache.hadoop.hive.metastore.api.Date");
+ Method setHigh = dateStatsClz.getDeclaredMethod("setHighValue", hmsDateClz);
+ Method setLow = dateStatsClz.getDeclaredMethod("setLowValue", hmsDateClz);
+ Constructor hmsDateConstructor = hmsDateClz.getConstructor(long.class);
+ setHigh.invoke(dateStats, hmsDateConstructor.newInstance(flinkDateColStats.getMax().getDaysSinceEpoch()));
+ setLow.invoke(dateStats, hmsDateConstructor.newInstance(flinkDateColStats.getMin().getDaysSinceEpoch()));
+ Class colStatsClz = ColumnStatisticsData.class;
+ return (ColumnStatisticsData) colStatsClz.getDeclaredMethod("dateStats", dateStatsClz).invoke(null, dateStats);
+ } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new CatalogException("Failed to create Hive statistics for date column", e);
}
}
@Override
- public Class<?> getHiveMetaStoreUtilsClass() {
- return getMetaStoreUtilsClass();
- }
-
- @Override
- public Class<?> getDateDataTypeClass() {
- return java.sql.Date.class;
- }
-
- @Override
- public Class<?> getTimestampDataTypeClass() {
- return java.sql.Timestamp.class;
- }
-
- @Override
- public FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException {
+ public boolean isDateStats(ColumnStatisticsData colStatsData) {
try {
- Method method = HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, Integer.TYPE, FileSystem.class);
- // getFileStatusRecurse is a static method
- return (FileStatus[]) method.invoke(null, path, level, fs);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke HiveStatsUtils.getFileStatusRecurse()", ex);
+ Method method = ColumnStatisticsData.class.getDeclaredMethod("isSetDateStats");
+ return (boolean) method.invoke(colStatsData);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new CatalogException("Failed to decide whether ColumnStatisticsData is for DATE column", e);
}
}
@Override
- public void makeSpecFromName(Map<String, String> partSpec, Path currPath) {
+ public CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats) {
try {
- Method method = Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
- // makeSpecFromName is a static method
- method.invoke(null, partSpec, currPath);
- } catch (Exception ex) {
- throw new CatalogException("Failed to invoke Warehouse.makeSpecFromName()", ex);
+ Object dateStats = ColumnStatisticsData.class.getDeclaredMethod("getDateStats").invoke(hiveDateColStats);
+ Class dateStatsClz = dateStats.getClass();
+ long numDV = (long) dateStatsClz.getMethod("getNumDVs").invoke(dateStats);
+ long numNull = (long) dateStatsClz.getMethod("getNumNulls").invoke(dateStats);
+ Object hmsHighDate = dateStatsClz.getMethod("getHighValue").invoke(dateStats);
+ Object hmsLowDate = dateStatsClz.getMethod("getLowValue").invoke(dateStats);
+ Class hmsDateClz = hmsHighDate.getClass();
+ Method hmsDateDays = hmsDateClz.getMethod("getDaysSinceEpoch");
+ long highDateDays = (long) hmsDateDays.invoke(hmsHighDate);
+ long lowDateDays = (long) hmsDateDays.invoke(hmsLowDate);
+ return new CatalogColumnStatisticsDataDate(new Date(lowDateDays), new Date(highDateDays), numDV, numNull);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new CatalogException("Failed to create Flink statistics for date column", e);
}
}
-
}
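
On a Hive 1.2+ classpath, the reflective date-stats conversion can be exercised round trip; a sketch with arbitrary day values, using the constructor order `(min, max, ndv, nullCount)` that `toFlinkDateColStats` itself relies on:

```java
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.Date;

import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;

public class DateStatsRoundTrip {
	public static void main(String[] args) {
		HiveShim shim = HiveShimLoader.loadHiveShim("1.2.1");
		CatalogColumnStatisticsDataDate flinkStats =
			new CatalogColumnStatisticsDataDate(new Date(0L), new Date(18000L), 100L, 5L);
		ColumnStatisticsData hiveStats = shim.toHiveDateColStats(flinkStats);
		System.out.println(shim.isDateStats(hiveStats)); // true
		CatalogColumnStatisticsDataDate back = shim.toFlinkDateColStats(hiveStats);
		System.out.println(back.getNdv() + " " + back.getNullCount()); // 100 5
	}
}
```
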
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
index 1941096c9b729..b03eb66ff896a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -28,8 +28,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
@@ -88,25 +87,14 @@ public static List<String> getPvals(HiveShim hiveShim, List<FieldSchema> partCol
}
}
- public static JavaConstantDateObjectInspector createJavaConstantDateObjectInspector(HiveShim hiveShim, Object value) {
- Constructor<?> meth = null;
+ public static ObjectInspector createConstantObjectInspector(String className, Object value) {
try {
- meth = JavaConstantDateObjectInspector.class.getDeclaredConstructor(hiveShim.getDateDataTypeClass());
- meth.setAccessible(true);
- return (JavaConstantDateObjectInspector) meth.newInstance(value);
- } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
- throw new FlinkHiveUDFException("Failed to instantiate JavaConstantDateObjectInspector");
- }
- }
-
- public static JavaConstantTimestampObjectInspector createJavaConstantTimestampObjectInspector(HiveShim hiveShim, Object value) {
- Constructor<?> meth = null;
- try {
- meth = JavaConstantTimestampObjectInspector.class.getDeclaredConstructor(hiveShim.getDateDataTypeClass());
- meth.setAccessible(true);
- return (JavaConstantTimestampObjectInspector) meth.newInstance(value);
- } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
- throw new FlinkHiveUDFException("Failed to instantiate JavaConstantTimestampObjectInspector");
+ Constructor<?> method = Class.forName(className).getDeclaredConstructor(value.getClass());
+ method.setAccessible(true);
+ return (ObjectInspector) method.newInstance(value);
+ } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new FlinkHiveUDFException("Failed to instantiate JavaConstantDateObjectInspector", e);
}
}
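
A usage sketch for the generalized helper. Note the constructor is looked up by the runtime class of `value`, so the argument must exactly match the declared parameter type; an `Integer` here resolves `JavaConstantIntObjectInspector(Integer)`. That lookup-by-runtime-class is also why `HiveShimV110` converts values to writables first.

```java
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class ReflectiveInspectorExample {
	public static void main(String[] args) {
		// the class name is only loaded reflectively, so this compiles against any Hive version
		ObjectInspector oi = HiveReflectionUtils.createConstantObjectInspector(
			"org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntObjectInspector", 42);
		System.out.println(oi.getTypeName()); // int
	}
}
```
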
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
index 232b5f6e2712f..7f0551f259005 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveStatsUtil.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.catalog.hive.util;
import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBoolean;
@@ -35,7 +37,6 @@
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
-import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
@@ -69,13 +70,13 @@ private HiveStatsUtil() {}
/**
* Create a map of Flink column stats from the given Hive column stats.
*/
- public static Map<String, CatalogColumnStatisticsDataBase> createCatalogColumnStats(@Nonnull List<ColumnStatisticsObj> hiveColStats) {
+ public static Map<String, CatalogColumnStatisticsDataBase> createCatalogColumnStats(@Nonnull List<ColumnStatisticsObj> hiveColStats, String hiveVersion) {
checkNotNull(hiveColStats, "hiveColStats can not be null");
Map<String, CatalogColumnStatisticsDataBase> colStats = new HashMap<>();
for (ColumnStatisticsObj colStatsObj : hiveColStats) {
CatalogColumnStatisticsDataBase columnStats = createTableColumnStats(
HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(colStatsObj.getColType())),
- colStatsObj.getStatsData());
+ colStatsObj.getStatsData(), hiveVersion);
colStats.put(colStatsObj.getColName(), columnStats);
}
@@ -87,9 +88,10 @@ public static Map<String, CatalogColumnStatisticsDataBase> createCatalogColumnSt
*/
public static ColumnStatistics createTableColumnStats(
Table hiveTable,
- Map<String, CatalogColumnStatisticsDataBase> colStats) {
+ Map<String, CatalogColumnStatisticsDataBase> colStats,
+ String hiveVersion) {
ColumnStatisticsDesc desc = new ColumnStatisticsDesc(true, hiveTable.getDbName(), hiveTable.getTableName());
- return createHiveColumnStatistics(colStats, hiveTable.getSd(), desc);
+ return createHiveColumnStatistics(colStats, hiveTable.getSd(), desc, hiveVersion);
}
/**
@@ -98,16 +100,18 @@ public static ColumnStatistics createTableColumnStats(
public static ColumnStatistics createPartitionColumnStats(
Partition hivePartition,
String partName,
- Map<String, CatalogColumnStatisticsDataBase> colStats) {
+ Map<String, CatalogColumnStatisticsDataBase> colStats,
+ String hiveVersion) {
ColumnStatisticsDesc desc = new ColumnStatisticsDesc(false, hivePartition.getDbName(), hivePartition.getTableName());
desc.setPartName(partName);
- return createHiveColumnStatistics(colStats, hivePartition.getSd(), desc);
+ return createHiveColumnStatistics(colStats, hivePartition.getSd(), desc, hiveVersion);
}
private static ColumnStatistics createHiveColumnStatistics(
Map<String, CatalogColumnStatisticsDataBase> colStats,
StorageDescriptor sd,
- ColumnStatisticsDesc desc) {
+ ColumnStatisticsDesc desc,
+ String hiveVersion) {
List<ColumnStatisticsObj> colStatsList = new ArrayList<>();
for (FieldSchema field : sd.getCols()) {
@@ -115,8 +119,10 @@ private static ColumnStatistics createHiveColumnStatistics(
String hiveColType = field.getType();
CatalogColumnStatisticsDataBase flinkColStat = colStats.get(field.getName());
if (null != flinkColStat) {
- ColumnStatisticsData statsData =
- getColumnStatisticsData(HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(hiveColType)), flinkColStat);
+ ColumnStatisticsData statsData = getColumnStatisticsData(
+ HiveTypeUtil.toFlinkType(TypeInfoUtils.getTypeInfoFromTypeString(hiveColType)),
+ flinkColStat,
+ hiveVersion);
ColumnStatisticsObj columnStatisticsObj = new ColumnStatisticsObj(hiveColName, hiveColType, statsData);
colStatsList.add(columnStatisticsObj);
}
@@ -128,7 +134,8 @@ private static ColumnStatistics createHiveColumnStatistics(
/**
* Create Flink ColumnStats from Hive ColumnStatisticsData.
*/
- private static CatalogColumnStatisticsDataBase createTableColumnStats(DataType colType, ColumnStatisticsData stats) {
+ private static CatalogColumnStatisticsDataBase createTableColumnStats(DataType colType, ColumnStatisticsData stats, String hiveVersion) {
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
if (stats.isSetBinaryStats()) {
BinaryColumnStatsData binaryStats = stats.getBinaryStats();
return new CatalogColumnStatisticsDataBinary(
@@ -141,13 +148,8 @@ private static CatalogColumnStatisticsDataBase createTableColumnStats(DataType c
booleanStats.getNumTrues(),
booleanStats.getNumFalses(),
booleanStats.getNumNulls());
- } else if (stats.isSetDateStats()) {
- DateColumnStatsData dateStats = stats.getDateStats();
- return new CatalogColumnStatisticsDataDate(
- new org.apache.flink.table.catalog.stats.Date(dateStats.getLowValue().getDaysSinceEpoch()),
- new org.apache.flink.table.catalog.stats.Date(dateStats.getHighValue().getDaysSinceEpoch()),
- dateStats.getNumDVs(),
- dateStats.getNumNulls());
+ } else if (hiveShim.isDateStats(stats)) {
+ return hiveShim.toFlinkDateColStats(stats);
} else if (stats.isSetDoubleStats()) {
DoubleColumnStatsData doubleStats = stats.getDoubleStats();
return new CatalogColumnStatisticsDataDouble(
@@ -180,7 +182,8 @@ private static CatalogColumnStatisticsDataBase createTableColumnStats(DataType c
* Note we currently assume that, in Flink, the max and min of ColumnStats will be of the same type as the Flink column type.
* For example, for SHORT and LONG columns, the max and min of their ColumnStats should be of type SHORT and LONG.
*/
- private static ColumnStatisticsData getColumnStatisticsData(DataType colType, CatalogColumnStatisticsDataBase colStat) {
+ private static ColumnStatisticsData getColumnStatisticsData(DataType colType, CatalogColumnStatisticsDataBase colStat,
+ String hiveVersion) {
LogicalTypeRoot type = colType.getLogicalType().getTypeRoot();
if (type.equals(LogicalTypeRoot.CHAR)
|| type.equals(LogicalTypeRoot.VARCHAR)) {
@@ -222,11 +225,8 @@ private static ColumnStatisticsData getColumnStatisticsData(DataType colType, Ca
}
} else if (type.equals(LogicalTypeRoot.DATE)) {
if (colStat instanceof CatalogColumnStatisticsDataDate) {
- CatalogColumnStatisticsDataDate dateColumnStatsData = (CatalogColumnStatisticsDataDate) colStat;
- DateColumnStatsData dateStats = new DateColumnStatsData(dateColumnStatsData.getNullCount(), dateColumnStatsData.getNdv());
- dateStats.setHighValue(new org.apache.hadoop.hive.metastore.api.Date(dateColumnStatsData.getMax().getDaysSinceEpoch()));
- dateStats.setLowValue(new org.apache.hadoop.hive.metastore.api.Date(dateColumnStatsData.getMin().getDaysSinceEpoch()));
- return ColumnStatisticsData.dateStats(dateStats);
+ HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
+ return hiveShim.toHiveDateColStats((CatalogColumnStatisticsDataDate) colStat);
}
} else if (type.equals(LogicalTypeRoot.VARBINARY)
|| type.equals(LogicalTypeRoot.BINARY)) {
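
Since the conversion is now routed through the shim, DATE column stats degrade by version instead of failing deep inside the metastore client. A sketch of the observable difference between the 1.1.x and 1.2.x shims (day values are arbitrary):

```java
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;
import org.apache.flink.table.catalog.stats.Date;

public class DateStatsVersionGate {
	public static void main(String[] args) {
		CatalogColumnStatisticsDataDate stats =
			new CatalogColumnStatisticsDataDate(new Date(0L), new Date(1L), 2L, 0L);
		try {
			HiveShimLoader.loadHiveShim("1.1.0").toHiveDateColStats(stats);
		} catch (UnsupportedOperationException e) {
			System.out.println(e.getMessage()); // DATE column stats are not supported until Hive 1.2.0
		}
		// on a Hive 1.2+ classpath the same call succeeds via reflection
		System.out.println(HiveShimLoader.loadHiveShim("1.2.1").toHiveDateColStats(stats));
	}
}
```
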
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
index 68c3ede224278..b5f7dacec64c3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java
@@ -323,4 +323,13 @@ protected TypeInfo defaultMethod(LogicalType logicalType) {
String.format("Flink doesn't support converting type %s to Hive type yet.", dataType.toString()));
}
}
+
+ /**
+ * INTERVAL types are not available in older Hive versions, so it's better to have our own enum for primitive categories.
+ */
+ public enum HivePrimitiveCategory {
+ VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
+ DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
+ UNKNOWN
+ }
}
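
The enum is intended to be switched on after a name-based conversion (as done in `HiveInspectors` below), so switch arms that mention the `INTERVAL_*` constants still compile and load against Hive 1.1.x, whose `PrimitiveCategory` predates them. A sketch of the mapping:

```java
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;

import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;

public class PrimitiveCategoryMapping {
	public static void main(String[] args) {
		// convert by name so our code never references Hive enum constants
		// that may not exist on an older classpath
		HiveTypeUtil.HivePrimitiveCategory ours =
			HiveTypeUtil.HivePrimitiveCategory.valueOf(PrimitiveCategory.STRING.name());
		System.out.println(ours); // STRING
	}
}
```
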
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
index e970ace597f86..5b1ebdaa02a48 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
@@ -41,7 +41,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDF.class);
private transient GenericUDF.DeferredObject[] deferredObjects;
- private transient HiveShim hiveShim;
+ private HiveShim hiveShim;
public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim) {
super(hiveFunctionWrapper);
@@ -56,9 +56,10 @@ public void openInternal() {
function = hiveFunctionWrapper.createFunction();
+ ObjectInspector[] argInspectors = HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+
try {
- returnInspector = function.initializeAndFoldConstants(
- HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes));
+ returnInspector = function.initializeAndFoldConstants(argInspectors);
} catch (UDFArgumentException e) {
throw new FlinkHiveUDFException(e);
}
@@ -67,8 +68,7 @@ public void openInternal() {
for (int i = 0; i < deferredObjects.length; i++) {
deferredObjects[i] = new DeferredObjectAdapter(
- TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
- HiveTypeUtil.toHiveTypeInfo(argTypes[i])),
+ argInspectors[i],
argTypes[i].getLogicalType()
);
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
index a7dbc8b9a57f2..3578a83a71642 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
@@ -64,7 +64,7 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
private transient boolean allIdentityConverter;
private transient HiveObjectConversion[] conversions;
- private transient HiveShim hiveShim;
+ private HiveShim hiveShim;
public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim) {
this.hiveFunctionWrapper = hiveFunctionWrapper;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index e7e88b2fe0eda..377e1e981e685 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -22,7 +22,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.hive.client.HiveShim;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
import org.apache.flink.table.types.DataType;
@@ -45,7 +44,6 @@
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -64,18 +62,6 @@
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveCharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveDecimalObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveVarcharObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -95,10 +81,12 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.sql.Date;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -130,64 +118,20 @@ public static ObjectInspector[] toInspectors(HiveShim hiveShim, Object[] args, D
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
HiveTypeUtil.toHiveTypeInfo(argTypes[i]));
} else {
- argumentInspectors[i] =
- HiveInspectors.getPrimitiveJavaConstantObjectInspector(
- hiveShim,
- (PrimitiveTypeInfo) HiveTypeUtil.toHiveTypeInfo(argTypes[i]),
- constant
- );
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) HiveTypeUtil.toHiveTypeInfo(argTypes[i]);
+ argumentInspectors[i] = hiveShim.getObjectInspectorForConstant(primitiveTypeInfo, constant);
}
}
return argumentInspectors;
}
- private static ConstantObjectInspector getPrimitiveJavaConstantObjectInspector(HiveShim hiveShim,
- PrimitiveTypeInfo typeInfo, Object value) {
- switch (typeInfo.getPrimitiveCategory()) {
- case BOOLEAN:
- return new JavaConstantBooleanObjectInspector((Boolean) value);
- case BYTE:
- return new JavaConstantByteObjectInspector((Byte) value);
- case SHORT:
- return new JavaConstantShortObjectInspector((Short) value);
- case INT:
- return new JavaConstantIntObjectInspector((Integer) value);
- case LONG:
- return new JavaConstantLongObjectInspector((Long) value);
- case FLOAT:
- return new JavaConstantFloatObjectInspector((Float) value);
- case DOUBLE:
- return new JavaConstantDoubleObjectInspector((Double) value);
- case STRING:
- return new JavaConstantStringObjectInspector((String) value);
- case CHAR:
- return new JavaConstantHiveCharObjectInspector((HiveChar) value);
- case VARCHAR:
- return new JavaConstantHiveVarcharObjectInspector((HiveVarchar) value);
- case DATE:
- return HiveReflectionUtils.createJavaConstantDateObjectInspector(hiveShim, value);
- case TIMESTAMP:
- return HiveReflectionUtils.createJavaConstantTimestampObjectInspector(hiveShim, value);
- case DECIMAL:
- return new JavaConstantHiveDecimalObjectInspector((HiveDecimal) value);
- case BINARY:
- return new JavaConstantBinaryObjectInspector((byte[]) value);
- case UNKNOWN:
- case VOID:
- // If type is null, we use the Java Constant String to replace
- return new JavaConstantStringObjectInspector((String) value);
- default:
- throw new FlinkHiveUDFException(
- String.format("Cannot find ConstantObjectInspector for %s", typeInfo));
- }
- }
-
/**
* Get conversion for converting Flink object to Hive object from an ObjectInspector and the corresponding Flink DataType.
*/
public static HiveObjectConversion getConversion(ObjectInspector inspector, LogicalType dataType) {
if (inspector instanceof PrimitiveObjectInspector) {
+ HiveObjectConversion conversion;
if (inspector instanceof BooleanObjectInspector ||
inspector instanceof StringObjectInspector ||
inspector instanceof ByteObjectInspector ||
@@ -199,14 +143,20 @@ public static HiveObjectConversion getConversion(ObjectInspector inspector, Logi
inspector instanceof DateObjectInspector ||
inspector instanceof TimestampObjectInspector ||
inspector instanceof BinaryObjectInspector) {
- return IdentityConversion.INSTANCE;
+ conversion = IdentityConversion.INSTANCE;
} else if (inspector instanceof HiveCharObjectInspector) {
- return o -> new HiveChar((String) o, ((CharType) dataType).getLength());
+ conversion = o -> new HiveChar((String) o, ((CharType) dataType).getLength());
} else if (inspector instanceof HiveVarcharObjectInspector) {
- return o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
+ conversion = o -> new HiveVarchar((String) o, ((VarCharType) dataType).getLength());
} else if (inspector instanceof HiveDecimalObjectInspector) {
- return o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
+ conversion = o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
+ } else {
+ throw new FlinkHiveUDFException("Unsupported primitive object inspector " + inspector.getClass().getName());
+ }
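+ // writable-preferring inspectors expect Writable arguments, so wrap the java-object conversion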
+ if (((PrimitiveObjectInspector) inspector).preferWritable()) {
+ conversion = new WritableHiveObjectConversion(conversion);
}
+ return conversion;
}
if (inspector instanceof ListObjectInspector) {
@@ -460,7 +410,7 @@ private static Class<?> getClassFromObjectInspector(ObjectInspector inspector) {
switch (inspector.getCategory()) {
case PRIMITIVE: {
PrimitiveObjectInspector primitiveOI = (PrimitiveObjectInspector) inspector;
- switch (primitiveOI.getPrimitiveCategory()) {
+ switch (HiveTypeUtil.HivePrimitiveCategory.valueOf(primitiveOI.getPrimitiveCategory().name())) {
case STRING:
case CHAR:
case VARCHAR:
@@ -506,4 +456,47 @@ private static Class<?> getClassFromObjectInspector(ObjectInspector inspector) {
throw new IllegalArgumentException("Unsupported type " + inspector.getCategory().name());
}
}
+
+ /**
+ * Converts a Hive primitive java object to the corresponding Writable object.
+ */
+ public static Writable hivePrimitiveToWritable(Object value) {
+ Writable writable;
+ // in case value is already a Writable
+ if (value instanceof Writable) {
+ writable = (Writable) value;
+ } else if (value instanceof Boolean) {
+ writable = new BooleanWritable((Boolean) value);
+ } else if (value instanceof Byte) {
+ writable = new ByteWritable((Byte) value);
+ } else if (value instanceof Short) {
+ writable = new ShortWritable((Short) value);
+ } else if (value instanceof Integer) {
+ writable = new IntWritable((Integer) value);
+ } else if (value instanceof Long) {
+ writable = new LongWritable((Long) value);
+ } else if (value instanceof Float) {
+ writable = new FloatWritable((Float) value);
+ } else if (value instanceof Double) {
+ writable = new DoubleWritable((Double) value);
+ } else if (value instanceof String) {
+ writable = new Text((String) value);
+ } else if (value instanceof HiveChar) {
+ writable = new HiveCharWritable((HiveChar) value);
+ } else if (value instanceof HiveVarchar) {
+ writable = new HiveVarcharWritable((HiveVarchar) value);
+ } else if (value instanceof Date) {
+ writable = new DateWritable((Date) value);
+ } else if (value instanceof Timestamp) {
+ writable = new TimestampWritable((Timestamp) value);
+ } else if (value instanceof BigDecimal) {
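+ // BigDecimal has no direct Writable counterpart; go through HiveDecimal first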
+ HiveDecimal hiveDecimal = HiveDecimal.create((BigDecimal) value);
+ writable = new HiveDecimalWritable(hiveDecimal);
+ } else if (value instanceof byte[]) {
+ writable = new BytesWritable((byte[]) value);
+ } else {
+ throw new CatalogException("Unsupported primitive java value of class " + value.getClass().getName());
+ }
+ return writable;
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/WritableHiveObjectConversion.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/WritableHiveObjectConversion.java
new file mode 100644
index 0000000000000..c659569965df5
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/WritableHiveObjectConversion.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive.conversion;
+
+/**
+ * A HiveObjectConversion that converts Flink objects to Hive Writable objects.
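+ *
+ * <p>It first applies the wrapped Flink-to-java conversion, then turns the result into a
+ * Writable via HiveInspectors#hivePrimitiveToWritable.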
+ */
+public class WritableHiveObjectConversion implements HiveObjectConversion {
+
+ private final HiveObjectConversion flinkToJavaConversion;
+
+ WritableHiveObjectConversion(HiveObjectConversion flinkToJavaConversion) {
+ this.flinkToJavaConversion = flinkToJavaConversion;
+ }
+
+ @Override
+ public Object toHiveObject(Object o) {
+ return HiveInspectors.hivePrimitiveToWritable(flinkToJavaConversion.toHiveObject(o));
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
index 236f253445f61..6c90da86bf44c 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
@@ -107,6 +107,9 @@ private void configureMiscHiveSettings(HiveConf hiveConf) {
hiveConf.setBoolVar(HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
hiveConf.setVar(HADOOPBIN, "NO_BIN!");
+
+ // To avoid https://issues.apache.org/jira/browse/HIVE-13185 when loading data into tables
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT, false);
}
private void overrideHiveConf(HiveConf hiveConf) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
index ba2ff57208f86..2a344d1b88057 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
@@ -37,6 +37,8 @@ public static HiveRunnerShim load() {
String hiveVersion = HiveShimLoader.getHiveVersion();
return hiveRunnerShims.computeIfAbsent(hiveVersion, v -> {
switch (v) {
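+ // Hive 1.1.x falls through to the same HiveRunner shim as the 1.2.x versions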
+ case HiveShimLoader.HIVE_VERSION_V1_1_0:
+ case HiveShimLoader.HIVE_VERSION_V1_1_1:
case HiveShimLoader.HIVE_VERSION_V1_2_0:
case HiveShimLoader.HIVE_VERSION_V1_2_1:
case HiveShimLoader.HIVE_VERSION_V1_2_2:
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
index 878cd686f79d4..8a8aa0f7af6f9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceTest.java
@@ -84,12 +84,11 @@ public void testReadNonPartitionedTable() throws Exception {
final String dbName = "source_db";
final String tblName = "test";
hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
- hiveShell.insertInto(dbName, tblName)
- .withAllColumns()
- .addRow(1, 1, "a", 1000L, 1.11)
- .addRow(2, 2, "b", 2000L, 2.22)
- .addRow(3, 3, "c", 3000L, 3.33)
- .addRow(4, 4, "d", 4000L, 4.44)
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{1, 1, "a", 1000L, 1.11})
+ .addRow(new Object[]{2, 2, "b", 2000L, 2.22})
+ .addRow(new Object[]{3, 3, "c", 3000L, 3.33})
+ .addRow(new Object[]{4, 4, "d", 4000L, 4.44})
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
@@ -116,9 +115,8 @@ public void testReadComplexDataType() throws Exception {
map.put(1, "a");
map.put(2, "b");
Object[] struct = new Object[]{3, 3L};
- hiveShell.insertInto(dbName, tblName)
- .withAllColumns()
- .addRow(array, map, struct)
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{array, map, struct})
.commit();
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
tEnv.registerCatalog(catalogName, hiveCatalog);
@@ -141,13 +139,14 @@ public void testReadPartitionTable() throws Exception {
final String tblName = "test_table_pt";
hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
"(year STRING, value INT) partitioned by (pt int);");
- hiveShell.insertInto(dbName, tblName)
- .withColumns("year", "value", "pt")
- .addRow("2014", 3, 0)
- .addRow("2014", 4, 0)
- .addRow("2015", 2, 1)
- .addRow("2015", 5, 1)
- .commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2014", 3})
+ .addRow(new Object[]{"2014", 4})
+ .commit("pt=0");
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2015", 2})
+ .addRow(new Object[]{"2015", 5})
+ .commit("pt=1");
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
tEnv.registerCatalog(catalogName, hiveCatalog);
Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt");
@@ -165,13 +164,14 @@ public void testPartitionPrunning() throws Exception {
final String tblName = "test_table_pt_1";
hiveShell.execute("CREATE TABLE source_db.test_table_pt_1 " +
"(year STRING, value INT) partitioned by (pt int);");
- hiveShell.insertInto(dbName, tblName)
- .withColumns("year", "value", "pt")
- .addRow("2014", 3, 0)
- .addRow("2014", 4, 0)
- .addRow("2015", 2, 1)
- .addRow("2015", 5, 1)
- .commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2014", 3})
+ .addRow(new Object[]{"2014", 4})
+ .commit("pt=0");
+ HiveTestUtils.createTextTableInserter(hiveShell, dbName, tblName)
+ .addRow(new Object[]{"2015", 2})
+ .addRow(new Object[]{"2015", 5})
+ .commit("pt=1");
TableEnvironment tEnv = HiveTestUtils.createTableEnv();
tEnv.registerCatalog(catalogName, hiveCatalog);
Table src = tEnv.sqlQuery("select * from hive.source_db.test_table_pt_1 where pt = 0");
@@ -196,11 +196,13 @@ public void testProjectionPushDown() throws Exception {
hiveShell.execute("create table src(x int,y string) partitioned by (p1 bigint, p2 string)");
final String catalogName = "hive";
try {
- hiveShell.insertInto("default", "src")
- .addRow(1, "a", 2013, "2013")
- .addRow(2, "b", 2013, "2013")
- .addRow(3, "c", 2014, "2014")
- .commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
+ .addRow(new Object[]{1, "a"})
+ .addRow(new Object[]{2, "b"})
+ .commit("p1=2013, p2='2013'");
+ HiveTestUtils.createTextTableInserter(hiveShell, "default", "src")
+ .addRow(new Object[]{3, "c"})
+ .commit("p1=2014, p2='2014'");
TableEnvironment tableEnv = HiveTestUtils.createTableEnv();
tableEnv.registerCatalog(catalogName, hiveCatalog);
Table table = tableEnv.sqlQuery("select p1, count(y) from hive.`default`.src group by p1");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 9484e550b9d1e..2c50bef67884d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -77,7 +77,7 @@ public void testDefaultPartitionName() throws Exception {
hiveShell.execute("create database db1");
hiveShell.execute("create table db1.src (x int, y int)");
hiveShell.execute("create table db1.part (x int) partitioned by (y int)");
- hiveShell.insertInto("db1", "src").addRow(1, 1).addRow(2, null).commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, 1}).addRow(new Object[]{2, null}).commit();
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
@@ -186,7 +186,7 @@ public void testInsertOverwrite() throws Exception {
try {
// non-partitioned
hiveShell.execute("create table db1.dest (x int, y string)");
- hiveShell.insertInto("db1", "dest").addRow(1, "a").addRow(2, "b").commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "dest").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
verifyHiveQueryResult("select * from db1.dest", Arrays.asList("1\ta", "2\tb"));
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.sqlUpdate("insert overwrite db1.dest values (3,'c')");
@@ -195,7 +195,8 @@ public void testInsertOverwrite() throws Exception {
// static partition
hiveShell.execute("create table db1.part(x int) partitioned by (y int)");
- hiveShell.insertInto("db1", "part").addRow(1, 1).addRow(2, 2).commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{1}).commit("y=1");
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part").addRow(new Object[]{2}).commit("y=2");
tableEnv = getTableEnvWithHiveCatalog();
tableEnv.sqlUpdate("insert overwrite db1.part partition (y=1) select 100");
tableEnv.execute("insert overwrite static partition");
@@ -217,7 +218,7 @@ public void testStaticPartition() throws Exception {
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.src (x int)");
- hiveShell.insertInto("db1", "src").addRow(1).addRow(2).commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1}).addRow(new Object[]{2}).commit();
hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.sqlUpdate("insert into db1.dest partition (p1='1''1', p2=1.1) select x from db1.src");
@@ -234,7 +235,11 @@ public void testDynamicPartition() throws Exception {
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.src (x int, y string, z double)");
- hiveShell.insertInto("db1", "src").addRow(1, "a", 1.1).addRow(2, "a", 2.2).addRow(3, "b", 3.3).commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
+ .addRow(new Object[]{1, "a", 1.1})
+ .addRow(new Object[]{2, "a", 2.2})
+ .addRow(new Object[]{3, "b", 3.3})
+ .commit();
hiveShell.execute("create table db1.dest (x int) partitioned by (p1 string, p2 double)");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.sqlUpdate("insert into db1.dest select * from db1.src");
@@ -251,7 +256,7 @@ public void testPartialDynamicPartition() throws Exception {
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.src (x int, y string)");
- hiveShell.insertInto("db1", "src").addRow(1, "a").addRow(2, "b").commit();
+ HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src").addRow(new Object[]{1, "a"}).addRow(new Object[]{2, "b"}).commit();
hiveShell.execute("create table db1.dest (x int) partitioned by (p1 double, p2 string)");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.sqlUpdate("insert into db1.dest partition (p1=1.1) select x,y from db1.src");
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
index 0fe10d7d3ae84..fb5c04a04ae54 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogHiveMetadataTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBase;
import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataBinary;
@@ -79,26 +80,32 @@ public void testCreateTable_StorageFormatSet() throws Exception {
// ------ table and column stats ------
@Test
public void testAlterTableColumnStatistics() throws Exception {
+ String hiveVersion = ((HiveCatalog) catalog).getHiveVersion();
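+ // Hive versions before 1.2.0 don't support date column stats, so skip the date field for them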
+ boolean supportDateStats = hiveVersion.compareTo(HiveShimLoader.HIVE_VERSION_V1_2_0) >= 0;
catalog.createDatabase(db1, createDb(), false);
- TableSchema tableSchema = TableSchema.builder()
- .field("first", DataTypes.STRING())
- .field("second", DataTypes.INT())
- .field("third", DataTypes.BOOLEAN())
- .field("fourth", DataTypes.DATE())
- .field("fifth", DataTypes.DOUBLE())
- .field("sixth", DataTypes.BIGINT())
- .field("seventh", DataTypes.BYTES())
- .build();
+ TableSchema.Builder builder = TableSchema.builder()
+ .field("first", DataTypes.STRING())
+ .field("second", DataTypes.INT())
+ .field("third", DataTypes.BOOLEAN())
+ .field("fourth", DataTypes.DOUBLE())
+ .field("fifth", DataTypes.BIGINT())
+ .field("sixth", DataTypes.BYTES());
+ if (supportDateStats) {
+ builder.field("seventh", DataTypes.DATE());
+ }
+ TableSchema tableSchema = builder.build();
CatalogTable catalogTable = new CatalogTableImpl(tableSchema, getBatchTableProperties(), TEST_COMMENT);
catalog.createTable(path1, catalogTable, false);
Map<String, CatalogColumnStatisticsDataBase> columnStatisticsDataBaseMap = new HashMap<>();
columnStatisticsDataBaseMap.put("first", new CatalogColumnStatisticsDataString(10, 5.2, 3, 100));
columnStatisticsDataBaseMap.put("second", new CatalogColumnStatisticsDataLong(0, 1000, 3, 0));
columnStatisticsDataBaseMap.put("third", new CatalogColumnStatisticsDataBoolean(15, 20, 3));
- columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDate(new Date(71L), new Date(17923L), 1321, 0L));
- columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3, 10));
- columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataLong(0, 20, 3, 2));
- columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataBinary(150, 20, 3));
+ columnStatisticsDataBaseMap.put("fourth", new CatalogColumnStatisticsDataDouble(15.02, 20.01, 3, 10));
+ columnStatisticsDataBaseMap.put("fifth", new CatalogColumnStatisticsDataLong(0, 20, 3, 2));
+ columnStatisticsDataBaseMap.put("sixth", new CatalogColumnStatisticsDataBinary(150, 20, 3));
+ if (supportDateStats) {
+ columnStatisticsDataBaseMap.put("seventh", new CatalogColumnStatisticsDataDate(new Date(71L), new Date(17923L), 1321, 0L));
+ }
CatalogColumnStatistics catalogColumnStatistics = new CatalogColumnStatistics(columnStatisticsDataBaseMap);
catalog.alterTableColumnStatistics(path1, catalogColumnStatistics, false);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTestBase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTestBase.java
index 232c250595ac4..88895a3c9ea01 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTestBase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogTestBase.java
@@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.CatalogTestBase;
import org.apache.flink.table.catalog.ObjectPath;
+import org.junit.Assume;
import org.junit.Test;
/**
@@ -28,6 +29,16 @@
*/
public abstract class HiveCatalogTestBase extends CatalogTestBase {
+ // ------ table and column stats ------
+
+ @Override
+ @Test
+ public void testAlterTableStats() throws Exception {
+ String hiveVersion = ((HiveCatalog) catalog).getHiveVersion();
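+ // HiveCatalog doesn't support altering table stats for Hive versions older than 1.2.1, so skip the test there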
+ Assume.assumeTrue(hiveVersion.compareTo("1.2.1") >= 0);
+ super.testAlterTableStats();
+ }
+
// ------ functions ------
@Test
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
index 2e06d519288ce..ec142cb9bfe49 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveTestUtils.java
@@ -37,15 +37,20 @@
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;
+import com.klarna.hiverunner.HiveShell;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.rules.TemporaryFolder;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -63,6 +68,8 @@ public class HiveTestUtils {
private static final int MIN_EPH_PORT = 49152;
private static final int MAX_EPH_PORT = 61000;
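+ // default Hive text-format delimiters: \001 for fields, \002 and \003 for nested collection levels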
+ private static final byte[] SEPARATORS = new byte[]{(byte) 1, (byte) 2, (byte) 3};
+
/**
* Create a HiveCatalog with an embedded Hive Metastore.
*/
@@ -138,4 +145,106 @@ public static List<Row> collectTable(TableEnvironment tableEnv, Table table) thr
ArrayList<byte[]> data = result.getAccumulatorResult(id);
return SerializedListAccumulator.deserializeList(data, serializer);
}
+
+ // Creates an inserter that writes rows to a text file and loads it into a table, optionally into a single partition.
+ public static TextTableInserter createTextTableInserter(HiveShell hiveShell, String dbName, String tableName) {
+ return new TextTableInserter(hiveShell, dbName, tableName);
+ }
+
+ /**
+ * Inserter that writes rows to a delimited text file and loads it into a Hive table.
+ */
+ public static class TextTableInserter {
+
+ private final HiveShell hiveShell;
+ private final String dbName;
+ private final String tableName;
+ private final List