forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-7811] Add ScalaKryoInstantiator from Chill 0.7.4 and update to…
… 0.7.6 We have to do this in order to be able to update our Chill dependency without changing the the Kryo serializers that are registered by default. The problem is that snapshots of our Kryo serializer only contain the user-registered serializers, not the serializers that are registered by default by Chill. If we had that, we could probably get by without this change. The reason we have to update is that there is no Chill 0.7.4 dependency for Scala 2.12.
- Loading branch information
Showing
8 changed files
with
445 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
63 changes: 63 additions & 0 deletions
63
flink-runtime/src/main/java/org/apache/flink/runtime/types/FlinkChillPackageRegistrar.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.types; | ||
|
||
import com.twitter.chill.IKryoRegistrar; | ||
import com.twitter.chill.java.ArraysAsListSerializer; | ||
import com.twitter.chill.java.BitSetSerializer; | ||
import com.twitter.chill.java.InetSocketAddressSerializer; | ||
import com.twitter.chill.java.IterableRegistrar; | ||
import com.twitter.chill.java.LocaleSerializer; | ||
import com.twitter.chill.java.RegexSerializer; | ||
import com.twitter.chill.java.SimpleDateFormatSerializer; | ||
import com.twitter.chill.java.SqlDateSerializer; | ||
import com.twitter.chill.java.SqlTimeSerializer; | ||
import com.twitter.chill.java.TimestampSerializer; | ||
import com.twitter.chill.java.URISerializer; | ||
import com.twitter.chill.java.UUIDSerializer; | ||
|
||
/* | ||
This code is copied as is from Twitter Chill 0.7.4 because we need to user a newer chill version | ||
but want to ensure that the serializers that are registered by default stay the same. | ||
The only changes to the code are those that are required to make it compile and pass checkstyle | ||
checks in our code base. | ||
*/ | ||
|
||
/** | ||
* Creates a registrar for all the serializers in the chill.java package. | ||
*/ | ||
public class FlinkChillPackageRegistrar { | ||
|
||
public static IKryoRegistrar all() { | ||
return new IterableRegistrar( | ||
ArraysAsListSerializer.registrar(), | ||
BitSetSerializer.registrar(), | ||
PriorityQueueSerializer.registrar(), | ||
RegexSerializer.registrar(), | ||
SqlDateSerializer.registrar(), | ||
SqlTimeSerializer.registrar(), | ||
TimestampSerializer.registrar(), | ||
URISerializer.registrar(), | ||
InetSocketAddressSerializer.registrar(), | ||
UUIDSerializer.registrar(), | ||
LocaleSerializer.registrar(), | ||
SimpleDateFormatSerializer.registrar()); | ||
} | ||
} |
93 changes: 93 additions & 0 deletions
93
flink-runtime/src/main/java/org/apache/flink/runtime/types/PriorityQueueSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.types; | ||
|
||
import com.esotericsoftware.kryo.Kryo; | ||
import com.esotericsoftware.kryo.Serializer; | ||
import com.esotericsoftware.kryo.io.Input; | ||
import com.esotericsoftware.kryo.io.Output; | ||
import com.twitter.chill.IKryoRegistrar; | ||
import com.twitter.chill.SingleRegistrar; | ||
|
||
import java.lang.reflect.Field; | ||
import java.util.Comparator; | ||
import java.util.PriorityQueue; | ||
|
||
/* | ||
This code is copied as is from Twitter Chill 0.7.4 because we need to user a newer chill version | ||
but want to ensure that the serializers that are registered by default stay the same. | ||
The only changes to the code are those that are required to make it compile and pass checkstyle | ||
checks in our code base. | ||
*/ | ||
|
||
class PriorityQueueSerializer extends Serializer<PriorityQueue<?>> { | ||
private Field compField; | ||
|
||
public static IKryoRegistrar registrar() { | ||
return new SingleRegistrar(PriorityQueue.class, new PriorityQueueSerializer()); | ||
} | ||
|
||
public PriorityQueueSerializer() { | ||
try { | ||
compField = PriorityQueue.class.getDeclaredField("comparator"); | ||
compField.setAccessible(true); | ||
} | ||
catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public Comparator<?> getComparator(PriorityQueue<?> q) { | ||
try { | ||
return (Comparator<?>) compField.get(q); | ||
} | ||
catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
public void write(Kryo k, Output o, PriorityQueue<?> q) { | ||
k.writeClassAndObject(o, getComparator(q)); | ||
o.writeInt(q.size(), true); | ||
for (Object a : q) { | ||
k.writeClassAndObject(o, a); | ||
o.flush(); | ||
} | ||
} | ||
|
||
public PriorityQueue<?> read(Kryo k, Input i, Class<PriorityQueue<?>> c) { | ||
Comparator<Object> comp = (Comparator<Object>) k.readClassAndObject(i); | ||
int sz = i.readInt(true); | ||
// can't create with size 0: | ||
PriorityQueue<Object> result; | ||
if (sz == 0) { | ||
result = new PriorityQueue<Object>(1, comp); | ||
} | ||
else { | ||
result = new PriorityQueue<Object>(sz, comp); | ||
} | ||
int idx = 0; | ||
while (idx < sz) { | ||
result.add(k.readClassAndObject(i)); | ||
idx += 1; | ||
} | ||
return result; | ||
} | ||
} |
194 changes: 194 additions & 0 deletions
194
flink-runtime/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http:https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.types | ||
|
||
import scala.collection.immutable.{BitSet, HashMap, HashSet, ListMap, ListSet, NumericRange, Queue, Range, SortedMap, SortedSet} | ||
import scala.collection.mutable.{Buffer, ListBuffer, WrappedArray, BitSet => MBitSet, HashMap => MHashMap, HashSet => MHashSet, Map => MMap, Queue => MQueue, Set => MSet} | ||
import scala.util.matching.Regex | ||
import _root_.java.io.Serializable | ||
|
||
import com.twitter.chill._ | ||
|
||
|
||
import scala.collection.JavaConverters._ | ||
|
||
/* | ||
This code is copied as is from Twitter Chill 0.7.4 because we need to user a newer chill version | ||
but want to ensure that the serializers that are registered by default stay the same. | ||
The only changes to the code are those that are required to make it compile and pass checkstyle | ||
checks in our code base. | ||
*/ | ||
|
||
/** | ||
* This class has a no-arg constructor, suitable for use with reflection instantiation | ||
* It has no registered serializers, just the standard Kryo configured for Kryo. | ||
*/ | ||
class EmptyFlinkScalaKryoInstantiator extends KryoInstantiator { | ||
override def newKryo = { | ||
val k = new KryoBase | ||
k.setRegistrationRequired(false) | ||
k.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy) | ||
|
||
// Handle cases where we may have an odd classloader setup like with libjars | ||
// for hadoop | ||
val classLoader = Thread.currentThread.getContextClassLoader | ||
k.setClassLoader(classLoader) | ||
|
||
k | ||
} | ||
} | ||
|
||
object FlinkScalaKryoInstantiator extends Serializable { | ||
private val mutex = new AnyRef with Serializable // some serializable object | ||
@transient private var kpool: KryoPool = null | ||
|
||
/** | ||
* Return a KryoPool that uses the FlinkScalaKryoInstantiator | ||
*/ | ||
def defaultPool: KryoPool = mutex.synchronized { | ||
if (null == kpool) { | ||
kpool = KryoPool.withByteArrayOutputStream(guessThreads, new FlinkScalaKryoInstantiator) | ||
} | ||
kpool | ||
} | ||
|
||
private def guessThreads: Int = { | ||
val cores = Runtime.getRuntime.availableProcessors | ||
val GUESS_THREADS_PER_CORE = 4 | ||
GUESS_THREADS_PER_CORE * cores | ||
} | ||
} | ||
|
||
/** Makes an empty instantiator then registers everything */ | ||
class FlinkScalaKryoInstantiator extends EmptyFlinkScalaKryoInstantiator { | ||
override def newKryo = { | ||
val k = super.newKryo | ||
val reg = new AllScalaRegistrar | ||
reg(k) | ||
k | ||
} | ||
} | ||
|
||
class ScalaCollectionsRegistrar extends IKryoRegistrar { | ||
def apply(newK: Kryo) { | ||
// for binary compat this is here, but could be moved to RichKryo | ||
def useField[T](cls: Class[T]) { | ||
val fs = new com.esotericsoftware.kryo.serializers.FieldSerializer(newK, cls) | ||
fs.setIgnoreSyntheticFields(false) // scala generates a lot of these attributes | ||
newK.register(cls, fs) | ||
} | ||
// The wrappers are private classes: | ||
useField(List(1, 2, 3).asJava.getClass) | ||
useField(List(1, 2, 3).iterator.asJava.getClass) | ||
useField(Map(1 -> 2, 4 -> 3).asJava.getClass) | ||
useField(new _root_.java.util.ArrayList().asScala.getClass) | ||
useField(new _root_.java.util.HashMap().asScala.getClass) | ||
|
||
/* | ||
* Note that subclass-based use: addDefaultSerializers, else: register | ||
* You should go from MOST specific, to least to specific when using | ||
* default serializers. The FIRST one found is the one used | ||
*/ | ||
newK | ||
// wrapper array is abstract | ||
.forSubclass[WrappedArray[Any]](new WrappedArraySerializer[Any]) | ||
.forSubclass[BitSet](new BitSetSerializer) | ||
.forSubclass[SortedSet[Any]](new SortedSetSerializer) | ||
.forClass[Some[Any]](new SomeSerializer[Any]) | ||
.forClass[Left[Any, Any]](new LeftSerializer[Any, Any]) | ||
.forClass[Right[Any, Any]](new RightSerializer[Any, Any]) | ||
.forTraversableSubclass(Queue.empty[Any]) | ||
// List is a sealed class, so there are only two subclasses: | ||
.forTraversableSubclass(List.empty[Any]) | ||
// Add ListBuffer subclass before Buffer to prevent the more general case taking precedence | ||
.forTraversableSubclass(ListBuffer.empty[Any], isImmutable = false) | ||
// add mutable Buffer before Vector, otherwise Vector is used | ||
.forTraversableSubclass(Buffer.empty[Any], isImmutable = false) | ||
// Vector is a final class | ||
.forTraversableClass(Vector.empty[Any]) | ||
.forTraversableSubclass(ListSet.empty[Any]) | ||
// specifically register small sets since Scala represents them differently | ||
.forConcreteTraversableClass(Set[Any]('a)) | ||
.forConcreteTraversableClass(Set[Any]('a, 'b)) | ||
.forConcreteTraversableClass(Set[Any]('a, 'b, 'c)) | ||
.forConcreteTraversableClass(Set[Any]('a, 'b, 'c, 'd)) | ||
// default set implementation | ||
.forConcreteTraversableClass(HashSet[Any]('a, 'b, 'c, 'd, 'e)) | ||
// specifically register small maps since Scala represents them differently | ||
.forConcreteTraversableClass(Map[Any, Any]('a -> 'a)) | ||
.forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b)) | ||
.forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c)) | ||
.forConcreteTraversableClass(Map[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c, 'd -> 'd)) | ||
// default map implementation | ||
.forConcreteTraversableClass( | ||
HashMap[Any, Any]('a -> 'a, 'b -> 'b, 'c -> 'c, 'd -> 'd, 'e -> 'e)) | ||
// The normal fields serializer works for ranges | ||
.registerClasses(Seq(classOf[Range.Inclusive], | ||
classOf[NumericRange.Inclusive[_]], | ||
classOf[NumericRange.Exclusive[_]])) | ||
// Add some maps | ||
.forSubclass[SortedMap[Any, Any]](new SortedMapSerializer) | ||
.forTraversableSubclass(ListMap.empty[Any, Any]) | ||
.forTraversableSubclass(HashMap.empty[Any, Any]) | ||
// The above ListMap/HashMap must appear before this: | ||
.forTraversableSubclass(Map.empty[Any, Any]) | ||
// here are the mutable ones: | ||
.forTraversableClass(MBitSet.empty, isImmutable = false) | ||
.forTraversableClass(MHashMap.empty[Any, Any], isImmutable = false) | ||
.forTraversableClass(MHashSet.empty[Any], isImmutable = false) | ||
.forTraversableSubclass(MQueue.empty[Any], isImmutable = false) | ||
.forTraversableSubclass(MMap.empty[Any, Any], isImmutable = false) | ||
.forTraversableSubclass(MSet.empty[Any], isImmutable = false) | ||
} | ||
} | ||
|
||
class JavaWrapperCollectionRegistrar extends IKryoRegistrar { | ||
def apply(newK: Kryo) { | ||
newK.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer) | ||
} | ||
} | ||
|
||
/** Registers all the scala (and java) serializers we have */ | ||
class AllScalaRegistrar extends IKryoRegistrar { | ||
def apply(k: Kryo) { | ||
val col = new ScalaCollectionsRegistrar | ||
col(k) | ||
|
||
val jcol = new JavaWrapperCollectionRegistrar | ||
jcol(k) | ||
|
||
// Register all 22 tuple serializers and specialized serializers | ||
ScalaTupleSerialization.register(k) | ||
k.forClass[Symbol](new KSerializer[Symbol] { | ||
override def isImmutable = true | ||
def write(k: Kryo, out: Output, obj: Symbol) { out.writeString(obj.name) } | ||
def read(k: Kryo, in: Input, cls: Class[Symbol]) = Symbol(in.readString) | ||
}) | ||
.forSubclass[Regex](new RegexSerializer) | ||
.forClass[ClassManifest[Any]](new ClassManifestSerializer[Any]) | ||
.forSubclass[Manifest[Any]](new ManifestSerializer[Any]) | ||
.forSubclass[scala.Enumeration#Value](new EnumerationSerializer) | ||
|
||
// use the singleton serializer for boxed Unit | ||
val boxedUnit = scala.Unit.box(()) | ||
k.register(boxedUnit.getClass, new SingletonSerializer(boxedUnit)) | ||
FlinkChillPackageRegistrar.all()(k) | ||
} | ||
} |
Oops, something went wrong.