Skip to content

Commit

Permalink
[scala] Add missing operator IT Cases
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Oct 9, 2014
1 parent f562d49 commit d749c24
Show file tree
Hide file tree
Showing 19 changed files with 2,859 additions and 348 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,7 +115,7 @@ else if (other instanceof ExpressionKeys) {

if(keyType.isTupleType()) {
// special case again:
TupleTypeInfo<?> tupleKeyType = (TupleTypeInfo<?>) keyType;
TupleTypeInfoBase<?> tupleKeyType = (TupleTypeInfoBase<?>) keyType;
List<FlatFieldDescriptor> keyTypeFields = new ArrayList<FlatFieldDescriptor>(tupleKeyType.getTotalFields());
tupleKeyType.getKey(ExpressionKeys.SELECT_ALL_CHAR, 0, keyTypeFields);
if(expressionKeys.keyFields.size() != keyTypeFields.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys
import org.apache.flink.api.java.operators._
import org.apache.flink.api.java.{DataSet => JavaDataSet}
import org.apache.flink.api.scala.operators.{ScalaCsvOutputFormat, ScalaAggregateOperator}
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.api.common.typeinfo.TypeInformation
Expand Down Expand Up @@ -221,6 +222,16 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
this
}

def withParameters(parameters: Configuration): DataSet[T] = {
javaSet match {
case udfOp: UdfOperator[_] => udfOp.withParameters(parameters)
case _ =>
throw new UnsupportedOperationException("Operator " + javaSet.toString + " cannot have " +
"parameters")
}
this
}

// --------------------------------------------------------------------------------------------
// Filter & Transformations
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.flink.api.scala.codegen

import java.lang.reflect.{Field, Modifier}

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
Expand All @@ -29,6 +31,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.hadoop.io.Writable

import scala.collection.JavaConverters._
import scala.collection.mutable

import scala.reflect.macros.Context

Expand Down Expand Up @@ -154,17 +157,40 @@ private[flink] trait TypeInformationGen[C <: Context] {
val fields = fieldsList.splice
val clazz: Class[T] = tpeClazz.splice

val fieldMap = TypeExtractor.getAllDeclaredFields(clazz).asScala map {
f => (f.getName, f)
} toMap

val pojoFields = fields map {
case (fName, fTpe) =>
new PojoField(fieldMap(fName), fTpe)
var traversalClazz: Class[_] = clazz
val clazzFields = mutable.Map[String, Field]()

var error = false
while (traversalClazz != null) {
for (field <- traversalClazz.getDeclaredFields) {
if (clazzFields.contains(field.getName)) {
println(s"The field $field is already contained in the " +
s"hierarchy of the class ${clazz}. Please use unique field names throughout " +
"your class hierarchy")
error = true
}
clazzFields += (field.getName -> field)
}
traversalClazz = traversalClazz.getSuperclass
}

new PojoTypeInfo(clazz, pojoFields.asJava)
if (error) {
new GenericTypeInfo(clazz)
} else {
val pojoFields = fields flatMap {
case (fName, fTpe) =>
val field = clazzFields(fName)
if (Modifier.isTransient(field.getModifiers) || Modifier.isStatic(field.getModifiers)) {
// ignore transient and static fields
// the TypeAnalyzer for some reason does not always detect transient fields
None
} else {
Some(new PojoField(clazzFields(fName), fTpe))
}
}

new PojoTypeInfo(clazz, pojoFields.asJava)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private[flink] class HalfUnfinishedKeyPairOperation[L, R, O](
* Specify the key selector function for the right side of the key based operation. This returns
* the finished operation.
*/
def equalTo[K: TypeInformation](fun: (R) => K) = {
def equalTo[K: TypeInformation](fun: (R) => K): O = {
val keyType = implicitly[TypeInformation[K]]
val keyExtractor = new KeySelector[R, K] {
def getKey(in: R) = fun(in)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class AggregateITCase(config: Configuration) extends JavaProgramTestBase(config)
}

protected def testProgram(): Unit = {
expectedResult = DistinctProgs.runProgram(curProgId, resultPath)
expectedResult = AggregateProgs.runProgram(curProgId, resultPath)
}

protected override def postSubmit(): Unit = {
Expand All @@ -129,7 +129,7 @@ object AggregateITCase {
@Parameters
def getConfigurations: java.util.Collection[Array[AnyRef]] = {
val configs = mutable.MutableList[Array[AnyRef]]()
for (i <- 1 to DistinctProgs.NUM_PROGRAMS) {
for (i <- 1 to AggregateProgs.NUM_PROGRAMS) {
val config = new Configuration()
config.setInteger("ProgramId", i)
configs += Array(config)
Expand Down
Loading

0 comments on commit d749c24

Please sign in to comment.