-
Notifications
You must be signed in to change notification settings - Fork 393
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Apply DateToUnitCircleTransformer logic in raw feature transformations. #130
Changes from all commits
3664d3a
f62c38d
c93d083
6a41399
5afa43c
37f4c41
84f72ae
9d6ea44
bc39bfe
f255f87
e1cc959
0ef87da
3635e24
cb63dd4
724b22e
9582002
2758202
104169e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,9 +30,10 @@ | |
|
||
package com.salesforce.op.filters | ||
|
||
|
||
import com.salesforce.op.features.TransientFeature | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.stages.impl.feature.TextTokenizer | ||
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TextTokenizer, TimePeriod} | ||
import com.salesforce.op.utils.spark.RichRow._ | ||
import com.salesforce.op.utils.text.Language | ||
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} | ||
|
@@ -125,17 +126,23 @@ private[filters] object PreparedFeatures { | |
* @param row data frame row | ||
* @param responses transient features derived from responses | ||
* @param predictors transient features derived from predictors | ||
* @param timePeriod optional time period to use raw feature binning, otherwise standard numeric transformation | ||
* is applied | ||
* @return set of prepared features | ||
*/ | ||
def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = { | ||
def apply( | ||
row: Row, | ||
responses: Array[TransientFeature], | ||
predictors: Array[TransientFeature], | ||
timePeriod: Option[TimePeriod]): PreparedFeatures = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
val empty: Map[FeatureKey, ProcessedSeq] = Map.empty | ||
val preparedResponses = responses.foldLeft(empty) { case (map, feature) => | ||
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter)) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) | ||
} | ||
val preparedPredictors = predictors.foldLeft(empty) { case (map, feature) => | ||
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter)) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) | ||
} | ||
|
||
PreparedFeatures(responses = preparedResponses, predictors = preparedPredictors) | ||
|
@@ -146,24 +153,30 @@ private[filters] object PreparedFeatures { | |
* | ||
* @param name feature name | ||
* @param value feature value | ||
* @param timePeriod optional time period to use raw feature binning, otherwise standard numeric transformation | ||
* is applied | ||
* @tparam T type of the feature | ||
* @return tuple containing whether the feature was empty and a sequence of either doubles or strings | ||
*/ | ||
private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] = | ||
private def prepareFeature[T <: FeatureType]( | ||
name: String, | ||
value: T, | ||
timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
value match { | ||
case v: Text => v.value | ||
.map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty) | ||
case v: OPNumeric[_] => v.toDouble | ||
.map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty) | ||
case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq)) | ||
case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq)) | ||
case ft@SomeValue(_) => ft match { | ||
case v: Text => Map((name, None) -> Left(v.value.toSeq.flatMap(tokenize))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you! @marcovivero |
||
case v: Date => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)).toSeq)) | ||
case v: OPNumeric[_] => Map((name, None) -> Right(v.toDouble.toSeq)) | ||
case v: Geolocation => Map((name, None) -> Right(v.value)) | ||
case v: TextList => Map((name, None) -> Left(v.value)) | ||
case v: DateList => Map((name, None) -> Right(v.value.map(_.toDouble))) | ||
case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)))) | ||
case v: MultiPickList => Map((name, None) -> Left(v.value.toSeq)) | ||
case v: MultiPickListMap => v.value.map { case (k, e) => (name, Option(k)) -> Left(e.toSeq) } | ||
case v: GeolocationMap => v.value.map{ case (k, e) => (name, Option(k)) -> Right(e) } | ||
case v: DateMap => | ||
v.value.map { case (k, e) => (name, Option(k)) -> Right(Seq(prepareDateValue(e, timePeriod))) } | ||
case v: OPMap[_] => v.value.map { case (k, e) => e match { | ||
case d: Double => (name, Option(k)) -> Right(Seq(d)) | ||
// Do not need to distinguish between string map types, all text is tokenized for distribution calculation | ||
|
@@ -183,4 +196,10 @@ private[filters] object PreparedFeatures { | |
* @return array of string tokens | ||
*/ | ||
private def tokenize(s: String) = TextTokenizer.Analyzer.analyze(s, Language.Unknown) | ||
|
||
private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we please make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
timePeriod match { | ||
case Some(period) => DateToUnitCircle.convertToBin(timestamp, period) | ||
case None => timestamp.toDouble | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ import com.salesforce.op.OpParams | |
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.features.{OPFeature, TransientFeature} | ||
import com.salesforce.op.readers.{DataFrameFieldNames, Reader} | ||
import com.salesforce.op.stages.impl.feature.TimePeriod | ||
import com.salesforce.op.stages.impl.preparators.CorrelationType | ||
import com.salesforce.op.utils.spark.RichRow._ | ||
import com.twitter.algebird.Monoid._ | ||
|
@@ -78,6 +79,8 @@ import scala.util.Failure | |
* Input arguments are [[Summary]] and number of bins to use in computing feature | ||
* distributions (histograms for numerics, hashes for strings). | ||
* Output is the bins for the text features. | ||
* @param timePeriod Time period used to apply circulate date transformation for date features, if | ||
* not specified will use regular numeric feature transformation | ||
* @tparam T datatype of the reader | ||
*/ | ||
class RawFeatureFilter[T] | ||
|
@@ -93,7 +96,8 @@ class RawFeatureFilter[T] | |
val correlationType: CorrelationType = CorrelationType.Pearson, | ||
val jsDivergenceProtectedFeatures: Set[String] = Set.empty, | ||
val protectedFeatures: Set[String] = Set.empty, | ||
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula | ||
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula, | ||
val timePeriod: Option[TimePeriod] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also, please add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
) extends Serializable { | ||
|
||
assert(bins > 1 && bins <= FeatureDistribution.MaxBins, s"Invalid bin size $bins," + | ||
|
@@ -139,7 +143,7 @@ class RawFeatureFilter[T] | |
(respOut, predOut) | ||
} | ||
val preparedFeatures: RDD[PreparedFeatures] = | ||
data.rdd.map(PreparedFeatures(_, responses, predictors)) | ||
data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) | ||
// Have to use the training summaries do process scoring for comparison | ||
val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) = | ||
allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protectedJSFeatures
different fromprotectedFeatures
?protectedJSFeatures
paramThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was already enabled in
RawFeatureFilter
, it protects features from JS divergence check. I didn't see itOpWorkflow.withRawFeatureFilter
, just making sure it's here as well.