Skip to content

Commit

Permalink
[FLINK-14526][hive] Support Hive version 1.1.0 and 1.1.1
Browse files Browse the repository at this point in the history
This commit adds support for Hive versions 1.1.0 and 1.1.1.

This closes apache#9995.
  • Loading branch information
lirui-apache authored and bowenli86 committed Oct 29, 2019
1 parent 759b8cb commit 6cc09ac
Show file tree
Hide file tree
Showing 23 changed files with 829 additions and 296 deletions.
23 changes: 23 additions & 0 deletions flink-connectors/flink-connector-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ under the License.
<groupId>org.apache.hive</groupId>
<artifactId>hive-hcatalog-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat-java-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.tez</groupId>
<artifactId>tez-common</artifactId>
Expand Down Expand Up @@ -601,9 +605,20 @@ under the License.
<artifactId>hadoop-mapreduce-client-core</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-webhcat-java-client</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
</dependency>

<!-- TODO: move to flink-connector-hive-test end-to-end test module once it's setup -->
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -652,6 +667,14 @@ under the License.

<profiles>
<!-- Activate one of the following profiles with -Phive-x.y.z to build and test against that Hive version -->
<profile>
<id>hive-1.1.1</id>
<properties>
<hive.version>1.1.1</hive.version>
<hivemetastore.hadoop.version>2.6.5</hivemetastore.hadoop.version>
<hiverunner.version>3.1.1</hiverunner.version>
</properties>
</profile>
<profile>
<id>hive-1.2.1</id>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,10 @@ private boolean isTablePartitioned(Table hiveTable) {
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
try {
Table hiveTable = getHiveTable(tablePath);
// the stats we put in table parameters will be overridden by HMS in older Hive versions, so error out
if (!isTablePartitioned(hiveTable) && hiveVersion.compareTo("1.2.1") < 0) {
throw new CatalogException("Alter table stats is not supported in Hive version " + hiveVersion);
}
// Set table stats
if (compareAndUpdateStatisticsProperties(tableStatistics, hiveTable.getParameters())) {
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
Expand All @@ -1139,7 +1143,7 @@ public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatis
Table hiveTable = getHiveTable(tablePath);
// Set table column stats. This only works for non-partitioned tables.
if (!isTablePartitioned(hiveTable)) {
client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData()));
client.updateTableColumnStatistics(HiveStatsUtil.createTableColumnStats(hiveTable, columnStatistics.getColumnStatisticsData(), hiveVersion));
} else {
throw new TablePartitionedException(getName(), tablePath);
}
Expand Down Expand Up @@ -1204,7 +1208,8 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
Table hiveTable = getHiveTable(tablePath);
String partName = getPartitionName(tablePath, partitionSpec, hiveTable);
client.updatePartitionColumnStatistics(HiveStatsUtil.createPartitionColumnStats(hivePartition, partName, columnStatistics.getColumnStatisticsData()));
client.updatePartitionColumnStatistics(HiveStatsUtil.createPartitionColumnStats(
hivePartition, partName, columnStatistics.getColumnStatisticsData(), hiveVersion));
} catch (TableNotExistException | PartitionSpecInvalidException e) {
if (!ignoreIfNotExists) {
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
Expand Down Expand Up @@ -1243,7 +1248,7 @@ public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) th
if (!isTablePartitioned(hiveTable)) {
List<ColumnStatisticsObj> columnStatisticsObjs = client.getTableColumnStatistics(
hiveTable.getDbName(), hiveTable.getTableName(), getFieldNames(hiveTable.getSd().getCols()));
return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs));
return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs, hiveVersion));
} else {
// TableColumnStats of partitioned table is unknown, the behavior is same as HIVE
return CatalogColumnStatistics.UNKNOWN;
Expand Down Expand Up @@ -1280,7 +1285,7 @@ public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath
getFieldNames(partition.getSd().getCols()));
List<ColumnStatisticsObj> columnStatisticsObjs = partitionColumnStatistics.get(partName);
if (columnStatisticsObjs != null && !columnStatisticsObjs.isEmpty()) {
return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs));
return new CatalogColumnStatistics(HiveStatsUtil.createCatalogColumnStats(columnStatisticsObjs, hiveVersion));
} else {
return CatalogColumnStatistics.UNKNOWN;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.flink.table.catalog.hive.client;

import org.apache.flink.table.catalog.stats.CatalogColumnStatisticsDataDate;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
Expand All @@ -33,16 +36,18 @@
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.thrift.TException;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* A shim layer to support different versions of Hive.
*/
public interface HiveShim {
public interface HiveShim extends Serializable {

/**
* Create a Hive Metastore client based on the given HiveConf object.
Expand Down Expand Up @@ -159,4 +164,24 @@ SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params,
*/
void makeSpecFromName(Map<String, String> partSpec, Path currPath);

/**
* Get ObjectInspector for a constant value.
*/
ObjectInspector getObjectInspectorForConstant(PrimitiveTypeInfo primitiveTypeInfo, Object value);

/**
* Generate Hive ColumnStatisticsData from Flink CatalogColumnStatisticsDataDate for DATE columns.
*/
ColumnStatisticsData toHiveDateColStats(CatalogColumnStatisticsDataDate flinkDateColStats);

/**
* Whether a Hive ColumnStatisticsData is for DATE columns.
*/
boolean isDateStats(ColumnStatisticsData colStatsData);

/**
* Generate Flink CatalogColumnStatisticsDataDate from Hive ColumnStatisticsData for DATE columns.
*/
CatalogColumnStatisticsDataDate toFlinkDateColStats(ColumnStatisticsData hiveDateColStats);

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
*/
public class HiveShimLoader {

public static final String HIVE_VERSION_V1_1_0 = "1.1.0";
public static final String HIVE_VERSION_V1_1_1 = "1.1.1";
public static final String HIVE_VERSION_V1_2_0 = "1.2.0";
public static final String HIVE_VERSION_V1_2_1 = "1.2.1";
public static final String HIVE_VERSION_V1_2_2 = "1.2.2";
Expand Down Expand Up @@ -60,6 +62,12 @@ private HiveShimLoader() {

public static HiveShim loadHiveShim(String version) {
return hiveShims.computeIfAbsent(version, (v) -> {
if (v.startsWith(HIVE_VERSION_V1_1_0)) {
return new HiveShimV110();
}
if (v.startsWith(HIVE_VERSION_V1_1_1)) {
return new HiveShimV111();
}
if (v.startsWith(HIVE_VERSION_V1_2_0)) {
return new HiveShimV120();
}
Expand Down
Loading

0 comments on commit 6cc09ac

Please sign in to comment.