Avoid creating SparseVectors for LOCO #377

Merged · 19 commits · Aug 21, 2019
Changes from 3 commits
@@ -30,24 +30,22 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureInsights, UID}
import com.salesforce.op.UID
import com.salesforce.op.features.types._
import com.salesforce.op.stages.base.unary.UnaryTransformer
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.selector.SelectedModel
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
import com.salesforce.op.utils.spark.RichVector.RichSparseVector
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata}
import enumeratum.{Enum, EnumEntry}
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.param.{IntParam, Param, Params}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import scala.reflect.runtime.universe._


trait RecordInsightsLOCOParams extends Params {
@@ -130,20 +128,13 @@ class RecordInsightsLOCO[T <: Model[T]]
.map(_.index)
.distinct.sorted

private def computeDiffs
private def computeDiff
(
i: Int,
oldInd: Int,
oldVal: Double,
featureArray: Array[(Int, Double)],
featureSize: Int,
featureSparse: SparseVector,
baseScore: Array[Double]
): Array[Double] = {
featureArray.update(i, (oldInd, 0.0))
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
val diffs = baseScore.zip(score).map { case (b, s) => b - s }
featureArray.update(i, (oldInd, oldVal))
diffs
val score = modelApply(labelDummy, featureSparse.toOPVector).score
baseScore.zip(score).map { case (b, s) => b - s }
}
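
To summarize the refactor in this hunk: the old computeDiffs rebuilt a SparseVector from the shared (index, value) array on every call, mutating the entry and restoring it afterwards, while the new computeDiff just scores a copy that the caller has already zeroed. A minimal sketch of the two call patterns, with a stand-in score function in place of the real modelApply(labelDummy, _).score:

import org.apache.spark.ml.linalg.{SparseVector, Vector, Vectors}

object ComputeDiffSketch {
  // Stand-in scorer; the real code uses modelApply(labelDummy, v.toOPVector).score.
  def score(v: Vector): Array[Double] = Array(v.toArray.sum)

  // Old: allocate a fresh SparseVector per feature, mutating featureArray around the call.
  def diffOld(i: Int, featureArray: Array[(Int, Double)], size: Int, base: Array[Double]): Array[Double] = {
    val (oldInd, oldVal) = featureArray(i)
    featureArray.update(i, (oldInd, 0.0))
    val s = score(Vectors.sparse(size, featureArray))
    featureArray.update(i, (oldInd, oldVal))
    base.zip(s).map { case (b, x) => b - x }
  }

  // New: the caller zeroes one active value on a copy; no rebuild, no restore step.
  def diffNew(featureSparse: SparseVector, base: Array[Double]): Array[Double] =
    base.zip(score(featureSparse)).map { case (b, x) => b - x }
}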

private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = {
@@ -165,20 +156,42 @@

private def returnTopPosNeg
(
featureArray: Array[(Int, Double)],
featureSparse: SparseVector,
zeroValIndices: Array[Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
for {i <- featureArray.indices} {
val (oldInd, oldVal) = featureArray(i)
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
val history = histories(oldInd)

aggregateDiffs(0, Left(featureSparse), indexToExamine, minMaxHeap, aggregationMap,
baseScore)
Collaborator
So for the sparse features you just put in a value of 0? Can't we just skip adding them to the heap?

Contributor Author (@gerashegalov) · Aug 6, 2019

I had the same idea, but in one of the iterations I ran into test failures and deferred it to later. I'll recheck now that I have everything green. @michaelweilsalesforce any thoughts?

Contributor

What kind of failures have you encountered?

Collaborator

It may be that we were doing an unnecessary calculation and that just happened to be captured in the test...

Contributor Author

@michaelweilsalesforce you can reproduce it by commenting out lines 171-172:

Aggregate all the derived hashing tf features of rawFeature - text. 0.08025355373244505 was not less than 1.0E-10 expected aggregated LOCO value (0.006978569889777832) should be the same as actual (0.08723212362222289)

Aggregate x_HourOfDay and y_HourOfDay of rawFeature - dateFeature. 0.016493734169231777 was not less than 1.0E-10 expected aggregated LOCO value (0.016493734169231777) should be the same as actual (0.032987468338463555)

Contributor

@leahmcguire @gerashegalov The reason for tracking zero values is that whenever we average the LOCOs of the same raw text feature, we also include the zero values.
E.g. if text feature TextA has, on a given row, 6 non-zero values loco1, ..., loco6 and 4 zeros, we divide by 10:
(loco1 + loco2 + ... + loco6) / 10
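
A tiny worked version of that averaging (the loco values here are made up for illustration):

// Hypothetical LOCO diffs for one raw text feature on one row:
// 6 non-zero hashing-tf entries plus 4 entries that are zero on this row.
val nonZeroLocos = Seq(0.3, 0.1, -0.2, 0.4, 0.05, 0.15) // sums to 0.8
val zeroCount = 4

val avgIncludingZeros = nonZeroLocos.sum / (nonZeroLocos.size + zeroCount) // 0.8 / 10 = 0.08
val avgNonZeroOnly = nonZeroLocos.sum / nonZeroLocos.size                  // 0.8 / 6  ≈ 0.133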

Contributor

Let me write a fix that will not go over the zeros.

aggregateDiffs(featureSparse.size, Right(zeroValIndices), indexToExamine, minMaxHeap,
aggregationMap, baseScore)

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
Collaborator
Wait, so we are aggregating everything into a map, then putting it into a heap, and then just taking it out of the heap? Doesn't that defeat the whole purpose of the heap? Shouldn't we be putting each value into the heap as we calculate it, rather than aggregating the whole thing?

Contributor

We are only aggregating TF and Date features

Collaborator

Ah OK - can you add a comment to that effect?

}

minMaxHeap.dequeueAll
}
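
On the heap question in the thread above: non-aggregated features are enqueued as soon as their diff is computed, and only the TF/date diffs take a detour through the aggregation map before being enqueued, so the bounded heap still only ever holds the top candidates. A rough stand-in for what MinMaxHeap is assumed to do here (two capped priority queues; this is an illustration of the assumed semantics, not the class's actual implementation):

import scala.collection.mutable

// Assumed semantics: keep at most maxSize largest positive and maxSize most negative
// values; dequeueAll returns both sides for the final top-K selection.
class MinMaxHeapSketch(maxSize: Int) {
  private val positives = mutable.PriorityQueue.empty[Double](Ordering[Double].reverse) // min-heap
  private val negatives = mutable.PriorityQueue.empty[Double]                           // max-heap

  def enqueue(value: Double): Unit =
    if (value > 0) { positives.enqueue(value); if (positives.size > maxSize) positives.dequeue() }
    else { negatives.enqueue(value); if (negatives.size > maxSize) negatives.dequeue() }

  def dequeueAll: Seq[Double] = positives.dequeueAll ++ negatives.dequeueAll
}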

private def aggregateDiffs(
offset: Int,
featureVec: Either[SparseVector, Array[Int]],
indexToExamine: Int,
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
computeDiffs(featureVec, offset, baseScore).foreach { case (i, oldInd, diffToExamine) =>
val history = histories(oldInd)
history match {
// If indicator value and descriptor value of a derived text feature are empty, then it is likely
// to be a hashing tf output. We aggregate such features for each (rawFeatureName).
@@ -198,30 +211,36 @@
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}
}

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
private def computeDiffs(
featureVec: Either[SparseVector, Array[Int]],
offset: Int, baseScore: Array[Double]
) = {
val zdif = Array.fill(baseScore.length)(0.0)
featureVec match {
case Left(sparse) => (0 until sparse.size, sparse.indices).zipped
.map { case (i, oldInd) =>
(i, oldInd, computeDiff(sparse.copy.updated(i, oldInd, 0.0), baseScore))
}
case Right(zeroIndices) => (0 until zeroIndices.length, zeroIndices).zipped
.map { case (i, oldInd) => (i + offset, oldInd, zdif) }
}

minMaxHeap.dequeueAll
}
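
The Either split in computeDiffs separates the expensive path from the free one: each Left (active) entry costs one model scoring call on a copy with that value zeroed, while every Right entry is a text/date column that is already zero on this row, so removing it cannot change the score and its diff is the shared zero array zdif, with no model call at all. (Also note that .zipped truncates to the shorter collection, so in the Left case i effectively ranges over the active positions only.) A small check of the underlying fact, that an explicit zero entry is the same vector:

import org.apache.spark.ml.linalg.Vectors

val v = Vectors.sparse(4, Array(1, 3), Array(2.0, 5.0))
val withExplicitZero = Vectors.sparse(4, Array(0, 1, 3), Array(0.0, 2.0, 5.0))
// "Zeroing out" column 0, which was already zero, yields the same vector,
// so base.zip(score).map { case (b, s) => b - s } would be all zeros.
assert(java.util.Arrays.equals(v.toArray, withExplicitZero.toArray))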

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val res = ArrayBuffer.empty[(Int, Double)]
featuresSparse.foreachActive((i, v) => res += i -> v)
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
(textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
val featureArray = res.toArray
val featureSize = featuresSparse.size
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
.filterNot { featureIndexSet.contains }
.toArray

val k = $(topK)
// Index where to examine the difference in the prediction vector
@@ -232,15 +251,16 @@
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, zeroValIndices, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
}
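
A made-up illustration of the index bookkeeping in transformFn: zeroValIndices collects the text/date columns that are inactive on this row, and allIndices concatenates the active column indices with them so a LOCO result can be traced back to its column metadata via featureInfo:

import org.apache.spark.ml.linalg.Vectors

// Hypothetical row with 10 columns and active entries at columns 0, 2 and 5.
val featuresSparse = Vectors.sparse(10, Array(0, 2, 5), Array(1.0, 3.0, 2.0)).toSparse
val featureIndexSet = featuresSparse.indices.toSet

// Hypothetical text/date column indices for this model.
val textFeatureIndices = Seq(2, 7)
val dateFeatureIndices = Seq(9)

// Column 2 is already active, so only 7 and 9 need the zero-diff path.
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
  .filterNot(featureIndexSet.contains)
  .toArray                                                // Array(7, 9)

val allIndices = featuresSparse.indices ++ zeroValIndices // Array(0, 2, 5, 7, 9)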

@@ -30,11 +30,9 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureHistory, OpWorkflow}
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier}
import com.salesforce.op._
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.stages.impl.feature.{DateListPivot, TransmogrifierDefaults}
import com.salesforce.op.stages.impl.insights.RecordInsightsParser.Insights
import com.salesforce.op.stages.impl.preparators.{SanityCheckDataTest, SanityChecker}
@@ -45,17 +43,18 @@ import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomText, RandomVector}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata}
import org.apache.spark.ml.{Model, PredictionModel}
import com.salesforce.op.{FeatureHistory, OpWorkflow, _}
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Suite}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Suite}


@RunWith(classOf[JUnitRunner])
@@ -307,8 +306,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI
}
withClue("SmartTextVectorizer detects country feature as a PickList, hence no " +
"aggregation required for LOCO on this field.") {
testData.actualRecordInsights.foreach(p => assert(p.keys.exists(r =>
r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)))
testData.actualRecordInsights.foreach { p =>
assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined))
}
}

assertLOCOSum(testData.actualRecordInsights)
@@ -624,4 +624,4 @@ case class RecordInsightsTestData[M <: PredictionModel[Vector, M]]
label: FeatureLike[RealNN],
sparkModel: OpPredictorWrapperModel[M],
actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]
)
@@ -133,4 +133,12 @@ object RichVector {
Vectors.sparse(size, indices.toArray, values.toArray).compressed
}

implicit class RichSparseVector(val v: SparseVector) extends AnyVal {
def updated(index: Int, indexVal: Int, value: Double): SparseVector = {
require(v.indices(index) == indexVal,
s"Invalid index: indices($index)==${v.indices(index)}, expected: $indexVal")
v.values(index) = value
v
}
}
}
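
A usage sketch for the new RichSparseVector.updated helper: index is a position into the parallel indices/values arrays (not a logical column), indexVal is the column expected at that position and is validated by the require, and the write mutates the receiver's values array in place, which is why computeDiffs above calls it on a copy:

import org.apache.spark.ml.linalg.Vectors
import com.salesforce.op.utils.spark.RichVector.RichSparseVector

val v = Vectors.sparse(5, Array(1, 3), Array(2.0, 4.0)).toSparse

// Zero out the first active entry (position 0, which holds column 1).
val zeroed = v.copy.updated(0, 1, 0.0) // zeroed.values == Array(0.0, 4.0); v is untouched
// v.copy.updated(0, 2, 0.0)           // would throw: position 0 holds column 1, not 2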