Skip to content

Commit

Permalink
[FLINK-10679] [core] Factor out reusable read/write logic for seriali…
Browse files Browse the repository at this point in the history
…zer snapshots
  • Loading branch information
StephanEwen authored and tzulitai committed Oct 30, 2018
1 parent e57a46f commit ead11bd
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static <T> TypeSerializer<T> tryReadSerializer(
} catch (UnloadableTypeSerializerException e) {
if (useDummyPlaceholder) {
LOG.warn("Could not read a requested serializer. Replaced with a UnloadableDummyTypeSerializer.", e.getCause());
return new UnloadableDummyTypeSerializer<>(e.getSerializerBytes(), e);
return new UnloadableDummyTypeSerializer<>(e.getSerializerBytes(), e.getCause());
} else {
throw e;
}
Expand Down Expand Up @@ -232,8 +232,6 @@ public static List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> readSer
*/
public static final class TypeSerializerSerializationProxy<T> extends VersionedIOReadableWritable {

private static final Logger LOG = LoggerFactory.getLogger(TypeSerializerSerializationProxy.class);

private static final int VERSION = 1;

private ClassLoader userClassLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,35 @@ public interface TypeSerializerSnapshot<T> {
*/
<NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer);

// ------------------------------------------------------------------------
// read / write utilities
// ------------------------------------------------------------------------

/**
* Writes the given snapshot to the out stream. One should always use this method to write
* snapshots out, rather than directly calling {@link #writeSnapshot(DataOutputView)}.
*
* <p>The snapshot written with this method can be read via {@link #readVersionedSnapshot(DataInputView, ClassLoader)}.
*/
static void writeVersionedSnapshot(DataOutputView out, TypeSerializerSnapshot<?> snapshot) throws IOException {
out.writeUTF(snapshot.getClass().getName());
out.writeInt(snapshot.getCurrentVersion());
snapshot.writeSnapshot(out);
}


/**
* Reads a snapshot from the stream, performing resolving
*
* <p>This method reads snapshots written by {@link #writeVersionedSnapshot(DataOutputView, TypeSerializerSnapshot)}.
*/
static <T> TypeSerializerSnapshot<T> readVersionedSnapshot(DataInputView in, ClassLoader cl) throws IOException {
final TypeSerializerSnapshot<T> snapshot =
TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(in, cl);

final int version = in.readInt();
snapshot.readSnapshot(version, in, cl);

return snapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,30 @@ public static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(
return proxy.getSerializerSnapshot();
}


public static <T> TypeSerializerSnapshot<T> readAndInstantiateSnapshotClass(DataInputView in, ClassLoader cl) throws IOException {
final String className = in.readUTF();

final Class<? extends TypeSerializerSnapshot> rawClazz;
try {
rawClazz = Class
.forName(className, false, cl)
.asSubclass(TypeSerializerSnapshot.class);
}
catch (ClassNotFoundException e) {
throw new IOException(
"Could not find requested TypeSerializerSnapshot class '" + className + "' in classpath.", e);
}
catch (ClassCastException e) {
throw new IOException("The class '" + className + "' is not a subclass of TypeSerializerSnapshot.", e);
}

@SuppressWarnings("unchecked")
final Class<? extends TypeSerializerSnapshot<T>> clazz = (Class<? extends TypeSerializerSnapshot<T>>) rawClazz;

return InstantiationUtil.instantiate(clazz);
}

/**
* Utility serialization proxy for a {@link TypeSerializerSnapshot}.
*/
Expand Down Expand Up @@ -127,11 +151,7 @@ public void write(DataOutputView out) throws IOException {
// write the format version of this utils format
super.write(out);

// config snapshot class, so that we can re-instantiate the
// correct type of config snapshot instance when deserializing
out.writeUTF(serializerSnapshot.getClass().getName());
out.writeInt(serializerSnapshot.getCurrentVersion());
serializerSnapshot.writeSnapshot(out);
TypeSerializerSnapshot.writeVersionedSnapshot(out, serializerSnapshot);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -172,10 +192,7 @@ TypeSerializerSnapshot<T> getSerializerSnapshot() {
*/
@VisibleForTesting
static <T> TypeSerializerSnapshot<T> deserializeV2(DataInputView in, ClassLoader cl) throws IOException {
final TypeSerializerSnapshot<T> snapshot = readAndInstantiateSnapshotClass(in, cl);
final int version = in.readInt();
snapshot.readSnapshot(version, in, cl);
return snapshot;
return TypeSerializerSnapshot.readVersionedSnapshot(in, cl);
}

/**
Expand All @@ -195,14 +212,6 @@ static <T> TypeSerializerSnapshot<T> deserializeV1(
// - new snapshot type that understands the old format and can produce a restore serializer from it
if (snapshot instanceof TypeSerializerConfigSnapshot) {
TypeSerializerConfigSnapshot<T> oldTypeSnapshot = (TypeSerializerConfigSnapshot<T>) snapshot;

// old type, assume we need a serializer from the outside
if (serializer == null || serializer instanceof UnloadableDummyTypeSerializer) {
throw new IOException(
"Found serializer snapshot of pre-Flink-1.7 format (TypeSerializerConfigSnapshot) " +
"but could not Java-deserialize the corresponding TypeSerializer.");
}

oldTypeSnapshot.setPriorSerializer(serializer);
oldTypeSnapshot.setUserCodeClassLoader(cl);
oldTypeSnapshot.read(in);
Expand All @@ -216,23 +225,6 @@ static <T> TypeSerializerSnapshot<T> deserializeV1(
return snapshot;
}

@SuppressWarnings("unchecked")
private static <T> TypeSerializerSnapshot<T> readAndInstantiateSnapshotClass(DataInputView in, ClassLoader cl) throws IOException {
final String serializerConfigClassname = in.readUTF();
final Class<? extends TypeSerializerSnapshot<T>> serializerConfigSnapshotClass;

try {
serializerConfigSnapshotClass = (Class<? extends TypeSerializerSnapshot<T>>)
Class.forName(serializerConfigClassname, false, cl);
} catch (ClassNotFoundException e) {
throw new IOException(
"Could not find requested TypeSerializerSnapshot class "
+ serializerConfigClassname + " in classpath.", e);
}

return InstantiationUtil.instantiate(serializerConfigSnapshotClass);
}

@SuppressWarnings("deprecation")
private static <T> void setSerializerForWriteIfOldPath(
TypeSerializerSnapshot<T> serializerSnapshot,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;

import org.junit.Test;

import java.io.IOException;
import java.io.ObjectInputStream;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/**
* Tests the backwards compatibility of the TypeSerializerConfigSnapshot.
*/
@SuppressWarnings({"serial", "deprecation"})
public class TypeSerializerSnapshotTest {

@Test
public void testBridgeCompatibilityCheck() throws Exception {
TestSerializerConfigSnapshot snap = new TestSerializerConfigSnapshot();

TestSerializer serCompat = new TestSerializer(true);
TypeSerializerSchemaCompatibility<Object> resultCompat = snap.resolveSchemaCompatibility(serCompat);
assertTrue(resultCompat.isCompatibleAsIs());

TestSerializer serIncompat = new TestSerializer(false);
TypeSerializerSchemaCompatibility<Object> resultIncompat = snap.resolveSchemaCompatibility(serIncompat);
assertTrue(resultIncompat.isIncompatible());
}

@Test
public void testSerializeConfigWhenSerializerMissing() throws Exception {
TestSerializer ser = new TestSerializer();
TypeSerializerConfigSnapshot<Object> snap = (TypeSerializerConfigSnapshot<Object>) ser.snapshotConfiguration();

try {
TypeSerializerSnapshot.writeVersionedSnapshot(new DataOutputSerializer(64), snap);
fail("exception expected");
}
catch (IllegalStateException e) {
// expected
}
}

@Test
public void testSerializerDeserializationFailure() throws Exception {
TestSerializer ser = new TestSerializer();
TypeSerializerConfigSnapshot<Object> snap = (TypeSerializerConfigSnapshot<Object>) ser.snapshotConfiguration();
snap.setPriorSerializer(ser);

DataOutputSerializer out = new DataOutputSerializer(64);

TypeSerializerSnapshot.writeVersionedSnapshot(out, snap);
TypeSerializerSnapshot<Object> readBack = TypeSerializerSnapshot.readVersionedSnapshot(
new DataInputDeserializer(out.getCopyOfBuffer()), getClass().getClassLoader());

assertNotNull(readBack);

try {
readBack.restoreSerializer();
fail("expected exception");
}
catch (IllegalStateException e) {
// expected
}

((TypeSerializerConfigSnapshot<Object>) readBack).setPriorSerializer(
new UnloadableDummyTypeSerializer<>(new byte[0]));
try {
readBack.restoreSerializer();
fail("expected exception");
}
catch (IllegalStateException e) {
// expected
}
}

// ------------------------------------------------------------------------

private static final class TestSerializer extends TypeSerializer<Object> {

private final boolean compatible;

TestSerializer() {
this(true);
}

TestSerializer(boolean compatible) {
this.compatible = compatible;
}

@Override
public boolean isImmutableType() {
throw new UnsupportedOperationException();
}

@Override
public TypeSerializer<Object> duplicate() {
throw new UnsupportedOperationException();
}

@Override
public Object createInstance() {
throw new UnsupportedOperationException();
}

@Override
public Object copy(Object from) {
throw new UnsupportedOperationException();
}

@Override
public Object copy(Object from, Object reuse) {
throw new UnsupportedOperationException();
}

@Override
public int getLength() {
throw new UnsupportedOperationException();
}

@Override
public void serialize(Object record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Object deserialize(DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Object deserialize(Object reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean equals(Object obj) {
return obj instanceof TestSerializer;
}

@Override
public boolean canEqual(Object obj) {
return true;
}

@Override
public int hashCode() {
return 0;
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
throw new IOException("cannot deserialize");
}

@Override
public TypeSerializerSnapshot<Object> snapshotConfiguration() {
return new TestSerializerConfigSnapshot();
}

@Override
public CompatibilityResult<Object> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
return compatible ? CompatibilityResult.compatible() : CompatibilityResult.requiresMigration();
}
}

public static class TestSerializerConfigSnapshot extends TypeSerializerConfigSnapshot<Object> {

@Override
public int getVersion() {
return 0;
}
}
}

0 comments on commit ead11bd

Please sign in to comment.