Skip to content

Commit

Permalink
[hotfix] [web-dashboard] Various dashboard code cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Dec 30, 2015
1 parent 72f9a6c commit 6f2d9d7
Show file tree
Hide file tree
Showing 44 changed files with 479 additions and 530 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocation
unsetAsContext();
System.setOut(originalOut);
System.setErr(originalErr);
System.err.println(baes);
System.out.println(baos);
}

String stdout = baos.toString();
String stderr = baes.toString();

throw new ProgramInvocationException(
"The program plan could not be fetched - the program aborted pre-maturely.\n"
+ "System.err: " + baes.toString() + '\n'
+ "System.out: " + baos.toString() + '\n');
"The program plan could not be fetched - the program aborted pre-maturely."
+ "\n\nSystem.err: " + (stdout.length() == 0 ? "(none)" : stdout)
+ "\n\nSystem.out: " + (stderr.length() == 0 ? "(none)" : stderr));
}
// ------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public final class ConfigConstants {
public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";

/** Config parameter indicating whether jobs can be uploaded and run from the web-frontend. */
public static final String JOB_MANAGER_WEB_SUBMISSION_KEY = "jobmanager.web.submit.enable";
public static final String JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY = "jobmanager.web.submit.enable";

/** Flag to disable checkpoint stats. */
public static final String JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = "jobmanager.web.checkpoints.disable";
Expand Down Expand Up @@ -701,7 +701,7 @@ public final class ConfigConstants {
public static final int DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT = 5;

/** By default, submitting jobs from the web-frontend is allowed. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMISSION = true;
public static final boolean DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED = true;

/** Default flag to disable checkpoint stats. */
public static final boolean DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,18 @@
* https://github.com/netty/netty/blob/netty-4.0.31.Final/example/src/main/java/io/netty/example/http/upload/HttpUploadServerHandler.java
*****************************************************************************/

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
Expand All @@ -44,7 +49,11 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;

import org.apache.flink.util.ExceptionUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;

/**
Expand All @@ -53,78 +62,124 @@
* If a file is required to be uploaded, it handles the upload, and in the http request to the
* next handler, passes the name of the file to the next handler.
*/
@ChannelHandler.Sharable
public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {

private HttpRequest request;

private boolean readingChunks;

private static final HttpDataFactory factory = new DefaultHttpDataFactory(true); // use disk

private String requestPath;
private static final Charset ENCODING = Charset.forName("UTF-8");

/** A decoder factory that always stores POST chunks on disk */
private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);

private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));


private HttpRequest currentRequest;

private HttpPostRequestDecoder decoder;
private HttpPostRequestDecoder currentDecoder;

private String currentRequestPath;

private final File uploadDir;

