[SPARK-37659][UI] Fix FsHistoryProvider race condition between list and delete log info

### What changes were proposed in this pull request?

Add a lock between listing and deleting log info in `FsHistoryProvider.checkForLogs`.

### Why are the changes needed?

After [SPARK-29043](https://issues.apache.org/jira/browse/SPARK-29043), `FsHistoryProvider` lists the log info without waiting for all `mergeApplicationListing` tasks to finish.

However, the `LevelDBIterator` used to list the log info is not thread-safe if other threads delete the related log info at the same time.

This is the error message:
```
21/12/15 14:12:02 ERROR FsHistoryProvider: Exception in checking for event log updates
java.util.NoSuchElementException: 1^__main__^+hdfs:https://xxx/application_xxx.inprogress
        at org.apache.spark.util.kvstore.LevelDB.get(LevelDB.java:132)
        at org.apache.spark.util.kvstore.LevelDBIterator.next(LevelDBIterator.java:137)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
        at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
        at scala.collection.TraversableLike.to(TraversableLike.scala:678)
        at scala.collection.TraversableLike.to$(TraversableLike.scala:675)
        at scala.collection.AbstractTraversable.to(Traversable.scala:108)
        at scala.collection.TraversableOnce.toList(TraversableOnce.scala:299)
        at scala.collection.TraversableOnce.toList$(TraversableOnce.scala:299)
        at scala.collection.AbstractTraversable.toList(Traversable.scala:108)
        at org.apache.spark.deploy.history.FsHistoryProvider.checkForLogs(FsHistoryProvider.scala:588)
        at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$startPolling$3(FsHistoryProvider.scala:299)
```
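The race can be sketched with a minimal in-memory stand-in (a hypothetical `ListingRaceSketch` class, not the real KVStore API): when the "stale" list is materialized under the same monitor that guards deletes, the iteration can never observe a key that a concurrent delete has already removed, which is what the `listing.synchronized` blocks in this patch ensure.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ListingRaceSketch {
  // Hypothetical in-memory stand-in for FsHistoryProvider's `listing` KVStore.
  static final Map<String, Long> listing = new LinkedHashMap<>();

  // Deletes go through the same monitor that guards the listing snapshot.
  static void delete(String path) {
    synchronized (listing) {
      listing.remove(path);
    }
  }

  // Mirrors the patch: materialize the stale list under the lock, so the
  // iteration cannot race with a concurrent delete and hit a missing key.
  static List<String> staleBefore(long cutoff) {
    synchronized (listing) {
      List<String> out = new ArrayList<>();
      for (Map.Entry<String, Long> e : listing.entrySet()) {
        if (e.getValue() < cutoff) {
          out.add(e.getKey());
        }
      }
      return out;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    listing.put("application_1", 100L);
    listing.put("application_2", 200L);

    Thread deleter = new Thread(() -> delete("application_1"));
    deleter.start();
    deleter.join();

    // The snapshot is safe to filter and process after the lock is released.
    System.out.println(staleBefore(300L)); // [application_2]
  }
}
```

Without the shared lock, iterating a live view while another thread deletes entries is exactly the `NoSuchElementException` path shown in the log above.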

### Does this PR introduce _any_ user-facing change?

Yes, this is a bug fix.

### How was this patch tested?

Manual test; after this patch the exception goes away.

Closes apache#34919 from ulysses-you/SPARK-37659.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
ulysses-you authored and gengliangwang committed Dec 23, 2021
1 parent 6511dbb commit 29e3e8d
Showing 1 changed file with 13 additions and 7 deletions.
```diff
@@ -591,11 +591,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       //
       // Only entries with valid applications are cleaned up here. Cleaning up invalid log
       // files is done by the periodic cleaner task.
-      val stale = listing.view(classOf[LogInfo])
-        .index("lastProcessed")
-        .last(newLastScanTime - 1)
-        .asScala
-        .toList
+      val stale = listing.synchronized {
+        listing.view(classOf[LogInfo])
+          .index("lastProcessed")
+          .last(newLastScanTime - 1)
+          .asScala
+          .toList
+      }
       stale.filterNot(isProcessing)
         .filterNot(info => notStale.contains(info.logPath))
         .foreach { log =>
@@ -728,7 +730,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           markInaccessible(rootPath)
           // SPARK-28157 We should remove this inaccessible entry from the KVStore
           // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], rootPath.toString)
+          listing.synchronized {
+            listing.delete(classOf[LogInfo], rootPath.toString)
+          }
         case e: Exception =>
           logError("Exception while merging application listings", e)
       } finally {
@@ -841,7 +845,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // Fetch the entry first to avoid an RPC when it's already removed.
         listing.read(classOf[LogInfo], inProgressLog)
         if (!fs.isFile(new Path(inProgressLog))) {
-          listing.delete(classOf[LogInfo], inProgressLog)
+          listing.synchronized {
+            listing.delete(classOf[LogInfo], inProgressLog)
+          }
         }
       } catch {
         case _: NoSuchElementException =>
```
