Skip to content

Commit

Permalink
[tests] SumMinMaxITCase to use collect() rather than flakey temp files
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Apr 9, 2015
1 parent 9110269 commit 1cff478
Showing 1 changed file with 16 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,54 +20,37 @@ package org.apache.flink.api.scala.operators

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase}
import org.junit.{Test, After, Before, Rule}
import org.junit.rules.TemporaryFolder
import org.apache.flink.test.util.MultipleProgramsTestBase

import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.Assert._

import org.apache.flink.api.scala._


@RunWith(classOf[Parameterized])
class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expected: String = null
private val _tempFolder = new TemporaryFolder()

@Rule
def tempFolder = _tempFolder

@Before
def before(): Unit = {
resultPath = tempFolder.newFile().toURI.toString
}

@After
def after(): Unit = {
compareResultsByLinesInMemory(expected, resultPath)
}

@Test
def testFullAggregate(): Unit = {
// Full aggregate
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env)

val aggregateDs = ds
val aggregateDs : DataSet[(Int, Long)] = ds
.sum(0)
.andMax(1)
// Ensure aggregate operator correctly copies other fields
.filter(_._3 != null)
.map{ t => (t._1, t._2) }

aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)

env.execute()
val result: Seq[(Int, Long)] = aggregateDs.collect

expected = "231,6\n"
assertEquals(1, result.size)
assertEquals((231, 6L), result.head)
}

@Test
Expand All @@ -83,11 +66,10 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
.filter(_._3 != null)
.map { t => (t._2, t._1) }

aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)

env.execute()
val result : Seq[(Long, Int)] = aggregateDs.collect.sortWith((a, b) => a._1 < b._1)

expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
val expected : Seq[(Long, Int)] = Seq((1L, 1), (2L, 5), (3L, 15), (4L, 34), (5L, 65), (6L, 111))
assertEquals(expected, result)
}

@Test
Expand All @@ -96,18 +78,17 @@ class SumMinMaxITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = CollectionDataSets.get3TupleDataSet(env)

val aggregateDs = ds
val aggregateDs: DataSet[Int] = ds
.groupBy(1)
.min(0)
.min(0)
// Ensure aggregate operator correctly copies other fields
.filter(_._3 != null)
.map { t => new Tuple1(t._1) }

aggregateDs.writeAsCsv(resultPath, writeMode = WriteMode.OVERWRITE)
.map { t => t._1 }

env.execute()
val result: Seq[Int] = aggregateDs.collect

expected = "1\n"
assertEquals(1, result.size)
assertEquals(Seq(1), result)
}
}

0 comments on commit 1cff478

Please sign in to comment.