From 2d9cb1d4f3c54e6f47d1abd5e413d1d246fcee31 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 7 Sep 2018 17:41:05 +0530 Subject: [PATCH 01/16] creating binaryclassification bin score evaluator --- .../OpBinaryClassifyBinEvaluator.scala | 166 ++++++++++++++++++ .../OpBinaryClassifyBinEvaluatorTest.scala | 123 +++++++++++++ 2 files changed, 289 insertions(+) create mode 100644 core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala create mode 100644 core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala new file mode 100644 index 0000000000..a2d3a52913 --- /dev/null +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.salesforce.op.evaluators + +import com.salesforce.op.UID +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.sql.types.DoubleType +import org.slf4j.LoggerFactory +import org.apache.spark.Partitioner + +/** + * + * Instance to evaluate BinaryClassificationBinMetrics metrics + * The metrics are ,AverageScore, count, conversion rate and + * each bin' centers for each bin and its overall brier score. + * Default evaluation returns BrierScore + * + * @param name name of default metric + * @param isLargerBetter is metric better if larger + * @param uid uid for instance + */ +private[op] class OpBinaryClassifyBinEvaluator +( + override val name: EvalMetric = OpEvaluatorNames.Binary, + override val isLargerBetter: Boolean = true, + override val uid: String = UID[BinaryClassificationBinMetrics], + val numBins: Int = 100 +) extends OpBinaryClassificationEvaluatorBase[BinaryClassificationBinMetrics](uid = uid) { + + @transient private lazy val log = LoggerFactory.getLogger(this.getClass) + + def getDefaultMetric: BinaryClassificationBinMetrics => Double = _.BrierScore + + override def evaluateAll(data: Dataset[_]): BinaryClassificationBinMetrics = { + val labelColName = getLabelCol + val dataUse = makeDataToUse(data, labelColName) + + val (rawPredictionColName, predictionColName, probabilityColName) = + (getRawPredictionCol, getPredictionValueCol, getProbabilityCol) + log.debug( + "Evaluating metrics on columns :\n label : {}\n rawPrediction : {}\n prediction : {}\n probability : {}\n", + labelColName, rawPredictionColName, predictionColName, probabilityColName + ) + + import dataUse.sparkSession.implicits._ + val rdd = dataUse.select(predictionColName, labelColName).as[(Double, Double)].rdd + + if (rdd.isEmpty()) { + log.error("The dataset is empty") + BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) + } else { + val scoreAndLabels = + dataUse.select(col(probabilityColName), col(labelColName).cast(DoubleType)).rdd.map { + case Row(prob: Vector, label: Double) => (prob(1), label) + case Row(prob: Double, label: Double) => (prob, label) + } + + BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) + if (numBins == 0) { + log.error("numBins is set to 0. Returning empty metrics") + BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) + } else { + // Find the significant digit to which the scores needs to be rounded, based of numBins. + val significantDigitToRoundOff = math.log10(numBins).toInt + 1 + val scoreAndLabelsRounded = for {i <- scoreAndLabels} + yield (BigDecimal(i._1).setScale(significantDigitToRoundOff, + BigDecimal.RoundingMode.HALF_UP).toDouble, (i._1, i._2)) + + // Create `numBins` bins and place each score in its corresponding bin. + val binnedValues = scoreAndLabelsRounded.partitionBy(new OpBinPartitioner(numBins)).values + + // compute the average score per bin + val averageScore = binnedValues.mapPartitions(scores => { + val (totalScore, count) = scores.foldLeft(0.0, 0)( + (r: (Double, Int), s: (Double, Double)) => (r._1 + s._1, r._2 + 1)) + Iterator(if (count == 0) 0.0 else totalScore / count) + }).collect().toSeq + + // compute the average conversion rate per bin. Convertion rate is the number of 1's in labels. + val averageConvertionRate = binnedValues.mapPartitions(scores => { + val (totalConversion, count) = scores.foldLeft(0.0, 0)( + (r: (Double, Int), s: (Double, Double)) => (r._1 + s._2, r._2 + 1)) + Iterator(if (count == 0) 0.0 else totalConversion / count) + }).collect().toSeq + + // compute total number of data points in each bin. + val numberOfDataPoints = binnedValues.mapPartitions(scores => Iterator(scores.length.toLong)).collect().toSeq + + // binCenters is the center point in each bin. + // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. + val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) + + // brier score of entire dataset. + val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean() + + val metrics = BinaryClassificationBinMetrics( + BrierScore = brierScore, + BinCenters = binCenters, + NumberOfDataPoints = numberOfDataPoints, + AverageScore = averageScore, + AverageConversionRate = averageConvertionRate + ) + + log.info("Evaluated metrics: {}", metrics.toString) + metrics + } + } + } +} + +// BinPartitioner which partition the bins. +class OpBinPartitioner(override val numPartitions: Int) extends Partitioner { + + // computes the bin number(0-indexed) to which the score is assigned to. + // For Score 1.0, overflow happens. So, use math.min(last_bin, bin_computed). + def getPartition(key: Any): Int = key match { + case score: Double => math.min(numPartitions - 1, (score * numPartitions).toInt) + } +} + +/** + * Metrics of BinaryClassificationBinMetrics + * + * @param BinCenters center of each bin + * @param NumberOfDataPoints total number of data points in each bin + * @param AverageScore average score in each bin + * @param AverageConversionRate average conversion rate in each bin + * @param BrierScore brier score for overall dataset + */ +case class BinaryClassificationBinMetrics +( + BrierScore: Double, + BinCenters: Seq[Double], + NumberOfDataPoints: Seq[Long], + AverageScore: Seq[Double], + AverageConversionRate: Seq[Double] +) extends EvaluationMetrics diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala new file mode 100644 index 0000000000..210474e1c1 --- /dev/null +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.evaluators + +import com.salesforce.op.test.TestSparkContext +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class OpBinaryClassifyBinEvaluatorTest extends FlatSpec with TestSparkContext { + + val labelName = "label" + val predictionLabel = "pred" + + val dataset_test = Seq( + (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), + (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), + (Map("probability_1" -> 0.00541, "probability_0" -> 0.99560, "prediction" -> 1.0), 0.0), + (Map("probability_1" -> 0.70, "probability_0" -> 0.30, "prediction" -> 1.0), 0.0), + (Map("probability_1" -> 0.001, "probability_0" -> 0.999, "prediction" -> 0.0), 0.0) + ) + + val dataset_skewed = Seq( + (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), + (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), + (Map("probability_1" -> 0.9987, "probability_0" -> 0.001, "prediction" -> 1.0), 1.0), + (Map("probability_1" -> 0.946, "probability_0" -> 0.0541, "prediction" -> 1.0), 1.0) + ) + + val emptyDataSet = Seq.empty[(Map[String, Double], Double)] + + Spec[OpBinaryClassifyBinEvaluator] should "return the bin metrics" in { + val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) + + val metrics = new OpBinaryClassifyBinEvaluator(numBins = 4) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) + + BigDecimal(metrics.BrierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 + metrics.BinCenters shouldBe Seq(0.125, 0.375, 0.625, 0.875) + metrics.NumberOfDataPoints shouldBe Seq(2, 0, 1, 2) + metrics.AverageScore shouldBe Seq(0.003205, 0.0, 0.7, 0.99999) + metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 1.0) + } + + it should "return the empty bin metrics for numBins == 0" in { + val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) + + val metrics = new OpBinaryClassifyBinEvaluator(numBins = 0) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) + + metrics.BrierScore shouldBe 0.0 + metrics.BinCenters shouldBe Seq.empty[Double] + metrics.NumberOfDataPoints shouldBe Seq.empty[Long] + metrics.AverageScore shouldBe Seq.empty[Double] + metrics.AverageConversionRate shouldBe Seq.empty[Double] + } + + it should "return the empty bin metrics for empty data" in { + val df = spark.createDataFrame(emptyDataSet).toDF(predictionLabel, labelName) + + val metrics = new OpBinaryClassifyBinEvaluator(numBins = 10) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) + + metrics.BrierScore shouldBe 0.0 + metrics.BinCenters shouldBe Seq.empty[Double] + metrics.NumberOfDataPoints shouldBe Seq.empty[Long] + metrics.AverageScore shouldBe Seq.empty[Double] + metrics.AverageConversionRate shouldBe Seq.empty[Double] + } + + it should "return the bin metrics for skewed data" in { + val df = spark.createDataFrame(dataset_skewed).toDF(predictionLabel, labelName) + + val metrics = new OpBinaryClassifyBinEvaluator(numBins = 5) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) + + metrics.BrierScore shouldBe 7.294225500000013E-4 + metrics.BinCenters shouldBe Seq(0.1, 0.3, 0.5, 0.7, 0.9) + metrics.NumberOfDataPoints shouldBe Seq(0, 0, 0, 0, 4) + metrics.AverageScore shouldBe Seq(0.0, 0.0, 0.0, 0.0, 0.98617) + metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 0.0, 1.0) + } + + it should "return the default metric as BrierScore" in { + val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) + + val evaluator = new OpBinaryClassifyBinEvaluator(numBins = 4) + .setLabelCol(labelName).setPredictionCol(predictionLabel) + + val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(df)) + + BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 + } +} From 9658eb4d22865719ab03d642e91237f9aee66f6f Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 7 Sep 2018 17:52:07 +0530 Subject: [PATCH 02/16] minor changes --- .../salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala | 1 - .../op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala index a2d3a52913..15e553909e 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala @@ -84,7 +84,6 @@ private[op] class OpBinaryClassifyBinEvaluator case Row(prob: Double, label: Double) => (prob, label) } - BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) if (numBins == 0) { log.error("numBins is set to 0. Returning empty metrics") BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala index 210474e1c1..b0bd7899d1 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala @@ -117,7 +117,6 @@ class OpBinaryClassifyBinEvaluatorTest extends FlatSpec with TestSparkContext { .setLabelCol(labelName).setPredictionCol(predictionLabel) val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(df)) - BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 } } From 56d62800e49b6fbf3854851fed1c410bd78c004e Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 7 Sep 2018 18:22:17 +0530 Subject: [PATCH 03/16] Doc Changes --- .../OpBinaryClassifyBinEvaluator.scala | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala index 15e553909e..dfbf089ddb 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala @@ -39,14 +39,19 @@ import org.apache.spark.Partitioner /** * - * Instance to evaluate BinaryClassificationBinMetrics metrics - * The metrics are ,AverageScore, count, conversion rate and - * each bin' centers for each bin and its overall brier score. - * Default evaluation returns BrierScore + * Evaluator for Binary Classification which provides statistics about the predicted scores. + * This evaluator creates the specified number of bins and computes the statistics for each bin + * and returns BinaryClassificationBinMetrics, which contains * - * @param name name of default metric - * @param isLargerBetter is metric better if larger - * @param uid uid for instance + * Total number of data points per bin + * Average Score per bin + * Average Conversion rate per bin + * Bin Centers for each bin + * BrierScore for the overall dataset is also computed, which is a default metric as well. + * + * @param name name of default metric + * @param isLargerBetter is metric better if larger + * @param uid uid for instance */ private[op] class OpBinaryClassifyBinEvaluator ( @@ -139,8 +144,8 @@ private[op] class OpBinaryClassifyBinEvaluator // BinPartitioner which partition the bins. class OpBinPartitioner(override val numPartitions: Int) extends Partitioner { - // computes the bin number(0-indexed) to which the score is assigned to. - // For Score 1.0, overflow happens. So, use math.min(last_bin, bin_computed). + // computes the bin number(0-indexed) to which the datapoint is assigned. + // For Score 1.0, overflow happens. So, use math.min(last_bin, bin_index__computed). def getPartition(key: Any): Int = key match { case score: Double => math.min(numPartitions - 1, (score * numPartitions).toInt) } From 0eebe3dbbfb025fe1145e1c2f36fdaecc37e9b89 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 7 Sep 2018 19:03:32 +0530 Subject: [PATCH 04/16] Fixing code factor issue --- .../OpBinaryClassifyBinEvaluator.scala | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala index dfbf089ddb..690d15878d 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala @@ -66,25 +66,19 @@ private[op] class OpBinaryClassifyBinEvaluator def getDefaultMetric: BinaryClassificationBinMetrics => Double = _.BrierScore override def evaluateAll(data: Dataset[_]): BinaryClassificationBinMetrics = { - val labelColName = getLabelCol - val dataUse = makeDataToUse(data, labelColName) + val labelColumnName = getLabelCol + val dataProcessed = makeDataToUse(data, labelColumnName) - val (rawPredictionColName, predictionColName, probabilityColName) = - (getRawPredictionCol, getPredictionValueCol, getProbabilityCol) - log.debug( - "Evaluating metrics on columns :\n label : {}\n rawPrediction : {}\n prediction : {}\n probability : {}\n", - labelColName, rawPredictionColName, predictionColName, probabilityColName - ) - - import dataUse.sparkSession.implicits._ - val rdd = dataUse.select(predictionColName, labelColName).as[(Double, Double)].rdd + import dataProcessed.sparkSession.implicits._ + val rdd = dataProcessed.select(getPredictionValueCol, labelColumnName).as[(Double, Double)].rdd if (rdd.isEmpty()) { - log.error("The dataset is empty") + log.error("The dataset is empty. Returning empty metrics") BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) } else { val scoreAndLabels = - dataUse.select(col(probabilityColName), col(labelColName).cast(DoubleType)).rdd.map { + dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd.map + { case Row(prob: Vector, label: Double) => (prob(1), label) case Row(prob: Double, label: Double) => (prob, label) } From 41123d5b30b0840da69774c8f75a4506643112f1 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Sat, 8 Sep 2018 17:50:41 +0530 Subject: [PATCH 05/16] Incorporating code review comments --- .../salesforce/op/evaluators/Evaluators.scala | 7 + ...luator.scala => OpBinScoreEvaluator.scala} | 123 +++++++++--------- .../op/evaluators/OpEvaluatorBase.scala | 1 + .../op/evaluators/EvaluatorsTest.scala | 8 ++ .../evaluators/OpBinScoreEvaluatorTest.scala | 112 ++++++++++++++++ .../OpBinaryClassifyBinEvaluatorTest.scala | 122 ----------------- 6 files changed, 190 insertions(+), 183 deletions(-) rename core/src/main/scala/com/salesforce/op/evaluators/{OpBinaryClassifyBinEvaluator.scala => OpBinScoreEvaluator.scala} (57%) create mode 100644 core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala delete mode 100644 core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala diff --git a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala index cf5c7fb48c..a86ab98140 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala @@ -52,6 +52,13 @@ object Evaluators { */ def apply(): OpBinaryClassificationEvaluator = auROC() + /* + * Brier Score for the prediction + */ + def brierScore(): OpBinScoreEvaluator = + new OpBinScoreEvaluator( + name = BinaryClassEvalMetrics.BrierScore, isLargerBetter = true) + /** * Area under ROC */ diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala similarity index 57% rename from core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala rename to core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 690d15878d..eaf5bde582 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -29,6 +29,7 @@ */ package com.salesforce.op.evaluators +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.salesforce.op.UID import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.functions.col @@ -53,17 +54,18 @@ import org.apache.spark.Partitioner * @param isLargerBetter is metric better if larger * @param uid uid for instance */ -private[op] class OpBinaryClassifyBinEvaluator +private[op] class OpBinScoreEvaluator ( override val name: EvalMetric = OpEvaluatorNames.Binary, override val isLargerBetter: Boolean = true, - override val uid: String = UID[BinaryClassificationBinMetrics], + override val uid: String = UID[OpBinScoreEvaluator], val numBins: Int = 100 ) extends OpBinaryClassificationEvaluatorBase[BinaryClassificationBinMetrics](uid = uid) { + require(numBins > 0, "numBins must be positive") @transient private lazy val log = LoggerFactory.getLogger(this.getClass) - def getDefaultMetric: BinaryClassificationBinMetrics => Double = _.BrierScore + def getDefaultMetric: BinaryClassificationBinMetrics => Double = _.brierScore override def evaluateAll(data: Dataset[_]): BinaryClassificationBinMetrics = { val labelColumnName = getLabelCol @@ -74,7 +76,7 @@ private[op] class OpBinaryClassifyBinEvaluator if (rdd.isEmpty()) { log.error("The dataset is empty. Returning empty metrics") - BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) + BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) } else { val scoreAndLabels = dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd.map @@ -83,54 +85,49 @@ private[op] class OpBinaryClassifyBinEvaluator case Row(prob: Double, label: Double) => (prob, label) } - if (numBins == 0) { - log.error("numBins is set to 0. Returning empty metrics") - BinaryClassificationBinMetrics(0.0, Seq.empty[Double], Seq.empty[Long], Seq.empty[Double], Seq.empty[Double]) - } else { - // Find the significant digit to which the scores needs to be rounded, based of numBins. - val significantDigitToRoundOff = math.log10(numBins).toInt + 1 - val scoreAndLabelsRounded = for {i <- scoreAndLabels} - yield (BigDecimal(i._1).setScale(significantDigitToRoundOff, - BigDecimal.RoundingMode.HALF_UP).toDouble, (i._1, i._2)) - - // Create `numBins` bins and place each score in its corresponding bin. - val binnedValues = scoreAndLabelsRounded.partitionBy(new OpBinPartitioner(numBins)).values - - // compute the average score per bin - val averageScore = binnedValues.mapPartitions(scores => { - val (totalScore, count) = scores.foldLeft(0.0, 0)( - (r: (Double, Int), s: (Double, Double)) => (r._1 + s._1, r._2 + 1)) - Iterator(if (count == 0) 0.0 else totalScore / count) - }).collect().toSeq - - // compute the average conversion rate per bin. Convertion rate is the number of 1's in labels. - val averageConvertionRate = binnedValues.mapPartitions(scores => { - val (totalConversion, count) = scores.foldLeft(0.0, 0)( - (r: (Double, Int), s: (Double, Double)) => (r._1 + s._2, r._2 + 1)) - Iterator(if (count == 0) 0.0 else totalConversion / count) - }).collect().toSeq - - // compute total number of data points in each bin. - val numberOfDataPoints = binnedValues.mapPartitions(scores => Iterator(scores.length.toLong)).collect().toSeq - - // binCenters is the center point in each bin. - // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. - val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) - - // brier score of entire dataset. - val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean() - - val metrics = BinaryClassificationBinMetrics( - BrierScore = brierScore, - BinCenters = binCenters, - NumberOfDataPoints = numberOfDataPoints, - AverageScore = averageScore, - AverageConversionRate = averageConvertionRate - ) - - log.info("Evaluated metrics: {}", metrics.toString) - metrics + // Find the significant digit to which the scores needs to be rounded, based of numBins. + val significantDigitToRoundOff = math.log10(numBins).toInt + 1 + val scoreAndLabelsRounded = scoreAndLabels.map { scoreAndLabels => + (BigDecimal(scoreAndLabels._1).setScale(significantDigitToRoundOff, BigDecimal.RoundingMode.HALF_UP).toDouble, + scoreAndLabels) } + // Create `numBins` bins and place each score in its corresponding bin. + val binnedValues = scoreAndLabelsRounded.partitionBy(new OpBinPartitioner(numBins)).values + + // compute the average score per bin + val averageScore = binnedValues.mapPartitions(scores => { + val (totalScore, count) = scores.foldLeft(0.0, 0)( + (r: (Double, Int), s: (Double, Double)) => (r._1 + s._1, r._2 + 1)) + Iterator(if (count == 0) 0.0 else totalScore / count) + }).collect().toSeq + + // compute the average conversion rate per bin. Convertion rate is the number of 1's in labels. + val averageConvertionRate = binnedValues.mapPartitions(scores => { + val (totalConversion, count) = scores.foldLeft(0.0, 0)( + (r: (Double, Int), s: (Double, Double)) => (r._1 + s._2, r._2 + 1)) + Iterator(if (count == 0) 0.0 else totalConversion / count) + }).collect().toSeq + + // compute total number of data points in each bin. + val numberOfDataPoints = binnedValues.mapPartitions(scores => Iterator(scores.length.toLong)).collect().toSeq + + // binCenters is the center point in each bin. + // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. + val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) + + // brier score of entire dataset. + val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean() + + val metrics = BinaryClassificationBinMetrics( + brierScore = brierScore, + binCenters = binCenters, + numberOfDataPoints = numberOfDataPoints, + averageScore = averageScore, + averageConversionRate = averageConvertionRate + ) + + log.info("Evaluated metrics: {}", metrics.toString) + metrics } } } @@ -148,17 +145,21 @@ class OpBinPartitioner(override val numPartitions: Int) extends Partitioner { /** * Metrics of BinaryClassificationBinMetrics * - * @param BinCenters center of each bin - * @param NumberOfDataPoints total number of data points in each bin - * @param AverageScore average score in each bin - * @param AverageConversionRate average conversion rate in each bin - * @param BrierScore brier score for overall dataset + * @param binCenters center of each bin + * @param numberOfDataPoints total number of data points in each bin + * @param averageScore average score in each bin + * @param averageConversionRate average conversion rate in each bin + * @param brierScore brier score for overall dataset */ case class BinaryClassificationBinMetrics ( - BrierScore: Double, - BinCenters: Seq[Double], - NumberOfDataPoints: Seq[Long], - AverageScore: Seq[Double], - AverageConversionRate: Seq[Double] + brierScore: Double, + @JsonDeserialize(contentAs = classOf[java.lang.Double]) + binCenters: Seq[Double], + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + numberOfDataPoints: Seq[Long], + @JsonDeserialize(contentAs = classOf[java.lang.Double]) + averageScore: Seq[Double], + @JsonDeserialize(contentAs = classOf[java.lang.Double]) + averageConversionRate: Seq[Double] ) extends EvaluationMetrics diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala index 7e913bfc65..99964f7db0 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala @@ -319,6 +319,7 @@ object BinaryClassEvalMetrics extends Enum[ClassificationEvalMetric] { case object TN extends ClassificationEvalMetric("TN", "true negative") case object FP extends ClassificationEvalMetric("FP", "false positive") case object FN extends ClassificationEvalMetric("FN", "false negative") + case object BrierScore extends ClassificationEvalMetric("brierscore", "mean squared error") } /** diff --git a/core/src/test/scala/com/salesforce/op/evaluators/EvaluatorsTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/EvaluatorsTest.scala index db341d60e3..0a53ebee1f 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/EvaluatorsTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/EvaluatorsTest.scala @@ -97,6 +97,9 @@ class EvaluatorsTest extends FlatSpec with TestSparkContext { val opBinaryMetrics = new OpBinaryClassificationEvaluator().setLabelCol(test_label) .setPredictionCol(pred).evaluateAll(transformedData) + val opBinScoreMetrics = new OpBinScoreEvaluator().setLabelCol(test_label) + .setPredictionCol(pred).evaluateAll(transformedData) + val sparkMultiEvaluator = new MulticlassClassificationEvaluator().setLabelCol(test_label.name) .setPredictionCol(predValue.name) @@ -115,6 +118,8 @@ class EvaluatorsTest extends FlatSpec with TestSparkContext { evaluateBinaryMetric(Evaluators.BinaryClassification.recall()) shouldBe opBinaryMetrics.Recall evaluateBinaryMetric(Evaluators.BinaryClassification.f1()) shouldBe opBinaryMetrics.F1 evaluateBinaryMetric(Evaluators.BinaryClassification.error()) shouldBe opBinaryMetrics.Error + + evaluateBinScoreMetric(Evaluators.BinaryClassification.brierScore()) shouldBe opBinScoreMetrics.brierScore } it should "have a multi classification factory" in { @@ -148,6 +153,9 @@ class EvaluatorsTest extends FlatSpec with TestSparkContext { def evaluateBinaryMetric(binEval: OpBinaryClassificationEvaluator): Double = binEval.setLabelCol(test_label) .setPredictionCol(pred).evaluate(transformedData3) + def evaluateBinScoreMetric(binEval: OpBinScoreEvaluator): Double = binEval.setLabelCol(test_label) + .setPredictionCol(pred).evaluate(transformedData3) + def evaluateSparkBinaryMetric(metricName: String): Double = sparkBinaryEvaluator.setMetricName(metricName) .evaluate(transformedData3) diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala new file mode 100644 index 0000000000..dcc12f8f9b --- /dev/null +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2017, Salesforce.com, Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.salesforce.op.evaluators + +import com.salesforce.op.features.types.Prediction +import com.salesforce.op.test.TestSparkContext +import org.scalatest.FlatSpec +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { + + val labelName = "label" + val predictionLabel = "pred" + + val dataset_test = Seq( + (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.99560, 0.00541)), 0.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.30, 0.70)), 0.0), + (Prediction(0.0, Array(10.0, 10.0), Array(0.999, 0.001)), 0.0) + ).map(v => (v._1.toDoubleMap, v._2)) + + val dataset_test_df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) + + val dataset_skewed = Seq( + (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.001, 0.9987)), 1.0), + (Prediction(1.0, Array(10.0, 10.0), Array(0.0541, 0.946)), 1.0) + ).map(v => (v._1.toDoubleMap, v._2)) + + val dataset_skewed_df = spark.createDataFrame(dataset_skewed).toDF(predictionLabel, labelName) + + val emptyDataSet_df = spark.createDataFrame(Seq.empty[(Map[String, Double], Double)]) + .toDF(predictionLabel, labelName) + + Spec[OpBinScoreEvaluator] should "return the bin metrics" in { + val metrics = new OpBinScoreEvaluator(numBins = 4) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_test_df) + + metrics shouldBe BinaryClassificationBinMetrics( + 0.09800605366, + Seq(0.125, 0.375, 0.625, 0.875), + Seq(2, 0, 1, 2), + Seq(0.003205, 0.0, 0.7, 0.99999), + Seq(0.0, 0.0, 0.0, 1.0)) + } + + it should "error on invalid number of bins" in { + assertThrows[IllegalArgumentException] { + new OpBinScoreEvaluator(numBins = 0) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_test_df) + } + } + + it should "evaluate the empty data" in { + val metrics = new OpBinScoreEvaluator(numBins = 10) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(emptyDataSet_df) + + metrics shouldBe BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) + } + + it should "evaluate bin metrics for skewed data" in { + val metrics = new OpBinScoreEvaluator(numBins = 5) + .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_skewed_df) + + metrics shouldBe BinaryClassificationBinMetrics( + 7.294225500000013E-4, + Seq(0.1, 0.3, 0.5, 0.7, 0.9), + Seq(0, 0, 0, 0, 4), + Seq(0.0, 0.0, 0.0, 0.0, 0.98617), + Seq(0.0, 0.0, 0.0, 0.0, 1.0)) + } + + it should "evaluate the default metric as BrierScore" in { + val evaluator = new OpBinScoreEvaluator(numBins = 4) + .setLabelCol(labelName).setPredictionCol(predictionLabel) + + val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(dataset_test_df)) + BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 + } +} diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala deleted file mode 100644 index b0bd7899d1..0000000000 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinaryClassifyBinEvaluatorTest.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2017, Salesforce.com, Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * * Redistributions of source code must retain the above copyright notice, this - * list of conditions and the following disclaimer. - * - * * Redistributions in binary form must reproduce the above copyright notice, - * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * - * * Neither the name of the copyright holder nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE - * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR - * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, - * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.salesforce.op.evaluators - -import com.salesforce.op.test.TestSparkContext -import org.junit.runner.RunWith -import org.scalatest.FlatSpec -import org.scalatest.junit.JUnitRunner - -@RunWith(classOf[JUnitRunner]) -class OpBinaryClassifyBinEvaluatorTest extends FlatSpec with TestSparkContext { - - val labelName = "label" - val predictionLabel = "pred" - - val dataset_test = Seq( - (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), - (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), - (Map("probability_1" -> 0.00541, "probability_0" -> 0.99560, "prediction" -> 1.0), 0.0), - (Map("probability_1" -> 0.70, "probability_0" -> 0.30, "prediction" -> 1.0), 0.0), - (Map("probability_1" -> 0.001, "probability_0" -> 0.999, "prediction" -> 0.0), 0.0) - ) - - val dataset_skewed = Seq( - (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), - (Map("probability_1" -> 0.99999, "probability_0" -> 0.0001, "prediction" -> 1.0), 1.0), - (Map("probability_1" -> 0.9987, "probability_0" -> 0.001, "prediction" -> 1.0), 1.0), - (Map("probability_1" -> 0.946, "probability_0" -> 0.0541, "prediction" -> 1.0), 1.0) - ) - - val emptyDataSet = Seq.empty[(Map[String, Double], Double)] - - Spec[OpBinaryClassifyBinEvaluator] should "return the bin metrics" in { - val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) - - val metrics = new OpBinaryClassifyBinEvaluator(numBins = 4) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) - - BigDecimal(metrics.BrierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 - metrics.BinCenters shouldBe Seq(0.125, 0.375, 0.625, 0.875) - metrics.NumberOfDataPoints shouldBe Seq(2, 0, 1, 2) - metrics.AverageScore shouldBe Seq(0.003205, 0.0, 0.7, 0.99999) - metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 1.0) - } - - it should "return the empty bin metrics for numBins == 0" in { - val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) - - val metrics = new OpBinaryClassifyBinEvaluator(numBins = 0) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) - - metrics.BrierScore shouldBe 0.0 - metrics.BinCenters shouldBe Seq.empty[Double] - metrics.NumberOfDataPoints shouldBe Seq.empty[Long] - metrics.AverageScore shouldBe Seq.empty[Double] - metrics.AverageConversionRate shouldBe Seq.empty[Double] - } - - it should "return the empty bin metrics for empty data" in { - val df = spark.createDataFrame(emptyDataSet).toDF(predictionLabel, labelName) - - val metrics = new OpBinaryClassifyBinEvaluator(numBins = 10) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) - - metrics.BrierScore shouldBe 0.0 - metrics.BinCenters shouldBe Seq.empty[Double] - metrics.NumberOfDataPoints shouldBe Seq.empty[Long] - metrics.AverageScore shouldBe Seq.empty[Double] - metrics.AverageConversionRate shouldBe Seq.empty[Double] - } - - it should "return the bin metrics for skewed data" in { - val df = spark.createDataFrame(dataset_skewed).toDF(predictionLabel, labelName) - - val metrics = new OpBinaryClassifyBinEvaluator(numBins = 5) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(df) - - metrics.BrierScore shouldBe 7.294225500000013E-4 - metrics.BinCenters shouldBe Seq(0.1, 0.3, 0.5, 0.7, 0.9) - metrics.NumberOfDataPoints shouldBe Seq(0, 0, 0, 0, 4) - metrics.AverageScore shouldBe Seq(0.0, 0.0, 0.0, 0.0, 0.98617) - metrics.AverageConversionRate shouldBe Seq(0.0, 0.0, 0.0, 0.0, 1.0) - } - - it should "return the default metric as BrierScore" in { - val df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) - - val evaluator = new OpBinaryClassifyBinEvaluator(numBins = 4) - .setLabelCol(labelName).setPredictionCol(predictionLabel) - - val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(df)) - BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 - } -} From a16fc86a4e8b26fb90a3557b208face98d2b4119 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Sun, 9 Sep 2018 22:48:13 +0530 Subject: [PATCH 06/16] code review comments --- .../com/salesforce/op/ModelInsights.scala | 3 +- .../salesforce/op/evaluators/Evaluators.scala | 2 +- .../op/evaluators/OpBinScoreEvaluator.scala | 66 ++++++++----------- .../op/evaluators/OpEvaluatorBase.scala | 4 +- .../BinaryClassificationModelSelector.scala | 2 +- .../impl/selector/ModelSelectorSummary.scala | 4 ++ .../com/salesforce/op/OpWorkflowTest.scala | 2 +- .../evaluators/OpBinScoreEvaluatorTest.scala | 3 +- .../impl/selector/ModelSelectorTest.scala | 6 +- 9 files changed, 45 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/ModelInsights.scala b/core/src/main/scala/com/salesforce/op/ModelInsights.scala index b29f3e47c3..a561d8954a 100644 --- a/core/src/main/scala/com/salesforce/op/ModelInsights.scala +++ b/core/src/main/scala/com/salesforce/op/ModelInsights.scala @@ -387,7 +387,8 @@ case object ModelInsights { val typeHints = FullTypeHints(List( classOf[Continuous], classOf[Discrete], classOf[DataBalancerSummary], classOf[DataCutterSummary], classOf[DataSplitterSummary], - classOf[SingleMetric], classOf[MultiMetrics], classOf[BinaryClassificationMetrics], classOf[ThresholdMetrics], + classOf[SingleMetric], classOf[MultiMetrics], classOf[BinaryClassificationMetrics], + classOf[BinaryClassificationBinMetrics], classOf[ThresholdMetrics], classOf[MultiClassificationMetrics], classOf[RegressionMetrics] )) val evalMetricsSerializer = new CustomSerializer[EvalMetric](_ => diff --git a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala index a86ab98140..a60e97b51d 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/Evaluators.scala @@ -57,7 +57,7 @@ object Evaluators { */ def brierScore(): OpBinScoreEvaluator = new OpBinScoreEvaluator( - name = BinaryClassEvalMetrics.BrierScore, isLargerBetter = true) + name = BinaryClassEvalMetrics.brierScore, isLargerBetter = true) /** * Area under ROC diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index eaf5bde582..383ff287ae 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -32,11 +32,12 @@ package com.salesforce.op.evaluators import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.salesforce.op.UID import org.apache.spark.ml.linalg.Vector -import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Dataset, Row} -import org.apache.spark.sql.types.DoubleType import org.slf4j.LoggerFactory import org.apache.spark.Partitioner +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.DoubleType +import com.twitter.algebird.Operators._ /** * @@ -56,7 +57,7 @@ import org.apache.spark.Partitioner */ private[op] class OpBinScoreEvaluator ( - override val name: EvalMetric = OpEvaluatorNames.Binary, + override val name: EvalMetric = OpEvaluatorNames.BinScore, override val isLargerBetter: Boolean = true, override val uid: String = UID[OpBinScoreEvaluator], val numBins: Int = 100 @@ -72,44 +73,30 @@ private[op] class OpBinScoreEvaluator val dataProcessed = makeDataToUse(data, labelColumnName) import dataProcessed.sparkSession.implicits._ - val rdd = dataProcessed.select(getPredictionValueCol, labelColumnName).as[(Double, Double)].rdd - if (rdd.isEmpty()) { + if (dataProcessed.select(getPredictionValueCol, labelColumnName).as[(Double, Double)].rdd.isEmpty()) { log.error("The dataset is empty. Returning empty metrics") BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) } else { - val scoreAndLabels = - dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd.map - { - case Row(prob: Vector, label: Double) => (prob(1), label) - case Row(prob: Double, label: Double) => (prob, label) - } - - // Find the significant digit to which the scores needs to be rounded, based of numBins. - val significantDigitToRoundOff = math.log10(numBins).toInt + 1 - val scoreAndLabelsRounded = scoreAndLabels.map { scoreAndLabels => - (BigDecimal(scoreAndLabels._1).setScale(significantDigitToRoundOff, BigDecimal.RoundingMode.HALF_UP).toDouble, - scoreAndLabels) + val scoreAndLabels = dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd.map { + case Row(prob: Vector, label: Double) => (prob(1), label) + case Row(prob: Double, label: Double) => (prob, label) + } + + val stats = scoreAndLabels.map { + case (score, label) => (getBinIndex(score), (score, label, 1L)) + }.reduceByKeyLocally(_ + _).map { + case (bin, (scoreSum, labelSum, count)) => (bin, (scoreSum / count, labelSum / count, count)) } - // Create `numBins` bins and place each score in its corresponding bin. - val binnedValues = scoreAndLabelsRounded.partitionBy(new OpBinPartitioner(numBins)).values - - // compute the average score per bin - val averageScore = binnedValues.mapPartitions(scores => { - val (totalScore, count) = scores.foldLeft(0.0, 0)( - (r: (Double, Int), s: (Double, Double)) => (r._1 + s._1, r._2 + 1)) - Iterator(if (count == 0) 0.0 else totalScore / count) - }).collect().toSeq - - // compute the average conversion rate per bin. Convertion rate is the number of 1's in labels. - val averageConvertionRate = binnedValues.mapPartitions(scores => { - val (totalConversion, count) = scores.foldLeft(0.0, 0)( - (r: (Double, Int), s: (Double, Double)) => (r._1 + s._2, r._2 + 1)) - Iterator(if (count == 0) 0.0 else totalConversion / count) - }).collect().toSeq - - // compute total number of data points in each bin. - val numberOfDataPoints = binnedValues.mapPartitions(scores => Iterator(scores.length.toLong)).collect().toSeq + + // For the bins, which don't have any scores, fill 0's. + val statsForBins = { + for {i <- 0 to numBins - 1} yield stats.getOrElse(i, (0.0, 0.0, 0L)) + } + + val averageScore = statsForBins.map(_._1) + val averageConversionRate = statsForBins.map(_._2) + val numberOfDataPoints = statsForBins.map(_._3) // binCenters is the center point in each bin. // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. @@ -123,15 +110,20 @@ private[op] class OpBinScoreEvaluator binCenters = binCenters, numberOfDataPoints = numberOfDataPoints, averageScore = averageScore, - averageConversionRate = averageConvertionRate + averageConversionRate = averageConversionRate ) log.info("Evaluated metrics: {}", metrics.toString) metrics } } + + def getBinIndex(score: Double): Int = { + math.min(numBins - 1, (score * numBins).toInt) + } } + // BinPartitioner which partition the bins. class OpBinPartitioner(override val numPartitions: Int) extends Partitioner { diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala index 99964f7db0..473d00f430 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpEvaluatorBase.scala @@ -319,7 +319,7 @@ object BinaryClassEvalMetrics extends Enum[ClassificationEvalMetric] { case object TN extends ClassificationEvalMetric("TN", "true negative") case object FP extends ClassificationEvalMetric("FP", "false positive") case object FN extends ClassificationEvalMetric("FN", "false negative") - case object BrierScore extends ClassificationEvalMetric("brierscore", "mean squared error") + case object brierScore extends ClassificationEvalMetric("brierscore", "brier score") } /** @@ -373,6 +373,8 @@ object OpEvaluatorNames extends Enum[OpEvaluatorNames] { case object Binary extends OpEvaluatorNames("binEval", "binary evaluation metics") + case object BinScore extends OpEvaluatorNames("binScoreEval", "bin score evaluation metrics") + case object Multi extends OpEvaluatorNames("multiEval", "multiclass evaluation metics") case object Regression extends OpEvaluatorNames("regEval", "regression evaluation metics") diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala b/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala index 2545e951a6..7d44563755 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala @@ -152,7 +152,7 @@ case object BinaryClassificationModelSelector { numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism ) selector(cv, splitter = splitter, - trainTestEvaluators = Seq(new OpBinaryClassificationEvaluator) ++ trainTestEvaluators, + trainTestEvaluators = Seq(new OpBinaryClassificationEvaluator, new OpBinScoreEvaluator) ++ trainTestEvaluators, modelTypesToUse = modelTypesToUse, modelsAndParameters = modelsAndParameters) } diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala index ffe62d6406..43b4c470d7 100644 --- a/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala +++ b/core/src/main/scala/com/salesforce/op/stages/impl/selector/ModelSelectorSummary.scala @@ -241,6 +241,8 @@ case object ModelSelectorSummary { nm match { case OpEvaluatorNames.Binary.humanFriendlyName => nm -> JsonUtils.fromString[BinaryClassificationMetrics](valsJson).get + case OpEvaluatorNames.BinScore.humanFriendlyName => + nm -> JsonUtils.fromString[BinaryClassificationBinMetrics](valsJson).get case OpEvaluatorNames.Multi.humanFriendlyName => nm -> JsonUtils.fromString[MultiClassificationMetrics](valsJson).get case OpEvaluatorNames.Regression.humanFriendlyName => @@ -269,11 +271,13 @@ object ProblemType extends Enum[ProblemType] { def fromEvalMetrics(eval: EvaluationMetrics): ProblemType = { eval match { case _: BinaryClassificationMetrics => ProblemType.BinaryClassification + case _: BinaryClassificationBinMetrics => ProblemType.BinaryClassification case _: MultiClassificationMetrics => ProblemType.MultiClassification case _: RegressionMetrics => ProblemType.Regression case m: MultiMetrics => val keys = m.metrics.keySet if (keys.exists(_.contains(OpEvaluatorNames.Binary.humanFriendlyName))) ProblemType.BinaryClassification + else if (keys.exists(_.contains(OpEvaluatorNames.BinScore.humanFriendlyName))) ProblemType.BinaryClassification else if (keys.exists(_.contains(OpEvaluatorNames.Multi.humanFriendlyName))) ProblemType.MultiClassification else if (keys.exists(_.contains(OpEvaluatorNames.Regression.humanFriendlyName))) ProblemType.Regression else ProblemType.Unknown diff --git a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala index 3c29c26f00..b3471cc855 100644 --- a/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala +++ b/core/src/test/scala/com/salesforce/op/OpWorkflowTest.scala @@ -375,7 +375,7 @@ class OpWorkflowTest extends FlatSpec with PassengerSparkFixtureTest { val prettySummary = fittedWorkflow.summaryPretty() log.info(prettySummary) prettySummary should include("Selected Model - OpLogisticRegression") - prettySummary should include("area under precision-recall | 1.0 | 0.0") + prettySummary should include("area under precision-recall | 1.0 | 0.0") prettySummary should include("Model Evaluation Metrics") prettySummary should include("Top Model Insights") prettySummary should include("Top Positive Correlations") diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala index dcc12f8f9b..275d68cc46 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -32,10 +32,9 @@ package com.salesforce.op.evaluators import com.salesforce.op.features.types.Prediction import com.salesforce.op.test.TestSparkContext -import org.scalatest.FlatSpec import org.junit.runner.RunWith +import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner - @RunWith(classOf[JUnitRunner]) class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala index 56c916ddcd..ad1307386c 100644 --- a/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala +++ b/core/src/test/scala/com/salesforce/op/stages/impl/selector/ModelSelectorTest.scala @@ -120,7 +120,7 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model numFolds = 3, seed = seed, Evaluators.BinaryClassification.auPR(), stratify = false, parallelism = 1), splitter = Option(DataBalancer(sampleFraction = 0.5, seed = 11L)), models = Seq(lr -> Array.empty[ParamMap]), - evaluators = Seq(new OpBinaryClassificationEvaluator) + evaluators = Seq(new OpBinaryClassificationEvaluator, new OpBinScoreEvaluator) ).setInput(feature1, feature2) val expectedResult = Seq( @@ -144,7 +144,7 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model validator = validatorCV, splitter = Option(DataBalancer(sampleFraction = 0.5, seed = 11L)), models = Seq(lr -> lrParams, rf -> rfParams), - evaluators = Seq(new OpBinaryClassificationEvaluator) + evaluators = Seq(new OpBinaryClassificationEvaluator, new OpBinScoreEvaluator()) ).setInput(label, features) val model = testEstimator.fit(data) @@ -229,7 +229,7 @@ class ModelSelectorTest extends OpEstimatorSpec[Prediction, SelectedModel, Model validator = validatorCV, splitter = Option(DataBalancer(sampleFraction = 0.5, seed = 11L)), models = Seq(test -> testParams), - evaluators = Seq(new OpBinaryClassificationEvaluator) + evaluators = Seq(new OpBinaryClassificationEvaluator, new OpBinScoreEvaluator()) ).setInput(label, features) val model = testEstimator.fit(data) From ee8924f79a092366e2fec5beed237a5a6d579150 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Sun, 9 Sep 2018 22:54:06 +0530 Subject: [PATCH 07/16] removing partitioner --- .../op/evaluators/OpBinScoreEvaluator.scala | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 383ff287ae..6ade42c3c8 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -118,22 +118,11 @@ private[op] class OpBinScoreEvaluator } } - def getBinIndex(score: Double): Int = { + private def getBinIndex(score: Double): Int = { math.min(numBins - 1, (score * numBins).toInt) } } - -// BinPartitioner which partition the bins. -class OpBinPartitioner(override val numPartitions: Int) extends Partitioner { - - // computes the bin number(0-indexed) to which the datapoint is assigned. - // For Score 1.0, overflow happens. So, use math.min(last_bin, bin_index__computed). - def getPartition(key: Any): Int = key match { - case score: Double => math.min(numPartitions - 1, (score * numPartitions).toInt) - } -} - /** * Metrics of BinaryClassificationBinMetrics * From 919b933984c642bc16c7be9941fddf27816d414c Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Mon, 10 Sep 2018 15:27:04 +0530 Subject: [PATCH 08/16] code review comments --- .../op/evaluators/OpBinScoreEvaluator.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 6ade42c3c8..c3be83c445 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -83,24 +83,24 @@ private[op] class OpBinScoreEvaluator case Row(prob: Double, label: Double) => (prob, label) } + // Finding stats per bin -> avg score, avg conv rate, + // total num of data points, bin center. val stats = scoreAndLabels.map { case (score, label) => (getBinIndex(score), (score, label, 1L)) }.reduceByKeyLocally(_ + _).map { - case (bin, (scoreSum, labelSum, count)) => (bin, (scoreSum / count, labelSum / count, count)) + case (bin, (scoreSum, labelSum, count)) => + (bin, (scoreSum / count, labelSum / count, count, (bin + 0.5) / numBins)) } - // For the bins, which don't have any scores, fill 0's. + // For the bins, which don't have any scores, fill 0's and bin center. val statsForBins = { - for {i <- 0 to numBins - 1} yield stats.getOrElse(i, (0.0, 0.0, 0L)) + for {i <- 0 to numBins - 1} yield stats.getOrElse(i, (0.0, 0.0, 0L, (i + 0.5) / numBins)) } - val averageScore = statsForBins.map(_._1) - val averageConversionRate = statsForBins.map(_._2) - val numberOfDataPoints = statsForBins.map(_._3) - - // binCenters is the center point in each bin. - // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. - val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) + val (averageScore, averageConversionRate, numberOfDataPoints, binCenters) = + statsForBins.foldLeft((Seq.empty[Double], Seq.empty[Double], Seq.empty[Long], Seq.empty[Double])) { + (columns, row) => (columns._1 :+ row._1, columns._2 :+ row._2, columns._3 :+ row._3, columns._4 :+ row._4) + } // brier score of entire dataset. val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean() @@ -118,7 +118,8 @@ private[op] class OpBinScoreEvaluator } } - private def getBinIndex(score: Double): Int = { + // getBinIndex finds which bin the score associates with. + private def getBinIndex(score: Double): Int = { math.min(numBins - 1, (score * numBins).toInt) } } From b421d33199c78e462fa18c8e3430bd6900c2ffb4 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Mon, 10 Sep 2018 23:18:03 +0530 Subject: [PATCH 09/16] fixing testbuilder for tests --- .../evaluators/OpBinScoreEvaluatorTest.scala | 63 +++++++++---------- 1 file changed, 31 insertions(+), 32 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala index 275d68cc46..dc959e3c71 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -31,41 +31,41 @@ package com.salesforce.op.evaluators import com.salesforce.op.features.types.Prediction -import com.salesforce.op.test.TestSparkContext +import com.salesforce.op.features.types._ +import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext} import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner + @RunWith(classOf[JUnitRunner]) class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { - val labelName = "label" - val predictionLabel = "pred" - - val dataset_test = Seq( - (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.99560, 0.00541)), 0.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.30, 0.70)), 0.0), - (Prediction(0.0, Array(10.0, 10.0), Array(0.999, 0.001)), 0.0) - ).map(v => (v._1.toDoubleMap, v._2)) - - val dataset_test_df = spark.createDataFrame(dataset_test).toDF(predictionLabel, labelName) - - val dataset_skewed = Seq( - (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)), 1.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.001, 0.9987)), 1.0), - (Prediction(1.0, Array(10.0, 10.0), Array(0.0541, 0.946)), 1.0) - ).map(v => (v._1.toDoubleMap, v._2)) - - val dataset_skewed_df = spark.createDataFrame(dataset_skewed).toDF(predictionLabel, labelName) - - val emptyDataSet_df = spark.createDataFrame(Seq.empty[(Map[String, Double], Double)]) - .toDF(predictionLabel, labelName) + val (dataset, prediction, label) = TestFeatureBuilder( + Seq ( + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.99560, 0.00541)) -> 0.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.30, 0.70)) -> 0.0.toRealNN, + Prediction(0.0, Array(10.0, 10.0), Array(0.999, 0.001)) -> 0.0.toRealNN + ) + ) + + val (dataset_skewed, prediction_skewed, label_skewed) = TestFeatureBuilder( + Seq ( + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.001, 0.9987)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.0541, 0.946)) -> 1.0.toRealNN + ) + ) + + val (emptyDataSet, prediction_emptydataset, label_empty_dataset) = TestFeatureBuilder[Prediction, RealNN]( + Seq() + ) Spec[OpBinScoreEvaluator] should "return the bin metrics" in { val metrics = new OpBinScoreEvaluator(numBins = 4) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_test_df) + .setLabelCol(label.name).setPredictionCol(prediction.name).evaluateAll(dataset) metrics shouldBe BinaryClassificationBinMetrics( 0.09800605366, @@ -78,20 +78,20 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { it should "error on invalid number of bins" in { assertThrows[IllegalArgumentException] { new OpBinScoreEvaluator(numBins = 0) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_test_df) + .setLabelCol(label.name).setPredictionCol(prediction.name).evaluateAll(dataset) } } it should "evaluate the empty data" in { val metrics = new OpBinScoreEvaluator(numBins = 10) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(emptyDataSet_df) + .setLabelCol(label_empty_dataset.name).setPredictionCol(prediction_emptydataset.name).evaluateAll(emptyDataSet) metrics shouldBe BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) } it should "evaluate bin metrics for skewed data" in { val metrics = new OpBinScoreEvaluator(numBins = 5) - .setLabelCol(labelName).setPredictionCol(predictionLabel).evaluateAll(dataset_skewed_df) + .setLabelCol(label_skewed.name).setPredictionCol(prediction_skewed.name).evaluateAll(dataset_skewed) metrics shouldBe BinaryClassificationBinMetrics( 7.294225500000013E-4, @@ -103,9 +103,8 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { it should "evaluate the default metric as BrierScore" in { val evaluator = new OpBinScoreEvaluator(numBins = 4) - .setLabelCol(labelName).setPredictionCol(predictionLabel) + .setLabelCol(label.name).setPredictionCol(prediction.name) - val brierScore = evaluator.getDefaultMetric(evaluator.evaluateAll(dataset_test_df)) - BigDecimal(brierScore).setScale(3, BigDecimal.RoundingMode.HALF_UP).toDouble shouldBe 0.098 + evaluator.getDefaultMetric(evaluator.evaluateAll(dataset)) shouldBe 0.09800605366 } } From 728aa995da3f160500ae527bb2223355785ead7f Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Tue, 11 Sep 2018 23:44:55 +0530 Subject: [PATCH 10/16] computing stats efficiently --- .../op/evaluators/OpBinScoreEvaluator.scala | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index c3be83c445..afce00dc6a 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -34,10 +34,11 @@ import com.salesforce.op.UID import org.apache.spark.ml.linalg.Vector import org.apache.spark.sql.{Dataset, Row} import org.slf4j.LoggerFactory -import org.apache.spark.Partitioner import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.DoubleType import com.twitter.algebird.Operators._ +import com.twitter.algebird.Monoid._ +import org.apache.spark.rdd.RDD /** * @@ -72,41 +73,44 @@ private[op] class OpBinScoreEvaluator val labelColumnName = getLabelCol val dataProcessed = makeDataToUse(data, labelColumnName) - import dataProcessed.sparkSession.implicits._ - - if (dataProcessed.select(getPredictionValueCol, labelColumnName).as[(Double, Double)].rdd.isEmpty()) { + val rdd = dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd + if (rdd.isEmpty()) { log.error("The dataset is empty. Returning empty metrics") BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) } else { - val scoreAndLabels = dataProcessed.select(col(getProbabilityCol), col(labelColumnName).cast(DoubleType)).rdd.map { + val scoreAndLabels = rdd.map { case Row(prob: Vector, label: Double) => (prob(1), label) case Row(prob: Double, label: Double) => (prob, label) } // Finding stats per bin -> avg score, avg conv rate, - // total num of data points, bin center. + // total num of data points and overall brier score. val stats = scoreAndLabels.map { - case (score, label) => (getBinIndex(score), (score, label, 1L)) - }.reduceByKeyLocally(_ + _).map { - case (bin, (scoreSum, labelSum, count)) => - (bin, (scoreSum / count, labelSum / count, count, (bin + 0.5) / numBins)) - } + case (score, label) => (getBinIndex(score), (score, label, 1L, math.pow((score - label), 2))) + }.reduceByKey(_ + _).map { + case (bin, (scoreSum, labelSum, count, squaredError)) => + (bin, scoreSum / count, labelSum / count, count, squaredError) + }.collect() - // For the bins, which don't have any scores, fill 0's and bin center. - val statsForBins = { - for {i <- 0 to numBins - 1} yield stats.getOrElse(i, (0.0, 0.0, 0L, (i + 0.5) / numBins)) - } + val (averageScore, averageConversionRate, numberOfDataPoints, brierScoreSum, numberOfPoints) = + stats.foldLeft((new Array[Double](numBins), new Array[Double](numBins), new Array[Long](numBins), 0.0, 0L)) { + (columns, row) => { + val binIndex = row._1 - val (averageScore, averageConversionRate, numberOfDataPoints, binCenters) = - statsForBins.foldLeft((Seq.empty[Double], Seq.empty[Double], Seq.empty[Long], Seq.empty[Double])) { - (columns, row) => (columns._1 :+ row._1, columns._2 :+ row._2, columns._3 :+ row._3, columns._4 :+ row._4) - } + columns._1(binIndex) = row._2 + columns._2(binIndex) = row._3 + columns._3(binIndex) = row._4 + + (columns._1, columns._2, columns._3, columns._4 + row._5, columns._5 + row._4) + } + } - // brier score of entire dataset. - val brierScore = scoreAndLabels.map { case (score, label) => math.pow((score - label), 2) }.mean() + // binCenters is the center point in each bin. + // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. + val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) val metrics = BinaryClassificationBinMetrics( - brierScore = brierScore, + brierScore = brierScoreSum / numberOfPoints, binCenters = binCenters, numberOfDataPoints = numberOfDataPoints, averageScore = averageScore, From db653bf5342d93d88ff4d91270211a862e8dfb88 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Wed, 12 Sep 2018 10:28:03 +0530 Subject: [PATCH 11/16] unfolding the tuples --- .../op/evaluators/OpBinScoreEvaluator.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index afce00dc6a..d82992e53c 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -94,14 +94,14 @@ private[op] class OpBinScoreEvaluator val (averageScore, averageConversionRate, numberOfDataPoints, brierScoreSum, numberOfPoints) = stats.foldLeft((new Array[Double](numBins), new Array[Double](numBins), new Array[Long](numBins), 0.0, 0L)) { - (columns, row) => { - val binIndex = row._1 + case ((score, convRate, dataPoints, brierScoreSum, totalPoints), + (binIndex, avgScore, avgConvRate, counts, squaredError)) => { - columns._1(binIndex) = row._2 - columns._2(binIndex) = row._3 - columns._3(binIndex) = row._4 + score(binIndex) = avgScore + convRate(binIndex) = avgConvRate + dataPoints(binIndex) = counts - (columns._1, columns._2, columns._3, columns._4 + row._5, columns._5 + row._4) + (score, convRate, dataPoints, brierScoreSum + squaredError, totalPoints + counts) } } From 6ee1a82a8d97ad41e182b07ee72c4de83a165d20 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Wed, 12 Sep 2018 10:40:04 +0530 Subject: [PATCH 12/16] code review comments --- .../op/evaluators/OpBinScoreEvaluatorTest.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala index dc959e3c71..2c06458f74 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -50,7 +50,7 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { ) ) - val (dataset_skewed, prediction_skewed, label_skewed) = TestFeatureBuilder( + val (dataSkewed, predictionSkewedData, labelSkewedData) = TestFeatureBuilder( Seq ( Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 0.99999)) -> 1.0.toRealNN, @@ -59,9 +59,7 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { ) ) - val (emptyDataSet, prediction_emptydataset, label_empty_dataset) = TestFeatureBuilder[Prediction, RealNN]( - Seq() - ) + val (emptyData, predictionEmptyData, labelEmptyData) = TestFeatureBuilder[Prediction, RealNN](Seq()) Spec[OpBinScoreEvaluator] should "return the bin metrics" in { val metrics = new OpBinScoreEvaluator(numBins = 4) @@ -84,14 +82,14 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { it should "evaluate the empty data" in { val metrics = new OpBinScoreEvaluator(numBins = 10) - .setLabelCol(label_empty_dataset.name).setPredictionCol(prediction_emptydataset.name).evaluateAll(emptyDataSet) + .setLabelCol(labelEmptyData.name).setPredictionCol(predictionEmptyData.name).evaluateAll(emptyData) metrics shouldBe BinaryClassificationBinMetrics(0.0, Seq(), Seq(), Seq(), Seq()) } it should "evaluate bin metrics for skewed data" in { val metrics = new OpBinScoreEvaluator(numBins = 5) - .setLabelCol(label_skewed.name).setPredictionCol(prediction_skewed.name).evaluateAll(dataset_skewed) + .setLabelCol(labelSkewedData.name).setPredictionCol(predictionSkewedData.name).evaluateAll(dataSkewed) metrics shouldBe BinaryClassificationBinMetrics( 7.294225500000013E-4, From 4b3225073024e247f884aad3e5c871815ea07443 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Wed, 12 Sep 2018 23:44:47 +0530 Subject: [PATCH 13/16] Fixing the stats when score goes out of bound --- .../op/evaluators/OpBinScoreEvaluator.scala | 3 ++- .../evaluators/OpBinScoreEvaluatorTest.scala | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index d82992e53c..29dc3b94c7 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -124,7 +124,8 @@ private[op] class OpBinScoreEvaluator // getBinIndex finds which bin the score associates with. private def getBinIndex(score: Double): Int = { - math.min(numBins - 1, (score * numBins).toInt) + val binIndex = math.min(numBins - 1, (score * numBins).toInt) + math.max(binIndex, 0) // if score is negative, assign it to bin 0. } } diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala index 2c06458f74..f6556228cc 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -61,6 +61,13 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { val (emptyData, predictionEmptyData, labelEmptyData) = TestFeatureBuilder[Prediction, RealNN](Seq()) + val (outOfBoundScoreDataset, outOfBoundScoreprediction, outOfBoundScorelabel) = TestFeatureBuilder( + Seq ( + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, -0.99999)) -> 1.0.toRealNN, + Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 1.99999)) -> 1.0.toRealNN + ) + ) + Spec[OpBinScoreEvaluator] should "return the bin metrics" in { val metrics = new OpBinScoreEvaluator(numBins = 4) .setLabelCol(label.name).setPredictionCol(prediction.name).evaluateAll(dataset) @@ -73,6 +80,19 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { Seq(0.0, 0.0, 0.0, 1.0)) } + it should "evaluate bin metrics for scores not between 0 and 1" in { + val metrics = new OpBinScoreEvaluator(numBins = 4) + .setLabelCol(outOfBoundScorelabel.name).setPredictionCol(outOfBoundScoreprediction.name) + .evaluateAll(outOfBoundScoreDataset) + + metrics shouldBe BinaryClassificationBinMetrics( + 2.4999700001, + Seq(0.125, 0.375, 0.625, 0.875), + Seq(1, 0, 0, 1), + Seq(-0.99999, 0.0, 0.0, 1.99999), + Seq(1.0, 0.0, 0.0, 1.0)) + } + it should "error on invalid number of bins" in { assertThrows[IllegalArgumentException] { new OpBinScoreEvaluator(numBins = 0) From d928ba7b844cbdbb80e7ba6a70cd4c2aa8d5f72f Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Thu, 13 Sep 2018 16:06:31 +0530 Subject: [PATCH 14/16] fixing an issue where prediction column is missing --- .../op/evaluators/OpBinScoreEvaluator.scala | 15 ++++++++++----- .../op/evaluators/OpBinScoreEvaluatorTest.scala | 17 +++++++++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 29dc3b94c7..3f03c15755 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -83,10 +83,14 @@ private[op] class OpBinScoreEvaluator case Row(prob: Double, label: Double) => (prob, label) } + val minScore = math.min(0.0, scoreAndLabels.keys.min()) + val maxScore = math.max(1.0, scoreAndLabels.keys.max()) + // Finding stats per bin -> avg score, avg conv rate, // total num of data points and overall brier score. val stats = scoreAndLabels.map { - case (score, label) => (getBinIndex(score), (score, label, 1L, math.pow((score - label), 2))) + case (score, label) => + (getBinIndex(score, minScore, maxScore), (score, label, 1L, math.pow((score - label), 2))) }.reduceByKey(_ + _).map { case (bin, (scoreSum, labelSum, count, squaredError)) => (bin, scoreSum / count, labelSum / count, count, squaredError) @@ -107,7 +111,8 @@ private[op] class OpBinScoreEvaluator // binCenters is the center point in each bin. // e.g., for bins [(0.0 - 0.5), (0.5 - 1.0)], bin centers are [0.25, 0.75]. - val binCenters = (for {i <- 0 to numBins} yield ((i + 0.5) / numBins)).dropRight(1) + val diff = maxScore - minScore + val binCenters = (for {i <- 0 to numBins-1} yield (minScore + ((diff * i) / numBins) + (diff / (2 * numBins)))) val metrics = BinaryClassificationBinMetrics( brierScore = brierScoreSum / numberOfPoints, @@ -123,9 +128,9 @@ private[op] class OpBinScoreEvaluator } // getBinIndex finds which bin the score associates with. - private def getBinIndex(score: Double): Int = { - val binIndex = math.min(numBins - 1, (score * numBins).toInt) - math.max(binIndex, 0) // if score is negative, assign it to bin 0. + private def getBinIndex(score: Double, minScore: Double, maxScore: Double): Int = { + val binIndex = (numBins * (score - minScore) / (maxScore - minScore)).toInt + math.min(numBins - 1, binIndex) } } diff --git a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala index f6556228cc..acf8182bdd 100644 --- a/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala +++ b/core/src/test/scala/com/salesforce/op/evaluators/OpBinScoreEvaluatorTest.scala @@ -63,8 +63,9 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { val (outOfBoundScoreDataset, outOfBoundScoreprediction, outOfBoundScorelabel) = TestFeatureBuilder( Seq ( - Prediction(1.0, Array(10.0, 10.0), Array(0.0001, -0.99999)) -> 1.0.toRealNN, - Prediction(1.0, Array(10.0, 10.0), Array(0.0001, 1.99999)) -> 1.0.toRealNN + Prediction(1.0, Array(0.0001, -0.99999), Array.emptyDoubleArray) -> 0.0.toRealNN, + Prediction(1.0, Array(0.0001, 1.99999), Array.emptyDoubleArray) -> 1.0.toRealNN, + Prediction(1.0, Array(0.0001, 12.0), Array.emptyDoubleArray) -> 1.0.toRealNN ) ) @@ -86,11 +87,11 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { .evaluateAll(outOfBoundScoreDataset) metrics shouldBe BinaryClassificationBinMetrics( - 2.4999700001, - Seq(0.125, 0.375, 0.625, 0.875), - Seq(1, 0, 0, 1), - Seq(-0.99999, 0.0, 0.0, 1.99999), - Seq(1.0, 0.0, 0.0, 1.0)) + 40.999986666733335, + Seq(0.62500875, 3.87500625, 7.125003749999999, 10.37500125), + Seq(2, 0, 0, 1), + Seq(0.49999999999999994, 0.0, 0.0, 12.0), + Seq(0.5, 0.0, 0.0, 1.0)) } it should "error on invalid number of bins" in { @@ -113,7 +114,7 @@ class OpBinScoreEvaluatorTest extends FlatSpec with TestSparkContext { metrics shouldBe BinaryClassificationBinMetrics( 7.294225500000013E-4, - Seq(0.1, 0.3, 0.5, 0.7, 0.9), + Seq(0.1, 0.30000000000000004, 0.5, 0.7, 0.9), Seq(0, 0, 0, 0, 4), Seq(0.0, 0.0, 0.0, 0.0, 0.98617), Seq(0.0, 0.0, 0.0, 0.0, 1.0)) From b36ec1d44684be8b308c5c42111a5290cb1da305 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 14 Sep 2018 07:12:24 +0530 Subject: [PATCH 15/16] finding max and min using foldleft --- .../salesforce/op/evaluators/OpBinScoreEvaluator.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 3f03c15755..8012cb9905 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -83,8 +83,14 @@ private[op] class OpBinScoreEvaluator case Row(prob: Double, label: Double) => (prob, label) } - val minScore = math.min(0.0, scoreAndLabels.keys.min()) - val maxScore = math.max(1.0, scoreAndLabels.keys.max()) + val (minScore, maxScore) = scoreAndLabels.keys.collect().foldLeft(0.0, 1.0) { + case((minVal, maxVal), (scores)) => { + val min = math.min(minVal, scores) + val max = math.max(maxVal, scores) + + (math.min(0.0, min), math.max(1.0, max)) + } + } // Finding stats per bin -> avg score, avg conv rate, // total num of data points and overall brier score. From 01250d27f390d379a753928bda431cf2a85aef27 Mon Sep 17 00:00:00 2001 From: Gokulakrishnan <7522696+gokulsfdc@users.noreply.github.com> Date: Fri, 14 Sep 2018 16:19:23 +0530 Subject: [PATCH 16/16] Finding min max using map-fold --- .../op/evaluators/OpBinScoreEvaluator.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala index 8012cb9905..5be7948180 100644 --- a/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala +++ b/core/src/main/scala/com/salesforce/op/evaluators/OpBinScoreEvaluator.scala @@ -83,12 +83,11 @@ private[op] class OpBinScoreEvaluator case Row(prob: Double, label: Double) => (prob, label) } - val (minScore, maxScore) = scoreAndLabels.keys.collect().foldLeft(0.0, 1.0) { - case((minVal, maxVal), (scores)) => { - val min = math.min(minVal, scores) - val max = math.max(maxVal, scores) - - (math.min(0.0, min), math.max(1.0, max)) + val (maxScore, minScore) = scoreAndLabels.map { + case (score , _) => (score, score) + }.fold(1.0, 0.0) { + case((maxVal, minVal), (scoreMax, scoreMin)) => { + (math.max(maxVal, scoreMax), math.min(minVal, scoreMin)) } }