Return scoring feature distributions from RawFeatureFilter (#171)
tovbinm committed Nov 3, 2018
1 parent b54fbfb commit 6ad1169
Showing 21 changed files with 489 additions and 232 deletions.
34 changes: 18 additions & 16 deletions core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -31,7 +31,7 @@
package com.salesforce.op

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.{FeatureDistribution, RawFeatureFilter, Summary}
import com.salesforce.op.filters.{FeatureDistribution, FilteredRawData, RawFeatureFilter, Summary}
import com.salesforce.op.readers.Reader
import com.salesforce.op.stages.OPStage
import com.salesforce.op.stages.impl.feature.TimePeriod
@@ -129,13 +129,13 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
initialStages.foreach { stg =>
val inFeatures = stg.getInputFeatures()
val blacklistRemoved = inFeatures
.filterNot{ f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map{ f => if (f.isRaw) f.withDistributions(distributions.collect{ case d if d.name == f.name => d }) else f }
.filterNot { f => allBlacklisted.exists(bl => bl.sameOrigin(f)) }
.map { f =>
if (f.isRaw) f.withDistributions(distributions.collect { case d if d.name == f.name => d }) else f
}
val inputsChanged = blacklistRemoved.map{ f => allUpdated.find(u => u.sameOrigin(f)).getOrElse(f) }
val oldOutput = stg.getOutput()
Try{
stg.setInputFeatureArray(inputsChanged).setOutputFeatureName(oldOutput.name).getOutput()
} match {
Try(stg.setInputFeatureArray(inputsChanged).setOutputFeatureName(oldOutput.name).getOutput()) match {
case Success(out) => allUpdated += out
case Failure(e) =>
if (initialResultFeatures.contains(oldOutput)) throw new RuntimeException(
@@ -221,23 +221,25 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
*/
protected def generateRawData()(implicit spark: SparkSession): DataFrame = {
(reader, rawFeatureFilter) match {
case (None, None) => throw new IllegalArgumentException("Data reader must be set either directly on the" +
" workflow or through the RawFeatureFilter")
case (None, None) => throw new IllegalArgumentException(
"Data reader must be set either directly on the workflow or through the RawFeatureFilter")
case (Some(r), None) =>
checkReadersAndFeatures()
r.generateDataFrame(rawFeatures, parameters).persist()
case (rd, Some(rf)) =>
rd match {
case None => setReader(rf.trainingReader)
case Some(r) => if (r != rf.trainingReader) log.warn("Workflow data reader and RawFeatureFilter training" +
" reader do not match! The RawFeatureFilter training reader will be used to generate the data for training")
case Some(r) => if (r != rf.trainingReader) log.warn(
"Workflow data reader and RawFeatureFilter training reader do not match! " +
"The RawFeatureFilter training reader will be used to generate the data for training")
}
checkReadersAndFeatures()
val filteredRawData = rf.generateFilteredRaw(rawFeatures, parameters)
setRawFeatureDistributions(filteredRawData.featureDistributions.toArray)
setBlacklist(filteredRawData.featuresToDrop, filteredRawData.featureDistributions)
setBlacklistMapKeys(filteredRawData.mapKeysToDrop)
filteredRawData.cleanedData
val FilteredRawData(cleanedData, featuresToDrop, mapKeysToDrop, featureDistributions) =
rf.generateFilteredRaw(rawFeatures, parameters)
setRawFeatureDistributions(featureDistributions.toArray)
setBlacklist(featuresToDrop, featureDistributions)
setBlacklistMapKeys(mapKeysToDrop)
cleanedData
}
}

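The destructuring above relies on FilteredRawData being a plain case class. A minimal sketch of its shape, with field types inferred from how generateRawData uses them in this diff rather than copied from the repository:

import com.salesforce.op.features.OPFeature
import com.salesforce.op.filters.FeatureDistribution
import org.apache.spark.sql.DataFrame

// Sketch only: field types are assumptions inferred from the usage above.
case class FilteredRawData(
  cleanedData: DataFrame,                         // raw data with blacklisted columns removed
  featuresToDrop: Seq[OPFeature],                 // raw features the filter decided to drop
  mapKeysToDrop: Map[String, Set[String]],        // map-feature keys the filter decided to drop
  featureDistributions: Seq[FeatureDistribution]  // training and scoring distributions
)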
@@ -537,7 +539,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
rawFeatureFilter = Option {
new RawFeatureFilter(
trainingReader = training.get,
scoreReader = scoringReader,
scoringReader = scoringReader,
bins = bins,
minFill = minFillRate,
maxFillDifference = maxFillDifference,
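For callers, the scoreReader to scoringReader rename is wrapped by the workflow's filter configuration. A hedged sketch of that wiring: only bins, minFillRate and maxFillDifference appear as identifiers in this hunk; the withRawFeatureFilter helper name, the threshold values and the reader/feature placeholders are assumptions.

// trainReader, scoreReader (data readers) and prediction (an OPFeature) are assumed defined elsewhere.
val workflow = new OpWorkflow()
  .setResultFeatures(prediction)
  .withRawFeatureFilter(
    Option(trainReader),         // reader the filter trains on
    Option(scoreReader),         // scoring reader, passed to the renamed constructor parameter above
    bins = 100,                  // histogram bins per feature
    minFillRate = 0.001,         // minimum fill rate a feature must have (assumed semantics)
    maxFillDifference = 0.90     // maximum allowed training vs. scoring fill-rate gap (assumed semantics)
  )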
18 changes: 16 additions & 2 deletions core/src/main/scala/com/salesforce/op/OpWorkflowCore.scala
@@ -32,7 +32,7 @@ package com.salesforce.op

import com.salesforce.op.utils.stages.FitStagesUtil._
import com.salesforce.op.utils.stages.FitStagesUtil
import com.salesforce.op.features.OPFeature
import com.salesforce.op.features.{FeatureDistributionType, OPFeature}
import com.salesforce.op.features.types.FeatureType
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.readers.{CustomReader, Reader, ReaderKey}
@@ -195,11 +195,25 @@ private[op] trait OpWorkflowCore {
final def getParameters(): OpParams = parameters

/**
* Get raw feature distribution information computed during raw feature filter
* Get raw feature distribution information computed on training and scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawFeatureDistributions(): Array[FeatureDistribution] = rawFeatureDistributions

/**
* Get raw feature distribution information computed on training data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawTrainingFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Training)

/**
* Get raw feature distribution information computed on scoring data during raw feature filter
* @return sequence of feature distribution information
*/
final def getRawScoringFeatureDistributions(): Array[FeatureDistribution] =
rawFeatureDistributions.filter(_.`type` == FeatureDistributionType.Scoring)

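With the type tag in place, a fitted workflow or model can split its distributions without re-running the filter. A minimal usage sketch, assuming `model` is a fitted OpWorkflowModel produced with RawFeatureFilter enabled:

import com.salesforce.op.filters.FeatureDistribution

val all: Array[FeatureDistribution]      = model.getRawFeatureDistributions()
val training: Array[FeatureDistribution] = model.getRawTrainingFeatureDistributions()
val scoring: Array[FeatureDistribution]  = model.getRawScoringFeatureDistributions()

// Each distribution carries exactly one type tag, so the two filtered views partition the full set.
assert(all.length == training.length + scoring.length)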
/**
* Determine if any of the raw features do not have a matching reader
*/
core/src/main/scala/com/salesforce/op/OpWorkflowModelReader.scala
@@ -32,10 +32,9 @@ package com.salesforce.op

import com.salesforce.op.OpWorkflowModelReadWriteShared.FieldNames._
import com.salesforce.op.features.{FeatureJsonHelper, OPFeature, TransientFeature}
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import OpPipelineStageReadWriteShared._
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.utils.json.JsonUtils
import com.salesforce.op.stages.OpPipelineStageReadWriteShared._
import com.salesforce.op.stages.{OpPipelineStageReader, _}
import org.apache.spark.ml.util.MLReader
import org.json4s.JsonAST.{JArray, JNothing, JValue}
import org.json4s.jackson.JsonMethods.parse
@@ -158,7 +157,7 @@ class OpWorkflowModelReader(val workflow: OpWorkflow) extends MLReader[OpWorkflo
private def resolveRawFeatureDistributions(json: JValue): Try[Array[FeatureDistribution]] = {
if ((json \ RawFeatureDistributions.entryName) != JNothing) { // for backwards compatibility
val distString = (json \ RawFeatureDistributions.entryName).extract[String]
JsonUtils.fromString[Array[FeatureDistribution]](distString)
FeatureDistribution.fromJson(distString)
} else {
Success(Array.empty[FeatureDistribution])
}
core/src/main/scala/com/salesforce/op/OpWorkflowModelWriter.scala
@@ -31,8 +31,8 @@
package com.salesforce.op

import com.salesforce.op.features.FeatureJsonHelper
import com.salesforce.op.filters.FeatureDistribution
import com.salesforce.op.stages.{OpPipelineStageBase, OpPipelineStageWriter}
import com.salesforce.op.utils.json.JsonUtils
import enumeratum._
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.util.MLWriter
@@ -81,8 +81,7 @@ class OpWorkflowModelWriter(val model: OpWorkflowModel) extends MLWriter {
(FN.AllFeatures.entryName -> allFeaturesJArray) ~
(FN.Parameters.entryName -> model.parameters.toJson(pretty = false)) ~
(FN.TrainParameters.entryName -> model.trainingParams.toJson(pretty = false)) ~
(FN.RawFeatureDistributions.entryName -> JsonUtils.toJsonString(model.getRawFeatureDistributions(),
pretty = false))
(FN.RawFeatureDistributions.entryName -> FeatureDistribution.toJson(model.getRawFeatureDistributions()))
}

private def resultFeaturesJArray(): JArray =
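Together with the reader change above, distributions now round-trip through the companion helpers instead of generic JsonUtils, and parse failures surface as a Try. A hedged sketch, again assuming a fitted `model`:

import com.salesforce.op.filters.FeatureDistribution
import scala.util.{Failure, Success}

val json: String = FeatureDistribution.toJson(model.getRawFeatureDistributions())

FeatureDistribution.fromJson(json) match {
  case Success(dists) => println(s"restored ${dists.length} feature distributions")
  case Failure(error) => println(s"could not parse feature distributions: $error")
}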
core/src/main/scala/com/salesforce/op/filters/AllFeatureInformation.scala
@@ -34,17 +34,20 @@ package com.salesforce.op.filters
* Contains all feature distribution summaries and null label-leakage correlations used to
* determine dropped features in [[RawFeatureFilter]].
*
* @param responseSummaries response summaries
* @param responseDistributions response distributions
* @param predictorSummaries predictor summaries
* @param responseSummaries response summaries
* @param responseDistributions response distributions
* @param predictorSummaries predictor summaries
* @param predictorDistributions predictor distributions
* @param correlationInfo null label-leakage correlation map
* 1st level keys correspond to response keys
* 2nd level keys correspond to predictor keys with values being null-label leakage corr. value
* @param correlationInfo null label-leakage correlation map
* 1st level keys correspond to response keys
* 2nd level keys correspond to predictor keys with values being
* null-label leakage corr. value
*/
private[op] case class AllFeatureInformation(
responseSummaries: Map[FeatureKey, Summary],
responseDistributions: Array[FeatureDistribution],
predictorSummaries: Map[FeatureKey, Summary],
predictorDistributions: Array[FeatureDistribution],
correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]])
private[op] case class AllFeatureInformation
(
responseSummaries: Map[FeatureKey, Summary],
responseDistributions: Array[FeatureDistribution],
predictorSummaries: Map[FeatureKey, Summary],
predictorDistributions: Array[FeatureDistribution],
correlationInfo: Map[FeatureKey, Map[FeatureKey, Double]]
)
core/src/main/scala/com/salesforce/op/filters/FeatureDistribution.scala
@@ -30,23 +30,30 @@

package com.salesforce.op.filters

import com.salesforce.op.features.FeatureDistributionLike
import java.util.Objects

import com.salesforce.op.features.{FeatureDistributionLike, FeatureDistributionType}
import com.salesforce.op.stages.impl.feature.{HashAlgorithm, Inclusion, NumericBucketizer}
import com.salesforce.op.utils.json.EnumEntrySerializer
import com.twitter.algebird.Monoid._
import com.twitter.algebird.Operators._
import com.twitter.algebird.Semigroup
import org.apache.spark.mllib.feature.HashingTF
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Formats}

import scala.util.Try

/**
* Class containing summary information for a feature
*
* @param name name of the feature
* @param key map key associated with distribution (when the feature is a map)
* @param count total count of feature seen
* @param nulls number of empties seen in feature
* @param name name of the feature
* @param key map key associated with distribution (when the feature is a map)
* @param count total count of feature seen
* @param nulls number of empties seen in feature
* @param distribution binned counts of feature values (hashed for strings, evenly spaced bins for numerics)
* @param summaryInfo either min and max number of tokens for text data,
* or splits used for bins for numeric data
* @param summaryInfo either min and max number of tokens for text data, or splits used for bins for numeric data
* @param `type` feature distribution type: training or scoring
*/
case class FeatureDistribution
(
@@ -55,7 +62,8 @@ case class FeatureDistribution
count: Long,
nulls: Long,
distribution: Array[Double],
summaryInfo: Array[Double]
summaryInfo: Array[Double],
`type`: FeatureDistributionType = FeatureDistributionType.Training
) extends FeatureDistributionLike {

/**
@@ -64,12 +72,17 @@ case class FeatureDistribution
def featureKey: FeatureKey = (name, key)

/**
* Check that feature distributions belong to the same feature and key.
* Check that feature distributions belong to the same feature, key and type.
*
* @param fd distribution to compare to
*/
def checkMatch(fd: FeatureDistribution): Unit =
require(name == fd.name && key == fd.key, "Name and key must match to compare or combine FeatureDistribution")
private def checkMatch(fd: FeatureDistribution): Unit = {
def check[T](field: String, v1: T, v2: T): Unit = require(v1 == v2,
s"$field must match to compare or combine feature distributions: $v1 != $v2"
)
check("Name", name, fd.name)
check("Key", key, fd.key)
}

/**
* Get fill rate of feature
@@ -89,7 +102,7 @@ case class FeatureDistribution
val combinedDist = distribution + fd.distribution
// summary info can be empty or min max if hist is empty but should otherwise match so take the longest info
val combinedSummary = if (summaryInfo.length > fd.summaryInfo.length) summaryInfo else fd.summaryInfo
FeatureDistribution(name, key, count + fd.count, nulls + fd.nulls, combinedDist, combinedSummary)
FeatureDistribution(name, key, count + fd.count, nulls + fd.nulls, combinedDist, combinedSummary, `type`)
}

/**
@@ -135,57 +148,100 @@ case class FeatureDistribution
}

override def toString(): String = {
s"Name=$name, Key=$key, Count=$count, Nulls=$nulls, Histogram=${distribution.toList}, BinInfo=${summaryInfo.toList}"
val valStr = Seq(
"type" -> `type`.toString,
"name" -> name,
"key" -> key,
"count" -> count.toString,
"nulls" -> nulls.toString,
"distribution" -> distribution.mkString("[", ",", "]"),
"summaryInfo" -> summaryInfo.mkString("[", ",", "]")
).map { case (n, v) => s"$n = $v" }.mkString(", ")

s"${getClass.getSimpleName}($valStr)"
}

override def equals(that: Any): Boolean = that match {
case FeatureDistribution(`name`, `key`, `count`, `nulls`, d, s, `type`) =>
distribution.deep == d.deep && summaryInfo.deep == s.deep
case _ => false
}

override def hashCode(): Int = Objects.hashCode(name, key, count, nulls, distribution, summaryInfo, `type`)
}

private[op] object FeatureDistribution {
object FeatureDistribution {

val MaxBins = 100000

implicit val semigroup: Semigroup[FeatureDistribution] = new Semigroup[FeatureDistribution] {
override def plus(l: FeatureDistribution, r: FeatureDistribution) = l.reduce(r)
override def plus(l: FeatureDistribution, r: FeatureDistribution): FeatureDistribution = l.reduce(r)
}

implicit val formats: Formats = DefaultFormats +
EnumEntrySerializer.json4s[FeatureDistributionType](FeatureDistributionType)

/**
* Feature distributions to json
*
* @param fd feature distributions
* @return json array
*/
def toJson(fd: Array[FeatureDistribution]): String = Serialization.write[Array[FeatureDistribution]](fd)

/**
* Feature distributions from json
*
* @param json feature distributions json
* @return feature distributions array
*/
def fromJson(json: String): Try[Array[FeatureDistribution]] = Try {
Serialization.read[Array[FeatureDistribution]](json)
}

/**
* Facilitates feature distribution retrieval from computed feature summaries
*
* @param featureKey feature key
* @param summary feature summary
* @param value optional processed sequence
* @param bins number of histogram bins
* @param featureKey feature key
* @param summary feature summary
* @param value optional processed sequence
* @param bins number of histogram bins
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing feature distributions
* (histograms for numerics, hashes for strings). Output is the bins for the text features.
* @return a pair consisting of response and predictor feature distributions (in this order)
* @param `type` feature distribution type: training or scoring
* @return feature distribution given the provided information
*/
def apply(
private[op] def fromSummary(
featureKey: FeatureKey,
summary: Summary,
value: Option[ProcessedSeq],
bins: Int,
textBinsFormula: (Summary, Int) => Int
textBinsFormula: (Summary, Int) => Int,
`type`: FeatureDistributionType
): FeatureDistribution = {
val (nullCount, (summaryInfo, distribution)): (Int, (Array[Double], Array[Double])) =
value.map(seq => 0 -> histValues(seq, summary, bins, textBinsFormula))
.getOrElse(1 -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))
val (name, key) = featureKey
val (nullCount, (summaryInfo, distribution)) =
value.map(seq => 0L -> histValues(seq, summary, bins, textBinsFormula))
.getOrElse(1L -> (Array(summary.min, summary.max, summary.sum, summary.count) -> new Array[Double](bins)))

FeatureDistribution(
name = featureKey._1,
key = featureKey._2,
count = 1,
name = name,
key = key,
count = 1L,
nulls = nullCount,
summaryInfo = summaryInfo,
distribution = distribution)
distribution = distribution,
`type` = `type`
)
}

/**
* Function to put data into histogram of counts
*
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param values values to bin
* @param summary summary info for feature (max, min, etc)
* @param bins number of bins to produce
* @param textBinsFormula formula to compute the text features bin size.
* Input arguments are [[Summary]] and number of bins to use in computing feature distributions
* (histograms for numerics, hashes for strings). Output is the bins for the text features.
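Since the companion object is now public and keeps its implicit Semigroup, partial distributions of the same feature can also be merged outside the filter. A sketch with made-up values, assuming `key` is the optional map key and that Algebird adds the histogram arrays element-wise:

import com.salesforce.op.features.FeatureDistributionType
import com.salesforce.op.filters.FeatureDistribution

val a = FeatureDistribution(name = "age", key = None, count = 10L, nulls = 2L,
  distribution = Array(1.0, 3.0), summaryInfo = Array(0.0, 100.0),
  `type` = FeatureDistributionType.Training)
val b = a.copy(count = 5L, nulls = 1L, distribution = Array(2.0, 0.0))

// The semigroup delegates to reduce: counts and nulls add up, histograms combine bin-wise,
// the longer summaryInfo wins, and the Training type tag is carried through.
val merged = FeatureDistribution.semigroup.plus(a, b)
// merged.count == 15L, merged.nulls == 3L, merged.distribution == Array(3.0, 3.0)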
(diffs for the remaining changed files not shown)

0 comments on commit 6ad1169
