Skip to content

Commit

Permalink
[FLINK-4829] snapshot accumulators on a best-effort basis
Browse files Browse the repository at this point in the history
Heartbeats should not fail when accumulators could not be snapshotted. Instead,
we should simply skip the reporting of the failed accumulator. Eventually, the
accumulator will be reported; at the latest, when the job finishes.

This closes apache#2649
  • Loading branch information
mxm committed Oct 18, 2016
1 parent 783dca5 commit d95929e
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import com.codahale.metrics.jvm.{BufferPoolMetricSet, GarbageCollectorMetricSet,
import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.flink.configuration._
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
Expand Down Expand Up @@ -1335,9 +1336,15 @@ class TaskManager(

runningTasks.asScala foreach {
case (execID, task) =>
val registry = task.getAccumulatorRegistry
val accumulators = registry.getSnapshot
accumulatorEvents.append(accumulators)
try {
val registry = task.getAccumulatorRegistry
val accumulators = registry.getSnapshot
accumulatorEvents.append(accumulators)
} catch {
case e: Exception =>
log.warn("Failed to take accumulator snapshot for task {}.",
execID, ExceptionUtils.getRootCause(e))
}
}

currentJobManager foreach {
Expand Down

0 comments on commit d95929e

Please sign in to comment.