Skip to content

Commit

Permalink
[FLINK-6869] [scala] Specify serialVersionUID for all Scala serializers
Browse files Browse the repository at this point in the history
Previously, Scala serializers did not specify the serialVersionUID, and
therefore prohibited restore from previous Flink version snapshots
because the serializers' implementations changed.

The serialVersionUIDs added in this commit are identical to what they
were (as generated by Java) in Flink 1.2, so that we can at least
restore state that were written with the Scala serializers as of 1.2.
  • Loading branch information
tzulitai committed Jun 13, 2017
1 parent 6edb72d commit 75ea808
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.flink.types.NullFieldException
* our Java Tuples so we have to treat them differently.
*/
@Internal
@SerialVersionUID(7341356073446263475L)
abstract class CaseClassSerializer[T <: Product](
clazz: Class[T],
scalaFieldSerializers: Array[TypeSerializer[_]])
Expand Down Expand Up @@ -80,6 +81,15 @@ abstract class CaseClassSerializer[T <: Product](
createInstance(fields)
}

override def createSerializerInstance(
tupleClass: Class[T],
fieldSerializers: Array[TypeSerializer[_]]): TupleSerializerBase[T] = {
this.getClass
.getConstructors()(0)
.newInstance(tupleClass, fieldSerializers)
.asInstanceOf[CaseClassSerializer[T]]
}

def copy(from: T, reuse: T): T = {
copy(from)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
* Serializer for [[Either]].
*/
@Internal
@SerialVersionUID(9219995873023657525L)
class EitherSerializer[A, B, T <: Either[A, B]](
val leftSerializer: TypeSerializer[A],
val rightSerializer: TypeSerializer[B])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.flink.util.{InstantiationUtil, Preconditions}
* Serializer for [[Enumeration]] values.
*/
@Internal
@SerialVersionUID(-2403076635594572920L)
class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[E#Value] {

type T = E#Value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.flink.core.memory.{DataInputView, DataOutputView}
* Serializer for [[Option]].
*/
@Internal
@SerialVersionUID(-8635243274072627338L)
class OptionSerializer[A](val elemSerializer: TypeSerializer[A])
extends TypeSerializer[Option[A]] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import scala.collection.generic.CanBuildFrom
* Serializer for Scala Collections.
*/
@Internal
@SerialVersionUID(7522917416391312410L)
abstract class TraversableSerializer[T <: TraversableOnce[E], E](
var elementSerializer: TypeSerializer[E])
extends TypeSerializer[T] with Cloneable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import scala.util.{Failure, Success, Try}
* Serializer for [[scala.util.Try]].
*/
@Internal
@SerialVersionUID(-3052182891252564491L)
class TrySerializer[A](
private val elemSerializer: TypeSerializer[A],
private val executionConfig: ExecutionConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

@Internal
@SerialVersionUID(5413377487955047394L)
class UnitSerializer extends TypeSerializerSingleton[Unit] {

def isImmutableType(): Boolean = true
Expand Down

0 comments on commit 75ea808

Please sign in to comment.