From e4b8a9219a73291d9e1dbc22274ef97f46898f25 Mon Sep 17 00:00:00 2001
From: "adamchit1996@gmail.com"
Date: Fri, 20 Sep 2019 15:46:22 -0700
Subject: [PATCH 01/16] refactored maxTrainingSample get and set functions so that all classes that inherit from Splitter can use them

---
 .../op/stages/impl/tuning/DataBalancer.scala | 18 ------------------
 .../op/stages/impl/tuning/Splitter.scala     | 18 ++++++++++++++++++
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala
index 13ba9aea22..5970b4cc70 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataBalancer.scala
@@ -338,24 +338,6 @@ trait DataBalancerParams extends Params {
 
   def getSampleFraction: Double = $(sampleFraction)
 
-  /**
-   * Maximum size of dataset want to train on.
-   * Value should be > 0.
-   * Default is 5000.
-   *
-   * @group param
-   */
-  final val maxTrainingSample = new IntParam(this, "maxTrainingSample",
-    "maximum size of dataset want to train on", ParamValidators.inRange(
-      lowerBound = 0, upperBound = 1 << 30, lowerInclusive = false, upperInclusive = true
-    )
-  )
-  setDefault(maxTrainingSample, SplitterParamsDefault.MaxTrainingSampleDefault)
-
-  def setMaxTrainingSample(value: Int): this.type = set(maxTrainingSample, value)
-
-  def getMaxTrainingSample: Int = $(maxTrainingSample)
-
   /**
    * Fraction to sample minority data
    * Value should be > 0.0
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
index 99734a1fa1..3808e28a03 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
@@ -129,6 +129,24 @@ trait SplitterParams extends Params {
   def setReserveTestFraction(value: Double): this.type = set(reserveTestFraction, value)
   def getReserveTestFraction: Double = $(reserveTestFraction)
 
+  /**
+   * Maximum size of dataset want to train on.
+   * Value should be > 0.
+   * Default is 1000000.
+   *
+   * @group param
+   */
+  final val maxTrainingSample = new IntParam(this, "maxTrainingSample",
+    "maximum size of dataset want to train on", ParamValidators.inRange(
+      lowerBound = 0, upperBound = 1 << 30, lowerInclusive = false, upperInclusive = true
+    )
+  )
+  setDefault(maxTrainingSample, SplitterParamsDefault.MaxTrainingSampleDefault)
+
+  def setMaxTrainingSample(value: Int): this.type = set(maxTrainingSample, value)
+
+  def getMaxTrainingSample: Int = $(maxTrainingSample)
+
   final val labelColumnName = new Param[String](this, "labelColumnName",
     "label column name, column 0 if not specified")
   private[op] def getLabelColumnName = $(labelColumnName)

From 217025471c610b2c6ebfbc786bab2f3050f49b94 Mon Sep 17 00:00:00 2001
From: "adamchit1996@gmail.com"
Date: Fri, 20 Sep 2019 15:58:41 -0700
Subject: [PATCH 02/16] added downsampling logic if MaxTrainingSample is reached

---
 .../op/stages/impl/tuning/DataSplitter.scala | 59 ++++++++++++++++---
 .../op/stages/impl/tuning/Splitter.scala     |  4 +-
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
index 7d0511a2db..e9893618c6 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
@@ -31,8 +31,9 @@
 package com.salesforce.op.stages.impl.tuning
 
 import com.salesforce.op.UID
+import com.salesforce.op.stages.impl.selector.ModelSelectorNames
 import org.apache.spark.ml.param._
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.types.{Metadata, MetadataBuilder}
 
 case object DataSplitter {
@@ -46,11 +47,13 @@ case object DataSplitter {
    */
   def apply(
     seed: Long = SplitterParamsDefault.seedDefault,
-    reserveTestFraction: Double = SplitterParamsDefault.ReserveTestFractionDefault
+    reserveTestFraction: Double = SplitterParamsDefault.ReserveTestFractionDefault,
+    maxTrainingSample: Int = SplitterParamsDefault.MaxTrainingSampleDefault
   ): DataSplitter = {
     new DataSplitter()
       .setSeed(seed)
       .setReserveTestFraction(reserveTestFraction)
+      .setMaxTrainingSample(maxTrainingSample)
   }
 }
 
@@ -59,30 +62,69 @@ case object DataSplitter {
  *
  * @param uid
  */
-class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) {
+class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid) with DataSplitterParams {
 
   /**
-   * Function to set parameters before passing into the validation step
-   * eg - do data balancing or dropping based on the labels
+   * Function to set the down sampling fraction and parameters before passing into the validation step
    *
    * @param data
    * @return Parameters set in examining data
   */
   override def preValidationPrepare(data: Dataset[Row]): PrevalidationVal = {
-    summary = Option(DataSplitterSummary())
+    val dataSetSize = data.count().toDouble
+    val sampleF = getMaxTrainingSample / dataSetSize
+    val DownSampleFraction = if (sampleF < 1) sampleF else 1
+    summary = Option(DataSplitterSummary(DownSampleFraction))
+    setDownSampleFraction(DownSampleFraction)
     PrevalidationVal(summary, None)
   }
 
+  /**
+   * Rebalance the training data within the validation step
+   *
+   * @param data to prepare for model training. first column must be the label as a double
+   * @return balanced training set and a test set
+   */
+  override def validationPrepare(data: Dataset[Row]): Dataset[Row] = {
+
+    val dataPrep = super.validationPrepare(data)
+
+    // check if down sampling is needed
+    val balanced: DataFrame = if (getDownSampleFraction < 1) {
+      dataPrep.sample( false, getDownSampleFraction, getSeed)
+    } else {
+      dataPrep
+    }
+    balanced.persist()
+  }
 
   override def copy(extra: ParamMap): DataSplitter = {
     val copy = new DataSplitter(uid)
     copyValues(copy, extra)
   }
 }
 
+trait DataSplitterParams extends Params {
+  /**
+   * Fraction to down sample data
+   * Value should be in [0.0, 1.0]
+   *
+   * @group param
+   */
+  private[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction",
+    "fraction to down sample data", ParamValidators.inRange(
+      lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true
+    )
+  )
+
+  private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)
+
+  private[op] def getDownSampleFraction: Double = $(downSampleFraction)
+}
 
 /**
- * Empty class because no summary information for a data splitter
+ * Summary for data splitter run for storage in metadata
+ * @param downSamplingFraction down sampling fraction for training set
  */
-case class DataSplitterSummary() extends SplitterSummary {
+case class DataSplitterSummary(downSamplingFraction: Double) extends SplitterSummary {
 
   /**
    * Converts to [[Metadata]]
@@ -94,6 +136,7 @@
   def toMetadata(skipUnsupported: Boolean): Metadata = {
     new MetadataBuilder()
       .putString(SplitterSummary.ClassName, this.getClass.getName)
+      .putDouble(ModelSelectorNames.DownSample, downSamplingFraction)
       .build()
   }
 
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
index 3808e28a03..110dbdfa1f 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
@@ -170,7 +170,9 @@ private[op] object SplitterSummary {
 
   def fromMetadata(metadata: Metadata): Try[SplitterSummary] = Try {
     metadata.getString(ClassName) match {
-      case s if s == classOf[DataSplitterSummary].getName => DataSplitterSummary()
+      case s if s == classOf[DataSplitterSummary].getName => DataSplitterSummary(
+        downSamplingFraction = metadata.getDouble(ModelSelectorNames.DownSample)
+      )
       case s if s == classOf[DataBalancerSummary].getName => DataBalancerSummary(
         positiveLabels = metadata.getLong(ModelSelectorNames.Positive),
         negativeLabels = metadata.getLong(ModelSelectorNames.Negative),

From dff09b9f25d2d5e99e5762f3a256e7245ac21a8a Mon Sep 17 00:00:00 2001
From: "adamchit1996@gmail.com"
Date: Fri, 20 Sep 2019 16:02:58 -0700
Subject: [PATCH 03/16] added unit tests for downsampling in regression data splitter

---
 .../stages/impl/tuning/DataSplitterTest.scala | 31 ++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index b7768e8959..ed30355232 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -43,6 +43,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
 
   val seed = 1234L
   val dataCount = 1000
+  val MaxTrainingSampleDefault = 1E6.toInt
 
   val data = RandomRDDs.normalVectorRDD(sc, 1000, 3, seed = seed)
@@ -56,6 +57,32 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
     train.count() shouldBe dataCount
   }
 
+  it should "down-sample when the data count is above the max allowed" in {
+    val numRows = 1E6.toInt + 1E6.toInt
+    val data =
+      RandomRDDs.normalVectorRDD(sc, numRows, 3, seed = seed)
+        .map(v => (1.0, Vectors.dense(v.toArray), "A")).toDF()
+    dataSplitter.preValidationPrepare(data)
+    val dataBalanced = dataSplitter.validationPrepare(data)
+
+    // validationPrepare calls the data sample method that samples the data to a target ratio but there is an epsilon
+    // to how precise this function is which is why we need to check around that epsilon
+    val samplingErrorEpsilon = (0.1 * MaxTrainingSampleDefault).toInt
+
+    dataBalanced.count() shouldBe MaxTrainingSampleDefault.toLong +- samplingErrorEpsilon.toLong
+  }
+
+  it should "set and get maxTrainingSample" in {
+    val numRows = 2000
+    val maxRows = numRows / 2
+
+    dataSplitter
+      .setReserveTestFraction(0.0)
+      .setMaxTrainingSample(maxRows)
+
+    dataSplitter.getMaxTrainingSample shouldBe maxRows
+  }
+
   it should "split the data in the appropriate proportion - 0.2" in {
     val (train, test) = dataSplitter.setReserveTestFraction(0.2).split(data)
     math.abs(test.count() - 200) < 30 shouldBe true
@@ -71,8 +98,10 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
   it should "keep the data unchanged when prepare is called" in {
     val summary = dataSplitter.preValidationPrepare(data)
     val train = dataSplitter.validationPrepare(data)
+    val sampleF = MaxTrainingSampleDefault / dataCount.toDouble
+    val downSampleFraction = if (sampleF < 1) sampleF else 1
     train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
-    assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary() }
+    assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(downSampleFraction) }
   }
 
 }

From 14c6b428738d23b926cca6b23cfd1f9f54ca80cc Mon Sep 17 00:00:00 2001
From: "adamchit1996@gmail.com"
Date: Fri, 20 Sep 2019 16:12:06 -0700
Subject: [PATCH 04/16] added integration tests to test downsampling from the model selector level

---
 .../RegressionModelSelectorTest.scala | 23 +++++++++++++++++--
 1 file changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
index 5833dd55c7..7e2e1bce5b 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
@@ -37,7 +37,7 @@ import com.salesforce.op.stages.impl.CompareParamGrid
 import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => RMT}
 import com.salesforce.op.stages.impl.selector.ModelSelectorNames.EstimatorType
 import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorSummary}
-import com.salesforce.op.stages.impl.tuning.BestEstimator
+import com.salesforce.op.stages.impl.tuning.{BestEstimator, DataSplitter}
 import com.salesforce.op.test.TestSparkContext
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.spark.RichMetadata._
@@ -62,6 +62,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
   with CompareParamGrid with OpXGBoostQuietLogging {
   val seed = 1234L
   val stageNames = "label_prediction"
+  val dataCount = 200
 
   import spark.implicits._
   val rand = new Random(seed)
@@ -120,9 +121,27 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
 
   it should "set the data splitting params correctly" in {
     val modelSelector = RegressionModelSelector()
-    modelSelector.splitter.get.setReserveTestFraction(0.1).setSeed(11L)
+    modelSelector.splitter.get.setReserveTestFraction(0.1).setSeed(11L).setMaxTrainingSample(1000)
+
     modelSelector.splitter.get.getSeed shouldBe 11L
     modelSelector.splitter.get.getReserveTestFraction shouldBe 0.1
+    modelSelector.splitter.get.getMaxTrainingSample shouldBe 1000
+  }
+
+  it should "set maxTrainingSample and down-sample" in {
+
+    implicit val vectorEncoder: org.apache.spark.sql.Encoder[Vector] = ExpressionEncoder()
+    implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder)
+    val maxTrainingSample = 100
+    val sampleF = maxTrainingSample / dataCount.toDouble
+    val downSampleFraction = if (sampleF < 1) sampleF else 1
+    val dataSplitter = DataSplitter(maxTrainingSample = maxTrainingSample, seed = seed, reserveTestFraction = 0.0)
+    val modelSelector = RegressionModelSelector.withCrossValidation(Option(dataSplitter), seed = seed)
+    val model = modelSelector.setInput(label, features).fit(data)
+    val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata())
+    val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction" )
+
+    modelDownSampleFraction shouldBe downSampleFraction
   }
 
   it should "split into training and test" in {

From 722341b1af565ef807ee75b70cb36e488d7dc57c Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Thu, 3 Oct 2019 14:24:07 -0700
Subject: [PATCH 05/16] style changes

---
 .../op/stages/impl/tuning/DataSplitter.scala     |  2 +-
 .../op/stages/impl/tuning/DataSplitterTest.scala | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
index e9893618c6..d0ff81f1be 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
@@ -73,7 +73,7 @@ class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid)
   override def preValidationPrepare(data: Dataset[Row]): PrevalidationVal = {
     val dataSetSize = data.count().toDouble
     val sampleF = getMaxTrainingSample / dataSetSize
-    val DownSampleFraction = if (sampleF < 1) sampleF else 1
+    val DownSampleFraction = if (getMaxTrainingSample < dataSetSize) sampleF else 1
     summary = Option(DataSplitterSummary(DownSampleFraction))
     setDownSampleFraction(DownSampleFraction)
     PrevalidationVal(summary, None)
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index ed30355232..f36a63006c 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -43,7 +43,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
 
   val seed = 1234L
   val dataCount = 1000
-  val MaxTrainingSampleDefault = 1E6.toInt
+  val MaxTrainingSampleDefault = 1E6.toLong
 
   val data = RandomRDDs.normalVectorRDD(sc, 1000, 3, seed = seed)
@@ -58,18 +58,18 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
   }
 
   it should "down-sample when the data count is above the max allowed" in {
-    val numRows = 1E6.toInt + 1E6.toInt
+    val numRows = MaxTrainingSampleDefault * 2
     val data =
       RandomRDDs.normalVectorRDD(sc, numRows, 3, seed = seed)
         .map(v => (1.0, Vectors.dense(v.toArray), "A")).toDF()
     dataSplitter.preValidationPrepare(data)
-    val dataBalanced = dataSplitter.validationPrepare(data)
+    val dataBalanced = dataSplitter.validationPrepare(data)
 
     // validationPrepare calls the data sample method that samples the data to a target ratio but there is an epsilon
     // to how precise this function is which is why we need to check around that epsilon
-    val samplingErrorEpsilon = (0.1 * MaxTrainingSampleDefault).toInt
+    val samplingErrorEpsilon = (0.1 * MaxTrainingSampleDefault).toLong
 
-    dataBalanced.count() shouldBe MaxTrainingSampleDefault.toLong +- samplingErrorEpsilon.toLong
+    dataBalanced.count() shouldBe MaxTrainingSampleDefault +- samplingErrorEpsilon
   }
 
   it should "set and get maxTrainingSample" in {

From 34d5bf1fd50b851547883b1abbc84229da146a35 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Thu, 3 Oct 2019 14:24:49 -0700
Subject: [PATCH 06/16] changed the test to reduce run time

---
 .../impl/regression/RegressionModelSelectorTest.scala | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
index 7e2e1bce5b..d5fe2ef69d 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
@@ -134,11 +134,16 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
     implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder)
     val maxTrainingSample = 100
     val sampleF = maxTrainingSample / dataCount.toDouble
-    val downSampleFraction = if (sampleF < 1) sampleF else 1
+    val downSampleFraction = if (maxTrainingSample < dataCount) sampleF else 1
     val dataSplitter = DataSplitter(maxTrainingSample = maxTrainingSample, seed = seed, reserveTestFraction = 0.0)
-    val modelSelector = RegressionModelSelector.withCrossValidation(Option(dataSplitter), seed = seed)
+    val modelSelector =
+      RegressionModelSelector.withTrainValidationSplit(
+        modelTypesToUse = Seq(RMT.OpLinearRegression),
+        dataSplitter = Option(dataSplitter),
+        seed = seed)
     val model = modelSelector.setInput(label, features).fit(data)
     val metaData = ModelSelectorSummary.fromMetadata(model.getMetadata().getSummaryMetadata())
+
     val modelDownSampleFraction = metaData.dataPrepParameters("downSampleFraction" )
 
     modelDownSampleFraction shouldBe downSampleFraction

From 433d4836b19bdc74596708e3f5de5061cfc7d242 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 14:37:41 -0700
Subject: [PATCH 07/16] test now checks all data splitter params

---
 .../op/stages/impl/tuning/DataSplitterTest.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index f36a63006c..208b7c90ac 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -72,14 +72,19 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
     dataBalanced.count() shouldBe MaxTrainingSampleDefault +- samplingErrorEpsilon
   }
 
-  it should "set and get maxTrainingSample" in {
-    val numRows = 2000
-    val maxRows = numRows / 2
+  it should "set and get all data splitter params" in {
+    val maxRows = dataCount / 2
+    val downSampleFraction = maxRows / dataCount.toDouble
 
-    dataSplitter
+    val dataSplitter = DataSplitter()
       .setReserveTestFraction(0.0)
+      .setSeed(seed)
       .setMaxTrainingSample(maxRows)
+      .setDownSampleFraction(downSampleFraction)
 
+    dataSplitter.getReserveTestFraction shouldBe 0.0
+    dataSplitter.getDownSampleFraction shouldBe downSampleFraction
+    dataSplitter.getSeed shouldBe seed
     dataSplitter.getMaxTrainingSample shouldBe maxRows
   }
 

From 2b02f8a3ae76203fb2d6ff6f8785ac306c435032 Mon Sep 17 00:00:00 2001
From: AdamChit
Date: Fri, 4 Oct 2019 14:46:43 -0700
Subject: [PATCH 08/16] Update core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala

Co-Authored-By: Christopher Rupley
---
 .../com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index 208b7c90ac..16eb7d08cf 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -104,7 +104,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
     val summary = dataSplitter.preValidationPrepare(data)
     val train = dataSplitter.validationPrepare(data)
     val sampleF = MaxTrainingSampleDefault / dataCount.toDouble
-    val downSampleFraction = if (sampleF < 1) sampleF else 1
+    val downSampleFraction = math.min(sampleF, 1.0)
     train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
     assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(downSampleFraction) }
   }

From 0521a37ed4fc7adcdc7a885853d099b191cbbcd1 Mon Sep 17 00:00:00 2001
From: AdamChit
Date: Fri, 4 Oct 2019 14:48:07 -0700
Subject: [PATCH 09/16] Update core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala

Co-Authored-By: Christopher Rupley
---
 .../op/stages/impl/regression/RegressionModelSelectorTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
index d5fe2ef69d..7d7b924907 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
@@ -134,7 +134,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
     implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder)
     val maxTrainingSample = 100
     val sampleF = maxTrainingSample / dataCount.toDouble
-    val downSampleFraction = if (maxTrainingSample < dataCount) sampleF else 1
+    val downSampleFraction = math.min(sampleF, 1.0)
     val dataSplitter = DataSplitter(maxTrainingSample = maxTrainingSample, seed = seed, reserveTestFraction = 0.0)
     val modelSelector =
       RegressionModelSelector.withTrainValidationSplit(

From 962e06f605a56453fe33a5d940043f19c7cdddd4 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 16:35:54 -0700
Subject: [PATCH 10/16] added downSampleFraction default value and made style changes

---
 .../salesforce/op/stages/impl/tuning/DataSplitter.scala | 7 ++++---
 .../com/salesforce/op/stages/impl/tuning/Splitter.scala | 1 +
 .../op/stages/impl/tuning/DataSplitterTest.scala        | 2 +-
 3 files changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
index d0ff81f1be..98b1014e9e 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
@@ -73,9 +73,9 @@ class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid)
   override def preValidationPrepare(data: Dataset[Row]): PrevalidationVal = {
     val dataSetSize = data.count().toDouble
     val sampleF = getMaxTrainingSample / dataSetSize
-    val DownSampleFraction = if (getMaxTrainingSample < dataSetSize) sampleF else 1
-    summary = Option(DataSplitterSummary(DownSampleFraction))
-    setDownSampleFraction(DownSampleFraction)
+    val downSampleFraction = math.min(sampleF, SplitterParamsDefault.DownSampleFractionDefault)
+    summary = Option(DataSplitterSummary(downSampleFraction))
+    setDownSampleFraction(downSampleFraction)
     PrevalidationVal(summary, None)
   }
 
@@ -114,6 +114,7 @@ trait DataSplitterParams extends Params {
       lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true
     )
   )
+  setDefault(downSampleFraction, SplitterParamsDefault.DownSampleFractionDefault)
 
   private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)
 
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
index 110dbdfa1f..b0b4c2bb3c 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
@@ -161,6 +161,7 @@ object SplitterParamsDefault {
   val MaxTrainingSampleDefault = 1E6.toInt
   val MaxLabelCategoriesDefault = 100
   val MinLabelFractionDefault = 0.0
+  val DownSampleFractionDefault = 1.0
 }
 
 trait SplitterSummary extends MetadataLike
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index 208b7c90ac..16eb7d08cf 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -104,7 +104,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
     val summary = dataSplitter.preValidationPrepare(data)
     val train = dataSplitter.validationPrepare(data)
     val sampleF = MaxTrainingSampleDefault / dataCount.toDouble
-    val downSampleFraction = if (sampleF < 1) sampleF else 1
+    val downSampleFraction = math.min(sampleF, 1.0)
     train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
     assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(downSampleFraction) }
   }

From 0ab4d9a2f5188c0fbced8f19f90966bc25aa8fe3 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 16:54:07 -0700
Subject: [PATCH 11/16] renamed test

---
 .../op/stages/impl/regression/RegressionModelSelectorTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
index 7d7b924907..54c66868ed 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelectorTest.scala
@@ -128,7 +128,7 @@ class RegressionModelSelectorTest extends FlatSpec with TestSparkContext
     modelSelector.splitter.get.getMaxTrainingSample shouldBe 1000
   }
 
-  it should "set maxTrainingSample and down-sample" in {
+  it should "down-sample when the training set is greater than the maxTrainingSample" in {
 
     implicit val vectorEncoder: org.apache.spark.sql.Encoder[Vector] = ExpressionEncoder()
     implicit val e1 = Encoders.tuple(Encoders.scalaDouble, vectorEncoder)

From ef4327cbe19035383eec199f802e42543f1e69cc Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 17:01:31 -0700
Subject: [PATCH 12/16] changed getDownSampleFraction to protected

---
 .../com/salesforce/op/stages/impl/tuning/DataSplitter.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
index 98b1014e9e..2e46591847 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
@@ -109,16 +109,16 @@ trait DataSplitterParams extends Params {
    *
    * @group param
   */
-  private[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction",
+  protected[op] final val downSampleFraction = new DoubleParam(this, "downSampleFraction",
     "fraction to down sample data", ParamValidators.inRange(
       lowerBound = 0.0, upperBound = 1.0, lowerInclusive = false, upperInclusive = true
     )
   )
   setDefault(downSampleFraction, SplitterParamsDefault.DownSampleFractionDefault)
 
-  private[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)
+  protected[op] def setDownSampleFraction(value: Double): this.type = set(downSampleFraction, value)
 
-  private[op] def getDownSampleFraction: Double = $(downSampleFraction)
+  protected[op] def getDownSampleFraction: Double = $(downSampleFraction)
 }

From 8ca0e78fb2f5e1f7e0ba869f9fa86a8387f4f969 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 17:14:20 -0700
Subject: [PATCH 13/16] name change based on PR comments

---
 .../op/stages/impl/tuning/DataSplitterTest.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index 16eb7d08cf..02f50d0fb2 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -43,7 +43,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
 
   val seed = 1234L
   val dataCount = 1000
-  val MaxTrainingSampleDefault = 1E6.toLong
+  val trainingLimitDefault = 1E6.toLong
 
   val data = RandomRDDs.normalVectorRDD(sc, 1000, 3, seed = seed)
@@ -57,8 +57,8 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
   }
 
-  it should "down-sample when the data count is above the max allowed" in {
-    val numRows = MaxTrainingSampleDefault * 2
+  it should "down-sample when the data count is above the default training limit" in {
+    val numRows = trainingLimitDefault * 2
     val data =
       RandomRDDs.normalVectorRDD(sc, numRows, 3, seed = seed)
         .map(v => (1.0, Vectors.dense(v.toArray), "A")).toDF()
@@ -67,9 +67,9 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
     val dataBalanced = dataSplitter.validationPrepare(data)
     // validationPrepare calls the data sample method that samples the data to a target ratio but there is an epsilon
     // to how precise this function is which is why we need to check around that epsilon
-    val samplingErrorEpsilon = (0.1 * MaxTrainingSampleDefault).toLong
+    val samplingErrorEpsilon = (0.1 * trainingLimitDefault).toLong
 
-    dataBalanced.count() shouldBe MaxTrainingSampleDefault +- samplingErrorEpsilon
+    dataBalanced.count() shouldBe trainingLimitDefault +- samplingErrorEpsilon
   }
 
   it should "set and get all data splitter params" in {
@@ -103,7 +103,7 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
   it should "keep the data unchanged when prepare is called" in {
     val summary = dataSplitter.preValidationPrepare(data)
     val train = dataSplitter.validationPrepare(data)
-    val sampleF = MaxTrainingSampleDefault / dataCount.toDouble
+    val sampleF = trainingLimitDefault / dataCount.toDouble
     val downSampleFraction = math.min(sampleF, 1.0)
     train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
     assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(downSampleFraction) }

From 8e67f277309840c3d37722f025b19e5f17dbb04b Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Fri, 4 Oct 2019 17:37:10 -0700
Subject: [PATCH 14/16] added dataCount to summary

---
 .../op/stages/impl/selector/ModelSelectorNames.scala    | 1 +
 .../salesforce/op/stages/impl/tuning/DataSplitter.scala | 9 +++++----
 .../com/salesforce/op/stages/impl/tuning/Splitter.scala | 1 +
 .../op/stages/impl/tuning/DataSplitterTest.scala        | 3 ++-
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorNames.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorNames.scala
index b36709438a..d863ba5349 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorNames.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorNames.scala
@@ -45,6 +45,7 @@ case object ModelSelectorNames {
   val HoldOutEval = "testSetEvaluationResults"
   val ResampleValues = "resamplingValues"
   val CuttValues = "cuttValues"
+  val PreSplitterDataCount = "preSplitterDataCount"
   val BestModelUid = "bestModelUID"
   val BestModelName = "bestModelName"
   val Positive = "positiveLabels"
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
index 2e46591847..cbcfc3e9c0 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/DataSplitter.scala
@@ -71,10 +71,10 @@ class DataSplitter(uid: String = UID[DataSplitter]) extends Splitter(uid = uid)
    * @return Parameters set in examining data
   */
   override def preValidationPrepare(data: Dataset[Row]): PrevalidationVal = {
-    val dataSetSize = data.count().toDouble
-    val sampleF = getMaxTrainingSample / dataSetSize
+    val dataSetSize = data.count()
+    val sampleF = getMaxTrainingSample / dataSetSize.toDouble
     val downSampleFraction = math.min(sampleF, SplitterParamsDefault.DownSampleFractionDefault)
-    summary = Option(DataSplitterSummary(downSampleFraction))
+    summary = Option(DataSplitterSummary(dataSetSize, downSampleFraction))
     setDownSampleFraction(downSampleFraction)
     PrevalidationVal(summary, None)
   }
@@ -125,7 +125,7 @@ trait DataSplitterParams extends Params {
  * Summary for data splitter run for storage in metadata
  * @param downSamplingFraction down sampling fraction for training set
  */
-case class DataSplitterSummary(downSamplingFraction: Double) extends SplitterSummary {
+case class DataSplitterSummary(preSplitterDataCount: Long, downSamplingFraction: Double) extends SplitterSummary {
 
   /**
    * Converts to [[Metadata]]
@@ -137,6 +137,7 @@ case class DataSplitterSummary(downSamplingFraction: Double) extends SplitterSum
   def toMetadata(skipUnsupported: Boolean): Metadata = {
     new MetadataBuilder()
       .putString(SplitterSummary.ClassName, this.getClass.getName)
+      .putLong(ModelSelectorNames.PreSplitterDataCount, preSplitterDataCount)
       .putDouble(ModelSelectorNames.DownSample, downSamplingFraction)
       .build()
   }
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
index b0b4c2bb3c..ec83367f46 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/tuning/Splitter.scala
@@ -172,6 +172,7 @@ private[op] object SplitterSummary {
   def fromMetadata(metadata: Metadata): Try[SplitterSummary] = Try {
     metadata.getString(ClassName) match {
       case s if s == classOf[DataSplitterSummary].getName => DataSplitterSummary(
+        preSplitterDataCount = metadata.getLong(ModelSelectorNames.PreSplitterDataCount),
         downSamplingFraction = metadata.getDouble(ModelSelectorNames.DownSample)
       )
       case s if s == classOf[DataBalancerSummary].getName => DataBalancerSummary(
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
index 02f50d0fb2..91911ad711 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/tuning/DataSplitterTest.scala
@@ -101,12 +101,13 @@ class DataSplitterTest extends FlatSpec with TestSparkContext with SplitterSumma
   }
 
   it should "keep the data unchanged when prepare is called" in {
+    val dataCount = data.count()
     val summary = dataSplitter.preValidationPrepare(data)
     val train = dataSplitter.validationPrepare(data)
     val sampleF = trainingLimitDefault / dataCount.toDouble
     val downSampleFraction = math.min(sampleF, 1.0)
     train.collect().zip(data.collect()).foreach { case (a, b) => a shouldBe b }
-    assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(downSampleFraction) }
+    assertDataSplitterSummary(summary.summaryOpt) { s => s shouldBe DataSplitterSummary(dataCount, downSampleFraction) }
   }
 
 }

From 009706dab8d5cf7b34c9ea5796c63fa8636dc1b3 Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Mon, 7 Oct 2019 11:08:37 -0700
Subject: [PATCH 15/16] Trigger re-build


From cfbe22fdf9082ab43a4df1ef206f489aa72d4fee Mon Sep 17 00:00:00 2001
From: "adam.chit"
Date: Mon, 7 Oct 2019 13:19:12 -0700
Subject: [PATCH 16/16] Trigger travis re-build
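
Usage sketch (illustrative only, not one of the patches above): the snippet below shows how the maxTrainingSample cap introduced in this series might be exercised from user code. It uses only calls that appear in the diffs (DataSplitter.apply with maxTrainingSample, RegressionModelSelector.withTrainValidationSplit, setInput(...).fit(...)); the label, features, and data values are assumed to already exist in the surrounding workflow.

  import com.salesforce.op.stages.impl.regression.{RegressionModelSelector, RegressionModelsToTry => RMT}
  import com.salesforce.op.stages.impl.tuning.DataSplitter

  // Cap the training set at one million rows. Per the patches, preValidationPrepare
  // computes downSampleFraction = min(maxTrainingSample / dataCount, 1.0), records it
  // (with the pre-split row count) in DataSplitterSummary metadata, and
  // validationPrepare then samples the prepared data down by that fraction.
  val splitter = DataSplitter(
    seed = 1234L,
    reserveTestFraction = 0.1,
    maxTrainingSample = 1000000
  )

  val selector = RegressionModelSelector.withTrainValidationSplit(
    modelTypesToUse = Seq(RMT.OpLinearRegression),
    dataSplitter = Option(splitter),
    seed = 1234L
  )

  // `label`, `features`, and `data` are assumptions: a label feature, a feature
  // vector, and the training DataFrame from an existing TransmogrifAI workflow.
  val model = selector.setInput(label, features).fit(data)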