Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust bin values for text features in RFF #99

Merged
merged 27 commits into from
Sep 4, 2018
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
da5ba89
adjust bin value based on number of token
sxd929 Aug 28, 2018
1d90e8f
cleanup
sxd929 Aug 28, 2018
3b6c45f
address comments
sxd929 Aug 28, 2018
eea1cf2
merge
sxd929 Aug 28, 2018
b958887
add sum and count to summary
sxd929 Aug 29, 2018
a54131b
change formula
sxd929 Aug 29, 2018
dae829d
add todo
sxd929 Aug 29, 2018
d6ad74f
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Aug 30, 2018
d9dee94
use bins as default
sxd929 Aug 30, 2018
1bb28c6
Merge branch 'xs/adjustBinValueForText' of https://github.com/salesfo…
sxd929 Aug 30, 2018
441ab06
Merge branch 'master' into xs/adjustBinValueForText
sxd929 Aug 30, 2018
546fcfc
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Aug 30, 2018
13cb288
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Aug 30, 2018
c47e9bc
address comments
sxd929 Aug 31, 2018
e8d616b
cleanup
sxd929 Aug 31, 2018
e133bb4
Merge branch 'xs/adjustBinValueForText' of https://github.com/salesfo…
sxd929 Aug 31, 2018
20a9c40
cleanup
sxd929 Aug 31, 2018
e39dad6
Merge branch 'master' into xs/adjustBinValueForText
sxd929 Aug 31, 2018
1c6d359
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Aug 31, 2018
5e2857c
Added textBinsFormula to RFF
tovbinm Aug 31, 2018
a2ffd3a
make scalastyle happy
tovbinm Aug 31, 2018
3c35a13
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Aug 31, 2018
2f4f77a
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Sep 1, 2018
6187e55
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Sep 2, 2018
7ff99fe
Merge branch 'master' into xs/adjustBinValueForText
tovbinm Sep 3, 2018
bdc0608
update docs
tovbinm Sep 4, 2018
dfc1a40
Added docs
tovbinm Sep 4, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter}
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.preparators.CorrelationType
Expand Down Expand Up @@ -488,22 +488,25 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
* Add a raw features filter to the workflow to look at fill rates and distributions of raw features and exclude
* features that do not meet specifications from modeling DAG
*
* @param trainingReader training reader to use in filter if not suplied will fall back to reader specified for
* workflow (note that this reader will take precidence over readers directly input to the
* workflow if both are supplied)
* @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only
* training data avaialable
* @param bins number of bins to use in estimating feature distributions
* @param minFillRate minimum non-null fraction of instances that a feature should contain
* @param trainingReader training reader to use in filter if not supplied will fall back to reader specified for
* workflow (note that this reader will take precedence over readers directly input to the
* workflow if both are supplied)
* @param scoringReader scoring reader to use in filter if not supplied will do the checks possible with only
* training data available
* @param bins number of bins to use in estimating feature distributions
* @param minFillRate minimum non-null fraction of instances that a feature should contain
* @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature
* @param maxFillRatioDiff maximum difference in fill ratio (symetric) between scoring and training data for a feature
* @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions
* for a feature
* @param maxFillRatioDiff maximum difference in fill ratio (symmetric) between scoring and training data for
* a feature
* @param maxJSDivergence maximum Jensen-Shannon divergence between the training and scoring distributions
* for a feature
* @param protectedFeatures list of features that should never be removed (features that are used to create them will
* also be protected)
* @param textBinsFormula formula to compute the text features bin size
* @tparam T Type of the data read in
*/
@Experimental
// scalastyle:off parameter.number
def withRawFeatureFilter[T](
trainingReader: Option[Reader[T]],
scoringReader: Option[Reader[T]],
Expand All @@ -514,7 +517,8 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
maxJSDivergence: Double = 0.90,
maxCorrelation: Double = 0.95,
correlationType: CorrelationType = CorrelationType.Pearson,
protectedFeatures: Array[OPFeature] = Array.empty
protectedFeatures: Array[OPFeature] = Array.empty,
textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the int?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the bins.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but that is not what we want the formula to look like long term

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so what is your suggestion on the inputs / outputs then?

): this.type = {
val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" +
Expand All @@ -531,9 +535,12 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
maxJSDivergence = maxJSDivergence,
maxCorrelation = maxCorrelation,
correlationType = correlationType,
protectedFeatures = protectedRawFeatures)
protectedFeatures = protectedRawFeatures,
textBinsFormula = textBinsFormula
)
}
this
}
// scalastyle:on

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionLike
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer}
import com.twitter.algebird.Semigroup
import com.salesforce.op.stages.impl.feature.{HashAlgorithm, Inclusion, NumericBucketizer}
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import com.twitter.algebird.Semigroup
import org.apache.spark.mllib.feature.HashingTF

