Oracle Cloud Infrastructure (OCI) Functions provides functions as a service, allowing you to create, run, and scale applications and business logic without provisioning or managing infrastructure. Functions lets you build independent pieces of code that work together as a microservices application. Functions work great on their own, but in some cases your application requires orchestration and dependencies to accomplish a larger goal, a pattern often called a state machine. State machines give you the following capabilities:

  • Choose between branches of execution

  • Stop a run with a failure or success

  • Pass a state's input through to its output, or inject some fixed data

  • Provide a delay for a certain amount of time or until a specified time or date

  • Begin parallel branches of execution

Some real-world use cases include the following examples:

  • Infrastructure automation: You can use workflows to automate events in your infrastructure operations, such as rotating service keys for security and powering down resources to manage costs.

  • Extract, transform, load (ETL): Coordinate data between upstream and downstream sources and the transformations in between.

  • Microservices orchestration

  • Transcoding workflows

Apache Airflow and OCI

Apache Airflow is an open source tool for building, scheduling, and orchestrating data workflows. With Airflow, you can define a workflow that OCI Functions runs, and its GUI lets you track workflows and runs and recover from failures.

Airflow has the following features and capabilities:

  • Operator: A worker that knows how to perform a task. For example, a Python operator can run Python code, while a MySQL operator can run SQL commands in a MySQL database.

  • Directed acyclic graph (DAG): A DAG describes the order of tasks from start to finish. DAGs can have dependencies between tasks, which is what makes them effective workflows (see the minimal sketch after this list).

  • Hooks: Interfaces to services external to Airflow. OCI includes a base hook that can refer to any OCI Python SDK class. OCI has hooks, operators, and sample DAGs for our services, such as Object Storage, Data Flow, Autonomous Database, and Data Catalog.

  • Task: A specific job done by an operator.

  • DAG run: An individual execution of a DAG.
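
To make these terms concrete, here is a minimal, illustrative DAG (a sketch only, not the workflow we build below) that wires two Python tasks into a dependency:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Each callable below is a task: a specific job done by an operator.
def extract(**kwargs):
    print("extracting...")

def transform(**kwargs):
    print("transforming...")

# The DAG describes the order of tasks from start to finish.
dag = DAG(dag_id='concept_demo', start_date=datetime(2021, 2, 14), schedule_interval=None)

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)

# Dependency: extract must finish before transform begins.
extract_task >> transform_task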

We can use Apache Airflow to help coordinate our function workflows. Airflow acts as our orchestration engine, with each state in the workflow performing a specific task as its own standalone function. In this simple example, I orchestrate an e-commerce workflow that, depending on the type of transaction, processes a purchase or a refund.

Our functions

To set up and deploy OCI Functions, refer to these Quick Starts. In OCI, I have created three functions.
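
The DAG we define later reads each function's OCID and the Functions invoke endpoint from Airflow Variables. One way to look these values up is with the OCI Python SDK; this is a sketch that assumes a standard ~/.oci/config file, and the compartment OCID is a placeholder:

import oci

# Assumes a standard ~/.oci/config file; adjust the profile or path as needed.
config = oci.config.from_file()
fn_mgmt = oci.functions.FunctionsManagementClient(config)

# Placeholder: substitute your own compartment OCID.
compartment_id = "ocid1.compartment.oc1..example"

for app in fn_mgmt.list_applications(compartment_id).data:
    for fn in fn_mgmt.list_functions(app.id).data:
        # fn.id is the function OCID; fn.invoke_endpoint is the invoke endpoint.
        print(fn.display_name, fn.id, fn.invoke_endpoint)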

Function 1: process_purchase

This function takes in an item from the customer and processes it as a purchase.

import io
import json
import logging

from fdk import response

def handler(ctx, data: io.BytesIO = None):
    product = ""
    try:
        # Parse the product name from the JSON request body.
        body = json.loads(data.getvalue())
        product = body.get("product")
    except (Exception, ValueError) as ex:
        logging.getLogger().info('error parsing json payload: ' + str(ex))

    logging.getLogger().info("Processing a purchase for Product {0}".format(product))
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Purchase for product {0} succeeded!".format(product)}),
        headers={"Content-Type": "application/json"}
    )
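
Before wiring this function into Airflow, we can smoke-test it with the OCI Python SDK. This is a sketch: the function OCID and invoke endpoint are placeholders for your own values.

import json

import oci

config = oci.config.from_file()  # assumes a standard ~/.oci/config file

# Placeholders: your function's OCID and its invoke endpoint.
function_ocid = "ocid1.fnfunc.oc1..example"
invoke_endpoint = "https://<unique-id>.us-ashburn-1.functions.oci.oraclecloud.com"

invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=invoke_endpoint)
resp = invoke_client.invoke_function(function_ocid,
                                     invoke_function_body=json.dumps({"product": "gift card"}))
print(resp.data.text)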

A screenshot of the successful purchase message in Cloud Shell.

Function 2: ship_item

This function generates an order number and passes it back to the customer. In a real-world example, I would also subtract the item from my inventory in a database.

import io
import json
import logging
import random

from fdk import response

def handler(ctx, data: io.BytesIO = None):
    product = ""
    shipping_number = ""
    try:
        body = json.loads(data.getvalue())
        product = body.get("product")
        # Simulate generating a shipping number for the order.
        shipping_number = random.randint(10000, 99999)
    except (Exception, ValueError) as ex:
        logging.getLogger().info('error parsing json payload: ' + str(ex))

    logging.getLogger().info("Generated shipping number {0}".format(shipping_number))
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Your shipping number for order {0} is {1}!".format(product, shipping_number)}),
        headers={"Content-Type": "application/json"}
    )

A screenshot of a successful order creation in Cloud Shell.

Function 3: process_refund

This function takes in the item and processes a refund.

import io
import json
import logging

from fdk import response

def handler(ctx, data: io.BytesIO = None):
    # Log and acknowledge the refund; a real implementation would credit the customer.
    logging.getLogger().info("Process Refund function!")
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Processing Refund Successful!"}),
        headers={"Content-Type": "application/json"}
    )

A screenshot of the successful processing refund in Cloud Shell.

You can install Airflow on Oracle Container Engine for Kubernetes (OKE). Read the documentation and complete all the prerequisites before deployment, because items such as the vault secret and instance principal configurations are key to a successful deployment.
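
Because Airflow runs on OKE, you can also authenticate to OCI with an instance principal instead of a config file. Here is a minimal sketch (the invoke endpoint is a placeholder):

import oci

# Credentials come from the instance's identity rather than a config file.
signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()

invoke_client = oci.functions.FunctionsInvokeClient(
    config={},  # empty config; the signer supplies the credentials
    signer=signer,
    service_endpoint="https://<unique-id>.us-ashburn-1.functions.oci.oraclecloud.com")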

We then define our DAG file with the following code:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import BranchPythonOperator
import oci

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 14),
}

dag = DAG(dag_id='OCIStateMachine_FN', default_args=default_args, schedule_interval=None)
config = oci.config.from_file(Variable.get("config"), "DEFAULT")
service_endpoint = Variable.get("service_endpoint_2")

# Branch on the transaction type passed in the DAG run configuration.
def mycondition(ds, **kwargs):
    if kwargs['dag_run'].conf['transaction'] == 'Refund':
        return "process_refund"
    elif kwargs['dag_run'].conf['transaction'] == 'Purchase':
        return "process_purchase"
    else:
        return "end"

def process_purchase(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("process_purchase_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])
    print(resp.data.text)

def process_refund(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("process_refund_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])
    print(resp.data.text)

def ship_item(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("ship_item_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])
    print(resp.data.text)

process_purchase = PythonOperator(task_id='process_purchase', python_callable=process_purchase, dag=dag)
ship_item = PythonOperator(task_id='ship_item', python_callable=ship_item, dag=dag)
process_refund = PythonOperator(task_id='process_refund', python_callable=process_refund, dag=dag)

condition = BranchPythonOperator(task_id='process_transaction', provide_context=True, python_callable=mycondition, dag=dag)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> condition >> process_purchase >> ship_item >> end
start >> condition >> process_refund >> end

When this file is finished and uploaded to our DAG location, we can see our DAG in Airflow with a graphical representation of our state machine. We can trigger the DAG as shown in the following screenshot:

A screenshot of the Trigger DAG: OCIStateMachine_FN configuration in the Console.
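
The DAG reads two keys from this trigger configuration: transaction selects the branch, and product is passed through unchanged as the JSON body of the invoked function. For example, a configuration like the following (the values are illustrative) routes the run down the purchase branch:

{
  "transaction": "Purchase",
  "product": "{\"product\": \"gift card\"}"
}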

We can monitor the run and debug it as needed. The following example shows a successful DAG run.

A screenshot of the workflow of the state machine function in Apache Airflow.

Conclusion

We have successfully used Airflow to orchestrate a state machine that uses OCI Functions. For more information about Oracle Cloud Functions, check out these examples on GitHub. In part 2 of this blog, we look at how tasks can talk to each other and use OCI’s API Gateway to automate DAG runs.

Oracle Cloud Infrastructure provides enterprise features for developers to build modern cloud applications. If you want to try out this lab for free, I recommend the Oracle Cloud Free Tier with US$300 credits for a 30-day free trial. Free Tier also includes several “Always Free” services that are available for an unlimited time, even after your free credits expire.