Skip to content

Commit

Permalink
[FLINK-2731][web-dashboard] add access to JobManager stdout and logs
Browse files Browse the repository at this point in the history
This closes apache#1233.
  • Loading branch information
sachingoel0101 authored and mxm committed Oct 19, 2015
1 parent da071bc commit 03579bb
Show file tree
Hide file tree
Showing 20 changed files with 347 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;

import com.google.common.io.PatternFilenameFilter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand All @@ -30,10 +30,11 @@
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;

import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
Expand Down Expand Up @@ -63,8 +64,10 @@
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -87,6 +90,12 @@ public class WebRuntimeMonitor implements WebMonitor {
/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);

/** Job manager's log file pattern */
public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");

/** Job manager's stdout file pattern */
public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");

// ------------------------------------------------------------------------

/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
Expand Down Expand Up @@ -123,6 +132,39 @@ public WebRuntimeMonitor(
String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
LOG.info("Using directory {} for the web interface files", webRootDir);

// figure out where our logs are
final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
final String defaultLogDirectory = flinkRoot + "/log";
final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory);

// find out which directory holds the path for log and stdout
final ArrayList<String> logPaths = new ArrayList<>();
final ArrayList<String> outPaths = new ArrayList<>();

// yarn allows for multiple log directories. Search in all.
for(String paths: logDirectories.split(",")) {
File dir = new File(paths);
if (dir.exists() && dir.isDirectory() && dir.canRead()) {
if (dir.listFiles(LOG_FILE_PATTERN).length == 1) {
logPaths.add(paths);
}
if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) {
outPaths.add(paths);
}
}
}

// we don't want any ambiguities. There must be only one log and out file.
if(logPaths.size() != 1 || outPaths.size() != 1) {
throw new IllegalConfigurationException("The path to the log and out files (" +
logDirectories + ") is not valid.");
}

final File logDir = new File(logPaths.get(0));
final File outDir = new File(outPaths.get(0));
LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath());
LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath());

// port configuration
this.configuredPort = cfg.getWebFrontendPort();
Expand All @@ -144,8 +186,10 @@ public WebRuntimeMonitor(
// the overview - how many task managers, slots, free slots, ...
.GET("/overview", handler(new ClusterOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT)))

// job manager configuration
// job manager configuration, log and stdout
.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
.GET("/jobmanager/log", new StaticFileServerHandler(logDir))
.GET("/jobmanager/stdout", new StaticFileServerHandler(outDir))

// overview over jobs
.GET("/joboverview", handler(new CurrentJobsOverviewHandler(retriever, DEFAULT_REQUEST_TIMEOUT, true, true)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ private MimeTypes() {}
MIME_MAP.put("htm", "text/html");
MIME_MAP.put("css", "text/css");
MIME_MAP.put("txt", "text/plain");
MIME_MAP.put("log", "text/plain");
MIME_MAP.put("out", "text/plain");
MIME_MAP.put("err", "text/plain");
MIME_MAP.put("xml", "text/xml");
MIME_MAP.put("csv", "text/csv");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@
import io.netty.handler.codec.http.router.Routed;
import io.netty.util.CharsetUtil;

import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.io.FilenameFilter;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
Expand Down Expand Up @@ -130,9 +132,11 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except
requestPath = requestPath + "index.html";
}

// convert file separators.
if (File.separatorChar != '/') {
requestPath = requestPath.replace('/', File.separatorChar);
// in case the files being accessed are logs or stdout files, find appropriate paths.
if (requestPath.equals("/jobmanager/log")) {
requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
} else if (requestPath.equals("/jobmanager/stdout")) {
requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
}

// convert to absolute path
Expand Down Expand Up @@ -178,7 +182,7 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except
return;
}
}

if (logger.isDebugEnabled()) {
logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
}
Expand All @@ -195,7 +199,11 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except

HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
setContentTypeHeader(response, file);
setDateAndCacheHeaders(response, file);

