From 6b36fd20f9aefaa75475f840b437229e351f401a Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Fri, 9 Jan 2015 17:20:06 +0100
Subject: [PATCH] [FLINK-1197] [docs] Add information about types and type extraction

---
 docs/_includes/sidenav.html          |   3 +-
 docs/internal_job_scheduling.md      |   2 +-
 docs/internal_types_serialization.md | 228 +++++++++++++++++++++++++++
 3 files changed, 231 insertions(+), 2 deletions(-)
 create mode 100644 docs/internal_types_serialization.md

diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 8a6585ff68a78..a308c182965e5 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -50,9 +50,10 @@
 [HTML list items of this hunk were lost in rendering: it adjusts the sidenav entries around the "Internals" and "Advanced" sections, adding a link to the new internal_types_serialization.html page]
diff --git a/docs/internal_job_scheduling.md b/docs/internal_job_scheduling.md
index 6cfde4daf0ff2..0d08f76bd8d5e 100644
--- a/docs/internal_job_scheduling.md
+++ b/docs/internal_job_scheduling.md
@@ -35,7 +35,7 @@ each of which can run one pipeline of parallel tasks. A pipeline consists of mul

 Note that Flink often executes successive tasks concurrently: For Streaming programs, that happens in any case, but also for batch programs, it happens frequently.

-The figure below illustrates that. Consider a program with a data source, a *MapFunction*, and a *ReduceFunctoin*.
+The figure below illustrates that. Consider a program with a data source, a *MapFunction*, and a *ReduceFunction*.
 The source and MapFunction are executed with a parallelism of 4, while the ReduceFunction is executed with a
 parallelism of 3. A pipeline consists of the sequence Source - Map - Reduce. On a cluster with 2 TaskManagers with
 3 slots each, the program will be executed as described below.

diff --git a/docs/internal_types_serialization.md b/docs/internal_types_serialization.md
new file mode 100644
index 0000000000000..187364bd8fd67
--- /dev/null
+++ b/docs/internal_types_serialization.md
@@ -0,0 +1,228 @@
---
title: "Type Extraction and Serialization"
---

Flink handles types in a unique way, containing its own type descriptors,
generic type extraction, and type serialization framework.
This document describes the concepts and the rationale behind them.

There are fundamental differences in the way that the Scala API and
the Java API handle type information, so most of the issues described
here relate to only one of the two APIs.

* This will be replaced by the TOC
{:toc}


## Type handling in Flink

Flink tries to know as much information as possible about the types that enter and leave user functions.
This stands in contrast to the approach of just assuming nothing and letting the
programming language and serialization framework handle all types dynamically:

* To allow using POJOs and grouping/joining them by referring to field names, Flink needs the type
  information to make checks (for typos and type compatibility) before the job is executed.

* The more we know, the better the serialization and data layout schemes the compiler/optimizer can develop.
  That is quite important for the memory usage paradigm in Flink (work on serialized data
  inside/outside the heap and make serialization very cheap).

* For the upcoming logical programs (see roadmap draft), we need this to know the "schema" of functions.

* Finally, it also spares users from having to worry about serialization frameworks and from having to
  register types with those frameworks.


## Flink's TypeInformation class

The class {% gh_link /flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java "TypeInformation" %}
is the base class for all type descriptors. It reveals some basic properties of the type and can generate serializers
and, in specializations, comparators for the types.
(*Note that comparators in Flink do much more than defining an order - they are basically the utility to handle keys.*)

Internally, Flink makes the following distinctions between types:

* Basic types: All Java primitives and their boxed form, plus `void`, `String`, and `Date`.

* Primitive arrays and Object arrays

* Composite types

  * Flink Java Tuples (part of the Flink Java API)

  * Scala *case classes* (including Scala tuples)

  * POJOs: classes that follow a certain bean-like pattern

* Scala auxiliary types (Option, Either, Lists, Maps, ...)

* Generic types: These will not be serialized by Flink itself, but by Kryo.

POJOs are of particular interest, because they support the creation of complex types and the use of field
names in the definition of keys: `dataSet.join(another).where("name").equalTo("personName")`.
They are also transparent to the runtime and can be handled very efficiently by Flink.


**Rules for POJO types**

Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the following
conditions are fulfilled:

* The class is public and standalone (no non-static inner class)
* The class has a public no-argument constructor
* All fields in the class (and in all superclasses) are either public or
  have a public getter and a setter method that follow the Java beans
  naming conventions for getters and setters.
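For illustration, here is a minimal sketch of a class that fulfills all three conditions and is therefore
recognized as a POJO type (the class and field names are made up for this example):

{% highlight java %}
// Public, standalone class with a public no-argument constructor.
public class Person {

    // Public field: directly addressable as "name" in key expressions.
    public String name;

    // Private field exposed through Java-beans-style getter and setter.
    private int age;

    public Person() {}

    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
}
{% endhighlight %}

With that, key expressions such as `persons.groupBy("age")` or
`persons.join(addresses).where("name").equalTo("personName")` can refer to the fields by name.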

## Type Information in the Scala API

Scala has very elaborate concepts for runtime type information through *type manifests* and *class tags*. In
general, types and methods have access to the types of their generic parameters - thus, Scala programs do
not suffer from type erasure as Java programs do.

In addition, Scala allows running custom code in the Scala compiler through Scala macros - that means that some Flink
code gets executed whenever you compile a Scala program written against Flink's Scala API.

We use the macros to look at the parameter types and return types of all user functions during compilation - the
point in time when all type information is certainly available. Within the macro, we create
a *TypeInformation* for the function's return types (or parameter types) and make it part of the operation.


#### No Implicit Value for Evidence Parameter Error

In the case where TypeInformation could not be created, programs fail to compile with an error
stating *"could not find implicit value for evidence parameter of type TypeInformation"*.

A frequent reason is that the code that generates the TypeInformation has not been imported.
Make sure to import the entire `org.apache.flink.api.scala` package.
{% highlight scala %}
import org.apache.flink.api.scala._
{% endhighlight %}

Another common cause are generic methods, which can be fixed as described in the following section.


#### Generic Methods

Consider the following case:

{% highlight scala %}
def selectFirst[T](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}

val data : DataSet[(String, Long)] = ...

val result = selectFirst(data)
{% endhighlight %}

For such generic methods, the data types of the function parameters and return type may not be the same
for every call and are not known at the site where the method is defined. The code above will result
in an error that not enough implicit evidence is available.

In such cases, the type information has to be generated at the invocation site and passed to the
method. Scala offers *implicit parameters* for that.

The following code tells Scala to bring a type information for *T* into the function. The type
information will then be generated at the sites where the method is invoked, rather than where the
method is defined.

{% highlight scala %}
def selectFirst[T : TypeInformation](input: DataSet[(T, _)]) : DataSet[T] = {
  input.map { v => v._1 }
}
{% endhighlight %}


## Type Information in the Java API

Java in general erases generic type information. Only subclasses of generic classes store the types
to which their generic type variables are bound.

Flink uses reflection on the (anonymous) classes that implement the user functions to figure out the types of
the generic parameters of the function. This logic also contains some simple type inference for cases where
the return types of functions are dependent on input types, such as in the generic utility method below:

{% highlight java %}
public class AppendOne<T> implements MapFunction<T, Tuple2<T, Long>> {

    public Tuple2<T, Long> map(T value) {
        return new Tuple2<T, Long>(value, 1L);
    }
}
{% endhighlight %}

Flink cannot reliably figure out the data types of functions in all cases in Java.
Some issues remain with generic lambdas (we are trying to solve this together with the Java community,
see below) and with generic type variables that cannot be inferred.


#### Type Hints in the Java API

To help in cases where Flink cannot reconstruct the erased generic type information, the Java API
offers so-called *type hints* from version 0.9 on. The type hints tell the system the type of
the data set produced by a function. The following gives an example:

{% highlight java %}
DataSet<SomeType> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, SomeType>())
        .returns(SomeType.class);
{% endhighlight %}

The `returns` statement specifies the produced type, in this case via a class. The hints support
type definition through

* Classes, for non-parameterized types (no generics)
* Strings in the form of `returns("Tuple2<Integer, my.SomeType>")`, which are parsed and converted
  to a TypeInformation.
* A TypeInformation directly
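As a sketch of the third variant (reusing the made-up `MyGenericNonInferrableFunction` from above), a
TypeInformation for a tuple type can be assembled from Flink's `TupleTypeInfo` and `BasicTypeInfo`
descriptors and passed directly:

{% highlight java %}
// Hinting the produced type with a TypeInformation object instead of a class
// or a string. BasicTypeInfo holds the descriptors for the basic types.
DataSet<Tuple2<Long, String>> result = dataSet
    .map(new MyGenericNonInferrableFunction<Long, Tuple2<Long, String>>())
    .returns(new TupleTypeInfo<Tuple2<Long, String>>(
        BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
{% endhighlight %}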

#### Type extraction for Java 8 lambdas

Type extraction for Java 8 lambdas works differently than for non-lambdas, because lambdas are not associated
with an implementing class that extends the function interface.

Currently, Flink tries to figure out which method implements the lambda and uses Java's generic signatures to
determine the parameter types and the return type. However, these signatures are not generated for lambdas
by all compilers (as of writing this document, only reliably by the Eclipse JDT compiler 4.5 from Milestone 2
onwards).


**Improving Type information for Java Lambdas**

One of the Flink committers (Timo Walther) has become active in the Eclipse JDT compiler community and
in the OpenJDK community and submitted patches to the compiler to improve the availability of type
information for Java 8 lambdas.

The Eclipse JDT compiler has added support for this as of version 4.5 M4. Discussion about the feature in the
OpenJDK compiler is pending.
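Tying this back to the type hints described above: until compilers reliably generate these signatures, a
type hint on the lambda's result is the practical workaround. A minimal sketch (the `words` data set and
the produced type are made up for illustration):

{% highlight java %}
// The lambda carries no implementing class from which Flink could reflectively
// recover Tuple2<String, Integer>, so the produced type is declared explicitly.
DataSet<Tuple2<String, Integer>> counts = words
    .map(w -> new Tuple2<>(w, 1))
    .returns("Tuple2<String, Integer>");
{% endhighlight %}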