Skip to content

Commit

Permalink
Merge branch 'gera/dense-vec' into gera/perf-regression
Browse files · Browse the repository at this point in the history
  • Loading branch information
gerashegalov committed Aug 3, 2019
2 parents 3404ecd + eb7f0f7 commit 9c6cb21
Showing 1 changed file with 52 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
package com.salesforce.op.stages.impl.insights

import com.salesforce.op.UID
import com.salesforce.op.features.types.{OPVector, _}
import com.salesforce.op.features.types._
import com.salesforce.op.stages.base.unary.UnaryTransformer
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.selector.SelectedModel
Expand All @@ -44,10 +44,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.param.{IntParam, Param, Params}
import org.apache.spark.ml.linalg.Vectors

import scala.collection.mutable


trait RecordInsightsLOCOParams extends Params {

final val topK = new IntParam(
Expand Down Expand Up @@ -162,60 +162,84 @@ class RecordInsightsLOCO[T <: Model[T]]
/**
 * Computes the top-K positive and negative LOCO (leave-one-covariate-out) values
 * for a record, by scoring the model with each feature perturbed and ranking the
 * resulting score differences in a bounded min-max heap.
 *
 * NOTE(review): this span is a rendered diff with pre- and post-merge lines
 * interleaved; the `foreachNonZeroIndexedValue` lambda below is the pre-merge
 * implementation, superseded by the two `agggregateDiffs` calls. Reconcile
 * against the actual merged file before editing.
 *
 * @param featureSparse  sparse vector of the record's non-zero feature values
 * @param zeroValIndices indices of text/date features whose value is zero but
 *                       must still be examined (presumably so their absence is
 *                       scored too — TODO confirm against transformFn)
 * @param featureSize    total size of the full feature vector
 * @param baseScore      unperturbed model score for this record
 * @param k              heap capacity: number of top values kept per direction
 * @param indexToExamine index in the prediction vector whose difference is ranked
 * @return LOCO values drained from the heap (top positives and negatives)
 */
private def returnTopPosNeg
(
featureSparse: SparseVector,
zeroValIndices: Array[Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

// zdif: an all-zeros diff reused for features whose perturbation is a no-op
val zdif = Array.fill(baseScore.length)(0.0)
val minMaxHeap = new MinMaxHeap(k)
// keyed by raw feature name; value = (contributing indices, summed diffs)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]

// NOTE(review): the next two lines are from the PRE-merge revision (old inline
// loop); they are replaced by the agggregateDiffs calls that follow.
featureSparse.foreachNonZeroIndexedValue { case (i, oldInd, _) =>
val diffToExamine = computeDiffs(i, oldInd, featureSparse, baseScore)
// Post-merge: aggregate diffs for the non-zero features (offset 0) ...
agggregateDiffs(0, Left(featureSparse), indexToExamine, zdif, minMaxHeap, aggregationMap,
baseScore)
// ... then for the zero-valued text/date features, offset past the sparse values
agggregateDiffs(featureSparse.size, Right(zeroValIndices), indexToExamine, zdif, minMaxHeap,
aggregationMap, baseScore)

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
// average the summed diffs over the number of aggregated derived features
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}

/**
 * Computes per-feature LOCO score diffs and either aggregates them (for derived
 * text/date features, grouped by raw feature name) or enqueues them directly
 * into the min-max heap.
 *
 * NOTE(review): method name has a typo (triple 'g' in "agggregate"); both call
 * sites use the same spelling, so a rename must update all of them together.
 *
 * NOTE(review): this span is a rendered diff — the paired `aggregationMap.update`
 * lines and the paired `case _ =>` lines below are old/new duplicates (the merge
 * switched the stored index from `oldInd` to `i`), and the trailing
 * aggregation-map drain + `dequeueAll` belong to the PRE-merge revision, which
 * contradicts the declared `Unit` return type. Reconcile against the merged file.
 *
 * @param offset         added to positions when iterating the zero-value indices,
 *                       so they land after the sparse vector's own positions
 * @param featureVec     Left = sparse non-zero features; Right = zero-valued
 *                       text/date feature indices (their diff is always `zdif`)
 * @param indexToExamine prediction-vector index whose difference is ranked
 * @param zdif           all-zeros diff used for the zero-valued features
 * @param minMaxHeap     bounded heap collecting non-aggregated LOCO values
 * @param aggregationMap accumulator: raw-feature key -> (indices, summed diffs)
 * @param baseScore      unperturbed model score
 */
private def agggregateDiffs(
offset: Int,
featureVec: Either[SparseVector, Array[Int]],
indexToExamine: Int,
zdif: Array[Double],
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
(
featureVec match {
// Non-zero features: compute the real LOCO diff for each (position, vector index)
case Left(sparse) => (0 until sparse.size, sparse.indices).zipped
.map { case ( i, oldInd) => (i, oldInd, computeDiffs(i, oldInd, sparse, baseScore)) }
// Zero-valued text/date features: removing a zero changes nothing, so use zdif
case Right(zeroeIndices) => (0 until zeroeIndices.length, zeroeIndices).zipped
.map { case (i, oldInd) => (i + offset, oldInd, zdif) }
}
).foreach { case (i, oldInd, diffToExamine) =>
val history = histories(oldInd)
history match {
// If indicator value and descriptor value of a derived text feature are empty, then it is likely
// to be a hashing tf output. We aggregate such features for each (rawFeatureName).
case h if h.indicatorValue.isEmpty && h.descriptorValue.isEmpty && textFeatureIndices.contains(oldInd) =>
for {name <- getRawFeatureName(h)} {
val (indices, array) = aggregationMap.getOrElse(name, (Array.empty[Int], Array.empty[Double]))
// NOTE(review): old/new duplicate pair — pre-merge stored oldInd, post-merge stores i
aggregationMap.update(name, (indices :+ oldInd, sumArrays(array, diffToExamine)))
aggregationMap.update(name, (indices :+ i, sumArrays(array, diffToExamine)))
}
// If the descriptor value of a derived date feature exists, then it is likely to be
// from unit circle transformer. We aggregate such features for each (rawFeatureName, timePeriod).
case h if h.descriptorValue.isDefined && dateFeatureIndices.contains(oldInd) =>
for {name <- getRawFeatureName(h)} {
val key = name + h.descriptorValue.flatMap(convertToTimePeriod).map(p => "_" + p.entryName).getOrElse("")
val (indices, array) = aggregationMap.getOrElse(key, (Array.empty[Int], Array.empty[Double]))
// NOTE(review): old/new duplicate pair — same oldInd -> i switch as above
aggregationMap.update(key, (indices :+ oldInd, sumArrays(array, diffToExamine)))
aggregationMap.update(key, (indices :+ i, sumArrays(array, diffToExamine)))
}
// NOTE(review): old/new duplicate pair — non-aggregated features go straight to the heap
case _ => minMaxHeap enqueue LOCOValue(oldInd, diffToExamine(indexToExamine), diffToExamine)
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}

// NOTE(review): everything below is PRE-merge code — this drain now lives in
// returnTopPosNeg in the merged revision.
// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}

/**
 * Transformer entry point: scores the record once for a base score, perturbs each
 * relevant feature (non-zero values plus zero-valued text/date features), ranks
 * the LOCO differences, and renders the top insights as a TextMap.
 *
 * NOTE(review): this span is a rendered diff — `numFeatures`/`featureSize` are an
 * old/new duplicate pair, the `featuresIndexSet`/`Vectors.sparse` construction is
 * the PRE-merge dense-vector approach replaced by `zeroValIndices`, the two
 * `returnTopPosNeg` calls and the two `insightToText` lines are old/new pairs,
 * and the "Expand All @@" marker below hides elided lines (the start of the
 * `indexToExamine` match). Reconcile against the merged file before editing.
 */
override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
// NOTE(review): old/new duplicate pair — same value, renamed across the merge
val numFeatures = features.value.size
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
val textDateIndexSet = (textFeatureIndices ++ dateFeatureIndices).toSet
// NOTE(review): the next five lines are the PRE-merge construction (padding the
// sparse vector with explicit zeros); replaced by zeroValIndices below
val featuresIndexSet = mutable.Set.empty[Int]
features.value.foreachActive { case (i, _) => featuresIndexSet.add(i) }
val textDateExtras = (textDateIndexSet -- featuresIndexSet).map(i => (i, 0.0)).toSeq
val featureSparseArray = features.value.toSparse.toIndexedArray ++ textDateExtras
val featureSparse = Vectors.sparse(numFeatures, featureSparseArray).toSparse
// Post-merge: text/date features absent from the sparse vector, kept as raw indices
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
.filterNot { featureIndexSet.contains }
.toArray

val k = $(topK)
// Index where to examine the difference in the prediction vector
Expand All @@ -226,15 +250,16 @@ class RecordInsightsLOCO[T <: Model[T]]
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
// NOTE(review): old/new duplicate pair — the merged signature takes six arguments
val topPosNeg = returnTopPosNeg(featureSparse, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, zeroValIndices, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

// Positions produced upstream index into sparse-values ++ zero-value indices,
// so map each LOCO position back to its original feature-vector index
val allIndices = featuresSparse.indices ++ zeroValIndices
top.map { case LOCOValue(i, _, diffs) =>
// NOTE(review): old/new duplicate pair — post-merge resolves i via allIndices
RecordInsightsParser.insightToText(featureInfo(i), diffs)
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
}

Expand Down

0 comments on commit 9c6cb21

Please sign in to comment.