
Oracle AI & Data Science Blog

  • November 19, 2020

A Simple Guide to Leveraging Parallelization for Machine Learning Tasks

JR Gauthier
Sr Principal Product Data Scientist

Typically, you want to optimize the use of a large VM hosting your notebook session by parallelizing the different workloads that are part of the machine learning (ML) lifecycle. For example, extract-transform-load (ETL) operations, data preparation, feature engineering, and model training can all be parallelized. You don't want to use a VM.Standard2.24 shape with 24 OCPUs only to realize that your code is using just one of those CPUs. That would be a costly and suboptimal use of the machine.

This blog post covers a few options available to a data scientist who wants to parallelize a workload applied to a data frame. It covers approaches that offer multithreaded and multiprocess execution, and for each approach it provides execution-speed benchmarks that you can run in your own notebook session.

I only consider data frames loaded in the memory of a single VM. All my tests were run on a VM.GPU3.2 instance in the OCI Data Science service. Even though this VM contains two V100 GPU cards, all computations were executed on the CPUs. A follow-up post will include a comparison with GPUs.

Specifically, I will cover the following approaches: 

  • Using Pandas directly with two threads
  • Using Dask with threads and separate processes
  • Using Modin with a Ray backend
  • Using multiprocessing.Pool to launch separate processes 
  • Using joblib.parallel to launch separate threads and processes 

Note: I excluded Pandarallel from this post since it is not yet supported in the notebook session environment. We hope to support it soon. 

You can download a notebook version (.ipynb) of this blog post here. We highly recommend using the notebook version if you want to reproduce or tweak the benchmarks.

Before we start comparing different approaches, let's consider a few key factors that impact the performance of multithreaded or multiprocess execution.

Data Size Matters 

Before you jump into re-factoring your code to support multiple threads or processes, ask yourself whether the dataset you're working with is large enough to justify the re-factoring. When the dataset is relatively small, you will gain little or no speed from either a multithreading or a multiprocessing approach. In most cases, I recommend starting with a simple benchmark run on a sample of your data and increasing the size of the sample progressively, as in the sketch below. That helps you decide whether re-factoring is needed.
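For example, a minimal sketch of that kind of sampling benchmark could look like the following. The data frame, column name, and sample fractions here are purely illustrative; substitute your own data and transformation.

import time
from random import random

import numpy as np
import pandas as pd

# Illustrative data frame; replace with your own dataset.
full_df = pd.DataFrame(np.random.rand(1_000_000, 5),
                       columns=[f"col{i}" for i in range(5)])

for fraction in (0.01, 0.1, 0.5, 1.0):
    sample = full_df.sample(frac=fraction)
    start = time.perf_counter()
    sample["col0"].apply(lambda x: x + 100.0 + random())
    elapsed = time.perf_counter() - start
    print(f"fraction={fraction}: rows={len(sample):,} time={elapsed:.3f}s")

If the timings grow roughly linearly and remain acceptable at the full size, re-factoring for parallelism may not be worth the added complexity.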

When Possible, Vertical Scaling Always Beats Horizontal Scaling

As much as possible, you will want to maximize the utilization of a single VM (aka vertical scaling) before distributing your workload across multiple VMs (aka horizontal scaling). Horizontal scaling is often used when the amount of data to process exceeds the memory footprint of a single VM. The downside of horizontal scaling is the time taken for messages to be exchanged between VMs. Network bandwidth can be a bottleneck and slow down your computation.  

Consider Using GPUs 

GPUs can significantly speed up your workloads when the tasks can be executed on the GPU. The NVIDIA V100 generation of GPUs (available on the VM.GPU3 family of notebook session VMs) has 5,120 CUDA cores and 640 Tensor cores! Most libraries, including the deep learning focused ones (PyTorch, TensorFlow, MXNet) but also XGBoost and NVIDIA RAPIDS, can leverage GPUs to execute data preparation and model training tasks much faster. We observed a speedup of 100x when running XGBoost on a single GPU vs. a multithreaded execution on 24 CPU cores.
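As an illustration only (this snippet is not part of the benchmark in this post, and the parameter values are assumptions), directing XGBoost to a GPU is usually a matter of changing the tree_method parameter:

from xgboost import XGBClassifier

# CPU: multithreaded histogram algorithm across the available cores
cpu_model = XGBClassifier(tree_method="hist", n_jobs=24, n_estimators=200)

# GPU: same model, with histogram construction offloaded to the GPU
gpu_model = XGBClassifier(tree_method="gpu_hist", n_estimators=200)

# cpu_model.fit(X_train, y_train)  # X_train/y_train are your own training data
# gpu_model.fit(X_train, y_train)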

Which Should I Use: Threads or Processes?

Python multiprocessing generally does not share memory: each process requires its own copy of the dataset loaded in memory. Most of the time, data scientists will divide a dataset into separate partitions or chunks and assign those to separate processes. The downside of this approach is that it requires a larger memory footprint than if the data were shared. In addition, your function needs to be serialized in order to be shipped to the other Python processes, and the same goes for the input and output data of that function. This serialization introduces additional delay and reduces performance.

For most workloads run by data scientists, threads are better than processes. In Python, the global interpreter lock (GIL) controls which thread can execute Python bytecode at any given moment. Effectively, the GIL allows only one thread to run at a time, which prevents truly parallel multithreaded execution of pure Python code. However, most machine learning and scientific libraries used by data scientists (NumPy, Pandas, scikit-learn, and so on) release the GIL inside their compiled routines, effectively allowing multithreaded execution on separate workers. Another thing to keep in mind is that, when your dataset is large, threads make more sense than processes because of the memory limitations you may encounter when distributing copies of your dataset to different processes.

In other words, use threads whenever possible. 
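Here is a minimal sketch that illustrates the point, assuming a NumPy-heavy task that releases the GIL and a Linux VM (the chunk sizes and worker counts are illustrative):

import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def heavy_numpy_task(chunk):
    # NumPy releases the GIL inside this call, so threads can run in parallel.
    return np.sqrt(chunk ** 2 + 1.0).sum()

chunks = [np.random.rand(5_000_000) for _ in range(8)]

for executor_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
    start = time.perf_counter()
    with executor_cls(max_workers=8) as executor:
        results = list(executor.map(heavy_numpy_task, chunks))
    print(f"{executor_cls.__name__}: {time.perf_counter() - start:.2f}s")

The process pool has to serialize every chunk and ship it to the worker processes, so the thread pool typically comes out ahead for this kind of workload.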

Before you start running the benchmark, execute the following cell in your notebook to install all the necessary libraries:

!pip install pandas==1.1.4
!pip install dask==2.16.0
!pip install modin[ray]==0.8.2

Benchmarks

Let's start running some benchmarks! 

The first thing to do is to generate a synthetic dataset using the scikit-learn make_classification() function. You generate a dataset of 25M rows and 50 columns. The dataset has a total memory footprint of about 9.3GB. Adjust the number of rows according to the size of your notebook session VM. 

from sklearn.datasets import make_classification
import pandas as pd
from random import random 
import numpy as np 
import os

from joblib import Parallel, delayed
import multiprocessing
from multiprocessing import Pool

# Parameters of the synthetic dataset: 
n_samples = 25000000
n_features = 50 
n_informative = 12
n_redundant = 10 
n_classes = 2

df = make_classification(n_samples=n_samples,
                         n_features=n_features,
                         n_informative=n_informative,
                         n_redundant=n_redundant,
                         n_classes=n_classes)

pandas_df = pd.DataFrame(df[0])
pandas_df.columns = [f"col{x}" for x in range(pandas_df.shape[1])]

pandas_df.head()

The data frame contains only float values. If you run the top command in the JupyterLab terminal window while the `make_classification()` cell is executing, you will notice that the process is actually using more than 100% of the CPU. The screenshot below was taken during the execution of the make_classification() command. You can see a single process using over 500% CPU, which is a sign of threads executed by multiple workers running in a single process. In fact, 24 threads are running at the peak of the `make_classification()` computation on the VM shape used for this benchmark.

[Screenshot: top output showing a single Python process at over 500% CPU during make_classification()]
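If you prefer not to watch top, you can inspect the kernel process from the notebook itself with psutil. This is a hedged alternative that assumes psutil is installed in your notebook environment:

import os
import psutil

process = psutil.Process(os.getpid())
print(f"CPU utilization: {process.cpu_percent(interval=1.0)}%")
print(f"Number of threads: {process.num_threads()}")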

Pandas 

Pandas is actually quite fast when the data fits nicely in memory. In the cell below, I apply a simple transformation to a column of the data frame. By default, Pandas executes in a single process on a single CPU core. Note that each OCPU can execute two threads in the same process (that is, 2 vCPUs). You can confirm this by running top in your terminal: you should see two vCPUs running close to 100% and two threads active most of the time. The same operation (adding a random value to a column) is repeated throughout this post as a baseline for comparison purposes.

%%time
pandas_df['col0'] = pandas_df['col0'].apply(lambda x: x + 100.0 + random())

print(pandas_df.head())

This transformation takes approximately 19 seconds in my notebook session with two threads running. 

Dask

Dask is a flexible library for parallel computing in Python, and it is the backbone of the Oracle ADS library. Dask has a data frame object composed of multiple parallel Pandas data frames, split along the index. Dask uses a multithreaded scheduler by default when dealing with arrays and data frames; you can always change the default and use processes instead. In the code below, we use the default threaded scheduler:

from dask import dataframe as ddf
import dask  # needed below to change the scheduler configuration

dask_df = ddf.from_pandas(pandas_df, npartitions=20)
dask_df = dask_df.persist()

Then I execute the same operation on the Dask data frame: 

%%time 

dask_df['col1'] = dask_df['col1'] + 100.0 + random()
print(dask_df.compute().head())

This provides better performance: it takes approximately 6 seconds in my notebook session with multiple threads running.

You can use processes instead by configuring Dask:

dask.config.set(scheduler='processes')

and execute the same code: 

%%time 

dask_df['col1'] = dask_df['col1'] + 100.0 + random()
print(dask_df.compute().head())

There is a big difference between threads and processes: this code took 51 seconds to run, which is similar to what you get using multiprocessing.Pool.
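If you want to switch back to Dask's default threaded scheduler for the rest of your session, set it back explicitly (a small usage note, not part of the original benchmark):

dask.config.set(scheduler='threads')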


Modin

Modin is a data frame library that wraps the Pandas API and distributes the data and computation completely transparently. It is very convenient to use because you only have to change one line of your Pandas code to run it with Modin. Modin supports two backends (Ray and Dask) to speed up the processing of Pandas data frames.
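In practice, the one-line change usually amounts to swapping the Pandas import; the rest of your code stays the same. A minimal illustration is shown below (in this post I import Modin under a separate alias, mpd, so the Pandas and Modin data frames can be compared side by side):

# import pandas as pd         # before
import modin.pandas as pd     # after: same API, distributed execution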

First, make sure to use Ray as the backend for Modin. I use a different backend than Dask here, since Dask was covered in the preceding section. Also, ensure that the computation runs on CPUs and not on GPUs. You will most likely get a PermissionError in your notebook when you run the cell below, but that is ok: Modin is trying to access the GPUs, and we do not want the computation to be done on GPUs.

os.environ['MODIN_ENGINE'] = "ray"
import ray 
ray.init(num_gpus=0)
import modin.pandas as mpd

Convert the Pandas data frame into a Modin data frame:

modin_df = mpd.DataFrame(pandas_df)

Now the simple benchmark transformation: 

%%time 

modin_df['col2'] = modin_df['col2'] + 100.0 + random()
print(modin_df.head())

You get great performance! It takes less than 1 second in my notebook session with multiple threads running. 

multiprocessing.Pool

Next, we look at the multiprocessing module, which is part of the Python standard library. Multiprocessing offers the ability to spawn multiple processes using a simple API. It allows data scientists to leverage multiple cores on a machine and is very flexible. I have used multiprocessing in the past for expensive row-wise computations on data frames, such as calculating fast Fourier transforms (FFTs) on audio waveform data. I had a large array of audio clips that I split and sent to different cores for processing, which reduced the computation time from 45 minutes on two cores to 10 minutes on 24 cores. Although I did not try a multithreading approach, I believe it would have performed comparably well.

The catch is that we need to split the data frame into separate chunks that get manipulated by each process. First, define a function that distributes the task across processes (parallelize_dataframe()) and the task itself (simple_transformation()).

def parallelize_dataframe(df, func, num_cores=2):
    """Utility function that distributes the application
    of function func on dataframe df by using Pool()
    """
    dfs = np.array_split(df, num_cores)
    with Pool(num_cores) as pl:
        df = pd.concat(pl.map(func, dfs))
        pl.close()
        pl.join()
    return df

def simple_transformation(df):
    """Add a random value to a column
    """
    df['col3'] = df['col3'].apply(lambda x: x + 100.0 + random())
    return df

Next, execute the following: 

%%time

num_cores = multiprocessing.cpu_count()
print(f"nb of cores used {num_cores}")
pool_df = parallelize_dataframe(pandas_df, simple_transformation, num_cores=num_cores)
pool_df.head()

Here, the performance is worse than with Pandas alone: the cell above takes approximately 57 seconds to run. The cost of using multiple processes compared to threads is quite obvious here. You can try a different number of cores (num_cores) and see what happens, as in the sketch below.
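A hedged sketch of that experiment, reusing the functions defined above (the list of core counts is illustrative, and each run takes on the order of a minute on this dataset):

import time

for cores in (2, 4, 8, num_cores):
    start = time.perf_counter()
    parallelize_dataframe(pandas_df, simple_transformation, num_cores=cores)
    print(f"num_cores={cores}: {time.perf_counter() - start:.1f}s")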


Joblib Parallel

Joblib Parallel is used throughout scikit-learn to parallelize workloads. It provides a simple API to parallelize for loops using the multiprocessing module. By default, Joblib Parallel runs each function call in a separate process, though it also supports a multithreading mode. You can override that default by setting the prefer parameter to "threads", as I do here:

%%time 
df2s = np.array_split(pandas_df, num_cores)
results = Parallel(n_jobs=num_cores, prefer="threads")(delayed(simple_transformation)(x) for x in df2s )
df3 = pd.concat(results)
df3.head()

The cell above takes approximately 42 seconds to run. It is definitely faster than using processes (try removing the prefer parameter) but nowhere near the speedup you gain by using either Dask directly or Modin with a Ray backend. 
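To summarize, here are the approximate wall-clock times I observed for the same column transformation across the approaches covered in this post (your numbers will vary with the VM shape and dataset size):

  • Pandas (single process): ~19 seconds
  • Dask, threaded scheduler: ~6 seconds
  • Dask, process scheduler: ~51 seconds
  • Modin with a Ray backend: < 1 second
  • multiprocessing.Pool: ~57 seconds
  • joblib.Parallel with prefer="threads": ~42 seconds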

This post reviewed a few different ways of parallelizing a transformation applied to a data frame held in the memory of a single VM running in the OCI Data Science service. These different approaches give you the tools to maximize your use of large VMs with multiple OCPUs.


Keep in Touch! 

  • Visit our website
  • Visit our service documentation
  • (Oracle Internal) Visit our slack channel #oci_datascience_users
  • Visit our YouTube Playlist
  • Visit our LiveLabs Hands-on Workshop
