diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
index cca1d5d1c8..7bcf02f969 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCO.scala
@@ -30,24 +30,22 @@
 
 package com.salesforce.op.stages.impl.insights
 
-import com.salesforce.op.{FeatureInsights, UID}
+import com.salesforce.op.UID
 import com.salesforce.op.features.types._
 import com.salesforce.op.stages.base.unary.UnaryTransformer
-import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}
+import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.selector.SelectedModel
 import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
 import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
+import com.salesforce.op.utils.spark.RichVector.RichSparseVector
 import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata}
 import enumeratum.{Enum, EnumEntry}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml.Model
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.SparseVector
 import org.apache.spark.ml.param.{IntParam, Param, Params}
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import scala.reflect.runtime.universe._
 
 
 trait RecordInsightsLOCOParams extends Params {
@@ -115,35 +113,29 @@ class RecordInsightsLOCO[T <: Model[T]]
   private val dateMapTypes =
     Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])
 
-  // Indices of features derived from Text(Map)Vectorizer
-  private lazy val textFeatureIndices = getIndicesOfFeatureType(textTypes ++ textMapTypes)
+  // Indices of features derived from hashed Text(Map)Vectorizer
+  private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes,
+    h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty)
 
-  // Indices of features derived from Date(Map)Vectorizer
-  private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes)
+  // Indices of features derived from unit Date(Map)Vectorizer
+  private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined)
 
   /**
    * Return the indices of features derived from given types.
    * @return Seq[Int]
    */
-  private def getIndicesOfFeatureType (types: Set[String]): Seq[Int] = histories
-    .filter(_.parentFeatureType.exists(types.contains))
-    .map(_.index)
-    .distinct.sorted
+  private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] =
+    histories.collect {
+      case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index
+    }.distinct.sorted
 
