Skip to content

Commit

Permalink
[FLINK-16303][rest] Enable retrieval of custom JobManager log files
Browse files Browse the repository at this point in the history
This closes apache#11542.
  • Loading branch information
jerry-024 authored and GJL committed Apr 13, 2020
1 parent 531d6bf commit 53755ec
Show file tree
Hide file tree
Showing 35 changed files with 910 additions and 226 deletions.
64 changes: 60 additions & 4 deletions docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,62 @@
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobmanager/logs</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the list of log files on the JobManager.</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#6742555">Request</button>
<div id="6742555" class="collapse">
<pre>
<code>
{} </code>
</pre>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#1230246947">Response</button>
<div id="1230246947" class="collapse">
<pre>
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
},
"size" : {
"type" : "integer"
}
}
}
}
}
} </code>
</pre>
</div>
</td>
</tr>
</tbody>
</table>
<table class="table table-bordered">
<tbody>
<tr>
Expand Down Expand Up @@ -4126,19 +4182,19 @@
</tr>
<tr>
<td colspan="2">
<button data-toggle="collapse" data-target="#30156062">Response</button>
<div id="30156062" class="collapse">
<button data-toggle="collapse" data-target="#639163524">Response</button>
<div id="639163524" class="collapse">
<pre>
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogListInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,19 @@ public void getLogAndStdoutFiles() throws Exception {
assertThat(logs, containsString("job manager out"));
}

@Test
public void getCustomLogFiles() throws Exception {
WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);

String customFileName = "test.log";
final String logDir = logFiles.logFile.getParent();
final String expectedLogContent = "job manager custom log";
FileUtils.writeStringToFile(new File(logDir, customFileName), expectedLogContent);

String logs = TestBaseUtils.getFromHTTP("http:https://localhost:" + getRestPort() + "/jobmanager/logs/" + customFileName);
assertThat(logs, containsString(expectedLogContent));
}

@Test
public void getTaskManagerLogAndStdoutFiles() throws Exception {
String json = TestBaseUtils.getFromHTTP("http:https://localhost:" + getRestPort() + "/taskmanagers/");
Expand Down
39 changes: 37 additions & 2 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,41 @@
}
}
}
}, {
"url" : "/jobmanager/logs",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
},
"size" : {
"type" : "integer"
}
}
}
}
}
}
}, {
"url" : "/jobmanager/metrics",
"method" : "GET",
Expand Down Expand Up @@ -2702,13 +2737,13 @@
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogListInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogListInfo",
"properties" : {
"logs" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:LogInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo",
"properties" : {
"name" : {
"type" : "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rest.messages.taskmanager.LogInfo;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import javax.annotation.Nullable;

import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* Base class for serving files from the JobManager.
*/
public abstract class AbstractJobManagerFileHandler<M extends MessageParameters> extends AbstractHandler<RestfulGateway, EmptyRequestBody, M> {

protected AbstractJobManagerFileHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, M> messageHeaders) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
}

@Override
protected CompletableFuture<Void> respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest<EmptyRequestBody, M> handlerRequest, RestfulGateway gateway) {
File file = getFile(handlerRequest);
if (file != null && file.exists()) {
try {
HandlerUtils.transferFile(
ctx,
file,
httpRequest);
} catch (FlinkException e) {
throw new CompletionException(new FlinkException("Could not transfer file to client.", e));
}
return CompletableFuture.completedFuture(null);
} else {
return HandlerUtils.sendErrorResponse(
ctx,
httpRequest,
new ErrorResponseBody("This file does not exist in JobManager log dir."),
HttpResponseStatus.NOT_FOUND,
Collections.emptyMap());
}
}

@Nullable
protected abstract File getFile(HandlerRequest<EmptyRequestBody, M> handlerRequest);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.LogFileNamePathParameter;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.cluster.FileMessageParameters;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nullable;

import java.io.File;
import java.util.Map;

/**
* Rest handler which serves the custom log file from JobManager.
*/
public class JobManagerCustomLogHandler extends AbstractJobManagerFileHandler<FileMessageParameters> {

private File logDir;

public JobManagerCustomLogHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, FileMessageParameters> messageHeaders,
@Nullable File logDir) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);

this.logDir = logDir;
}

@Override
protected File getFile(HandlerRequest<EmptyRequestBody, FileMessageParameters> handlerRequest) {
if (logDir == null) {
return null;
}
String filename = handlerRequest.getPathParameter(LogFileNamePathParameter.class);
return new File(logDir, filename);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.cluster;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nullable;

import java.io.File;
import java.util.Map;

/**
* Rest handler which serves the log files from JobManager.
*/
public class JobManagerLogFileHandler extends AbstractJobManagerFileHandler<EmptyMessageParameters> {

private final File file;

public JobManagerLogFileHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout, Map<String, String> responseHeaders,
UntypedResponseMessageHeaders<EmptyRequestBody, EmptyMessageParameters> messageHeaders,
@Nullable File file) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders);

this.file = file;
}

@Override
protected File getFile(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> handlerRequest) {
return file;
}
}
Loading

0 comments on commit 53755ec

Please sign in to comment.