Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid creating SparseVectors for LOCO #377

Merged
merged 19 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,22 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureInsights, UID}
import com.salesforce.op.UID
import com.salesforce.op.features.types._
import com.salesforce.op.stages.base.unary.UnaryTransformer
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.selector.SelectedModel
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
import com.salesforce.op.utils.spark.RichVector.RichSparseVector
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata}
import enumeratum.{Enum, EnumEntry}
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.param.{IntParam, Param, Params}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import scala.reflect.runtime.universe._


trait RecordInsightsLOCOParams extends Params {
Expand Down Expand Up @@ -115,35 +113,29 @@ class RecordInsightsLOCO[T <: Model[T]]
private val dateMapTypes =
Set(FeatureType.typeName[DateMap], FeatureType.typeName[DateTimeMap])

// Indices of features derived from Text(Map)Vectorizer
private lazy val textFeatureIndices = getIndicesOfFeatureType(textTypes ++ textMapTypes)
// Indices of features derived from hashed Text(Map)Vectorizer
private lazy val textFeatureIndices: Seq[Int] = getIndicesOfFeatureType(textTypes ++ textMapTypes,
h => h.indicatorValue.isEmpty && h.descriptorValue.isEmpty)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe update comment to indicate only getting hashed text values


// Indices of features derived from Date(Map)Vectorizer
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes)
// Indices of features derived from unit Date(Map)Vectorizer
private lazy val dateFeatureIndices = getIndicesOfFeatureType(dateTypes ++ dateMapTypes, _.descriptorValue.isDefined)

/**
* Return the indices of features derived from given types.
* @return Seq[Int]
*/
private def getIndicesOfFeatureType (types: Set[String]): Seq[Int] = histories
.filter(_.parentFeatureType.exists(types.contains))
.map(_.index)
.distinct.sorted
/**
 * Collects the vector indices of derived features whose parent feature type is one of
 * the given type names AND whose column history satisfies the supplied predicate.
 *
 * @param types     set of parent feature type names to match (e.g. Text / TextMap types)
 * @param predicate extra filter on the column history (e.g. hashed-text or unit-circle columns)
 * @return sorted, de-duplicated indices into the feature vector
 */
private def getIndicesOfFeatureType(types: Set[String], predicate: OpVectorColumnHistory => Boolean): Seq[Int] =
  histories
    .filter(h => h.parentFeatureType.exists(types.contains) && predicate(h))
    .map(_.index)
    .distinct
    .sorted

private def computeDiffs
private def computeDiff
(
i: Int,
oldInd: Int,
oldVal: Double,
featureArray: Array[(Int, Double)],
featureSize: Int,
featureSparse: SparseVector,
baseScore: Array[Double]
): Array[Double] = {
featureArray.update(i, (oldInd, 0.0))
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
val diffs = baseScore.zip(score).map { case (b, s) => b - s }
featureArray.update(i, (oldInd, oldVal))
diffs
val score = modelApply(labelDummy, featureSparse.toOPVector).score
(baseScore, score).zipped.map { case (b, s) => b - s }
}

