Skip to content

Commit

Permalink
[FLINK-8242] [orc] Fix predicate translation if literal is not Serial…
Browse files Browse the repository at this point in the history
…izable.

This closes apache#5345.
  • Loading branch information
fhueske committed Feb 5, 2018
1 parent de3d85b commit 63a19e8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -80,6 +82,8 @@
public class OrcTableSource
implements BatchTableSource<Row>, ProjectableTableSource<Row>, FilterableTableSource<Row> {

private static final Logger LOG = LoggerFactory.getLogger(OrcTableSource.class);

private static final int DEFAULT_BATCH_SIZE = 1000;

// path to read ORC files from
Expand Down Expand Up @@ -192,7 +196,10 @@ public TableSource<Row> applyPredicate(List<Expression> predicates) {
for (Expression pred : predicates) {
Predicate orcPred = toOrcPredicate(pred);
if (orcPred != null) {
LOG.info("Predicate [{}] converted into OrcPredicate [{}] and pushed into OrcTableSource for path {}.", pred, orcPred, path);
orcPredicates.add(orcPred);
} else {
LOG.info("Predicate [{}] could not be pushed into OrcTableSource for path {}.", pred, path);
}
}

Expand Down Expand Up @@ -241,17 +248,32 @@ private Predicate toOrcPredicate(Expression pred) {

if (!isValid(binComp)) {
// not a valid predicate
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}
PredicateLeaf.Type litType = getLiteralType(binComp);
if (litType == null) {
// unsupported literal type
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}

boolean literalOnRight = literalOnRight(binComp);
String colName = getColumnName(binComp);
Serializable literal = (Serializable) getLiteral(binComp);

// fetch literal and ensure it is serializable
Object literalObj = getLiteral(binComp);
Serializable literal;
// validate that literal is serializable
if (literalObj instanceof Serializable) {
literal = (Serializable) literalObj;
} else {
LOG.warn("Encountered a non-serializable literal of type {}. " +
"Cannot push predicate [{}] into OrcTableSource. " +
"This is a bug and should be reported.",
literalObj.getClass().getCanonicalName(), pred);
return null;
}

if (pred instanceof EqualTo) {
return new OrcRowInputFormat.Equals(colName, litType, literal);
Expand Down Expand Up @@ -288,18 +310,21 @@ private Predicate toOrcPredicate(Expression pred) {
}
} else {
// unsupported predicate
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}
} else if (pred instanceof UnaryExpression) {

UnaryExpression unary = (UnaryExpression) pred;
if (!isValid(unary)) {
// not a valid predicate
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}
PredicateLeaf.Type colType = toOrcType(((UnaryExpression) pred).child().resultType());
if (colType == null) {
// unsupported type
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}

Expand All @@ -312,10 +337,12 @@ private Predicate toOrcPredicate(Expression pred) {
new OrcRowInputFormat.IsNull(colName, colType));
} else {
// unsupported predicate
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}
} else {
// unsupported predicate
LOG.debug("Unsupported predicate [{}] cannot be pushed into OrcTableSource.", pred);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,15 @@ public void testApplyPredicate() throws Exception {
.forOrcSchema(TEST_SCHEMA_NESTED)
.build();

// expressions for predicates
// expressions for supported predicates
Expression pred1 = new GreaterThan(
new ResolvedFieldReference("int1", Types.INT),
new Literal(100, Types.INT));
Expression pred2 = new EqualTo(
new ResolvedFieldReference("string1", Types.STRING),
new Literal("hello", Types.STRING));
Expression pred3 = new EqualTo(
// unsupported predicate
Expression unsupportedPred = new EqualTo(
new GetCompositeField(
new ItemAt(
new ResolvedFieldReference(
Expand All @@ -195,10 +196,18 @@ public void testApplyPredicate() throws Exception {
"int1"),
new Literal(1, Types.INT)
);
// invalid predicate
Expression invalidPred = new EqualTo(
new ResolvedFieldReference("long1", Types.LONG),
// some invalid, non-serializable literal (here an object of this test class)
new Literal(new OrcTableSourceTest(), Types.LONG)
);

ArrayList<Expression> preds = new ArrayList<>();
preds.add(pred1);
preds.add(pred2);
preds.add(pred3);
preds.add(unsupportedPred);
preds.add(invalidPred);

// apply predicates on TableSource
OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds);
Expand All @@ -214,7 +223,7 @@ public void testApplyPredicate() throws Exception {
Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
projected.getReturnType());

// ensure IF is configured with supported predicates
// ensure IF is configured with valid/supported predicates
OrcTableSource spyTS = spy(projected);
OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
doReturn(mockIF).when(spyTS).buildOrcInputFormat();
Expand Down

0 comments on commit 63a19e8

Please sign in to comment.