[FLINK-15391][hive] DATE and TIMESTAMP partition columns don't work
Fix issues with reading and writing DATE and TIMESTAMP partition columns.

This closes apache#10690.
lirui-apache authored and bowenli86 committed Dec 26, 2019
1 parent d3f1f06 commit 2e9a7f3
Showing 12 changed files with 160 additions and 15 deletions.
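For context, the failure mode fixed here is exercised end to end by the new testDateTimestampPartitionColumns test near the bottom of this diff. A rough sketch of the scenario, assuming a TableEnvironment backed by a HiveCatalog and a Hive table created through Hive itself (only the Flink side is shown):

// Hive DDL, executed through Hive as in the test's hiveShell:
//   create table db1.part(x int) partitioned by (dt date, ts timestamp)

// Writing into DATE/TIMESTAMP partition columns from Flink:
tableEnv.sqlUpdate("insert into db1.part select 4, cast('2019-12-31' as date), cast('2019-12-31 12:00:00.0' as timestamp)");
tableEnv.execute("insert");

// Reading back, including partition pruning on the DATE column:
List<Row> results = TableUtils.collectToList(
        tableEnv.sqlQuery("select x from db1.part where dt = cast('2019-12-25' as date)"));

The changes below convert these values between Flink's java.time representations and the forms Hive expects, on both the write and the read path.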
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connectors.hive;

import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.filesystem.RowPartitionComputer;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import java.util.LinkedHashMap;

/**
* A RowPartitionComputer that converts Flink objects to Hive objects before computing the partition value strings.
*/
public class HivePartitionComputer extends RowPartitionComputer {

    private static final long serialVersionUID = 1L;

    private final HiveObjectConversion[] partColConversions;

    HivePartitionComputer(HiveShim hiveShim, String defaultPartValue, String[] columnNames,
            DataType[] columnTypes, String[] partitionColumns) {
        super(defaultPartValue, columnNames, partitionColumns);
        // Prepare a Flink-to-Hive conversion for each partition column, so that
        // DATE/TIMESTAMP values are rendered the way Hive expects.
        partColConversions = new HiveObjectConversion[partitionIndexes.length];
        for (int i = 0; i < partColConversions.length; i++) {
            DataType partColType = columnTypes[partitionIndexes[i]];
            ObjectInspector objectInspector = HiveInspectors.getObjectInspector(partColType);
            partColConversions[i] = HiveInspectors.getConversion(objectInspector, partColType.getLogicalType(), hiveShim);
        }
    }

    @Override
    public LinkedHashMap<String, String> generatePartValues(Row in) throws Exception {
        LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();

        for (int i = 0; i < partitionIndexes.length; i++) {
            int index = partitionIndexes[i];
            Object field = in.getField(index);
            // Convert to the Hive object first, then derive the partition value string from it.
            String partitionValue = field != null ? partColConversions[i].toHiveObject(field).toString() : null;
            if (StringUtils.isEmpty(partitionValue)) {
                partitionValue = defaultPartValue;
            }
            partSpec.put(partitionColumns[i], partitionValue);
        }
        return partSpec;
    }
}
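The reason for routing each partition field through a HiveObjectConversion before calling toString() (rather than using the field's own string form, which is what the base RowPartitionComputer effectively does) is easiest to see with a TIMESTAMP column: the planner hands the sink java.time values, whose ISO-8601 string form is not a valid Hive partition value. A plain-JDK illustration, not part of the commit:

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class PartitionValueFormatting {
    public static void main(String[] args) {
        LocalDateTime flinkValue = LocalDateTime.of(2019, 12, 23, 0, 0);
        // ISO-8601 with a 'T' -- not what Hive expects in a partition spec:
        System.out.println(flinkValue);                      // 2019-12-23T00:00
        // After converting to the Hive/JDBC counterpart, the string form is Hive-friendly:
        System.out.println(Timestamp.valueOf(flinkValue));   // 2019-12-23 00:00:00.0
    }
}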
@@ -31,7 +31,6 @@
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.filesystem.FileSystemOutputFormat;
import org.apache.flink.table.filesystem.RowPartitionComputer;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.OverwritableTableSink;
import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -94,11 +93,13 @@ public OutputFormat<Row> getOutputFormat() {
StorageDescriptor sd = table.getSd();

FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
builder.setPartitionComputer(new RowPartitionComputer(
builder.setPartitionComputer(new HivePartitionComputer(
hiveShim,
jobConf.get(
HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
tableSchema.getFieldNames(),
tableSchema.getFieldDataTypes(),
partitionColumns));
builder.setDynamicGrouped(dynamicGrouping);
builder.setPartitionColumns(partitionColumns);
@@ -31,8 +31,11 @@
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
@@ -56,6 +59,7 @@

import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -81,6 +85,7 @@ public class HiveTableSource implements
@Nullable
private List<Map<String, String>> remainingPartitions = null;
private String hiveVersion;
private HiveShim hiveShim;
private boolean partitionPruned;
private int[] projectedFields;
private boolean isLimitPushDown = false;
@@ -92,6 +97,7 @@ public HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable catal
this.catalogTable = Preconditions.checkNotNull(catalogTable);
this.hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
"Hive version is not defined");
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
partitionPruned = false;
}

@@ -108,6 +114,7 @@ private HiveTableSource(JobConf jobConf, ObjectPath tablePath, CatalogTable cata
this.catalogTable = Preconditions.checkNotNull(catalogTable);
this.remainingPartitions = remainingPartitions;
this.hiveVersion = hiveVersion;
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
this.partitionPruned = partitionPruned;
this.projectedFields = projectedFields;
this.isLimitPushDown = isLimitPushDown;
@@ -286,7 +293,15 @@ private Object restorePartitionValueFromFromType(String valStr, DataType type) {
case DOUBLE:
return Double.valueOf(valStr);
case DATE:
return Date.valueOf(valStr);
return HiveInspectors.toFlinkObject(
HiveInspectors.getObjectInspector(type),
hiveShim.toHiveDate(Date.valueOf(valStr)),
hiveShim);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return HiveInspectors.toFlinkObject(
HiveInspectors.getObjectInspector(type),
hiveShim.toHiveTimestamp(Timestamp.valueOf(valStr)),
hiveShim);
default:
break;
}
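On the read path, partition values come back from the metastore as plain strings. The new branches above parse them with java.sql.Date.valueOf / Timestamp.valueOf, move them into the shim's Hive representation (toHiveDate / toHiveTimestamp), and then let HiveInspectors.toFlinkObject produce the Flink-side value, which for these types is a java.time.LocalDate / LocalDateTime. A JDK-only sketch of the string round trip (the shim and object-inspector steps are omitted):

import java.sql.Date;
import java.sql.Timestamp;

public class PartitionValueParsing {
    public static void main(String[] args) {
        // Partition values as they appear in the partition spec strings:
        Date date = Date.valueOf("2019-12-25");
        Timestamp ts = Timestamp.valueOf("2019-12-25 16:23:43.012");
        // The java.time values a Flink query ultimately sees for these columns:
        System.out.println(date.toLocalDate());     // 2019-12-25
        System.out.println(ts.toLocalDateTime());   // 2019-12-25T16:23:43.012
    }
}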
@@ -793,7 +793,7 @@ public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, L
ensurePartitionedTable(tablePath, hiveTable);
List<String> partColNames = getFieldNames(hiveTable.getPartitionKeys());
Optional<String> filter = HiveTableUtil.makePartitionFilter(
getNonPartitionFields(hiveConf, hiveTable).size(), partColNames, expressions);
getNonPartitionFields(hiveConf, hiveTable).size(), partColNames, expressions, hiveShim);
if (!filter.isPresent()) {
throw new UnsupportedOperationException(
"HiveCatalog is unable to handle the partition filter expressions: " + expressions);
@@ -20,6 +20,7 @@

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -28,7 +29,9 @@
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -193,9 +196,10 @@ public static boolean requireRelyConstraint(byte trait) {
* @param expressions The filter expressions in CNF form
* @return an Optional filter string equivalent to the expressions, which is empty if the expressions can't be handled
*/
public static Optional<String> makePartitionFilter(int partColOffset, List<String> partColNames, List<Expression> expressions) {
public static Optional<String> makePartitionFilter(
int partColOffset, List<String> partColNames, List<Expression> expressions, HiveShim hiveShim) {
List<String> filters = new ArrayList<>(expressions.size());
ExpressionExtractor extractor = new ExpressionExtractor(partColOffset, partColNames);
ExpressionExtractor extractor = new ExpressionExtractor(partColOffset, partColNames, hiveShim);
for (Expression expression : expressions) {
String str = expression.accept(extractor);
if (str == null) {
@@ -225,10 +229,12 @@ private static class ExpressionExtractor implements ExpressionVisitor<String> {
// used to shift field reference index
private final int partColOffset;
private final List<String> partColNames;
private final HiveShim hiveShim;

ExpressionExtractor(int partColOffset, List<String> partColNames) {
ExpressionExtractor(int partColOffset, List<String> partColNames, HiveShim hiveShim) {
this.partColOffset = partColOffset;
this.partColNames = partColNames;
this.hiveShim = hiveShim;
}

@Override
@@ -250,7 +256,19 @@ public String visit(CallExpression call) {

@Override
public String visit(ValueLiteralExpression valueLiteral) {
return valueLiteral.asSummaryString();
DataType dataType = valueLiteral.getOutputDataType();
Object value = valueLiteral.getValueAs(Object.class).orElse(null);
if (value == null) {
return "null";
}
value = HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), dataType.getLogicalType(), hiveShim)
.toHiveObject(value);
String res = value.toString();
LogicalTypeRoot typeRoot = dataType.getLogicalType().getTypeRoot();
if (typeRoot == LogicalTypeRoot.VARCHAR || typeRoot == LogicalTypeRoot.CHAR) {
res = "'" + res.replace("'", "''") + "'";
}
return res;
}
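With this change, visit(ValueLiteralExpression) no longer relies on asSummaryString(): the literal is first converted to its Hive counterpart via HiveInspectors.getConversion, and only CHAR/VARCHAR values are wrapped in single quotes (with embedded quotes doubled), which is what the partition filter pushed to the metastore needs. A tiny illustration of the rendering rule, not taken from the commit:

public class PartitionFilterLiteralSketch {
    public static void main(String[] args) {
        String stringLiteral = "it's";
        // CHAR/VARCHAR literals: quote and double any embedded single quotes.
        System.out.println("'" + stringLiteral.replace("'", "''") + "'");   // 'it''s'
        // Other literals (numbers, dates, timestamps) keep their Hive string form unquoted,
        // e.g. the filters asserted in HiveTableUtilTest below: (p1 = 1) and (p3 = 1.1).
    }
}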

@Override
@@ -20,12 +20,14 @@

import org.apache.flink.annotation.Internal;

import java.io.Serializable;

/**
* Interface to convert Flink object to Hive object.
*/
@FunctionalInterface
@Internal
public interface HiveObjectConversion {
public interface HiveObjectConversion extends Serializable {

Object toHiveObject(Object o);
}
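Extending HiveObjectConversion with Serializable (and adding serialVersionUIDs to its implementations below) is what allows HivePartitionComputer to keep the conversions as instance fields: the partition computer is handed to FileSystemOutputFormat and is serialized when the sink's output format is shipped to the workers. Since the interface stays a functional interface, lambda-based conversions remain serializable as well. A standalone sketch with a stand-in interface (names are illustrative, not from the Flink code base):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDate;

public class SerializableConversionSketch {

    // Stand-in for HiveObjectConversion after this change.
    @FunctionalInterface
    interface Conversion extends Serializable {
        Object toHiveObject(Object o);
    }

    public static void main(String[] args) throws Exception {
        Conversion dateConversion = o -> java.sql.Date.valueOf((LocalDate) o);

        // A lambda whose target type is Serializable can itself be serialized,
        // which is what shipping the partition computer to the cluster requires.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(dateConversion);
        Conversion copy = (Conversion) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        System.out.println(copy.toHiveObject(LocalDate.of(2019, 12, 23)));   // 2019-12-23
    }
}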
@@ -26,6 +26,8 @@
@Internal
public class IdentityConversion implements HiveObjectConversion {

private static final long serialVersionUID = 1L;

public static final IdentityConversion INSTANCE = new IdentityConversion();

private IdentityConversion() {}
@@ -23,6 +23,8 @@
*/
public class WritableHiveObjectConversion implements HiveObjectConversion {

private static final long serialVersionUID = 1L;

private final HiveObjectConversion flinkToJavaConversion;

WritableHiveObjectConversion(HiveObjectConversion flinkToJavaConversion) {
@@ -305,6 +305,34 @@ public void testPartialDynamicPartition() throws Exception {
}
}

@Test
public void testDateTimestampPartitionColumns() throws Exception {
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.part(x int) partitioned by (dt date,ts timestamp)");
HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
.addRow(new Object[]{1})
.addRow(new Object[]{2})
.commit("dt='2019-12-23',ts='2019-12-23 00:00:00'");
HiveTestUtils.createTextTableInserter(hiveShell, "db1", "part")
.addRow(new Object[]{3})
.commit("dt='2019-12-25',ts='2019-12-25 16:23:43.012'");
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
List<Row> results = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.part order by x"));
assertEquals("[1,2019-12-23,2019-12-23T00:00, 2,2019-12-23,2019-12-23T00:00, 3,2019-12-25,2019-12-25T16:23:43.012]", results.toString());

results = TableUtils.collectToList(tableEnv.sqlQuery("select x from db1.part where dt=cast('2019-12-25' as date)"));
assertEquals("[3]", results.toString());

tableEnv.sqlUpdate("insert into db1.part select 4,cast('2019-12-31' as date),cast('2019-12-31 12:00:00.0' as timestamp)");
tableEnv.execute("insert");
results = TableUtils.collectToList(tableEnv.sqlQuery("select max(dt) from db1.part"));
assertEquals("[2019-12-31]", results.toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
}

@Test
public void testUDTF() throws Exception {
// W/o https://issues.apache.org/jira/browse/HIVE-11878 Hive registers the App classloader as the classloader
@@ -19,6 +19,8 @@
package org.apache.flink.table.catalog.hive.util;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
@@ -38,6 +40,8 @@
*/
public class HiveTableUtilTest {

private static final HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());

@Test
public void testMakePartitionFilter() {
List<String> partColNames = Arrays.asList("p1", "p2", "p3");
@@ -50,15 +54,16 @@ public void testMakePartitionFilter() {
Arrays.asList(p2Ref, new ValueLiteralExpression("a", DataTypes.STRING())), DataTypes.BOOLEAN());
ResolvedExpression p3Exp = new CallExpression(BuiltInFunctionDefinitions.EQUALS,
Arrays.asList(p3Ref, new ValueLiteralExpression(1.1, DataTypes.DOUBLE())), DataTypes.BOOLEAN());
Optional<String> filter = HiveTableUtil.makePartitionFilter(2, partColNames, Arrays.asList(p1Exp));
Optional<String> filter = HiveTableUtil.makePartitionFilter(2, partColNames, Arrays.asList(p1Exp), hiveShim);
assertEquals("(p1 = 1)", filter.orElse(null));

filter = HiveTableUtil.makePartitionFilter(2, partColNames, Arrays.asList(p1Exp, p3Exp));
filter = HiveTableUtil.makePartitionFilter(2, partColNames, Arrays.asList(p1Exp, p3Exp), hiveShim);
assertEquals("(p1 = 1) and (p3 = 1.1)", filter.orElse(null));

filter = HiveTableUtil.makePartitionFilter(2, partColNames,
Arrays.asList(p2Exp,
new CallExpression(BuiltInFunctionDefinitions.OR, Arrays.asList(p1Exp, p3Exp), DataTypes.BOOLEAN())));
new CallExpression(BuiltInFunctionDefinitions.OR, Arrays.asList(p1Exp, p3Exp), DataTypes.BOOLEAN())),
hiveShim);
assertEquals("(p2 = 'a') and ((p1 = 1) or (p3 = 1.1))", filter.orElse(null));
}
}
@@ -33,6 +33,7 @@
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;

import static org.apache.flink.table.runtime.functions.SqlDateTimeUtils.dateToInternal;
@@ -111,6 +112,9 @@ private static ColumnVector createHiveVectorFromConstant(
case DOUBLE:
return createDoubleVector(batchSize, value);
case DATE:
if (value instanceof LocalDate) {
value = Date.valueOf((LocalDate) value);
}
return createLongVector(batchSize, dateToInternal((Date) value));
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampVector(batchSize, value);
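The DATE branch above follows from the HiveTableSource change: a restored DATE partition value now arrives as a java.time.LocalDate rather than a java.sql.Date, so it is normalized before dateToInternal (Flink's internal day-based DATE representation) is applied. A minimal sketch of that normalization, assuming the LocalDate input described above:

import java.sql.Date;
import java.time.LocalDate;

public class DateConstantSketch {
    public static void main(String[] args) {
        Object value = LocalDate.of(2019, 12, 31);    // what a restored DATE partition constant now looks like
        if (value instanceof LocalDate) {
            value = Date.valueOf((LocalDate) value);  // same normalization as the new branch above
        }
        System.out.println(value);                    // 2019-12-31; dateToInternal then converts it to a day count
    }
}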
@@ -35,10 +35,10 @@ public class RowPartitionComputer implements PartitionComputer<Row> {

private static final long serialVersionUID = 1L;

private final String defaultPartValue;
private final String[] partitionColumns;
protected final String defaultPartValue;
protected final String[] partitionColumns;
private final int[] nonPartitionIndexes;
private final int[] partitionIndexes;
protected final int[] partitionIndexes;

public RowPartitionComputer(String defaultPartValue, String[] columnNames, String[] partitionColumns) {
this.defaultPartValue = defaultPartValue;