-  private def computeDiffs
+  private def computeDiff
   (
-    i: Int,
-    oldInd: Int,
-    oldVal: Double,
-    featureArray: Array[(Int, Double)],
-    featureSize: Int,
+    featureSparse: SparseVector,
     baseScore: Array[Double]
   ): Array[Double] = {
-    featureArray.update(i, (oldInd, 0.0))
-    val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
-    val diffs = baseScore.zip(score).map { case (b, s) => b - s }
-    featureArray.update(i, (oldInd, oldVal))
-    diffs
+    val score = modelApply(labelDummy, featureSparse.toOPVector).score
+    (baseScore, score).zipped.map { case (b, s) => b - s }
   }
 
   private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = {
@@ -158,70 +150,107 @@ class RecordInsightsLOCO[T <: Model[T]]
   private def convertToTimePeriod(descriptorValue: String): Option[TimePeriod] =
     descriptorValue.split("_").lastOption.flatMap(TimePeriod.withNameInsensitiveOption)
 
-  private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = history.grouping match {
-    case Some(grouping) => history.parentFeatureOrigins.headOption.map(_ + "_" + grouping)
-    case None => history.parentFeatureOrigins.headOption
+  private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = {
+    val groupSuffix = history.grouping.map("_" + _).getOrElse("")
+    val name = history.parentFeatureOrigins.headOption.map(_ + groupSuffix)
+
+    // If the descriptor value of a derived feature exists, then we check if it is
+    // from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
+    // TODO : Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the
+    // feature history after multiple transformations has been fixed
+    name.map { n =>
+      val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) {
+        history.descriptorValue
+          .flatMap(convertToTimePeriod)
+          .map(p => "_" + p.entryName)
+      } else None
+      n + timePeriodName.getOrElse("")
+    }
   }
 
   private def returnTopPosNeg
   (
-    featureArray: Array[(Int, Double)],
+    featureSparse: SparseVector,
+    zeroCountByFeature: Map[String, Int],
     featureSize: Int,
     baseScore: Array[Double],
     k: Int,
     indexToExamine: Int
   ): Seq[LOCOValue] = {
-
     val minMaxHeap = new MinMaxHeap(k)
     val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
-    for {i <- featureArray.indices} {
-      val (oldInd, oldVal) = featureArray(i)
-      val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
-      val history = histories(oldInd)
 
+    aggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap,
+      baseScore)
+
+    // Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features
+    // Adding LOCO results from aggregation map into heaps
+    for {(name, (indices, ar)) <- aggregationMap} {
+      // The index here is arbitrary
+      val (i, n) = (indices.head, indices.length)
+      val zeroCounts = zeroCountByFeature.getOrElse(name, 0)
+      val diffToExamine = ar.map(_ / (n + zeroCounts))
+      minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
+    }
+
+    minMaxHeap.dequeueAll
+  }
+
+  /**
+   * Computes the LOCO diff of every active feature and either accumulates it in the aggregation map
+   * (hashed text and unit circle date features) or enqueues it directly on the heap (all other features)
+   */
+  private def aggregateDiffs(
+    featureVec: SparseVector,
+    indexToExamine: Int,
+    minMaxHeap: MinMaxHeap,
+    aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
+    baseScore: Array[Double]
+  ): Unit = {
+    computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) =>
+      val history = histories(oldInd)
       history match {
-        // If indicator value and descriptor value of a derived text feature are empty, then it is likely
-        // to be a hashing tf output. We aggregate such features for each (rawFeatureName).
-        case h if h.indicatorValue.isEmpty && h.descriptorValue.isEmpty && textFeatureIndices.contains(oldInd) =>
+        // Hashed text features and unit circle date features are aggregated per name
+        // returned by getRawFeatureName, which appends the time period for date features.
+        case h if textFeatureIndices.contains(oldInd) || dateFeatureIndices.contains(oldInd) =>
           for {name <- getRawFeatureName(h)} {
             val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
             aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
           }
-        // If the descriptor value of a derived date feature exists, then it is likely to be
-        // from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
-        case h if h.descriptorValue.isDefined && dateFeatureIndices.contains(oldInd) =>
-          for {name <- getRawFeatureName(h)} {
-            val key = name + h.descriptorValue.flatMap(convertToTimePeriod).map(p => "_" + p.entryName).getOrElse("")
-            val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
-            aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
-          }
         case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
       }
     }
+  }
 
-    // Adding LOCO results from aggregation map into heaps
-    for {(indices, ar) <- aggregationMap.values} {
-      // The index here is arbitrary
-      val (i, n) = (indices.head, indices.length)
-      val diffToExamine = ar.map(_ / n)
-      minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
+  /**
+   * Zeroes out each active value in turn and returns (position, vector index, score diffs) for it
+   */
+  private def computeDiffs(
+    featureVec: SparseVector,
+    baseScore: Array[Double]
+  ) = {
+    featureVec.indices.zipWithIndex.map { case (oldInd, i) =>
+      (i, oldInd, computeDiff(featureVec.updated(i, oldInd, 0.0), baseScore))
     }
-
-    minMaxHeap.dequeueAll
   }
 
   override def transformFn: OPVector => TextMap = features => {
     val baseResult = modelApply(labelDummy, features)
     val baseScore = baseResult.score
+    val featureSize = features.value.size
 
     // TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
     val featuresSparse = features.value.toSparse
-    val res = ArrayBuffer.empty[(Int, Double)]
-    featuresSparse.foreachActive((i, v) => res += i -> v)
+    val featureIndexSet = featuresSparse.indices.toSet
+
     // Besides non 0 values, we want to check the text/date features as well
-    (textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
-    val featureArray = res.toArray
-    val featureSize = featuresSparse.size
+    val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
+      .filterNot(featureIndexSet.contains)
+
+    // Count zeros by feature name
+    val zeroCountByFeature = zeroValIndices
+      .flatMap(i => getRawFeatureName(histories(i)))
+      .groupBy(identity).mapValues(_.length).view.toMap
 
     val k = $(topK)
     // Index where to examine the difference in the prediction vector
@@ -232,15 +261,16 @@ class RecordInsightsLOCO[T <: Model[T]]
       // For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
       case n if n > 2 => baseResult.prediction.toInt
     }
-    val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
+    val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
     val top = getTopKStrategy match {
       case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
       // Take top K positive and top K negative LOCOs, hence 2 * K
       case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
     }
 
+    val allIndices = featuresSparse.indices ++ zeroValIndices
     top.map { case LOCOValue(i, _, diffs) =>
-      RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
+      RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
     }.toMap.toTextMap
   }
 
diff --git a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala
index 4ca55a7ad2..0e7f8d4fea 100644
--- a/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala
+++ b/core/src/test/scala/com/salesforce/op/stages/impl/insights/RecordInsightsLOCOTest.scala
@@ -30,11 +30,9 @@
 
 package com.salesforce.op.stages.impl.insights
 
-import com.salesforce.op.{FeatureHistory, OpWorkflow}
+import com.salesforce.op.features.FeatureLike
 import com.salesforce.op.features.types._
 import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier}
