Saving and Retrieving ML Models Using PySpark in Cloud Platform

S M Nahian Al Sunny
Walmart Global Tech Blog
8 min read · May 10, 2022


Machine learning pipeline for cloud applications

PySpark has become a preferred platform for many data science and machine learning (ML) enthusiasts for scaling data science and ML models because of its superior, easy-to-use parallel computing capabilities and its ability to integrate easily with external Python libraries. Oftentimes, a model needs to be saved after training so that it can be retrieved and used later by other applications. In this article, I will explain two ways to save and retrieve ML models using PySpark. The first method uses Spark’s native MLlib module, while the second is a custom approach focused primarily on storing a large number of model files using the least amount of storage. These methods were applied and validated in Google Cloud Platform’s (GCP) Dataproc cluster and storage, so the code snippets shared below are mostly specific to GCP.

This article assumes that you are already familiar with using PySpark with Pandas UDF. If not, here is a quick and simple summary. Also, this article skips some trivial steps, like data cleaning, as the primary focus here is saving and loading ML models.

Getting the Data

The dataset here is stored in Google Storage (GS) as parquet files. The following code snippet loads the dataset into a PySpark DataFrame.

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

#........Read data from parquet files........#
dataDir = "gs://mlmodels/data/*"
dataTable = spark.read.parquet(dataDir)
dataTable.createOrReplaceTempView("inputTable")
inputDf = spark.sql("select * from inputTable")

Method 1 — Using PySpark MLlib Module

MLlib is Spark’s native API for building ML models at scale. The following code snippet first splits the input data into training and testing sets and then trains a Random Forest (RF) model using training data. (Note: The parameter values should be chosen according to input data.)

from pyspark.mllib.tree import RandomForest, RandomForestModel

# Split the data into training and test sets
(trainingData, testData) = inputDf.randomSplit([0.8, 0.2])

# Train a Random Forest regressor
# (note: trainRegressor expects an RDD of LabeledPoint, so convert the
# DataFrame rows accordingly before this step)
model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    numTrees=10, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=5)

Saving the trained model is quite simple using the save method:

model.save(sc,"gs://mlmodels/models/myRF")

The resulting directory contains several folders with multiple Parquet files. Depending on the size and complexity of the data, the number of files can vary (in my case, there were almost 350 files for each RF model).
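If you are curious what save() actually produced, a quick listing of the bucket path (illustrative only; run it from the master node) shows the layout, which for an MLlib RandomForestModel is typically a data/ folder of Parquet part files plus a metadata/ folder:

from subprocess import Popen, PIPE

# Illustrative sanity check: list the contents of the saved model directory
p = Popen("gsutil ls -r gs://mlmodels/models/myRF", shell=True, stdout=PIPE, stderr=PIPE)
print(p.stdout.read().decode())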

To retrieve a previously saved model, we need to use the load method:

savedModel = RandomForestModel.load(sc,"gs://mlmodels/models/myRF")

Method 2 — Using joblib and gsutil

Why is the second method needed?

As you can see, using MLlib is easy and straightforward. But saving and retrieving models with this method is a bit time- and resource-consuming. This may not be an issue for most use cases, but if you are building an application that has to generate, save, and load multiple models (in my case, 100+ models) as fast as possible using as little storage as possible, then using MLlib may not be the best choice. That brings us to the second approach.

For this method, we group the dataset by an ID column and then apply a Pandas UDF to create an RF model for each group. For each group, its dataframe is first divided into a training and a testing set (80:20 ratio); the RF model is then instantiated and trained on the training set.

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

#........Define schema for output dataframe here........#
outSchema = StructType([...])

#........Define pandas UDF........#
@pandas_udf(outSchema, PandasUDFType.GROUPED_MAP)
def generate_model(df):
    label = np.array(df['label'])
    features = df.drop('label', axis = 1)
    # splitting training and testing data
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, label, test_size = 0.2, random_state = 42)

    # Instantiate model with decision trees
    model = RandomForestRegressor(bootstrap = True, max_depth = 5, max_features = 'auto',
                                  min_samples_leaf = 2, min_samples_split = 8,
                                  n_estimators = 10, oob_score = True, random_state = 42)

    # Train the model on training data
    model.fit(train_features, train_labels)
    ...

outputDf = inputDf.groupby('id').apply(generate_model)

Saving models

Now that we have our model, it’s time to save it. Let’s divide the problem into two parts: 1) we need to find a way to save the model object into file(s), and 2) upload the corresponding file(s) to the GS bucket.

For the first part, we are using a package called joblib. The joblib.dump() method allows us to compress and store an object into a single file. It takes three parameters: the object to be dumped, the file path where it should be dumped, and the compression level to be used while dumping. I am using compress = 3 because, according to the joblib docs, it is a good compromise between size and speed. For now, let’s save the model in a local directory and name it based on the corresponding group’s ID.

import joblib

fileName = str(id) + "_rf.joblib"
joblib.dump(model, "./" + fileName, compress = 3)

This creates a single .joblib file instead of the multiple Parquet files produced in method 1. The size depends on the amount of training data, but it is still much smaller than the total size of MLlib’s generated files for the same model. The execution time is much faster here as well.
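If you want to see this for yourself, a one-line check of the dumped file’s size (illustrative only) is enough:

import os

# Print the size of the dumped model file in kilobytes
print(round(os.path.getsize("./" + fileName) / 1024, 1), "KB")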

Once we have our model file, it’s time to upload it. Surprisingly, this was the part that I was stuck on for the majority of my project! If this was a Python project, not PySpark, the solution would be simple. We would use GCP’s storage library and upload the model file from the local directory to the GS bucket, like this:

from google.cloud import storage

srcPath = "./" + fileName
bucketName = "mlmodels"
storage_client = storage.Client()
bucket = storage_client.bucket(bucketName)
blob = bucket.blob("models/" + fileName)
with open(srcPath, "rb") as f:
    blob.upload_from_file(f)

But for PySpark, the process is rather complex. The first issue is the way Pandas UDFs are handled by PySpark. For scalability, Spark distributes the dataset groups to worker nodes, so the model files are created in the workers’ local storage. But the storage library only works if executed from the master node (not sure why, but that’s what happened in my case!). To upload files from worker nodes, we need a different approach. When I ssh-ed into individual nodes via a terminal, I found that gsutil commands (cp or rsync) were functional. To use gsutil commands in our application, we need to use Popen from the subprocess package. In PySpark, the default local directory for application files is /hadoop/spark/tmp/. Our code goes like this:

from subprocess import Popen, PIPE

srcPath = "/hadoop/spark/tmp/" + fileName
destPath = "gs://mlmodels/models/"
p = Popen("gsutil -m rsync -r " + srcPath + " " + destPath,
          shell=True, stdout=PIPE, stderr=PIPE)

But after using this, I found that only the models that were saved in the master node were uploaded! The same command worked when sent via the terminal (ssh-ing to a worker node) but did not work from the application! I literally had no idea why! After many days of searching through countless online resources, I finally found the solution here, courtesy of Dennis Huo! Here is his explanation:

For things running in YARN containers, despite being run as user yarn, where if you just run sudo su yarn you'll see ~ resolve to /var/lib/hadoop-yarn on a Dataproc node, YARN actually propagates yarn.nodemanager.user-home-dir as the container's homedir, and this defaults to /home/. For this reason, even though you can sudo -u yarn gsutil ..., it doesn't behave the same way as gsutil in a YARN container, and naturally, only root is able to create directories in the base /home/ directory.

The solution is rather simple — we need to add HOME=/var/lib/hadoop-yarn right before our gsutil command. (Note: Dennis suggested other solutions, which I didn’t try; read his answer for more details.)

p = Popen("HOME=/var/lib/hadoop-yarn gsutil -m rsync -r "+srcPath+" "+destPath, shell=True, stdout=PIPE, stderr=PIPE)

And voilà! Worked like a charm! All my model files from all worker nodes were uploaded to the designated GS bucket.
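If you want to double-check, a small verification snippet (purely illustrative) can count the uploaded model files from the master node:

from subprocess import Popen, PIPE

# Count the .joblib model files that landed in the bucket
p = Popen("gsutil ls gs://mlmodels/models/", shell=True, stdout=PIPE, stderr=PIPE)
uploaded = [f for f in p.stdout.read().decode().split("\n") if f.endswith(".joblib")]
print(len(uploaded), "model files uploaded")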

Here are some more tips that I discovered while figuring this out:

i) Using /hadoop/spark/tmp/ as the directory to save model files initially can be risky, as this is Spark’s default temporary directory and some other process may overwrite your files. Instead, use some other directory. But most directories on a worker node are write-protected, so you may need to spend some time finding an appropriate one. In my case, I found /usr/local/man/ to be the one.
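A quick way to test a candidate directory (illustrative only) is to check write access from inside a UDF, or from a shell on the node:

import os

# Check whether the candidate dump directory is writable on this node
candidate = "/usr/local/man/"
print(candidate, "is writable:", os.access(candidate, os.W_OK))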

ii) Because of Spark’s “lazy” and parallel computation, it’s difficult to predict when and where the model files will be created. If you upload the model files immediately after dumping, you may end up missing some, because the corresponding files had not yet been created when the upload code was executed. So it’s better to generate and dump the model files in one UDF and then upload them in another; this ensures that all files are completely dumped before uploading. The final code snippet is:

import joblib
import numpy as np
import pandas as pd
from subprocess import Popen, PIPE
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

#........Define schemas for output dataframes here........#
outSchema = StructType([...])
uploadSchema = StructType([...])

#........UDF for generating and dumping models........#
@pandas_udf(outSchema, PandasUDFType.GROUPED_MAP)
def generate_model(df):
    label = np.array(df['label'])
    features = df.drop('label', axis = 1)
    # splitting training and testing data
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, label, test_size = 0.2, random_state = 42)

    # Instantiate model with decision trees
    model = RandomForestRegressor(bootstrap = True, max_depth = 5, max_features = 'auto',
                                  min_samples_leaf = 2, min_samples_split = 8,
                                  n_estimators = 10, oob_score = True, random_state = 42)

    # Train the model on training data
    model.fit(train_features, train_labels)

    # Dump the trained model to the worker's local filesystem,
    # named after this group's ID
    fileName = str(id) + "_rf.joblib"
    joblib.dump(model, "/usr/local/man/" + fileName, compress = 3)
    ...

#........UDF for uploading models to gcs bucket........#
@pandas_udf(uploadSchema, PandasUDFType.GROUPED_MAP)
def upload_model(df):
    destPath = "gs://mlmodels/models/"
    srcPath = "/usr/local/man/*"
    p = Popen("HOME=/var/lib/hadoop-yarn gsutil -m rsync -r " + srcPath + " " + destPath,
              shell=True, stdout=PIPE, stderr=PIPE)
    return pd.DataFrame({"comment": "Success"}, index=[0])

outputDf = inputDf.groupby('id').apply(generate_model)
uploadDf = outputDf.groupby('id').apply(upload_model)

Note that a Pandas UDF always needs to return a dataframe. In the second UDF, even though we have nothing to return, we are returning a dummy dataframe. Also, outputDf should have enough groups so that the second UDF is executed on all worker nodes.
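As an example, since the second UDF only returns that dummy dataframe, its schema can be as minimal as a single string column. Here is one possible definition (an assumption, since the article elides the actual schema):

from pyspark.sql.types import StructType, StructField, StringType

# Minimal schema for the dummy dataframe returned by upload_model (assumed definition)
uploadSchema = StructType([StructField("comment", StringType(), True)])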

iii) Again, because of Spark’s “lazy” and parallel computation, the second UDF may return before all files have finished uploading (i.e., the upload was initiated but hasn’t finished yet). So it’s better to add a delay afterwards, just to be sure.
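One simple way to do this (a sketch; the delay value is arbitrary) is to force the upload dataframe to evaluate and then sleep for a while before the job moves on:

import time

# Trigger the upload UDF, then wait so the gsutil processes started on the
# workers have time to finish (delay value is arbitrary)
uploadDf.count()
time.sleep(60)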

Retrieving models

To retrieve model files previously saved to GS using the above method, we need to download them from GS to the cluster nodes (assuming that we will be using all model files). The first step is to get the list of files in the GS bucket; we can then add them to the Spark context one by one with sc.addFile(), which downloads the files and makes them available across the cluster. The code is given below:

from subprocess import Popen, PIPE

srcPath = "gs://mlmodels/models/"
p = Popen("gsutil ls " + srcPath, shell=True, stdout=PIPE, stderr=PIPE)
saved_files = str(p.stdout.read())[2:-1].split("\\n")[1:-1]
for filePath in saved_files:
    sc.addFile(filePath)

Once all files are downloaded, we can use joblib.load() to load a model. We need to provide the local path of the corresponding model file, which can be retrieved with the SparkFiles.get() method. Because sc.addFile() makes each added file available on every node, this method can also be used inside Pandas UDFs running on the worker nodes.

from pyspark import SparkFiles

model = joblib.load(SparkFiles.get(fileName))
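For example, a per-group scoring UDF could look like the sketch below; the schema, column names, and the score_group function are illustrative and not part of the original code:

#........Hypothetical scoring UDF........#
scoreSchema = StructType([...])

@pandas_udf(scoreSchema, PandasUDFType.GROUPED_MAP)
def score_group(df):
    # Each group loads its own model from the files distributed via sc.addFile()
    fileName = str(df['id'].iloc[0]) + "_rf.joblib"
    model = joblib.load(SparkFiles.get(fileName))
    df['prediction'] = model.predict(df.drop('label', axis = 1))
    return df

scoredDf = inputDf.groupby('id').apply(score_group)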

Conclusion

The code for the second method may seem more complicated than using MLlib, but it is much more efficient in terms of execution time and required storage when generating a large number of models. A word of caution though: with grouped Pandas UDFs, each group’s data is collected into a single Pandas dataframe in a worker’s memory, so the second method may lead to an out-of-memory exception for very large groups. In my case, each group’s dataset was not very large, so this was not an issue.

It took me quite a while to complete as there were multiple obstacles to overcome, but I enjoyed the journey. I sincerely hope this article will help someone find the solution to a similar problem much faster. Please drop a comment if you have any suggestions or issues.

Resources

  1. MLlib — https://spark.apache.org/mllib/
  2. Google Cloud Dataproc — https://cloud.google.com/sdk/gcloud/reference/dataproc
  3. Google Cloud Storage — https://cloud.google.com/storage
  4. Multi Time Series Forecasting in Spark — https://medium.com/walmartglobaltech/multi-time-series-forecasting-in-spark-cc42be812393
  5. gsutil Tool — https://cloud.google.com/storage/docs/gsutil

S M Nahian Al Sunny
Walmart Global Tech Blog

Software Engineer @ Walmart Global Tech, PhD in Computer Engineering @ University of Arkansas