Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Model combiner #385

Merged
merged 19 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
got class test passing
  • Loading branch information
leahmcguire committed Aug 15, 2019
commit ba8e587a27d5a79463ecf7cb354b15a5d3e8653f
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ case object ModelInsights {
val models = stages.collect{
case s: SelectedModel => s
case s: OpPredictorWrapperModel[_] => s
case s: SelectedModelCombiner => s
case s: SelectedCombinerModel => s
} // TODO support other model types?
val model = models.lastOption
log.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,43 @@ import com.salesforce.op.UID
import com.salesforce.op.evaluators.{EvalMetric, EvaluationMetrics, OpEvaluatorBase}
import com.salesforce.op.features.TransientFeature
import com.salesforce.op.features.types.{Prediction, RealNN}
import com.salesforce.op.stages.base.ternary.{TernaryEstimator, TernaryModel}
import com.salesforce.op.stages.OpPipelineStage3
import com.salesforce.op.stages.base.ternary.OpTransformer3
import com.salesforce.op.utils.spark.RichMetadata._
import enumeratum.{Enum, EnumEntry}
import org.apache.spark.ml.param.{Param, Params}
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql.Dataset

import scala.reflect.runtime.universe._


trait SelectedModelCombinerParams extends Params {
trait SelectedCombinerParams extends Params {

final val combinationStrategy = new Param[String](parent = this, name = "combinationStrategy",
doc = "Method used to combine predictions",
isValid = (in: String) => CombinationStrategy.values.contains(in)
isValid = (in: String) => CombinationStrategy.values.map(_.entryName).contains(in)
)
def setCombinationStrategy(value: CombinationStrategy): this.type = set(combinationStrategy, value.entryName)
def getCombinationStrategy(): CombinationStrategy = CombinationStrategy.withName($(combinationStrategy))
setDefault(combinationStrategy, CombinationStrategy.Best.entryName)

}

class SelectedModelCombiner(uid: String = UID[SelectedModelCombiner]) extends
TernaryEstimator[RealNN, Prediction, Prediction, Prediction]( operationName = "combineModels",
uid = uid
) with SelectedModelCombinerParams with HasEval {

private var p1weight = 0.0
private var p2weight = 0.0
private var metricName = _
class SelectedCombiner
leahmcguire marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SelectedModelCombiner or ModelCombiner perhaps? it's difficult to understand what this stage does otherwise.

(
val operationName: String = "combineModels",
val uid: String = UID[SelectedCombiner]
)(
implicit val tto: TypeTag[Prediction],
val ttov: TypeTag[Prediction#Value]
) extends Estimator[SelectedCombinerModel] with
OpPipelineStage3[RealNN, Prediction, Prediction, Prediction] with SelectedCombinerParams with HasEval {

override def evaluators: Seq[OpEvaluatorBase[_ <: EvaluationMetrics]] = {
val ev1 = in1.getFeature().originStage.asInstanceOf[ModelSelector].evaluators
val ev1 = in2.getFeature().originStage.asInstanceOf[ModelSelector[_, _]].evaluators
val ev1names = ev1.map(_.name).toSet
val ev2 = in2.getFeature().originStage.asInstanceOf[ModelSelector].evaluators
val ev2 = in2.getFeature().originStage.asInstanceOf[ModelSelector[_, _]].evaluators
ev1 ++ ev2.filter(e => ev1names.contains(e.name))
}

Expand All @@ -71,106 +79,126 @@ class SelectedModelCombiner(uid: String = UID[SelectedModelCombiner]) extends
override protected[op] def labelColName: String = in1.name

private def getSummary(feature: TransientFeature) =
ModelSelectorSummary.fromMetadata(getInputSchema()(feature.name).metadata)
ModelSelectorSummary.fromMetadata(getInputSchema()(feature.name).metadata.getSummaryMetadata())

override def onSetInput() = {
super.onSetInput()

require(
in1.getFeature().originStage.isInstanceOf[ModelSelector[_, _]] &&
in2.getFeature().originStage.isInstanceOf[ModelSelector[_, _]],
in2.getFeature().originStage.isInstanceOf[ModelSelector[_, _]] &&
in3.getFeature().originStage.isInstanceOf[ModelSelector[_, _]],
"Predictions must be from model selectors - other types of model are not supported at this time"
)

val summary1 = getSummary(in1)
val summary2 = getSummary(in2)
}

override def fit(dataset: Dataset[_]): SelectedCombinerModel = {
setInputSchema(dataset.schema).transformSchema(dataset.schema)

val summary1 = getSummary(in2)
val summary2 = getSummary(in3)

require(summary1.problemType == summary2.problemType,
s"Cannot combine model selectors for different problem types found ${summary1.problemType}" +
s" and ${summary2.problemType}")

val eval1 = summary1.evaluationMetric
val eval2 = summary2.evaluationMetric

def getMetricValue(metrics: EvaluationMetrics, name: EvalMetric) = metrics.toMap.get(name.entryName)
def getMetricValue(metrics: EvaluationMetrics, name: EvalMetric) = metrics.toMap.get(name.humanFriendlyName)
.map(_.asInstanceOf[Double])

def getWinningModelMetric(summary: ModelSelectorSummary) =
summary.validationResults.collectFirst{
case r if r.modelUID == summary.bestModelUID => getMetricValue(r.metricValues, summary.evaluationMetric)
def getWinningModelMetric(summary: ModelSelectorSummary) = {
summary.validationResults.collectFirst {
case r if r.modelUID == summary.bestModelUID =>
getMetricValue(r.metricValues, summary.evaluationMetric)
}.flatten
}

def getMet(optionMet: Option[Double]) = optionMet.getOrElse {
throw new RuntimeException("Evaluation metrics for two model selectors are non-overlapping")
}

val (metricValueOpt1, metricValueOpt2) =

val eval1 = summary1.evaluationMetric
val eval2 = summary2.evaluationMetric

val (metricValueOpt1: Option[Double], metricValueOpt2: Option[Double], metricName: EvalMetric) =
if (eval1 == eval2) { // same decision metric in validation results
metricName = eval1
(getWinningModelMetric(summary1), getWinningModelMetric(summary2))
(getWinningModelMetric(summary1), getWinningModelMetric(summary2), eval1)
} else { // look for overlapping metrics in training results
val m2e1 = getMetricValue(summary2.trainEvaluation, eval1)
val m1e2 = getMetricValue(summary1.trainEvaluation, eval2)
if (m2e1.nonEmpty) {
metricName = eval1
(getMetricValue(summary1.trainEvaluation, eval1), m2e1)
(getMetricValue(summary1.trainEvaluation, eval1), m2e1, eval1)
} else if (m1e2.nonEmpty) {
metricName = eval2
(m1e2, getMetricValue(summary2.trainEvaluation, eval2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need an eval2 at the end of the tuple here? How did the Scala compiler miss that actually??

Can you add some tests that cover some of these weird edge cases? I feel like there's a lot of new stuff being added.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

} else (None, None)
} else (None, None, eval1)
}

def getMet(optionMet: Option[Double]) = optionMet.getOrElse {
throw new RuntimeException("Evaluation metrics for two model selectors are non-overlapping")
def makeMeta(model: SelectedCombinerModel): Unit = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please make these private methods on the stage instead of them being nested inside fit?

def updateKeys(map: Map[String, Any], string: String) = map.map{ case (k, v) => k + string -> v }

if (model.weight1 == 1.0) //TODO fix
setMetadata(summary1.toMetadata().toSummaryMetadata())
else if (model.weight2 == 1.0)
setMetadata(summary2.toMetadata().toSummaryMetadata())
else {
val summary = new ModelSelectorSummary(
validationType = summary1.validationType,
validationParameters = updateKeys(summary1.validationParameters, "_1") ++
updateKeys(summary2.validationParameters, "_2"),
dataPrepParameters = updateKeys(summary1.dataPrepParameters, "_1") ++
updateKeys(summary2.dataPrepParameters, "_2"),
dataPrepResults = summary1.dataPrepResults.orElse(summary2.dataPrepResults),
evaluationMetric = metricName,
problemType = summary1.problemType,
bestModelUID = summary1.bestModelUID + " " + summary2.bestModelUID,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is space a good separator here? perhaps _ or - instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space will be parsable to get them separated while "-" or "_" would not

bestModelName = summary1.bestModelName + " " + summary2.bestModelName,
bestModelType = summary1.bestModelType + " " +summary2.bestModelType,
validationResults = summary1.validationResults ++ summary2.validationResults,
trainEvaluation = evaluate(model.transform(dataset)),
holdoutEvaluation = None
)
setMetadata(summary.toMetadata().toSummaryMetadata())
}
}

val (metricValue1, metricValue2) = (getMet(metricValueOpt1), getMet(metricValueOpt2))

(CombinationStrategy.withName($(combinationStrategy)), metricValue1 > metricValue2) match {
case (CombinationStrategy.Best, true) => setMetadata(summary1.toMetadata())
p1weight = 1.0
case (CombinationStrategy.Best, false) => setMetadata(summary2.toMetadata())
p2weight = 1.0
case (CombinationStrategy.Weighted, _) =>
p1weight = metricValue1 / (metricValue1 + metricValue2)
p2weight = metricValue2 / (metricValue1 + metricValue2)
val (weight1, weight2) = getCombinationStrategy() match {
case CombinationStrategy.Best =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be a method on CombinationStrategy itself. e.g. val (weight1, weight2) = getCombinationStrategy().computeWeights(metricValue1, metricValue2)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it actually cant unless I move the EvalMetric class to the features module

if (metricValue1 > metricValue2) (1.0, 0.0) else (0.0, 1.0)
case CombinationStrategy.Weighted =>
(metricValue1 / (metricValue1 + metricValue2), metricValue2 / (metricValue1 + metricValue2))
case CombinationStrategy.Equal =>
(0.5, 0.5)
}
}


val model: SelectedCombinerModel =
new SelectedCombinerModel(weight1 = weight1, weight2 = weight2, operationName = operationName, uid = uid)
.setEvaluators(evaluators)
.setParent(this)
.setInput(in1.asFeatureLike[RealNN], in2.asFeatureLike[Prediction], in3.asFeatureLike[Prediction])
.setOutputFeatureName(getOutputFeatureName)

override def fitFn(
dataset: Dataset[(Option[Double],
Map[String, Double],
Map[String, Double])]
): TernaryModel[RealNN, Prediction, Prediction, Prediction] = {
def updateKeys(map: Map[String, Any], string: String) = map.map{ case (k, v) => k + string -> v }

if (CombinationStrategy.withName($(combinationStrategy)) == CombinationStrategy.Weighted) {
val summary1 = getSummary(in1)
val summary2 = getSummary(in2)
val summary = new ModelSelectorSummary(
validationType = summary1.validationType,
validationParameters = updateKeys(summary1.validationParameters, "_1") ++
updateKeys(summary2.validationParameters, "_2"),
dataPrepParameters = updateKeys(summary1.dataPrepParameters, "_1") ++
updateKeys(summary2.dataPrepParameters, "_2"),
dataPrepResults = summary1.dataPrepResults.orElse(summary2.dataPrepResults),
evaluationMetric = metricName,
problemType = summary1.problemType,
bestModelUID = summary1.bestModelUID + summary2.bestModelUID,
bestModelName = summary1.bestModelName + summary2.bestModelName,
bestModelType = summary1.bestModelType + summary2.bestModelType,
validationResults = summary1.validationResults ++ summary2.validationResults,
trainEvaluation = evaluate(dataset),
holdoutEvaluation = None
)
setMetadata(summary.toMetadata())
}
new SelectedModelCombinerModel(weight1 = p1weight, weight2 = p2weight, operationName = operationName, uid = uid)
.setEvaluators(evaluators)
makeMeta(model)
model.setMetadata(getMetadata())
}

}

final class SelectedModelCombinerModel(weight1: Double, weight2: Double, operationName: String, uid: String) extends
TernaryModel[RealNN, Prediction, Prediction, Prediction](operationName = operationName, uid = uid) with HasTestEval {
final class SelectedCombinerModel private[op]
(
val weight1: Double,
val weight2: Double,
val operationName: String,
val uid: String
)(
implicit val tti1: TypeTag[RealNN],
val tti2: TypeTag[Prediction],
val tti3: TypeTag[Prediction],
val tto: TypeTag[Prediction],
val ttov: TypeTag[Prediction#Value]
) extends Model[SelectedCombinerModel] with OpTransformer3[RealNN, Prediction, Prediction, Prediction]
with HasTestEval {

override protected[op] def outputsColNamesMap: Map[String, String] =
Map(ModelSelectorNames.outputParamName -> getOutputFeatureName)
Expand All @@ -194,10 +222,12 @@ final class SelectedModelCombinerModel(weight1: Double, weight2: Double, operati
override def evaluators: Seq[OpEvaluatorBase[_ <: EvaluationMetrics]] = evaluatorList
}


sealed abstract class CombinationStrategy(val name: String) extends EnumEntry with Serializable

object CombinationStrategy extends Enum[CombinationStrategy] {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this enum should be defined in feature module and registered with the json in OpPipelineStageReadWriteFormats

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs here too

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

val values = findValues
case object Weighted extends CombinationStrategy("weighted")
case object Equal extends CombinationStrategy("equal")
case object Best extends CombinationStrategy("best")
}
Original file line number Diff line number Diff line change
Expand Up @@ -798,4 +798,8 @@ class ModelInsightsTest extends FlatSpec with PassengerSparkFixtureTest with Dou
}
}
}

it should "return correct insights when a model combiner is used as the final feature" in {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.salesforce.op.stages.impl.selector

import com.salesforce.op.OpWorkflow
import com.salesforce.op.evaluators.OpBinScoreEvaluator
import com.salesforce.op.features.{Feature, FeatureBuilder}
import com.salesforce.op.features.types.{OPVector, Prediction, RealNN}
import com.salesforce.op.stages.impl.PredictionEquality
import com.salesforce.op.stages.impl.classification.{BinaryClassificationModelSelector, OpLogisticRegression, OpRandomForestClassifier}
import com.salesforce.op.test.OpEstimatorSpec
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.mllib.random.RandomRDDs.normalVectorRDD
import org.apache.spark.sql.Dataset
import com.salesforce.op.utils.spark.RichDataset._
import com.salesforce.op.utils.spark.RichMetadata._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class SelectedCombinerTest extends OpEstimatorSpec[Prediction, SelectedCombinerModel, SelectedCombiner]
with PredictionEquality {

val (seed, smallCount, bigCount) = (1234L, 20, 80)

import spark.implicits._

// Generate positive observations following a distribution ~ N((0.0, 0.0, 0.0), I_3)
val positiveData =
normalVectorRDD(sc, bigCount, 3, seed = seed)
.map(v => 1.0 -> Vectors.dense(v.toArray))

// Generate negative observations following a distribution ~ N((10.0, 10.0, 10.0), I_3)
val negativeData =
normalVectorRDD(sc, smallCount, 3, seed = seed)
.map(v => 0.0 -> Vectors.dense(v.toArray.map(_ + 10.0)))

val data = positiveData.union(negativeData).toDF("label", "features")

val (label, Array(features: Feature[OPVector]@unchecked)) = FeatureBuilder.fromDataFrame[RealNN](
data, response = "label", nonNullable = Set("features")
)

val lr = new OpLogisticRegression()
val lrParams = new ParamGridBuilder()
.addGrid(lr.regParam, DefaultSelectorParams.Regularization)
.build()

val rf = new OpRandomForestClassifier()
val rfParams = new ParamGridBuilder()
.addGrid(rf.maxDepth, Array(2))
.addGrid(rf.minInfoGain, Array(10.0))
.build()

val ms1 = BinaryClassificationModelSelector
.withCrossValidation(modelsAndParameters = Seq(lr -> lrParams),
trainTestEvaluators = Seq(new OpBinScoreEvaluator()))
.setInput(label, features)
.getOutput()

val ms2 = BinaryClassificationModelSelector
.withCrossValidation(modelsAndParameters = Seq(rf -> rfParams))
.setInput(label, features)
.getOutput()


override val inputData: Dataset[_] = new OpWorkflow()
.setResultFeatures(ms1, ms2)
.transform(data)

override val estimator: SelectedCombiner = new SelectedCombiner().setInput(label, ms1, ms2)

override val expectedResult: Seq[Prediction] = inputData.collect(ms1)

it should "have the correct metadata for the best model" in {
ModelSelectorSummary.fromMetadata(estimator.fit(inputData).getMetadata().getSummaryMetadata()) shouldBe
ModelSelectorSummary.fromMetadata(inputData.schema(ms1.name).metadata.getSummaryMetadata())
}

it should "combine model results correctly" in {
val model = estimator.setCombinationStrategy(CombinationStrategy.Weighted).fit(inputData)
.asInstanceOf[SelectedCombinerModel]
val outfeature = model.getOutput()
val outdata = model.transform(inputData)
outdata.collect(outfeature).map(_.probability(0)) shouldEqual inputData.collect(ms1, ms2)
.map{ case (p1, p2) => p1.probability(0) * model.weight1 + p2.probability(0) * model.weight2}
val meta = ModelSelectorSummary.fromMetadata(outdata.schema(outfeature.name).metadata.getSummaryMetadata())
val meta1 = ModelSelectorSummary.fromMetadata(inputData.schema(ms1.name).metadata.getSummaryMetadata())
val meta2 = ModelSelectorSummary.fromMetadata(inputData.schema(ms2.name).metadata.getSummaryMetadata())
meta.bestModelUID shouldBe meta1.bestModelUID + " " + meta2.bestModelUID
meta.trainEvaluation == meta1.trainEvaluation shouldBe false
meta.trainEvaluation == meta2.trainEvaluation shouldBe false
meta.trainEvaluation.toMap.keySet shouldBe meta1.trainEvaluation.toMap.keySet
.union(meta2.trainEvaluation.toMap.keySet)
}
}