From ca4fa3da0f32dd6c07b84768fe8d089d9cb8c482 Mon Sep 17 00:00:00 2001
From: Aljoscha Krettek
Date: Tue, 16 Sep 2014 14:15:06 +0200
Subject: [PATCH] Rename ScalaTupleTypeInfo to CaseClassTypeInfo

This better reflects what it is actually for. It is still derived from
TupleTypeInfoBase, though.
---
 .../scala/org/apache/flink/api/scala/DataSet.scala   | 2 +-
 .../org/apache/flink/api/scala/GroupedDataSet.scala  | 2 +-
 .../org/apache/flink/api/scala/coGroupDataSet.scala  | 6 +++---
 .../flink/api/scala/codegen/TypeInformationGen.scala | 6 +++---
 .../org/apache/flink/api/scala/crossDataSet.scala    | 6 +++---
 .../org/apache/flink/api/scala/joinDataSet.scala     | 6 +++---
 .../scala/org/apache/flink/api/scala/package.scala   | 4 ++--
 ...pleComparator.scala => CaseClassComparator.scala} | 6 +++---
 ...pleSerializer.scala => CaseClassSerializer.scala} | 8 ++++----
 ...laTupleTypeInfo.scala => CaseClassTypeInfo.scala} | 12 ++++++------
 .../flink/api/scala/unfinishedKeyPairOperation.scala | 2 +-
 .../scala/runtime/GenericPairComparatorTest.scala    | 6 +++---
 .../runtime/tuple/base/TupleComparatorTestBase.scala | 2 +-
 13 files changed, 34 insertions(+), 34 deletions(-)
 rename flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/{ScalaTupleComparator.scala => CaseClassComparator.scala} (96%)
 rename flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/{ScalaTupleSerializer.scala => CaseClassSerializer.scala} (90%)
 rename flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/{ScalaTupleTypeInfo.scala => CaseClassTypeInfo.scala} (88%)

diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index 0d0519c291b81..8aabce492d2b3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
-import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.types.TypeInformation
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
index 3c5bf9e322323..88a8c7ccd5ccf 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala
 
 import org.apache.flink.api.common.InvalidProgramException
 import org.apache.flink.api.scala.operators.ScalaAggregateOperator
-import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 
 import scala.collection.JavaConverters._
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
index 582edac0ef23d..7613c548655ad 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/coGroupDataSet.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.TypeInformation
 import org.apache.flink.util.Collector
 