// since the log and out files are rapidly changing, we don't want to browser to cache them
if (!(requestPath.contains("log") || requestPath.contains("out"))) {
setDateAndCacheHeaders(response, file);
}
if (HttpHeaders.isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
Expand Down Expand Up @@ -300,4 +308,9 @@ private static void setContentTypeHeader(HttpResponse response, File file) {
String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
response.headers().set(CONTENT_TYPE, mimeFinal);
}

private static String getFileName(File directory, FilenameFilter pattern) {
File[] files = directory.listFiles(pattern);
return files.length == 0 ? null : files[0].getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional
li(ui-sref-active='active')
a(ui-sref=".config") Configuration
li(ui-sref-active='active')
a(ui-sref=".logfile") Logs
a(ui-sref=".log") Logs
li(ui-sref-active='active')
a(ui-sref=".stdout") Stdout

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,24 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
table.table.table-properties
thead
tr
th(colspan="2")
.row
.col-xs-10
| Job Manager Logs
.col-xs-1.text-right
a(ng-click="reloadData()" class="show-pointer")
i.fa.fa-refresh
.col-xs-1.text-left
a(href="jobmanager/log")
i.fa.fa-download

tbody
tr
td(colspan="2")
pre
| {{jobmanager.log}}

Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,25 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
table.table.table-properties
thead
tr
th(colspan="2")
.row
.col-xs-10
| Job Manager Output
.col-xs-1.text-right
a(ng-click="reloadData()" class="show-pointer")
i.fa.fa-refresh
.col-xs-1.text-left
a(href="jobmanager/stdout")
i.fa.fa-download

tbody
tr
td(colspan="2")
pre
| {{jobmanager.stdout}}

8 changes: 5 additions & 3 deletions flink-runtime-web/web-dashboard/app/scripts/index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,14 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
views:
details:
templateUrl: "partials/jobmanager/stdout.html"
controller: 'JobManagerStdoutController'

.state "jobmanager.logfile",
url: "/logfile"
.state "jobmanager.log",
url: "/log"
views:
details:
templateUrl: "partials/jobmanager/logfile.html"
templateUrl: "partials/jobmanager/log.html"
controller: 'JobManagerLogsController'

$urlRouterProvider.otherwise "/overview"

Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,23 @@ angular.module('flinkApp')
if !$scope.jobmanager?
$scope.jobmanager = {}
$scope.jobmanager['config'] = data

.controller 'JobManagerLogsController', ($scope, JobManagerLogsService) ->
JobManagerLogsService.loadLogs().then (data) ->
if !$scope.jobmanager?
$scope.jobmanager = {}
$scope.jobmanager['log'] = data

$scope.reloadData = () ->
JobManagerLogsService.loadLogs().then (data) ->
$scope.jobmanager['log'] = data

.controller 'JobManagerStdoutController', ($scope, JobManagerStdoutService) ->
JobManagerStdoutService.loadStdout().then (data) ->
if !$scope.jobmanager?
$scope.jobmanager = {}
$scope.jobmanager['stdout'] = data

$scope.reloadData = () ->
JobManagerStdoutService.loadStdout().then (data) ->
$scope.jobmanager['stdout'] = data
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,33 @@ angular.module('flinkApp')
deferred.promise

@

.service 'JobManagerLogsService', ($http, flinkConfig, $q) ->
logs = {}

@loadLogs = ->
deferred = $q.defer()

$http.get("jobmanager/log")
.success (data, status, headers, config) ->
logs = data
deferred.resolve(data)

deferred.promise

@

.service 'JobManagerStdoutService', ($http, flinkConfig, $q) ->
stdout = {}

@loadStdout = ->
deferred = $q.defer()

$http.get("jobmanager/stdout")
.success (data, status, headers, config) ->
stdout = data
deferred.resolve(data)

deferred.promise

@
5 changes: 5 additions & 0 deletions flink-runtime-web/web-dashboard/app/styles/index.styl
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,8 @@ livechart
#non-heap-mem
background-color: #90ed7d

a.show-pointer
cursor: pointer



3 changes: 3 additions & 0 deletions flink-runtime-web/web-dashboard/web/css/index.css
Original file line number Diff line number Diff line change
Expand Up @@ -591,3 +591,6 @@ svg.graph .node-label {
#non-heap-mem {
background-color: #90ed7d;
}
a.show-pointer {
cursor: pointer;
}
Loading

0 comments on commit 03579bb

Please sign in to comment.