Skip to content

Commit

Permalink
Update to iceberg 0.6.2 and fix for handling dateCreated in iceberg p…
Browse files Browse the repository at this point in the history
…artition query. (#317)
  • Loading branch information
zhljen committed Dec 18, 2018
1 parent ab3b049 commit 3b39dc9
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 83 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ hive_version=1.2.1
org.gradle.parallel=false
org.gradle.daemon=false

iceberg_version=0.6.1
iceberg_version=0.6.2
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.metacat.connector.hive.util;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
Expand All @@ -33,15 +34,20 @@
import com.netflix.metacat.common.server.partition.parser.SimpleNode;
import com.netflix.metacat.common.server.partition.parser.Variable;
import com.netflix.metacat.common.server.partition.visitor.PartitionParserEval;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Iceberg Filter generator.
*/
public class IcebergFilterGenerator extends PartitionParserEval {
private static final Set<String> ICEBERG_TIMESTAMP_NAMES
= ImmutableSet.of("dateCreated", "lastUpdated");
private final Map<String, Types.NestedField> fieldMap;

/**
Expand All @@ -56,84 +62,6 @@ public IcebergFilterGenerator(final List<Types.NestedField> fields) {
}
}

/**
* evalString.
*
* @param node node
* @param data data
* @return eval String
*/
public Expression evalString(final SimpleNode node, final Object data) {
final Object lhs = node.jjtGetChild(0).jjtAccept(this, data);
final Compare comparison = (Compare) node.jjtGetChild(1).jjtAccept(this, data);
final Object rhs = node.jjtGetChild(2).jjtAccept(this, data);
return createIcebergFilter(lhs, rhs, comparison);
}

private boolean isField(final String key) {
return fieldMap.containsKey(key.toLowerCase());
}

private Expression createIcebergFilter(final Object lhs,
final Object rhs,
final Compare comparison) {
String key = null;
Object value = null;
//
// lhs, rhs or both can be keys
//
if (lhs instanceof String && isField((String) lhs)) {
key = lhs.toString();
value = rhs;
} else if (rhs instanceof String && isField((String) rhs)) {
key = rhs.toString();
value = lhs;
}
if (key == null || value == null) {
throw new RuntimeException("Invalid expression key/value " + lhs + "/" + rhs);
}

//the parser changes numerical to bigdecimal
if (value instanceof BigDecimal) {
switch (fieldMap.get(key).type().typeId()) {
case LONG:
value = ((BigDecimal) value).longValue();
break;
case INTEGER:
value = ((BigDecimal) value).intValue();
break;
case DOUBLE:
value = ((BigDecimal) value).doubleValue();
break;
case FLOAT:
value = ((BigDecimal) value).floatValue();
break;
case DECIMAL:
break;
default:
throw new RuntimeException("Unsupported BigDecimal to Iceberg Type");
}
}

switch (comparison) {
case EQ:
return Expressions.equal(key, value);
case LTE:
return Expressions.lessThanOrEqual(key, value);
case GTE:
return Expressions.greaterThanOrEqual(key, value);
case GT:
return Expressions.greaterThan(key, value);
case LT:
return Expressions.lessThan(key, value);
case NEQ:
return Expressions.notEqual(key, value);
default:
throw new RuntimeException("Not supported");
}
}


@Override
public Object visit(final ASTAND node, final Object data) {
return Expressions.and((Expression) node.jjtGetChild(0).jjtAccept(this, data),
Expand Down Expand Up @@ -166,8 +94,10 @@ public Object visit(final ASTBETWEEN node, final Object data) {
final Object value = node.jjtGetChild(0).jjtAccept(this, data);
final Object startValue = node.jjtGetChild(1).jjtAccept(this, data);
final Object endValue = node.jjtGetChild(2).jjtAccept(this, data);
final Expression compare1 = createIcebergFilter(value, startValue, node.not ? Compare.LT : Compare.GTE);
final Expression compare2 = createIcebergFilter(value, endValue, node.not ? Compare.GT : Compare.LTE);
final Expression compare1 =
createIcebergExpression(value, startValue, node.not ? Compare.LT : Compare.GTE);
final Expression compare2 =
createIcebergExpression(value, endValue, node.not ? Compare.GT : Compare.LTE);
return (node.not)
? Expressions.or(compare1, compare2) : Expressions.and(compare1, compare2);
}
Expand Down Expand Up @@ -200,4 +130,121 @@ private Expression evalSingleTerm(final ASTCOMPARE node, final Object data) {
}
return Expressions.alwaysFalse();
}

/**
* evalString.
*
* @param node node
* @param data data
* @return eval String
*/
private Expression evalString(final SimpleNode node, final Object data) {
final Object lhs = node.jjtGetChild(0).jjtAccept(this, data);
final Compare comparison = (Compare) node.jjtGetChild(1).jjtAccept(this, data);
final Object rhs = node.jjtGetChild(2).jjtAccept(this, data);
return createIcebergExpression(lhs, rhs, comparison);
}

/**
* Check if the key is part of field.
*
* @param key input string
* @return True if key is a field.
*/
private boolean isField(final Object key) {
return (key instanceof String) && fieldMap.containsKey(((String) key).toLowerCase());
}

/**
* Check if the key is an iceberg supported date filter field.
*
* @param key input string
* @return True if key is an iceberg supported date filter field.
*/
private boolean isIcebergTimestamp(final Object key) {
return (key instanceof String) && ICEBERG_TIMESTAMP_NAMES.contains(key);
}

/**
* Get the key and value field of iceberg expression.
*
* @param lhs left hand string
* @param rhs right hand string
* @return key value pair for iceberg expression.
*/
private Pair<String, Object> getExpressionKeyValue(final Object lhs,
final Object rhs) {
if (isIcebergTimestamp(lhs)) {
return new ImmutablePair<>(lhs.toString(), ((BigDecimal) rhs).longValue());
} else if (isIcebergTimestamp(rhs)) {
return new ImmutablePair<>(rhs.toString(), ((BigDecimal) lhs).longValue());
}
if (isField(lhs)) {
return new ImmutablePair<>(lhs.toString(), getValue(lhs.toString(), rhs));
} else if (isField(rhs)) {
return new ImmutablePair<>(rhs.toString(), getValue(rhs.toString(), lhs));
}
throw new RuntimeException(
String.format("Invalid input \"%s/%s\" filter must be columns in fields %s or %s",
lhs, rhs, fieldMap.keySet().toString(), ICEBERG_TIMESTAMP_NAMES.toString()));
}

/**
* Transform the value type to iceberg type.
*
* @param key the input filter key
* @param value the input filter value
* @return iceberg type
*/
private Object getValue(final String key, final Object value) {
if (value instanceof BigDecimal) {
switch (fieldMap.get(key).type().typeId()) {
case LONG:
return ((BigDecimal) value).longValue();
case INTEGER:
return ((BigDecimal) value).intValue();
case DOUBLE:
return ((BigDecimal) value).doubleValue();
case FLOAT:
return ((BigDecimal) value).floatValue();
case DECIMAL:
return value;
default:
throw new RuntimeException("Unsupported BigDecimal to Iceberg Type");
}
}
return value;
}

/**
* Based on filter create iceberg expression.
*
* @param lhs left hand string
* @param rhs right hand string
* @param comparison comparing operator
* @return iceberg expression
*/
private Expression createIcebergExpression(final Object lhs,
final Object rhs,
final Compare comparison) {
final Pair<String, Object> keyValue = getExpressionKeyValue(lhs, rhs);
final String key = keyValue.getLeft();
final Object value = keyValue.getRight();
switch (comparison) {
case EQ:
return Expressions.equal(key, value);
case LTE:
return Expressions.lessThanOrEqual(key, value);
case GTE:
return Expressions.greaterThanOrEqual(key, value);
case GT:
return Expressions.greaterThan(key, value);
case LT:
return Expressions.lessThan(key, value);
case NEQ:
return Expressions.notEqual(key, value);
default:
throw new RuntimeException("Not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import com.netflix.metacat.connector.hive.iceberg.IcebergTableOpWrapper
import com.netflix.spectator.api.NoopRegistry
import spock.lang.Specification

class IcebergPartitionFilterSpec extends Specification{
class IcebergFilterSpec extends Specification{


def 'Test iceberg filter' () {
Expand All @@ -54,7 +54,6 @@ class IcebergPartitionFilterSpec extends Specification{
_ * type.typeId() >> Type.TypeID.INTEGER
1 * app.name() >> "app"


where:
filter | resultop | resultstring
'dateint<=8 and dateint>=5' | Expression.Operation.AND | "(ref(name=\"dateint\") <= 8 and ref(name=\"dateint\") >= 5)"
Expand All @@ -70,6 +69,31 @@ class IcebergPartitionFilterSpec extends Specification{
'app==\"abc\"' | Expression.Operation.EQ | "ref(name=\"app\") == \"abc\""
'app!=\"abc\"' | Expression.Operation.NOT_EQ | "ref(name=\"app\") != \"abc\""
'app>=\"abc\"' | Expression.Operation.GT_EQ | "ref(name=\"app\") >= \"abc\""
'dateCreated>=150000' | Expression.Operation.GT_EQ | "ref(name=\"dateCreated\") >= 150000"
'lastUpdated>=150000' | Expression.Operation.GT_EQ | "ref(name=\"lastUpdated\") >= 150000"
'dateint<5 and dateCreated>=150000' | Expression.Operation.AND | "(ref(name=\"dateint\") < 5 and ref(name=\"dateCreated\") >= 150000)"
}

def 'Test iceberg filter throw exceptions' () {
def dateint = Mock(Types.NestedField)
def app = Mock(Types.NestedField)
def type = Mock(Type)
when:
def fields = [dateint, app]
final IcebergFilterGenerator generator = new IcebergFilterGenerator(fields);
Expression expression = (Expression) new PartitionParser(new StringReader(filter)).filter()
.jjtAccept(generator, null);
then:
thrown(RuntimeException)
1 * dateint.name() >> "dateint"
_ * dateint.type() >> type
_ * type.typeId() >> Type.TypeID.INTEGER
1 * app.name() >> "app"

where:
filter | resultop | resultstring
'dateintcde<=8 and dateint>=5' | Expression.Operation.AND | "(ref(name=\"dateint\") <= 8 and ref(name=\"dateint\") >= 5)"
'dateCreated_etest>=150000' | Expression.Operation.GT_EQ | "ref(name=\"dateCreated\") >= 150000"
}

def 'Test get icebergPartitionMap default iceberg summary fetch size' () {
Expand Down

0 comments on commit 3b39dc9

Please sign in to comment.