Skip to content

Commit

Permalink
[FLINK-13949][rest] Replace duplicated JobVertexDetailsInfo.VertexTas…
Browse files Browse the repository at this point in the history
…kDetail with SubtaskExecutionAttemptDetailsInfo

This closes apache#9699.
  • Loading branch information
jerry-024 authored and tillrohrmann committed Sep 18, 2019
1 parent 155179f commit 88a918b
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 209 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -2516,7 +2516,7 @@
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexDetailsInfo:VertexTaskDetail",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:SubtaskExecutionAttemptDetailsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
Expand Down
26 changes: 16 additions & 10 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1637,7 +1637,7 @@
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexDetailsInfo:VertexTaskDetail",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobVertexDetailsInfo:SubtaskExecutionAttemptDetailsInfo",
"properties" : {
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -1691,11 +1691,11 @@
}
}
},
"start_time" : {
"type" : "integer"
},
"taskmanager-id" : {
"type" : "string"
},
"start_time" : {
"type" : "integer"
}
}
}
Expand Down Expand Up @@ -1967,9 +1967,6 @@
"duration" : {
"type" : "integer"
},
"taskmanager-id" : {
"type" : "string"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
Expand Down Expand Up @@ -1999,6 +1996,12 @@
"type" : "boolean"
}
}
},
"taskmanager-id" : {
"type" : "string"
},
"start_time" : {
"type" : "integer"
}
}
}
Expand Down Expand Up @@ -2050,9 +2053,6 @@
"duration" : {
"type" : "integer"
},
"taskmanager-id" : {
"type" : "string"
},
"metrics" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:metrics:IOMetricsInfo",
Expand Down Expand Up @@ -2082,6 +2082,12 @@
"type" : "boolean"
}
}
},
"taskmanager-id" : {
"type" : "string"
},
"start_time" : {
"type" : "integer"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
Expand All @@ -29,16 +29,14 @@
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
Expand Down Expand Up @@ -107,49 +105,12 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
}

private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher metricFetcher) {
List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>();
List<SubtaskExecutionAttemptDetailsInfo> subtasks = new ArrayList<>();
final long now = System.currentTimeMillis();
int num = 0;
for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
final ExecutionState status = vertex.getExecutionState();

TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
String resourceId = location == null ? "(unassigned)" : location.getResourceID().getResourceIdString();

long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
if (startTime == 0) {
startTime = -1;
}
long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;

MutableIOMetrics counts = new MutableIOMetrics();
counts.addIOMetrics(
vertex.getCurrentExecutionAttempt(),
metricFetcher,
jobID.toString(),
jobVertex.getJobVertexId().toString());
subtasks.add(new JobVertexDetailsInfo.VertexTaskDetail(
num,
status,
vertex.getCurrentExecutionAttempt().getAttemptNumber(),
locationString,
startTime,
endTime,
duration,
new IOMetricsInfo(
counts.getNumBytesIn(),
counts.isNumBytesInComplete(),
counts.getNumBytesOut(),
counts.isNumBytesOutComplete(),
counts.getNumRecordsIn(),
counts.isNumRecordsInComplete(),
counts.getNumRecordsOut(),
counts.isNumRecordsOutComplete()),
resourceId));

num++;
final AccessExecution execution = vertex.getCurrentExecutionAttempt();
final JobVertexID jobVertexID = jobVertex.getJobVertexId();
subtasks.add(SubtaskExecutionAttemptDetailsInfo.create(execution, metricFetcher, jobID, jobVertexID));
}

return new JobVertexDetailsInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
Expand Down Expand Up @@ -69,18 +68,9 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(

final AccessExecution execution = executionVertex.getCurrentExecutionAttempt();

final MutableIOMetrics ioMetrics = new MutableIOMetrics();

final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

ioMetrics.addIOMetrics(
execution,
metricFetcher,
jobID.toString(),
jobVertexID.toString()
);

return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
return SubtaskExecutionAttemptDetailsInfo.create(execution, metricFetcher, jobID, jobVertexID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
Expand All @@ -45,8 +44,6 @@
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -95,15 +92,15 @@ protected SubtaskExecutionAttemptDetailsInfo handleRequest(
final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);

return createDetailsInfo(execution, jobID, jobVertexID, metricFetcher);
return SubtaskExecutionAttemptDetailsInfo.create(execution, metricFetcher, jobID, jobVertexID);
}

@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
List<ArchivedJson> archive = new ArrayList<>(16);
for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
for (AccessExecutionVertex subtask : task.getTaskVertices()) {
ResponseBody curAttemptJson = createDetailsInfo(subtask.getCurrentExecutionAttempt(), graph.getJobID(), task.getJobVertexId(), null);
ResponseBody curAttemptJson = SubtaskExecutionAttemptDetailsInfo.create(subtask.getCurrentExecutionAttempt(), null, graph.getJobID(), task.getJobVertexId());
String curAttemptPath = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
Expand All @@ -115,7 +112,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
if (attempt != null) {
ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
ResponseBody json = SubtaskExecutionAttemptDetailsInfo.create(attempt, null, graph.getJobID(), task.getJobVertexId());
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString())
.replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
Expand All @@ -128,21 +125,4 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
}
return archive;
}

private static SubtaskExecutionAttemptDetailsInfo createDetailsInfo(
AccessExecution execution,
JobID jobID,
JobVertexID jobVertexID,
@Nullable MetricFetcher metricFetcher) {
final MutableIOMetrics ioMetrics = new MutableIOMetrics();

ioMetrics.addIOMetrics(
execution,
metricFetcher,
jobID.toString(),
jobVertexID.toString()
);

return SubtaskExecutionAttemptDetailsInfo.create(execution, ioMetrics);
}
}
Loading

0 comments on commit 88a918b

Please sign in to comment.