-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Avoid creating SparseVectors for LOCO #377
Changes from 3 commits
66a5e99
77b97ee
35b8b5c
cb2dc05
a494129
1c383e5
a1d7d81
f95f4bf
a8ff84f
709594f
96b2941
ee76053
2248d49
c3f743d
dc39cb3
25a6ac5
3b7b63a
b271b74
087b1d2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,24 +30,22 @@ | |
|
||
package com.salesforce.op.stages.impl.insights | ||
|
||
import com.salesforce.op.{FeatureInsights, UID} | ||
import com.salesforce.op.UID | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.stages.base.unary.UnaryTransformer | ||
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod} | ||
import com.salesforce.op.stages.impl.feature.TimePeriod | ||
import com.salesforce.op.stages.impl.selector.SelectedModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel | ||
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._ | ||
import com.salesforce.op.utils.spark.RichVector.RichSparseVector | ||
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata} | ||
import enumeratum.{Enum, EnumEntry} | ||
import org.apache.spark.annotation.Experimental | ||
import org.apache.spark.ml.Model | ||
import org.apache.spark.ml.linalg.Vectors | ||
import org.apache.spark.ml.linalg.SparseVector | ||
import org.apache.spark.ml.param.{IntParam, Param, Params} | ||
|
||
import scala.collection.mutable | ||
import scala.collection.mutable.ArrayBuffer | ||
|
||
import scala.reflect.runtime.universe._ | ||
|
||
|
||
trait RecordInsightsLOCOParams extends Params { | ||
|
@@ -130,20 +128,13 @@ class RecordInsightsLOCO[T <: Model[T]] | |
.map(_.index) | ||
.distinct.sorted | ||
|
||
private def computeDiffs | ||
private def computeDiff | ||
( | ||
i: Int, | ||
oldInd: Int, | ||
oldVal: Double, | ||
featureArray: Array[(Int, Double)], | ||
featureSize: Int, | ||
featureSparse: SparseVector, | ||
baseScore: Array[Double] | ||
): Array[Double] = { | ||
featureArray.update(i, (oldInd, 0.0)) | ||
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score | ||
val diffs = baseScore.zip(score).map { case (b, s) => b - s } | ||
featureArray.update(i, (oldInd, oldVal)) | ||
diffs | ||
val score = modelApply(labelDummy, featureSparse.toOPVector).score | ||
baseScore.zip(score).map { case (b, s) => b - s } | ||
} | ||
|
||
private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = { | ||
|
@@ -165,20 +156,42 @@ class RecordInsightsLOCO[T <: Model[T]] | |
|
||
private def returnTopPosNeg | ||
( | ||
featureArray: Array[(Int, Double)], | ||
featureSparse: SparseVector, | ||
zeroValIndices: Array[Int], | ||
featureSize: Int, | ||
baseScore: Array[Double], | ||
k: Int, | ||
indexToExamine: Int | ||
): Seq[LOCOValue] = { | ||
|
||
val minMaxHeap = new MinMaxHeap(k) | ||
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])] | ||
for {i <- featureArray.indices} { | ||
val (oldInd, oldVal) = featureArray(i) | ||
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore) | ||
val history = histories(oldInd) | ||
|
||
agggregateDiffs(0, Left(featureSparse), indexToExamine, minMaxHeap, aggregationMap, | ||
baseScore) | ||
agggregateDiffs(featureSparse.size, Right(zeroValIndices), indexToExamine, minMaxHeap, | ||
aggregationMap, baseScore) | ||
|
||
// Adding LOCO results from aggregation map into heaps | ||
for {(indices, ar) <- aggregationMap.values} { | ||
// The index here is arbitrary | ||
val (i, n) = (indices.head, indices.length) | ||
val diffToExamine = ar.map(_ / n) | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Wait, so we are aggregating everything into a map and then putting it into a heap and then just taking it out of the heap? Doesn't that defeat the whole purpose of the heap? Shouldn't we be putting each value into the heap as we calculate it rather than aggregating the whole thing? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We are only aggregating TF and Date features. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah ok — can you add a comment to that effect? |
||
} | ||
|
||
minMaxHeap.dequeueAll | ||
} | ||
|
||
private def agggregateDiffs( | ||
offset: Int, | ||
featureVec: Either[SparseVector, Array[Int]], | ||
indexToExamine: Int, | ||
minMaxHeap: MinMaxHeap, | ||
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])], | ||
baseScore: Array[Double] | ||
): Unit = { | ||
computeDiffs(featureVec, offset, baseScore).foreach { case (i, oldInd, diffToExamine) => | ||
val history = histories(oldInd) | ||
history match { | ||
// If indicator value and descriptor value of a derived text feature are empty, then it is likely | ||
// to be a hashing tf output. We aggregate such features for each (rawFeatureName). | ||
|
@@ -198,30 +211,36 @@ class RecordInsightsLOCO[T <: Model[T]] | |
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
} | ||
} | ||
} | ||
|
||
// Adding LOCO results from aggregation map into heaps | ||
for {(indices, ar) <- aggregationMap.values} { | ||
// The index here is arbitrary | ||
val (i, n) = (indices.head, indices.length) | ||
val diffToExamine = ar.map(_ / n) | ||
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine) | ||
private def computeDiffs( | ||
featureVec: Either[SparseVector, Array[Int]], | ||
offset: Int, baseScore: Array[Double] | ||
) = { | ||
val zdif = Array.fill(baseScore.length)(0.0) | ||
featureVec match { | ||
case Left(sparse) => (0 until sparse.size, sparse.indices).zipped | ||
.map { case (i, oldInd) => | ||
(i, oldInd, computeDiff(sparse.copy.updated(i, oldInd, 0.0), baseScore)) | ||
} | ||
case Right(zeroeIndices) => (0 until zeroeIndices.length, zeroeIndices).zipped | ||
.map { case (i, oldInd) => (i + offset, oldInd, zdif) } | ||
} | ||
|
||
minMaxHeap.dequeueAll | ||
} | ||
|
||
override def transformFn: OPVector => TextMap = features => { | ||
val baseResult = modelApply(labelDummy, features) | ||
val baseScore = baseResult.score | ||
val featureSize = features.value.size | ||
|
||
// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros | ||
val featuresSparse = features.value.toSparse | ||
val res = ArrayBuffer.empty[(Int, Double)] | ||
featuresSparse.foreachActive((i, v) => res += i -> v) | ||
val featureIndexSet = featuresSparse.indices.toSet | ||
|
||
// Besides non 0 values, we want to check the text/date features as well | ||
(textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0) | ||
val featureArray = res.toArray | ||
val featureSize = featuresSparse.size | ||
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices) | ||
.filterNot { featureIndexSet.contains } | ||
.toArray | ||
|
||
val k = $(topK) | ||
// Index where to examine the difference in the prediction vector | ||
|
@@ -232,15 +251,16 @@ class RecordInsightsLOCO[T <: Model[T]] | |
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability) | ||
case n if n > 2 => baseResult.prediction.toInt | ||
} | ||
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine) | ||
val topPosNeg = returnTopPosNeg(featuresSparse, zeroValIndices, featureSize, baseScore, k, indexToExamine) | ||
val top = getTopKStrategy match { | ||
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k) | ||
// Take top K positive and top K negative LOCOs, hence 2 * K | ||
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k) | ||
} | ||
|
||
val allIndices = featuresSparse.indices ++ zeroValIndices | ||
top.map { case LOCOValue(i, _, diffs) => | ||
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs) | ||
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs) | ||
}.toMap.toTextMap | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the sparse features you just put in a value of 0? Can't we just skip adding them to the heap?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same idea, but in one of the iterations I ran into test failures and deferred it to later. I'll recheck now that I have everything green. @michaelweilsalesforce any thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What kind of failures have you encountered?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it may be that we were doing an unnecessary calculation and that just happened to be captured in the test...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@michaelweilsalesforce you can reproduce it by commenting out lines 171-172.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sanmitra
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@leahmcguire @gerashegalov The reason for tracking zero values is that whenever we want to average LOCOs of the same raw text feature, we are also including the zero values.
E.g. if text feature `TextA` has on a row 6 non-zero values `loco1`, ..., `loco6` and 4 zeros, we are dividing by 10: (`loco1` + `loco2` + ... + `loco6`) / 10.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me write a fix that will not go over the zeros.