From 36b03e6d6dea6018b3baeb7f1250c235074238cf Mon Sep 17 00:00:00 2001
From: Ajay Borra
Date: Fri, 24 Aug 2018 14:36:02 +0530
Subject: [PATCH 1/2] Setting the output committers to the defaults and making
 the committer v2 algorithm the default

---
 build.gradle                                  |  2 -
 docs/installation/index.md                    | 10 +++
 gradle/spark.gradle                           |  3 +-
 helloworld/gradle/spark.gradle                |  3 +-
 templates/simple/spark.gradle                 |  3 +-
 .../io/DirectMapreduceOutputCommitter.scala   | 63 ---------------
 .../op/utils/io/DirectOutputCommitter.scala   | 76 -------------------
 .../op/utils/io/avro/AvroInOut.scala          |  2 -
 8 files changed, 13 insertions(+), 149 deletions(-)
 delete mode 100644 utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala
 delete mode 100644 utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala

diff --git a/build.gradle b/build.gradle
index 09b0654334..6508305ebe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -197,10 +197,8 @@ configure(allProjs) {
     ignoreFailures = true
     include '**/*.java', '**/*.scala'
     exclude '**/org/apache/spark/ml/SparkDefaultParamsReadWrite.scala',
-            '**/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala',
             '**/com/salesforce/op/test/TestSparkContext.scala',
             '**/com/salesforce/op/test/TempDirectoryTest.scala',
-            '**/com/salesforce/op/utils/io/DirectOutputCommitter.scala',
             '**/com/salesforce/op/stages/impl/tuning/OpCrossValidation.scala',
             '**/com/salesforce/op/stages/impl/tuning/OpTrainValidationSplit.scala',
             '**/com/salesforce/op/test/*.java',
diff --git a/docs/installation/index.md b/docs/installation/index.md
index c1b0d9b469..0415b9885f 100644
--- a/docs/installation/index.md
+++ b/docs/installation/index.md
@@ -5,3 +5,13 @@
 * Clone the TransmogrifAI repo: `git clone https://github.com/salesforce/TransmogrifAI.git`
 * Build the project: `cd TransmogrifAI && ./gradlew compileTestScala installDist`
 * Start hacking
+
+# (Optional) Configuration
+
+## Custom Output Committers
+
+Depending on the deployment approach, you can implement or use customized OutputCommitter classes. The following properties can be configured to override the default output committer classes:
+* `spark.hadoop.mapred.output.committer.class`
+* `spark.hadoop.spark.sql.sources.outputCommitterClass`
+
+The [S3A Committer](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/committers.html) and [Cloud Integration](https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/cloud-integration.html#configuring) guides provide more details on the topic.
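As a minimal sketch (not part of the patch) of how the two properties documented above might be set programmatically, assuming a plain `SparkSession` application; the `com.example.*` committer class names are hypothetical placeholders, not classes shipped with TransmogrifAI:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: override the committer used for RDD/mapred-based writes and
// the one used by Spark SQL file sources with custom implementations.
val spark = SparkSession.builder()
  .appName("custom-committer-example")
  .config("spark.hadoop.mapred.output.committer.class",
    "com.example.MyOutputCommitter") // hypothetical committer class
  .config("spark.hadoop.spark.sql.sources.outputCommitterClass",
    "com.example.MyMapreduceOutputCommitter") // hypothetical committer class
  .getOrCreate()
```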
diff --git a/gradle/spark.gradle b/gradle/spark.gradle
index 69f96cdb33..60599d53e9 100644
--- a/gradle/spark.gradle
+++ b/gradle/spark.gradle
@@ -70,8 +70,7 @@ task sparkSubmit(type: Exec, dependsOn: copyLog4jToSpark) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     environment SPARK_HOME: sparkHome
diff --git a/helloworld/gradle/spark.gradle b/helloworld/gradle/spark.gradle
index 69f96cdb33..60599d53e9 100644
--- a/helloworld/gradle/spark.gradle
+++ b/helloworld/gradle/spark.gradle
@@ -70,8 +70,7 @@ task sparkSubmit(type: Exec, dependsOn: copyLog4jToSpark) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     environment SPARK_HOME: sparkHome
diff --git a/templates/simple/spark.gradle b/templates/simple/spark.gradle
index 9784c283a9..3cfb281230 100644
--- a/templates/simple/spark.gradle
+++ b/templates/simple/spark.gradle
@@ -197,8 +197,7 @@ task sparkSubmit(dependsOn: copyLog4jToSparkNoInstall) {
         "spark.hadoop.avro.output.codec=deflate",
         "spark.hadoop.avro.mapred.deflate.level=6",
         "spark.hadoop.validateOutputSpecs=false",
-        "spark.hadoop.mapred.output.committer.class=com.salesforce.op.utils.io.DirectOutputCommitter",
-        "spark.hadoop.spark.sql.sources.outputCommitterClass=com.salesforce.op.utils.io.DirectMapreduceOutputCommitter"
+        "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2"
     ].collect { ["--conf", it] }.flatten()
 
     def hadoopConfDir = System.env.HOME + "/.fake_hadoop_conf"
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala b/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala
deleted file mode 100644
index e9c16c5e09..0000000000
--- a/utils/src/main/scala/com/salesforce/op/utils/io/DirectMapreduceOutputCommitter.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-// scalastyle:off header.matches
-/*
- * Modifications: (c) 2017, Salesforce.com, Inc.
- * Copyright 2015 Databricks, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.salesforce.op.utils.io
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
-
-class DirectMapreduceOutputCommitter extends OutputCommitter {
-
-  override def setupJob(jobContext: JobContext): Unit = {}
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = {
-    // We return true here to guard against implementations that do not handle false correctly.
-    // The meaning of returning false is not entirely clear, so it's possible to be interpreted
-    // as an error. Returning true just means that commitTask() will be called, which is a no-op.
-    true
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-
-  /**
-   * Creates a _SUCCESS file to indicate the entire job was successful.
-   * This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
-   */
-  override def commitJob(context: JobContext): Unit = {
-    val conf = context.getConfiguration
-    if (shouldCreateSuccessFile(conf)) {
-      val outputPath = FileOutputFormat.getOutputPath(context)
-      if (outputPath != null) {
-        val fileSys = outputPath.getFileSystem(conf)
-        val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSys.create(filePath).close()
-      }
-    }
-  }
-
-  /** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
-  private def shouldCreateSuccessFile(conf: Configuration): Boolean = {
-    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
-  }
-}
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala b/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala
deleted file mode 100644
index 83f89c14ae..0000000000
--- a/utils/src/main/scala/com/salesforce/op/utils/io/DirectOutputCommitter.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-// scalastyle:off header.matches
-/*
- * Modifications: (c) 2017, Salesforce.com, Inc.
- * Copyright 2015 Databricks, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License. You may obtain
- * a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package com.salesforce.op.utils.io
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred._
-
-/**
- * OutputCommitter suitable for S3 workloads. Unlike the usual FileOutputCommitter, which
- * writes files to a _temporary/ directory before renaming them to their final location, this
- * simply writes directly to the final location.
- *
- * The FileOutputCommitter is required for HDFS + speculation, which allows only one writer at
- * a time for a file (so two people racing to write the same file would not work). However, S3
- * supports multiple writers outputting to the same file, where visibility is guaranteed to be
- * atomic. This is a monotonic operation: all writers should be writing the same data, so which
- * one wins is immaterial.
- *
- * Code adapted from Ian Hummel's code from this PR:
- * https://github.com/themodernlife/spark/commit/4359664b1d557d55b0579023df809542386d5b8c
- */
-class DirectOutputCommitter extends OutputCommitter {
-
-  override def setupJob(jobContext: JobContext): Unit = {}
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = {
-    // We return true here to guard against implementations that do not handle false correctly.
-    // The meaning of returning false is not entirely clear, so it's possible to be interpreted
-    // as an error. Returning true just means that commitTask() will be called, which is a no-op.
-    true
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
-
-  /**
-   * Creates a _SUCCESS file to indicate the entire job was successful.
-   * This mimics the behavior of FileOutputCommitter, reusing the same file name and conf option.
-   */
-  override def commitJob(context: JobContext): Unit = {
-    val conf = context.getJobConf
-    if (shouldCreateSuccessFile(conf)) {
-      val outputPath = FileOutputFormat.getOutputPath(conf)
-      if (outputPath != null) {
-        val fileSys = outputPath.getFileSystem(conf)
-        val filePath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
-        fileSys.create(filePath).close()
-      }
-    }
-  }
-
-  /** By default, we do create the _SUCCESS file, but we allow it to be turned off. */
-  private def shouldCreateSuccessFile(conf: JobConf): Boolean = {
-    conf.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)
-  }
-}
diff --git a/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala b/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
index 155e0e1d35..4623840875 100644
--- a/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
+++ b/utils/src/main/scala/com/salesforce/op/utils/io/avro/AvroInOut.scala
@@ -32,7 +32,6 @@ package com.salesforce.op.utils.io.avro
 
 import java.net.URI
 
-import com.salesforce.op.utils.io.DirectOutputCommitter
 import com.salesforce.op.utils.spark.RichRDD._
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
@@ -156,7 +155,6 @@ object AvroInOut {
 
   private def createJobConfFromContext(schema: String)(implicit sc: SparkSession) = {
     val jobConf = new JobConf(sc.sparkContext.hadoopConfiguration)
-    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])
     AvroJob.setOutputSchema(jobConf, new Schema.Parser().parse(schema))
     jobConf
   }

From 8405f15a9cf1c4d71e5e6ef5b208c13fad2da908 Mon Sep 17 00:00:00 2001
From: Ajay Borra
Date: Sat, 25 Aug 2018 00:18:31 +0530
Subject: [PATCH 2/2] Made CR changes

---
 docs/installation/index.md | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/docs/installation/index.md b/docs/installation/index.md
index 0415b9885f..c1b0d9b469 100644
--- a/docs/installation/index.md
+++ b/docs/installation/index.md
@@ -5,13 +5,3 @@
 * Clone the TransmogrifAI repo: `git clone https://github.com/salesforce/TransmogrifAI.git`
 * Build the project: `cd TransmogrifAI && ./gradlew compileTestScala installDist`
 * Start hacking
-
-# (Optional) Configuration
-
-## Custom Output Committers
-
-Depending on the deployment approach, you can implement or use customized OutputCommitter classes. The following properties can be configured to override the default output committer classes:
-* `spark.hadoop.mapred.output.committer.class`
-* `spark.hadoop.spark.sql.sources.outputCommitterClass`
-
-The [S3A Committer](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/committers.html) and [Cloud Integration](https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/cloud-integration.html#configuring) guides provide more details on the topic.
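For context on the setting that replaces the direct committers: with `mapreduce.fileoutputcommitter.algorithm.version=2`, each task's output is moved into the final output directory at task-commit time, so job commit no longer has to rename every file out of the per-job `_temporary` directory sequentially, which is the main benefit on object stores such as S3. A minimal sketch of enabling it from application code rather than through the bundled `sparkSubmit` Gradle tasks (the application name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: enable the v2 FileOutputCommitter algorithm. Task output is
// committed straight into the destination directory when each task commits;
// job commit then just writes the _SUCCESS marker.
val spark = SparkSession.builder()
  .appName("committer-v2-example") // illustrative name
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```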