/**
* The directory where files should be uploaded.
*/
public HttpRequestHandler(File uploadDir) {
this.uploadDir = uploadDir;
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
if (decoder != null) {
decoder.cleanFiles();
if (currentDecoder != null) {
currentDecoder.cleanFiles();
}
}

@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
request = (HttpRequest) msg;
requestPath = new QueryStringDecoder(request.getUri()).path();
if (request.getMethod() != HttpMethod.POST) {
ctx.fireChannelRead(request);
} else {
// try to decode the posted data now.
decoder = new HttpPostRequestDecoder(factory, request);
readingChunks = HttpHeaders.isTransferEncodingChunked(request);
if (readingChunks) {
readingChunks = true;
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
try {
if (msg instanceof HttpRequest) {
currentRequest = (HttpRequest) msg;
currentRequestPath = null;

if (currentDecoder != null) {
currentDecoder.destroy();
currentDecoder = null;
}

if (currentRequest.getMethod() == HttpMethod.GET || currentRequest.getMethod() == HttpMethod.DELETE) {
// directly delegate to the router
ctx.fireChannelRead(currentRequest);
}
else if (currentRequest.getMethod() == HttpMethod.POST) {
// POST comes in multiple objects. First the request, then the contents
// keep the request and path for the remaining objects of the POST request
currentRequestPath = new QueryStringDecoder(currentRequest.getUri()).path();
currentDecoder = new HttpPostRequestDecoder(DATA_FACTORY, currentRequest);
}
else {
throw new IOException("Unsupported HTTP method: " + currentRequest.getMethod().name());
}
}
} else if (decoder != null && msg instanceof HttpContent) {
// New chunk is received
HttpContent chunk = (HttpContent) msg;
decoder.offer(chunk);
try {
while (decoder.hasNext()) {
InterfaceHttpData data = decoder.next();
// IF SOMETHING EVER NEEDS POST PARAMETERS, THIS WILL BE THE PLACE TO HANDLE IT
// all fields values will be passed with type Attribute.
if (data.getHttpDataType() == HttpDataType.FileUpload) {
DiskFileUpload file = (DiskFileUpload) data;
if (file.isCompleted()) {
String newName = UUID.randomUUID() + "_" + file.getFilename();
file.renameTo(new File(uploadDir, newName));
QueryStringEncoder encoder = new QueryStringEncoder(requestPath);
encoder.addParam("file", newName);
request.setUri(encoder.toString());
else if (currentDecoder != null && msg instanceof HttpContent) {
// received new chunk, give it to the current decoder
HttpContent chunk = (HttpContent) msg;
currentDecoder.offer(chunk);

try {
while (currentDecoder.hasNext()) {
InterfaceHttpData data = currentDecoder.next();

// IF SOMETHING EVER NEEDS POST PARAMETERS, THIS WILL BE THE PLACE TO HANDLE IT
// all fields values will be passed with type Attribute.

if (data.getHttpDataType() == HttpDataType.FileUpload) {
DiskFileUpload file = (DiskFileUpload) data;
if (file.isCompleted()) {
String name = file.getFilename();

File target = new File(TMP_DIR, UUID.randomUUID() + "_" + name);
file.renameTo(target);

QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath);
encoder.addParam("filepath", target.getAbsolutePath());
encoder.addParam("filename", name);

currentRequest.setUri(encoder.toString());
}
}

data.release();
}
data.release();
}
} catch (EndOfDataDecoderException e) {
//
catch (EndOfDataDecoderException ignored) {}

if (chunk instanceof LastHttpContent) {
HttpRequest request = currentRequest;
currentRequest = null;
currentRequestPath = null;

currentDecoder.destroy();
currentDecoder = null;

// fire next channel handler
ctx.fireChannelRead(request);
}
}
if (chunk instanceof LastHttpContent) {
readingChunks = false;
decoder.destroy();
decoder = null;
ctx.fireChannelRead(request);
}
catch (Throwable t) {
currentRequest = null;
currentRequestPath = null;

if (currentDecoder != null) {
currentDecoder.destroy();
currentDecoder = null;
}

if (ctx.channel().isActive()) {
byte[] bytes = ExceptionUtils.stringifyException(t).getBytes(ENCODING);

DefaultFullHttpResponse response = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR,
Unpooled.wrappedBuffer(bytes));

response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());

ctx.writeAndFlush(response);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.webmonitor;

import com.fasterxml.jackson.core.JsonGenerator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -27,56 +26,47 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.StringWriter;
import org.apache.flink.util.ExceptionUtils;

import org.slf4j.Logger;

/**
* This is the last handler in the pipeline and logs all error messages.
* This is the last handler in the pipeline. It logs all error messages and sends exception
* responses.
*/
@ChannelHandler.Sharable
public class PipelineErrorHandler extends SimpleChannelInboundHandler<Object> {

private static final Logger LOG = LoggerFactory.getLogger(PipelineErrorHandler.class);
/** The logger to which the handler writes the log statements */
private final Logger logger;

public PipelineErrorHandler(Logger logger) {
this.logger = logger;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object message) {
// we can't deal with this message. No one in the pipeline handled it. Log it.
LOG.debug("Unknown message received: {}", message);
logger.debug("Unknown message received: {}", message);
sendError(ctx, "Unknown message received.");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.debug("Unhandled exception: {}", cause);
sendError(ctx, cause.getMessage());
logger.debug("Unhandled exception: {}", cause);
sendError(ctx, ExceptionUtils.stringifyException(cause));
}

private void sendError(ChannelHandlerContext ctx, String error) {
DefaultFullHttpResponse response;
StringWriter writer = new StringWriter();
try {
JsonGenerator gen = JsonFactory.jacksonFactory.createJsonGenerator(writer);
gen.writeStartObject();
gen.writeStringField("error", error);
gen.writeEndObject();
gen.close();
// send a bad request status code.
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(writer.toString().getBytes()));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
} catch (IOException e) {
// seriously? Let's just send some plain text.
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.BAD_REQUEST, Unpooled.wrappedBuffer(error.getBytes()));
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
}
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
if (ctx.channel().isActive()) {
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(error.getBytes()));

response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());

ctx.writeAndFlush(response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;

import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.util.ExceptionUtils;

import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
Expand All @@ -58,6 +60,9 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {

private static final Charset ENCODING = Charset.forName("UTF-8");

public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";


private final RequestHandler handler;

private final JobManagerRetriever retriever;
Expand All @@ -66,11 +71,8 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> {

private final FiniteDuration timeout;

private final String contentType;

private String localJobManagerAddress;

public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address";


public RuntimeMonitorHandler(
RequestHandler handler,
Expand All @@ -82,7 +84,6 @@ public RuntimeMonitorHandler(
this.retriever = checkNotNull(retriever);
this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture);
this.timeout = checkNotNull(timeout);
this.contentType = (handler instanceof RequestHandler.JsonResponse) ? "application/json" : "text/plain";
}

@Override
Expand Down Expand Up @@ -134,7 +135,7 @@ private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGate
HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));

response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
}
catch (NotFoundException e) {
// this should result in a 404 error code (not found)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,10 @@ public int getWebFrontendPort() {
public long getRefreshInterval() {
return config.getLong(JOB_MANAGER_WEB_REFRESH_INTERVAL_KEY, DEFAULT_JOB_MANAGER_WEB_REFRESH_INTERVAL);
}

public boolean isProgramSubmitEnabled() {
return config.getBoolean(
ConfigConstants.JOB_MANAGER_WEB_SUBMIT_ENABLED_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_SUBMIT_ENABLED);
}
}
Loading

0 comments on commit 6f2d9d7

Please sign in to comment.