Making model selectors robust to failing models (#372)
leahmcguire authored and tovbinm committed Aug 2, 2019
1 parent b505ff7 commit 496174c
Showing 13 changed files with 240 additions and 92 deletions.
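The thrust of the change: model validation now takes a maxWait budget (default one day), and the commit title suggests that a candidate model which fails or runs past that budget should no longer take down the whole selection. As a rough illustration of that pattern only, not the actual OpValidator code and with made-up candidate types, the idea looks something like this in plain Scala:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object RobustSelectionSketch {
  // Hypothetical stand-in for a candidate estimator: fit() returns a validation metric.
  final case class Candidate(name: String, fit: () => Double)

  // Fit every candidate, giving each at most maxWait; candidates that throw or
  // time out are dropped instead of aborting the whole selection.
  def selectBest(candidates: Seq[Candidate], maxWait: Duration): Option[(String, Double)] = {
    val scored = candidates.flatMap { c =>
      val attempt = Future(c.fit())
      Try(Await.result(attempt, maxWait)).toOption.map(metric => c.name -> metric)
    }
    scored.sortBy(-_._2).headOption // best-scoring survivor, if any survived
  }

  def main(args: Array[String]): Unit = {
    val candidates = Seq(
      Candidate("good", () => 0.81),
      Candidate("throws", () => throw new RuntimeException("boom")),
      Candidate("hangs", () => { Thread.sleep(60000); 0.99 })
    )
    // Prints Some((good,0.81)): the failing and hanging candidates are simply skipped.
    println(selectBest(candidates, maxWait = 2.seconds))
  }
}

The diff below threads a maxWait: Duration parameter of this kind through the selector factories and into OpCrossValidation and OpTrainValidationSplit.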
2 changes: 1 addition & 1 deletion core/src/main/scala/com/salesforce/op/OpWorkflow.scala
@@ -423,7 +423,7 @@ class OpWorkflow(val uid: String = UID[OpWorkflow]) extends OpWorkflowCore {
log.info("Estimate best Model with CV/TS. Stages included in CV are: {}, {}",
during.flatMap(_.map(_._1.stageName)).mkString(", "), modelSelector.uid: Any
)
modelSelector.findBestEstimator(trainFixed, during, persistEveryKStages)
modelSelector.findBestEstimator(trainFixed, Option(during))
val remainingDAG: StagesDAG = (during :+ (Array(modelSelector -> distance): Layer)) ++ after

log.info("Applying DAG after CV/TS. Stages: {}", remainingDAG.flatMap(_.map(_._1.stageName)).mkString(", "))
core/src/main/scala/com/salesforce/op/filters/RawFeatureFilter.scala
@@ -124,7 +124,9 @@ class RawFeatureFilter[T]
@transient protected lazy val log = LoggerFactory.getLogger(this.getClass)

/**
* Get binned counts of the feature distribution and empty count for each raw feature
* Get binned counts of the feature distribution and empty count for each raw feature. This computes all the
* statistics on the training and scoring data. It does two map-reduce operations: the first produces a Summary
* of each feature, the second a binned histogram (Distribution) for each feature based on that Summary.
*
* @param data data frame to compute counts on
* @param features list of raw, non-protected, features contained in the dataframe
@@ -151,6 +153,7 @@ class RawFeatureFilter[T]
val predOut = allPredictors.map(TransientFeature(_))
(respOut, predOut)
}
// process all features based on their raw type so that they can be summarized as either text or numeric
val preparedFeatures: RDD[PreparedFeatures] = data.rdd.map(PreparedFeatures(_, responses, predictors, timePeriod))

implicit val sgTuple2Maps = new Tuple2Semigroup[Map[FeatureKey, Summary], Map[FeatureKey, Summary]]()
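The new doc comment above describes two passes: a reduce down to a per-feature Summary, then a second pass that bins values into a Distribution using that Summary. A minimal sketch of that two-pass shape on plain Scala collections (the real code runs over Spark RDDs, keys results by feature, and also handles text features; Summary here is a stand-in with just min, max and count):

object TwoPassHistogramSketch {
  // Stand-in for the per-feature Summary produced by the first pass.
  final case class Summary(min: Double, max: Double, count: Long)

  // Pass 1: reduce the raw values of one feature down to a Summary.
  def summarize(values: Seq[Double]): Summary =
    values.foldLeft(Summary(Double.PositiveInfinity, Double.NegativeInfinity, 0L)) { (s, v) =>
      Summary(math.min(s.min, v), math.max(s.max, v), s.count + 1)
    }

  // Pass 2: bin the same values into a histogram whose range comes from the Summary.
  def histogram(values: Seq[Double], summary: Summary, bins: Int): Array[Long] = {
    val counts = Array.fill(bins)(0L)
    val width = (summary.max - summary.min) / bins
    values.foreach { v =>
      val bin = if (width == 0.0) 0 else math.min(((v - summary.min) / width).toInt, bins - 1)
      counts(bin) += 1
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    val values = Seq(1.0, 2.0, 2.5, 7.0, 9.5)
    val s = summarize(values)
    println(s)                                    // Summary(1.0,9.5,5)
    println(histogram(values, s, bins = 4).toSeq) // counts per bin across [min, max]
  }
}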
core/src/main/scala/com/salesforce/op/stages/impl/classification/BinaryClassificationModelSelector.scala
@@ -40,6 +40,8 @@ import enumeratum.Enum
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Binary Classification Model Selector
@@ -153,6 +155,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Classification Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -164,10 +167,12 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify,
parallelism = parallelism, maxWait = maxWait
)
selector(cv,
splitter = splitter,
@@ -198,6 +203,7 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Classification Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -209,10 +215,12 @@ case object BinaryClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[BinaryClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism,
maxWait = maxWait
)
selector(ts,
splitter = splitter,
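From the caller's side the new parameter is just one more argument on the factory methods. A sketch of what that might look like, assuming the usual TransmogrifAI stage wiring (setInput/getOutput) and a RealNN label plus OPVector feature vector coming from elsewhere in the workflow:

import com.salesforce.op.features.FeatureLike
import com.salesforce.op.features.types.{OPVector, Prediction, RealNN}
import com.salesforce.op.stages.impl.classification.BinaryClassificationModelSelector
import scala.concurrent.duration._

object MaxWaitUsageSketch {
  // Cap every candidate model's fit at 30 minutes during cross validation; a model
  // that fails or blows the budget is skipped rather than failing the selection.
  def prediction(label: FeatureLike[RealNN], features: FeatureLike[OPVector]): FeatureLike[Prediction] =
    BinaryClassificationModelSelector
      .withCrossValidation(maxWait = 30.minutes)
      .setInput(label, features)
      .getOutput()
}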
core/src/main/scala/com/salesforce/op/stages/impl/classification/MultiClassificationModelSelector.scala
@@ -33,13 +33,15 @@ package com.salesforce.op.stages.impl.classification
import com.salesforce.op.evaluators._
import com.salesforce.op.stages.impl.ModelsToTry
import com.salesforce.op.stages.impl.classification.{MultiClassClassificationModelsToTry => MTT}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory}
import com.salesforce.op.stages.impl.tuning._
import enumeratum.Enum
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Multi Classification Model Selector
@@ -133,6 +135,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return MultiClassification Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -144,10 +147,12 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, stratify = stratify, parallelism = parallelism,
maxWait = maxWait
)
selector(cv,
splitter = splitter,
@@ -178,6 +183,7 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return MultiClassification Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -189,7 +195,8 @@ case object MultiClassificationModelSelector extends ModelSelectorFactory {
stratify: Boolean = ValidatorParamDefaults.Stratify,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[MultiClassClassificationModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, stratify = stratify, parallelism = parallelism
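The doc comments above also mention the modelsAndParameters escape hatch: a Seq of (estimator, param grid) pairs to validate instead of the built-in defaults. A hedged sketch of supplying such a grid, assuming OpLogisticRegression exposes the usual Spark regParam and elasticNetParam params; the grid values are arbitrary:

import com.salesforce.op.stages.impl.classification.{MultiClassificationModelSelector, OpLogisticRegression}
import org.apache.spark.ml.tuning.ParamGridBuilder
import scala.concurrent.duration._

object CustomGridSketch {
  // A single hand-picked candidate with a small grid; the selector will cross
  // validate over exactly these ParamMaps instead of the built-in defaults.
  val lr = new OpLogisticRegression()
  val lrGrid = new ParamGridBuilder()
    .addGrid(lr.regParam, Array(0.01, 0.1))
    .addGrid(lr.elasticNetParam, Array(0.0, 0.5))
    .build()

  val selector = MultiClassificationModelSelector.withCrossValidation(
    modelsAndParameters = Seq(lr -> lrGrid),
    maxWait = 1.hour
  )
}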
core/src/main/scala/com/salesforce/op/stages/impl/regression/RegressionModelSelector.scala
@@ -34,12 +34,14 @@ import com.salesforce.op.evaluators._
import com.salesforce.op.stages.impl.ModelsToTry
import com.salesforce.op.stages.impl.regression.{RegressionModelsToTry => MTT}
import com.salesforce.op.stages.impl.selector.ModelSelectorNames.{EstimatorType, ModelType}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelectorFactory, ModelSelector}
import com.salesforce.op.stages.impl.selector.{DefaultSelectorParams, ModelSelector, ModelSelectorFactory}
import com.salesforce.op.stages.impl.tuning._
import enumeratum.Enum
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.ParamGridBuilder

import scala.concurrent.duration.Duration


/**
* A factory for Regression Model Selector
@@ -57,7 +59,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* Note: [[OpDecisionTreeRegressor]] and [[OpXGBoostRegressor]] are off by default
*/
val modelTypesToUse: Seq[RegressionModelsToTry] = Seq(
MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor, MTT.OpGeneralizedLinearRegression
MTT.OpLinearRegression, MTT.OpRandomForestRegressor, MTT.OpGBTRegressor
)

/**
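With [[OpGeneralizedLinearRegression]] dropped from the default list just above, a caller who still wants it in the running can name it explicitly via modelTypesToUse. A small sketch, assuming the factory lives in stages.impl.regression next to RegressionModelsToTry and leaving every other argument at its default:

import com.salesforce.op.stages.impl.regression.{RegressionModelSelector, RegressionModelsToTry => RMT}

object GlrOptInSketch {
  // Put the generalized linear model back into the candidate pool next to the new defaults.
  val selector = RegressionModelSelector.withCrossValidation(
    modelTypesToUse = Seq(
      RMT.OpLinearRegression, RMT.OpRandomForestRegressor,
      RMT.OpGBTRegressor, RMT.OpGeneralizedLinearRegression
    )
  )
}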
@@ -107,6 +109,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
val glrParams = new ParamGridBuilder()
.addGrid(glr.fitIntercept, DefaultSelectorParams.FitIntercept)
.addGrid(glr.family, DefaultSelectorParams.DistFamily)
.addGrid(glr.link, DefaultSelectorParams.LinkFunction)
.addGrid(glr.maxIter, DefaultSelectorParams.MaxIterLin)
.addGrid(glr.regParam, DefaultSelectorParams.Regularization)
.addGrid(glr.tol, DefaultSelectorParams.Tol)
@@ -145,6 +148,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Regression Model Selector with a Cross Validation
*/
def withCrossValidation(
@@ -155,10 +159,11 @@ case object RegressionModelSelector extends ModelSelectorFactory {
seed: Long = ValidatorParamDefaults.Seed,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[RegressionModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val cv = new OpCrossValidation[ModelType, EstimatorType](
numFolds = numFolds, seed = seed, validationMetric, parallelism = parallelism
numFolds = numFolds, seed = seed, evaluator = validationMetric, parallelism = parallelism, maxWait = maxWait
)
selector(cv,
splitter = dataSplitter,
@@ -188,6 +193,7 @@ case object RegressionModelSelector extends ModelSelectorFactory {
* for model selection Seq[(EstimatorType, Array[ParamMap])] where Estimator type must be
* an Estimator that takes in a label (RealNN) and features (OPVector) and returns a
* prediction (Prediction)
* @param maxWait maximum allowable time to wait for a model to finish running (default is 1 day)
* @return Regression Model Selector with a Train Validation Split
*/
def withTrainValidationSplit(
@@ -198,7 +204,8 @@ case object RegressionModelSelector extends ModelSelectorFactory {
seed: Long = ValidatorParamDefaults.Seed,
parallelism: Int = ValidatorParamDefaults.Parallelism,
modelTypesToUse: Seq[RegressionModelsToTry] = Defaults.modelTypesToUse,
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty
modelsAndParameters: Seq[(EstimatorType, Array[ParamMap])] = Seq.empty,
maxWait: Duration = ValidatorParamDefaults.MaxWait
): ModelSelector[ModelType, EstimatorType] = {
val ts = new OpTrainValidationSplit[ModelType, EstimatorType](
trainRatio = trainRatio, seed = seed, validationMetric, parallelism = parallelism
core/src/main/scala/com/salesforce/op/stages/impl/selector/DefaultSelectorParams.scala
@@ -53,7 +53,16 @@ object DefaultSelectorParams {
val RegSolver = Array("auto") // regression solver spark default auto
val FitIntercept = Array(true) // fit intercept spark default true
val NbSmoothing = Array(1.0) // spark default 1.0
val DistFamily = Array("gaussian", "poisson") // generalized linear model link family
val DistFamily = Array("gaussian", "binomial", "poisson", "gamma", "tweedie") // glm distribution family
val LinkFunction = Array("identity", "log", "inverse", "logit", "probit", "cloglog", "sqrt") // glm link function
// Valid link functions for each family are listed below. The first link function of each family
// is the default one.
// - "gaussian" : "identity", "log", "inverse"
// - "binomial" : "logit", "probit", "cloglog"
// - "poisson" : "log", "identity", "sqrt"
// - "gamma" : "inverse", "identity", "log"
// - "tweedie" : power link function specified through "linkPower". The default link power in
// the tweedie family is 1 - variancePower.
val NumRound = Array(100) // number of rounds for xgboost (default 1)
val Eta = Array(0.1 , 0.3) // step size shrinkage for xgboost (default 0.3)
val MinChildWeight = Array(1.0, 5.0, 10.0) // minimum sum of instance weight needed in a child for xgboost (default 1)
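The comment block above matters because Spark's GeneralizedLinearRegression does not accept every family/link pair, so a naive cross product of DistFamily and LinkFunction would contain invalid grid points. A small stand-alone sketch of trimming the cross product to the valid pairs listed above (plain Scala, not the selector's actual grid-building code):

object ValidGlmGridSketch {
  val families = Array("gaussian", "binomial", "poisson", "gamma") // tweedie is special-cased via linkPower
  val links = Array("identity", "log", "inverse", "logit", "probit", "cloglog", "sqrt")

  // Valid links per family; the first entry is that family's default (mirrors the comment above).
  val validLinks: Map[String, Seq[String]] = Map(
    "gaussian" -> Seq("identity", "log", "inverse"),
    "binomial" -> Seq("logit", "probit", "cloglog"),
    "poisson"  -> Seq("log", "identity", "sqrt"),
    "gamma"    -> Seq("inverse", "identity", "log")
  )

  // Keep only the (family, link) pairs that GeneralizedLinearRegression accepts.
  val validPairs: Seq[(String, String)] = for {
    family <- families.toSeq
    link   <- links
    if validLinks(family).contains(link)
  } yield family -> link

  def main(args: Array[String]): Unit = validPairs.foreach(println)
}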