
Commit fd91220

[FLINK-8255] Throw meaningful exception when using string keys on row types
snuyanzin authored and aljoscha committed Jan 8, 2020
1 parent 9f99d59 commit fd91220
Showing 6 changed files with 109 additions and 7 deletions.
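RowTypeInfo reports isTupleType() == true but is not a TupleTypeInfo, so minBy/maxBy on a Row-typed DataSet (or grouping) used to pass the precondition check and then fail with a ClassCastException when the type was cast to TupleTypeInfo. The changed checks below fail fast with an InvalidProgramException instead. A minimal sketch of the failure mode, mirroring the new tests in this commit (class and variable names are illustrative only):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.Collections;

public class RowMaxBySketch {
	public static void main(String[] args) {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		RowTypeInfo rowTypeInfo = new RowTypeInfo(
				new TypeInformation<?>[]{Types.INT, Types.INT},
				new String[]{"id", "value"});
		DataSet<Row> rows = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo);
		// Before this commit: ClassCastException while the maxBy operator is constructed.
		// After this commit: InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.")
		rows.maxBy(0);
	}
}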
DataSet.java
@@ -521,7 +521,7 @@ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> co
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public ReduceOperator<T> minBy(int... fields) {
- if (!getType().isTupleType()) {
+ if (!getType().isTupleType() || !(getType() instanceof TupleTypeInfo)) {
throw new InvalidProgramException("DataSet#minBy(int...) only works on Tuple types.");
}

@@ -557,7 +557,7 @@ public ReduceOperator<T> minBy(int... fields) {
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public ReduceOperator<T> maxBy(int... fields) {
- if (!getType().isTupleType()) {
+ if (!getType().isTupleType() || !(getType() instanceof TupleTypeInfo)) {
throw new InvalidProgramException("DataSet#maxBy(int...) only works on Tuple types.");
}

UnsortedGrouping.java
@@ -221,7 +221,7 @@ public GroupReduceOperator<T, T> first(int n) {
public ReduceOperator<T> minBy(int... fields) {

// Check for using a tuple
- if (!this.inputDataSet.getType().isTupleType()) {
+ if (!this.inputDataSet.getType().isTupleType() || !(this.inputDataSet.getType() instanceof TupleTypeInfo)) {
throw new InvalidProgramException("Method minBy(int) only works on tuples.");
}

@@ -243,7 +243,7 @@ public ReduceOperator<T> maxBy(int... fields) {
public ReduceOperator<T> maxBy(int... fields) {

// Check for using a tuple
- if (!this.inputDataSet.getType().isTupleType()) {
+ if (!this.inputDataSet.getType().isTupleType() || !(this.inputDataSet.getType() instanceof TupleTypeInfo)) {
throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
}

MaxByOperatorTest.java
@@ -20,17 +20,22 @@

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

import org.junit.Assert;
import org.junit.Test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
@@ -204,6 +209,43 @@ public void testOutOfTupleBoundsGrouping3() {
groupDs.maxBy(1, 2, 3, 4, -1);
}

/**
* Validates that maxBy on a Row-typed DataSet does not fail with a
* ClassCastException (FLINK-8255) but throws an InvalidProgramException instead.
*/
@Test(expected = InvalidProgramException.class)
public void testMaxByRowTypeInfoKeyFieldsDataset() {

final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};

String[] fieldNames = new String[]{"id", "value"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
DataSet tupleDs = env
.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo);

tupleDs.maxBy(0);
}

/**
* Validates that maxBy on a grouped Row-typed DataSet does not fail with a
* ClassCastException (FLINK-8255) but throws an InvalidProgramException instead.
*/
@Test(expected = InvalidProgramException.class)
public void testMaxByRowTypeInfoKeyFieldsForUnsortedGrouping() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT};

String[] fieldNames = new String[]{"id", "value"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);

UnsortedGrouping groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0);

groupDs.maxBy(1);
}

/**
* Custom data type, for testing purposes.
*/
@@ -229,5 +271,4 @@ public String toString() {
return myInt + "," + myLong + "," + myString;
}
}

}
MinByOperatorTest.java
@@ -20,17 +20,22 @@

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.Row;

import org.junit.Assert;
import org.junit.Test;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
@@ -204,6 +209,43 @@ public void testOutOfTupleBoundsGrouping3() {
groupDs.minBy(1, 2, 3, 4, -1);
}

/**
* Validates that minBy on a Row-typed DataSet does not fail with a
* ClassCastException (FLINK-8255) but throws an InvalidProgramException instead.
*/
@Test(expected = InvalidProgramException.class)
public void testMinByRowTypeInfoKeyFieldsDataset() {

final ExecutionEnvironment env = ExecutionEnvironment
.getExecutionEnvironment();
TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};

String[] fieldNames = new String[]{"id", "value"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
DataSet tupleDs = env
.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo);

tupleDs.minBy(0);
}

/**
* Validates that minBy on a grouped Row-typed DataSet does not fail with a
* ClassCastException (FLINK-8255) but throws an InvalidProgramException instead.
*/
@Test(expected = InvalidProgramException.class)
public void testMinByRowTypeInfoKeyFieldsForUnsortedGrouping() {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT};

String[] fieldNames = new String[]{"id", "value"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);

UnsortedGrouping groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0);

groupDs.minBy(1);
}

/**
* Custom data type, for testing purposes.
*/
@@ -229,5 +271,4 @@ public String toString() {
return myInt + "," + myLong + "," + myString;
}
}

}
FieldAccessorFactory.java
@@ -164,7 +164,7 @@ public static <T, F> FieldAccessor<T, F> getAccessor(TypeInformation<T> typeInfo
}

// In case of tuples
- } else if (typeInfo.isTupleType()) {
+ } else if (typeInfo.isTupleType() && typeInfo instanceof TupleTypeInfo) {
TupleTypeInfo tupleTypeInfo = (TupleTypeInfo) typeInfo;
FieldExpression decomp = decomposeFieldExpression(field);
int fieldPos = tupleTypeInfo.getFieldIndex(decomp.head);
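The added instanceof guard matters because RowTypeInfo extends TupleTypeInfoBase and therefore reports isTupleType() == true, yet it is not a TupleTypeInfo, so the unchecked cast directly below the changed condition used to throw a ClassCastException for Row types. A minimal sketch of that distinction, not code from this commit (class and variable names are illustrative only):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class RowVsTupleTypeInfoSketch {
	public static void main(String[] args) {
		TypeInformation<?> rowType = new RowTypeInfo(Types.INT, Types.STRING);
		System.out.println(rowType.isTupleType());            // true
		System.out.println(rowType instanceof TupleTypeInfo); // false
		// Casting rowType to TupleTypeInfo throws ClassCastException, which is why
		// the instanceof check is added before the cast in FieldAccessorFactory.
	}
}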
FieldAccessorTest.java
@@ -26,6 +26,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

@@ -386,4 +387,23 @@ public void testIllegalBasicType2() {

FieldAccessor<Long, Long> f = FieldAccessorFactory.getAccessor(tpeInfo, "foo", null);
}

/**
* Validates that requesting a field accessor for a nested Row field does not fail with a
* ClassCastException (FLINK-8255) but throws an InvalidFieldReferenceException instead.
*/
@Test(expected = CompositeType.InvalidFieldReferenceException.class)
public void testRowTypeInfo() {
TypeInformation<?>[] typeList = new TypeInformation<?>[]{
new RowTypeInfo(
BasicTypeInfo.SHORT_TYPE_INFO,
BasicTypeInfo.BIG_DEC_TYPE_INFO)
};

String[] fieldNames = new String[]{"row"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeList, fieldNames);

FieldAccessor f = FieldAccessorFactory.getAccessor(rowTypeInfo, "row.0", null);
}

}