@@ -177,7 +177,7 @@ private[flink] class UnfinishedCoGroupOperationImpl[T: ClassTag, O: ClassTag](
     val rightArrayType =
       ObjectArrayTypeInfo.getInfoFor(new Array[O](0).getClass, rightSet.set.getType)
 
-    val returnType = new ScalaTupleTypeInfo[(Array[T], Array[O])](
+    val returnType = new CaseClassTypeInfo[(Array[T], Array[O])](
       classOf[(Array[T], Array[O])], Seq(leftArrayType, rightArrayType), Array("_1", "_2")) {
 
       override def createSerializer: TypeSerializer[(Array[T], Array[O])] = {
@@ -186,7 +186,7 @@ private[flink] class UnfinishedCoGroupOperationImpl[T: ClassTag, O: ClassTag](
           fieldSerializers(i) = types(i).createSerializer
         }
 
-        new ScalaTupleSerializer[(Array[T], Array[O])](
+        new CaseClassSerializer[(Array[T], Array[O])](
           classOf[(Array[T], Array[O])], fieldSerializers) {
           override def createInstance(fields: Array[AnyRef]) = {
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
index 235caa729ffc3..cfa5a21f5eeb2 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.codegen
 
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils._
-import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.{Value, TypeInformation}
 import org.apache.hadoop.io.Writable
 
@@ -70,14 +70,14 @@ private[flink] trait TypeInformationGen[C <: Context] {
     val fieldNames = desc.getters map { f => Literal(Constant(f.getter.name.toString)) } toList
     val fieldNamesExpr = c.Expr[Seq[String]](mkSeq(fieldNames))
     reify {
-      new ScalaTupleTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, fieldNamesExpr.splice) {
+      new CaseClassTypeInfo[T](tpeClazz.splice, fieldsExpr.splice, fieldNamesExpr.splice) {
         override def createSerializer: TypeSerializer[T] = {
           val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
           for (i <- 0 until getArity) {
             fieldSerializers(i) = types(i).createSerializer
           }
 
-          new ScalaTupleSerializer[T](tupleType, fieldSerializers) {
+          new CaseClassSerializer[T](tupleType, fieldSerializers) {
             override def createInstance(fields: Array[AnyRef]): T = {
               instance.splice
             }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
index df34587668833..97e48b10e466e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/crossDataSet.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.functions.{RichCrossFunction, CrossFunction}
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.TypeInformation
 import org.apache.flink.util.Collector
 
@@ -109,7 +109,7 @@ private[flink] object CrossDataSetImpl {
         (left, right)
       }
     }
-    val returnType = new ScalaTupleTypeInfo[(T, O)](
+    val returnType = new CaseClassTypeInfo[(T, O)](
      classOf[(T, O)], Seq(leftSet.getType, rightSet.getType), Array("_1", "_2")) {
 
       override def createSerializer: TypeSerializer[(T, O)] = {
@@ -118,7 +118,7 @@ private[flink] object CrossDataSetImpl {
           fieldSerializers(i) = types(i).createSerializer
         }
 
-        new ScalaTupleSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
+        new CaseClassSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
           override def createInstance(fields: Array[AnyRef]) = {
             (fields(0).asInstanceOf[T], fields(1).asInstanceOf[O])
           }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
index a607301d846fb..fa63c4fefcec7 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/joinDataSet.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin.WrappingFlat
 import org.apache.flink.api.java.operators.JoinOperator.{EquiJoin, JoinHint}
 import org.apache.flink.api.java.operators._
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassTypeInfo}
 import org.apache.flink.types.TypeInformation
 import org.apache.flink.util.Collector
 
@@ -179,7 +179,7 @@ private[flink] class UnfinishedJoinOperationImpl[T, O](
         out.collect((left, right))
       }
     }
-    val returnType = new ScalaTupleTypeInfo[(T, O)](
+    val returnType = new CaseClassTypeInfo[(T, O)](
       classOf[(T, O)], Seq(leftSet.set.getType, rightSet.set.getType), Array("_1", "_2")) {
 
       override def createSerializer: TypeSerializer[(T, O)] = {
@@ -188,7 +188,7 @@ private[flink] class UnfinishedJoinOperationImpl[T, O](
           fieldSerializers(i) = types(i).createSerializer
         }
 
-        new ScalaTupleSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
+        new CaseClassSerializer[(T, O)](classOf[(T, O)], fieldSerializers) {
           override def createInstance(fields: Array[AnyRef]) = {
             (fields(0).asInstanceOf[T], fields(1).asInstanceOf[O])
           }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index 73dd721c8a746..8bb69f91ffad1 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -21,7 +21,7 @@ package org.apache.flink.api
 import _root_.scala.reflect.ClassTag
 import language.experimental.macros
 import org.apache.flink.types.TypeInformation
-import org.apache.flink.api.scala.typeutils.{ScalaTupleTypeInfo, TypeUtils}
+import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
 
 package object scala {
@@ -36,7 +36,7 @@ package object scala {
       typeInfo: TypeInformation[_],
       fields: Array[String]): Array[Int] = {
     typeInfo match {
-      case ti: ScalaTupleTypeInfo[_] =>
+      case ti: CaseClassTypeInfo[_] =>
         ti.getFieldIndices(fields)
 
       case _ =>
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
similarity index 96%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
rename to flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
index 1c8f8df422c4f..ae7892f2878e5 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleComparator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassComparator.scala
@@ -24,10 +24,10 @@ import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldExcepti
 ;
 
 /**
- * Comparator for Scala Tuples. Access is different from
+ * Comparator for Case Classes. Access is different from
  * our Java Tuples so we have to treat them differently.
  */
-class ScalaTupleComparator[T <: Product](
+class CaseClassComparator[T <: Product](
     keys: Array[Int],
     scalaComparators: Array[TypeComparator[_]],
     scalaSerializers: Array[TypeSerializer[_]] )
@@ -39,7 +39,7 @@ class ScalaTupleComparator[T <: Product](
   def duplicate: TypeComparator[T] = {
     // ensure that the serializers are available
     instantiateDeserializationUtils()
-    val result = new ScalaTupleComparator[T](keyPositions, comparators, serializers)
+    val result = new CaseClassComparator[T](keyPositions, comparators, serializers)
     result.privateDuplicate(this)
     result
   }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
similarity index 90%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
rename to flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
index 90d3b5bf97f3e..f910b106f59c0 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleSerializer.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala
@@ -23,13 +23,13 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 ;
 
 /**
- * Serializer for Scala Tuples. Creation and access is different from
+ * Serializer for Case Classes. Creation and access is different from
  * our Java Tuples so we have to treat them differently.
  */
-abstract class ScalaTupleSerializer[T <: Product](
-    tupleClass: Class[T],
+abstract class CaseClassSerializer[T <: Product](
+    clazz: Class[T],
     scalaFieldSerializers: Array[TypeSerializer[_]])
-  extends TupleSerializerBase[T](tupleClass, scalaFieldSerializers) {
+  extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
 
   def createInstance: T = {
     val fields: Array[AnyRef] = new Array(arity)
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
similarity index 88%
rename from flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
rename to flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
index ad407cbe07252..a5e7793f12a49 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaTupleTypeInfo.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassTypeInfo.scala
@@ -22,14 +22,14 @@ import org.apache.flink.types.TypeInformation
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
 
 /**
- * TypeInformation for Scala Tuples. Creation and access is different from
+ * TypeInformation for Case Classes. Creation and access is different from
  * our Java Tuples so we have to treat them differently.
  */
-abstract class ScalaTupleTypeInfo[T <: Product](
-    tupleClass: Class[T],
+abstract class CaseClassTypeInfo[T <: Product](
+    clazz: Class[T],
     fieldTypes: Seq[TypeInformation[_]],
     val fieldNames: Seq[String])
-  extends TupleTypeInfoBase[T](tupleClass, fieldTypes: _*) {
+  extends TupleTypeInfoBase[T](clazz, fieldTypes: _*) {
 
   def createComparator(logicalKeyFields: Array[Int], orders: Array[Boolean]): TypeComparator[T] = {
     // sanity checks
@@ -72,14 +72,14 @@ abstract class ScalaTupleTypeInfo[T <: Product](
       fieldSerializers(i) = types(i).createSerializer
     }
 
-    new ScalaTupleComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
+    new CaseClassComparator[T](logicalKeyFields, fieldComparators, fieldSerializers)
   }
 
   def getFieldIndices(fields: Array[String]): Array[Int] = {
     val result = fields map { x => fieldNames.indexOf(x) }
     if (result.contains(-1)) {
       throw new IllegalArgumentException("Fields '" + fields.mkString(", ") +
-        "' are not valid for " + tupleClass + " with fields '" + fieldNames.mkString(", ") + "'.")
+        "' are not valid for " + clazz + " with fields '" + fieldNames.mkString(", ") + "'.")
     }
     result
   }
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
index 85414816573cd..3bf3db27c8963 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/unfinishedKeyPairOperation.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.{DataSet => JavaDataSet}
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.operators.Keys
 import org.apache.flink.api.java.operators.Keys.FieldPositionKeys
-import org.apache.flink.api.scala.typeutils.ScalaTupleTypeInfo
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.types.TypeInformation
 
 /**
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
index 95a1b7711586f..c130b7e758ea8 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/GenericPairComparatorTest.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeutils.base.{DoubleComparator, DoubleSeria
 import org.apache.flink.api.java.typeutils.runtime.{GenericPairComparator, TupleComparator}
 
 import org.apache.flink.api.scala.runtime.tuple.base.PairComparatorTestBase
-import org.apache.flink.api.scala.typeutils.ScalaTupleComparator
+import org.apache.flink.api.scala.typeutils.CaseClassComparator
 
 class GenericPairComparatorTest
   extends PairComparatorTestBase[(Int, String, Double), (Int, Float, Long, Double)] {
@@ -42,8 +42,8 @@ class GenericPairComparatorTest
 
     val sers2 = Array[TypeSerializer[_]](IntSerializer.INSTANCE, DoubleSerializer.INSTANCE)
 
-    val comp1 = new ScalaTupleComparator[(Int, String, Double)](fields1, comps1, sers1)
-    val comp2 = new ScalaTupleComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)
+    val comp1 = new CaseClassComparator[(Int, String, Double)](fields1, comps1, sers1)
+    val comp2 = new CaseClassComparator[(Int, Float, Long, Double)](fields2, comps2, sers2)
 
     new GenericPairComparator[(Int, String, Double), (Int, Float, Long, Double)](comp1, comp2)
   }
diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
index 5370e612d953f..a14238486a17b 100644
--- a/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
+++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/TupleComparatorTestBase.scala
@@ -18,7 +18,7 @@ package org.apache.flink.api.scala.runtime.tuple.base
 
 import org.apache.flink.api.common.typeutils.ComparatorTestBase
-import org.apache.flink.api.scala.typeutils.{ScalaTupleSerializer, ScalaTupleComparator}
+import org.apache.flink.api.scala.typeutils.{CaseClassSerializer, CaseClassComparator}
 import org.junit.Assert._
 
 abstract class TupleComparatorTestBase[T <: Product] extends ComparatorTestBase[T] {