Skip to content

Commit

Permalink
Time based aggregators (#167)
Browse files Browse the repository at this point in the history
  • Loading branch information
leahmcguire authored and tovbinm committed Oct 31, 2018
1 parent 77f1920 commit 0ad3c28
Show file tree
Hide file tree
Showing 2 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.aggregators

import com.salesforce.op.features.types._
import com.twitter.algebird.{Monoid, MonoidAggregator}

import scala.reflect.runtime.universe.WeakTypeTag

private[op] abstract class TimeBasedAggregator[T <: FeatureType]
(
compareFun: (Long, Long) => Boolean,
val timeZero: Long
)(implicit val ttag: WeakTypeTag[T]) extends MonoidAggregator[Event[T], (Long, T#Value), T] {

val ftFactory = FeatureTypeFactory[T]()

val monoid: Monoid[(Long, T#Value)] = new Monoid[(Long, T#Value)] {
val zero = timeZero -> FeatureTypeDefaults.default[T].value
def plus(l: (Long, T#Value), r: (Long, T#Value)): (Long, T#Value) = if (compareFun(l._1, r._1)) r else l
}

def prepare(input: Event[T]): (Long, T#Value) = input.date -> input.value.v

def present(reduction: (Long, T#Value)): T = ftFactory.newInstance(reduction._2)
}

/**
* Gives last (most recent) value of feature
* @param ttag feature type tag
* @tparam T type of feature
*/
abstract class LastAggregator[T <: FeatureType](implicit ttag: WeakTypeTag[T]) extends
TimeBasedAggregator(compareFun = (l: Long, r: Long) => l < r, timeZero = 0L)(ttag = ttag)


/**
* Gives the first value of feature
* @param ttag feature type tag
* @tparam T type of feature
*/
abstract class FirstAggregator[T <: FeatureType](implicit ttag: WeakTypeTag[T]) extends
TimeBasedAggregator(compareFun = (l: Long, r: Long) => l >= r, timeZero = Long.MaxValue)(ttag = ttag)


case object LastVector extends LastAggregator[OPVector]
case object FirstVector extends FirstAggregator[OPVector]

case object LastTextList extends LastAggregator[TextList]
case object FirstTextList extends FirstAggregator[TextList]

case object LastDateList extends LastAggregator[DateList]
case object FirstDateList extends FirstAggregator[DateList]

case object LastDateTimeList extends LastAggregator[DateTimeList]
case object FirstDateTimeList extends FirstAggregator[DateTimeList]

case object LastGeolocation extends LastAggregator[Geolocation]
case object FirstGeolocation extends FirstAggregator[Geolocation]

case object LastBase64Map extends LastAggregator[Base64Map]
case object FirstBase64Map extends FirstAggregator[Base64Map]

case object LastBinaryMap extends LastAggregator[BinaryMap]
case object FirstBinaryMap extends FirstAggregator[BinaryMap]

case object LastComboBoxMap extends LastAggregator[ComboBoxMap]
case object FirstComboBoxMap extends FirstAggregator[ComboBoxMap]

case object LastCurrencyMap extends LastAggregator[CurrencyMap]
case object FirstCurrencyMap extends FirstAggregator[CurrencyMap]

case object LastDateMap extends LastAggregator[DateMap]
case object FirstDateMap extends FirstAggregator[DateMap]

case object LastDateTimeMap extends LastAggregator[DateTimeMap]
case object FirstDateTimeMap extends FirstAggregator[DateTimeMap]

case object LastEmailMap extends LastAggregator[EmailMap]
case object FirstEmailMap extends FirstAggregator[EmailMap]

case object LastIDMap extends LastAggregator[IDMap]
case object FirstIDMap extends FirstAggregator[IDMap]

case object LastIntegralMap extends LastAggregator[IntegralMap]
case object FirstIntegralMap extends FirstAggregator[IntegralMap]

case object LastMultiPickListMap extends LastAggregator[MultiPickListMap]
case object FirstMultiPickListMap extends FirstAggregator[MultiPickListMap]

case object LastPercentMap extends LastAggregator[PercentMap]
case object FirstPercentMap extends FirstAggregator[PercentMap]

case object LastPhoneMap extends LastAggregator[PhoneMap]
case object FirstPhoneMap extends FirstAggregator[PhoneMap]

case object LastPickListMap extends LastAggregator[PickListMap]
case object FirstPickListMap extends FirstAggregator[PickListMap]

case object LastRealMap extends LastAggregator[RealMap]
case object FirstRealMap extends FirstAggregator[RealMap]

case object LastTextAreaMap extends LastAggregator[TextAreaMap]
case object FirstTextAreaMap extends FirstAggregator[TextAreaMap]

case object LastTextMap extends LastAggregator[TextMap]
case object FirstTextMap extends FirstAggregator[TextMap]

case object LastURLMap extends LastAggregator[URLMap]
case object FirstURLMap extends FirstAggregator[URLMap]

case object LastCountryMap extends LastAggregator[CountryMap]
case object FirstCountryMap extends FirstAggregator[CountryMap]

case object LastStateMap extends LastAggregator[StateMap]
case object FirstStateMap extends FirstAggregator[StateMap]

case object LastCityMap extends LastAggregator[CityMap]
case object FirstCityMap extends FirstAggregator[CityMap]

case object LastPostalCodeMap extends LastAggregator[PostalCodeMap]
case object FirstPostalCodeMap extends FirstAggregator[PostalCodeMap]

case object LastStreetMap extends LastAggregator[StreetMap]
case object FirstStreetMap extends FirstAggregator[StreetMap]

case object LastGeolocationMap extends LastAggregator[GeolocationMap]
case object FirstGeolocationMap extends FirstAggregator[GeolocationMap]

case object LastBinary extends LastAggregator[Binary]
case object FirstBinary extends FirstAggregator[Binary]

case object LastCurrency extends LastAggregator[Currency]
case object FirstCurrency extends FirstAggregator[Currency]

case object LastDate extends LastAggregator[Date]
case object FirstDate extends FirstAggregator[Date]

case object LastDateTime extends LastAggregator[DateTime]
case object FirstDateTime extends FirstAggregator[DateTime]

case object LastIntegral extends LastAggregator[Integral]
case object FirstIntegral extends FirstAggregator[Integral]

case object LastPercent extends LastAggregator[Percent]
case object FirstPercent extends FirstAggregator[Percent]

case object LastReal extends LastAggregator[Real]
case object FirstReal extends FirstAggregator[Real]

case object LastMultiPickList extends LastAggregator[MultiPickList]
case object FirstMultiPickList extends FirstAggregator[MultiPickList]

case object LastBase64 extends LastAggregator[Base64]
case object FirstBase64 extends FirstAggregator[Base64]

case object LastComboBox extends LastAggregator[ComboBox]
case object FirstComboBox extends FirstAggregator[ComboBox]

case object LastEmail extends LastAggregator[Email]
case object FirstEmail extends FirstAggregator[Email]

case object LastID extends LastAggregator[ID]
case object FirstID extends FirstAggregator[ID]

case object LastPhone extends LastAggregator[Phone]
case object FirstPhone extends FirstAggregator[Phone]

case object LastPickList extends LastAggregator[PickList]
case object FirstPickList extends FirstAggregator[Phone]

case object LastText extends LastAggregator[Text]
case object FirstText extends FirstAggregator[Text]

case object LastTextArea extends LastAggregator[TextArea]
case object FirstTextArea extends FirstAggregator[TextArea]

case object LastURL extends LastAggregator[URL]
case object FirstURL extends FirstAggregator[URL]

case object LastCountry extends LastAggregator[Country]
case object FirstCountry extends FirstAggregator[Country]

case object LastState extends LastAggregator[State]
case object FirstState extends FirstAggregator[State]

case object LastCity extends LastAggregator[City]
case object FirstCity extends FirstAggregator[City]

case object LastPostalCode extends LastAggregator[PostalCode]
case object FirstPostalCode extends FirstAggregator[PostalCode]

case object LastStreet extends LastAggregator[Street]
case object FirstStreet extends FirstAggregator[Street]



Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2017, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice, this
* list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package com.salesforce.op.aggregators

import com.salesforce.op.features.FeatureBuilder
import com.salesforce.op.features.types._
import com.salesforce.op.stages.FeatureGeneratorStage
import com.salesforce.op.test.TestCommon
import org.joda.time.Duration
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class TimeBasedAggregatorTest extends FlatSpec with TestCommon {

private val data = Seq(TimeBasedTest(100L, 1.0, "a", Map("a" -> "a")),
TimeBasedTest(200L, 2.0, "b", Map("b" -> "b")),
TimeBasedTest(300L, 3.0, "c", Map("c" -> "c")),
TimeBasedTest(400L, 4.0, "d", Map("d" -> "d")),
TimeBasedTest(500L, 5.0, "e", Map("e" -> "e")),
TimeBasedTest(600L, 6.0, "f", Map("f" -> "f"))
)

private val timeExt = Option((d: TimeBasedTest) => d.time)

Spec[LastAggregator[_]] should "return the most recent event" in {
val feature = FeatureBuilder.Real[TimeBasedTest].extract(_.real.toRealNN)
.aggregate(LastReal).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.NoCutoff())
extracted shouldBe Real(Some(6.0))
}

it should "return the most recent event within the time window" in {
val feature = FeatureBuilder.Text[TimeBasedTest].extract(_.string.toText)
.aggregate(LastText).asResponse
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(300L),
responseWindow = Option(new Duration(201L)))
extracted shouldBe Text(Some("e"))
}

it should "return the feature type empty value when no events are passed in" in {
val feature = FeatureBuilder.TextMap[TimeBasedTest].extract(_.map.toTextMap)
.aggregate(LastTextMap).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(Seq(), timeExt, CutOffTime.NoCutoff())
extracted shouldBe TextMap.empty
}

Spec[FirstAggregator[_]] should "return the first event" in {
val feature = FeatureBuilder.TextAreaMap[TimeBasedTest].extract(_.map.toTextAreaMap)
.aggregate(FirstTextAreaMap).asResponse
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(301L))
extracted shouldBe TextAreaMap(Map("d" -> "d"))
}

it should "return the first event within the time window" in {
val feature = FeatureBuilder.Currency[TimeBasedTest].extract(_.real.toCurrency)
.aggregate(FirstCurrency).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(data, timeExt, CutOffTime.UnixEpoch(400L),
predictorWindow = Option(new Duration(201L)))
extracted shouldBe Currency(Some(2.0))
}

it should "return the feature type empty value when no events are passed in" in {
val feature = FeatureBuilder.State[TimeBasedTest].extract(_.string.toState)
.aggregate(FirstState).asPredictor
val aggregator = feature.originStage.asInstanceOf[FeatureGeneratorStage[TimeBasedTest, _]].featureAggregator
val extracted = aggregator.extract(Seq(), timeExt, CutOffTime.NoCutoff())
extracted shouldBe State.empty
}
}

case class TimeBasedTest(time: Long, real: Double, string: String, map: Map[String, String])


0 comments on commit 0ad3c28

Please sign in to comment.