
Write and read Spark stages to/from MLeap instead of Spark classes #475

Merged: 20 commits merged into master on Sep 2, 2020

Conversation

@leahmcguire (Collaborator) commented May 5, 2020

Related issues
Currently, the Spark save method is used to serialize and deserialize Spark-wrapped stages. This PR changes the underlying serialization to write to and read from MLeap bundles.

Describe the proposed solution
Stages are written to MLeap bundles and read back from MLeap, with a fallback to reading from the Spark save format.

Describe alternatives you've considered
N/A

Additional context
Next steps will be PRs to read the stages directly with the MLeap context rather than the Spark context for local scoring (and possibly all scoring, to better optimize the DAG).
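The read-with-fallback strategy described above can be sketched in plain Scala. This is a minimal, self-contained illustration, not the PR's actual API: `loadStage`, `readMleap`, and `readSpark` are hypothetical stand-ins for the real MLeap bundle reader and the legacy Spark reader.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of the load strategy: attempt the MLeap bundle
// first, and fall back to the legacy Spark save format on any failure.
// `readMleap` and `readSpark` are stand-ins for the real readers.
def loadStage[A](readMleap: => A, readSpark: => A): A =
  Try(readMleap) match {
    case Success(stage) => stage     // MLeap bundle read succeeded
    case Failure(_)     => readSpark // fall back to the Spark save format
  }
```

For example, `loadStage("mleap-model", "spark-model")` yields the MLeap result, while `loadStage[String](sys.error("no bundle"), "spark-model")` falls back to the Spark one.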

@codecov (codecov bot) commented May 5, 2020

Codecov Report

Merging #475 into master will decrease coverage by 0.31%.
The diff coverage is 70.41%.


@@            Coverage Diff             @@
##           master     #475      +/-   ##
==========================================
- Coverage   87.04%   86.74%   -0.31%     
==========================================
  Files         346      346              
  Lines       11782    11848      +66     
  Branches      385      374      -11     
==========================================
+ Hits        10256    10277      +21     
- Misses       1526     1571      +45     
Impacted Files Coverage Δ
...impl/classification/OpDecisionTreeClassifier.scala 63.63% <ø> (-7.80%) ⬇️
...p/stages/impl/classification/OpGBTClassifier.scala 46.66% <ø> (-8.89%) ⬇️
...ges/impl/classification/OpLogisticRegression.scala 56.00% <ø> (-4.72%) ⬇️
...ssification/OpMultilayerPerceptronClassifier.scala 60.00% <ø> (-9.24%) ⬇️
...e/op/stages/impl/classification/OpNaiveBayes.scala 71.42% <ø> (-8.58%) ⬇️
...impl/classification/OpRandomForestClassifier.scala 66.66% <ø> (-5.56%) ⬇️
...ages/impl/regression/OpDecisionTreeRegressor.scala 50.00% <ø> (ø)
...rce/op/stages/impl/regression/OpGBTRegressor.scala 53.33% <ø> (ø)
...op/stages/impl/regression/OpLinearRegression.scala 76.00% <ø> (ø)
...ages/impl/regression/OpRandomForestRegressor.scala 50.00% <ø> (ø)
... and 23 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update a350711...38960e9.

@leahmcguire leahmcguire changed the title [WIP] write and read spark stages to/from mleap instead of spark write and read spark stages to/from mleap instead of spark Aug 3, 2020
@leahmcguire (Collaborator, Author) commented:

@TuanNguyen27 the test that you put in that should have failed on the local XGBoost is (correctly) failing in this PR.

.setParent(this)
.setInput(in1.asFeatureLike[RealNN], in2.asFeatureLike[OPVector])
.setMetadata(getMetadata())
.setOutputFeatureName(getOutputFeatureName)

if (model.isInstanceOf[XGBoostClassificationModel] || model.isInstanceOf[XGBoostRegressionModel]) {
wrappedModel.setOutputDF(model.transform(dataset.limit(1)))
Collaborator:
Just curious, why do we have .limit(1) here?

Collaborator (Author):
We only need one example for the XGBoost MLeap save (it has a step that calls .first() to get the vector size).

Collaborator:
Let's add a comment for it. Looks like this is the only such exception so far.

@tovbinm tovbinm changed the title write and read spark stages to/from mleap instead of spark Write and read Spark stages to/from MLeap instead of Spark classes Sep 1, 2020
@@ -125,4 +127,9 @@ class OpRandomForestRegressionModel
ttov: TypeTag[Prediction#Value]
) extends OpPredictionModel[RandomForestRegressionModel](
sparkModel = sparkModel, uid = uid, operationName = operationName
)
){
@transient lazy protected val predict: Vector => Double = getSparkMlStage().map(s => s.predict(_))
Collaborator:
This also seems to be a very repetitive pattern. We can add a helper method for it as well.

Collaborator (Author):
The problem with a helper function here is that there is no shared class for the MLeap regressors that contains the predict function. They all implement it, but you have to cast each one to its specific type to get at predict, so a helper function would only save one map while making the code harder to read. I suppose I could use reflection in a shared helper. Do you think that would be better?
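The casting issue described above can be shown with a self-contained sketch. The two model classes below are hypothetical stand-ins for MLeap's concrete regression model types, which each implement predict but share no predict-bearing supertype, so a shared helper can only centralize the casts:

```scala
// Stand-ins for MLeap's concrete regression models: each has its own
// predict, but no common trait exposes it, so a helper must enumerate
// the concrete types (or resort to reflection).
final class RandomForestModel { def predict(v: Array[Double]): Double = v.sum / v.length }
final class LinearModel       { def predict(v: Array[Double]): Double = v.sum }

// The hypothetical shared helper: all it saves is repeating the match.
def predictFn(model: Any): Array[Double] => Double = model match {
  case m: RandomForestModel => m.predict _
  case m: LinearModel       => m.predict _
  case other => throw new IllegalArgumentException(s"No predict for ${other.getClass}")
}
```

For example, `predictFn(new LinearModel)(Array(1.0, 2.0))` sums to 3.0, while an unknown model type fails fast with a descriptive error.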

features/build.gradle: resolved review comment (outdated)
(opStage, sparkStage, i)
val mleapStages = stagesWithIndex.filterNot(_._1.isInstanceOf[OpTransformer]).collect {
case (opStage: OPStage with SparkWrapperParams[_], i) if opStage.getLocalMlStage().isDefined =>
val model = opStage.getLocalMlStage().get
Collaborator:
Better to pattern match and fail gracefully when the local model is missing. For example (note the s interpolator, which the original suggestion omitted):

opStage.getLocalMlStage() match {
  case None => throw new RuntimeException(s"Local model not found for stage ${opStage.uid} of type ${opStage.getClass}")
  case Some(model) =>
    // apply model
}

@tovbinm (Collaborator) approved these changes:

lgtm!!

@leahmcguire leahmcguire merged commit 1040361 into master Sep 2, 2020
@leahmcguire leahmcguire deleted the lm/mlleapSave branch September 2, 2020 18:10
@tovbinm (Collaborator) commented Sep 2, 2020

🥳 🥳 🥳

nicodv added a commit that referenced this pull request Sep 16, 2020
@koertkuipers commented:
This seems to have broken some of our in-house unit tests. In some cases it was because we wrote to relative paths, I think; those were easily fixed by making the paths absolute. In other cases the paths were absolute, and at this point I am unsure why it broke.

The stack traces all involve the MLeap BundleFile on reading and writing, always with the same NPE in UnixPath.normalizeAndCheck. For example:

[info]   Cause: java.lang.NullPointerException:
[info]   at sun.nio.fs.UnixPath.normalizeAndCheck(UnixPath.java:77)
[info]   at sun.nio.fs.UnixPath.<init>(UnixPath.java:71)
[info]   at sun.nio.fs.UnixFileSystem.getPath(UnixFileSystem.java:281)
[info]   at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:59)
[info]   at ml.combust.bundle.BundleFile$.apply(BundleFile.scala:40)
[info]   at com.salesforce.op.stages.SparkStageParam.$anonfun$jsonDecodeMleap$1(SparkStageParam.scala:164)
[info]   at resource.DefaultManagedResource.open(AbstractManagedResource.scala:110)
[info]   at resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:87)
[info]   at resource.DeferredExtractableManagedResource.either(AbstractManagedResource.scala:29)
[info]   at resource.DeferredExtractableManagedResource.opt(AbstractManagedResource.scala:31)
[info]   at com.salesforce.op.stages.SparkStageParam.jsonDecodeMleap(SparkStageParam.scala:173)
[info]   at com.salesforce.op.stages.SparkStageParam.jsonDecode(SparkStageParam.scala:123)
[info]   at com.salesforce.op.stages.SparkStageParam.jsonDecode(SparkStageParam.scala:55)
[info]   at org.apache.spark.ml.util.DefaultParamsReader$Metadata.$anonfun$setParams$1(ReadWrite.scala:564)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.apache.spark.ml.util.DefaultParamsReader$Metadata.setParams(ReadWrite.scala:561)
[info]   at org.apache.spark.ml.util.DefaultParamsReader$Metadata.getAndSetParams(ReadWrite.scala:549)
[info]   at org.apache.spark.ml.SparkDefaultParamsReadWrite$.getAndSetParams(SparkDefaultParamsReadWrite.scala:126)
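The relative-path failures are consistent with how java.net.URI treats such strings: a relative ("opaque") URI has a null path component, and the stack trace above shows BundleFile handing the URI's path to UnixFileSystem.getPath, where a null produces exactly this NPE. A minimal JVM-only illustration, no MLeap required:

```scala
import java.net.URI

// An absolute file URI carries a usable path component...
val abs = new URI("file:///tmp/model.zip")
// ...but a relative ("opaque") URI does not: getPath returns null.
// Passing that null into UnixFileSystem.getPath is what triggers the
// NullPointerException in UnixPath.normalizeAndCheck shown above.
val rel = new URI("file:model.zip")

println(abs.getPath) // /tmp/model.zip
println(rel.getPath) // null
```

This is why making the bundle paths absolute fixed some of the broken tests.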

@koertkuipers

Is protobuf 3 going to be an issue on Spark/Hadoop?

@tovbinm (Collaborator) commented Sep 21, 2020

@koertkuipers Can you please open an issue to track this? Can you also share which transformer / estimator you are using in your workflow?

@koertkuipers commented Sep 23, 2020 via email

@salesforce-cla (bot) commented Nov 3, 2020

Thanks for the contribution! It looks like @leahmcguire is an internal user so signing the CLA is not required. However, we need to confirm this.

@salesforce-cla (bot) commented:
Thanks for the contribution! Unfortunately we can't verify the commit author(s): leahmcguire <l***@s***.com> Leah McGuire <l***@s***.com>. One possible solution is to add that email to your GitHub account. Alternatively you can change your commits to another email and force push the change. After getting your commits associated with your GitHub account, refresh the status of this Pull Request.