private def sumArrays(left: Array[Double], right: Array[Double]): Array[Double] = {
Expand All @@ -158,70 +150,98 @@ class RecordInsightsLOCO[T <: Model[T]]
/**
 * Parses a time period out of a descriptor value such as "y_DayOfYear": takes the token
 * after the last underscore and resolves it to a [[TimePeriod]] (case-insensitively).
 *
 * @param descriptorValue descriptor value of a derived date feature column
 * @return the matching time period, or None when the trailing token is not a known period
 */
private def convertToTimePeriod(descriptorValue: String): Option[TimePeriod] = {
  val trailingToken = descriptorValue.split("_").lastOption
  trailingToken.flatMap(TimePeriod.withNameInsensitiveOption)
}

private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = history.grouping match {
case Some(grouping) => history.parentFeatureOrigins.headOption.map(_ + "_" + grouping)
case None => history.parentFeatureOrigins.headOption
private def getRawFeatureName(history: OpVectorColumnHistory): Option[String] = {
val groupSuffix = history.grouping.map("_" + _).getOrElse("")
val name = history.parentFeatureOrigins.headOption.map(_ + groupSuffix)

// If the descriptor value of a derived feature exists, then we check if it is
// from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is true now - but may not always be true. If you want this to apply only for date unit circles should also check that one of the parentFeatureStages is a DateToUnitCircleTransformer or DateToUnitCircleVectorizer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is not consistent : Unit Circle Transformation in DateMapVectorizer is not reflected in the parentStages (Seq[DateMapVectorizer] instead).
I think the check on descriptor value is coherent.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or I can check the parentType instead

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this change is explicitly to deal with date features that are transformed to unit circle then the check needs to be explicitly for that. Otherwise this is also applied to lat lon values (and anything else that we add later) and if we just check the type of the parent it assumes that we will always have unit circle transformation of dates - which could change at some point...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but as I said above checking the parentFeatureStages won't work : for instance DateMapVectorizer may apply Unit circle transformation

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DateMapVectorizer does days between reference date and the date. The only two that do unit vector are DateToUnitCircleTransformer and DateToUnitCircleVectorizer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then there must be a bug in the shortcut : when println(s"name ${history.columnName} stage ${history.parentFeatureStages} descriptor value ${history.descriptorValue}") I get

name dateMapFeature_k0_y_DayOfYear_33 stage ArrayBuffer(vecDateMap_DateMapVectorizer_00000000004c) descriptor value Some(y_DayOfYear)
name dateMapFeature_k1_x_DayOfYear_34 stage ArrayBuffer(vecDateMap_DateMapVectorizer_00000000004c) descriptor value Some(x_DayOfYear)
name dateMapFeature_k1_y_DayOfYear_35 stage ArrayBuffer(vecDateMap_DateMapVectorizer_00000000004c) descriptor value Some(y_DayOfYear)
name dateFeature_x_HourOfDay_0 stage ArrayBuffer() descriptor value Some(x_HourOfDay)
name dateFeature_y_HourOfDay_1 stage ArrayBuffer() descriptor value Some(y_HourOfDay)

Those features both use the .vectorize shortcut.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blarg! you are right there is a bug in the feature history that means we lose info if the same feature undergoes multiple transformations :-( https://github.com/salesforce/TransmogrifAI/blob/master/features/src/main/scala/com/salesforce/op/utils/spark/OpVectorMetadata.scala#L53

Can you put a todo to update once the bug is fixed

name.map { n =>
val timePeriodName = if ((dateTypes ++ dateMapTypes).exists(history.parentFeatureType.contains)) {
history.descriptorValue
.flatMap(convertToTimePeriod)
.map(p => "_" + p.entryName)
} else None
n + timePeriodName.getOrElse("")
}
}

private def returnTopPosNeg
(
featureArray: Array[(Int, Double)],
featureSparse: SparseVector,
zeroCountByFeature: Map[String, Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
for {i <- featureArray.indices} {
val (oldInd, oldVal) = featureArray(i)
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
val history = histories(oldInd)

agggregateDiffs(featureSparse, indexToExamine, minMaxHeap, aggregationMap,
baseScore)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the sparse features you just put in a value of 0? Cant we just skip adding them to the heap?

Copy link
Contributor Author

@gerashegalov gerashegalov Aug 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same idea but in one of the iteration I ran into test failures and deferred it to later. I'll recheck now that I have everything green. @michaelweilsalesforce any thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What kind of failures have you encountered?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be that we were doing an unnecessary calculation and that just happened to be captured in the test...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@michaelweilsalesforce you can reproduce it by commenting out the line 171-172.

Aggregate all the derived hashing tf features of rawFeature - text. 0.08025355373244505 was not less than 1.0E-10 expected aggregated LOCO value (0.006978569889777832) should be the same as actual (0.08723212362222289)

Aggregate x_HourOfDay and y_HourOfDay of rawFeature - dateFeature. 0.016493734169231777 was not less than 1.0E-10 expected aggregated LOCO value (0.016493734169231777) should be the same as actual (0.032987468338463555)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leahmcguire @gerashegalov The reason for tracking zero values is whenever we want to average LOCOs of a same raw text feature we are also including the zero values.
E.g. if text feature TextA has, on a row, 6 non-zero values loco1, ..., loco6 and 4 zeros, we divide by 10:
(loco1 + loco2 + ... + loco6)/10

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me write a fix that will not go over the zeros


// Aggregation map contains aggregation of Unit Circle Dates and Hashed Text Features
// Adding LOCO results from aggregation map into heaps
for {(name, (indices, ar)) <- aggregationMap} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val zeroCounts = zeroCountByFeature.get(name).getOrElse(0)
val diffToExamine = ar.map(_ / (n + zeroCounts))
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait so we are aggregating everything into a map and then putting it into a heap and then just taking it out of the heap? doesn't that defeat the whole purpose of the heap? Shouldn't we be putting each value into the heap as we calculating it rather than aggregating the whole thing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are only aggregating TF and Date features

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok - can you add a comment to that effect

}

minMaxHeap.dequeueAll
}

private def agggregateDiffs(
featureVec: SparseVector,
indexToExamine: Int,
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
computeDiffs(featureVec, baseScore).foreach { case (i, oldInd, diffToExamine) =>
val history = histories(oldInd)
history match {
// If indicator value and descriptor value of a derived text feature are empty, then it is likely
// to be a hashing tf output. We aggregate such features for each (rawFeatureName).
case h if h.indicatorValue.isEmpty && h.descriptorValue.isEmpty && textFeatureIndices.contains(oldInd) =>
// If indicator value and descriptor value of a derived text feature are empty, then it is
// a hashing tf output. We aggregate such features for each (rawFeatureName).
case h if (textFeatureIndices ++ dateFeatureIndices).contains(oldInd) =>
for {name <- getRawFeatureName(h)} {
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
}
// If the descriptor value of a derived date feature exists, then it is likely to be
// from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
case h if h.descriptorValue.isDefined && dateFeatureIndices.contains(oldInd) =>
for {name <- getRawFeatureName(h)} {
val key = name + h.descriptorValue.flatMap(convertToTimePeriod).map(p => "_" + p.entryName).getOrElse("")
val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
}
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}
}

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
private def computeDiffs(
featureVec: SparseVector,
baseScore: Array[Double]
) = {
(0 until featureVec.size, featureVec.indices).zipped.map { case (i, oldInd) =>
(i, oldInd, computeDiff(featureVec.copy.updated(i, oldInd, 0.0), baseScore))
}

minMaxHeap.dequeueAll
}

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val res = ArrayBuffer.empty[(Int, Double)]
featuresSparse.foreachActive((i, v) => res += i -> v)
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
(textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
val featureArray = res.toArray
val featureSize = featuresSparse.size
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
.filterNot(featureIndexSet.contains)

// Count zeros by feature name
val zeroCountByFeature = zeroValIndices
.groupBy(i => getRawFeatureName(histories(i)).get)
.mapValues(_.length).view.toMap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What’s the point of .view here?

Copy link
Contributor Author

@gerashegalov gerashegalov Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to force map materialization after toMap in 2.11


val k = $(topK)
// Index where to examine the difference in the prediction vector
Expand All @@ -232,15 +252,16 @@ class RecordInsightsLOCO[T <: Model[T]]
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, zeroCountByFeature, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureHistory, OpWorkflow}
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier}
import com.salesforce.op._
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.stages.impl.feature.{DateListPivot, TransmogrifierDefaults}
import com.salesforce.op.stages.impl.insights.RecordInsightsParser.Insights
import com.salesforce.op.stages.impl.preparators.{SanityCheckDataTest, SanityChecker}
Expand All @@ -45,17 +43,18 @@ import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomText, RandomVector}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata}
import org.apache.spark.ml.{Model, PredictionModel}
import com.salesforce.op.{FeatureHistory, OpWorkflow, _}
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Suite}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Suite}


