Announcing ML pipelines for automating machine learning workflows

January 26, 2023 | 9 minute read
Tzvi Keisar
Director of Product Management

Machine learning (ML) pipelines are a crucial component of the modern Data Science workflow. They help automate the process of building, training, and deploying machine learning models, allowing data scientists to focus on more important tasks such as data exploration and model evaluation.

Machine learning, by nature, is a highly repetitive, iterative process. Changing and evolving data requires models to be continuously retrained to keep prediction accuracy high. The workflow itself, however, remains mostly the same. This is where machine learning pipelines come in.

We’re excited to announce machine learning pipelines, a new service-managed capability in Oracle Cloud Infrastructure (OCI) Data Science to build, manage, and run machine learning pipelines.

What is a machine learning pipeline?

At a high level, an ML pipeline consists of several steps, each of which performs a specific task, working together to complete a workflow. For example, the first step might be data preprocessing, where raw data is cleaned and transformed into a format that can feed into an ML algorithm. The next step might be model training, where the algorithm is trained on the processed data to learn the patterns and relationships within it. You can run steps in sequence or in parallel, speeding up the time to complete the workflow.

One of the key advantages of using ML pipelines is the ability to easily repeat and reproduce the entire workflow. This capability ensures the reliability and reproducibility of the results and makes it easier to experiment with different algorithms and parameters to find the best model for a given problem.

After you have completed your model experimentation phase, you can deploy your model to preproduction or production environments. This phase is delicate and complex because it involves many moving parts, including maintainability, scalability, and governance challenges. In this phase, many data science projects fail.

ML pipelines help automate the process of collecting and preparing the data, then training, evaluating, and deploying the model, making it easier to add new capabilities to the process and to repeatedly test it end to end or in parts.

Many ML use cases can benefit from using pipelines, including the following examples:

  • Healthcare providers can use an ML pipeline to predict patient outcomes and provide personalized treatment plans. The pipeline includes stages for data cleaning, feature engineering, model training, and model deployment, allowing the provider to identify high-risk patients and provide them with the appropriate care quickly and accurately.

  • Retail companies can use an ML pipeline to predict customer behavior and recommend products to them. The pipeline includes the entire workflow from importing data all the way to deploying models for real time serving, allowing the company to make personalized product recommendations to customers and adapt to changes in customer data quickly.

With this new addition to the Data Science service arsenal, you now have the following capabilities:

  • Create an ML pipeline by defining the workflow of the steps.

  • Write reusable code for each pipeline step or use existing ML jobs as steps.

  • Run the pipeline and set parameters for each run.

  • Monitor the progress of the pipeline and review the logs output by the steps.

Create and run an ML pipeline in OCI Data Science

You can create, manage, and run ML pipelines from the Oracle Cloud Console, from a notebook through the Python software development kit (SDK), from the CLI, and through the APIs. In this example, we focus on using Jupyter notebooks with the Accelerated Data Science (ADS) SDK, which includes easy-to-use interfaces for data scientists working from their familiar notebook environment. For the required prerequisites and full list of capabilities, check out the documentation.

The sample notebook is based on the employee attrition dataset and predicts the probability that an employee will leave, based on a set of attributes. The complete notebook is available in the Data Science GitHub samples repository.

Creating the pipeline steps

In this pipeline, we import the data, prepare it for machine learning training, train three different models in parallel, and then compare them to deploy the best model.

Data preparation

employee-attr-dataproc.py contains code to import the CSV file, apply one-hot encoding and feature scaling transformations, remove unnecessary columns, and then split the data into train and test datasets. Those datasets are then saved to an OCI Object Storage bucket.
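
The actual script is in the samples repository; a minimal sketch of this kind of preparation step might look like the following. The file paths, column names, and Object Storage URI are illustrative assumptions, not the sample's values.

# Illustrative sketch only -- the real script is in the Data Science GitHub samples repository.
# File paths, column names, and the Object Storage URI are placeholders.
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

df = pd.read_csv("employee-attrition.csv")                     # import the raw CSV

# drop columns that don't help the model, then one-hot encode the categorical columns
df = pd.get_dummies(df.drop(columns=["EmployeeNumber"]), drop_first=True)

X = df.drop(columns=["Attrition_Yes"])                         # features
y = df["Attrition_Yes"]                                        # target: did the employee leave?

# feature scaling
X = pd.DataFrame(MinMaxScaler().fit_transform(X), columns=X.columns)

# split into train and test datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# save the splits to an Object Storage bucket; writing oci:// paths from pandas relies on
# ocifs (available in the service conda packs) and on authentication being configured
X_train.assign(Attrition=y_train).to_csv("oci://<bucket>@<namespace>/attrition/train.csv", index=False)
X_test.assign(Attrition=y_test).to_csv("oci://<bucket>@<namespace>/attrition/test.csv", index=False)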

Model training

employee-attr-train-lr.py, employee-attr-train-rf.py, and employee-attr-train-xgb.py train Logistic Regression, Random Forest, and XGBoost models, respectively. Each loads the same train and test datasets from the data processing step and saves its model to the model catalog with its area under the ROC curve (AUC) score.
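
The three scripts follow the same pattern. A hedged sketch of the logistic regression variant is below; the dataset URIs and the use of ADS model serialization to record the model with its AUC score are assumptions about how such a step could be written, not a copy of the sample.

# Illustrative sketch of one training step -- not the actual sample script.
import tempfile
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from ads.model.framework.sklearn_model import SklearnModel

# load the splits produced by the data processing step (placeholder URIs)
train = pd.read_csv("oci://<bucket>@<namespace>/attrition/train.csv")
test = pd.read_csv("oci://<bucket>@<namespace>/attrition/test.csv")
X_train, y_train = train.drop(columns=["Attrition"]), train["Attrition"]
X_test, y_test = test.drop(columns=["Attrition"]), test["Attrition"]

# train the model and score it on the held-out test set
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
auc = roc_auc_score(y_test, clf.predict_proba(X_test)[:, 1])
print(f"AUC: {auc:.4f}")

# save the model to the model catalog together with its AUC score; ADS model serialization
# is one way to do this (argument names are indicative -- check the ADS documentation)
model = SklearnModel(estimator=clf, artifact_dir=tempfile.mkdtemp())
model.prepare(inference_conda_env="onnx110_p37_cpu_v1", force_overwrite=True)
model.metadata_custom.add(key="AUC", value=str(auc), description="Test-set AUC", category="Performance")
model.save(display_name="employee-attrition-logistic-regression")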

Model evaluation and deployment

employee-attr-eval-deploy.py reads the AUC scores of the models, selects the model with the highest score, and deploys it.
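
A simplified sketch of that selection logic follows. The model OCIDs and AUC scores are hypothetical stand-ins for values the real script reads back from the model catalog, and the deployment call shows one possible ADS-based approach rather than the sample's exact code.

# Illustrative sketch only -- see the samples repository for the real evaluation/deployment script.
from ads.model.generic_model import GenericModel

candidates = {                     # hypothetical model OCIDs and their AUC scores
    "<lr-model-ocid>": 0.81,
    "<rf-model-ocid>": 0.84,
    "<xgb-model-ocid>": 0.86,
}

# pick the model with the highest AUC score
best_model_id = max(candidates, key=candidates.get)
print(f"Best model: {best_model_id} (AUC={candidates[best_model_id]:.2f})")

# load the winning catalog entry and deploy it
best_model = GenericModel.from_model_catalog(model_id=best_model_id, artifact_dir="/tmp/best-model")
best_model.deploy(display_name="employee-attrition-best-model")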

How to create an ML pipeline

Now, let’s create the pipeline with the steps that make up the end-to-end workflow. To speed up the workflow, we let the models train in parallel because they don’t have dependencies between them. Their only dependency is on the data.

Set your resources with the following details:

compartment_id = "YOUR_COMPARTMENT_ID"
project_id = "YOUR_PROJECT_ID"
log_group_id = "YOUR_LOG_GROUP_ID"
pipeline_name = "YOUR_PIPELINE_NAME"

Define the steps and create the pipeline’s directed acyclic graph (DAG). The DAG is the definition of the workflow. Steps can run in sequence or in parallel as long as the DAG has no cycles.

from ads.pipeline.ads_pipeline_step import PipelineStep
from ads.pipeline.ads_pipeline import Pipeline
from ads.pipeline import CustomScriptStep
from ads.jobs import ScriptRuntime

# use the same infrastructure for all steps to save some code. You can choose to use a different infrastructure for each step.
infrastructure = (
    CustomScriptStep()  # using a python script as the step code. ML Jobs and Notebook files are supported as well.
    .with_block_storage_size(50)
    .with_shape_name("VM.Standard2.4")
)
 
# define the data processing step
step_data_processing = (
    PipelineStep("data_processing")
    .with_description("Import data, feature engineering, train-test split")
    .with_infrastructure(infrastructure)
    .with_maximum_runtime_in_minutes(30)
    .with_runtime(
        ScriptRuntime()
        .with_source("employee-attr-dataproc.zip")  # this is the step’s artifact. It includes all the files necessary to execute the step.
        .with_service_conda("onnx110_p37_cpu_v1")   # using data science conda pack. custom published packs are also supported
        .with_environment_variable(PIPELINE_STEP_RUN_ENTRYPOINT="employee-attr-dataproc.py")    # define the main script in the artifact to execute.
    )
)
 
# define the logistic regression training step
step_train_logistic_regression = (
    PipelineStep("train_logistic_regression")
    .with_description("Train a Logistic Regression model and save to the model catalog with its AUC score")
    .with_infrastructure(infrastructure)
    .with_maximum_runtime_in_minutes(120)
    .with_runtime(
        ScriptRuntime()
        .with_source("employee-attr-train-lr.zip")
        .with_service_conda("onnx110_p37_cpu_v1")
        .with_environment_variable(PIPELINE_STEP_RUN_ENTRYPOINT="employee-attr-train-lr.py")
    )
)
 
# define the random forest training step
step_train_random_forest = (
    PipelineStep("train_random_forest")
    .with_description("Train a Random Forest model and save to the model catalog with its AUC score")
    .with_infrastructure(infrastructure)
    .with_maximum_runtime_in_minutes(120)
    .with_runtime(
        ScriptRuntime()
        .with_source("employee-attr-train-rf.zip")
        .with_service_conda("onnx110_p37_cpu_v1")
        .with_environment_variable(PIPELINE_STEP_RUN_ENTRYPOINT="employee-attr-train-rf.py")
    )
)
 
# define the xgboost training step
step_train_xgboost = (
    PipelineStep("train_xgboost")
    .with_description("Train a model with XGBoost and save to the model catalog with its AUC score")
    .with_infrastructure(infrastructure)
    .with_maximum_runtime_in_minutes(120)
    .with_runtime(
        ScriptRuntime()
        .with_source("employee-attr-train-xgb.zip")
        .with_service_conda("onnx110_p37_cpu_v1")
        .with_environment_variable(PIPELINE_STEP_RUN_ENTRYPOINT="employee-attr-train-xgb.py")
    )
)
 
# define the model evaluation and deploy step
step_evaluate_and_deploy = (
    PipelineStep("evaluate_and_deploy")
    .with_description("Find the best model by their AUC score and deploy")
    .with_infrastructure(infrastructure)
    .with_maximum_runtime_in_minutes(30)
    .with_runtime(
        ScriptRuntime()
        .with_source("employee-attr-eval-deploy.zip")
        .with_service_conda("onnx110_p37_cpu_v1")
        .with_environment_variable(PIPELINE_STEP_RUN_ENTRYPOINT="employee-attr-eval-deploy.py")
    )
)
 
# define the pipeline
pipeline = (
    Pipeline(pipeline_name)
    .with_compartment_id(compartment_id)
    .with_project_id(project_id)
    .with_log_group_id(log_group_id)  # if you define the LogGroupID but not the LogID, logs will be created automatically in the specified LogGroup
    .with_freeform_tags({"pipeline-sample":"employee-attrition-sample"})
    .with_step_details([step_data_processing, step_train_logistic_regression, step_train_random_forest, step_train_xgboost, step_evaluate_and_deploy])    # set the steps to include in the pipeline
    .with_dag(["data_processing >> (train_logistic_regression, train_random_forest, train_xgboost) >> evaluate_and_deploy"])  # define the DAG
)

 

Each step is defined separately and can be used across different pipelines.

 

The DAG is defined by using the >> operator to declare dependency. In our code, the three training steps depend on the data processing step, and so all three run in parallel when the data processing step completes.

In this example, we’re using Python scripts in all the steps. You can also construct steps from existing ML jobs, notebook files, and bash scripts, which can run Java files.

Now create the pipeline resource with the following command:

pipeline.create()

You can view the pipeline visually by using the show() command:
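
# render the pipeline's DAG in the notebook
pipeline.show()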

pipeline graph

Using YAML for pipeline definition

One of the most powerful capabilities that comes with ADS is the ability to export a pipeline to a YAML file and import a pipeline from a YAML file. This makes the pipeline definition code-independent and portable. You can save the pipeline’s YAML definition in a code repository and quickly recreate the pipeline from the YAML file.

# Export pipeline definition to yaml file (can load from yaml file too with from_yaml())
pipeline.to_yaml('my_pipeline.yaml')
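
To go the other way, you can load the definition back and create a pipeline from it with from_yaml(); the uri keyword shown here is how ADS objects typically read YAML from a file, so verify it against the ADS version you’re using.

# recreate the pipeline object from the saved YAML definition
pipeline_from_yaml = Pipeline.from_yaml(uri="my_pipeline.yaml")
pipeline_from_yaml.create()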

Running the pipeline

To run the pipeline, use the following command:

pipeline_run = pipeline.run()

The command can take parameters that override configurations for this specific run, such as the infrastructure, environment variables, logging, and more. You can track the progress of the pipeline visually.

 

pipeline run graph view

In the graphic, the first step has completed successfully, and the three training steps are now running in parallel.

You can also view the logs directly from the notebook with the following command:

pipeline_run.watch()

You can create, run, and monitor the pipeline from the Console too.

A screenshot of the Pipeline Details page in the Oracle Cloud Console.

When the pipeline run completes, you have a deployed model ready for prediction requests. You can run the pipeline again with different parameters on variations of the data to produce fresh models.

Conclusion

Machine learning pipelines are an essential part of the machine learning process. They provide a structured approach to training and deploying machine learning models and enable data scientists and ML engineers to experiment and iterate quickly. With this new addition of ML pipelines, we’re enriching our MLOps offering in the OCI Data Science service to enable our customers to build and run their machine learning workloads on an enterprise-grade, secure, scalable, and automated platform.

Try the Oracle Cloud Free Trial! A 30-day trial with US$300 in free credits gives you access to the Oracle Cloud Infrastructure Data Science service. For more information, check out the OCI Data Science documentation.


