Skip to content

Commit

Permalink
[FLINK-8451] [serializers] Make Scala tuple serializer deserialization more failure tolerant
Browse files Browse the repository at this point in the history

This closes apache#5567.
  • Loading branch information
twalthr committed Feb 28, 2018
1 parent e8d1685 commit 6c837d7
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void read(DataInputView in) throws IOException {
super.read(in);

try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
tupleClass = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader(), true);
} catch (ClassNotFoundException e) {
throw new IOException("Could not find requested tuple class in classpath.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,61 @@ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, Clas
*
* <p>This can be removed once 1.2 is no longer supported.
*/
private static Set<String> scalaSerializerClassnames = new HashSet<>();
private static final Set<String> scalaSerializerClassnames = new HashSet<>();
static {
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TraversableSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.CaseClassSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EnumValueSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.OptionSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.TrySerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.EitherSerializer");
scalaSerializerClassnames.add("org.apache.flink.api.scala.typeutils.UnitSerializer");
}

/**
* The serialVersionUID might change between Scala versions and since those classes are
* part of the tuple serializer config snapshots we need to ignore them.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-8451">FLINK-8451</a>
*/
private static final Set<String> scalaTypes = new HashSet<>();
static {
scalaTypes.add("scala.Tuple1");
scalaTypes.add("scala.Tuple2");
scalaTypes.add("scala.Tuple3");
scalaTypes.add("scala.Tuple4");
scalaTypes.add("scala.Tuple5");
scalaTypes.add("scala.Tuple6");
scalaTypes.add("scala.Tuple7");
scalaTypes.add("scala.Tuple8");
scalaTypes.add("scala.Tuple9");
scalaTypes.add("scala.Tuple10");
scalaTypes.add("scala.Tuple11");
scalaTypes.add("scala.Tuple12");
scalaTypes.add("scala.Tuple13");
scalaTypes.add("scala.Tuple14");
scalaTypes.add("scala.Tuple15");
scalaTypes.add("scala.Tuple16");
scalaTypes.add("scala.Tuple17");
scalaTypes.add("scala.Tuple18");
scalaTypes.add("scala.Tuple19");
scalaTypes.add("scala.Tuple20");
scalaTypes.add("scala.Tuple21");
scalaTypes.add("scala.Tuple22");
scalaTypes.add("scala.Tuple1$mcJ$sp");
scalaTypes.add("scala.Tuple1$mcI$sp");
scalaTypes.add("scala.Tuple1$mcD$sp");
scalaTypes.add("scala.Tuple2$mcJJ$sp");
scalaTypes.add("scala.Tuple2$mcJI$sp");
scalaTypes.add("scala.Tuple2$mcJD$sp");
scalaTypes.add("scala.Tuple2$mcIJ$sp");
scalaTypes.add("scala.Tuple2$mcII$sp");
scalaTypes.add("scala.Tuple2$mcID$sp");
scalaTypes.add("scala.Tuple2$mcDJ$sp");
scalaTypes.add("scala.Tuple2$mcDI$sp");
scalaTypes.add("scala.Tuple2$mcDD$sp");
}

/**
* An {@link ObjectInputStream} that ignores serialVersionUID mismatches when deserializing objects of
* anonymous classes or our Scala serializer classes and also replaces occurrences of GenericData.Array
Expand Down Expand Up @@ -158,12 +201,13 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo
}
}

Class localClass = resolveClass(streamClassDescriptor);
if (scalaSerializerClassnames.contains(localClass.getName()) || localClass.isAnonymousClass()
final Class localClass = resolveClass(streamClassDescriptor);
final String name = localClass.getName();
if (scalaSerializerClassnames.contains(name) || scalaTypes.contains(name) || localClass.isAnonymousClass()
// isAnonymousClass does not work for anonymous Scala classes; additionally check by classname
|| localClass.getName().contains("$anon$") || localClass.getName().contains("$anonfun")) {
|| name.contains("$anon$") || name.contains("$anonfun")) {

ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
final ObjectStreamClass localClassDescriptor = ObjectStreamClass.lookup(localClass);
if (localClassDescriptor != null
&& localClassDescriptor.getSerialVersionUID() != streamClassDescriptor.getSerialVersionUID()) {
LOG.warn("Ignoring serialVersionUID mismatch for anonymous class {}; was {}, now {}.",
Expand Down
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.runtime

import java.io.InputStream

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerConfigSnapshot
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.scala.runtime.TupleSerializerCompatibilityTestGenerator._
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.core.memory.DataInputViewStreamWrapper
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
import org.junit.Test

/**
 * Ensures that tuple and case class serializer snapshots written by Flink 1.3
 * can still be restored and remain compatible across Scala versions.
 */
class TupleSerializerCompatibilityTest {

  @Test
  def testCompatibilityWithFlink_1_3(): Unit = {
    var stream: InputStream = null
    try {
      // Restore the serializer and its config snapshot from the 1.3 snapshot resource.
      stream = getClass.getClassLoader.getResourceAsStream(SNAPSHOT_RESOURCE)
      val restored = TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(
        new DataInputViewStreamWrapper(stream),
        getClass.getClassLoader)

      assertEquals(1, restored.size)

      val legacySerializer = restored.get(0).f0
      val legacySnapshot = restored.get(0).f1

      // Sanity-check what came back from the snapshot file.
      assertNotNull(legacySerializer)
      assertNotNull(legacySnapshot)
      assertTrue(legacySerializer.isInstanceOf[CaseClassSerializer[_]])
      assertTrue(legacySnapshot.isInstanceOf[TupleSerializerConfigSnapshot[_]])

      // A freshly created serializer must accept the old snapshot without migration.
      val currentSerializer = createTypeInformation[TestCaseClass]
        .createSerializer(new ExecutionConfig())
      assertFalse(currentSerializer.ensureCompatibility(legacySnapshot).isRequiresMigration)

      // The restored (legacy) serializer must read records written by Flink 1.3.
      stream.close()
      stream = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE)
      val legacyIn = new DataInputViewStreamWrapper(stream)
      assertEquals(TEST_DATA_1, legacySerializer.deserialize(legacyIn))
      assertEquals(TEST_DATA_2, legacySerializer.deserialize(legacyIn))
      assertEquals(TEST_DATA_3, legacySerializer.deserialize(legacyIn))

      // The current serializer must read the same records as well.
      stream.close()
      stream = getClass.getClassLoader.getResourceAsStream(DATA_RESOURCE)
      val currentIn = new DataInputViewStreamWrapper(stream)
      assertEquals(TEST_DATA_1, currentSerializer.deserialize(currentIn))
      assertEquals(TEST_DATA_2, currentSerializer.deserialize(currentIn))
      assertEquals(TEST_DATA_3, currentSerializer.deserialize(currentIn))
    } finally {
      if (stream != null) {
        stream.close()
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.runtime

import java.io.FileOutputStream
import java.util.Collections

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
import org.apache.flink.core.memory.DataOutputViewStreamWrapper

/**
 * Run this code on a 1.3 (or earlier) branch to generate the test data
 * for the [[TupleSerializerCompatibilityTest]].
 *
 * This generator is a separate file because a companion object would have side-effects on the
 * annotated classes generated by Scala.
 */
object TupleSerializerCompatibilityTestGenerator {

  /**
   * Case class mixing primitives, [[Either]], [[Unit]], and tuple fields to
   * exercise the Scala tuple/case-class serializers.
   */
  case class TestCaseClass(
    i: Int,
    e: Either[String, Unit],
    t2: (Boolean, String),
    t1: (Double),
    t2ii: (Int, Int))

  val TEST_DATA_1 = TestCaseClass(42, Left("Hello"), (false, "what?"), 12.2, (12, 12))
  // Right(()) instead of Right(): the empty-argument-list adaptation that inserts
  // the Unit value implicitly is deprecated and fails under -Xfatal-warnings.
  val TEST_DATA_2 = TestCaseClass(42, Right(()), (false, "what?"), 12.2, (100, 200))
  val TEST_DATA_3 = TestCaseClass(100, Left("Hello"), (true, "what?"), 14.2, (-1, Int.MinValue))

  // Resource names under which the test expects the generated files on the classpath.
  val SNAPSHOT_RESOURCE: String = "flink-1.3.2-scala-types-serializer-snapshot"

  val DATA_RESOURCE: String = "flink-1.3.2-scala-types-serializer-data"

  // Local paths the generator writes to; copy the results into test resources.
  val SNAPSHOT_OUTPUT_PATH: String = "/tmp/snapshot/" + SNAPSHOT_RESOURCE

  val DATA_OUTPUT_PATH: String = "/tmp/snapshot/" + DATA_RESOURCE

  /**
   * Writes the serializer config snapshot and three serialized sample records
   * to [[SNAPSHOT_OUTPUT_PATH]] and [[DATA_OUTPUT_PATH]] respectively.
   */
  def main(args: Array[String]): Unit = {

    val typeInfo = org.apache.flink.api.scala.createTypeInformation[TestCaseClass]

    val serializer = typeInfo.createSerializer(new ExecutionConfig())
    val configSnapshot = serializer.snapshotConfiguration()

    // Write the serializer together with its config snapshot.
    writeToFile(SNAPSHOT_OUTPUT_PATH) { out =>
      TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
        out,
        Collections.singletonList(
          new org.apache.flink.api.java.tuple.Tuple2(serializer, configSnapshot)
        )
      )
    }

    // Write the sample records used by the compatibility test.
    writeToFile(DATA_OUTPUT_PATH) { out =>
      serializer.serialize(TEST_DATA_1, out)
      serializer.serialize(TEST_DATA_2, out)
      serializer.serialize(TEST_DATA_3, out)
    }
  }

  /** Opens `path`, runs `write` against a wrapping output view, and always closes the stream. */
  private def writeToFile(path: String)(write: DataOutputViewStreamWrapper => Unit): Unit = {
    var fos: FileOutputStream = null
    try {
      fos = new FileOutputStream(path)
      write(new DataOutputViewStreamWrapper(fos))
    } finally {
      if (fos != null) {
        fos.close()
      }
    }
  }
}

0 comments on commit 6c837d7

Please sign in to comment.