Skip to content

Commit

Permalink
[FLINK-19518] Show proper job duration for running jobs in web ui
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Oct 8, 2020
1 parent 5c7935a commit 6f5dffb
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,55 @@ public void testCancel() throws Exception {
BlockingInvokable.reset();
}

/**
* See FLINK-19518. This test ensures that the /jobs/overview handler shows a duration != 0.
*
*/
@Test
public void testJobOverviewHandler() throws Exception {
// this only works if there is no active job at this point
assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());

// Create a task
final JobVertex sender = new JobVertex("Sender");
sender.setParallelism(2);
sender.setInvokableClass(BlockingInvokable.class);

final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
final JobID jid = jobGraph.getJobID();

ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
clusterClient.submitJob(jobGraph).get();

// wait for job to show up
while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(10);
}

// wait for tasks to be properly running
BlockingInvokable.latch.await();

final Duration testTimeout = Duration.ofMinutes(2);
final LocalTime deadline = LocalTime.now().plus(testTimeout);

String json = TestBaseUtils.getFromHTTP("http:https://localhost:" + getRestPort() + "/jobs/overview");

ObjectMapper mapper = new ObjectMapper();
JsonNode parsed = mapper.readTree(json);
ArrayNode jsonJobs = (ArrayNode) parsed.get("jobs");
assertEquals(1, jsonJobs.size());
assertThat("Duration must be positive", jsonJobs.get(0).get("duration").asInt() > 0);

clusterClient.cancel(jobGraph.getJobID()).get();

// ensure cancellation is finished
while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
Thread.sleep(20);
}

BlockingInvokable.reset();
}

@Test
public void testCancelYarn() throws Exception {
// this only works if there is no active job at this point
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
Expand Down Expand Up @@ -138,19 +137,9 @@ public CompletableFuture<DispatcherJobResult> getResultFuture() {
}

public CompletableFuture<JobDetails> requestJobDetails(Time timeout) {
return requestJobStatus(timeout).thenApply(status -> {
int[] tasksPerState = new int[ExecutionState.values().length];
return requestJob(timeout).thenApply(executionGraph -> {
synchronized (lock) {
return new JobDetails(
jobId,
jobName,
initializationTimestamp,
0,
0,
status,
0,
tasksPerState,
0);
return JobDetails.createDetailsForJob(executionGraph);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -177,7 +176,7 @@ public void put(ArchivedExecutionGraph archivedExecutionGraph) throws IOExceptio
// write the ArchivedExecutionGraph to disk
storeArchivedExecutionGraph(archivedExecutionGraph);

final JobDetails detailsForJob = WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
final JobDetails detailsForJob = JobDetails.createDetailsForJob(archivedExecutionGraph);

jobDetailsCache.put(jobId, detailsForJob);
archivedExecutionGraphCache.put(jobId, archivedExecutionGraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -69,7 +68,7 @@ public JobsOverview getStoredJobsOverview() {
@Override
public Collection<JobDetails> getAvailableJobDetails() {
return serializableExecutionGraphs.values().stream()
.map(WebMonitorUtils::createDetailsForJob)
.map(JobDetails::createDetailsForJob)
.collect(Collectors.toList());
}

Expand All @@ -79,7 +78,7 @@ public JobDetails getAvailableJobDetails(JobID jobId) {
final ArchivedExecutionGraph archivedExecutionGraph = serializableExecutionGraphs.get(jobId);

if (archivedExecutionGraph != null) {
return WebMonitorUtils.createDetailsForJob(archivedExecutionGraph);
return JobDetails.createDetailsForJob(archivedExecutionGraph);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
Expand Down Expand Up @@ -98,6 +101,42 @@ public JobDetails(
this.tasksPerState = checkNotNull(tasksPerState);
this.numTasks = numTasks;
}

public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
JobStatus status = job.getState();

long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;

int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
int numTotalTasks = 0;

for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
AccessExecutionVertex[] vertices = ejv.getTaskVertices();
numTotalTasks += vertices.length;

for (AccessExecutionVertex vertex : vertices) {
ExecutionState state = vertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));
}
}

lastChanged = Math.max(lastChanged, finished);

return new JobDetails(
job.getJobID(),
job.getJobName(),
started,
finished,
duration,
status,
lastChanged,
countsPerStatus,
numTotalTasks);
}

// ------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
Expand All @@ -30,7 +31,6 @@
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand Down Expand Up @@ -67,7 +67,7 @@ protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerR

@Override
public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
ResponseBody json = new MultipleJobsDetails(Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));
ResponseBody json = new MultipleJobsDetails(Collections.singleton(JobDetails.createDetailsForJob(graph)));
String path = getMessageHeaders().getTargetRestEndpointURL()
.replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString());
return Collections.singletonList(new ArchivedJson(path, json));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
Expand Down Expand Up @@ -665,7 +664,7 @@ public JobStatus requestJobStatus() {
@Override
public JobDetails requestJobDetails() {
mainThreadExecutor.assertRunningInMainThread();
return WebMonitorUtils.createDetailsForJob(executionGraph);
return JobDetails.createDetailsForJob(executionGraph);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,11 @@

package org.apache.flink.runtime.webmonitor;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -224,42 +218,6 @@ public static Map<String, String> fromKeyValueJsonArray(String jsonString) {
}
}

public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
JobStatus status = job.getState();

long started = job.getStatusTimestamp(JobStatus.INITIALIZING);
long finished = status.isGloballyTerminalState() ? job.getStatusTimestamp(status) : -1L;
long duration = (finished >= 0L ? finished : System.currentTimeMillis()) - started;

int[] countsPerStatus = new int[ExecutionState.values().length];
long lastChanged = 0;
int numTotalTasks = 0;

for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
AccessExecutionVertex[] vertices = ejv.getTaskVertices();
numTotalTasks += vertices.length;

for (AccessExecutionVertex vertex : vertices) {
ExecutionState state = vertex.getExecutionState();
countsPerStatus[state.ordinal()]++;
lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));
}
}

lastChanged = Math.max(lastChanged, finished);

return new JobDetails(
job.getJobID(),
job.getJobName(),
started,
finished,
duration,
status,
lastChanged,
countsPerStatus,
numTotalTasks);
}

/**
* Checks and normalizes the given URI. This method first checks the validity of the
* URI (scheme and path are not null) and then normalizes the URI to a path.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ManualTicker;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -366,6 +365,6 @@ private static Matcher<ArchivedExecutionGraph> matchesPartiallyWith(ArchivedExec
}

private static Collection<JobDetails> generateJobDetails(Collection<ArchivedExecutionGraph> executionGraphs) {
return executionGraphs.stream().map(WebMonitorUtils::createDetailsForJob).collect(Collectors.toList());
return executionGraphs.stream().map(JobDetails::createDetailsForJob).collect(Collectors.toList());
}
}

0 comments on commit 6f5dffb

Please sign in to comment.