Skip to content

Commit

Permalink
[hotfix] [scala api] Move tests to correct package
Browse files Browse the repository at this point in the history
We previously had all Scala API unit tests in the flink-tests
project, because Eclipse could not use macros in 'test' that were
declared in 'main'.

Because we do not support Eclipse for development of the system any
more (only for using Flink to develop Flink-based applications),
we can now move the tests to their natural location and simplify
some of the dependency structures.
  • Loading branch information
StephanEwen committed Oct 14, 2017
1 parent f81af45 commit 6a93597
Show file tree
Hide file tree
Showing 29 changed files with 148 additions and 311 deletions.
22 changes: 20 additions & 2 deletions flink-scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>${chill.version}</version>
<scope>test</scope>
</dependency>

Expand All @@ -89,6 +95,18 @@ under the License.
<type>test-jar</type>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-convert</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.operator
package org.apache.flink.api.scala

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.scala._

import org.junit.Test
import org.junit.Assert

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.operator
package org.apache.flink.api.scala

import org.apache.flink.api.common.InvalidProgramException
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

import org.junit.Test
import org.junit.Assert

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.operator
package org.apache.flink.api.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.scala.{SelectByMaxFunction, SelectByMinFunction}
import org.apache.flink.api.scala._

import org.junit.{Assert, Test}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.io.{File, FileOutputStream, FileWriter, OutputStreamWriter}

import org.apache.flink.api.java.io.PojoCsvInputFormat
import org.apache.flink.api.java.io.TupleCsvInputFormat
import org.apache.flink.api.java.io.CsvInputFormatTest.TwitterPOJO
import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
Expand Down Expand Up @@ -535,3 +534,32 @@ class CsvInputFormatTest {
}

}

/**
 * Simple mutable POJO used as test data for the input-format tests.
 *
 * Fields are mutable `var`s with a no-arg constructor so the type is
 * recognized as a POJO by Flink's type extraction.
 *
 * @param table value of the first field (presumably a table name — used only as opaque test data)
 * @param time  value of the second field (presumably a time string — used only as opaque test data)
 */
class POJO(var table: String, var time: String) {

  /** No-argument constructor required for POJO (de)serialization. */
  def this() {
    this("", "")
  }

  override def equals(obj: Any): Boolean = {
    obj match {
      case that: POJO => table == that.table && time == that.time
      case _ => false
    }
  }

  // The original overrode equals() without hashCode(), violating the
  // Object contract (equal objects must yield equal hash codes) and
  // breaking use in hash-based collections.
  // NOTE: fields are mutable, so instances should not be mutated while
  // stored in a hash-based collection.
  override def hashCode(): Int = 31 * table.hashCode + time.hashCode
}

/**
 * Subclass of [[POJO]] carrying one extra field, used to test POJO
 * hierarchies in the input-format tests.
 *
 * NOTE(review): equality here is asymmetric with the superclass —
 * `new POJO(t, m) == new TwitterPOJO(t, m, x)` is true (superclass match on
 * `POJO`) while the reverse is false. Kept as-is since tests may rely on it;
 * confirm before tightening with a canEqual-style check.
 *
 * @param theTable forwarded to the superclass `table` field
 * @param theTime  forwarded to the superclass `time` field
 * @param tweet    additional payload field (opaque test data)
 */
