Skip to content

Commit

Permalink
[FLINK-3131] [runtime-web] Add checkpoint statistics handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
uce authored and StephanEwen committed Dec 30, 2015
1 parent 2c7e63a commit 002e0ab
Show file tree
Hide file tree
Showing 5 changed files with 540 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
Expand All @@ -46,6 +47,7 @@
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
Expand Down Expand Up @@ -165,6 +167,7 @@ public WebRuntimeMonitor(
.GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))

.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
Expand All @@ -175,6 +178,7 @@ public WebRuntimeMonitor(
.GET("/jobs/:jobid/config", handler(new JobConfigHandler(currentGraphs)))
.GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs)))
.GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs)))

.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
Expand All @@ -194,7 +198,6 @@ public WebRuntimeMonitor(
// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));


synchronized (startupShutdownLock) {

// add shutdown hook for deleting the directory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import scala.Option;

import java.io.StringWriter;
import java.util.Map;

/**
 * Request handler that returns the checkpoint statistics of a job as JSON.
 *
 * <p>The response is always a JSON object. If the execution graph has no
 * checkpoint stats tracker, or the tracker reports no job stats, the object
 * is empty. Otherwise it contains the following fields:
 * <ul>
 *   <li>{@code count}: total number of checkpoints</li>
 *   <li>{@code duration}: object with {@code min}, {@code max} and {@code avg}</li>
 *   <li>{@code size}: object with {@code min}, {@code max} and {@code avg} of the state size</li>
 *   <li>{@code history}: array of recent checkpoints, each an object with
 *       {@code id}, {@code timestamp}, {@code duration} and {@code size}</li>
 * </ul>
 */
public class JobCheckpointsHandler extends AbstractExecutionGraphRequestHandler
		implements RequestHandler.JsonResponse {

	public JobCheckpointsHandler(ExecutionGraphHolder executionGraphHolder) {
		super(executionGraphHolder);
	}

	/**
	 * Serializes the job-level checkpoint statistics of the given graph.
	 *
	 * @param graph execution graph whose checkpoint stats tracker is queried
	 * @param params request parameters (not used by this handler)
	 * @return the JSON document as a string
	 * @throws Exception if writing the JSON fails
	 */
	@Override
	public String handleRequest(ExecutionGraph graph, Map<String, String> params) throws Exception {
		StringWriter out = new StringWriter();
		JsonGenerator json = JsonFactory.jacksonFactory.createGenerator(out);

		CheckpointStatsTracker statsTracker = graph.getCheckpointStatsTracker();

		json.writeStartObject();
		if (statsTracker != null) {
			Option<JobCheckpointStats> maybeStats = statsTracker.getJobStats();
			if (maybeStats.isDefined()) {
				writeJobStats(json, maybeStats.get());
			}
		}
		json.writeEndObject();
		json.close();

		return out.toString();
	}

	/**
	 * Writes the fields of the job stats into the currently open JSON object.
	 */
	private static void writeJobStats(JsonGenerator json, JobCheckpointStats jobStats) throws Exception {
		// Total number of checkpoints
		json.writeNumberField("count", jobStats.getCount());

		// Min/max/avg checkpoint duration
		json.writeObjectFieldStart("duration");
		json.writeNumberField("min", jobStats.getMinDuration());
		json.writeNumberField("max", jobStats.getMaxDuration());
		json.writeNumberField("avg", jobStats.getAverageDuration());
		json.writeEndObject();

		// Min/max/avg state size
		json.writeObjectFieldStart("size");
		json.writeNumberField("min", jobStats.getMinStateSize());
		json.writeNumberField("max", jobStats.getMaxStateSize());
		json.writeNumberField("avg", jobStats.getAverageStateSize());
		json.writeEndObject();

		// One entry per checkpoint in the recent history
		json.writeArrayFieldStart("history");
		for (CheckpointStats entry : jobStats.getRecentHistory()) {
			json.writeStartObject();
			json.writeNumberField("id", entry.getCheckpointId());
			json.writeNumberField("timestamp", entry.getTriggerTimestamp());
			json.writeNumberField("duration", entry.getDuration());
			json.writeNumberField("size", entry.getStateSize());
			json.writeEndObject();
		}
		json.writeEndArray();
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.webmonitor.handlers;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import scala.Option;

import java.io.StringWriter;
import java.util.Map;

/**
 * Request handler that returns the checkpoint statistics of a single job
 * vertex as JSON.
 *
 * <p>The response is always a JSON object. If the vertex's execution graph has
 * no checkpoint stats tracker, or the tracker reports no stats for the vertex,
 * the object is empty. Otherwise it contains {@code id}, {@code timestamp},
 * {@code duration}, {@code size}, {@code parallelism} and a {@code subtasks}
 * array with one object per subtask ({@code subtask}, {@code duration},
 * {@code size}).
 */
public class JobVertexCheckpointsHandler extends AbstractJobVertexRequestHandler
		implements RequestHandler.JsonResponse {

	public JobVertexCheckpointsHandler(ExecutionGraphHolder executionGraphHolder) {
		super(executionGraphHolder);
	}

	/**
	 * Serializes the checkpoint statistics of the given job vertex.
	 *
	 * @param jobVertex vertex whose stats are looked up by its job vertex ID
	 * @param params request parameters (not used by this handler)
	 * @return the JSON document as a string
	 * @throws Exception if writing the JSON fails
	 */
	@Override
	public String handleRequest(ExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
		StringWriter out = new StringWriter();
		JsonGenerator json = JsonFactory.jacksonFactory.createGenerator(out);
		json.writeStartObject();

		CheckpointStatsTracker statsTracker = jobVertex.getGraph().getCheckpointStatsTracker();

		if (statsTracker != null) {
			Option<OperatorCheckpointStats> maybeStats =
					statsTracker.getOperatorStats(jobVertex.getJobVertexId());

			if (maybeStats.isDefined()) {
				writeOperatorStats(json, maybeStats.get());
			}
		}

		json.writeEndObject();
		json.close();

		return out.toString();
	}

	/**
	 * Writes the fields of the operator stats into the currently open JSON object.
	 */
	private static void writeOperatorStats(
			JsonGenerator json,
			OperatorCheckpointStats operatorStats) throws Exception {

		// Figures for the operator as a whole
		json.writeNumberField("id", operatorStats.getCheckpointId());
		json.writeNumberField("timestamp", operatorStats.getTriggerTimestamp());
		json.writeNumberField("duration", operatorStats.getDuration());
		json.writeNumberField("size", operatorStats.getStateSize());
		json.writeNumberField("parallelism", operatorStats.getNumberOfSubTasks());

		// Per-subtask breakdown, indexed from 0
		json.writeArrayFieldStart("subtasks");
		for (int subtaskIndex = 0; subtaskIndex < operatorStats.getNumberOfSubTasks(); subtaskIndex++) {
			json.writeStartObject();
			json.writeNumberField("subtask", subtaskIndex);
			json.writeNumberField("duration", operatorStats.getSubTaskDuration(subtaskIndex));
			json.writeNumberField("size", operatorStats.getSubTaskStateSize(subtaskIndex));
			json.writeEndObject();
		}
		json.writeEndArray();
	}

}
Loading

0 comments on commit 002e0ab

Please sign in to comment.