From a99fc3ffbf95910dd8fad6833852ca5eed3f1896 Mon Sep 17 00:00:00 2001 From: Yangze Guo Date: Thu, 12 May 2022 12:19:44 +0800 Subject: [PATCH] [FLINK-28312][rest] Introduce REST APIs for log URL retrieval This closes #20179. --- .../history_server_configuration.html | 12 ++ .../generated/rest_v1_dispatcher.html | 141 ++++++++++++++++++ docs/static/generated/rest_v1_dispatcher.yml | 47 ++++++ .../configuration/HistoryServerOptions.java | 23 +++ .../webmonitor/history/HistoryServer.java | 24 +++ .../src/test/resources/rest_api_v1.snapshot | 52 +++++++ .../handler/job/GeneratedLogUrlHandler.java | 88 +++++++++++ .../messages/JobManagerLogUrlHeaders.java | 72 +++++++++ .../JobTaskManagerMessageParameters.java | 35 +++++ .../runtime/rest/messages/LogUrlResponse.java | 55 +++++++ .../messages/TaskManagerLogUrlHeaders.java | 79 ++++++++++ .../webmonitor/WebMonitorEndpoint.java | 13 ++ .../job/GeneratedLogUrlHandlerTest.java | 52 +++++++ .../rest/messages/LogUrlResponseTest.java | 32 ++++ 14 files changed, 725 insertions(+) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTaskManagerMessageParameters.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/LogUrlResponse.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/LogUrlResponseTest.java diff --git a/docs/layouts/shortcodes/generated/history_server_configuration.html b/docs/layouts/shortcodes/generated/history_server_configuration.html index 63dee62c1af3d..eb5cd6fbc59fb 100644 --- a/docs/layouts/shortcodes/generated/history_server_configuration.html +++ b/docs/layouts/shortcodes/generated/history_server_configuration.html @@ -32,6 +32,18 @@ Integer The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an IllegalConfigurationException. + +
historyserver.log.jobmanager.url-pattern
+ (none) + String + Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it, with replacing the special placeholders, `<jobid>`, to the id of job. + + +
historyserver.log.taskmanager.url-pattern
+ (none) + String + Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it, with replacing the special placeholders, `<jobid>` and `<tmid>`, to the id of job and TaskManager respectively. +
historyserver.web.address
(none) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 3fb81e43024ab..8d5e8191415cb 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3203,6 +3203,76 @@ + + + + + + + + + + + + + + + + + + + + + + + + + +
/jobs/:jobid/jobmanager/log-url
Verb: GETResponse code: 200 OK
Returns the log url of jobmanager of a specific job.
Path parameters
+
    +
  • jobid - 32-character hexadecimal string value that identifies a job.
  • +
+
+
+ +
+
+
+ +
+
@@ -3853,6 +3923,77 @@
+ + + + + + + + + + + + + + + + + + + + + + + + + +
/jobs/:jobid/taskmanagers/:taskmanagerid/log-url
Verb: GETResponse code: 200 OK
Returns the log url of jobmanager of a specific job.
Path parameters
+
    +
  • jobid - 32-character hexadecimal string value that identifies a job.
  • +
  • taskmanagerid - 32-character hexadecimal string that identifies a task manager.
  • +