@RunWith(classOf[JUnitRunner])
Expand Down Expand Up @@ -307,8 +306,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI
}
withClue("SmartTextVectorizer detects country feature as a PickList, hence no " +
"aggregation required for LOCO on this field.") {
testData.actualRecordInsights.foreach(p => assert(p.keys.exists(r =>
r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)))
testData.actualRecordInsights.foreach { p =>
assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined))
}
}

assertLOCOSum(testData.actualRecordInsights)
Expand Down Expand Up @@ -624,4 +624,4 @@ case class RecordInsightsTestData[M <: PredictionModel[Vector, M]]
label: FeatureLike[RealNN],
sparkModel: OpPredictorWrapperModel[M],
actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,12 @@ object RichVector {
Vectors.sparse(size, indices.toArray, values.toArray).compressed
}

implicit class RichSparseVector(val v: SparseVector) extends AnyVal {
  /**
   * Overwrites the value stored at the given physical position of this sparse vector.
   *
   * WARNING: despite the name, this MUTATES the underlying `values` array in place and
   * returns the same vector instance — callers wanting a fresh vector must `.copy` first.
   *
   * @param index    physical position into the `indices`/`values` arrays (not a logical index)
   * @param indexVal expected logical index stored at that position; used as a sanity check
   * @param value    new value to store at that position
   * @return this vector, mutated
   * @throws IllegalArgumentException when the stored logical index does not match `indexVal`
   */
  def updated(index: Int, indexVal: Int, value: Double): SparseVector = {
    val storedIndex = v.indices(index)
    require(storedIndex == indexVal,
      s"Invalid index: indices($index)==${v.indices(index)}, expected: $indexVal")
    v.values(index) = value
    v
  }
}
}