Skip to content

Commit

Permalink
[FLINK-2206] Fix incorrect counts of finished, canceled, and failed jobs in webinterface

Browse files Browse the repository at this point in the history

This closes apache#826
  • Loading branch information
fhueske committed Jun 12, 2015
1 parent 1e7d0bb commit 42d1917
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.flink.util.StringUtils;
import org.eclipse.jetty.io.EofException;

import scala.Tuple3;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
Expand Down Expand Up @@ -117,6 +118,20 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
writeJsonForArchive(resp.getWriter(), archivedJobs);
}
}
else if("jobcounts".equals(req.getParameter("get"))) {
response = Patterns.ask(archive, ArchiveMessages.getRequestJobCounts(),
new Timeout(timeout));

result = Await.result(response, timeout);

if(!(result instanceof Tuple3)) {
throw new RuntimeException("RequestJobCounts requires a response of type " +
"Tuple3. Instead the response is of type " + result.getClass() +
".");
} else {
writeJsonForJobCounts(resp.getWriter(), (Tuple3)result);
}
}
else if("job".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");

Expand Down Expand Up @@ -340,6 +355,22 @@ public int compare(ExecutionGraph o1, ExecutionGraph o2) {

}

/**
 * Writes the given job counts as a JSON object of the form
 * {"finished": f, "canceled": c, "failed": x}.
 *
 * @param wrt       writer that receives the JSON output
 * @param jobCounts (finished, canceled, failed) job counters
 */
private void writeJsonForJobCounts(PrintWriter wrt, Tuple3<Integer, Integer, Integer> jobCounts) {
	// Assemble the complete JSON object before handing it to the writer.
	StringBuilder json = new StringBuilder();
	json.append("{");
	json.append("\"finished\": ").append(jobCounts._1()).append(",");
	json.append("\"canceled\": ").append(jobCounts._2()).append(",");
	json.append("\"failed\": ").append(jobCounts._3());
	json.append("}");
	wrt.write(json.toString());
}

/**
* Writes infos about archived job in Json format, including groupvertices and groupverticetimes
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,22 @@ function poll(jobId) {
});
})();

/*
 * Polls the job execution counts (finished / canceled / failed) on page
 * load and then every 2 seconds. The servlet answers
 * jobsInfo?get=jobcounts with {"finished": f, "canceled": c, "failed": x}.
 */
(function pollJobCounts() {
	$.ajax({ url : "jobsInfo?get=jobcounts", cache : false, type : "GET",
		dataType : "json",
		success : function(json) {
			$("#jobs-finished").html(json.finished);
			$("#jobs-canceled").html(json.canceled);
			$("#jobs-failed").html(json.failed);
		}
		// no trailing comma here: a dangling comma in an object literal is a
		// syntax error in older (ES3/IE) engines and a lint error elsewhere
	});
	// Schedule the next poll immediately; the request itself is async, so a
	// slow or failed response never stops the polling loop.
	setTimeout(pollJobCounts, 2000);
})();

/*
* Polls the number of taskmanagers on page load
*/
Expand Down Expand Up @@ -418,20 +434,12 @@ function updateTable(json) {
}
}

var archive_finished = 0;
var archive_failed = 0;
var archive_canceled = 0;

/*
* Creates job history table
*/
function fillTableArchive(table, json) {
$(table).html("");

$("#jobs-finished").html(archive_finished);
$("#jobs-failed").html(archive_failed);
$("#jobs-canceled").html(archive_canceled);


$.each(json, function(i, job) {
_fillTableArchive(table, job, false)
});
Expand Down Expand Up @@ -459,14 +467,4 @@ function _fillTableArchive(table, job, prepend) {
+ job.jobname + " ("
+ formattedTimeFromTimestamp(parseInt(job.time))
+ ")</a></li>");
if (job.status == "FINISHED")
archive_finished++;
if (job.status == "FAILED")
archive_failed++;
if (job.status == "CANCELED")
archive_canceled++;

$("#jobs-finished").html(archive_finished);
$("#jobs-failed").html(archive_failed);
$("#jobs-canceled").html(archive_canceled);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager

import akka.actor.Actor
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.executiongraph.ExecutionGraph
import org.apache.flink.runtime.messages.ArchiveMessages._
Expand All @@ -45,6 +46,8 @@ import scala.collection.mutable
* then a [[CurrentJobStatus]] message with the last state is returned to the sender, otherwise
* a [[JobNotFound]] message is returned
*
* - [[RequestJobCounts]] returns the number of finished, canceled, and failed jobs as a Tuple3
*
* @param max_entries Maximum number of stored Flink jobs
*/
class MemoryArchivist(private val max_entries: Int)
Expand All @@ -57,12 +60,23 @@ class MemoryArchivist(private val max_entries: Int)
*/
val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()

/* Counters for finished, canceled, and failed jobs */
var finishedCnt: Int = 0
var canceledCnt: Int = 0
var failedCnt: Int = 0

override def receiveWithLogMessages: Receive = {

/* Receive Execution Graph to archive */
case ArchiveExecutionGraph(jobID, graph) =>
// wrap graph inside a soft reference
graphs.update(jobID, graph)
// update job counters
graph.getState match {
case JobStatus.FINISHED => finishedCnt += 1
case JobStatus.CANCELED => canceledCnt += 1
case JobStatus.FAILED => failedCnt += 1
}
trimHistory()

case RequestArchivedJob(jobID: JobID) =>
Expand All @@ -83,6 +97,9 @@ class MemoryArchivist(private val max_entries: Int)
case Some(graph) => sender ! CurrentJobStatus(jobID, graph.getState)
case None => sender ! JobNotFound(jobID)
}

case RequestJobCounts =>
sender ! (finishedCnt, canceledCnt, failedCnt)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ object ArchiveMessages {
*/
case object RequestArchivedJobs

/**
 * Requests the number of finished, canceled, and failed jobs from the
 * archivist. The sender receives a Tuple3 of
 * (finished count, canceled count, failed count).
 */
case object RequestJobCounts

/**
* Request a specific ExecutionGraph by JobID. The response is [[RequestArchivedJob]]
* @param jobID
Expand All @@ -56,12 +61,16 @@ object ArchiveMessages {
jobs.asJavaCollection
}
}

// --------------------------------------------------------------------------
// Utility methods to allow simpler case object access from Java
// --------------------------------------------------------------------------

/** Java-friendly accessor for the [[RequestArchivedJobs]] case object. */
def getRequestArchivedJobs: AnyRef = RequestArchivedJobs

/** Java-friendly accessor for the [[RequestJobCounts]] case object. */
def getRequestJobCounts: AnyRef = RequestJobCounts
}

0 comments on commit 42d1917

Please sign in to comment.