Commit 66a5e99: WIP
gerashegalov committed Aug 5, 2019 · 1 parent e1bab3b
Showing 3 changed files with 81 additions and 45 deletions.
RecordInsightsLOCO.scala
@@ -30,24 +30,22 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureInsights, UID}
import com.salesforce.op.UID
import com.salesforce.op.features.types._
import com.salesforce.op.stages.base.unary.UnaryTransformer
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}
import com.salesforce.op.stages.impl.feature.TimePeriod
import com.salesforce.op.stages.impl.selector.SelectedModel
import com.salesforce.op.stages.sparkwrappers.specific.OpPredictorWrapperModel
import com.salesforce.op.stages.sparkwrappers.specific.SparkModelConverter._
import com.salesforce.op.utils.spark.RichVector._
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorMetadata}
import enumeratum.{Enum, EnumEntry}
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.Model
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SparseVector
import org.apache.spark.ml.param.{IntParam, Param, Params}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import scala.reflect.runtime.universe._


trait RecordInsightsLOCOParams extends Params {
@@ -134,15 +132,13 @@ class RecordInsightsLOCO[T <: Model[T]]
(
i: Int,
oldInd: Int,
oldVal: Double,
featureArray: Array[(Int, Double)],
featureSize: Int,
featureSparse: SparseVector,
baseScore: Array[Double]
): Array[Double] = {
featureArray.update(i, (oldInd, 0.0))
val score = modelApply(labelDummy, Vectors.sparse(featureSize, featureArray).toOPVector).score
val oldVal = featureSparse.update(i, oldInd, 0.0)
val score = modelApply(labelDummy, featureSparse.toOPVector).score
val diffs = baseScore.zip(score).map { case (b, s) => b - s }
featureArray.update(i, (oldInd, oldVal))
featureSparse.update(i, oldInd, oldVal)
diffs
}
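The rewritten computeDiffs avoids allocating a fresh feature array per leave-one-covariate-out pass: it zeroes one active slot of the SparseVector in place via the new RichSparseVector.update, scores, then restores the old value. A minimal sketch of that mutate-score-restore pattern, with a stand-in scoring function since the model itself is not part of this diff:

    import org.apache.spark.ml.linalg.{SparseVector, Vectors}

    object LocoSketch {
      // Stand-in for modelApply(...).score: here just the sum of all values.
      def score(v: SparseVector): Double = v.values.sum

      // Zero the i-th active slot, score, then restore the original value.
      def locoDiff(v: SparseVector, i: Int, baseScore: Double): Double = {
        val oldVal = v.values(i)   // values(i) is the value stored for column indices(i)
        v.values(i) = 0.0          // mutate in place instead of copying the vector
        val diff = baseScore - score(v)
        v.values(i) = oldVal       // restore so later iterations see the original vector
        diff
      }

      def main(args: Array[String]): Unit = {
        val v = Vectors.sparse(5, Array(0, 2, 4), Array(1.0, 2.0, 3.0)).toSparse
        val base = score(v)
        println(v.indices.indices.map(i => locoDiff(v, i, base)))  // Vector(1.0, 2.0, 3.0)
      }
    }

Because the vector is mutated and restored in place, this is only safe when a single thread scores a given row at a time, which holds inside a transformFn invocation.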

@@ -165,20 +161,51 @@

private def returnTopPosNeg
(
featureArray: Array[(Int, Double)],
featureSparse: SparseVector,
zeroValIndices: Array[Int],
featureSize: Int,
baseScore: Array[Double],
k: Int,
indexToExamine: Int
): Seq[LOCOValue] = {

val zdif = Array.fill(baseScore.length)(0.0)
val minMaxHeap = new MinMaxHeap(k)
val aggregationMap = mutable.Map.empty[String, (Array[Int], Array[Double])]
for {i <- featureArray.indices} {
val (oldInd, oldVal) = featureArray(i)
val diffToExamine = computeDiffs(i, oldInd, oldVal, featureArray, featureSize, baseScore)
val history = histories(oldInd)

aggregateDiffs(0, Left(featureSparse), indexToExamine, zdif, minMaxHeap, aggregationMap,
baseScore)
aggregateDiffs(featureSparse.indices.length, Right(zeroValIndices), indexToExamine, zdif, minMaxHeap,
aggregationMap, baseScore)

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}
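MinMaxHeap is defined elsewhere in this codebase and not shown in the diff. Assuming it retains the k most positive and the k most negative LOCO values seen so far, a rough stand-in built from two bounded priority queues (with LOCOValue simplified to the fields used here):

    import scala.collection.mutable

    final case class LOCOValue(i: Int, value: Double, diffs: Array[Double])

    final class MinMaxHeapSketch(k: Int) {
      // Min-heap of positives: the smallest positive sits on top and is evicted first.
      private val positives =
        mutable.PriorityQueue.empty[LOCOValue](Ordering.by[LOCOValue, Double](-_.value))
      // Max-heap of negatives: the least negative sits on top and is evicted first.
      private val negatives =
        mutable.PriorityQueue.empty[LOCOValue](Ordering.by[LOCOValue, Double](_.value))

      def enqueue(loco: LOCOValue): Unit =
        if (loco.value > 0) { positives.enqueue(loco); if (positives.size > k) positives.dequeue() }
        else { negatives.enqueue(loco); if (negatives.size > k) negatives.dequeue() }

      def dequeueAll: Seq[LOCOValue] = positives.dequeueAll ++ negatives.dequeueAll
    }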

private def aggregateDiffs(
offset: Int,
featureVec: Either[SparseVector, Array[Int]],
indexToExamine: Int,
zdif: Array[Double],
minMaxHeap: MinMaxHeap,
aggregationMap: mutable.Map[String, (Array[Int], Array[Double])],
baseScore: Array[Double]
): Unit = {
(
featureVec match {
case Left(sparse) => (0 until sparse.indices.length, sparse.indices).zipped
.map { case (i, oldInd) => (i, oldInd, computeDiffs(i, oldInd, sparse, baseScore)) }
case Right(zeroIndices) => (0 until zeroIndices.length, zeroIndices).zipped
.map { case (i, oldInd) => (i + offset, oldInd, zdif) }
}
).foreach { case (i, oldInd, diffToExamine) =>
val history = histories(oldInd)
history match {
// If the indicator value and descriptor value of a derived text feature are empty, then it is likely
// a hashing TF output. We aggregate such features for each raw feature name.
@@ -198,30 +225,21 @@
case _ => minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}
}

// Adding LOCO results from aggregation map into heaps
for {(indices, ar) <- aggregationMap.values} {
// The index here is arbitrary
val (i, n) = (indices.head, indices.length)
val diffToExamine = ar.map(_ / n)
minMaxHeap enqueue LOCOValue(i, diffToExamine(indexToExamine), diffToExamine)
}

minMaxHeap.dequeueAll
}
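Hashed text columns yield one derived feature per hash bucket, so a single raw text field can contribute many columns; the aggregationMap above sums their per-column LOCO diffs by raw feature name and divides by the column count. A self-contained sketch of that averaging step (names illustrative):

    import scala.collection.mutable

    // Average per-column LOCO diff vectors, grouped by raw feature name.
    def averageByRawFeature(columnDiffs: Seq[(String, Array[Double])]): Map[String, Array[Double]] = {
      val agg = mutable.Map.empty[String, (Int, Array[Double])]
      for ((name, diffs) <- columnDiffs) {
        val (n, sums) = agg.getOrElse(name, (0, Array.fill(diffs.length)(0.0)))
        agg(name) = (n + 1, sums.zip(diffs).map { case (s, d) => s + d })
      }
      agg.iterator.map { case (name, (n, sums)) => name -> sums.map(_ / n) }.toMap
    }

    // Two hash buckets of the same "description" field average to (0.2, -0.2):
    // averageByRawFeature(Seq(
    //   "description" -> Array(0.3, -0.3),
    //   "description" -> Array(0.1, -0.1)))  yields Map("description" -> Array(0.2, -0.2))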

override def transformFn: OPVector => TextMap = features => {
val baseResult = modelApply(labelDummy, features)
val baseScore = baseResult.score
val featureSize = features.value.size

// TODO: sparse implementation only works if changing values to zero - use dense vector to test effect of zeros
val featuresSparse = features.value.toSparse
val res = ArrayBuffer.empty[(Int, Double)]
featuresSparse.foreachActive((i, v) => res += i -> v)
val featureIndexSet = featuresSparse.indices.toSet

// Besides non 0 values, we want to check the text/date features as well
(textFeatureIndices ++ dateFeatureIndices).foreach(i => if (!featuresSparse.indices.contains(i)) res += i -> 0.0)
val featureArray = res.toArray
val featureSize = featuresSparse.size
val zeroValIndices = (textFeatureIndices ++ dateFeatureIndices)
.filterNot { featureIndexSet.contains }
.toArray

val k = $(topK)
// Index where to examine the difference in the prediction vector
Expand All @@ -232,15 +250,16 @@ class RecordInsightsLOCO[T <: Model[T]]
// For MultiClassification, the value is from the predicted class(i.e. the class having the highest probability)
case n if n > 2 => baseResult.prediction.toInt
}
val topPosNeg = returnTopPosNeg(featureArray, featureSize, baseScore, k, indexToExamine)
val topPosNeg = returnTopPosNeg(featuresSparse, zeroValIndices, featureSize, baseScore, k, indexToExamine)
val top = getTopKStrategy match {
case TopKStrategy.Abs => topPosNeg.sortBy { case LOCOValue(_, v, _) => -math.abs(v) }.take(k)
// Take top K positive and top K negative LOCOs, hence 2 * K
case TopKStrategy.PositiveNegative => topPosNeg.sortBy { case LOCOValue(_, v, _) => -v }.take(2 * k)
}

val allIndices = featuresSparse.indices ++ zeroValIndices
top.map { case LOCOValue(i, _, diffs) =>
RecordInsightsParser.insightToText(featureInfo(featureArray(i)._1), diffs)
RecordInsightsParser.insightToText(featureInfo(allIndices(i)), diffs)
}.toMap.toTextMap
}
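The index bookkeeping in transformFn is worth spelling out: LOCO positions 0 until featuresSparse.indices.length cover the active slots, and positions after that cover the text/date columns that are zero in this row, so a top-K position i maps back to a vector column through allIndices. With illustrative values:

    val activeIndices = Array(1, 4, 7)    // featuresSparse.indices for this row
    val zeroValIndices = Array(2, 9)      // text/date columns with no value in this row
    val allIndices = activeIndices ++ zeroValIndices

    // LOCO position 1 is vector column 4; position 4 is the zero-valued column 9.
    assert(allIndices(1) == 4)
    assert(allIndices(4) == 9)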

RecordInsightsLOCOTest.scala
@@ -30,11 +30,9 @@

package com.salesforce.op.stages.impl.insights

import com.salesforce.op.{FeatureHistory, OpWorkflow}
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.classification.{OpLogisticRegression, OpRandomForestClassifier}
import com.salesforce.op._
import com.salesforce.op.features.FeatureLike
import com.salesforce.op.stages.impl.feature.{DateListPivot, TransmogrifierDefaults}
import com.salesforce.op.stages.impl.insights.RecordInsightsParser.Insights
import com.salesforce.op.stages.impl.preparators.{SanityCheckDataTest, SanityChecker}
@@ -45,17 +43,18 @@ import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
import com.salesforce.op.testkit.{RandomIntegral, RandomMap, RandomReal, RandomText, RandomVector}
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.{OpVectorColumnHistory, OpVectorColumnMetadata, OpVectorMetadata}
import org.apache.spark.ml.{Model, PredictionModel}
import com.salesforce.op.{FeatureHistory, OpWorkflow, _}
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types.StructType
import org.apache.spark.ml.linalg._
import org.apache.spark.sql.{DataFrame, Encoder, Row}
import org.junit.runner.RunWith
import org.scalatest.{FlatSpec, Suite}
import org.scalatest.junit.JUnitRunner
import org.scalatest.{FlatSpec, Suite}


@RunWith(classOf[JUnitRunner])
@@ -307,8 +306,9 @@ class RecordInsightsLOCOTest extends FlatSpec with TestSparkContext with RecordI
}
withClue("SmartTextVectorizer detects country feature as a PickList, hence no " +
"aggregation required for LOCO on this field.") {
testData.actualRecordInsights.foreach(p => assert(p.keys.exists(r =>
r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined)))
testData.actualRecordInsights.foreach { p =>
assert(p.keys.exists(r => r.parentFeatureOrigins == Seq(countryFeatureName) && r.indicatorValue.isDefined))
}
}

assertLOCOSum(testData.actualRecordInsights)
@@ -624,4 +624,4 @@ case class RecordInsightsTestData[M <: PredictionModel[Vector, M]]
label: FeatureLike[RealNN],
sparkModel: OpPredictorWrapperModel[M],
actualRecordInsights: Array[Map[OpVectorColumnHistory, Insights]]
)
)
RichVector.scala
@@ -133,4 +133,21 @@ object RichVector {
Vectors.sparse(size, indices.toArray, values.toArray).compressed
}

implicit class RichSparseVector(val v: SparseVector) extends AnyVal {
def foreachNonZeroIndexedValue(f: (Int, Int, Double) => Unit): Unit = {
(0 until v.indices.length)
.withFilter(v.values(_) != 0.0)
.foreach(i => f(i, v.indices(i), v.values(i)))
}

def update(index: Int, indexVal: Int, value: Double): Double = {
require(v.indices(index) == indexVal,
s"Invalid index: indices($index)==${v.indices(index)}, expected: $indexVal")
val oldVal = v.values(index)
v.values(index) = value
oldVal
}

def toIndexedArray: Array[(Int, Double)] = v.indices.zip(v.values)
}
}
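A quick usage sketch of the new RichSparseVector helpers, pasteable into a REPL. update is positional: its first argument is a slot in the parallel indices/values arrays, and the second must equal the column index stored at that slot, which is exactly what the require guards:

    import com.salesforce.op.utils.spark.RichVector._
    import org.apache.spark.ml.linalg.Vectors

    val v = Vectors.sparse(5, Array(1, 3), Array(10.0, 20.0)).toSparse

    val old = v.update(0, 1, 0.0)    // slot 0 holds column 1; returns the old value 10.0
    v.update(0, 1, old)              // restore, as computeDiffs does after scoring

    v.foreachNonZeroIndexedValue((slot, col, value) => println(s"slot $slot: v($col) = $value"))
    // slot 0: v(1) = 10.0
    // slot 1: v(3) = 20.0

    v.toIndexedArray                 // Array((1, 10.0), (3, 20.0))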
