Skip to content

Commit

Permalink
[FLINK-28312][rest] Introduce REST APIs for log URL retrieval
Browse files Browse the repository at this point in the history
This closes apache#20179.
  • Loading branch information
KarmaGYZ committed Jul 7, 2022
1 parent bbd5a81 commit a99fc3f
Show file tree
Hide file tree
Showing 14 changed files with 725 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@
<td>Integer</td>
<td>The maximum number of jobs to retain in each archive directory defined by `historyserver.archive.fs.dir`. If set to `-1`(default), there is no limit to the number of archives. If set to `0` or less than `-1` HistoryServer will throw an <code class="highlighter-rouge">IllegalConfigurationException</code>. </td>
</tr>
<tr>
<td><h5>historyserver.log.jobmanager.url-pattern</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it, with replacing the special placeholders, `&lt;jobid&gt;`, to the id of job.</td>
</tr>
<tr>
<td><h5>historyserver.log.taskmanager.url-pattern</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it, with replacing the special placeholders, `&lt;jobid&gt;` and `&lt;tmid&gt;`, to the id of job and TaskManager respectively.</td>
</tr>
<tr>
<td><h5>historyserver.web.address</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
141 changes: 141 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -3203,6 +3203,76 @@
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/log-url</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the log url of jobmanager of a specific job.</td>
</tr>
<tr>
<td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Request</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Response</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse",
"properties" : {
"url" : {
"type" : "string"
}
}
} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
Expand Down Expand Up @@ -3853,6 +3923,77 @@
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
<td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/taskmanagers/:taskmanagerid/log-url</strong></h5></td>
</tr>
<tr>
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
<td class="text-left">Response code: <code>200 OK</code></td>
</tr>
<tr>
<td colspan="2">Returns the log url of jobmanager of a specific job.</td>
</tr>
<tr>
<td colspan="2">Path parameters</td>
</tr>
<tr>
<td colspan="2">
<ul>
<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
<li><code>taskmanagerid</code> - 32-character hexadecimal string that identifies a task manager.</li>
</ul>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Request</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
<tr>
<td colspan="2">
<div class="book-expand">
<label>
<div class="book-expand-head flex justify-between">
<span>Response</span>
&nbsp; <span></span>
</div>
<input type="checkbox" class="hidden">
<div class="book-expand-content markdown-inner">
<pre>
<code>
{
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse",
"properties" : {
"url" : {
"type" : "string"
}
}
} </code>
</pre>
</div>
</label>
</div>
</td>
</tr>
</tbody>
</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
Expand Down
47 changes: 47 additions & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,24 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/EnvironmentInfo'
/jobs/{jobid}/jobmanager/log-url:
get:
description: Returns the log url of jobmanager of a specific job.
operationId: getJobManagerLogUrl
parameters:
- name: jobid
in: path
description: 32-character hexadecimal string value that identifies a job.
required: true
schema:
$ref: '#/components/schemas/JobID'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/LogUrlResponse'
/jobs/{jobid}/metrics:
get:
description: Provides access to job metrics.
Expand Down Expand Up @@ -865,6 +883,30 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/TriggerResponse'
/jobs/{jobid}/taskmanagers/{taskmanagerid}/log-url:
get:
description: Returns the log url of jobmanager of a specific job.
operationId: getTaskManagerLogUrl
parameters:
- name: jobid
in: path
description: 32-character hexadecimal string value that identifies a job.
required: true
schema:
$ref: '#/components/schemas/JobID'
- name: taskmanagerid
in: path
description: 32-character hexadecimal string that identifies a task manager.
required: true
schema:
$ref: '#/components/schemas/ResourceID'
responses:
"200":
description: The request was successful.
content:
application/json:
schema:
$ref: '#/components/schemas/LogUrlResponse'
/jobs/{jobid}/vertices/{vertexid}:
get:
description: "Returns details for a task, with a summary for each of its subtasks."
Expand Down Expand Up @@ -1916,6 +1958,11 @@ components:
properties:
status:
$ref: '#/components/schemas/JobStatus'
LogUrlResponse:
type: object
properties:
url:
type: string
JobAccumulator:
type: object
Id:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,29 @@ public class HistoryServerOptions {
+ " that are no longer present `%s`.",
HISTORY_SERVER_ARCHIVE_DIRS.key()));

/**
* Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it.
*/
public static final ConfigOption<String> HISTORY_SERVER_TASKMANAGER_LOG_URL_PATTERN =
key("historyserver.log.taskmanager.url-pattern")
.stringType()
.noDefaultValue()
.withDescription(
"Pattern of the log URL of TaskManager. The HistoryServer will generate actual URLs from it,"
+ " with replacing the special placeholders, `<jobid>` and `<tmid>`, to the id of job"
+ " and TaskManager respectively.");

/**
* Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it.
*/
public static final ConfigOption<String> HISTORY_SERVER_JOBMANAGER_LOG_URL_PATTERN =
key("historyserver.log.jobmanager.url-pattern")
.stringType()
.noDefaultValue()
.withDescription(
"Pattern of the log URL of JobManager. The HistoryServer will generate actual URLs from it,"
+ " with replacing the special placeholders, `<jobid>`, to the id of job.");

/** The local directory used by the HistoryServer web-frontend. */
public static final ConfigOption<String> HISTORY_SERVER_WEB_DIR =
key("historyserver.web.tmpdir")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@
import org.apache.flink.runtime.history.FsJobArchivist;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.job.GeneratedLogUrlHandler;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.JobManagerLogUrlHeaders;
import org.apache.flink.runtime.rest.messages.TaskManagerLogUrlHeaders;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
Expand All @@ -44,6 +47,7 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -63,6 +67,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -285,6 +290,25 @@ void start() throws IOException, InterruptedException {
LOG.info("Using directory {} as local cache.", webDir);

Router router = new Router();

final String jobManagerPattern =
config.get(HistoryServerOptions.HISTORY_SERVER_JOBMANAGER_LOG_URL_PATTERN);
if (!StringUtils.isNullOrWhitespaceOnly(jobManagerPattern)) {
router.addGet(
JobManagerLogUrlHeaders.getInstance().getTargetRestEndpointURL(),
new GeneratedLogUrlHandler(
CompletableFuture.completedFuture(jobManagerPattern)));
}

final String taskManagerPattern =
config.get(HistoryServerOptions.HISTORY_SERVER_TASKMANAGER_LOG_URL_PATTERN);
if (!StringUtils.isNullOrWhitespaceOnly(taskManagerPattern)) {
router.addGet(
TaskManagerLogUrlHeaders.getInstance().getTargetRestEndpointURL(),
new GeneratedLogUrlHandler(
CompletableFuture.completedFuture(taskManagerPattern)));
}

router.addGet("/:*", new HistoryServerStaticFileServerHandler(webDir));

createDashboardConfigFile();
Expand Down
52 changes: 52 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1979,6 +1979,31 @@
}
}
}
}, {
"url" : "/jobs/:jobid/jobmanager/log-url",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse",
"properties" : {
"url" : {
"type" : "string"
}
}
}
}, {
"url" : "/jobs/:jobid/metrics",
"method" : "GET",
Expand Down Expand Up @@ -2238,6 +2263,33 @@
}
}
}
}, {
"url" : "/jobs/:jobid/taskmanagers/:taskmanagerid/log-url",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "jobid"
}, {
"key" : "taskmanagerid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogUrlResponse",
"properties" : {
"url" : {
"type" : "string"
}
}
}
}, {
"url" : "/jobs/:jobid/vertices/:vertexid",
"method" : "GET",
Expand Down
Loading

0 comments on commit a99fc3f

Please sign in to comment.