class TwitterPOJO(theTable: String, theTime: String, var tweet: String)
  extends POJO(theTable, theTime) {

  /** No-argument constructor required for POJO (de)serialization. */
  def this() {
    this("", "", "")
  }

  override def equals(obj: Any): Boolean = {
    obj match {
      case that: TwitterPOJO => table == that.table && time == that.time && tweet == that.tweet
      case _ => false
    }
  }

  // The original overrode equals() without hashCode(), violating the Object
  // contract. Computed from all compared fields directly (not via
  // super.hashCode) so this class is correct independent of the superclass.
  override def hashCode(): Int =
    31 * (31 * table.hashCode + time.hashCode) + tweet.hashCode
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@

package org.apache.flink.api.scala.metrics

import org.apache.flink.metrics.Gauge
import org.apache.flink.runtime.metrics.{MetricRegistry, MetricRegistryConfiguration}
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup
import org.apache.flink.util.TestLogger

import org.junit.Test
import org.scalatest.junit.JUnitSuiteLike

class ScalaGaugeTest extends TestLogger with JUnitSuiteLike {
class ScalaGaugeTest extends TestLogger {

@Test
def testGaugeCorrectValue(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
*/
package org.apache.flink.api.scala.runtime

import java.io._

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.SerializerTestInstance
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}

import org.joda.time.LocalDate

import org.junit.Test

import scala.collection.mutable
import scala.io.Source
import scala.reflect._

class KryoGenericTypeSerializerTest {
Expand Down Expand Up @@ -148,95 +146,6 @@ class KryoGenericTypeSerializerTest {
runTests(list)
}

/**
 * Tests that the registered classes in Kryo did not change.
 *
 * Once we have proper serializer versioning this test will become obsolete.
 * But currently a change in the serializers can break savepoint backwards
 * compatibility between Flink versions.
 */
@Test
def testDefaultKryoRegisteredClassesDidNotChange(): Unit = {
  // Previous registration (id => registered class (Class#getName))
  val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]()

  val stream = Thread.currentThread().getContextClassLoader()
    .getResourceAsStream("flink_11-kryo_registrations")

  // Fail with a clear message if the fixture file is missing from the
  // classpath instead of an opaque NPE inside Source.fromInputStream.
  if (stream == null) {
    throw new IllegalStateException(
      "Test resource 'flink_11-kryo_registrations' not found on the classpath.")
  }

  // Close the source (and its underlying stream) when done — the original
  // version leaked it.
  val source = Source.fromInputStream(stream)
  try {
    source.getLines().foreach {
      line =>
        val Array(id, registeredClass) = line.split(",")
        previousRegistrations.put(id.toInt, registeredClass)
    }
  } finally {
    source.close()
  }

  // Get Kryo and verify that the registered IDs and types in
  // Kryo have not changed compared to the provided registrations
  // file.
  val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
  val nextId = kryo.getNextRegistrationId
  for (i <- 0 until nextId) {
    val registration = kryo.getRegistration(i)

    previousRegistrations.get(registration.getId) match {
      case None => throw new IllegalStateException(s"Expected no entry with ID " +
        s"${registration.getId}, but got one for type ${registration.getType.getName}. This " +
        s"can lead to registered user types being deserialized with the wrong serializer when " +
        s"restoring a savepoint.")
      case Some(registeredClass) =>
        if (registeredClass != registration.getType.getName) {
          throw new IllegalStateException(s"Expected type ${registration.getType.getName} with " +
            s"ID ${registration.getId}, but got $registeredClass.")
        }
    }
  }

  // Verify number of registrations (required to check if current number of
  // registrations is less than before).
  if (previousRegistrations.size != nextId) {
    throw new IllegalStateException(s"Number of registered classes changed (previously " +
      s"${previousRegistrations.size}, but now $nextId). This can lead to registered user " +
      s"types being deserialized with the wrong serializer when restoring a savepoint.")
  }
}

/**
 * Creates a Kryo serializer and writes the default registrations out to a
 * comma separated file with one entry per line:
 *
 *   id,class
 *
 * The produced file is used to check that the registered IDs don't change
 * in future Flink versions.
 *
 * This method is not used in the tests, but documents how the test file
 * has been created and can be used to re-create it if needed.
 *
 * @param filePath File path to write registrations to
 */
private def writeDefaultKryoRegistrations(filePath: String) = {
  // Start from a clean file: remove any previous version first.
  val target = new File(filePath)
  if (target.exists()) {
    target.delete()
  }

  val out = new BufferedWriter(new FileWriter(target))
  try {
    val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo

    // One "id,class" line per default registration, in registration order.
    val registrationCount = kryo.getNextRegistrationId
    var id = 0
    while (id < registrationCount) {
      val registration = kryo.getRegistration(id)
      val line = registration.getId + "," + registration.getType.getName
      out.write(line, 0, line.length)
      out.newLine()
      id += 1
    }

    println(s"Created file with registrations at $target.")
  } finally {
    out.close()
  }
}


case class ComplexType(id: String, number: Int, values: List[Int]){
override def equals(obj: Any): Boolean ={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,15 @@
*/
package org.apache.flink.api.scala.types

import java.io.{DataInput, DataOutput}

import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.hadoop.io.Writable
import org.junit.{Assert, Test}

import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.io.CollectionInputFormat
import org.apache.flink.api.java.typeutils.TypeExtractorTest.CustomTuple
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.{UnitTypeInfo, CaseClassTypeInfo}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, UnitTypeInfo}
import org.apache.flink.types.{IntValue, StringValue}

/**
 * Minimal Hadoop `Writable` implementation used only to test that the type
 * extraction produces a `WritableTypeInfo` for Writable types. Both methods
 * are intentionally no-ops: only the type itself matters for the test.
 */
class MyWritable extends Writable {

  // Original used deprecated procedure syntax `def f(...) { }`;
  // explicit `: Unit =` is the supported form.
  def write(out: DataOutput): Unit = {}

  def readFields(in: DataInput): Unit = {}
}
import org.junit.{Assert, Test}

case class CustomCaseClass(a: String, b: Int)

Expand Down Expand Up @@ -182,14 +171,6 @@ class TypeInformationGenTest {
.getComponentInfo.isInstanceOf[PojoTypeInfo[_]])
}

@Test
def testWritableType(): Unit = {
  // Type extraction for a Hadoop Writable must yield a WritableTypeInfo
  // wrapping exactly the Writable class.
  val typeInfo = createTypeInformation[MyWritable]

  Assert.assertTrue(typeInfo.isInstanceOf[WritableTypeInfo[_]])
  Assert.assertEquals(
    classOf[MyWritable],
    typeInfo.asInstanceOf[WritableTypeInfo[_]].getTypeClass)
}

@Test
def testTupleWithBasicTypes(): Unit = {
val ti = createTypeInformation[(Int, Long, Double, Float, Boolean, String, Char, Short, Byte)]
Expand Down
86 changes: 0 additions & 86 deletions flink-tests/src/test/resources/flink_11-kryo_registrations

This file was deleted.

Loading

0 comments on commit 6a93597

Please sign in to comment.