Skip to content

Commit

Permalink
[FLINK-2873] detect & serve the job manager log files correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Oct 20, 2015
1 parent 6c44d93 commit e711969
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,10 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";

public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
/**
* The log file location (may be in /log for standalone but under log directory when using YARN)
*/
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";


// ------------------------------ Web Client ------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import com.google.common.io.PatternFilenameFilter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand All @@ -32,9 +31,7 @@
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
Expand Down Expand Up @@ -65,10 +62,8 @@
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -91,12 +86,6 @@ public class WebRuntimeMonitor implements WebMonitor {
/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);

/** Job manager's log file pattern */
public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");

/** Job manager's stdout file pattern */
public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");

// ------------------------------------------------------------------------

/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
Expand Down Expand Up @@ -136,39 +125,11 @@ public WebRuntimeMonitor(
String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
LOG.info("Using directory {} for the web interface files", webRootDir);

// figure out where our logs are
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
final String defaultLogDirectory = flinkRoot + "/log";
final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory);

// find out which directory holds the path for log and stdout
final ArrayList<String> logPaths = new ArrayList<>();
final ArrayList<String> outPaths = new ArrayList<>();

// yarn allows for multiple log directories. Search in all.
for(String paths: logDirectories.split(",")) {
File dir = new File(paths);
if (dir.exists() && dir.isDirectory() && dir.canRead()) {
if (dir.listFiles(LOG_FILE_PATTERN).length == 1) {
logPaths.add(paths);
}
if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) {
outPaths.add(paths);
}
}
}

// we don't want any ambiguities. There must be only one log and out file.
if(logPaths.size() != 1 || outPaths.size() != 1) {
throw new IllegalConfigurationException("The path to the log and out files (" +
logDirectories + ") is not valid.");
}
final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config);

final File logDir = new File(logPaths.get(0));
final File outDir = new File(outPaths.get(0));
LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath());
LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath());
LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath());
LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath());

// port configuration
this.configuredPort = cfg.getWebFrontendPort();
Expand All @@ -190,7 +151,7 @@ public WebRuntimeMonitor(
// the overview - how many task managers, slots, free slots, ...
.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))

// job manager configuration, log and stdout
// job manager configuration
.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))

// overview over jobs
Expand Down Expand Up @@ -220,8 +181,10 @@ public WebRuntimeMonitor(
.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))

.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logDir))
.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, outDir))
// log and stdout
.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))

// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import io.netty.handler.codec.http.router.Routed;
import io.netty.util.CharsetUtil;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.slf4j.Logger;
Expand All @@ -60,7 +59,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.FilenameFilter;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.text.ParseException;
Expand Down Expand Up @@ -166,11 +164,9 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except
requestPath = requestPath + "index.html";
}

// in case the files being accessed are logs or stdout files, find appropriate paths.
if (requestPath.equals("/jobmanager/log")) {
requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
} else if (requestPath.equals("/jobmanager/stdout")) {
requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
// in case the files being accessed are logs or stdout files, find appropriate paths.
if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) {
requestPath = "";
}

Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
Expand Down Expand Up @@ -371,9 +367,4 @@ private static void setContentTypeHeader(HttpResponse response, File file) {
String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
response.headers().set(CONTENT_TYPE, mimeFinal);
}

private static String getFileName(File directory, FilenameFilter pattern) {
File[] files = directory.listFiles(pattern);
return files.length == 0 ? null : files[0].getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
Expand Down Expand Up @@ -84,12 +85,12 @@ public void testStandaloneWebRuntimeMonitor() throws Exception {
ActorRef jmActor = flink.jobManagerActors().get().head();

File logDir = temporaryFolder.newFolder("log");
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());

Configuration monitorConfig = new Configuration();
monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());

// Needs to match the leader address from the leader retrieval service
String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
Expand Down Expand Up @@ -149,11 +150,11 @@ public void testRedirectToLeader() throws Exception {
temporaryFolder.getRoot().getPath());

File logDir = temporaryFolder.newFolder();
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());

config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());

