Avoid creating SparseVectors for LOCO (#377)
* WIP

* code check fixes

* try sparse vector clone for test fix

* better names

* Skip Zeros when computing LOCOs for Text and Date fields

* Fix Scalastyle

* minor fixes

* zipped

* redundant toArray

* Updating comments + adding extra checks

* Adding TODO
gerashegalov authored and leahmcguire committed Aug 21, 2019
1 parent dc64b4f commit 16ea717
Showing 3 changed files with 101 additions and 70 deletions.
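For context: LOCO (leave-one-covariate-out) record insights score a row by zeroing out one feature value at a time, re-scoring, and reporting how far the prediction moves. This commit stops allocating a fresh SparseVector per perturbed feature and skips re-scoring zero-valued text/date columns. A minimal sketch of the underlying idea — not the transformer itself — with a hypothetical score function standing in for the model call (modelApply in the real RecordInsightsLOCO):

import org.apache.spark.ml.linalg.{SparseVector, Vectors}

object LocoSketch {
  // Hypothetical stand-in for the model scoring call (modelApply in RecordInsightsLOCO)
  def score(v: SparseVector): Array[Double] = Array(v.values.sum)

  // For each active index: zero it on a copy, re-score, and report base - perturbed
  def locoDiffs(v: SparseVector): Seq[(Int, Array[Double])] = {
    val base = score(v)
    v.indices.indices.map { i =>
      val perturbed = v.copy      // one copy per perturbation
      perturbed.values(i) = 0.0   // zero the i-th *active* value in place
      val diffs = base.zip(score(perturbed)).map { case (b, s) => b - s }
      v.indices(i) -> diffs
    }
  }

  def main(args: Array[String]): Unit = {
    val v = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 3.0, 5.0)).toSparse
    locoDiffs(v).foreach { case (idx, d) => println(s"feature $idx -> ${d.mkString(", ")}") }
  }
}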
RecordInsightsLOCO.scala
@@ -30,24 +30,22 @@

 package com.salesforce.op.stages.impl.insights
 
-import com.salesforce.op.{FeatureInsights, UID}
+import com.salesforce.op.UID
 import com.salesforce.op.features.types._
 import com.salesforce.op.stages.base.unary.UnaryTransformer
-import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}
+import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.selector.SelectedModel
 import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
 import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
+import com.salesforce.op.utils.spark.RichVector.RichSparseVector
 import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata}
 import enumeratum.{Enum, EnumEntry}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.ml.Model
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.SparseVector
 import org.apache.spark.ml.param.{IntParam, Param, Params}
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import scala.reflect.runtime.universe._


 trait RecordInsightsLOCOParams extends Params {
@@ -115,35 +113,29 @@ class RecordInsightsLOCO[T <: Model[T]]
   private val dateMapTypes =
     Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])
 
-  // Indices of features derived from Text(Map)Vectorizer
-  private lazy val textFeatureIndices = getIndicesOfFeatureType(textTypes ++ textMapTypes)
+  // Indices of features derived from hashed Text(Map)Vectorizer
+  private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes,
+    h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty)
 
-  // Indices of features derived from Date(Map)Vectorizer
-  private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes)
+  // Indices of features derived from unit circle Date(Map)Vectorizer
+  private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined)
 
   /**
    * Return the indices of features derived from given types.
   * @return Seq[Int]
   */
-  private def getIndicesOfFeatureType (types: Set[String]): Seq[Int] = histories
-    .filter(_.parentFeatureType.exists(types.contains))
-    .map(_.index)
-    .distinct.sorted
+  private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] =
+    histories.collect {
+      case h if h.parentFeatureType.exists(types.contains) && predicate(h) => h.index
+    }.distinct.sorted
 
-  private def computeDiffs
+  private def computeDiff
   (
-    i: Int,
-    oldInd: Int,
-    oldVal: Double,
-    featureArray: Array[(Int, Double)],
-    featureSize: Int,
+    featureSparse: SparseVector,
     baseScore: Array[Double]
   ): Array[Double] = {
-    featureArray.update(i, (oldInd, 0.0))
-    val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
-    val diffs = baseScore.zip(score).map { case (b, s) => b - s }
-    featureArray.update(i, (oldInd, oldVal))
-    diffs
+    val score = modelApply(labelDummy, featureSparse.toOPVector).score
+    (baseScore, score).zipped.map { case (b, s) => b - s }
   }

   private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = {
@@ -158,70 +150,100 @@ class RecordInsightsLOCO[T <: Model[T]]
   private def convertToTimePeriod(descriptorValue: String): Option[TimePeriod] =
     descriptorValue.split("_").lastOption.flatMap(TimePeriod.withNameInsensitiveOption)
 
-  private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = history.grouping match {
-    case Some(grouping) => history.parentFeatureOrigins.headOption.map(_ + "_" + grouping)
-    case None => history.parentFeatureOrigins.headOption
+  private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = {
+    val groupSuffix = history.grouping.map("_" + _).getOrElse("")
+    val name = history.parentFeatureOrigins.headOption.map(_ + groupSuffix)
+
+    // If the descriptor value of a derived feature exists, then we check whether it comes
+    // from the unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
+    // TODO: Filter by parentStage (DateToUnitCircleTransformer & DateToUnitCircleVectorizer) once the bug in the
+    // feature history after multiple transformations has been fixed.
+    name.map { n =>
+      val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) {
+        history.descriptorValue
+          .flatMap(convertToTimePeriod)
+          .map(p => "_" + p.entryName)
+      } else None
+      n + timePeriodName.getOrElse("")
+    }
   }

   private def returnTopPosNeg
   (
-    featureArray: Array[(Int, Double)],
+    featureSparse: SparseVector,
+    zeroCountByFeature: Map[String, Int],
     featureSize: Int,
     baseScore: Array[Double],
     k: Int,
     indexToExamine: Int
   ): Seq[LOCOValue] = {
 
     val minMaxHeap = new MinMaxHeap(k)
     val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
-    for {i <- featureArray.indices} {
-      val (oldInd, oldVal) = featureArray(i)
-      val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
-      val history = histories(oldInd)
+
+    aggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap,
+      baseScore)
+
+    // Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features
+    // Adding LOCO results from aggregation map into heaps
+    for {(name, (indices, ar)) <- aggregationMap} {
+      // The index here is arbitrary
+      val (i, n) = (indices.head, indices.length)
+      val zeroCounts = zeroCountByFeature.getOrElse(name, 0)
+      val diffToExamine = ar.map(_ / (n + zeroCounts))
+      minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
+    }
+
+    minMaxHeap.dequeueAll
+  }

+  private def aggregateDiffs(
+    featureVec: SparseVector,
+    indexToExamine: Int,
+    minMaxHeap: MinMaxHeap,
+    aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
+    baseScore: Array[Double]
+  ): Unit = {
+    computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) =>
+      val history = histories(oldInd)
       history match {
-        // If indicator value and descriptor value of a derived text feature are empty, then it is likely
-        // to be a hashing tf output. We aggregate such features for each (rawFeatureName).
-        case h if h.indicatorValue.isEmpty && h.descriptorValue.isEmpty && textFeatureIndices.contains(oldInd) =>
+        // If indicator value and descriptor value of a derived text feature are empty, then it is
+        // a hashing tf output. We aggregate such features for each (rawFeatureName).
+        case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) =>
           for {name <- getRawFeatureName(h)} {
            val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
            aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
          }
-        // If the descriptor value of a derived date feature exists, then it is likely to be
-        // from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
-        case h if h.descriptorValue.isDefined && dateFeatureIndices.contains(oldInd) =>
-          for {name <- getRawFeatureName(h)} {
-            val key = name + h.descriptorValue.flatMap(convertToTimePeriod).map(p => "_" + p.entryName).getOrElse("")
-            val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
-            aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
-          }
        case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
      }
    }
+  }

-    // Adding LOCO results from aggregation map into heaps
-    for {(indices, ar) <- aggregationMap.values} {
-      // The index here is arbitrary
-      val (i, n) = (indices.head, indices.length)
-      val diffToExamine = ar.map(_ / n)
-      minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
-    }
-
-    minMaxHeap.dequeueAll
-  }
+
+  private def computeDiffs(
+    featureVec: SparseVector,
+    baseScore: Array[Double]
+  ) = {
+    (0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) =>
+      (i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore))
+    }
+  }

   override def transformFn: OPVector => TextMap = features => {
     val baseResult = modelApply(labelDummy, features)
     val baseScore = baseResult.score
-    val featureSize = features.value.size
 
     // TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
     val featuresSparse = features.value.toSparse
-    val res = ArrayBuffer.empty[(Int, Double)]
-    featuresSparse.foreachActive((i, v) => res += i -> v)
+    val featureIndexSet = featuresSparse.indices.toSet
 
     // Besides non 0 values, we want to check the text/date features as well
-    (textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
-    val featureArray = res.toArray
+    val featureSize = featuresSparse.size
+    val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
+      .filterNot(featureIndexSet.contains)
+
+    // Count zeros by feature name
+    val zeroCountByFeature = zeroValIndices
+      .groupBy(i => getRawFeatureName(histories(i)).get)
+      .mapValues(_.length).view.toMap
 
     val k = $(topK)
     // Index where to examine the difference in the prediction vector
@@ -232,15 +254,16 @@
       // For MultiClassification, the value is from the predicted class (i.e. the class having the highest probability)
       case n if n > 2 => baseResult.prediction.toInt
     }
-    val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
+    val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
     val top = getTopKStrategy match {
       case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
       // Take top K positive and top K negative LOCOs, hence 2 * K
       case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
     }
 
+    val allIndices = featuresSparse.indices ++ zeroValIndices
     top.map { case LOCOValue(i, _, diffs) =>
-      RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
+      RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
     }.toMap.toTextMap
   }

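Worth noting from the hunks above: per-column diffs for hashed text and unit-circle date features are summed per raw feature, and the average divides by both the number of active columns and the number of skipped zero-valued columns (the n + zeroCounts denominator), so skipping the zeros does not inflate the aggregate. A toy sketch of that arithmetic, with made-up diff arrays:

object LocoAggregationSketch {
  // Element-wise sum of two per-class diff arrays (mirrors sumArrays above)
  def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] =
    left.zipAll(right, 0.0, 0.0).map { case (l, r) => l + r }

  def main(args: Array[String]): Unit = {
    // Hypothetical LOCO diffs for the three *active* hashed columns of one text feature
    val activeDiffs = Seq(Array(0.2, -0.2), Array(0.4, -0.4), Array(0.3, -0.3))
    val zeroCount = 7 // zero-valued columns of the same feature, never re-scored
    val summed = activeDiffs.reduce(sumArrays)
    val avg = summed.map(_ / (activeDiffs.length + zeroCount))
    println(avg.mkString(", ")) // ≈ 0.09, -0.09
  }
}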
RecordInsightsLOCOTest.scala
@@ -30,11 +30,9 @@

 package com.salesforce.op.stages.impl.insights
 
-import com.salesforce.op.{FeatureHistory, OpWorkflow}
-import com.salesforce.op.features.FeatureLike
 import com.salesforce.op.features.types._
-import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier}
+import com.salesforce.op._
+import com.salesforce.op.features.FeatureLike
 import com.salesforce.op.stages.impl.feature.{DateListPivot, TransmogrifierDefaults}
 import com.salesforce.op.stages.impl.insights.RecordInsightsParser.Insights
 import com.salesforce.op.stages.impl.preparators.{SanityCheckDataTest, SanityChecker}
@@ -45,17 +43,18 @@ import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
 import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomText, RandomVector}
 import com.salesforce.op.utils.spark.RichDataset._
 import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata}
-import org.apache.spark.ml.{Model, PredictionModel}
+import com.salesforce.op.{FeatureHistory, OpWorkflow, _}
+import org.apache.spark.ml.PredictionModel
 import org.apache.spark.ml.classification.LogisticRegressionModel
+import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.regression.LinearRegressionModel
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.{DataFrame, Encoder, Row}
 import org.apache.spark.sql.functions.monotonically_increasing_id
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.ml.linalg._
-import org.apache.spark.sql.{DataFrame, Encoder, Row}
 import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Suite}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Suite}


@RunWith(classOf[JUnitRunner])
Expand Down Expand Up @@ -307,8 +306,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI
}
withClue("SmartTextVectorizer detects country feature as a PickList, hence no " +
"aggregation required for LOCO on this field.") {
testData.actualRecordInsights.foreach(p => assert(p.keys.exists(r =>
r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)))
testData.actualRecordInsights.foreach { p =>
assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined))
}
}

assertLOCOSum(testData.actualRecordInsights)
@@ -624,4 +624,4 @@ case class RecordInsightsTestData[M <: PredictionModel[Vector, M]]
   label: FeatureLike[RealNN],
   sparkModel: OpPredictorWrapperModel[M],
   actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]
-)
\ No newline at end of file
+)
RichVector.scala
@@ -133,4 +133,12 @@ object RichVector {
     Vectors.sparse(size, indices.toArray, values.toArray).compressed
   }
 
+  implicit class RichSparseVector(val v: SparseVector) extends AnyVal {
+    def updated(index: Int, indexVal: Int, value: Double): SparseVector = {
+      require(v.indices(index) == indexVal,
+        s"Invalid index: indices($index)==${v.indices(index)}, expected: $indexVal")
+      v.values(index) = value
+      v
+    }
+  }
 }
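A note on the new helper: updated writes into the receiver's values array instead of allocating a new vector, which is why the caller above does featureVec.copy.updated(i, oldInd, 0.0) — copy once, then mutate the copy. A small usage sketch (assuming the imports shown in the diff; the object name is hypothetical):

import com.salesforce.op.utils.spark.RichVector.RichSparseVector
import org.apache.spark.ml.linalg.Vectors

object RichSparseVectorSketch extends App {
  val v = Vectors.sparse(4, Array(1, 3), Array(2.0, 4.0)).toSparse
  // Zero the value stored at position 0 of the active-index array, i.e. column 1
  val zeroed = v.copy.updated(index = 0, indexVal = 1, value = 0.0)
  assert(v(1) == 2.0)      // the original is untouched: we mutated the copy
  assert(zeroed(1) == 0.0)
}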
