Skip to content

Commit

Permalink
[FLINK-3226] Improvements for expected types
Browse files Browse the repository at this point in the history
This closes apache#1709
  • Loading branch information
twalthr authored and vasia committed Mar 18, 2016
1 parent 3e3f076 commit b5d6a56
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import scala.collection.JavaConversions._

object TypeConverter {

val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]]

def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
case BOOLEAN_TYPE_INFO => BOOLEAN
case BOOLEAN_VALUE_TYPE_INFO => BOOLEAN
Expand Down Expand Up @@ -83,6 +85,24 @@ object TypeConverter {
??? // TODO more types
}

/**
* Determines the return type of Flink operators based on the logical fields, the expected
* physical type and configuration parameters.
*
* For example:
* - No physical type expected, only 3 non-null fields, and efficient type usage enabled
* -> return Tuple3
* - No physical type expected, efficient type usage enabled, but 3 nullable fields
* -> return Row because Tuple does not support null values
* - Physical type expected
* -> check if physical type is compatible and return it
*
* @param logicalRowType logical row information
* @param expectedPhysicalType expected physical type
* @param nullable fields can be nullable
* @param useEfficientTypes use the most efficient types (e.g. Tuples and value types)
* @return suitable return type
*/
def determineReturnType(
logicalRowType: RelDataType,
expectedPhysicalType: Option[TypeInformation[Any]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class DataSetFlatMap(

override def translateToPlan(config: TableConfig,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config)
val returnType = determineReturnType(
getRowType,
expectedType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.table.{TableConfig, Row}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.table.plan.{PlanGenException, TypeConverter}
import org.apache.flink.api.table.typeinfo.RowTypeInfo
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.{Row, TableConfig}

import scala.collection.JavaConverters._
import org.apache.flink.api.table.plan.TypeConverter

/**
* Flink RelNode which matches along with ReduceGroupOperator.
Expand Down Expand Up @@ -67,7 +66,16 @@ class DataSetGroupReduce(
config: TableConfig,
expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {

val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType)
expectedType match {
case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
throw new PlanGenException("GroupReduce operations currently only support returning Rows.")
case _ => // ok
}

val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
config,
// tell the input operator that this operator currently only supports Rows as input
Some(TypeConverter.DEFAULT_ROW_TYPE))

// get the output types
val fieldsNames = rowType.getFieldNames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ import org.apache.flink.api.table.TableConfig
trait DataSetRel extends RelNode {

/**
* Translate the FlinkRelNode into Flink operator.
* Translates the FlinkRelNode into a Flink operator.
*
* @param config runtime configuration
* @param expectedType specifies the type the Flink operator should return. The type must
* have the same arity as the result. For instance, if the
* expected type is a RowTypeInfo this method will return a DataSet of
* type Row. If the expected type is Tuple2, the operator will return
* a Tuple2 if possible, or a Row otherwise.
* @return DataSet of type expectedType or RowTypeInfo
*/
def translateToPlan(
config: TableConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.api.table.Row
import org.apache.flink.test.util.MultipleProgramsTestBase
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import scala.collection.JavaConverters._
import org.apache.flink.api.table.test.TableProgramsTestBase
import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode

@RunWith(classOf[Parameterized])
class CalcITCase(
Expand Down Expand Up @@ -70,7 +69,7 @@ class CalcITCase(
}

@Test
def TestCalcWithAggregation(): Unit = {
def testCalcWithAggregation(): Unit = {

val env = ExecutionEnvironment.getExecutionEnvironment
val t = CollectionDataSets.get3TupleDataSet(env).toTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.flink.api.table.Row
import org.apache.flink.api.table.expressions.Literal
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import TableProgramsTestBase.TableConfigMode
import org.apache.flink.api.table.test.utils.TableProgramsTestBase
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
Expand Down

0 comments on commit b5d6a56

Please sign in to comment.