
[FLINK-9286] [docs] (addendum) Update classloading docs for inverted classloading
StephanEwen committed May 24, 2018
1 parent 428a900 commit c07b693
Showing 1 changed file with 69 additions and 40 deletions: docs/monitoring/debugging_classloading.md

When running Flink applications, the JVM will load various classes over time.
These classes can be divided into two domains:

- The **Java Classpath**: This is Java's common classpath, and it includes the JDK libraries, and all code
  in Flink's `/lib` folder (the classes of Apache Flink and its core dependencies).

- The **Dynamic User Code**: These are all classes that are included in the JAR files of dynamically submitted jobs
  (via REST, CLI, web UI). They are loaded (and unloaded) dynamically per job.

What classes are part of which domain depends on the particular setup in which you run Apache Flink. As a general rule, whenever you start the Flink
processes first and then submit jobs, the job's classes are loaded dynamically. If the Flink processes are started together with the job/application,
or if the application spawns the Flink components (JobManager, TaskManager, etc.), then all classes are in the Java classpath.

Below are some more details about the different deployment modes:

**Standalone Session**

When starting a Flink cluster as a standalone session, the JobManagers and TaskManagers are started with the Flink framework classes in the
Java classpath. The classes from all jobs/applications that are submitted against the session (via REST / CLI) are loaded *dynamically*.
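
For illustration, a minimal sketch of such a session workflow with a local standalone cluster (the paths assume a standard Flink distribution, which ships the WordCount example jar):

```bash
# Start a standalone session cluster (JobManager + TaskManager) with Flink's
# framework classes on the Java classpath.
./bin/start-cluster.sh

# Submit a job via the CLI; the classes in the jar are loaded dynamically
# by the user code classloader.
./bin/flink run ./examples/streaming/WordCount.jar
```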

<!--
**Docker Containers with Flink-as-a-Library**
If you package a Flink job/application such that your application treats Flink like a library (Flink JobManager/TaskManager daemons are spawned as needed),
then typically all classes are in the *application classpath*. This is the recommended way for container-based setups where the container is specifically
created for a job/application and will contain the job/application's jar files.
-->

**Docker / Kubernetes Sessions**

Docker / Kubernetes setups that first start a set of JobManagers / TaskManagers and then submit jobs/applications via REST or the CLI
behave like standalone sessions: Flink's code is in the Java classpath, and the job's code is loaded dynamically.

**YARN**

YARN classloading differs between single job deployments and sessions:

- When submitting a Flink job/application directly to YARN (via `bin/flink run -m yarn-cluster ...`), dedicated TaskManagers and
  JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath.
  That means that there is *no dynamic classloading* involved in that case (see the sketch after this list).

- When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the
  Java classpath. The classes from all jobs that are submitted against the session are loaded *dynamically*.
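
For illustration, a single-job submission to YARN as described in the first bullet might look like this (the jar path is a placeholder):

```bash
# Start a dedicated YARN cluster for this one job; Flink framework classes and
# the job's classes end up together on the Java classpath of the spawned JVMs.
./bin/flink run -m yarn-cluster ./path/to/your-job.jar
```
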
**Mesos**

Mesos setups following [this documentation](../ops/deployment/mesos.html) currently behave very much like a
YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in the Java classpath; job
classes are loaded dynamically when the jobs are submitted.

## Inverted Class Loading and ClassLoader Resolution Order

In setups where dynamic classloading is involved (sessions), there is a hierarchy of typically two ClassLoaders:
(1) Java's *application classloader*, which has all classes in the classpath, and (2) the dynamic *user code classloader*
for loading classes from the user-code jar(s). The user-code ClassLoader has the application classloader as its parent.

By default, Flink inverts classloading order, meaning it looks into the user code classloader first, and only looks into
the parent (application classloader) if the class is not part of the dynamically loaded user code.

The benefit of inverted classloading is that jobs can use different library versions than Flink's core itself, which is very
useful when the different versions of the libraries are not compatible. The mechanism helps to avoid common dependency conflict
errors like `IllegalAccessError` or `NoSuchMethodError`: different parts of the code simply have separate copies of the
classes (Flink's core or one of its dependencies can use a different copy than the user code).
In most cases, this works well and no additional configuration from the user is needed.

However, there are cases when the inverted classloading causes problems (see below, "X cannot be cast to X").
You can revert to Java's default mode by setting the ClassLoader resolution order
[classloader.resolve-order](../ops/config.html#classloader-resolve-order) in the Flink config to `parent-first`
(from Flink's default `child-first`).

Please note that certain classes are always resolved in a *parent-first* way (through the parent ClassLoader first), because they
are shared between Flink's core and the user code or the user-code facing APIs. The packages for these classes are configured via
[classloader.parent-first-patterns-default](../ops/config.html#classloader-parent-first-patterns-default) and
[classloader.parent-first-patterns-additional](../ops/config.html#classloader-parent-first-patterns-additional).
To add new packages to be *parent-first* loaded, please set the `classloader.parent-first-patterns-additional` config option.
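
For illustration, adding packages to the parent-first patterns could look like this in `flink-conf.yaml` (the package prefixes are placeholders; the value is a semicolon-separated list of prefixes):

```yaml
# Always resolve classes from these packages through the parent (application)
# classloader first, in addition to Flink's built-in default patterns.
classloader.parent-first-patterns-additional: org.example.shared.;com.example.legacy.
```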


## Avoiding Dynamic Classloading


## X cannot be cast to X exceptions

In setups with dynamic classloading, you may see an exception in the style `com.foo.X cannot be cast to com.foo.X`.
This means that multiple versions of the class `com.foo.X` have been loaded by different class loaders, and an object of one version
is being assigned to a variable typed with the other version.

One common reason is that a library is not compatible with Flink's *inverted classloading* approach. You can turn off inverted classloading
to verify this (set [`classloader.resolve-order: parent-first`](../ops/config.html#classloader-resolve-order) in the Flink config) or exclude
the library from inverted classloading (set [`classloader.parent-first-patterns-additional`](../ops/config.html#classloader-parent-first-patterns-additional)
in the Flink config).
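
As a sketch, the two remedies could look like this in `flink-conf.yaml` (use one or the other; `org.example.somelib.` is a placeholder for the conflicting library's package prefix):

```yaml
# Option 1: turn off inverted classloading entirely, to verify the diagnosis.
classloader.resolve-order: parent-first

# Option 2: keep inverted classloading, but exclude the conflicting library from it.
classloader.parent-first-patterns-additional: org.example.somelib.
```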

Another cause can be cached object instances, as produced by some libraries like *Apache Avro*, or by interning objects (for example via Guava's Interners).
The solution here is to either have a setup without any dynamic classloading, or to make sure that the respective library is fully part of the dynamically loaded code.
The latter means that the library must not be added to Flink's `/lib` folder, but must be part of the application's fat-jar/uber-jar.


## Unloading of Dynamically Loaded Classes

All scenarios that involve dynamic classloading (sessions) rely on classes being *unloaded* again.
Class unloading means that the Garbage Collector finds that no objects from a class exist anymore, and thus removes the class
(the code, static variables, metadata, etc).


## Resolving Dependency Conflicts with Flink using the maven-shade-plugin.

A way to address dependency conflicts from the application developer's side is to avoid exposing dependencies by *shading them away*.

Apache Maven offers the [maven-shade-plugin](https://maven.apache.org/plugins/maven-shade-plugin/), which allows one to change the package of a
class *after* compiling it (so the code you are writing is not affected by the shading). For example, if you have the `com.amazonaws` packages from
the aws sdk in your user code jar, the shade plugin would relocate them into the `org.myorg.shaded.com.amazonaws` package, so that your code is calling your aws sdk version.

This documentation page explains [relocating classes using the shade plugin](https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html).
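
As a concrete sketch of the relocation described above (plugin version and the rest of the build configuration are omitted), the relevant part of a `pom.xml` might look like this:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Relocate the aws sdk classes bundled in the user jar so they
                 do not clash with another copy on the classpath. -->
            <pattern>com.amazonaws</pattern>
            <shadedPattern>org.myorg.shaded.com.amazonaws</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```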


Note that most of Flink's dependencies, such as `guava`, `netty`, `jackson`, etc., are shaded away by the maintainers of Flink, so users usually don't have to worry about it.

{% top %}
