Adjust bin values for text features in RFF (#99)
sxd929 authored and tovbinm committed Sep 4, 2018
1 parent 83baade commit 2d7c275
Showing 10 changed files with 217 additions and 138 deletions.
36 changes: 23 additions & 13 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,7 +31,7 @@
 package com.salesforce.op
 
 import com.salesforce.op.features.OPFeature
-import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter}
+import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
 import com.salesforce.op.readers.Reader
 import com.salesforce.op.stages.OPStage
 import com.salesforce.op.stages.impl.preparators.CorrelationType
@@ -488,22 +488,28 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
    * Add a raw features filter to the workflow to look at fill rates and distributions of raw features and exclude
    * features that do not meet specifications from modeling DAG
    *
-   * @param trainingReader    training reader to use in filter if not suplied will fall back to reader specified for
-   *                          workflow (note that this reader will take precidence over readers directly input to the
-   *                          workflow if both are supplied)
-   * @param scoringReader     scoring reader to use in filter if not supplied will do the checks possible with only
-   *                          training data avaialable
-   * @param bins              number of bins to use in estimating feature distributions
-   * @param minFillRate       minimum non-null fraction of instances that a feature should contain
+   * @param trainingReader    training reader to use in filter if not supplied will fall back to reader specified for
+   *                          workflow (note that this reader will take precedence over readers directly input to the
+   *                          workflow if both are supplied)
+   * @param scoringReader     scoring reader to use in filter if not supplied will do the checks possible with only
+   *                          training data available
+   * @param bins              number of bins to use in estimating feature distributions
+   * @param minFillRate       minimum non-null fraction of instances that a feature should contain
    * @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature
-   * @param maxFillRatioDiff  maximum difference in fill ratio (symetric) between scoring and training data for a feature
-   * @param maxJSDivergence   maximum Jensen-Shannon divergence between the training and scoring distributions
-   *                          for a feature
+   * @param maxFillRatioDiff  maximum difference in fill ratio (symmetric) between scoring and training data for
+   *                          a feature
+   * @param maxJSDivergence   maximum Jensen-Shannon divergence between the training and scoring distributions
+   *                          for a feature
    * @param protectedFeatures list of features that should never be removed (features that are used to create them will
    *                          also be protected)
+   * @param textBinsFormula   formula to compute the text features bin size.
+   *                          Input arguments are [[Summary]] and number of bins to use in computing
+   *                          feature distributions (histograms for numerics, hashes for strings).
+   *                          Output is the bins for the text features.
    * @tparam T Type of the data read in
    */
   @Experimental
+  // scalastyle:off parameter.number
   def withRawFeatureFilter[T](
     trainingReader: Option[Reader[T]],
     scoringReader: Option[Reader[T]],
@@ -514,7 +520,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
     maxJSDivergence: Double = 0.90,
     maxCorrelation: Double = 0.95,
     correlationType: CorrelationType = CorrelationType.Pearson,
-    protectedFeatures: Array[OPFeature] = Array.empty
+    protectedFeatures: Array[OPFeature] = Array.empty,
+    textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
   ): this.type = {
     val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
     require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" +
@@ -531,9 +538,12 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
         maxJSDivergence = maxJSDivergence,
         maxCorrelation = maxCorrelation,
         correlationType = correlationType,
-        protectedFeatures = protectedRawFeatures)
+        protectedFeatures = protectedRawFeatures,
+        textBinsFormula = textBinsFormula
+      )
     }
     this
   }
+  // scalastyle:on
 
 }
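As a usage note, the new hook is supplied when enabling the raw feature filter on a workflow. The sketch below is illustrative only: the readers, the result feature, and the custom formula are hypothetical, and textBinsFormula falls back to the default RawFeatureFilter.textBinsFormula when omitted.

    // Sketch: wiring a custom text bins formula into the raw feature filter.
    // `prediction`, `trainReader` and `scoreReader` are hypothetical.
    val workflow = new OpWorkflow()
      .setResultFeatures(prediction)
      .withRawFeatureFilter(
        trainingReader = Option(trainReader),
        scoringReader = Option(scoreReader),
        bins = 100,
        // Assumed custom formula: keep the requested bins for empty summaries,
        // otherwise cap the text hash space by the observed token count.
        textBinsFormula = (summary: Summary, bins: Int) =>
          if (summary == Summary.empty) bins
          else math.min(bins, math.max(2, summary.count.toInt))
      )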
core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala
@@ -31,10 +31,10 @@
 package com.salesforce.op.filters
 
 import com.salesforce.op.features.FeatureDistributionLike
-import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
-import com.twitter.algebird.Semigroup
+import com.salesforce.op.stages.impl.feature.{HashAlgorithm, Inclusion, NumericBucketizer}
 import com.twitter.algebird.Monoid._
 import com.twitter.algebird.Operators._
+import com.twitter.algebird.Semigroup
 import org.apache.spark.mllib.feature.HashingTF
 
@@ -44,7 +44,7 @@ import org.apache.spark.mllib.feature.HashingTF
  * @param key map key associated with distribution (when the feature is a map)
  * @param count total count of feature seen
  * @param nulls number of empties seen in feature
- * @param distribution binned counts of feature values (hashed for strings, evently spaced bins for numerics)
+ * @param distribution binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
  * @param summaryInfo either min and max number of tokens for text data,
  *                    or splits used for bins for numeric data
  */
@@ -125,8 +125,8 @@ case class FeatureDistribution
   def jsDivergence(fd: FeatureDistribution): Double = {
     checkMatch(fd)
     val combinedCounts = distribution.zip(fd.distribution).filterNot{ case (a, b) => a == 0.0 && b == 0.0 }
-    val (thisCount, thatCount) = combinedCounts
-      .fold[(Double, Double)]( (0, 0)){ case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) }
+    val (thisCount, thatCount) =
+      combinedCounts.fold[(Double, Double)]((0.0, 0.0)){ case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) }
     val probs = combinedCounts.map{ case (a, b) => a / thisCount -> b / thatCount }
     val meanProb = probs.map{ case (a, b) => (a + b) / 2}
     def log2(x: Double) = math.log10(x) / math.log10(2.0)
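For reference (not part of the diff): the quantity computed by jsDivergence is the standard Jensen-Shannon divergence between the two normalized bin distributions p and q, with m their pointwise mean; logs are taken base 2, which keeps the result in [0, 1]:

    JS(p \| q) = \frac{1}{2}\sum_i p_i \log_2 \frac{p_i}{m_i}
               + \frac{1}{2}\sum_i q_i \log_2 \frac{q_i}{m_i},
    \qquad m_i = \frac{p_i + q_i}{2}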
@@ -154,19 +154,22 @@ private[op] object FeatureDistribution {
   * @param summary feature summary
   * @param value optional processed sequence
   * @param bins number of histogram bins
-   * @param hasher hashing method to use for text and categorical features
-   * @return a pair consisting of response and predictor feature distributions (in this order)
+   * @param textBinsFormula formula to compute the text features bin size.
+   *                        Input arguments are [[Summary]] and number of bins to use in computing feature distributions
+   *                        (histograms for numerics, hashes for strings). Output is the bins for the text features.
+   * @return feature distribution given the provided information
   */
  def apply(
    featureKey: FeatureKey,
    summary: Summary,
    value: Option[ProcessedSeq],
    bins: Int,
-    hasher: HashingTF
+    textBinsFormula: (Summary, Int) => Int
  ): FeatureDistribution = {
    val (nullCount, (summaryInfo, distribution)): (Int, (Array[Double], Array[Double])) =
-      value.map(seq => 0 -> histValues(seq, summary, bins, hasher))
-        .getOrElse(1 -> (Array(summary.min, summary.max) -> Array.fill(bins)(0.0)))
+      value.map(seq => 0 -> histValues(seq, summary, bins, textBinsFormula))
+        .getOrElse(1 -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))
 
    FeatureDistribution(
      name = featureKey._1,
Expand All @@ -179,40 +182,49 @@ private[op] object FeatureDistribution {

/**
* Function to put data into histogram of counts
* @param values values to bin
* @param sum summary info for feature (max and min)
* @param bins number of bins to produce
* @param hasher hasing function to use for text
*
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing feature distributions
* (histograms for numerics, hashes for strings). Output is the bins for the text features.
* @return a pair consisting of response and predictor feature distributions (in this order)
* @return the bin information and the binned counts
*/
// TODO avoid wrapping and unwrapping??
private def histValues(
values: ProcessedSeq,
sum: Summary,
summary: Summary,
bins: Int,
hasher: HashingTF
): (Array[Double], Array[Double]) = {
values match {
case Left(seq) => Array(sum.min, sum.max) -> hasher.transform(seq).toArray // TODO use summary info to pick hashes
case Right(seq) => // TODO use kernel fit instead of histogram
if (sum == Summary.empty) {
Array(sum.min, sum.max) -> seq.toArray // the seq will always be empty in this case
} else if (sum.min < sum.max) {
val step = (sum.max - sum.min) / (bins - 2.0) // total number of bins includes one for edge and one for other
val splits = (0 until bins).map(b => sum.min + step * b).toArray
val binned = seq.map { v =>
NumericBucketizer.bucketize(
splits = splits, trackNulls = false, trackInvalid = true,
splitInclusion = Inclusion.Left, input = Option(v)
).toArray
}
val hist = binned.fold(new Array[Double](bins))(_ + _)
splits -> hist
} else {
val same = seq.map(v => if (v == sum.max) 1.0 else 0.0).sum
val other = seq.map(v => if (v != sum.max) 1.0 else 0.0).sum
Array(sum.min, sum.max) -> Array(same, other)
textBinsFormula: (Summary, Int) => Int
): (Array[Double], Array[Double]) = values match {
case Left(seq) =>
val numBins = textBinsFormula(summary, bins)
// TODO: creating too many hasher instances may cause problem, efficiency, garbage collection etc
val hasher =
new HashingTF(numFeatures = numBins).setBinary(false)
.setHashAlgorithm(HashAlgorithm.MurMur3.entryName.toLowerCase)
Array(summary.min, summary.max, summary.sum, summary.count) -> hasher.transform(seq).toArray

case Right(seq) => // TODO use kernel fit instead of histogram
if (summary == Summary.empty) {
Array(summary.min, summary.max) -> seq.toArray // the seq will always be empty in this case
} else if (summary.min < summary.max) {
// total number of bins includes one for edge and one for other
val step = (summary.max - summary.min) / (bins - 2.0)
val splits = (0 until bins).map(b => summary.min + step * b).toArray
val binned = seq.map { v =>
NumericBucketizer.bucketize(
splits = splits, trackNulls = false, trackInvalid = true,
splitInclusion = Inclusion.Left, input = Option(v)
).toArray
}
}
val hist = binned.fold(new Array[Double](bins))(_ + _)
splits -> hist
} else {
val same = seq.map(v => if (v == summary.max) 1.0 else 0.0).sum
val other = seq.map(v => if (v != summary.max) 1.0 else 0.0).sum
Array(summary.min, summary.max, summary.sum, summary.count) -> Array(same, other)
}
}
}
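To make the numeric branch concrete, a small worked example with assumed values: for summary.min = 0.0, summary.max = 10.0 and bins = 12, the step is (10.0 - 0.0) / (12 - 2) = 1.0, so the splits run 0.0, 1.0, ..., 11.0 and two of the twelve bins are reserved for the edge and "other" buckets. The same split computation in isolation:

    // Standalone sketch of the split computation above (values are hypothetical).
    val (min, max, bins) = (0.0, 10.0, 12)
    val step = (max - min) / (bins - 2.0)                        // 1.0
    val splits = (0 until bins).map(b => min + step * b).toArray
    // splits: 0.0, 1.0, 2.0, ..., 11.0 (12 split points)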
core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala
@@ -35,7 +35,6 @@ import com.salesforce.op.features.types._
 import com.salesforce.op.stages.impl.feature.TextTokenizer
 import com.salesforce.op.utils.spark.RichRow._
 import com.salesforce.op.utils.text.Language
-import org.apache.spark.mllib.feature.HashingTF
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.sql.Row
 
@@ -82,19 +81,21 @@ private[filters] case class PreparedFeatures(
   * @param responseSummaries global feature metadata
   * @param predictorSummaries set of feature summary statistics (derived from metadata)
   * @param bins number of bins to put numerics into
-   * @param hasher hash function to use on strings
+   * @param textBinsFormula formula to compute the text features bin size.
+   *                        Input arguments are [[Summary]] and number of bins to use in computing feature distributions
+   *                        (histograms for numerics, hashes for strings). Output is the bins for the text features.
   * @return a pair consisting of response and predictor feature distributions (in this order)
   */
  def getFeatureDistributions(
    responseSummaries: Array[(FeatureKey, Summary)],
    predictorSummaries: Array[(FeatureKey, Summary)],
    bins: Int,
-    hasher: HashingTF
+    textBinsFormula: (Summary, Int) => Int
  ): (Array[FeatureDistribution], Array[FeatureDistribution]) = {
    val responseFeatureDistributions: Array[FeatureDistribution] =
-      getFeatureDistributions(responses, responseSummaries, bins, hasher)
+      getFeatureDistributions(responses, responseSummaries, bins, textBinsFormula)
    val predictorFeatureDistributions: Array[FeatureDistribution] =
-      getFeatureDistributions(predictors, predictorSummaries, bins, hasher)
+      getFeatureDistributions(predictors, predictorSummaries, bins, textBinsFormula)
 
    responseFeatureDistributions -> predictorFeatureDistributions
  }
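As a usage note, the two arrays come back as a (responses, predictors) pair, so call sites can destructure directly. A sketch with hypothetical inputs, following the signature above:

    // `prepared`, `responseSummaries` and `predictorSummaries` are hypothetical.
    val (responseDists, predictorDists) =
      prepared.getFeatureDistributions(
        responseSummaries = responseSummaries,
        predictorSummaries = predictorSummaries,
        bins = 100,
        textBinsFormula = RawFeatureFilter.textBinsFormula
      )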
@@ -103,14 +104,15 @@ private[filters] case class PreparedFeatures(
    features: Map[FeatureKey, ProcessedSeq],
    summaries: Array[(FeatureKey, Summary)],
    bins: Int,
-    hasher: HashingTF
+    textBinsFormula: (Summary, Int) => Int
  ): Array[FeatureDistribution] = summaries.map { case (featureKey, summary) =>
    FeatureDistribution(
      featureKey = featureKey,
      summary = summary,
      value = features.get(featureKey),
      bins = bins,
-      hasher = hasher)
+      textBinsFormula = textBinsFormula
+    )
  }
 }
 
@@ -126,7 +128,7 @@ private[filters] object PreparedFeatures {
   * @return set of prepared features
   */
  def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = {
-    val empty: Map[FeatureKey, ProcessedSeq] = Map()
+    val empty: Map[FeatureKey, ProcessedSeq] = Map.empty
    val preparedResponses = responses.foldLeft(empty) { case (map, feature) =>
      val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
      map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
@@ -150,13 +152,11 @@ private[filters] object PreparedFeatures {
  private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] =
    value match {
      case v: Text => v.value
-        .map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s))))
-        .getOrElse(Map())
+        .map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty)
      case v: OPNumeric[_] => v.toDouble
-        .map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d))))
-        .getOrElse(Map())
-      case ft@SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
-      case ft@SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
+        .map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty)
+      case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
+      case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
      case ft@SomeValue(_) => ft match {
        case v: Geolocation => Map((name, None) -> Right(v.value))
        case v: TextList => Map((name, None) -> Left(v.value))
@@ -173,7 +173,7 @@ private[filters] object PreparedFeatures {
      }}
      case _ => throw new RuntimeException(s"Feature type $value is not supported in RawFeatureFilter")
    }
-      case _ => Map()
+      case _ => Map.empty
 
  /**
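For orientation, prepareFeature reduces every supported feature type to either tokenized text (Left) or a numeric sequence (Right), keyed by feature name plus an optional map key. A self-contained sketch of those shapes; the type aliases are assumed to mirror the filters package definitions and the values are hypothetical:

    // Assumed aliases and illustrative values only.
    object ProcessedShapes {
      type FeatureKey = (String, Option[String])
      type ProcessedSeq = Either[Seq[String], Seq[Double]]

      val textFeature: (FeatureKey, ProcessedSeq) =
        ("description", None) -> Left(Seq("hello", "world")) // tokenized text
      val numericFeature: (FeatureKey, ProcessedSeq) =
        ("age", None) -> Right(Seq(42.0))                    // numeric value
      val mapFeature: (FeatureKey, ProcessedSeq) =
        ("prefs", Some("color")) -> Left(Seq("blue"))        // one entry of a map feature
    }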