Skip to content

Commit

Permalink
[SPARK-37709][K8S] Add AVERAGE_DURATION executor roll policy
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR adds the `AVERAGE_DURATION` executor roll policy, which chooses the executor whose average task time is the highest.

### Why are the changes needed?

This is helpful to detect straggler executors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test cases.

Closes apache#34977 from dongjoon-hyun/SPARK-37709.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
dongjoon-hyun committed Dec 21, 2021
1 parent e415315 commit 1eb5b88
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private[spark] object Config extends Logging {
.createWithDefault(0)

object ExecutorRollPolicy extends Enumeration {
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, FAILED_TASKS = Value
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS = Value
}

val EXECUTOR_ROLL_POLICY =
Expand All @@ -160,6 +160,7 @@ private[spark] object Config extends Logging {
"ADD_TIME policy chooses an executor with the smallest add-time. " +
"TOTAL_GC_TIME policy chooses an executor with the biggest total task GC time. " +
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
"AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
"FAILED_TASKS policy chooses an executor with the most number of failed tasks.")
.version("3.3.0")
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
listWithoutDriver.sortBy(_.totalGCTime).reverse
case ExecutorRollPolicy.TOTAL_DURATION =>
listWithoutDriver.sortBy(_.totalDuration).reverse
case ExecutorRollPolicy.AVERAGE_DURATION =>
// Rank executors by average task time (totalDuration / totalTasks), largest first.
// An executor that has not run any task yet has totalTasks == 0; dividing by it would
// produce NaN or Infinity, which the Float ordering ranks above every finite average,
// so an idle executor would be picked first. Clamp the divisor to at least 1.
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
case ExecutorRollPolicy.FAILED_TASKS =>
listWithoutDriver.sortBy(_.failedTasks).reverse
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
// The biggest totalDuration
val execWithBiggestTotalDuration = new ExecutorSummary("4", "host:port", true, 1,
10, 10, 1, 1, 1,
0, 0, 1, 400,
0, 0, 4, 400,
1, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
Expand All @@ -83,8 +83,18 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
false, Set())

// The biggest average duration (= totalDuration / totalTasks)
val execWithBiggestAverageDuration = new ExecutorSummary("6", "host:port", true, 1,
10, 10, 1, 1, 1,
0, 0, 2, 300,
1, 100, 100,
10, false, 20, new Date(1639300003000L),
Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
false, Set())

val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks)
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
execWithBiggestAverageDuration)

test("Empty executor list") {
ExecutorRollPolicy.values.foreach { value =>
Expand Down Expand Up @@ -125,4 +135,10 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
test("Policy: FAILED_TASKS") {
assertEquals(Some("5"), plugin.invokePrivate(_choose(list, ExecutorRollPolicy.FAILED_TASKS)))
}

test("Policy: AVERAGE_DURATION") {
assertEquals(
Some("6"),
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)))
}
}

0 comments on commit 1eb5b88

Please sign in to comment.