Skip to content

Commit

Permalink
[FLINK-1395] Add tests for custom serializers with JodaTime.
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha authored and StephanEwen committed Jan 19, 2015
1 parent 29c54a2 commit 020b282
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.junit.Test;

/**
* Abstract test base for serializers.
*
* We have a toString() call on all deserialized
* values because this is further evidence that the deserialized value is actually correct.
* (JodaTime DataTime) with the default KryoSerializer used to pass this test but the
* internal state would be corrupt, which becomes evident when toString is called.
*/
public abstract class SerializerTestBase<T> {

Expand Down Expand Up @@ -99,6 +103,7 @@ public void testCopy() {

for (T datum : testData) {
T copy = serializer.copy(datum);
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
Expand All @@ -117,6 +122,7 @@ public void testCopyIntoNewElements() {

for (T datum : testData) {
T copy = serializer.copy(datum, serializer.createInstance());
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
Expand All @@ -137,6 +143,7 @@ public void testCopyIntoReusedElements() {

for (T datum : testData) {
T copy = serializer.copy(datum, target);
copy.toString();
deepEquals("Copied element is not equal to the original element.", datum, copy);
target = copy;
}
Expand All @@ -162,6 +169,8 @@ public void testSerializeIndividually() {
assertTrue("No data available during deserialization.", in.available() > 0);

T deserialized = serializer.deserialize(serializer.createInstance(), in);
deserialized.toString();

deepEquals("Deserialized value if wrong.", value, deserialized);

assertTrue("Trailing data available after deserialization.", in.available() == 0);
Expand Down Expand Up @@ -190,6 +199,8 @@ public void testSerializeIndividuallyReusingValues() {
assertTrue("No data available during deserialization.", in.available() > 0);

T deserialized = serializer.deserialize(reuseValue, in);
deserialized.toString();

deepEquals("Deserialized value if wrong.", value, deserialized);

assertTrue("Trailing data available after deserialization.", in.available() == 0);
Expand Down Expand Up @@ -220,6 +231,8 @@ public void testSerializeAsSequenceNoReuse() {
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(in);
deserialized.toString();

deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
Expand Down Expand Up @@ -250,6 +263,8 @@ public void testSerializeAsSequenceReusingValues() {
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(reuseValue, in);
deserialized.toString();

deepEquals("Deserialized value if wrong.", testData[num], deserialized);
reuseValue = deserialized;
num++;
Expand Down Expand Up @@ -283,6 +298,8 @@ public void testSerializedCopyIndividually() {
assertTrue("No data available copying.", toVerify.available() > 0);

T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
deserialized.toString();

deepEquals("Deserialized value if wrong.", value, deserialized);

assertTrue("Trailing data available after deserialization.", toVerify.available() == 0);
Expand Down Expand Up @@ -318,6 +335,8 @@ public void testSerializedCopyAsSequence() {

while (toVerify.available() > 0) {
T deserialized = serializer.deserialize(serializer.createInstance(), toVerify);
deserialized.toString();

deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
Expand Down
3 changes: 3 additions & 0 deletions flink-dist/src/main/flink-bin/LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ under the Apache License (v 2.0):
- Chill_2.10 v0.5.1 (https://github.com/twitter/chill)
- Jetty Web Container (http:https://www.eclipse.org/jetty/)
- Amazon Web Services SDK for Java (http:https://aws.amazon.com/sdkforjava/)
- ScalaTest (http:https://www.scalatest.org)
- StartBootstrap (http:https://startbootstrap.com)
- CHAP Links Library Timeline (http:https://almende.github.io/chap-links-library/)
- Twitter Hosebird Client (hbc) (https://github.com/twitter/hbc)
Expand Down Expand Up @@ -298,6 +299,7 @@ BSD-style licenses:
- Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- D3 (http:https://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
- LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.
- Memcached (https://github.com/memcached/memcached) - Copyright (c) 2003, Danga Interactive, Inc.
- Redis (http:https://redis.io/) - Copyright (c) 2009, Salvatore Sanfilippo and Pieter Noordhuis

[BSD-like License]
Expand Down Expand Up @@ -461,6 +463,7 @@ For src/main/native/src/org/apache/hadoop/io/compress/lz4/{lz4.h,lz4.c,lz4hc.h,l
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

You can contact the author at :
- LZ4 homepage : http:https://fastcompression.blogspot.com/p/lz4.html
- LZ4 source repository : http:https://code.google.com/p/lz4/
- LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
*/
Expand Down
7 changes: 7 additions & 0 deletions flink-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<!-- Because flink-scala uses it in tests -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.StringUtils;
import org.joda.time.DateTime;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -36,7 +39,6 @@ abstract public class AbstractGenericTypeSerializerTest {

private final Random rnd = new Random(349712539451944123L);


@Test
public void testString() {
runTests("abc", "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public void testJavaSet(){
runTests(b);
}



@Test
public void testJavaDequeue(){
Collection<Integer> c = new LinkedList<Integer>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime;

import java.util.Collection;
import java.util.HashSet;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.joda.time.LocalDate;
import org.junit.Test;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

@SuppressWarnings("unchecked")
public class KryoWithCustomSerializersTest extends AbstractGenericTypeSerializerTest {


@Test
public void testJodaTime(){
Collection<LocalDate> b = new HashSet<LocalDate>();

b.add(new LocalDate(1L));
b.add(new LocalDate(2L));

KryoSerializer.registerSerializer(LocalDate.class, LocalDateSerializer.class);

runTests(b);
}

@Override
protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
return new KryoSerializer<T>(type);
}

public static final class LocalDateSerializer extends Serializer<LocalDate> implements java.io.Serializable {

private static final long serialVersionUID = 1L;

@Override
public void write(Kryo kryo, Output output, LocalDate object) {
output.writeInt(object.getYear());
output.writeInt(object.getMonthOfYear());
output.writeInt(object.getDayOfMonth());
}

@Override
public LocalDate read(Kryo kryo, Input input, Class<LocalDate> type) {
return new LocalDate(input.readInt(), input.readInt(), input.readInt());
}
}
}
7 changes: 7 additions & 0 deletions flink-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ under the License.
<artifactId>scalatest_2.10</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ package org.apache.flink.api.scala.runtime

import org.apache.flink.api.common.typeutils.SerializerTestInstance
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.joda.time.DateTime
import org.junit.Test

import scala.reflect._
import org.joda.time.LocalDate
import org.apache.flink.api.java.typeutils.runtime.KryoSerializer
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.io.Input

class KryoGenericTypeSerializerTest {

Expand All @@ -32,6 +38,15 @@ class KryoGenericTypeSerializerTest {
runTests(a)
}

@Test
def jodaSerialization: Unit = {
val a = List(new LocalDate(1), new LocalDate(2))

KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())

runTests(a)
}

@Test
def testScalaListSerialization: Unit = {
val a = List(42,1,49,1337)
Expand Down Expand Up @@ -133,3 +148,16 @@ class KryoGenericTypeSerializerTest {
instance.testAll()
}
}

class LocalDateSerializer extends Serializer[LocalDate] with java.io.Serializable {

override def write(kryo: Kryo, output: Output, obj: LocalDate) {
output.writeInt(obj.getYear());
output.writeInt(obj.getMonthOfYear());
output.writeInt(obj.getDayOfMonth());
}

override def read(kryo: Kryo, input: Input, typeClass: Class[LocalDate]) : LocalDate = {
new LocalDate(input.readInt(), input.readInt(), input.readInt());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,17 @@
package org.apache.flink.api.scala.runtime

import java.util

import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.util.StringUtils
import org.joda.time.LocalDate
import org.junit.Assert
import org.junit.Test

import org.apache.flink.api.scala._

import scala.collection.JavaConverters._

import java.util.Random
import org.apache.flink.api.java.typeutils.runtime.KryoSerializer

class TupleSerializerTest {

Expand Down Expand Up @@ -92,6 +90,23 @@ class TupleSerializerTest {
runTests(testTuples)
}

@Test
def testTuple2StringJodaTime(): Unit = {
val rnd: Random = new Random(807346528946L)

val testTuples = Array(
(StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
(StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
(StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
("", rnd.nextDouble),
(StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
(StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)))

KryoSerializer.registerSerializer(classOf[LocalDate], new LocalDateSerializer())

runTests(testTuples)
}

@Test
def testTuple2StringStringArray(): Unit = {
val rnd: Random = new Random(289347567856686223L)
Expand Down

0 comments on commit 020b282

Please sign in to comment.