Skip to content

Commit

Permalink
[FLINK-6922] [serializer] Remove Java serialization from Enum(Value)S…
Browse files Browse the repository at this point in the history
…erializerConfigSnapshot

This commit removes the use of Java serialization for serializing the
enum class and constants in the Java EnumSerializerConfigSnapshot and
Scala ScalaEnumSerializerConfigSnapshot.

This closes apache#4147.
  • Loading branch information
tzulitai committed Jun 20, 2017
1 parent 228faf8 commit f24a499
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

import org.apache.flink.annotation.Internal;
Expand All @@ -33,7 +33,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
Expand Down Expand Up @@ -185,22 +184,47 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c

if (enumClass.equals(config.getTypeClass())) {

// reorder enum constants so that previously existing constants
// remain in the same order, and new constants are appended to the end
LinkedHashSet<T> reorderedEnumConstants = new LinkedHashSet<>();
reorderedEnumConstants.addAll(Arrays.asList(config.getEnumConstants()));
reorderedEnumConstants.addAll(Arrays.asList(enumClass.getEnumConstants()));

// regenerate enum constant to ordinal bidirectional map
this.values = (T[]) Array.newInstance(enumClass, reorderedEnumConstants.size());
this.valueToOrdinal.clear();
int i = 0;
for (T constant : reorderedEnumConstants) {
this.values[i] = constant;
this.valueToOrdinal.put(constant, i);
i++;
T[] reorderedEnumConstants = (T[]) Array.newInstance(enumClass, this.values.length);
Map<T, Integer> rebuiltEnumConstantToOrdinalMap = new HashMap<>(this.values.length);

List<String> previousEnumConstants = config.getEnumConstants();

if (previousEnumConstants.size() <= this.values.length) {
for (int i = 0; i < previousEnumConstants.size(); i++) {
String previousEnumConstantStr = previousEnumConstants.get(i);

try {
// fetch the actual enum, and use it to populate the reconstructed bi-directional map
T enumConstant = Enum.valueOf(enumClass, previousEnumConstantStr);

reorderedEnumConstants[i] = enumConstant;
rebuiltEnumConstantToOrdinalMap.put(enumConstant, i);
} catch (IllegalArgumentException e) {
// a previous enum constant no longer exists, and therefore requires migration
return CompatibilityResult.requiresMigration();
}
}
} else {
// some enum constants have been removed (because there are
// fewer constants now), and therefore requires migration
return CompatibilityResult.requiresMigration();
}

// if there are new enum constants, append them to the end
if (this.values.length > previousEnumConstants.size()) {
int appendedNewOrdinal = previousEnumConstants.size();
for (T currentEnumConstant : this.values) {
if (!rebuiltEnumConstantToOrdinalMap.containsKey(currentEnumConstant)) {
reorderedEnumConstants[appendedNewOrdinal] = currentEnumConstant;
rebuiltEnumConstantToOrdinalMap.put(currentEnumConstant, appendedNewOrdinal);
appendedNewOrdinal++;
}
}
}

// if we reach here, we can simply reconfigure ourselves to be compatible
this.values = reorderedEnumConstants;
this.valueToOrdinal = rebuiltEnumConstantToOrdinalMap;
return CompatibilityResult.compatible();
}
}
Expand All @@ -219,40 +243,53 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
public static final class EnumSerializerConfigSnapshot<T extends Enum<T>>
extends GenericTypeSerializerConfigSnapshot<T> {

private static final int VERSION = 1;
private static final int VERSION = 2;

private T[] enumConstants;
private List<String> enumConstants;

/** This empty nullary constructor is required for deserializing the configuration. */
public EnumSerializerConfigSnapshot() {}

public EnumSerializerConfigSnapshot(Class<T> enumClass, T[] enumConstants) {
public EnumSerializerConfigSnapshot(Class<T> enumClass, T[] enumConstantsArr) {
super(enumClass);
this.enumConstants = Preconditions.checkNotNull(enumConstants);
this.enumConstants = buildEnumConstantsList(Preconditions.checkNotNull(enumConstantsArr));
}

@Override
public void write(DataOutputView out) throws IOException {
super.write(out);

try (final DataOutputViewStream outViewWrapper = new DataOutputViewStream(out)) {
InstantiationUtil.serializeObject(outViewWrapper, enumConstants);
out.writeInt(enumConstants.size());
for (String enumConstant : enumConstants) {
out.writeUTF(enumConstant);
}
}

@Override
public void read(DataInputView in) throws IOException {
super.read(in);

try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
try {
enumConstants = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("The requested enum class cannot be found in classpath.", e);
} catch (IllegalArgumentException e) {
throw new IOException("A previously existing enum constant of "
+ getTypeClass().getName() + " no longer exists.", e);
if (getReadVersion() == 1) {
try (final DataInputViewStream inViewWrapper = new DataInputViewStream(in)) {
try {
T[] legacyEnumConstants = InstantiationUtil.deserializeObject(inViewWrapper, getUserCodeClassLoader());
this.enumConstants = buildEnumConstantsList(legacyEnumConstants);
} catch (ClassNotFoundException e) {
throw new IOException("The requested enum class cannot be found in classpath.", e);
} catch (IllegalArgumentException e) {
throw new IOException("A previously existing enum constant of "
+ getTypeClass().getName() + " no longer exists.", e);
}
}
} else if (getReadVersion() == VERSION) {
int numEnumConstants = in.readInt();

this.enumConstants = new ArrayList<>(numEnumConstants);
for (int i = 0; i < numEnumConstants; i++) {
enumConstants.add(in.readUTF());
}
} else {
throw new IOException("Cannot deserialize EnumSerializerConfigSnapshot with version " + getReadVersion());
}
}

Expand All @@ -261,21 +298,33 @@ public int getVersion() {
return VERSION;
}

public T[] getEnumConstants() {
@Override
public int[] getCompatibleVersions() {
return new int[] {VERSION, 1};
}

public List<String> getEnumConstants() {
return enumConstants;
}

@Override
public boolean equals(Object obj) {
return super.equals(obj)
&& Arrays.equals(
enumConstants,
((EnumSerializerConfigSnapshot) obj).getEnumConstants());
return super.equals(obj) && enumConstants.equals(((EnumSerializerConfigSnapshot) obj).getEnumConstants());
}

@Override
public int hashCode() {
return super.hashCode() * 31 + Arrays.hashCode(enumConstants);
return super.hashCode() * 31 + enumConstants.hashCode();
}

private static <T extends Enum<T>> List<String> buildEnumConstantsList(T[] enumConstantsArr) {
List<String> res = new ArrayList<>(enumConstantsArr.length);

for (T enumConstant : enumConstantsArr) {
res.add(enumConstant.name());
}

return res;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils.base;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

/**
 * Tests the outcome of {@link EnumSerializer#ensureCompatibility} when the enum class changes
 * between writing a serializer configuration snapshot and restoring it.
 *
 * <p>Each test compiles two versions of an enum with the same name at runtime, loads each from
 * its own class loader, snapshots a serializer for the first version, and checks the restored
 * snapshot against a serializer for the second version.
 */
public class EnumSerializerUpgradeTest extends TestLogger {

    /** Root for the per-version folders holding the generated enum sources and class files. */
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Simple name shared by all generated enum versions (declared in the default package). */
    private static final String ENUM_NAME = "EnumSerializerUpgradeTestEnum";

    // Baseline enum.
    private static final String ENUM_A = "public enum " + ENUM_NAME + " { A, B, C }";
    // Baseline plus one constant appended at the end.
    private static final String ENUM_B = "public enum " + ENUM_NAME + " { A, B, C, D }";
    // Baseline with constant B removed.
    private static final String ENUM_C = "public enum " + ENUM_NAME + " { A, C }";
    // Baseline with constants B and C swapped.
    private static final String ENUM_D = "public enum " + ENUM_NAME + " { A, C, B }";

    /**
     * Check that identical enums don't require migration.
     */
    @Test
    public void checkIndenticalEnums() throws Exception {
        Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_A).isRequiresMigration());
    }

    /**
     * Check that appending fields to the enum does not require migration.
     */
    @Test
    public void checkAppendedField() throws Exception {
        Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_B).isRequiresMigration());
    }

    /**
     * Check that removing enum fields requires migration.
     */
    @Test
    public void checkRemovedField() throws Exception {
        Assert.assertTrue(checkCompatibility(ENUM_A, ENUM_C).isRequiresMigration());
    }

    /**
     * Check that changing the enum field order doesn't require migration.
     */
    @Test
    public void checkDifferentFieldOrder() throws Exception {
        Assert.assertFalse(checkCompatibility(ENUM_A, ENUM_D).isRequiresMigration());
    }

    /**
     * Snapshots a serializer for the enum compiled from {@code enumSourceA}, round-trips the
     * snapshot through its binary form, and checks it against a serializer for the enum compiled
     * from {@code enumSourceB}.
     *
     * @param enumSourceA source of the enum version the snapshot is taken from
     * @param enumSourceB source of the enum version the snapshot is restored against
     * @return the compatibility result reported by the serializer of the second enum version
     * @throws IOException if writing, compiling or (de)serializing fails
     * @throws ClassNotFoundException if a compiled enum class cannot be loaded
     */
    @SuppressWarnings("unchecked")
    private static CompatibilityResult checkCompatibility(String enumSourceA, String enumSourceB)
            throws IOException, ClassNotFoundException {

        // each enum version gets its own folder and class loader, so both can share ENUM_NAME
        ClassLoader classLoader = compileAndLoadEnum(
            temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceA);

        EnumSerializer enumSerializer = new EnumSerializer(classLoader.loadClass(ENUM_NAME));

        TypeSerializerConfigSnapshot snapshot = enumSerializer.snapshotConfiguration();
        byte[] snapshotBytes;
        try (
            ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
            DataOutputViewStreamWrapper outputViewStreamWrapper = new DataOutputViewStreamWrapper(outBuffer)) {

            TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(outputViewStreamWrapper, snapshot);
            snapshotBytes = outBuffer.toByteArray();
        }

        ClassLoader classLoader2 = compileAndLoadEnum(
            temporaryFolder.newFolder(), ENUM_NAME + ".java", enumSourceB);

        // restore the snapshot with the class loader that only knows the second enum version
        TypeSerializerConfigSnapshot restoredSnapshot;
        try (
            ByteArrayInputStream inBuffer = new ByteArrayInputStream(snapshotBytes);
            DataInputViewStreamWrapper inputViewStreamWrapper = new DataInputViewStreamWrapper(inBuffer)) {

            restoredSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(inputViewStreamWrapper, classLoader2);
        }

        EnumSerializer enumSerializer2 = new EnumSerializer(classLoader2.loadClass(ENUM_NAME));
        return enumSerializer2.ensureCompatibility(restoredSnapshot);
    }

    /**
     * Writes {@code source} to {@code filename} under {@code root}, compiles it, and returns a
     * class loader that resolves classes from {@code root}.
     *
     * @param root folder that receives the source file and is used as the class loader URL
     * @param filename name of the source file to create
     * @param source Java source code to compile
     * @return a class loader rooted at {@code root}, with the context class loader as parent
     * @throws IOException if the source cannot be written or the compilation fails
     */
    private static ClassLoader compileAndLoadEnum(File root, String filename, String source) throws IOException {
        File file = writeSourceFile(root, filename, source);

        // fail right here on a broken compile instead of with a ClassNotFoundException later on
        int exitCode = compileClass(file);
        if (exitCode != 0) {
            throw new IOException("Failed to compile " + file + " (compiler exit code " + exitCode + ").");
        }

        return new URLClassLoader(
            new URL[]{root.toURI().toURL()},
            Thread.currentThread().getContextClassLoader());
    }

    /**
     * Writes {@code source} into a new file named {@code filename} inside {@code root}.
     *
     * @return the written file
     * @throws IOException if the file cannot be created or written
     */
    private static File writeSourceFile(File root, String filename, String source) throws IOException {
        File file = new File(root, filename);

        // try-with-resources closes the writer even if the write fails
        try (FileWriter fileWriter = new FileWriter(file)) {
            fileWriter.write(source);
        }

        return file;
    }

    /**
     * Compiles the given source file with the system Java compiler.
     *
     * @return the compiler's exit code; 0 means success
     * @throws IllegalStateException if no system Java compiler is available
     */
    private static int compileClass(File sourceFile) {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            // getSystemJavaCompiler() returns null when the platform provides no compiler
            throw new IllegalStateException(
                "No system Java compiler is available; this test must run on a JDK.");
        }
        return compiler.run(null, null, null, sourceFile.getPath());
    }
}
Loading

0 comments on commit f24a499

Please sign in to comment.