diff --git a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala
index af45e49a15..d0c176f161 100644
--- a/core/src/main/scala/com/salesforce/op/OpWorkflow.scala
+++ b/core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.features.OPFeature
 import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
 import com.salesforce.op.readers.Reader
 import com.salesforce.op.stages.OPStage
+import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.preparators.CorrelationType
 import com.salesforce.op.stages.impl.selector.ModelSelector
 import com.salesforce.op.utils.reflection.ReflectionUtils
@@ -488,24 +489,27 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
    * Add a raw features filter to the workflow to look at fill rates and distributions of raw features and exclude
    * features that do not meet specifications from modeling DAG
    *
-   * @param trainingReader    training reader to use in filter if not supplied will fall back to reader specified for
-   *                          workflow (note that this reader will take precedence over readers directly input to the
-   *                          workflow if both are supplied)
-   * @param scoringReader     scoring reader to use in filter if not supplied will do the checks possible with only
-   *                          training data available
-   * @param bins              number of bins to use in estimating feature distributions
-   * @param minFillRate       minimum non-null fraction of instances that a feature should contain
-   * @param maxFillDifference maximum absolute difference in fill rate between scoring and training data for a feature
-   * @param maxFillRatioDiff  maximum difference in fill ratio (symmetric) between scoring and training data for
-   *                          a feature
-   * @param maxJSDivergence   maximum Jensen-Shannon divergence between the training and scoring distributions
-   *                          for a feature
-   * @param protectedFeatures list of features that should never be removed (features that are used to create them will
-   *                          also be protected)
-   * @param textBinsFormula   formula to compute the text features bin size.
-   *                          Input arguments are [[Summary]] and number of bins to use in computing
-   *                          feature distributions (histograms for numerics, hashes for strings).
-   *                          Output is the bins for the text features.
+   * @param trainingReader      training reader to use in filter; if not supplied, will fall back to the reader
+   *                            specified for the workflow (note that this reader takes precedence over readers
+   *                            directly input to the workflow if both are supplied)
+   * @param scoringReader       scoring reader to use in filter; if not supplied, will only do the checks possible
+   *                            with training data alone
+   * @param bins                number of bins to use in estimating feature distributions
+   * @param minFillRate         minimum non-null fraction of instances that a feature should contain
+   * @param maxFillDifference   maximum absolute difference in fill rate between scoring and training data for a feature
+   * @param maxFillRatioDiff    maximum difference in fill ratio (symmetric) between scoring and training data for
+   *                            a feature
+   * @param maxJSDivergence     maximum Jensen-Shannon divergence between the training and scoring distributions
+   *                            for a feature
+   * @param protectedFeatures   list of features that should never be removed (features that are used to create them
+   *                            will also be protected)
+   * @param protectedJSFeatures features that are protected from removal by the JS divergence check
+   * @param textBinsFormula     formula to compute the text features bin size.
+   *                            Input arguments are [[Summary]] and number of bins to use in computing
+   *                            feature distributions (histograms for numerics, hashes for strings).
+   *                            Output is the bins for the text features.
+   * @param timePeriod          time period used to apply the circular date transformation to date features; if not
+   *                            specified, the standard numeric transformation is applied
    * @tparam T Type of the data read in
    */
   @Experimental
@@ -521,12 +525,15 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
     maxCorrelation: Double = 0.95,
     correlationType: CorrelationType = CorrelationType.Pearson,
     protectedFeatures: Array[OPFeature] = Array.empty,
-    textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
+    protectedJSFeatures: Array[OPFeature] = Array.empty,
+    textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula,
+    timePeriod: Option[TimePeriod] = None
   ): this.type = {
     val training = trainingReader.orElse(reader).map(_.asInstanceOf[Reader[T]])
     require(training.nonEmpty, "Reader for training data must be provided either in withRawFeatureFilter or directly" +
       "as the reader for the workflow")
     val protectedRawFeatures = protectedFeatures.flatMap(_.rawFeatures).map(_.name).toSet
+    val protectedRawJSFeatures = protectedJSFeatures.flatMap(_.rawFeatures).map(_.name).toSet
     rawFeatureFilter = Option {
       new RawFeatureFilter(
         trainingReader = training.get,
@@ -539,8 +546,9 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
         maxCorrelation = maxCorrelation,
         correlationType = correlationType,
         protectedFeatures = protectedRawFeatures,
-        textBinsFormula = textBinsFormula
-      )
+        jsDivergenceProtectedFeatures = protectedRawJSFeatures,
+        textBinsFormula = textBinsFormula,
+        timePeriod = timePeriod)
     }
     this
   }
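A minimal sketch of how a caller might opt into the two new arguments; the features and readers here (`prediction`, `age`, `signupDate`, `trainReader`, `scoreReader`) are hypothetical stand-ins, not part of this change:

```scala
import com.salesforce.op.OpWorkflow
import com.salesforce.op.stages.impl.feature.TimePeriod

// `prediction`, `age`, `signupDate`, `trainReader`, and `scoreReader` are hypothetical.
val workflow = new OpWorkflow()
  .setResultFeatures(prediction)
  .withRawFeatureFilter(
    trainingReader = Option(trainReader),
    scoringReader = Option(scoreReader),
    // Never remove `age`; exempt `signupDate` from the JS divergence check only.
    protectedFeatures = Array(age),
    protectedJSFeatures = Array(signupDate),
    // Bin date features by hour of day rather than comparing raw timestamps.
    timePeriod = Option(TimePeriod.HourOfDay)
  )
```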
diff --git a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala
index 142fa02694..6ba44c0034 100644
--- a/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala
+++ b/core/src/main/scala/com/salesforce/op/filters/PreparedFeatures.scala
@@ -30,9 +30,10 @@
 package com.salesforce.op.filters
 
+
 import com.salesforce.op.features.TransientFeature
 import com.salesforce.op.features.types._
-import com.salesforce.op.stages.impl.feature.TextTokenizer
+import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TextTokenizer, TimePeriod}
 import com.salesforce.op.utils.spark.RichRow._
 import com.salesforce.op.utils.text.Language
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
@@ -125,17 +126,23 @@ private[filters] object PreparedFeatures {
    * @param row        data frame row
    * @param responses  transient features derived from responses
    * @param predictors transient features derived from predictors
+   * @param timePeriod optional time period to use for raw feature binning; otherwise the standard
+   *                   numeric transformation is applied
    * @return set of prepared features
    */
-  def apply(row: Row, responses: Array[TransientFeature], predictors: Array[TransientFeature]): PreparedFeatures = {
+  def apply(
+    row: Row,
+    responses: Array[TransientFeature],
+    predictors: Array[TransientFeature],
+    timePeriod: Option[TimePeriod]): PreparedFeatures = {
     val empty: Map[FeatureKey, ProcessedSeq] = Map.empty
     val preparedResponses = responses.foldLeft(empty) { case (map, feature) =>
       val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
-      map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
+      map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod)
     }
     val preparedPredictors = predictors.foldLeft(empty) { case (map, feature) =>
       val converter = FeatureTypeSparkConverter.fromFeatureTypeName(feature.typeName)
-      map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter))
+      map ++ prepareFeature(feature.name, row.getFeatureType(feature)(converter), timePeriod)
     }
 
     PreparedFeatures(responses = preparedResponses, predictors = preparedPredictors)
@@ -146,24 +153,30 @@ private[filters] object PreparedFeatures {
    *
    * @param name       feature name
    * @param value      feature value
+   * @param timePeriod optional time period to use for raw feature binning; otherwise the standard
+   *                   numeric transformation is applied
    * @tparam T type of the feature
    * @return tuple containing whether the feature was empty and a sequence of either doubles or strings
    */
-  private def prepareFeature[T <: FeatureType](name: String, value: T): Map[FeatureKey, ProcessedSeq] =
+  private def prepareFeature[T <: FeatureType](
+    name: String,
+    value: T,
+    timePeriod: Option[TimePeriod]): Map[FeatureKey, ProcessedSeq] =
     value match {
-      case v: Text => v.value
-        .map(s => Map[FeatureKey, ProcessedSeq]((name, None) -> Left(tokenize(s)))).getOrElse(Map.empty)
-      case v: OPNumeric[_] => v.toDouble
-        .map(d => Map[FeatureKey, ProcessedSeq]((name, None) -> Right(Seq(d)))).getOrElse(Map.empty)
       case SomeValue(v: DenseVector) => Map((name, None) -> Right(v.toArray.toSeq))
       case SomeValue(v: SparseVector) => Map((name, None) -> Right(v.indices.map(_.toDouble).toSeq))
       case ft@SomeValue(_) => ft match {
+        case v: Text => Map((name, None) -> Left(v.value.toSeq.flatMap(tokenize)))
+        case v: Date => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod)).toSeq))
+        case v: OPNumeric[_] => Map((name, None) -> Right(v.toDouble.toSeq))
         case v: Geolocation => Map((name, None) -> Right(v.value))
        case v: TextList => Map((name, None) -> Left(v.value))
-        case v: DateList => Map((name, None) -> Right(v.value.map(_.toDouble)))
+        case v: DateList => Map((name, None) -> Right(v.value.map(prepareDateValue(_, timePeriod))))
         case v: MultiPickList => Map((name, None) -> Left(v.value.toSeq))
         case v: MultiPickListMap => v.value.map { case (k, e) => (name, Option(k)) -> Left(e.toSeq) }
         case v: GeolocationMap => v.value.map{ case (k, e) => (name, Option(k)) -> Right(e) }
+        case v: DateMap =>
+          v.value.map { case (k, e) => (name, Option(k)) -> Right(Seq(prepareDateValue(e, timePeriod))) }
         case v: OPMap[_] => v.value.map { case (k, e) => e match {
           case d: Double => (name, Option(k)) -> Right(Seq(d))
           // Do not need to distinguish between string map types, all text is tokenized for distribution calculation
@@ -183,4 +196,10 @@ private[filters] object PreparedFeatures {
    * @return array of string tokens
    */
   private def tokenize(s: String) = TextTokenizer.Analyzer.analyze(s, Language.Unknown)
+
+  private def prepareDateValue(timestamp: Long, timePeriod: Option[TimePeriod]): Double =
+    timePeriod match {
+      case Some(period) => DateToUnitCircle.convertToBin(timestamp, period)
+      case None => timestamp.toDouble
+    }
 }
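The new private `prepareDateValue` helper is the behavioral fork: with a period configured, every date and date-map value collapses to its 0-based bin; without one, it stays a raw timestamp. A minimal sketch of the same choice through the public `DateToUnitCircle.convertToBin` added later in this diff (the timestamp is illustrative, and `DateToUnitCircle` is `private[op]`, so this only compiles inside the `com.salesforce.op` package):

```scala
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}

// 2018-01-17T00:00:00Z (a Wednesday) as epoch milliseconds -- illustrative value.
val ts = 1516147200000L

// No period configured: the date is left as a raw numeric value.
val raw: Double = ts.toDouble // 1.5161472E12

// Period configured: the timestamp collapses to its 0-based bin in that period.
val bin: Double = DateToUnitCircle.convertToBin(ts, TimePeriod.DayOfWeek) // 2.0 (Monday = 0)
```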
diff --git a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
index 2bd190755e..7862e9353d 100644
--- a/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
+++ b/core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
@@ -34,6 +34,7 @@ import com.salesforce.op.OpParams
 import com.salesforce.op.features.types._
 import com.salesforce.op.features.{OPFeature, TransientFeature}
 import com.salesforce.op.readers.{DataFrameFieldNames, Reader}
+import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.preparators.CorrelationType
 import com.salesforce.op.utils.spark.RichRow._
 import com.twitter.algebird.Monoid._
@@ -78,6 +79,8 @@ import scala.util.Failure
  *                          Input arguments are [[Summary]] and number of bins to use in computing feature
  *                          distributions (histograms for numerics, hashes for strings).
  *                          Output is the bins for the text features.
+ * @param timePeriod        time period used to apply the circular date transformation to date features; if
+ *                          not specified, the regular numeric feature transformation is applied
  * @tparam T datatype of the reader
  */
 class RawFeatureFilter[T]
@@ -93,7 +96,8 @@ class RawFeatureFilter[T]
   val correlationType: CorrelationType = CorrelationType.Pearson,
   val jsDivergenceProtectedFeatures: Set[String] = Set.empty,
   val protectedFeatures: Set[String] = Set.empty,
-  val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula
+  val textBinsFormula: (Summary, Int) => Int = RawFeatureFilter.textBinsFormula,
+  val timePeriod: Option[TimePeriod] = None
 ) extends Serializable {
 
   assert(bins > 1 && bins <= FeatureDistribution.MaxBins, s"Invalid bin size $bins," +
@@ -139,7 +143,7 @@ class RawFeatureFilter[T]
       (respOut, predOut)
     }
     val preparedFeatures: RDD[PreparedFeatures] =
-      data.rdd.map(PreparedFeatures(_, responses, predictors))
+      data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod))
     // Have to use the training summaries do process scoring for comparison
     val (responseSummaries, predictorSummaries): (Map[FeatureKey, Summary], Map[FeatureKey, Summary]) =
       allFeatureInfo.map(info => info.responseSummaries -> info.predictorSummaries)
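Inside `computeFeatureStats`, the new parameter is threaded through unchanged, so a single `Option[TimePeriod]` governs every date-typed feature in every row. A sketch of the resulting call pattern under stated assumptions (placeholder inputs; `PreparedFeatures` is `private[filters]`, so this only compiles inside `com.salesforce.op.filters`):

```scala
import com.salesforce.op.features.TransientFeature
import com.salesforce.op.stages.impl.feature.TimePeriod
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Placeholder inputs for illustration.
def prepare(
  data: DataFrame,
  responses: Array[TransientFeature],
  predictors: Array[TransientFeature],
  timePeriod: Option[TimePeriod]
): RDD[PreparedFeatures] =
  // The same optional period is applied uniformly to every row and every date-typed feature.
  data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod))
```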
diff --git a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala
index d995cc010d..58e678558d 100644
--- a/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala
+++ b/core/src/main/scala/com/salesforce/op/stages/impl/feature/DateToUnitCircleTransformer.scala
@@ -35,7 +35,7 @@ import com.salesforce.op.utils.spark.OpVectorMetadata
 import com.salesforce.op.{FeatureHistory, UID}
 import org.apache.spark.ml.linalg.Vectors
 import org.apache.spark.ml.param.{Param, Params}
-import org.joda.time.{DateTime => JDateTime}
+import org.joda.time.{DateTime => JDateTime, DateTimeZone}
 
 import scala.reflect.runtime.universe.TypeTag
 
@@ -104,19 +104,27 @@ private[op] object DateToUnitCircle {
 
   def metadataValues(timePeriod: TimePeriod): Seq[String] = Seq(s"x_$timePeriod", s"y_$timePeriod")
 
-  def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] = {
-    val datetime: Option[JDateTime] = timestamp.map(new JDateTime(_))
-    val (timePeriod, periodSize) = timePeriodDesired match {
-      case TimePeriod.DayOfMonth => (datetime.map(_.dayOfMonth().get() - 1), 31)
-      case TimePeriod.DayOfWeek => (datetime.map(_.dayOfWeek().get() - 1), 7)
-      case TimePeriod.DayOfYear => (datetime.map(_.dayOfYear().get() - 1), 366)
-      case TimePeriod.HourOfDay => (datetime.map(_.hourOfDay().get()), 24)
-      case TimePeriod.MonthOfYear => (datetime.map(_.monthOfYear().get() - 1), 12)
-      case TimePeriod.WeekOfMonth => (
-        datetime.map(x => x.weekOfWeekyear().get() - x.withDayOfMonth(1).weekOfWeekyear().get()), 6)
-      case TimePeriod.WeekOfYear => (datetime.map(_.weekOfWeekyear().get() - 1), 53)
+  def convertToBin(timestamp: Long, timePeriodDesired: TimePeriod): Double =
+    getPeriodWithSize(timestamp, timePeriodDesired)._1
+
+  def convertToRandians(timestamp: Option[Long], timePeriodDesired: TimePeriod): Array[Double] =
+    timestamp.map { ts =>
+      val (timePeriod, periodSize) = getPeriodWithSize(ts, timePeriodDesired)
+      val radians = (2 * math.Pi * timePeriod) / periodSize
+      Array(math.cos(radians), math.sin(radians))
+    }.getOrElse(Array(0.0, 0.0))
+
+  private def getPeriodWithSize(timestamp: Long, timePeriod: TimePeriod): (Double, Int) = {
+    val dt = new JDateTime(timestamp).withZone(DateTimeZone.UTC)
+    timePeriod match {
+      case TimePeriod.DayOfMonth => (dt.dayOfMonth.get.toDouble - 1, 31)
+      case TimePeriod.DayOfWeek => (dt.dayOfWeek.get.toDouble - 1, 7)
+      case TimePeriod.DayOfYear => (dt.dayOfYear.get.toDouble - 1, 366)
+      case TimePeriod.HourOfDay => (dt.hourOfDay.get.toDouble, 24)
+      case TimePeriod.MonthOfYear => (dt.monthOfYear.get.toDouble - 1, 12)
+      case TimePeriod.WeekOfMonth =>
+        ((dt.weekOfWeekyear.get - dt.withDayOfMonth(1).weekOfWeekyear.get).toDouble, 6)
+      case TimePeriod.WeekOfYear => (dt.weekOfWeekyear.get.toDouble - 1, 53)
     }
-    val radians = timePeriod.map(2 * math.Pi * _ / periodSize)
-    radians.map(r => Array(math.cos(r), math.sin(r))).getOrElse(Array(0.0, 0.0))
   }
 }
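A worked check of the refactored math: `getPeriodWithSize` now reads calendar fields in UTC, and `convertToRandians` keeps its old behavior. For an 18:00 UTC timestamp with `HourOfDay`, the period is (18.0, 24), so the angle is 2 * Pi * 18 / 24 = 3 * Pi / 2 and the unit-circle coordinates are approximately (0, -1). A sketch (illustrative timestamp; `DateToUnitCircle` is `private[op]`, so this runs only inside the `com.salesforce.op` package):

```scala
import com.salesforce.op.stages.impl.feature.{DateToUnitCircle, TimePeriod}

// 2018-01-17T18:00:00Z as epoch milliseconds -- illustrative value.
val ts = 1516212000000L

DateToUnitCircle.convertToBin(ts, TimePeriod.HourOfDay)            // 18.0
DateToUnitCircle.convertToRandians(Some(ts), TimePeriod.HourOfDay) // ~Array(0.0, -1.0)
DateToUnitCircle.convertToRandians(None, TimePeriod.HourOfDay)     // Array(0.0, 0.0): missing dates map to the origin
```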
diff --git a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala
index 6ae18b080b..2c53192c3f 100644
--- a/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala
+++ b/core/src/test/scala/com/salesforce/op/filters/PreparedFeaturesTest.scala
@@ -30,21 +30,22 @@
 package com.salesforce.op.filters
 
-import scala.math.round
-
+import com.salesforce.op.features.types._
+import com.salesforce.op.features.{FeatureBuilder, OPFeature, TransientFeature}
+import com.salesforce.op.stages.impl.feature.TimePeriod
 import com.salesforce.op.stages.impl.preparators.CorrelationType
-import com.salesforce.op.test.TestSparkContext
+import com.salesforce.op.test.{Passenger, PassengerSparkFixtureTest}
 import com.twitter.algebird.Monoid._
 import com.twitter.algebird.Operators._
-import org.apache.spark.mllib.linalg.{Matrix, Vector}
 import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
 import org.junit.runner.RunWith
 import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
 @RunWith(classOf[JUnitRunner])
-class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
+class PreparedFeaturesTest extends FlatSpec with PassengerSparkFixtureTest {
 
   val responseKey1: FeatureKey = "Response1" -> None
   val responseKey2: FeatureKey = "Response2" -> None
@@ -72,6 +73,7 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
   val allPredictorKeys1 = Array(predictorKey1, predictorKey2A, predictorKey2B)
   val allPredictorKeys2 = Array(predictorKey1)
 
+
   Spec[PreparedFeatures] should "produce correct summaries" in {
     val (responseSummaries1, predictorSummaries1) = preparedFeatures1.summaries
     val (responseSummaries2, predictorSummaries2) = preparedFeatures2.summaries
@@ -157,13 +159,56 @@ class PreparedFeaturesTest extends FlatSpec with TestSparkContext {
     testCorrMatrix(allResponseKeys2, CorrelationType.Spearman, expected)
   }
 
+  it should "transform dates for each period" in {
+    val expectedBins = Map(
+      TimePeriod.DayOfMonth -> 17.0,
+      TimePeriod.DayOfWeek -> 6.0,
+      TimePeriod.DayOfYear -> 17.0,
+      TimePeriod.HourOfDay -> 0.0,
+      TimePeriod.MonthOfYear -> 0.0,
+      TimePeriod.WeekOfMonth -> 2.0,
+      TimePeriod.WeekOfYear -> 2.0
+    )
+    expectedBins.keys should contain theSameElementsAs TimePeriod.values
+
+    val dateMap = FeatureBuilder.DateMap[Passenger]
+      .extract(p => Map("DTMap" -> p.getBoarded.toLong).toDateMap).asPredictor
+
+    val dateFeatures: Array[OPFeature] = Array(boarded, boardedTime, boardedTimeAsDateTime, dateMap)
+    val dateDataFrame: DataFrame = dataReader.generateDataFrame(dateFeatures).persist()
+
+    for {
+      (period, expectedBin) <- expectedBins
+    } {
+      def createExpectedDateMap(d: Double, aggregates: Int): Map[FeatureKey, ProcessedSeq] = Map(
+        (boarded.name, None) -> Right((0 until aggregates).map(_ => d).toList),
+        (boardedTime.name, None) -> Right(List(d)),
+        (boardedTimeAsDateTime.name, None) -> Right(List(d)),
+        (dateMap.name, Option("DTMap")) -> Right(List(d)))
+
+      val res = dateDataFrame.rdd
+        .map(PreparedFeatures(_, Array.empty, dateFeatures.map(TransientFeature(_)), Option(period)))
+        .map(_.predictors.mapValues(_.right.map(_.toList)))
+        .collect()
+
+      val expectedResults: Seq[Map[FeatureKey, ProcessedSeq]] =
+        // The first observation is expected to be aggregated twice
+        Seq(createExpectedDateMap(expectedBin, 2)) ++
+          Seq.fill(4)(expectedBin).map(createExpectedDateMap(_, 1)) ++
+          Seq(Map[FeatureKey, ProcessedSeq]())
+
+      withClue(s"Computed bin for $period period does not match:\n") {
+        res should contain theSameElementsAs expectedResults
+      }
+    }
+  }
+
   def testCorrMatrix(
     responseKeys: Array[FeatureKey],
     correlationType: CorrelationType,
     expectedResult: Seq[Array[Double]]
   ): Unit = {
-    val corrRDD =
-      sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1)))
+    val corrRDD = sc.parallelize(allPreparedFeatures.map(_.getNullLabelLeakageVector(responseKeys, allPredictorKeys1)))
     val corrMatrix = Statistics.corr(corrRDD, correlationType.sparkName)
 
     corrMatrix.colIter.zipWithIndex.map { case(vec, idx) =>