-import com.salesforce.op._
-import com.salesforce.op.features.FeatureLike
 import com.salesforce.op.stages.impl.feature.{DateListPivot, TransmogrifierDefaults}
 import com.salesforce.op.stages.impl.insights.RecordInsightsParser.Insights
 import com.salesforce.op.stages.impl.preparators.{SanityCheckDataTest, SanityChecker}
@@ -45,17 +43,18 @@ import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
 import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomText, RandomVector}
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata}
-import org.apache.spark.ml.{Model, PredictionModel}
+import com.salesforce.op.{FeatureHistory, OpWorkflow, _}
+import org.apache.spark.ml.PredictionModel
 import org.apache.spark.ml.classification.LogisticRegressionModel
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.regression.LinearRegressionModel
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.{DataFrame, Encoder, Row}
 import org.apache.spark.sql.functions.monotonically_increasing_id
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.ml.linalg._
+import org.apache.spark.sql.{DataFrame, Encoder, Row}
 import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Suite}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Suite}
 
 
 @RunWith(classOf[JUnitRunner])
@@ -307,8 +306,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI
     }
     withClue("SmartTextVectorizer detects country feature as a PickList, hence no " +
       "aggregation required for LOCO on this field.") {
-      testData.actualRecordInsights.foreach(p => assert(p.keys.exists(r =>
-        r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)))
+      testData.actualRecordInsights.foreach { p =>
+        assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined))
+      }
     }
 
     assertLOCOSum(testData.actualRecordInsights)
@@ -624,4 +624,4 @@ case class RecordInsightsTestData[M <: PredictionModel[Vector, M]]
   label: FeatureLike[RealNN],
   sparkModel: OpPredictorWrapperModel[M],
   actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]
-)
\ No newline at end of file
+)
diff --git a/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala b/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala
index 8adfb2dc50..7d135b8339 100644
--- a/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala
+++ b/features/src/main/scala/com/salesforce/op/utils/spark/RichVector.scala
@@ -133,4 +133,20 @@ object RichVector {
     Vectors.sparse(size, indices.toArray, values.toArray).compressed
   }
 
+  implicit class RichSparseVector(val v: SparseVector) extends AnyVal {
+    /**
+     * Returns a copy of the vector with the value stored at the given active position replaced.
+     *
+     * @param index    position in the arrays of active indices and values
+     * @param indexVal vector index expected at that position
+     * @param value    value to store
+     * @return a new sparse vector, the receiver is left unmodified
+     */
+    def updated(index: Int, indexVal: Int, value: Double): SparseVector = {
+      require(v.indices(index) == indexVal,
+        s"Invalid index: indices($index)==${v.indices(index)}, expected: $indexVal")
+      // Copy the values so that the receiver is left untouched, as the name `updated` implies
+      new SparseVector(v.size, v.indices, v.values.updated(index, value))
+    }
+  }
 }