-
Notifications
You must be signed in to change notification settings - Fork 394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Apply DateToUnitCircleTransformer logic in raw feature transformations. #130
Changes from 1 commit
3664d3a
f62c38d
c93d083
6a41399
5afa43c
37f4c41
84f72ae
9d6ea44
bc39bfe
f255f87
e1cc959
0ef87da
3635e24
cb63dd4
724b22e
9582002
2758202
104169e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,9 +30,14 @@ | |
|
||
package com.salesforce.op.filters | ||
|
||
|
||
import java.time.{Instant, OffsetDateTime, ZoneOffset} | ||
import java.time.temporal.WeekFields | ||
import java.util.Locale | ||
|
||
import com.salesforce.op.features.TransientFeature | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.stages.impl.feature.TextTokenizer | ||
import com.salesforce.op.stages.impl.feature.{TextTokenizer, TimePeriod} | ||
import com.salesforce.op.utils.spark.RichRow._ | ||
import com.salesforce.op.utils.text.Language | ||
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} | ||
|
@@ -127,15 +132,19 @@ private[filters] object PreparedFeatures { | |
* @param predictors transient features derived from predictors | ||
* @return set of prepared features | ||
*/ | ||
def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = { | ||
def apply( | ||
row: Row, | ||
responses: Array[TransientFeature], | ||
predictors: Array[TransientFeature], | ||
timePeriod: Option[TimePeriod]): PreparedFeatures = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
val empty: Map[FeatureKey, ProcessedSeq] = Map.empty | ||
val preparedResponses = responses.foldLeft(empty) { case (map, feature) => | ||
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter)) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) | ||
} | ||
val preparedPredictors = predictors.foldLeft(empty) { case (map, feature) => | ||
val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter)) | ||
map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod) | ||
} | ||
|
||
PreparedFeatures(responses = preparedResponses, predictors = preparedPredictors) | ||
|
@@ -149,21 +158,29 @@ private[filters] object PreparedFeatures { | |
* @tparam T type of the feature | ||
* @return tuple containing whether the feature was empty and a sequence of either doubles or strings | ||
*/ | ||
private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] = | ||
private def prepareFeature[T <: FeatureType]( | ||
name: String, | ||
value: T, | ||
timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
value match { | ||
case v: Text => v.value | ||
.map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty) | ||
case v: Date => v.value.map { timestamp => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder why Text, Date and OPNumeric are handled differently than the other values which require some value to be present There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this is because all of the values inside |
||
Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(prepareDateValue(timestamp, timePeriod)))) | ||
}.getOrElse(Map.empty) | ||
case v: OPNumeric[_] => v.toDouble | ||
.map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty) | ||
case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq)) | ||
case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq)) | ||
case ft@SomeValue(_) => ft match { | ||
case v: Geolocation => Map((name, None) -> Right(v.value)) | ||
case v: TextList => Map((name, None) -> Left(v.value)) | ||
case v: DateList => Map((name, None) -> Right(v.value.map(_.toDouble))) | ||
case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)))) | ||
case v: MultiPickList => Map((name, None) -> Left(v.value.toSeq)) | ||
case v: MultiPickListMap => v.value.map { case (k, e) => (name, Option(k)) -> Left(e.toSeq) } | ||
case v: GeolocationMap => v.value.map{ case (k, e) => (name, Option(k)) -> Right(e) } | ||
case v: DateMap => | ||
v.value.map { case (k, e) => (name, Option(k)) -> Right(Seq(prepareDateValue(e, timePeriod))) } | ||
case v: OPMap[_] => v.value.map { case (k, e) => e match { | ||
case d: Double => (name, Option(k)) -> Right(Seq(d)) | ||
// Do not need to distinguish between string map types, all text is tokenized for distribution calculation | ||
|
@@ -183,4 +200,38 @@ private[filters] object PreparedFeatures { | |
* @return array of string tokens | ||
*/ | ||
private def tokenize(s: String) = TextTokenizer.Analyzer.analyze(s, Language.Unknown) | ||
|
||
private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we please make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
timePeriod match { | ||
case Some(period) => | ||
val dt = Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC).toOffsetDateTime | ||
val unproj = period match { | ||
case TimePeriod.DayOfMonth => dt.getDayOfMonth.toDouble | ||
case TimePeriod.DayOfWeek => dt.getDayOfWeek.getValue.toDouble | ||
case TimePeriod.DayOfYear => dt.getDayOfYear.toDouble | ||
case TimePeriod.HourOfDay => dt.getHour.toDouble | ||
case TimePeriod.MonthOfYear => dt.getMonthValue.toDouble | ||
case TimePeriod.WeekOfMonth => dt.get(WeekFields.of(Locale.US).weekOfMonth).toDouble | ||
case TimePeriod.WeekOfYear => dt.get(WeekFields.of(Locale.US).weekOfYear).toDouble | ||
} | ||
|
||
unproj % getTimePeriodMaxBins(period) | ||
case None => timestamp.toDouble | ||
} | ||
|
||
/** | ||
* Utility for getting maximum number of bins to use given an input time period. | ||
* | ||
* @param period input time period | ||
* @return maximum number of bins associated to input time period | ||
*/ | ||
private def getTimePeriodMaxBins(period: TimePeriod): Int = period match { | ||
case TimePeriod.DayOfMonth => 31 | ||
case TimePeriod.DayOfWeek => 7 | ||
case TimePeriod.DayOfYear => 366 | ||
case TimePeriod.HourOfDay => 24 | ||
case TimePeriod.MonthOfYear => 12 | ||
case TimePeriod.WeekOfMonth => 5 | ||
case TimePeriod.WeekOfYear => 53 | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ import com.salesforce.op.OpParams | |
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.features.{OPFeature, TransientFeature} | ||
import com.salesforce.op.readers.{DataFrameFieldNames, Reader} | ||
import com.salesforce.op.stages.impl.feature.TimePeriod | ||
import com.salesforce.op.stages.impl.preparators.CorrelationType | ||
import com.salesforce.op.utils.spark.RichRow._ | ||
import com.twitter.algebird.Monoid._ | ||
|
@@ -93,7 +94,8 @@ class RawFeatureFilter[T] | |
val correlationType: CorrelationType = CorrelationType.Pearson, | ||
val jsDivergenceProtectedFeatures: Set[String] = Set.empty, | ||
val protectedFeatures: Set[String] = Set.empty, | ||
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula | ||
val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula, | ||
val timePeriod: Option[TimePeriod] = None | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. docs for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also, please add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
) extends Serializable { | ||
|
||
assert(bins > 1 && bins <= FeatureDistribution.MaxBins, s"Invalid bin size $bins," + | ||
|
@@ -139,7 +141,7 @@ class RawFeatureFilter[T] | |
(respOut, predOut) | ||
} | ||
val preparedFeatures: RDD[PreparedFeatures] = | ||
data.rdd.map(PreparedFeatures(_, responses, predictors)) | ||
data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod)) | ||
// Have to use the training summaries do process scoring for comparison | ||
val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) = | ||
allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,19 +32,24 @@ package com.salesforce.op.filters | |
|
||
import scala.math.round | ||
|
||
import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature} | ||
import com.salesforce.op.features.types._ | ||
import com.salesforce.op.readers._ | ||
import com.salesforce.op.stages.impl.feature.TimePeriod | ||
import com.salesforce.op.stages.impl.preparators.CorrelationType | ||
import com.salesforce.op.test.TestSparkContext | ||
import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest} | ||
import com.twitter.algebird.Monoid._ | ||
import com.twitter.algebird.Operators._ | ||
import org.apache.spark.mllib.linalg.{Matrix, Vector} | ||
import org.apache.spark.mllib.stat.Statistics | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.sql.{DataFrame, Row} | ||
import org.junit.runner.RunWith | ||
import org.scalatest.FlatSpec | ||
import org.scalatest.junit.JUnitRunner | ||
|
||
@RunWith(classOf[JUnitRunner]) | ||
class PreparedFeaturesTest extends FlatSpec with TestSparkContext { | ||
class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest { | ||
|
||
val responseKey1: FeatureKey = "Response1" -> None | ||
val responseKey2: FeatureKey = "Response2" -> None | ||
|
@@ -157,6 +162,68 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext { | |
testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, expected) | ||
} | ||
|
||
it should "correctly transform date features when time period is specified" in { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this test has to be split into separate cases or be a for loop not a copy/paste There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done, there's a single function for running each check. |
||
val dateMap = | ||
FeatureBuilder.DateMap[Passenger].extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor | ||
val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap) | ||
val df: DataFrame = dataReader.generateDataFrame(dateFeatures) | ||
|
||
val ppRDD5: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfMonth))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD7: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfWeek))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD12: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.MonthOfYear))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD24: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.HourOfDay))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD31: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfMonth))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD53: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.WeekOfYear))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDD366: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), Option(TimePeriod.DayOfYear))) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
val ppRDDNone: RDD[Map[FeatureKey, ProcessedSeq]] = df.rdd | ||
.map(PreparedFeatures( | ||
_, Array.empty[TransientFeature], dateFeatures.map(TransientFeature(_)), None)) | ||
.map(_.predictors.mapValues(_.right.map(_.toList))) | ||
|
||
def createExpected(d: Double): Seq[(FeatureKey, ProcessedSeq)] = Seq( | ||
(boarded.name, None) -> Right(List(d, d)), | ||
(boarded.name, None) -> Right(List(d)), | ||
(boardedTime.name, None) -> Right(List(d)), | ||
(boardedTimeAsDateTime.name, None) -> Right(List(d)), | ||
(dateMap.name, Option("DTMap")) -> Right(List(d))) | ||
|
||
val expected5 = Seq(4.0).map(createExpected) | ||
val expected7 = Seq(0.0).map(createExpected) | ||
val expected12 = Seq(1.0).map(createExpected) | ||
val expected24 = Seq(0.0).map(createExpected) | ||
val expected31 = Seq(18.0).map(createExpected) | ||
val expected53 = Seq(4.0).map(createExpected) | ||
val expected366 = Seq(18.0).map(createExpected) | ||
|
||
ppRDD5.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected5.flatMap(identity(_)) | ||
ppRDD7.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected7.flatMap(identity(_)) | ||
ppRDD12.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected12.flatMap(identity(_)) | ||
ppRDD24.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected24.flatMap(identity(_)) | ||
ppRDD53.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected53.flatMap(identity(_)) | ||
ppRDD366.collect.flatMap(identity(_)).toSet should contain theSameElementsAs expected366.flatMap(identity(_)) | ||
} | ||
|
||
def testCorrMatrix( | ||
responseKeys: Array[FeatureKey], | ||
correlationType: CorrelationType, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not use `java.time`; instead, use Joda-Time.