Enable features via config that are off by default in the profiler AutoTuner #668

Merged: 5 commits, Dec 5, 2023
@@ -330,7 +330,8 @@ class RecommendationEntry(val name: String,
class AutoTuner(
val clusterProps: ClusterProperties,
val appInfoProvider: AppSummaryInfoBaseProvider,
val platform: Platform) extends Logging {
val platform: Platform,
unsupportedOperators: Seq[DriverLogUnsupportedOperators]) extends Logging {

import AutoTuner._

@@ -350,11 +351,9 @@ class AutoTuner(
}

def getPropertyValue(key: String): Option[String] = {
val fromProfile = appInfoProvider.getProperty(key)
fromProfile match {
case None => Option(clusterProps.softwareProperties.get(key))
case Some(_) => fromProfile
}
val fromProfile = Option(appInfoProvider).flatMap(_.getProperty(key))
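// Option(appInfoProvider) also guards against a null provider (driver-log-only runs pass
// null here, see Profiler.runAutoTuner below), so the lookup short-circuits to None
// instead of throwing an NPE.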
// If the value is not found above, fallback to cluster properties
fromProfile.orElse(Option(clusterProps.softwareProperties.get(key)))
}

def initRecommendations(): Unit = {
@@ -819,6 +818,20 @@ class AutoTuner(
appendRecommendation("spark.sql.shuffle.partitions", s"$shufflePartitions")
}

/**
* Analyzes operators reported as unsupported in driver logs and generates recommendations for configuration properties.
*/
private def recommendFromDriverLogs(): Unit = {
// Iterate through unsupported operators' reasons and check for matching properties
unsupportedOperators.map(_.reason).foreach { operatorReason =>
recommendationsFromDriverLogs.collect {
case (config, recommendedValue) if operatorReason.contains(config) =>
appendRecommendation(config, recommendedValue)
appendComment(commentForExperimentalConfig(config))
}
}
}
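// For example: a reason such as "Set spark.rapids.sql.incompatibleDateFormats.enabled=true
// to force onto GPU." contains that config key, so the collect above appends the recommended
// value ("true") along with the experimental-config comment.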

def appendOptionalComment(lookup: String, comment: String): Unit = {
if (!skippedRecommendations.contains(lookup)) {
appendComment(comment)
@@ -921,6 +934,9 @@ class AutoTuner(
case (property, value) => appendRecommendation(property, value)
}
}
if (unsupportedOperators.nonEmpty) {
recommendFromDriverLogs()
}
(toRecommendationsProfileResult, toCommentProfileResult)
}
}
@@ -970,6 +986,8 @@ object AutoTuner extends Logging {
val DEF_READ_SIZE_THRESHOLD = 100 * 1024L * 1024L * 1024L
val DEFAULT_WORKER_INFO_PATH = "./worker_info.yaml"
val SUPPORTED_SIZE_UNITS: Seq[String] = Seq("b", "k", "m", "g", "t", "p")
private val DOC_URL: String = "https://nvidia.github.io/spark-rapids/docs/" +
"additional-functionality/advanced_configs.html#advanced-configuration"

val commentsForMissingProps: Map[String, String] = Map(
"spark.executor.memory" ->
@@ -1017,15 +1035,27 @@ object AutoTuner extends Logging {
" If the Spark RAPIDS jar is being bundled with your Spark\n" +
" distribution, this step is not needed.")
)

// Recommended values for specific unsupported configurations
private val recommendationsFromDriverLogs: Map[String, String] = Map(
"spark.rapids.sql.incompatibleDateFormats.enabled" -> "true"
)
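// Matching is a plain substring check of each config key against an operator's reason text
// (see recommendFromDriverLogs), so new experimental configs can be covered by adding
// entries to this map.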

def commentForExperimentalConfig(config: String): String = {
s"Using $config does not guarantee to produce the same results as CPU. " +
s"Please refer to $DOC_URL."
}

// the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
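// e.g. "rapids-4-spark_2.12-23.10.0.jar" matches, capturing "23.10.0" as the plugin version
// (illustrative jar name, not taken from this diff).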

private def handleException(
ex: Exception,
appInfo: AppSummaryInfoBaseProvider,
platform: Platform): AutoTuner = {
platform: Platform,
unsupportedOperators: Seq[DriverLogUnsupportedOperators]): AutoTuner = {
logError("Exception: " + ex.getStackTrace.mkString("Array(", ", ", ")"))
val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform)
val tuning = new AutoTuner(new ClusterProperties(), appInfo, platform, unsupportedOperators)
val msg = ex match {
case cEx: ConstructorException => cEx.getContext
case _ => if (ex.getCause != null) ex.getCause.toString else ex.toString
@@ -1075,26 +1105,30 @@ object AutoTuner extends Logging {
def buildAutoTunerFromProps(
clusterProps: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
platform: Platform = PlatformFactory.createInstance(),
unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterPropertiesFromContent(clusterProps)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
unsupportedOperators)
} catch {
case e: Exception =>
handleException(e, singleAppProvider, platform)
handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}

def buildAutoTuner(
filePath: String,
singleAppProvider: AppSummaryInfoBaseProvider,
platform: Platform = PlatformFactory.createInstance()): AutoTuner = {
platform: Platform = PlatformFactory.createInstance(),
unsupportedOperators: Seq[DriverLogUnsupportedOperators] = Seq.empty): AutoTuner = {
try {
val clusterPropsOpt = loadClusterProps(filePath)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform)
new AutoTuner(clusterPropsOpt.getOrElse(new ClusterProperties()), singleAppProvider, platform,
unsupportedOperators)
} catch {
case e: Exception =>
handleException(e, singleAppProvider, platform)
handleException(e, singleAppProvider, platform, unsupportedOperators)
}
}

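Taken together, these AutoTuner changes thread driver-log findings through both builder entry points. A minimal caller sketch (hypothetical wiring; the provider and operator values below are placeholders, not taken from this diff):

val unsupportedOps: Seq[DriverLogUnsupportedOperators] = Seq.empty // e.g. parsed by DriverLogProcessor
val provider: AppSummaryInfoBaseProvider = null // a null provider is tolerated, see getPropertyValue above
val tuner = AutoTuner.buildAutoTuner("./worker_info.yaml", provider,
  PlatformFactory.createInstance(), unsupportedOps)
val (recommendations, comments) = tuner.getRecommendedProperties()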
@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

@@ -77,10 +77,10 @@ object ProfileMain extends Logging {
}

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
profiler.profile(eventLogFsFiltered)
if (driverLog.nonEmpty){
profiler.profileDriver(driverLog)
if (driverLog.nonEmpty) {
profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
}
profiler.profile(eventLogFsFiltered)
(0, filteredLogs.size)
}

@@ -124,15 +124,21 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
progressBar.foreach(_.finishAll())
}

def profileDriver(driverLogInfos: String): Unit = {
def profileDriver(driverLogInfos: String, eventLogsEmpty: Boolean): Unit = {
val profileOutputWriter = new ProfileOutputWriter(s"$outputDir/driver",
Profiler.DRIVER_LOG_NAME, numOutputRows, true)

try {
val driverLogProcessor = new DriverLogProcessor(driverLogInfos)
val unsupportedDrivers = driverLogProcessor.processDriverLog()
val unsupportedDriverOperators = driverLogProcessor.processDriverLog()
profileOutputWriter.write(s"Unsupported operators in driver log",
unsupportedDrivers)
unsupportedDriverOperators)
if (eventLogsEmpty && useAutoTuner) {
// Since event logs are empty, AutoTuner will not run while processing event logs.
// We need to run it here explicitly.
val (properties, comments) = runAutoTuner(None, unsupportedDriverOperators)
profileOutputWriter.writeText("\n### A. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}
} finally {
profileOutputWriter.close()
}
@@ -403,6 +409,26 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
appLogPath, ioAnalysisMetrics), compareRes)
}

/**
* A wrapper method to run the AutoTuner.
* @param appInfo Summary of the application for tuning.
* @param unsupportedDriverOperators List of unsupported operators from the driver log
*/
private def runAutoTuner(appInfo: Option[ApplicationSummaryInfo],
unsupportedDriverOperators: Seq[DriverLogUnsupportedOperators])
: (Seq[RecommendedPropertyResult], Seq[RecommendedCommentResult]) = {
val appInfoProvider = appInfo.map(new SingleAppSummaryInfoProvider(_)).orNull
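// orNull: with no application summary (driver-log-only profiling), the AutoTuner receives
// a null provider, which getPropertyValue tolerates via Option(appInfoProvider).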
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath, appInfoProvider,
PlatformFactory.createInstance(platform), unsupportedDriverOperators)

// The autotuner allows skipping some properties,
// e.g., getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
// recommendation related to executor instances.
autoTuner.getRecommendedProperties()
}

def writeOutput(profileOutputWriter: ProfileOutputWriter,
appsSum: Seq[ApplicationSummaryInfo], outputCombined: Boolean,
comparedRes: Option[CompareSummaryInfo] = None): Unit = {
@@ -464,7 +490,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
} else {
appsSum
}
sums.foreach { app =>
sums.foreach { app: ApplicationSummaryInfo =>
profileOutputWriter.writeText("### A. Information Collected ###")
profileOutputWriter.write("Application Information", app.appInfo)
profileOutputWriter.write("Application Log Path Mapping", app.appLogPath)
@@ -510,15 +536,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolean
Some("Unsupported SQL Ops"))

if (useAutoTuner) {
val workerInfoPath = appArgs.workerInfo.getOrElse(AutoTuner.DEFAULT_WORKER_INFO_PATH)
val platform = appArgs.platform()
val autoTuner: AutoTuner = AutoTuner.buildAutoTuner(workerInfoPath,
new SingleAppSummaryInfoProvider(app),
PlatformFactory.createInstance(platform))
// the autotuner allows skipping some properties
// e.g. getRecommendedProperties(Some(Seq("spark.executor.instances"))) skips the
// recommendation related to executor instances.
val (properties, comments) = autoTuner.getRecommendedProperties()
val (properties, comments) = runAutoTuner(Some(app), Seq.empty)
profileOutputWriter.writeText("\n### D. Recommended Configuration ###\n")
profileOutputWriter.writeText(Profiler.getAutoTunerResultsAsString(properties, comments))
}
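With the new eventLogsEmpty flag on profileDriver, driver-log-only invocations still produce tuning output. A condensed sketch of the resulting control flow, restating the ProfileMain and Profiler changes above:

val profiler = new Profiler(hadoopConf, appArgs, enablePB)
if (driverLog.nonEmpty) {
  // When no event logs were supplied, profileDriver runs the AutoTuner itself,
  // feeding it only the unsupported operators parsed from the driver log.
  profiler.profileDriver(driverLog, eventLogFsFiltered.isEmpty)
}
profiler.profile(eventLogFsFiltered)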
@@ -15,7 +15,7 @@
*/
package com.nvidia.spark.rapids.tool.qualification

import com.nvidia.spark.rapids.tool.{PlatformFactory, PlatformNames}
import com.nvidia.spark.rapids.tool.PlatformNames
import org.rogach.scallop.{ScallopConf, ScallopOption}
import org.rogach.scallop.exceptions.ScallopException

@@ -1359,4 +1359,152 @@ class AutoTunerSuite extends FunSuite with BeforeAndAfterEach with Logging {
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|
|Comments:
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations generated for unsupported operators from driver and event logs") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"FromUnixTime", 1,
"Only UTC zone id is supported. Actual default zone id: America/Los_Angeles; " +
"CORRECTED format 'yyyyMMdd' on the GPU is not guaranteed to produce the same " +
"results as Spark on CPU. Set spark.rapids.sql.incompatibleDateFormats.enabled=true " +
"to force onto GPU.")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo,
getGpuAppMockInfoProvider, PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|
|Spark Properties:
|--conf spark.executor.cores=16
|--conf spark.executor.instances=8
|--conf spark.executor.memory=32768m
|--conf spark.executor.memoryOverhead=8396m
|--conf spark.rapids.memory.pinnedPool.size=4096m
|--conf spark.rapids.shuffle.multiThreaded.reader.threads=16
|--conf spark.rapids.shuffle.multiThreaded.writer.threads=16
|--conf spark.rapids.sql.incompatibleDateFormats.enabled=true
|--conf spark.rapids.sql.multiThreadedRead.numThreads=20
|--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark311.RapidsShuffleManager
|--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=128m
|--conf spark.sql.adaptive.coalescePartitions.minPartitionNum=128
|--conf spark.sql.files.maxPartitionBytes=512m
|--conf spark.sql.shuffle.partitions=200
|
|Comments:
|- 'spark.executor.instances' was not set.
|- 'spark.executor.memoryOverhead' was not set.
|- 'spark.rapids.memory.pinnedPool.size' was not set.
|- 'spark.rapids.shuffle.multiThreaded.reader.threads' was not set.
|- 'spark.rapids.shuffle.multiThreaded.writer.threads' was not set.
|- 'spark.rapids.sql.incompatibleDateFormats.enabled' was not set.
|- 'spark.rapids.sql.multiThreadedRead.numThreads' was not set.
|- 'spark.shuffle.manager' was not set.
|- 'spark.sql.adaptive.advisoryPartitionSizeInBytes' was not set.
|- 'spark.sql.adaptive.coalescePartitions.minPartitionNum' was not set.
|- 'spark.sql.adaptive.enabled' should be enabled for better performance.
|- 'spark.sql.files.maxPartitionBytes' was not set.
|- 'spark.sql.shuffle.partitions' was not set.
|- ${AutoTuner.classPathComments("rapids.jars.missing")}
|- ${AutoTuner.classPathComments("rapids.shuffle.jars")}
|- ${AutoTuner.commentForExperimentalConfig("spark.rapids.sql.incompatibleDateFormats.enabled")}
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}


test("Recommendations generated for empty unsupported operators from driver logs only") {
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), Seq.empty)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
|
|Comments:
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}

test("Recommendations not generated for unsupported operators from driver logs") {
// This test does not generate any recommendations for the unsupported operator 'Literal'
val customProps = mutable.LinkedHashMap(
"spark.executor.cores" -> "8",
"spark.executor.memory" -> "47222m",
"spark.rapids.sql.concurrentGpuTasks" -> "2",
"spark.task.resource.gpu.amount" -> "0.0625")
val unsupportedDriverOperators = Seq(
DriverLogUnsupportedOperators(
"Literal", 3,
"expression Literal 1700518632630000 produces an unsupported type TimestampType")
)
val workerInfo = buildWorkerInfoAsString(Some(customProps))
val autoTuner: AutoTuner = AutoTuner.buildAutoTunerFromProps(workerInfo, null,
PlatformFactory.createInstance(), unsupportedDriverOperators)
val (properties, comments) = autoTuner.getRecommendedProperties()
val autoTunerOutput = Profiler.getAutoTunerResultsAsString(properties, comments)
// scalastyle:off line.size.limit
val expectedResults =
s"""|Cannot recommend properties. See Comments.
|
|Comments:
|- AutoTuner recommendations only support eventlogs generated by Spark applications utilizing RAPIDS Accelerator for Apache Spark
|- RAPIDS Accelerator for Apache Spark jar is missing in "spark.plugins". Please refer to https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/overview.html
|""".stripMargin
// scalastyle:on line.size.limit
assert(expectedResults == autoTunerOutput)
}
}