[FLINK-2703] Prepare Flink for being used with Logback.
This closes apache#1194
rmetzger committed Sep 30, 2015
1 parent 622c1be commit 1243d7b
Showing 9 changed files with 171 additions and 16 deletions.
129 changes: 129 additions & 0 deletions docs/apis/best_practices.md
@@ -269,3 +269,132 @@ For Google Protobuf you need the following Maven dependency:


Please adjust the versions of both libraries as needed.


## Using Logback instead of Log4j

**Note: This tutorial is applicable starting from Flink 0.10**

Apache Flink uses [slf4j](http://www.slf4j.org/) as the logging abstraction in its code. Users are advised to use slf4j in their user functions as well.

Slf4j is a compile-time logging interface that can use different logging implementations at runtime, such as [log4j](http://logging.apache.org/log4j/2.x/) or [Logback](http://logback.qos.ch/).

Flink depends on Log4j by default. This page describes how to use Flink with Logback.

To get a logger instance in your code, use the following:


{% highlight java %}
import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction<String, String> {
	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);

	@Override
	public String map(String value) {
		// use parameterized messages; the argument is only rendered if the level is enabled
		LOG.info("Processing element {}", value);
		return value;
	}
}
{% endhighlight %}


### Use Logback when running Flink out of the IDE / from a Java application


Whenever classes are executed with a classpath created by a dependency manager such as Maven, Flink will pull log4j into the classpath.

Therefore, you will need to exclude log4j from Flink's dependencies. The following description assumes a Maven project created from a [Flink quickstart](../quickstart/java_api_quickstart.html).

Change your project's `pom.xml` file like this:

{% highlight xml %}
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>

<!-- Add the log4j -> slf4j (-> logback) bridge into the classpath
 Hadoop logs to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.10-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
{% endhighlight %}

The following changes were made in the `<dependencies>` section:

* Exclude all `log4j` dependencies from all Flink dependencies: this causes Maven to ignore Flink's transitive dependencies on log4j.
* Exclude the `slf4j-log4j12` artifact from Flink's dependencies: since we are going to use the slf4j-to-Logback binding, we have to remove the slf4j-to-log4j binding.
* Add the Logback dependencies: `logback-core` and `logback-classic`.
* Add the `log4j-over-slf4j` dependency. `log4j-over-slf4j` is a bridge that allows applications using the Log4j APIs directly to log through the Slf4j interface. Flink depends on Hadoop, which uses Log4j directly. Therefore, we need to redirect all logger calls from Log4j to Slf4j, which in turn logs to Logback.

Please note that you need to manually add the exclusions to every new Flink dependency you add to the pom file.

You may also need to check whether other (non-Flink) dependencies are pulling in log4j bindings. You can analyze the dependencies of your project with `mvn dependency:tree`, as shown below.
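
For example, to print only the dependency paths that lead to log4j artifacts (a sketch; `-Dincludes` filters the tree to matching `groupId:artifactId` patterns):

{% highlight bash %}
mvn dependency:tree -Dincludes=log4j:log4j,org.slf4j:slf4j-log4j12
{% endhighlight %}

Finally, Logback itself needs a configuration file: it picks up a `logback.xml` from the classpath, so you can place one in `src/main/resources`. A minimal sketch that logs to the console at `INFO` level (adjust the pattern and level as needed):

{% highlight xml %}
<configuration>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="console"/>
    </root>
</configuration>
{% endhighlight %}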



### Use Logback when running Flink on a cluster

This tutorial is applicable when running Flink on YARN or as a standalone cluster.

In order to use Logback instead of Log4j with Flink, you need to remove `log4j-1.2.xx.jar` and `slf4j-log4j12-xxx.jar` from the `lib/` directory.

Next, you need to put the following jar files into the `lib/` folder (a shell sketch of the full swap follows the list):

* `logback-classic.jar`
* `logback-core.jar`
* `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting logging calls from Hadoop (which uses Log4j) to Slf4j.
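
Put together, the swap might look like this (a sketch; the paths are placeholders, and the version numbers below simply match the Maven setup above — use whatever versions you need):

{% highlight bash %}
cd /path/to/flink/lib

# remove the log4j binding that ships with the distribution
rm log4j-1.2.*.jar slf4j-log4j12-*.jar

# add Logback and the log4j -> slf4j bridge
cp /path/to/logback-classic-1.1.3.jar .
cp /path/to/logback-core-1.1.3.jar .
cp /path/to/log4j-over-slf4j-1.7.7.jar .
{% endhighlight %}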

2 changes: 2 additions & 0 deletions flink-dist/pom.xml
@@ -334,6 +334,8 @@ under the License.
<exclude>org.apache.flink:flink-scala-examples</exclude>
<exclude>org.apache.flink:flink-streaming-examples</exclude>
<exclude>org.apache.flink:flink-python</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>log4j:log4j</exclude>
</excludes>
</artifactSet>
<transformers>
2 changes: 2 additions & 0 deletions flink-dist/src/main/assemblies/bin.xml
@@ -40,6 +40,8 @@ under the License.

<includes>
<include>org.apache.flink:flink-python</include>
<include>org.slf4j:slf4j-log4j12</include>
<include>log4j:log4j</include>
</includes>
</dependencySet>
</dependencySets>
9 changes: 8 additions & 1 deletion flink-shaded-hadoop/pom.xml
@@ -85,7 +85,14 @@ under the License.
<filter>
<artifact>org.slf4j:*</artifact>
<excludes>
-<exclude>org/slf4j/impl/StaticLoggerBinder*</exclude>
+<exclude>org/slf4j/impl/**</exclude>
</excludes>
</filter>
<!-- Exclude Hadoop's log4j. Hadoop can use Flink's log4j dependency -->
<filter>
<artifact>log4j:*</artifact>
<excludes>
<exclude>org/apache/log4j/**</exclude>
</excludes>
</filter>
</filters>
@@ -54,7 +54,7 @@ public static void setup() {
@Test
public void testClientStartup() {
LOG.info("Starting testClientStartup()");
-runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024", "-qu", "qa-team"},
@@ -72,6 +72,7 @@ public void testNonexistingQueue() {
LOG.info("Starting testNonexistingQueue()");
addTestAppender(FlinkYarnClient.class, Level.WARN);
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
@@ -101,7 +101,7 @@ public void checkForProhibitedLogContents() {
@Test
public void testClientStartup() {
LOG.info("Starting testClientStartup()");
-runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
@@ -119,6 +119,7 @@ public void testDetachedMode() {
LOG.info("Starting testDetachedMode()");
addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
@@ -166,7 +167,7 @@ public void testDetachedMode() {
@Test(timeout=100000) // timeout after 100 seconds
public void testTaskManagerFailure() {
LOG.info("Starting testTaskManagerFailure()");
-Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
@@ -338,6 +339,7 @@ public void testQueryCluster() {
public void testNonexistingQueue() {
LOG.info("Starting testNonexistingQueue()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
"-jm", "768",
"-tm", "1024",
@@ -362,7 +364,7 @@ public void testNonexistingQueue() {
public void testResourceComputation() {
addTestAppender(FlinkYarnClient.class, Level.WARN);
LOG.info("Starting testResourceComputation()");
-runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "5",
"-jm", "256",
"-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
@@ -390,7 +392,7 @@ public void testResourceComputation() {
public void testfullAlloc() {
addTestAppender(FlinkYarnClient.class, Level.WARN);
LOG.info("Starting testfullAlloc()");
-runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "2",
"-jm", "256",
"-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION, 0);
@@ -413,7 +415,7 @@ public void perJobYarnCluster() {
File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"} , "streaming")); // exclude streaming wordcount here.
Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
runWithArgs(new String[]{"run", "-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-ys", "2", //test that the job is executed with a DOP of 2
"-yjm", "768",
@@ -441,6 +443,7 @@ public void perJobYarnClusterWithParallelism() {
"-p", "2", //test that the job is executed with a DOP of 2
"-m", "yarn-cluster",
"-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
@@ -477,6 +480,7 @@ private void testDetachedPerJobYarnClusterInternal(String job) {
}

Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
"-yt", flinkLibFolder.getAbsolutePath(),
"-yn", "1",
"-yjm", "768",
"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
@@ -621,6 +625,7 @@ public void testJavaAPI() {
flinkYarnClient.setJobManagerMemory(768);
flinkYarnClient.setTaskManagerMemory(1024);
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
String confDirPath = System.getenv("FLINK_CONF_DIR");
flinkYarnClient.setConfigurationDirectory(confDirPath);
flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
@@ -632,9 +637,8 @@
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
} catch (Exception e) {
-System.err.println("Error while deploying YARN cluster: "+e.getMessage());
LOG.warn("Failing test", e);
-Assert.fail();
+Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
}
FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"
@@ -98,10 +98,18 @@ public abstract class YarnTestBase extends TestLogger {

protected static MiniYARNCluster yarnCluster = null;

/**
* Uberjar (fat jar) file of Flink
*/
protected static File flinkUberjar;

protected static final Configuration yarnConfiguration;

/**
* lib/ folder of the flink distribution.
*/
protected static File flinkLibFolder;

static {
yarnConfiguration = new YarnConfiguration();
yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -329,6 +337,10 @@ public static void startYARNWithConfig(Configuration conf) {
flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());

if (!flinkUberjar.exists()) {
Assert.fail("Unable to locate yarn-uberjar.jar");
2 changes: 1 addition & 1 deletion flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################

-log4j.rootLogger=FATAL, console
+log4j.rootLogger=INFO, console

# Log all infos in the given file
log4j.appender.console=org.apache.log4j.ConsoleAppender
@@ -26,7 +26,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -248,15 +247,14 @@ public int getTaskManagerCount() {
@Override
public void setShipFiles(List<File> shipFiles) {
File shipFile;
-for(Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) {
-	shipFile = it.next();
+for (File shipFile1 : shipFiles) {
+	shipFile = shipFile1;
	// remove uberjar from ship list (by default everything in the lib/ folder is added to
	// the list of files to ship, but we handle the uberjar separately.
-	if(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar")) {
-		it.remove();
+	if (!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))) {
+		this.shipFiles.add(shipFile);
	}
}
-this.shipFiles.addAll(shipFiles);
}

public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {