Skip to content

Commit

Permalink
[FLINK-6844] [scala] Implement compatibility methods for TraversableS…
Browse files Browse the repository at this point in the history
…erializer

This closes apache#4081.
  • Loading branch information
tzulitai committed Jun 7, 2017
1 parent bdffde3 commit c11d5ed
Showing 1 changed file with 38 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.flink.api.scala.typeutils
import java.io.ObjectInputStream

import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
import org.apache.flink.api.common.typeutils._
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

import scala.collection.generic.CanBuildFrom
Expand Down Expand Up @@ -152,11 +152,46 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E](
}

override def snapshotConfiguration(): TypeSerializerConfigSnapshot = {
throw new UnsupportedOperationException()
new TraversableSerializer.TraversableSerializerConfigSnapshot[E](elementSerializer)
}

override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot): CompatibilityResult[T] = {
throw new UnsupportedOperationException()

configSnapshot match {
case traversableSerializerConfigSnapshot:
TraversableSerializer.TraversableSerializerConfigSnapshot[E] =>

val elemCompatRes = CompatibilityUtil.resolveCompatibilityResult(
traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f0,
classOf[UnloadableDummyTypeSerializer[_]],
traversableSerializerConfigSnapshot.getSingleNestedSerializerAndConfig.f1,
elementSerializer)

if (elemCompatRes.isRequiresMigration) {
CompatibilityResult.requiresMigration()
} else {
CompatibilityResult.compatible()
}

case _ => CompatibilityResult.requiresMigration()
}
}
}

object TraversableSerializer {

class TraversableSerializerConfigSnapshot[E](
private var elementSerializer: TypeSerializer[E])
extends CompositeTypeSerializerConfigSnapshot(elementSerializer) {

/** This empty nullary constructor is required for deserializing the configuration. */
def this() = this(null)

override def getVersion = TraversableSerializerConfigSnapshot.VERSION
}

object TraversableSerializerConfigSnapshot {
val VERSION = 1
}
}

0 comments on commit c11d5ed

Please sign in to comment.