Save and load a Random Forest model in a PySpark script on Amazon S3

Hi,

I am training a Random Forest model with cross-validation in a PySpark script. I want to save the model to Amazon S3 so that I can load it at a later stage. I have used the following script to save it to S3:

```python
import pickle
import tempfile

import boto3

s3 = boto3.client('s3')

bucket_name = 'my-bucket'
key = "model.pkl"

# WRITE: pickle the trained CrossValidatorModel and upload the bytes to S3
with tempfile.TemporaryFile() as fp:
    pickle.dump(cvModel, fp)
    fp.seek(0)
    s3.put_object(Body=fp.read(), Bucket=bucket_name, Key=key)
```

But it gives me this error:

`py4j.Py4JException: Method __getstate__() does not exist`

Kindly let me know if there is a way to save and load my model on Amazon S3.

Hi @Anushka

All spark.ml models have a save() method to which you can pass an s3:// URL. Have you tried that?

I am sceptical that pickling the model object does the right thing, as some model types also hold references to DataFrames (which are distributed).
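For reference, a minimal sketch of the save()/load() route, assuming `cvModel` is the fitted `CrossValidatorModel` returned by `CrossValidator.fit()` and that the cluster's S3 connector accepts `s3://` URLs (as EMR's does). The helpers `save_spark_model` and `load_cv_model` are hypothetical names. Spark writes a directory of metadata and data files under the given path instead of a single pickled object, so no pickle or boto3 step is involved.

```python
def save_spark_model(model, path, overwrite=True):
    """Persist a spark.ml model (any MLWritable, e.g. a CrossValidatorModel) to `path`.

    `path` may be an s3:// URL on EMR; Spark writes a directory of
    metadata and data files there rather than a single pickle.
    """
    writer = model.write()  # MLWriter for this model
    if overwrite:
        # Without overwrite(), save() raises an error if the path already exists.
        writer = writer.overwrite()
    writer.save(path)


def load_cv_model(path):
    """Load a CrossValidatorModel previously written with save_spark_model()."""
    # Imported lazily so this module can be imported without pyspark installed.
    from pyspark.ml.tuning import CrossValidatorModel
    return CrossValidatorModel.load(path)
```

Usage, with an illustrative path: `save_spark_model(cvModel, "s3://my-bucket/models/rf_cv/")`, and later `loaded = load_cv_model("s3://my-bucket/models/rf_cv/")` followed by `loaded.transform(df)`. If only the winning model is needed, `cvModel.bestModel` can be saved the same way and reloaded with its matching class (e.g. `RandomForestClassificationModel.load` for a bare classifier, `PipelineModel.load` for a pipeline).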

Hi,

Thanks for clarifying.
I tried using the save() method but ended up getting this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o1462.save.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: A5DB5B90701619A7; S3 Extended Request ID: NIxtS3INMbF3hJEYEKeB1h5OUgvbyezhEYLlVw5502TN3lIB5Ld23Hy3QjQnOuOZ+SBVshgWFmo=), S3 Extended Request ID: NIxtS3INMbF3hJEYEKeB1h5OUgvbyezhEYLlVw5502TN3lIB5Ld23Hy3QjQnOuOZ+SBVshgWFmo=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1371)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1347)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1127)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:784)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:752)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:22)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:8)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:114)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:189)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:43)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.getFileMetadataFromCacheOrS3(Jets3tNativeFileSystemStore.java:497)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:223)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:590)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1440)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:357)
at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:696)
at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:179)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Kindly let me know how to resolve this.

Hi @Anushka,

It looks like you are trying to save the model from a PySpark snippet running on EMR? You can verify that the EMR cluster can read from and write to the S3 bucket using the Spark to Parquet and Parquet to Spark nodes. If this works, I would suggest posting your Python code here and taking a look at the Spark logs, which might contain a more detailed error message.

Yes, I am running the PySpark script on an EMR cluster. Do I have to save it using Spark to Parquet?
If yes, how can I save a model with it, since the Spark to Parquet node is used to store DataFrames?

I suggest writing some test data using the Spark to Parquet node and then reading it back, to ensure the EMR cluster has permission to read and write data in your S3 bucket. You can use the Table Creator node to create some test data instead of your PySpark snippet.
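If you would rather run the same write-then-read-back check from inside the PySpark snippet itself, a minimal sketch could look like this. It assumes a `SparkSession` is available as `spark` and that `path` is a scratch S3 prefix that may be overwritten; the function name `s3_roundtrip_ok` is hypothetical.

```python
def s3_roundtrip_ok(spark, path, n=5):
    """Write n rows of test data to `path` as Parquet, read them back, and compare counts.

    Assumes `spark` is an active SparkSession and `path` (e.g. an s3:// prefix)
    is a scratch location that may be overwritten.
    """
    test_df = spark.range(n)  # single column "id" with values 0..n-1
    test_df.write.mode("overwrite").parquet(path)
    return spark.read.parquet(path).count() == n
```

With an illustrative path, `s3_roundtrip_ok(spark, "s3://my-bucket/tmp/roundtrip-check/")` should return `True` when the cluster can both write and read that prefix; a permission problem would surface as an exception from the write or the read instead.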

If everything works, continue getting your PySpark snippet to work. As you already mentioned, the Spark to Parquet node does not help here, and you have to use the save method. If this still does not work, ensure your output path matches the bucket name and does not contain any special characters.
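The special-character check can also be done in code before calling save(). This is a rough sketch that approximates the main S3 rules: bucket names of 3 to 63 lowercase letters, digits, dots and hyphens that start and end with a letter or digit, and object keys limited to the characters AWS documents as "safe" plus `/`. `s3_path_problems` is a hypothetical helper and does not cover every naming rule (for example, bucket names formatted like IP addresses). A bucket name wrapped in typographic quotes such as ‘my-bucket-name’ would be reported as invalid.

```python
import re
import string

# Approximation of the S3 bucket naming rules: 3-63 characters of lowercase
# letters, digits, dots and hyphens, beginning and ending with a letter or digit.
BUCKET_RE = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")
# Characters AWS documents as "safe" in object keys, plus "/" as the prefix delimiter.
SAFE_KEY_CHARS = set(string.ascii_letters + string.digits + "!-_.*'()/")


def s3_path_problems(url):
    """Return a list of problems found in an S3 output path (empty list if none)."""
    problems = []
    scheme, sep, rest = url.partition("://")
    if not sep or scheme not in ("s3", "s3a", "s3n"):
        problems.append(f"unexpected scheme in {url!r}")
    bucket, _, key = rest.partition("/")
    if not BUCKET_RE.fullmatch(bucket):
        problems.append(f"invalid bucket name {bucket!r}")
    unsafe = sorted({c for c in key if c not in SAFE_KEY_CHARS})
    if unsafe:
        problems.append(f"key contains characters outside the safe set: {unsafe}")
    return problems
```

An empty list means the path passed these checks; it does not prove the bucket exists or that the cluster may write to it.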

The EMR cluster has permission to read and write data; I re-checked it using the Spark to Parquet node. The output path for saving the model is the same as the one used to save the test data file.
It works fine for the test data but not for the models. I am not sure whether the output path is the issue.
This is the output path I am using:
s3://‘my-bucket-name’/s3-insightsmax-orch/NBA_Output_folder/Production/ModellingScenariosResults/ModelTrainStructure/

The code I am using to save the model:

```python
cvModel.write().save(output_path)
```

I also ran it without the write() call, but it showed the same error.

It seems I am trying to read/write the models/DataFrames directly from the PySpark script (running on the EMR cluster) to Amazon S3 without bringing them into the KNIME environment.
Is it mandatory to bring the models/DataFrames into the KNIME environment before reading/writing them on S3?

If yes, how can we achieve model read/write in a PySpark script, since it only allows DataFrames?