-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adjust bin values for text features in RFF #99
Changes from 8 commits
da5ba89
1d90e8f
3b6c45f
eea1cf2
b958887
a54131b
dae829d
d6ad74f
d9dee94
1bb28c6
441ab06
546fcfc
13cb288
c47e9bc
e8d616b
e133bb4
20a9c40
e39dad6
1c6d359
5e2857c
a2ffd3a
3c35a13
2f4f77a
6187e55
7ff99fe
bdc0608
dfc1a40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
|
||
package com.salesforce.op.filters | ||
|
||
import com.salesforce.op.stages.impl.feature.{HashAlgorithm, Inclusion, NumericBucketizer} | ||
import com.salesforce.op.features.FeatureDistributionLike | ||
import com.salesforce.op.stages.impl.feature.{Inclusion, NumericBucketizer} | ||
import com.twitter.algebird.Semigroup | ||
|
@@ -142,6 +143,13 @@ case class FeatureDistribution | |
private[op] object FeatureDistribution { | ||
|
||
val MaxBins = 100000 | ||
val AvgBinValue = 5000 | ||
val MaxTokenLowerLimit = 10 | ||
val getBins = (sum: Summary, bins: Int) => { | ||
// To catch categoricals | ||
if (sum.max < MaxTokenLowerLimit) bins | ||
else math.min(math.max(bins, sum.sum / AvgBinValue), MaxBins).intValue() | ||
} | ||
|
||
implicit val semigroup: Semigroup[FeatureDistribution] = new Semigroup[FeatureDistribution] { | ||
override def plus(l: FeatureDistribution, r: FeatureDistribution) = l.reduce(r) | ||
|
@@ -154,19 +162,17 @@ private[op] object FeatureDistribution { | |
* @param summary feature summary | ||
* @param value optional processed sequence | ||
* @param bins number of histogram bins | ||
* @param hasher hashing method to use for text and categorical features | ||
* @return feature distribution given the provided information | ||
*/ | ||
def apply( | ||
featureKey: FeatureKey, | ||
summary: Summary, | ||
value: Option[ProcessedSeq], | ||
bins: Int, | ||
hasher: HashingTF | ||
bins: Int | ||
): FeatureDistribution = { | ||
val (nullCount, (summaryInfo, distribution)): (Int, (Array[Double], Array[Double])) = | ||
value.map(seq => 0 -> histValues(seq, summary, bins, hasher)) | ||
.getOrElse(1 -> (Array(summary.min, summary.max) -> Array.fill(bins)(0.0))) | ||
value.map(seq => 0 -> histValues(seq, summary, bins, getBins)) | ||
.getOrElse(1 -> (Array(summary.min, summary.max, summary.sum, summary.count) -> Array.fill(bins)(0.0))) | ||
|
||
FeatureDistribution( | ||
name = featureKey._1, | ||
|
@@ -182,18 +188,26 @@ private[op] object FeatureDistribution { | |
* @param values values to bin | ||
* @param sum summary info for feature (max and min) | ||
* @param bins number of bins to produce | ||
* @param hasher hasing function to use for text | ||
* @param getBins | ||
* @return the bin information and the binned counts | ||
*/ | ||
// TODO avoid wrapping and unwrapping?? | ||
private def histValues( | ||
values: ProcessedSeq, | ||
sum: Summary, | ||
bins: Int, | ||
hasher: HashingTF | ||
getBins: (Summary, Int) => Int | ||
): (Array[Double], Array[Double]) = { | ||
values match { | ||
case Left(seq) => Array(sum.min, sum.max) -> hasher.transform(seq).toArray // TODO use summary info to pick hashes | ||
case Left(seq) => { | ||
val numBins = getBins(sum, bins) | ||
|
||
// Todo: creating too many hashers may cause problem, efficiency, garbage collection etc | ||
val hasher: HashingTF = new HashingTF(numFeatures = numBins) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we have to create hasher every time or perhaps we can create it once? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the hashing dimension can be different for different features, the minimum number would be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let see if we can reuse the hashing function without creating There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and I think we can. See |
||
.setBinary(false) | ||
.setHashAlgorithm(HashAlgorithm.MurMur3.toString.toLowerCase) | ||
Array(sum.min, sum.max, sum.sum, sum.count) -> hasher.transform(seq).toArray | ||
} | ||
case Right(seq) => // TODO use kernel fit instead of histogram | ||
if (sum == Summary.empty) { | ||
Array(sum.min, sum.max) -> seq.toArray // the seq will always be empty in this case | ||
|
@@ -211,7 +225,7 @@ private[op] object FeatureDistribution { | |
} else { | ||
val same = seq.map(v => if (v == sum.max) 1.0 else 0.0).sum | ||
val other = seq.map(v => if (v != sum.max) 1.0 else 0.0).sum | ||
Array(sum.min, sum.max) -> Array(same, other) | ||
Array(sum.min, sum.max, sum.sum, sum.count) -> Array(same, other) | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,12 +30,9 @@ | |
|
||
package com.salesforce.op.filters | ||
|
||
import com.salesforce.op.OpParams | ||
import com.salesforce.op.features.{OPFeature, TransientFeature} | ||
import com.salesforce.op.stages.impl.feature.HashAlgorithm | ||
import com.salesforce.op.features.TransientFeature | ||
import com.salesforce.op.test.PassengerSparkFixtureTest | ||
import com.salesforce.op.utils.spark.RichDataset._ | ||
import org.apache.spark.mllib.feature.HashingTF | ||
import com.salesforce.op.testkit.RandomText | ||
import org.junit.runner.RunWith | ||
import org.scalatest.FlatSpec | ||
import org.scalatest.junit.JUnitRunner | ||
|
@@ -50,18 +47,16 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
(true, Left(Seq.empty[String])), (false, Right(Seq(1.0, 3.0, 5.0))) | ||
) | ||
val summary = | ||
Array(Summary(0.0, 1.0), Summary(-1.6, 10.6), Summary(0.0, 3.0), Summary(0.0, 0.0), Summary(1.0, 5.0)) | ||
Array(Summary(0.0, 1.0, 6.0, 10), Summary(-1.6, 10.6, 3.0, 10), | ||
Summary(0.0, 3.0, 7.0, 10), Summary(0.0, 0.0, 5.0, 10), Summary(1.0, 5.0, 10.0, 10)) | ||
val bins = 10 | ||
val hasher: HashingTF = new HashingTF(numFeatures = bins) | ||
.setBinary(false) | ||
.setHashAlgorithm(HashAlgorithm.MurMur3.toString.toLowerCase) | ||
|
||
val featureKeys: Array[FeatureKey] = features.map(f => (f.name, None)) | ||
val processedSeqs: Array[Option[ProcessedSeq]] = values.map { case (isEmpty, processed) => | ||
if (isEmpty) None else Option(processed) | ||
} | ||
val distribs = featureKeys.zip(summary).zip(processedSeqs).map { case ((key, summ), seq) => | ||
FeatureDistribution(key, summ, seq, bins, hasher) | ||
FeatureDistribution(key, summ, seq, bins) | ||
} | ||
distribs.foreach{ d => | ||
d.key shouldBe None | ||
|
@@ -72,30 +67,47 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
distribs(1).nulls shouldBe 1 | ||
distribs(1).distribution.sum shouldBe 0 | ||
distribs(2).distribution.sum shouldBe 2 | ||
distribs(2).summaryInfo should contain theSameElementsAs Array(0.0, 3.0) | ||
distribs(2).summaryInfo should contain theSameElementsAs Array(0.0, 3.0, 7.0, 10.0) | ||
distribs(3).distribution.sum shouldBe 0 | ||
distribs(4).distribution.sum shouldBe 3 | ||
distribs(4).summaryInfo.length shouldBe bins | ||
} | ||
|
||
it should "be correctly created for text features" in { | ||
val features = Array(description, gender) | ||
val values: Array[(Boolean, ProcessedSeq)] = Array( | ||
(false, Left(RandomText.strings(1, 10).take(10000).toSeq.map(_.value.get))) | ||
) | ||
val summary = Array(Summary(1000.0, 50000.0, 70000.0, 10)) | ||
val bins = 100 | ||
val featureKeys: Array[FeatureKey] = features.map(f => (f.name, None)) | ||
val processedSeqs: Array[Option[ProcessedSeq]] = values.map { case (isEmpty, processed) => | ||
if (isEmpty) None else Option(processed) | ||
} | ||
val distribs = featureKeys.zip(summary).zip(processedSeqs).map { case ((key, summ), seq) => | ||
FeatureDistribution(key, summ, seq, bins) | ||
} | ||
|
||
distribs(0).distribution.length shouldBe 100 | ||
distribs(0).distribution.sum shouldBe 10000 | ||
|
||
} | ||
|
||
it should "be correctly created for map features" in { | ||
val features = Array(stringMap, numericMap, booleanMap).map(TransientFeature.apply) | ||
val values: Array[Map[String, ProcessedSeq]] = Array( | ||
Map("A" -> Left(Seq("male", "female"))), | ||
Map("A" -> Right(Seq(1.0)), "B" -> Right(Seq(1.0))), | ||
Map("B" -> Right(Seq(0.0)))) | ||
val summary = Array( | ||
Map("A" -> Summary(0.0, 1.0), "B" -> Summary(0.0, 5.0)), | ||
Map("A" -> Summary(-1.6, 10.6), "B" -> Summary(0.0, 3.0)), | ||
Map("B" -> Summary(0.0, 0.0))) | ||
Map("A" -> Summary(0.0, 2.0, 100.0, 10), "B" -> Summary(0.0, 5.0, 10.0, 10)), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did the max change for just the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is hardcoded, I changed it since a sample of "A" is Seq("male", "female"), number of hashes in the text is already 2, so the max should not be smaller than 2 |
||
Map("A" -> Summary(-1.6, 10.6, 30.0, 10), "B" -> Summary(0.0, 3.0, 11.0, 10)), | ||
Map("B" -> Summary(0.0, 0.0, 0.0, 10))) | ||
val bins = 10 | ||
val hasher: HashingTF = new HashingTF(numFeatures = bins) | ||
.setBinary(false) | ||
.setHashAlgorithm(HashAlgorithm.MurMur3.toString.toLowerCase) | ||
val distribs = features.map(_.name).zip(summary).zip(values).flatMap { case ((name, summaryMaps), valueMaps) => | ||
summaryMaps.map { case (key, summary) => | ||
val featureKey = (name, Option(key)) | ||
FeatureDistribution(featureKey, summary, valueMaps.get(key), bins, hasher) | ||
FeatureDistribution(featureKey, summary, valueMaps.get(key), bins) | ||
} | ||
} | ||
|
||
|
@@ -107,15 +119,15 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
else d.distribution.length shouldBe 2 | ||
} | ||
distribs(0).nulls shouldBe 0 | ||
distribs(0).summaryInfo should contain theSameElementsAs Array(0.0, 1.0) | ||
distribs(0).summaryInfo should contain theSameElementsAs Array(0.0, 2.0, 100.0, 10.0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here, why did the max change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this one reads the info from hardcoded 'summaries' above |
||
distribs(1).nulls shouldBe 1 | ||
distribs(0).distribution.sum shouldBe 2 | ||
distribs(1).distribution.sum shouldBe 0 | ||
distribs(2).summaryInfo.length shouldBe bins | ||
distribs(2).distribution.sum shouldBe 1 | ||
distribs(4).distribution(0) shouldBe 1 | ||
distribs(4).distribution(1) shouldBe 0 | ||
distribs(4).summaryInfo.length shouldBe 2 | ||
distribs(4).summaryInfo.length shouldBe 4 | ||
} | ||
|
||
it should "correctly compare fill rates" in { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add move this method to Summary class., ie.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed!