
XGBoostSageMakerEstimator.fit() throws a libsvm exception when reading a CSV file #47

Open
haowang-ms89 opened this issue May 2, 2018 · 32 comments


haowang-ms89 commented May 2, 2018

I am writing Python code with Zeppelin 0.7.3 and Spark 2.3.0 on an EMR (emr-5.13.0) cluster to use SageMaker's XGBoost algorithm. The input data is a CSV file. The first 3 lines of the file are shown below (the first column is the target class, 0 or 1, and there is no header line):
0,9.6071,2,1,1,2,1,1,1,1,3,1,0,0,0,0,3,0,0,3,0,0,3,0,2,1,1,1
0,2.7296,3,1,1,1,1,1,0,0,8,1,0,0,0,0,3,0,0,3,0,0,3,0,1,1,1,1
0,10.3326,1,0,1,2,1,1,0,0,4,1,1,0,1,0,3,0,0,3,0,0,3,0,0,3,0,0

I imported as the example does:
%pyspark
from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator

I initialize the estimator:
%pyspark
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m3.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m3.xlarge",
    endpointInitialInstanceCount=1)
xgboost_estimator.setObjective('multi:softprob')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(2)

I read the csv file with:
training_data = spark.read.csv("s3:https://poc.sagemaker.myfile/myfile.csv", sep=",", header="false", inferSchema="true")

training_data.show() gives:
+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0| _c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|
+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
| 0| 7.1732| 1| 0| 1| 2| 2| 2| 0| 0| 5| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 1.3087| 1| 0| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|
| 0| 5.7194| 1| 0| 1| 2| 1| 1| 0| 0| 3| 1| 0| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 9.8398| 3| 1| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 2| 1| 1| 2| 1| 1| 1|
| 0| 2.4942| 1| 0| 1| 2| 1| 1| 0| 0| 377| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|
| 0| 7.9179| 4| 1| 1| 2| 1| 1| 0| 0| 4| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 2| 0| 1| 2| 1| 1| 1|

When I try to fit the xgboost model with:
xgboost_model = xgboost_estimator.fit(training_data)

The following exception is returned:
Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerEstimator.py", line 253, in fit return self._call_java("fit", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o130.fit. : java.io.IOException: Illegal schema for libsvm data, schema=StructType(StructField(_c0,IntegerType,true), StructField(_c1,DoubleType,true), StructField(_c2,IntegerType,true), StructField(_c3,IntegerType,true), StructField(_c4,IntegerType,true), StructField(_c5,IntegerType,true), StructField(_c6,IntegerType,true), StructField(_c7,IntegerType,true), StructField(_c8,IntegerType,true), StructField(_c9,IntegerType,true), StructField(_c10,IntegerType,true), StructField(_c11,IntegerType,true), StructField(_c12,IntegerType,true), StructField(_c13,IntegerType,true), StructField(_c14,IntegerType,true), StructField(_c15,IntegerType,true), StructField(_c16,IntegerType,true), StructField(_c17,IntegerType,true), StructField(_c18,IntegerType,true), StructField(_c19,IntegerType,true), StructField(_c20,IntegerType,true), StructField(_c21,IntegerType,true), StructField(_c22,IntegerType,true), StructField(_c23,IntegerType,true), StructField(_c24,IntegerType,true), StructField(_c25,IntegerType,true), StructField(_c26,IntegerType,true), StructField(_c27,IntegerType,true)) at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.verifySchema(LibSVMRelation.scala:86) at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.prepareWrite(LibSVMRelation.scala:122) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225) at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.writeData(DataUploader.scala:111) at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.uploadData(DataUploader.scala:90) at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:299) at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:175) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)

Did I miss some step, such that the estimator uses the libsvm libraries to process the CSV input?


laurenyu commented May 2, 2018

Hi, thanks for using SageMaker Spark!

XGBoostSageMakerEstimator uses Spark's LibSVMOutputWriter, which is rather restrictive in its schema validation: https://github.com/apache/spark/blob/930b90a84871e2504b57ed50efa7b8bb52d3ba44/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala#L79

I think the issue stems from the number of columns in your training data? There was some discussion of extra columns in #12 - not sure if anything in that issue might be relevant here.
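For reference, a minimal PySpark sketch of the only DataFrame shape that verifySchema() accepts (illustrative only, not from the original report; it assumes an existing SparkSession named spark and a placeholder output path):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType

# The "libsvm" data source only writes a DoubleType "label" column plus a
# single Vector "features" column; a 28-column CSV schema fails verifySchema().
libsvm_schema = StructType([
    StructField("label", DoubleType(), False),
    StructField("features", VectorUDT(), False),
])
rows = [(0.0, Vectors.dense([9.6071, 2.0, 1.0])),
        (0.0, Vectors.dense([2.7296, 3.0, 1.0]))]
libsvm_ready = spark.createDataFrame(rows, libsvm_schema)
libsvm_ready.write.format("libsvm").save("/tmp/libsvm-demo")  # passes verifySchema()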

@haowang-ms89

Thanks for @laurenyu's reply. I wonder whether XGBoostSageMakerEstimator uses the verifySchema() provided here https://github.com/apache/spark/blob/930b90a84871e2504b57ed50efa7b8bb52d3ba44/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala#L79 even when the input is in CSV format. The official guide https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html says that the input can be libsvm or csv, and

For CSV training, the algorithm assumes that the target variable is in the first column and that the CSV does not have a header record.

So I think data read in the CSV way does not need to fit the two-column schema. In this example https://github.com/awslabs/amazon-sagemaker-examples/blob/master/introduction_to_applying_machine_learning/xgboost_customer_churn/xgboost_customer_churn.ipynb, the author also said that

Amazon SageMaker XGBoost can train on data in either a CSV or LibSVM format. For this example, we'll stick with CSV. It should:
Have the predictor variable in the first column
Not have a header row

I followed the example's steps to create the csv file it uses, the first 3 lines are:
0,106,0,274.4,120,198.6,82,160.8,62,6.0,3,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0
0,28,0,187.8,94,248.6,86,208.8,124,10.6,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0,1,0
1,148,0,279.3,104,201.6,87,280.8,99,7.9,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0
This looks quite like mine and also has more than 2 columns. However, this example uses sagemaker.estimator.Estimator (which is not available in my EMR+Zeppelin environment) instead of sagemaker_pyspark.algorithms.XGBoostSageMakerEstimator. Doesn't XGBoostSageMakerEstimator support multi-column CSV input yet? Or does the sagemaker.s3_input() function used in the example somehow turn the CSV into a 2-column structure?


andremoeller commented May 3, 2018

@haowang-ms89

All SageMakerEstimators rely on Spark's DataFrame writers. The XGBoostSageMakerEstimator defaults to writing data in the "libsvm" format. Can you try passing "csv" to "trainingSparkDataFormat" (or "com.databricks.spark.csv" if you're using spark-csv)?


haowang-ms89 commented May 3, 2018

@andremoeller
Thanks a lot! I passed trainingSparkDataFormat="csv" when initializing XGBoostSageMakerEstimator and the exception no longer appears. But now I face the same issue reported here (which you added a bug tag to) #27. I am using an EMR+Zeppelin environment instead of SageMaker's native notebook, so I will ask in that thread. I will close this thread after I confirm there is no related issue. Thanks again!


andremoeller commented May 3, 2018

@haowang-ms89

Sure! I just commented on that issue.

You will also have to pass in Some("csv") for the trainingContentType, or XGBoost will think you're trying to give it LibSVM data.

https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html

For Training ContentType, valid inputs are libsvm or csv.
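For reference, a PySpark sketch combining both suggestions (in PySpark the Scala Some("csv") is just the string "csv"; instance types are placeholders, and the reporter's working initializer later in this thread also sets the IAM role and the request row serializer):

xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",   # placeholder instance type
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv",         # write the training DataFrame out as CSV
    trainingContentType="csv")             # content type reported to the XGBoost algorithm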

@haowang-ms89

@andremoeller
After passing trainingSparkDataFormat="csv" and trainingContentType="csv", the model can be trained with the CSV file. However, when trying to do inference, I removed the first column (the target value) from the CSV file and served it as the test data. I assume this is supposed to work, since this document https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html says:

For CSV training, the algorithm assumes that the target variable is in the first column and that the CSV does not have a header record. For CSV inference, the algorithm assumes that CSV input does not have the label column.

So I think the only difference between the training and inference CSV files is the first column. The problem is that when I call:
transformed_data = xgboost_model.transform(test_data)
The following stacktrace appears:
Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerModel.py", line 408, in transform return self._call_java("transform", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace) IllegalArgumentException: u'Expecting schema with DoubleType column with name label and Vector column with name features. Got StructType(StructField(_c0,DoubleType,true), StructField(_c1,IntegerType,true), StructField(_c2,IntegerType,true), StructField(_c3,IntegerType,true), StructField(_c4,IntegerType,true), StructField(_c5,IntegerType,true), StructField(_c6,IntegerType,true), StructField(_c7,IntegerType,true), StructField(_c8,IntegerType,true), StructField(_c9,IntegerType,true), StructField(_c10,IntegerType,true), StructField(_c11,IntegerType,true), StructField(_c12,IntegerType,true), StructField(_c13,IntegerType,true), StructField(_c14,IntegerType,true), StructField(_c15,IntegerType,true), StructField(_c16,IntegerType,true), StructField(_c17,IntegerType,true), StructField(_c18,IntegerType,true), StructField(_c19,IntegerType,true), StructField(_c20,IntegerType,true), StructField(_c21,IntegerType,true), StructField(_c22,IntegerType,true), StructField(_c23,IntegerType,true), StructField(_c24,IntegerType,true), StructField(_c25,IntegerType,true), StructField(_c26,IntegerType,true))'

It seems that the transform() function wants the two-column "label" and "features" format. I tried to find a parameter to pass to the initializer like before, but couldn't find one on the page you provided. Thank you!


andremoeller commented May 4, 2018

transform() is trying to convert your DataFrame to LibSVM for inference because the requestRowSerializer is set to be LibSVMRequestRowSerializer:

override val requestRowSerializer : RequestRowSerializer =
new LibSVMRequestRowSerializer(),

If you want to send CSV, you should use this UnlabeledCSVRequestRowSerializer and pass it in as the requestRowSerializer to XGBoostSageMakerEstimator:

https://github.com/aws/sagemaker-spark/blob/master/sagemaker-spark-sdk/src/main/scala/com/amazonaws/services/sagemaker/sparksdk/transformation/serializers/UnlabeledCSVRequestRowSerializer.scala

The UnlabeledCSVRequestRowSerializer serializes a column of type Vector to CSV. You can pass in what column name UnlabeledCSVRequestRowSerializer serializes, but it defaults to "features":

class UnlabeledCSVRequestRowSerializer(val schema : Option[StructType] = None,
val featuresColumnName : String = "features")

Right now, your DataFrame doesn't have such a column for a features vector, but you can make one with a VectorAssembler. After making an XGBoost estimator with UnlabeledCSVRequestRowSerializer:

import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
// converts all columns in df to an appended column named "features"
// that holds a Vector
assembler.setInputCols(df.columns).setOutputCol("features")
val dfWithFeatures = assembler.transform(df)

// `UnlabeledCSVRequestRowSerializer` transforms on "features" column
myXGBoostEstimator.transform(dfWithFeatures).show()
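
A PySpark equivalent of the sketch above (df is your input DataFrame and xgboost_model is the fitted SageMakerModel; this mirrors the code the reporter posts later in the thread):

from pyspark.ml.feature import VectorAssembler

# Append a "features" Vector column assembled from every column of df.
assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
df_with_features = assembler.transform(df)

# UnlabeledCSVRequestRowSerializer reads the "features" column when serializing requests.
xgboost_model.transform(df_with_features).show()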

Feel free to reach out if you run into trouble or if this was unclear.

@haowang-ms89

@andremoeller
Many thanks! I have a very basic question now. I initialize the estimator with:
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv",
    trainingContentType="csv",
    requestRowSerializer=UnlabeledCSVRequestRowSerializer(),
    sagemakerRole=IAMRole(iamRole))
And I got NameError: name 'UnlabeledCSVRequestRowSerializer' is not defined

Which python library should I import to have UnlabeledCSVRequestRowSerializer()?

@andremoeller

@haowang-ms89
For PySpark, it's here:

class UnlabeledCSVRequestRowSerializer(RequestRowSerializer):
    """
    Serializes according to the current implementation of the scoring service.

    Args:
        schema (StructType): tbd
        featuresColumnName (str): name of the features column.
    """
    _wrapped_class = "com.amazonaws.services.sagemaker.sparksdk.transformation." \
                     "serializers.UnlabeledCSVRequestRowSerializer"

@haowang-ms89

@andremoeller
Thanks a lot! My current question is that the assembled "features" column looks strange. Here are 2 records of the assembled data frame:
| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|

| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|[1.9767,1.0,0.0,1...|

The "features" column in line 1 contains 27, the number of features, but the feature vector is wrong (the feature vector of all vectors with the number 27 are [0,1,3,4,5,6,...]).
The second line seems to be fine but not having the number of features. I also found that the value in the column right before the "features" column (the 27th column named _c26) seems to relate to which pattern would be in "features". In the first 50 records I checked, if the value in the 27th column is 0, the "features" column looks like line 1. If the value in column 27 is 1, the "features" column looks like line 2.

I use the Python version of the assembler I found here https://spark.apache.org/docs/2.3.0/ml-features.html#vectorassembler:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=test_data.columns, outputCol="features")
test_data_features = assembler.transform(test_data)

And test_data_features.show() gives:
+-------+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+
| _c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26| features|
+-------+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--------------------+
| 7.1732| 1| 0| 1| 2| 2| 2| 0| 0| 5| 1| 1| 0| 1| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 1.3087| 1| 0| 1| 2| 1| 1| 0| 0| 2| 1| 1| 0| 2| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 3.3539| 1| 0| 1| 2| 2| 1| 0| 0| 6| 1| 1| 0| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0| 3| 0| 0|(27,[0,1,3,4,5,6,...|
| 1.9767| 1| 0| 1| 1| 1| 1| 1| 1| 73| 1| 0| 0| 1| 0| 3| 0| 0| 3| 0| 1| 0| 1| 1| 0| 1| 1|[1.9767,1.0,0.0,1...|


andremoeller commented May 4, 2018

@haowang-ms89

That's normal. The VectorAssembler sparsely encodes vectors if there are lots of zeros in the data to save memory. The rows with 27 are SparseVectors. The 27 is the size of the array, followed by an array of indices, followed by an array of values. The densely encoded rows just have more nonzero values.

I believe the UnlabeledCSVRequestRowSerializer handles Sparse vectors correctly (that is, fills in the zeros when serializing to CSV).
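
A small sketch of the two encodings (values are illustrative only):

from pyspark.ml.linalg import Vectors

# Sparse encoding: (size, [indices of nonzero entries], [their values]).
sparse = Vectors.sparse(27, [0, 1, 3], [3.3539, 1.0, 1.0])
# Dense encoding: every value is stored explicitly.
dense = Vectors.dense([1.9767, 1.0, 0.0, 1.0])

print(sparse)            # (27,[0,1,3],[3.3539,1.0,1.0])
print(sparse.toArray())  # the zeros are filled back in when expanded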

@haowang-ms89

@andremoeller
Thanks for your patience. When I call:
xgboost_model.transform(test_data_features.select('features')).show()

The following exception occurs:
Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerModel.py", line 408, in transform return self._call_java("transform", dataset) File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args) File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java return _java2py(sc, m(*java_args)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 79, in deco raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
IllegalArgumentException: u'Expecting schema with DoubleType column with name label and Vector column with name features. Got StructType(StructField(features,org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7,true))'

Does this mean that the transform() function takes a "label" column? But the label should not be required when doing prediction?

@andremoeller

That looks like it's still using the LibSVM serializer, not the UnlabeledCSVRequestRowSerializer. The LibSVM serializer validates the schema like this:

throw new IllegalArgumentException(s"Expecting schema with DoubleType column with name " +
s"$labelColumnName and Vector column with name $featuresColumnName. Got ${schema.toString}")
}

Did you set xgboost_model.requestRowSerializer = UnlabeledCSVRequestRowSerializer() before transforming?


haowang-ms89 commented May 5, 2018

@andremoeller
Thanks a lot! I forgot to retrain xgboost_model after I added UnlabeledCSVRequestRowSerializer() to the initializer. I now get this exception when calling xgboost_model.transform(test_data_features.select('features')).show():

Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module> raise Exception(traceback.format_exc()) Exception: Traceback (most recent call last): File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module> exec(code, _zcUserQueryNameSpace) File "<stdin>", line 1, in <module> File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 350, in show print(self._jdf.showString(n, 20, vertical)) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ answer, self.gateway_client, self.target_id, self.name) File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o1330.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 43.0 failed 4 times, most recent failure: Lost task 0.3 in stage 43.0 (TID 46, ip-10-104-118-9.us-west-2.compute.internal, executor 27): java.lang.NumberFormatException: For input string: "[0.8876568078994751" at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284) at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer.deserializeResponse(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.util.RequestBatchIterator.hasNext(RequestBatchIterator.scala:132) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at 
org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1750) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1738) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1737) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1737) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1971) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1920) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1909) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484) at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252) at org.apache.spark.sql.Dataset.head(Dataset.scala:2484) at org.apache.spark.sql.Dataset.take(Dataset.scala:2698) at org.apache.spark.sql.Dataset.showString(Dataset.scala:254) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at 
py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NumberFormatException: For input string: "[0.8876568078994751" at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) at java.lang.Double.parseDouble(Double.java:538) at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:284) at scala.collection.immutable.StringOps.toDouble(StringOps.scala:29) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) at com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer.deserializeResponse(XGBoostCSVRowDeserializer.scala:41) at com.amazonaws.services.sagemaker.sparksdk.transformation.util.RequestBatchIterator.hasNext(RequestBatchIterator.scala:132) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more

It looks like there is a parsing error: the bracket '[' should not be attached to the number, as in java.lang.NumberFormatException: For input string: "[0.8876568078994751".
By the way, this number 0.8876... is not part of my data. Could it be a number produced during calculation, or a parameter of the model?


andremoeller commented May 7, 2018

Hi @haowang-ms89 ,

com.amazonaws.services.sagemaker.sparksdk.transformation.deserializers.XGBoostCSVRowDeserializer$$anonfun$deserializeResponse$1.apply(XGBoostCSVRowDeserializer.scala:41)

This line indicates that the XGBoost CSV deserializer is failing to deserialize the response from the XGBoost model. That number (0.88765...) is apparently one of the predictions from the model, but the response doesn't seem to be formatted correctly as CSV, which the XGBoost endpoint should respond with. My current suspicion is that the model, in some cases, returns a multidimensional array that isn't being flattened before being joined with commas. I don't think using the CSV serializer has anything to do with it -- I modified the XGBoost notebook on Notebook Instances to use the CSV serializer, and still got back the same predictions, as expected.
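
To illustrate the suspicion (this is not the SDK's actual code, and the exact response body is a guess): XGBoostCSVRowDeserializer splits the response on commas and parses each piece as a double, so a bracketed multi-dimensional payload fails on the first token:

# Hypothetical multi:softprob response body carrying per-class probabilities.
response_body = "[0.8876568078994751, 0.1123431921005249]"

for token in response_body.split(","):
    float(token)  # raises ValueError on "[0.8876568078994751", mirroring the NumberFormatException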

Would it be possible to send the request body you're sending, to help us reproduce? I believe you can find it in the endpoint logs for your endpoint, in CloudWatch. In the meantime, I'll reach out to the developers of the XGBoost SageMaker algorithm.

Thanks!

@haowang-ms89

Hi @andremoeller
Our DevOps team is helping me and we can't find the request body you mentioned. We found the SageMaker endpoint log here:
CloudWatch > Log Groups > /aws/sagemaker/Endpoints/endpoint-6b8e172a3395-2018-05-05T02-46-01-833 > AllRequests/i-0eb94294080366ebe
The log only contains:
/opt/amazon/lib/python2.7/site-packages/sage_xgboost/serve.py:117: ParserWarning: Falling back to the 'python' engine because the 'c' engine does not support sep=None with delim_whitespace=False; you can avoid this warning by specifying engine='python'. test = pd.read_csv(f.name, sep=None, header=None)
Thank you!

@andremoeller

Hi @haowang-ms89 ,

Huh, it's possible that they don't log failed requests. Thank you for that warning, though. I'll update this issue when I hear back from them.

@haowang-ms89

Hi @andremoeller
Would it be helpful if I provide my code and the training/testing CSV files?

@andremoeller

@haowang-ms89 ,

Yes, it sure would. If you can post it, I'll try to reproduce the issue.

@haowang-ms89

@andremoeller
Thanks for the help! Here are the 2 CSV files I use for training and testing (they are exactly the same except for the absence of the label column in the testing file). I changed the file extension to .txt due to the upload constraint; simply changing it back to .csv should be fine.

Here is the code I wrote:
from pyspark import SparkContext, SparkConf
import sagemaker_pyspark
from pyspark.sql import SparkSession
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
from sagemaker_pyspark.transformation import serializers
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

iamRole = "arn:aws:iam::000000000000:role/service-role/AmazonSageMaker-ExecutionRole-20180427T160487"
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m4.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m4.xlarge",
    endpointInitialInstanceCount=1,
    trainingSparkDataFormat="csv",
    trainingContentType="csv",
    requestRowSerializer=serializers.UnlabeledCSVRequestRowSerializer(),
    sagemakerRole=IAMRole(iamRole))
xgboost_estimator.setObjective('multi:softprob')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(2)

training_data = spark.read.csv("s3a:https://poc.sagemaker.hao0427/productionData0207_noheader_allnumber.csv", sep=",", header="false", inferSchema="true")
xgboost_model = xgboost_estimator.fit(training_data)

test_data = spark.read.csv("s3a:https://poc.sagemaker.hao0427/productionData0207_noheader_allnumber_notarget.csv", sep=",", header="false", inferSchema="true")
assembler = VectorAssembler(
inputCols=test_data.columns,
outputCol="features")
test_data_features = assembler.transform(test_data)
xgboost_model.transform(test_data_features.select('features')).show()

productionData0207_noheader_allnumber.txt
productionData0207_noheader_allnumber_notarget.txt

@andremoeller

Hi @haowang-ms89 ,

Thanks! I could reproduce this. I've contacted the XGBoost developers and asked them to take a look at what's going wrong.


yijiezh commented May 14, 2018

Hi @haowang-ms89,

Thanks for sharing the details.

Is the issue here that you cannot get the prediction results from hosting?

@haowang-ms89

Hi @EvanZzZz
Yes, the exception appears after I call xgboost_model.transform(test_data_features.select('features')).show()
Thanks for your help.

@andremoeller

@haowang-ms89 ,

There's a bug in the XGBoost container with the multi:softprob objective. Multi-dimensional arrays aren't being serialized from the hosting container back to the client in the expected format. The XGBoost team is working on a fix, but I can't give you an ETA.

Other objectives (those that return a scalar per record rather than a vector) still work as expected. You could also call InvokeEndpoint directly using the AWS Java SDK or boto3 client (or another AWS client) for SageMaker Runtime.
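
As a rough sketch of the boto3 route (the endpoint name is hypothetical, and the body is one feature row from the data above without its label):

import boto3

runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="my-xgboost-endpoint",   # hypothetical endpoint name
    ContentType="text/csv",
    Body="7.1732,1,0,1,2,2,2,0,0,5,1,1,0,1,0,3,0,0,3,0,0,3,0,0,3,0,0")
print(response["Body"].read())            # raw prediction payload from the container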

Please let us know if you have any other questions.

Thanks!

@haowang-ms89

Hi @andremoeller
Thanks! Since the target to be predicted is either 1 or 0, maybe I can try an objective with a single return value, interpreted as the probability of being 1. I found the following choices in the source code ("reg:linear", "reg:logistic", "binary:logistic", "binary:logistraw", "count:poisson", "multi:softmax", "multi:softprob", "rank:pairwise", "reg:gamma", "reg:tweedie").
Is binary:logistic what I should use for now?

@andremoeller

@haowang-ms89 ,

Yeah, I think that's right. Hyperparameters are passed in to XGBoost just as documented on the XGBoost GitHub page: https://github.com/dmlc/xgboost/blob/master/doc/parameter.md
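
For the workaround discussed above, a sketch mirroring the setters already used in this thread:

xgboost_estimator.setObjective('binary:logistic')  # returns a single probability per record
xgboost_estimator.setNumRound(25)
# setNumClasses is only needed for the multi-class objectives (multi:softmax / multi:softprob).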

@haowang-ms89

@andremoeller
Thanks for the information! I will try it later and report the results.

@haowang-ms89

@andremoeller
It works fine with the binary:logistic setting, thank you. I have a general question: is there a way for model.fit() to output the training error/loss during training?

@andremoeller

@haowang-ms89 ,

Do they show up in your CloudWatch logs for your XGBoost training job? If not, we won't be able to get them, but if so: streaming logs from CloudWatch to Spark is possible, but just not implemented.

@andremoeller

Labeling this as a bug and keeping this open to track the new output format for XGBoost for multi-dimensional arrays.

@DanyalAndriano

Hi there,

I am looking to use the multi:softprob objective - I have a multiclass problem and this objective is needed. Has this bug been fixed?

@pauloarantes

Hello, I'm in the same situation as @DanyalAndriano and can't make multi:softprob work correctly. Any updates on this issue? Thanks!
