-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adjust bin values for text features in RFF #99
Changes from 18 commits
da5ba89
1d90e8f
3b6c45f
eea1cf2
b958887
a54131b
dae829d
d6ad74f
d9dee94
1bb28c6
441ab06
546fcfc
13cb288
c47e9bc
e8d616b
e133bb4
20a9c40
e39dad6
1c6d359
5e2857c
a2ffd3a
3c35a13
2f4f77a
6187e55
7ff99fe
bdc0608
dfc1a40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,18 +35,36 @@ import com.twitter.algebird.Monoid | |
/** | ||
* Class used to get summaries of prepared features to determine distribution binning strategy | ||
* | ||
* @param min minimum value seen | ||
* @param max maximum value seen | ||
* @param min minimum value seen for double, minimum number of tokens in one text for text | ||
* @param max maximum value seen for double, maximum number of tokens in one text for text | ||
* @param sum sum of values for double, total number of tokens for text | ||
* @param count number of doubles for double, number of texts for text | ||
*/ | ||
private[op] case class Summary(min: Double, max: Double) | ||
private[op] case class Summary(min: Double, max: Double, sum: Double, count: Double) { | ||
/** | ||
* to calculate the hashing size for RFF (compare js distance) for Text | ||
* @param bins default bins of RFF | ||
* @return | ||
*/ | ||
def getBinsText(bins: Int): Int = bins | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The user should be able to pass the formula for how to compute the text feature bin size into RawFeatureFilter as a parameter. We should provide a default which for now can be no formula but later can be updated There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
// Todo: find out the right formula example: | ||
// val AvgBinValue = 5000 | ||
// val MaxTokenLowerLimit = 10 | ||
// { | ||
// To catch categoricals | ||
// if (max < MaxTokenLowerLimit) bins | ||
// else math.min(math.max(bins, sum / AvgBinValue), MaxBins).intValue() | ||
// } | ||
} | ||
|
||
private[op] case object Summary { | ||
|
||
val empty: Summary = Summary(Double.PositiveInfinity, Double.NegativeInfinity) | ||
val empty: Summary = Summary(Double.PositiveInfinity, Double.NegativeInfinity, 0.0, 0.0) | ||
|
||
implicit val monoid: Monoid[Summary] = new Monoid[Summary] { | ||
override def zero = empty | ||
override def plus(l: Summary, r: Summary) = Summary(math.min(l.min, r.min), math.max(l.max, r.max)) | ||
override def plus(l: Summary, r: Summary) = Summary(math.min(l.min, r.min), math.max(l.max, r.max), | ||
l.sum + r.sum, l.count + r.count) | ||
} | ||
|
||
/** | ||
|
@@ -55,8 +73,8 @@ private[op] case object Summary { | |
*/ | ||
def apply(preppedFeature: ProcessedSeq): Summary = { | ||
preppedFeature match { | ||
case Left(v) => Summary(v.size, v.size) | ||
case Right(v) => monoid.sum(v.map(d => Summary(d, d))) | ||
case Left(v) => Summary(v.size, v.size, v.size, 1.0) | ||
case Right(v) => monoid.sum(v.map(d => Summary(d, d, d, 1.0))) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,12 +30,9 @@ | |
|
||
package com.salesforce.op.filters | ||
|
||
import com.salesforce.op.OpParams | ||
import com.salesforce.op.features.{OPFeature, TransientFeature} | ||
import com.salesforce.op.stages.impl.feature.HashAlgorithm | ||
import com.salesforce.op.features.TransientFeature | ||
import com.salesforce.op.test.PassengerSparkFixtureTest | ||
import com.salesforce.op.utils.spark.RichDataset._ | ||
import org.apache.spark.mllib.feature.HashingTF | ||
import com.salesforce.op.testkit.RandomText | ||
import org.junit.runner.RunWith | ||
import org.scalatest.FlatSpec | ||
import org.scalatest.junit.JUnitRunner | ||
|
@@ -50,18 +47,16 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
(true, Left(Seq.empty[String])), (false, Right(Seq(1.0, 3.0, 5.0))) | ||
) | ||
val summary = | ||
Array(Summary(0.0, 1.0), Summary(-1.6, 10.6), Summary(0.0, 3.0), Summary(0.0, 0.0), Summary(1.0, 5.0)) | ||
Array(Summary(0.0, 1.0, 6.0, 10), Summary(-1.6, 10.6, 3.0, 10), | ||
Summary(0.0, 3.0, 7.0, 10), Summary(0.0, 0.0, 5.0, 10), Summary(1.0, 5.0, 10.0, 10)) | ||
val bins = 10 | ||
val hasher: HashingTF = new HashingTF(numFeatures = bins) | ||
.setBinary(false) | ||
.setHashAlgorithm(HashAlgorithm.MurMur3.toString.toLowerCase) | ||
|
||
val featureKeys: Array[FeatureKey] = features.map(f => (f.name, None)) | ||
val processedSeqs: Array[Option[ProcessedSeq]] = values.map { case (isEmpty, processed) => | ||
if (isEmpty) None else Option(processed) | ||
} | ||
val distribs = featureKeys.zip(summary).zip(processedSeqs).map { case ((key, summ), seq) => | ||
FeatureDistribution(key, summ, seq, bins, hasher) | ||
FeatureDistribution(key, summ, seq, bins) | ||
} | ||
distribs.foreach{ d => | ||
d.key shouldBe None | ||
|
@@ -72,30 +67,47 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
distribs(1).nulls shouldBe 1 | ||
distribs(1).distribution.sum shouldBe 0 | ||
distribs(2).distribution.sum shouldBe 2 | ||
distribs(2).summaryInfo should contain theSameElementsAs Array(0.0, 3.0) | ||
distribs(2).summaryInfo should contain theSameElementsAs Array(0.0, 3.0, 7.0, 10.0) | ||
distribs(3).distribution.sum shouldBe 0 | ||
distribs(4).distribution.sum shouldBe 3 | ||
distribs(4).summaryInfo.length shouldBe bins | ||
} | ||
|
||
it should "be correctly created for text features" in { | ||
val features = Array(description, gender) | ||
val values: Array[(Boolean, ProcessedSeq)] = Array( | ||
(false, Left(RandomText.strings(1, 10).take(10000).toSeq.map(_.value.get))) | ||
) | ||
val summary = Array(Summary(1000.0, 50000.0, 70000.0, 10)) | ||
val bins = 100 | ||
val featureKeys: Array[FeatureKey] = features.map(f => (f.name, None)) | ||
val processedSeqs: Array[Option[ProcessedSeq]] = values.map { case (isEmpty, processed) => | ||
if (isEmpty) None else Option(processed) | ||
} | ||
val distribs = featureKeys.zip(summary).zip(processedSeqs).map { case ((key, summ), seq) => | ||
FeatureDistribution(key, summ, seq, bins) | ||
} | ||
|
||
distribs(0).distribution.length shouldBe 100 | ||
distribs(0).distribution.sum shouldBe 10000 | ||
|
||
} | ||
|
||
it should "be correctly created for map features" in { | ||
val features = Array(stringMap, numericMap, booleanMap).map(TransientFeature.apply) | ||
val values: Array[Map[String, ProcessedSeq]] = Array( | ||
Map("A" -> Left(Seq("male", "female"))), | ||
Map("A" -> Right(Seq(1.0)), "B" -> Right(Seq(1.0))), | ||
Map("B" -> Right(Seq(0.0)))) | ||
val summary = Array( | ||
Map("A" -> Summary(0.0, 1.0), "B" -> Summary(0.0, 5.0)), | ||
Map("A" -> Summary(-1.6, 10.6), "B" -> Summary(0.0, 3.0)), | ||
Map("B" -> Summary(0.0, 0.0))) | ||
Map("A" -> Summary(0.0, 2.0, 100.0, 10), "B" -> Summary(0.0, 5.0, 10.0, 10)), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why did the max change for just the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is hardcoded; I changed it since a sample of "A" is Seq("male", "female") — the number of hashes in the text is already 2, so the max should not be smaller than 2. |
||
Map("A" -> Summary(-1.6, 10.6, 30.0, 10), "B" -> Summary(0.0, 3.0, 11.0, 10)), | ||
Map("B" -> Summary(0.0, 0.0, 0.0, 10))) | ||
val bins = 10 | ||
val hasher: HashingTF = new HashingTF(numFeatures = bins) | ||
.setBinary(false) | ||
.setHashAlgorithm(HashAlgorithm.MurMur3.toString.toLowerCase) | ||
val distribs = features.map(_.name).zip(summary).zip(values).flatMap { case ((name, summaryMaps), valueMaps) => | ||
summaryMaps.map { case (key, summary) => | ||
val featureKey = (name, Option(key)) | ||
FeatureDistribution(featureKey, summary, valueMaps.get(key), bins, hasher) | ||
FeatureDistribution(featureKey, summary, valueMaps.get(key), bins) | ||
} | ||
} | ||
|
||
|
@@ -107,15 +119,15 @@ class FeatureDistributionTest extends FlatSpec with PassengerSparkFixtureTest wi | |
else d.distribution.length shouldBe 2 | ||
} | ||
distribs(0).nulls shouldBe 0 | ||
distribs(0).summaryInfo should contain theSameElementsAs Array(0.0, 1.0) | ||
distribs(0).summaryInfo should contain theSameElementsAs Array(0.0, 2.0, 100.0, 10.0) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same here, why did the max change? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This one reads the info from the hardcoded 'summaries' above. |
||
distribs(1).nulls shouldBe 1 | ||
distribs(0).distribution.sum shouldBe 2 | ||
distribs(1).distribution.sum shouldBe 0 | ||
distribs(2).summaryInfo.length shouldBe bins | ||
distribs(2).distribution.sum shouldBe 1 | ||
distribs(4).distribution(0) shouldBe 1 | ||
distribs(4).distribution(1) shouldBe 0 | ||
distribs(4).summaryInfo.length shouldBe 2 | ||
distribs(4).summaryInfo.length shouldBe 4 | ||
} | ||
|
||
it should "correctly compare fill rates" in { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have to create hasher every time or perhaps we can create it once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The hashing dimension can be different for different features; the minimum number would be `bins`,
but we can have a shared one with numFeatures = bins, and use that for every case if there are not too many tokens; OR we can create a couple of shared hashers with different scales, and choose one based on the scale of the token numbers.
What do you think? I was assuming that creating a hasher for every feature will not be very resource-consuming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's see if we can reuse the hashing function without creating `HashingTF` every time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I think we can. See `HashingTF.transform` and the `HashingTF` object.