for (int i = 0; i < jobManagerSystem.length; i++) {
jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
Expand Down Expand Up @@ -289,12 +290,12 @@ public void testLeaderNotAvailable() throws Exception {
try (TestingServer zooKeeper = new TestingServer()) {

File logDir = temporaryFolder.newFolder();
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());

final Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
Expand All @@ -33,6 +35,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
Expand All @@ -47,6 +50,66 @@ public final class WebMonitorUtils {

private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class);

/**
* Singleton to hold the log and stdout file
*/
public static class LogFiles {

private static LogFiles INSTANCE;

public final File logFile;
public final File stdOutFile;

private LogFiles(String logFile) {
this.logFile = checkFileLocation(logFile);
String stdOutFile = logFile.replaceFirst("\\.log$", ".out");
this.stdOutFile = checkFileLocation(stdOutFile);;
}

/**
* Verify log file location
* @param logFilePath Path to log file
* @return File or null if not a valid log file
*/
private static File checkFileLocation (String logFilePath) {
File logFile = new File(logFilePath);
if (logFile.exists() && logFile.canRead()) {
return logFile;
} else {
throw new IllegalConfigurationException("Job manager log file was supposed to be at " +
logFile.getAbsolutePath() + " but it does not exist or is not readable.");
}
}

/**
* Finds the Flink log directory using log.file Java property that is set during startup.
*/
public static LogFiles find(Configuration config) {
if (INSTANCE == null) {

/** Figure out log file location based on 'log.file' VM argument **/
final String logEnv = "log.file";
String logFilePath = System.getProperty(logEnv);

if (logFilePath == null) {
LOG.warn("Log file environment variable '{}' is not set.", logEnv);
logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
}

if (logFilePath == null) {
throw new IllegalConfigurationException("JobManager log file not found. " +
"Can't serve log files. Log file location couldn't be determined via the " +
logEnv + " environment variable or the config constant " +
ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
}

INSTANCE = new LogFiles(logFilePath);
}

return INSTANCE;
}
}

/**
* Starts the web runtime monitor. Because the actual implementation of the runtime monitor is
* in another project, we load the runtime monitor dynamically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -131,7 +132,7 @@ public static ForkableFlinkMiniCluster startCluster(
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
Files.createFile(new File(logDir, "jobmanager.log").toPath());
Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());

config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
Expand All @@ -141,7 +142,7 @@ public static ForkableFlinkMiniCluster startCluster(
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);

config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());

ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem, mode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,13 @@ public void getTaskmanagers() {
@Test
public void getLogAndStdoutFiles() {
try {
String logPath = cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
Assert.assertNotNull(logPath);
WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration());

FileUtils.writeStringToFile(new File(logPath, "jobmanager.log"), "job manager log");
FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("https://localhost:" + port + "/jobmanager/log");
Assert.assertTrue(logs.contains("job manager log"));

FileUtils.writeStringToFile(new File(logPath, "jobmanager.out"), "job manager out");
FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("https://localhost:" + port + "/jobmanager/stdout");
Assert.assertTrue(logs.contains("job manager out"));
}catch(Throwable e) {
Expand All @@ -138,8 +137,7 @@ public void getConfiguration() {
JSONArray array = new JSONArray(config);

Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(array);
Assert.assertEquals(logDir.toString(),
conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY));
Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
Assert.assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ abstract class ApplicationMasterBase {
val currDir = env.get(Environment.PWD.key())
require(currDir != null, "Current directory unknown.")

val logDirs = env.get(Environment.LOG_DIRS.key())

val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
log.info("Starting ApplicationMaster/JobManager in streaming mode")
StreamingMode.STREAMING
Expand All @@ -119,8 +117,7 @@ abstract class ApplicationMasterBase {

// if a web monitor shall be started, set the port to random binding
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
}

val (actorSystem, jmActor, archiveActor, webMonitor) =
Expand All @@ -147,7 +144,7 @@ abstract class ApplicationMasterBase {

// generate configuration file for TaskManagers
generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, akkaHostname,
jobManagerPort, webServerPort, logDirs, slots, taskManagerCount,
jobManagerPort, webServerPort, slots, taskManagerCount,
dynamicPropertiesEncodedString)

val hadoopConfig = new YarnConfiguration();
Expand Down Expand Up @@ -184,7 +181,6 @@ abstract class ApplicationMasterBase {
ownHostname: String,
jobManagerPort: Int,
jobManagerWebPort: Int,
logDirs: String,
slots: Int,
taskManagerCount: Int,
dynamicPropertiesEncodedString: String)
Expand All @@ -202,7 +198,6 @@ abstract class ApplicationMasterBase {
output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")

output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")


Expand Down
Loading

0 comments on commit e711969

Please sign in to comment.