Skip to content

Commit

Permalink
[FLINK-8841] Remove HashMapSerializer and use MapSerializer instead.
Browse files Browse the repository at this point in the history
This closes apache#5910.
  • Loading branch information
kl0u committed Apr 26, 2018
1 parent 0113ee2 commit 3ac2823
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.KryoRegistrationSerializerConfigSnapshot;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
Expand All @@ -41,8 +42,10 @@
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -194,10 +197,12 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo
try {
Class.forName(streamClassDescriptor.getName(), false, classLoader);
} catch (ClassNotFoundException e) {
if (streamClassDescriptor.getName().equals("org.apache.avro.generic.GenericData$Array")) {
ObjectStreamClass result = ObjectStreamClass.lookup(
KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class);
return result;

final ObjectStreamClass equivalentSerializer =
MigrationUtil.getEquivalentSerializer(streamClassDescriptor.getName());

if (equivalentSerializer != null) {
return equivalentSerializer;
}
}

Expand All @@ -221,6 +226,52 @@ protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFo
}
}

/**
* A mapping between the full path of a deprecated serializer and its equivalent.
* These mappings are hardcoded and fixed.
*
* <p>IMPORTANT: mappings can be removed after 1 release as there will be a "migration path".
* As an example, a serializer is removed in 1.5-SNAPSHOT, then the mapping should be added for 1.5,
* and it can be removed in 1.6, as the path would be Flink-{< 1.5} -> Flink-1.5 -> Flink-{>= 1.6}.
*/
private enum MigrationUtil {

// To add a new mapping just pick a name and add an entry as the following:

GENERIC_DATA_ARRAY_SERIALIZER(
"org.apache.avro.generic.GenericData$Array",
ObjectStreamClass.lookup(KryoRegistrationSerializerConfigSnapshot.DummyRegisteredClass.class)),
HASH_MAP_SERIALIZER(
"org.apache.flink.runtime.state.HashMapSerializer",
ObjectStreamClass.lookup(MapSerializer.class)); // added in 1.5

/** An internal unmodifiable map containing the mappings between deprecated and new serializers. */
private static final Map<String, ObjectStreamClass> EQUIVALENCE_MAP = Collections.unmodifiableMap(initMap());

/** The full name of the class of the old serializer. */
private final String oldSerializerName;

/** The serialization descriptor of the class of the new serializer. */
private final ObjectStreamClass newSerializerStreamClass;

MigrationUtil(String oldSerializerName, ObjectStreamClass newSerializerStreamClass) {
this.oldSerializerName = oldSerializerName;
this.newSerializerStreamClass = newSerializerStreamClass;
}

private static Map<String, ObjectStreamClass> initMap() {
final Map<String, ObjectStreamClass> init = new HashMap<>(4);
for (MigrationUtil m: MigrationUtil.values()) {
init.put(m.oldSerializerName, m.newSerializerStreamClass);
}
return init;
}

private static ObjectStreamClass getEquivalentSerializer(String classDescriptorName) {
return EQUIVALENCE_MAP.get(classDescriptorName);
}
}

/**
* Creates a new instance of the given class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.rocksdb.DBOptions;

import java.io.File;
import java.util.Map;

import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -160,8 +159,8 @@ public void testMapSerialization() throws Exception {
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>> mapState =
(InternalMapState<Long, VoidNamespace, Long, String, Map<Long, String>>)
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ public void testMapSerialization() throws Exception {
);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>> mapState =
(InternalMapState<Long, VoidNamespace, Long, String, HashMap<Long, String>>)
final InternalMapState<Long, VoidNamespace, Long, String> mapState =
(InternalMapState<Long, VoidNamespace, Long, String>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
Expand All @@ -328,9 +328,9 @@ public void testMapSerialization() throws Exception {
*
* @throws Exception
*/
public static <M extends Map<Long, String>> void testMapSerialization(
public static void testMapSerialization(
final long key,
final InternalMapState<Long, VoidNamespace, Long, String, M> mapState) throws Exception {
final InternalMapState<Long, VoidNamespace, Long, String> mapState) throws Exception {

TypeSerializer<Long> userKeySerializer = LongSerializer.INSTANCE;
TypeSerializer<String> userValueSerializer = StringSerializer.INSTANCE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -232,7 +231,7 @@ protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingS
* @param <UK> Type of the keys in the state
* @param <UV> Type of the values in the state *
*/
protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV, ? extends Map<UK, UV>> createMapState(
protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(
TypeSerializer<N> namespaceSerializer,
MapStateDescriptor<UK, UV> stateDesc) throws Exception;

Expand Down

This file was deleted.

Loading

0 comments on commit 3ac2823

Please sign in to comment.