+
+
+ +
+
+
+ +
+
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index b495a75290b2f..2d8a79402ef30 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -677,6 +677,24 @@ paths: application/json: schema: $ref: '#/components/schemas/EnvironmentInfo' + /jobs/{jobid}/jobmanager/log-url: + get: + description: Returns the log url of jobmanager of a specific job. + operationId: getJobManagerLogUrl + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/LogUrlResponse' /jobs/{jobid}/metrics: get: description: Provides access to job metrics. @@ -865,6 +883,30 @@ paths: application/json: schema: $ref: '#/components/schemas/TriggerResponse' + /jobs/{jobid}/taskmanagers/{taskmanagerid}/log-url: + get: + description: Returns the log url of jobmanager of a specific job. + operationId: getTaskManagerLogUrl + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + - name: taskmanagerid + in: path + description: 32-character hexadecimal string that identifies a task manager. + required: true + schema: + $ref: '#/components/schemas/ResourceID' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/LogUrlResponse' /jobs/{jobid}/vertices/{vertexid}: get: description: "Returns details for a task, with a summary for each of its subtasks." @@ -1916,6 +1958,11 @@ components: properties: status: $ref: '#/components/schemas/JobStatus' + LogUrlResponse: + type: object + properties: + url: + type: string JobAccumulator: type: object Id: diff --git a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java index d2968e70c30d7..0af6f44b9f383 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java @@ -60,6 +60,29 @@ public class HistoryServerOptions { + " that are no longer present `%s`.", HISTORY_SERVER_ARCHIVE_DIRS.key())); + /** + * Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it. + */ + public static final ConfigOption HISTORY_SERVER_TASKMANAGER_LOG_URL_PATTERN = + key("historyserver.log.taskmanager.url-pattern") + .stringType() + .noDefaultValue() + .withDescription( + "Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it," + + " with replacing the special placeholders, `` and ``, to the id of job" + + " and TaskManager respectively."); + + /** + * Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it. + */ + public static final ConfigOption HISTORY_SERVER_JOBMANAGER_LOG_URL_PATTERN = + key("historyserver.log.jobmanager.url-pattern") + .stringType() + .noDefaultValue() + .withDescription( + "Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it," + + " with replacing the special placeholders, ``, to the id of job."); + /** The local directory used by the HistoryServer web-frontend. */ public static final ConfigOption HISTORY_SERVER_WEB_DIR = key("historyserver.web.tmpdir") diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index cd4ab2d30716b..b9a14f8072276 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -30,8 +30,11 @@ import org.apache.flink.runtime.history.FsJobArchivist; import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory; import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler; import org.apache.flink.runtime.rest.handler.router.Router; import org.apache.flink.runtime.rest.messages.DashboardConfiguration; +import org.apache.flink.runtime.rest.messages.JobManagerLogUrlHeaders; +import org.apache.flink.runtime.rest.messages.TaskManagerLogUrlHeaders; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; import org.apache.flink.runtime.util.EnvironmentInformation; @@ -44,6 +47,7 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.StringUtils; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -63,6 +67,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -285,6 +290,25 @@ void start() throws IOException, InterruptedException { LOG.info("Using directory {} as local cache.", webDir); Router router = new Router(); + + final String jobManagerPattern = + config.get(HistoryServerOptions.HISTORY_SERVER_JOBMANAGER_LOG_URL_PATTERN); + if (!StringUtils.isNullOrWhitespaceOnly(jobManagerPattern)) { + router.addGet( + JobManagerLogUrlHeaders.getInstance().getTargetRestEndpointURL(), + new GeneratedLogUrlHandler( + CompletableFuture.completedFuture(jobManagerPattern))); + } + + final String taskManagerPattern = + config.get(HistoryServerOptions.HISTORY_SERVER_TASKMANAGER_LOG_URL_PATTERN); + if (!StringUtils.isNullOrWhitespaceOnly(taskManagerPattern)) { + router.addGet( + TaskManagerLogUrlHeaders.getInstance().getTargetRestEndpointURL(), + new GeneratedLogUrlHandler( + CompletableFuture.completedFuture(taskManagerPattern))); + } + router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir)); createDashboardConfigFile(); diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 2de7c030c29b1..f0e069087afda 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1979,6 +1979,31 @@ } } } + }, { + "url" : "/jobs/:jobid/jobmanager/log-url", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse", + "properties" : { + "url" : { + "type" : "string" + } + } + } }, { "url" : "/jobs/:jobid/metrics", "method" : "GET", @@ -2238,6 +2263,33 @@ } } } + }, { + "url" : "/jobs/:jobid/taskmanagers/:taskmanagerid/log-url", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + }, { + "key" : "taskmanagerid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse", + "properties" : { + "url" : { + "type" : "string" + } + } + } }, { "url" : "/jobs/:jobid/vertices/:vertexid", "method" : "GET", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandler.java new file mode 100644 index 0000000000000..f4d5af81dbfda --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.rest.handler.router.RoutedRequest; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.LogUrlResponse; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** Handler for retrieving the log url of a specified TaskManager or JobManager. */ +@ChannelHandler.Sharable +public class GeneratedLogUrlHandler extends SimpleChannelInboundHandler> { + + private final CompletableFuture patternFuture; + + public GeneratedLogUrlHandler(CompletableFuture patternFuture) { + this.patternFuture = patternFuture; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) + throws Exception { + try { + Map pathParams = routedRequest.getRouteResult().pathParams(); + final String taskManagerId = pathParams.get(TaskManagerIdPathParameter.KEY); + final String jobId = Preconditions.checkNotNull(pathParams.get(JobIDPathParameter.KEY)); + final LogUrlResponse response = + new LogUrlResponse( + generateLogUrl( + patternFuture.get(5, TimeUnit.MILLISECONDS), + jobId, + taskManagerId)); + + HandlerUtils.sendResponse( + ctx, + routedRequest.getRequest(), + response, + HttpResponseStatus.OK, + Collections.emptyMap()); + } catch (Exception e) { + HandlerUtils.sendErrorResponse( + ctx, + routedRequest.getRequest(), + new ErrorResponseBody(e.getMessage()), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + Collections.emptyMap()); + } + } + + @VisibleForTesting + static String generateLogUrl(String pattern, String jobId, String taskManagerId) { + String generatedUrl = pattern.replaceAll("", jobId); + if (null != taskManagerId) { + generatedUrl = generatedUrl.replaceAll("", taskManagerId); + } + return generatedUrl; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java new file mode 100644 index 0000000000000..c55efeb5b36b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobManagerLogUrlHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for the log url retriever of JobManager. */ +public class JobManagerLogUrlHeaders + implements MessageHeaders { + private static final JobManagerLogUrlHeaders INSTANCE = new JobManagerLogUrlHeaders(); + + private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/jobmanager/log-url"; + + private JobManagerLogUrlHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return LogUrlResponse.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static JobManagerLogUrlHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns the log url of jobmanager of a specific job."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTaskManagerMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTaskManagerMessageParameters.java new file mode 100644 index 0000000000000..73e554d966792 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTaskManagerMessageParameters.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; + +import java.util.Arrays; +import java.util.Collection; + +/** Message parameters which require a job path parameter and a TaskManager id path parameter. */ +public class JobTaskManagerMessageParameters extends JobMessageParameters { + public final TaskManagerIdPathParameter taskManagerIdParameter = + new TaskManagerIdPathParameter(); + + @Override + public Collection> getPathParameters() { + return Arrays.asList(jobPathParameter, taskManagerIdParameter); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/LogUrlResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/LogUrlResponse.java new file mode 100644 index 0000000000000..1b849a332d8f7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/LogUrlResponse.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** Response of log URLs. */ +public class LogUrlResponse implements ResponseBody { + + public static final String FIELD_NAME_LOG_URL = "url"; + + @JsonProperty(FIELD_NAME_LOG_URL) + private final String url; + + @JsonCreator + public LogUrlResponse(@JsonProperty(FIELD_NAME_LOG_URL) String url) { + this.url = url; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LogUrlResponse that = (LogUrlResponse) o; + return url.equals(that.url); + } + + @Override + public int hashCode() { + return Objects.hash(url); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java new file mode 100644 index 0000000000000..a3ca0750b665e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/TaskManagerLogUrlHeaders.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for the log url retriever of TaskManager. */ +public class TaskManagerLogUrlHeaders + implements MessageHeaders< + EmptyRequestBody, LogUrlResponse, JobTaskManagerMessageParameters> { + private static final TaskManagerLogUrlHeaders INSTANCE = new TaskManagerLogUrlHeaders(); + + private static final String URL = + "/jobs/:" + + JobIDPathParameter.KEY + + "/taskmanagers/:" + + TaskManagerIdPathParameter.KEY + + "/log-url"; + + private TaskManagerLogUrlHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return LogUrlResponse.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobTaskManagerMessageParameters getUnresolvedMessageParameters() { + return new JobTaskManagerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static TaskManagerLogUrlHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns the log url of jobmanager of a specific job."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index cf5ad90abcc10..6a0c4320a3be7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.rest.handler.cluster.ShutdownHandler; import org.apache.flink.runtime.rest.handler.dataset.ClusterDataSetDeleteHandlers; import org.apache.flink.runtime.rest.handler.dataset.ClusterDataSetListHandler; +import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler; import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.JobCancellationHandler; import org.apache.flink.runtime.rest.handler.job.JobConfigHandler; @@ -103,6 +104,7 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; import org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders; import org.apache.flink.runtime.rest.messages.JobManagerEnvironmentHeaders; +import org.apache.flink.runtime.rest.messages.JobManagerLogUrlHeaders; import org.apache.flink.runtime.rest.messages.JobPlanHeaders; import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders; @@ -111,6 +113,7 @@ import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import org.apache.flink.runtime.rest.messages.SubtasksAllAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders; +import org.apache.flink.runtime.rest.messages.TaskManagerLogUrlHeaders; import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders; import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders; @@ -627,6 +630,14 @@ protected List> initiali executor, metricFetcher); + final GeneratedLogUrlHandler jobManagerLogUrlHandler = + new GeneratedLogUrlHandler( + localAddressFuture.thenApply(url -> url + "/#/job-manager/logs")); + + final GeneratedLogUrlHandler taskManagerLogUrlHandler = + new GeneratedLogUrlHandler( + localAddressFuture.thenApply(url -> url + "/#/task-manager//logs")); + final SavepointDisposalHandlers savepointDisposalHandlers = new SavepointDisposalHandlers(asyncOperationStoreDuration); @@ -789,6 +800,8 @@ protected List> initiali Tuple2.of( jobManagerJobConfigurationHandler.getMessageHeaders(), jobManagerJobConfigurationHandler)); + handlers.add(Tuple2.of(JobManagerLogUrlHeaders.getInstance(), jobManagerLogUrlHandler)); + handlers.add(Tuple2.of(TaskManagerLogUrlHeaders.getInstance(), taskManagerLogUrlHandler)); final AbstractRestHandler jobVertexFlameGraphHandler; if (clusterConfiguration.get(RestOptions.ENABLE_FLAMEGRAPH)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandlerTest.java new file mode 100644 index 0000000000000..528c7115e93bc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/GeneratedLogUrlHandlerTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Test for the {@link GeneratedLogUrlHandler}. */ +public class GeneratedLogUrlHandlerTest extends TestLogger { + + @Test + public void testGenerateJobManagerLogUrl() { + final String pattern = "http://localhost//log"; + final String jobId = "jobid"; + + final String generatedUrl = GeneratedLogUrlHandler.generateLogUrl(pattern, jobId, null); + + assertEquals(pattern.replace("", jobId), generatedUrl); + } + + @Test + public void testGenerateTaskManagerLogUrl() { + final String pattern = "http://localhost//tm//log"; + final String jobId = "jobid"; + final String taskManagerId = "tmid"; + + final String generatedUrl = + GeneratedLogUrlHandler.generateLogUrl(pattern, jobId, taskManagerId); + + assertEquals( + pattern.replace("", jobId).replace("", taskManagerId), generatedUrl); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/LogUrlResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/LogUrlResponseTest.java new file mode 100644 index 0000000000000..e658bd9fe7b6c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/LogUrlResponseTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +/** Tests that the {@link LogUrlResponse} can be marshalled and unmarshalled. */ +public class LogUrlResponseTest extends RestResponseMarshallingTestBase { + @Override + protected Class getTestResponseClass() { + return LogUrlResponse.class; + } + + @Override + protected LogUrlResponse getTestResponseInstance() { + return new LogUrlResponse("http://localhost:8081/log"); + } +}