Skip to content

Commit

Permalink
[FLINK-7484] Perform proper deep copy in CaseClassSerializer.duplicate()
Browse files Browse the repository at this point in the history
This also adds a test that verifies the deep copy.
  • Loading branch information
aljoscha committed Oct 13, 2017
1 parent 85b2f27 commit 90be577
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
Expand All @@ -43,7 +44,7 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {

protected final Class<T> tupleClass;

protected final TypeSerializer<Object>[] fieldSerializers;
protected TypeSerializer<Object>[] fieldSerializers;

protected final int arity;

Expand Down Expand Up @@ -183,4 +184,9 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}

protected abstract TupleSerializerBase<T> createSerializerInstance(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers);

@VisibleForTesting
public TypeSerializer<Object>[] getFieldSerializers() {
return fieldSerializers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ abstract class CaseClassSerializer[T <: Product](
val result = super.clone().asInstanceOf[CaseClassSerializer[T]]

// achieve a deep copy by duplicating the field serializers
result.fieldSerializers.transform(_.duplicate())
result.fieldSerializers = result.fieldSerializers.map(_.duplicate())
result.fields = null
result.instanceCreationFailed = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,43 @@ package org.apache.flink.api.scala.runtime
import java.util
import java.util.Random

import org.apache.flink.api.scala._
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.util.StringUtils

import org.joda.time.LocalDate

import org.junit.Assert
import org.junit.Test
import org.junit.Assert._
import org.junit.{Assert, Test}

import scala.collection.JavaConverters._

class TupleSerializerTest {

@Test
def testProperDeepCopy(): Unit = {
val tpe = createTypeInformation[((String, Int), (Int, String))]

val originalSerializer =
tpe.createSerializer(new ExecutionConfig)
.asInstanceOf[CaseClassSerializer[((String, Int), (Int, String))]]
val duplicateSerializer = originalSerializer.duplicate()

duplicateSerializer.getFieldSerializers

// the list of child serializers must be duplicated
assertTrue(duplicateSerializer.getFieldSerializers ne originalSerializer.getFieldSerializers)

// each of the child serializers (which are themselves CaseClassSerializers) must be duplicated
assertTrue(
duplicateSerializer.getFieldSerializers()(0) ne originalSerializer.getFieldSerializers()(0))

assertTrue(
duplicateSerializer.getFieldSerializers()(1) ne originalSerializer.getFieldSerializers()(1))
}

@Test
def testTuple1Int(): Unit = {
val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
Expand Down

0 comments on commit 90be577

Please sign in to comment.