From 1f47c0ac27ad42d3c4b066914a351e23a6cacbef Mon Sep 17 00:00:00 2001 From: zentol Date: Thu, 3 Jan 2019 17:29:56 +0100 Subject: [PATCH] [hotfix][rest] Centralize REST error logging --- .../runtime/rest/handler/AbstractHandler.java | 42 +++++++++++++------ .../rest/handler/AbstractRestHandler.java | 28 +------------ .../AbstractTaskManagerFileHandler.java | 25 ++++------- 3 files changed, 40 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java index a87d3ada5eb8b..cc9f355471085 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractHandler.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException; @@ -50,6 +51,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -157,33 +159,49 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe final FileUploads finalUploadedFiles = uploadedFiles; requestProcessingFuture .whenComplete((Void ignored, Throwable throwable) -> { - inFlightRequestTracker.deregisterRequest(); - cleanupFileUploads(finalUploadedFiles); + if (throwable != null) { + handleException(ExceptionUtils.stripCompletionException(throwable), ctx, httpRequest) + .whenComplete((Void ignored2, Throwable throwable2) -> finalizeRequestProcessing(finalUploadedFiles)); + } else { + finalizeRequestProcessing(finalUploadedFiles); + } }); - } catch (RestHandlerException rhe) { - inFlightRequestTracker.deregisterRequest(); + } catch (Throwable e) { + final FileUploads finalUploadedFiles = uploadedFiles; + handleException(e, ctx, httpRequest) + .whenComplete((Void ignored, Throwable throwable) -> finalizeRequestProcessing(finalUploadedFiles)); + } + } + + private void finalizeRequestProcessing(FileUploads uploadedFiles) { + inFlightRequestTracker.deregisterRequest(); + cleanupFileUploads(uploadedFiles); + } + + private CompletableFuture handleException(Throwable throwable, ChannelHandlerContext ctx, HttpRequest httpRequest) { + if (throwable instanceof RestHandlerException) { + RestHandlerException rhe = (RestHandlerException) throwable; if (log.isDebugEnabled()) { log.error("Exception occurred in REST handler.", rhe); } else { log.error("Exception occurred in REST handler: {}", rhe.getMessage()); } - HandlerUtils.sendErrorResponse( + return HandlerUtils.sendErrorResponse( ctx, httpRequest, new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus(), responseHeaders); - cleanupFileUploads(uploadedFiles); - } catch (Throwable e) { - inFlightRequestTracker.deregisterRequest(); - log.error("Request processing failed.", e); - HandlerUtils.sendErrorResponse( + } else { + log.error("Implementation error: Unhandled exception.", throwable); + String stackTrace = String.format("", + ExceptionUtils.stringifyException(throwable)); + return HandlerUtils.sendErrorResponse( ctx, httpRequest, - new ErrorResponseBody("Internal server error."), + new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)), HttpResponseStatus.INTERNAL_SERVER_ERROR, responseHeaders); - cleanupFileUploads(uploadedFiles); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index d88abaabc66aa..88a0c5e4cef0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -19,17 +19,14 @@ package org.apache.flink.runtime.rest.handler; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rest.handler.util.HandlerUtils; -import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; @@ -39,7 +36,6 @@ import javax.annotation.Nonnull; -import java.util.Arrays; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -79,29 +75,7 @@ protected CompletableFuture respondToRequest(ChannelHandlerContext ctx, Ht response = FutureUtils.completedExceptionally(e); } - return response.handle((resp, throwable) -> throwable != null ? - errorResponse(throwable) : Tuple2.of(resp, messageHeaders.getResponseStatusCode())) - .thenCompose(r -> HandlerUtils.sendResponse(ctx, httpRequest, r.f0, r.f1, responseHeaders)); - } - - private Tuple2 errorResponse(Throwable throwable) { - Throwable error = ExceptionUtils.stripCompletionException(throwable); - if (error instanceof RestHandlerException) { - final RestHandlerException rhe = (RestHandlerException) error; - if (log.isDebugEnabled()) { - log.error("Exception occurred in REST handler.", rhe); - } else { - log.error("Exception occurred in REST handler: {}", rhe.getMessage()); - } - return Tuple2.of(new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus()); - } else { - log.error("Implementation error: Unhandled exception.", error); - String stackTrace = String.format("", - ExceptionUtils.stringifyException(throwable)); - return Tuple2.of( - new ErrorResponseBody(Arrays.asList("Internal server error.", stackTrace)), - HttpResponseStatus.INTERNAL_SERVER_ERROR); - } + return response.thenAccept(resp -> HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode(), responseHeaders)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java index 82ca82cdfec9b..c2d7a284d01bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java @@ -24,12 +24,11 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.AbstractHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; -import org.apache.flink.runtime.rest.handler.util.HandlerUtils; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.ErrorResponseBody; import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; @@ -75,7 +74,6 @@ import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -159,23 +157,18 @@ protected CompletableFuture respondToRequest(ChannelHandlerContext ctx, Ht fileBlobKeys.invalidate(taskManagerId); final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - final ErrorResponseBody errorResponseBody; - final HttpResponseStatus httpResponseStatus; if (strippedThrowable instanceof UnknownTaskExecutorException) { - errorResponseBody = new ErrorResponseBody("Unknown TaskExecutor " + taskManagerId + '.'); - httpResponseStatus = HttpResponseStatus.NOT_FOUND; + throw new CompletionException( + new NotFoundException( + String.format("Failed to transfer file from TaskExecutor %s because it was unknown.", taskManagerId), + strippedThrowable)); } else { - errorResponseBody = new ErrorResponseBody("Internal server error: " + throwable.getMessage() + '.'); - httpResponseStatus = INTERNAL_SERVER_ERROR; + throw new CompletionException( + new FlinkException( + String.format("Failed to transfer file from TaskExecutor %s.", taskManagerId), + strippedThrowable)); } - - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - errorResponseBody, - httpResponseStatus, - responseHeaders); } }); }