/**
Expand All @@ -44,7 +44,7 @@ import org.apache.spark.mllib.feature.HashingTF
* @param key map key associated with distribution (when the feature is a map)
* @param count total count of feature seen
* @param nulls number of empties seen in feature
* @param distribution binned counts of feature values (hashed for strings, evently spaced bins for numerics)
* @param distribution binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
* @param summaryInfo either min and max number of tokens for text data,
* or splits used for bins for numeric data
*/
Expand Down Expand Up @@ -125,8 +125,8 @@ case class FeatureDistribution
def jsDivergence(fd: FeatureDistribution): Double = {
checkMatch(fd)
val combinedCounts = distribution.zip(fd.distribution).filterNot{ case (a, b) => a == 0.0 && b == 0.0 }
val (thisCount, thatCount) = combinedCounts
.fold[(Double, Double)]( (0, 0)){ case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) }
val (thisCount, thatCount) =
combinedCounts.fold[(Double, Double)]((0.0, 0.0)){ case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) }
val probs = combinedCounts.map{ case (a, b) => a / thisCount -> b / thatCount }
val meanProb = probs.map{ case (a, b) => (a + b) / 2}
def log2(x: Double) = math.log10(x) / math.log10(2.0)
Expand Down Expand Up @@ -154,19 +154,19 @@ private[op] object FeatureDistribution {
* @param summary feature summary
* @param value optional processed sequence
* @param bins number of histogram bins
* @param hasher hashing method to use for text and categorical features
* @param textBinsFormula formula to compute the text features bin size
* @return feature distribution given the provided information
*/
def apply(
featureKey: FeatureKey,
summary: Summary,
value: Option[ProcessedSeq],
bins: Int,
hasher: HashingTF
textBinsFormula: (Summary, Int) => Int
): FeatureDistribution = {
val (nullCount, (summaryInfo, distribution)): (Int, (Array[Double], Array[Double])) =
value.map(seq => 0 -> histValues(seq, summary, bins, hasher))
.getOrElse(1 -> (Array(summary.min, summary.max) -> Array.fill(bins)(0.0)))
value.map(seq => 0 -> histValues(seq, summary, bins, textBinsFormula))
.getOrElse(1 -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))

FeatureDistribution(
name = featureKey._1,
Expand All @@ -179,40 +179,46 @@ private[op] object FeatureDistribution {

/**
* Function to put data into histogram of counts
* @param values values to bin
* @param sum summary info for feature (max and min)
* @param bins number of bins to produce
* @param hasher hasing function to use for text
*
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param textBinsFormula formula to compute the text features bin size
* @return the bin information and the binned counts
*/
// TODO avoid wrapping and unwrapping??
private def histValues(
values: ProcessedSeq,
sum: Summary,
summary: Summary,
bins: Int,
hasher: HashingTF
): (Array[Double], Array[Double]) = {
values match {
case Left(seq) => Array(sum.min, sum.max) -> hasher.transform(seq).toArray // TODO use summary info to pick hashes
case Right(seq) => // TODO use kernel fit instead of histogram
if (sum == Summary.empty) {
Array(sum.min, sum.max) -> seq.toArray // the seq will always be empty in this case
} else if (sum.min < sum.max) {
val step = (sum.max - sum.min) / (bins - 2.0) // total number of bins includes one for edge and one for other
val splits = (0 until bins).map(b => sum.min + step * b).toArray
val binned = seq.map { v =>
NumericBucketizer.bucketize(
splits = splits, trackNulls = false, trackInvalid = true,
splitInclusion = Inclusion.Left, input = Option(v)
).toArray
}
val hist = binned.fold(new Array[Double](bins))(_ + _)
splits -> hist
} else {
val same = seq.map(v => if (v == sum.max) 1.0 else 0.0).sum
val other = seq.map(v => if (v != sum.max) 1.0 else 0.0).sum
Array(sum.min, sum.max) -> Array(same, other)
textBinsFormula: (Summary, Int) => Int
): (Array[Double], Array[Double]) = values match {
case Left(seq) =>
val numBins = textBinsFormula(summary, bins)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would the formula need the bins?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we set is as def textBinsFormula(s: Summary, bins: Int) = bins.
I can image default bins can be returned in some cases.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would you recommend then? @leahmcguire

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bins is the default number (e.g. 100) there might be cases that we will just want to fall back to the defaults

// TODO: creating too many hasher instances may cause problem, efficiency, garbage collection etc
val hasher =
new HashingTF(numFeatures = numBins).setBinary(false)
.setHashAlgorithm(HashAlgorithm.MurMur3.entryName.toLowerCase)
Array(summary.min, summary.max, summary.sum, summary.count) -> hasher.transform(seq).toArray

case Right(seq) => // TODO use kernel fit instead of histogram
if (summary == Summary.empty) {
Array(summary.min, summary.max) -> seq.toArray // the seq will always be empty in this case
} else if (summary.min < summary.max) {
// total number of bins includes one for edge and one for other
val step = (summary.max - summary.min) / (bins - 2.0)
val splits = (0 until bins).map(b => summary.min + step * b).toArray
val binned = seq.map { v =>
NumericBucketizer.bucketize(
splits = splits, trackNulls = false, trackInvalid = true,
splitInclusion = Inclusion.Left, input = Option(v)
).toArray
}
}
val hist = binned.fold(new Array[Double](bins))(_ + _)
splits -> hist
} else {
val same = seq.map(v => if (v == summary.max) 1.0 else 0.0).sum
val other = seq.map(v => if (v != summary.max) 1.0 else 0.0).sum
Array(summary.min, summary.max, summary.sum, summary.count) -> Array(same, other)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import com.salesforce.op.features.types._
import com.salesforce.op.stages.impl.feature.TextTokenizer
import com.salesforce.op.utils.spark.RichRow._
import com.salesforce.op.utils.text.Language
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.sql.Row

Expand Down Expand Up @@ -82,19 +81,19 @@ private[filters] case class PreparedFeatures(
* @param responseSummaries global feature metadata
* @param predictorSummaries set of feature summary statistics (derived from metadata)
* @param bins number of bins to put numerics into
* @param hasher hash function to use on strings
* @param textBinsFormula formula to compute the text features bin size
* @return a pair consisting of response and predictor feature distributions (in this order)
*/
def getFeatureDistributions(
responseSummaries: Array[(FeatureKey, Summary)],
predictorSummaries: Array[(FeatureKey, Summary)],
bins: Int,
hasher: HashingTF
textBinsFormula: (Summary, Int) => Int
): (Array[FeatureDistribution], Array[FeatureDistribution]) = {
val responseFeatureDistributions: Array[FeatureDistribution] =
getFeatureDistributions(responses, responseSummaries, bins, hasher)
getFeatureDistributions(responses, responseSummaries, bins, textBinsFormula)
val predictorFeatureDistributions: Array[FeatureDistribution] =
getFeatureDistributions(predictors, predictorSummaries, bins, hasher)
getFeatureDistributions(predictors, predictorSummaries, bins, textBinsFormula)

responseFeatureDistributions -> predictorFeatureDistributions
}
Expand All @@ -103,14 +102,15 @@ private[filters] case class PreparedFeatures(
features: Map[FeatureKey, ProcessedSeq],
summaries: Array[(FeatureKey, Summary)],
bins: Int,
hasher: HashingTF
textBinsFormula: (Summary, Int) => Int
): Array[FeatureDistribution] = summaries.map { case (featureKey, summary) =>
FeatureDistribution(
featureKey = featureKey,
summary = summary,
value = features.get(featureKey),
bins = bins,
hasher = hasher)
textBinsFormula = textBinsFormula
)
}
}

Expand All @@ -126,7 +126,7 @@ private[filters] object PreparedFeatures {
* @return set of prepared features
*/
def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = {
val empty: Map[FeatureKey, ProcessedSeq] = Map()
val empty: Map[FeatureKey, ProcessedSeq] = Map.empty
val preparedResponses = responses.foldLeft(empty) { case (map, feature) =>
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
Expand All @@ -150,13 +150,11 @@ private[filters] object PreparedFeatures {
private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] =
value match {
case v: Text => v.value
.map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s))))
.getOrElse(Map())
.map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty)
case v: OPNumeric[_] => v.toDouble
.map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d))))
.getOrElse(Map())
case ft@SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
case ft@SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
.map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty)
case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
case ft@SomeValue(_) => ft match {
case v: Geolocation => Map((name, None) -> Right(v.value))
case v: TextList => Map((name, None) -> Left(v.value))
Expand All @@ -173,7 +171,7 @@ private[filters] object PreparedFeatures {
}}
case _ => throw new RuntimeException(s"Feature type $value is not supported in RawFeatureFilter")
}
case _ => Map()
case _ => Map.empty
}

/**
Expand Down
Loading