The Oracle Cloud Infrastructure (OCI) Functions service provides functions as a service, allowing you to create, run, and scale applications and business logic without provisioning or managing infrastructure. Functions lets you build independent pieces of code that can work together as a microservices application. Functions work well on their own, but in some cases your application requires orchestration and dependencies to accomplish a larger goal, often modeled as a state machine. State machines give you the following capabilities:
- Choose between branches of execution
- Stop a run with a failure or success
- Pass its input to its output or inject some fixed data
- Provide a delay for a certain amount of time or until a specified time or date
- Begin parallel branches of execution
Some real-world use cases include the following examples:
- Infrastructure automation: Use workflows to automate events in your infrastructure operations, such as rotating service keys for security or powering down resources to manage costs.
- Extract, transform, load (ETL): Coordinate data between upstream and downstream sources and the transformations in between.
- Microservices orchestration
- Transcoding workflows
Apache Airflow and OCI
Apache Airflow is an open source tool for building, scheduling, and orchestrating data workflows. Airflow lets you define a workflow that invokes OCI Functions and provides a GUI for tracking workflows and runs and recovering from failures.
Airflow has the following features and capabilities (a minimal example follows the list):
- Operator: A worker that knows how to perform a task. For example, a Python operator can run Python code, while a MySQL operator can run SQL commands in a MySQL database.
- Directed acyclic graph (DAG): A DAG describes the order of tasks from start to finish. DAGs can have dependencies between tasks that make effective workflows.
- Hooks: Interfaces to services external to Airflow. OCI includes a base hook that can refer to any OCI Python SDK class. OCI has hooks, operators, and sample DAGs for our services, such as Object Storage, Data Flow, Autonomous Database, and Data Catalog.
- Task: A specific job done by an operator.
- DAG run: An individual execution of a DAG.
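To make these terms concrete, here is a minimal, hypothetical DAG with two tasks. The DAG ID, task names, and printed message are placeholders and are not part of the e-commerce example later in this post.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def say_hello():
    # A task is one job done by an operator; this one only logs a message.
    print("Hello from a PythonOperator task")


# The DAG describes the order of tasks from start to finish.
dag = DAG(dag_id='minimal_example', start_date=datetime(2021, 2, 14), schedule_interval=None)

start = DummyOperator(task_id='start', dag=dag)
hello = PythonOperator(task_id='say_hello', python_callable=say_hello, dag=dag)

# Dependencies between tasks; each triggered execution of this DAG is a DAG run.
start >> hello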
We can use Apache Airflow to help coordinate our function workflows. Airflow acts as our orchestration engine, with each state in the workflow performing a specific task as its own standalone function. In this simple example, I orchestrate an e-commerce workflow that, depending on the type of transaction, processes a purchase or a return.
Our functions
To set up and deploy OCI Functions, refer to these quick starts. In OCI, I have created three functions.
Function 1: process_purchase
This function passes in an item from the customer and processes it as a purchase.
import io
import json
import logging

from fdk import response


def handler(ctx, data: io.BytesIO = None):
    product = ""
    try:
        body = json.loads(data.getvalue())
        product = body.get("product")
    except (Exception, ValueError) as ex:
        logging.getLogger().info('error parsing json payload: ' + str(ex))
    logging.getLogger().info("Processing a purchase for Product {0}".format(product))
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Purchase for product {0} succeeded!".format(product)}),
        headers={"Content-Type": "application/json"}
    )
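Once a function like this is deployed, you can test it outside of Airflow with the OCI Python SDK. The following is a minimal sketch; the invoke endpoint and function OCID are placeholders that you would copy from your own Functions application.

import oci

# Load the OCI config for your tenancy (placeholder path and profile).
config = oci.config.from_file("~/.oci/config", "DEFAULT")

# Placeholder invoke endpoint from your Functions application.
invoke_client = oci.functions.FunctionsInvokeClient(
    config, service_endpoint="https://<invoke-endpoint>.functions.oci.oraclecloud.com")

# Send the JSON payload that the handler parses for the "product" key.
resp = invoke_client.invoke_function(
    "ocid1.fnfunc.oc1..<unique-id>",
    invoke_function_body='{"product": "widget"}')
print(resp.data.text)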
Function 2: ship_item
This function generates a shipping number and passes it back to the customer. In a real-world example, I would also subtract the item from my inventory in a database.
import io
import json
import logging
import random

from fdk import response


def handler(ctx, data: io.BytesIO = None):
    product = ""
    shipping_number = 0
    try:
        body = json.loads(data.getvalue())
        product = body.get("product")
        shipping_number = random.randint(10000, 99999)
    except (Exception, ValueError) as ex:
        logging.getLogger().info('error parsing json payload: ' + str(ex))
    logging.getLogger().info("Generated shipping number {0}".format(shipping_number))
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Your shipping number for order {0} is {1}!".format(product, shipping_number)}),
        headers={"Content-Type": "application/json"}
    )
Function 3: process_refund
This function takes in the item and processes a refund.
import io
import json
import logging

from fdk import response


def handler(ctx, data: io.BytesIO = None):
    logging.getLogger().info("Process Refund function!")
    return response.Response(
        ctx, response_data=json.dumps(
            {"message": "Processing Refund Successful!"}),
        headers={"Content-Type": "application/json"}
    )

You can install Airflow on Oracle Container Engine for Kubernetes (OKE). Read the documentation and complete all the prerequisites before deployment, because items such as the vault secret and instance principal configurations are key to a successful deployment.
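As an aside on instance principals: when Airflow runs on OKE with instance principals configured for the worker nodes, the OCI Python SDK can authenticate without an API key file. The following is a hedged sketch of that alternative; the DAG below instead loads a config file through an Airflow Variable, and the invoke endpoint here is a placeholder.

import oci

# Assumption: instance principal authentication is configured for the nodes running Airflow.
# The signer replaces the API key-based config file used by the DAG below.
signer = oci.auth.signers.InstancePrincipalsSecurityTokenSigner()
invoke_client = oci.functions.FunctionsInvokeClient(
    config={}, signer=signer,
    service_endpoint="https://<invoke-endpoint>.functions.oci.oraclecloud.com")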
We then define our DAG file with the following code:
from datetime import datetime, timedelta

import oci
from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import BranchPythonOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 2, 14),
}

dag = DAG(dag_id='OCIStateMachine_FN', default_args=default_args, schedule_interval=None)

# OCI client configuration and Functions invoke endpoint, stored as Airflow Variables
config = oci.config.from_file(Variable.get("config"), "DEFAULT")
service_endpoint = Variable.get("service_endpoint_2")


def mycondition(ds, **kwargs):
    # Branch on the transaction type passed in the DAG run configuration
    if kwargs['dag_run'].conf['transaction'] == 'Refund':
        return "process_refund"
    elif kwargs['dag_run'].conf['transaction'] == 'Purchase':
        return "process_purchase"
    else:
        print('no match')
        return "end"


def process_purchase(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("process_purchase_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])
    print(resp.data.text)


def process_refund(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("process_refund_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])


def ship_item(ds, **kwargs):
    invoke_client = oci.functions.FunctionsInvokeClient(config, service_endpoint=service_endpoint)
    resp = invoke_client.invoke_function(Variable.get("ship_item_fn"),
                                         invoke_function_body=kwargs['dag_run'].conf['product'])
    print(resp.data.text)


process_purchase = PythonOperator(task_id='process_purchase', python_callable=process_purchase, dag=dag)
ship_item = PythonOperator(task_id='ship_item', python_callable=ship_item, dag=dag)
process_refund = PythonOperator(task_id='process_refund', python_callable=process_refund, dag=dag)
condition = BranchPythonOperator(task_id='process_transaction', provide_context=True, python_callable=mycondition, dag=dag)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> condition >> process_purchase >> ship_item >> end
start >> condition >> process_refund >> end
When this file is finished and uploaded to our DAG location, we can see our DAG in Airflow with a graphical representation of our state machine. We can trigger the DAG as shown in the following screenshot:

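When triggering the DAG manually from the UI or the Airflow CLI, the run configuration is read from dag_run.conf. A hypothetical configuration matching what this DAG expects could look like the following; the product payload is a placeholder and must be the JSON body the functions parse.

# Hypothetical dag_run.conf payload for a purchase: "transaction" picks the branch
# in mycondition, and "product" is passed straight through as the function body.
trigger_conf = {
    "transaction": "Purchase",
    "product": '{"product": "widget"}',
}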
We can monitor and debug the run as needed. The following example shows a successful DAG run.

Conclusion
We have successfully used Airflow to orchestrate a state machine that uses OCI Functions. For more information about Oracle Cloud Functions, check out these examples on GitHub. In part 2 of this blog, we look at how tasks can talk to each other and use OCI’s API Gateway to automate DAG runs.
Oracle Cloud Infrastructure provides enterprise features for developers to build modern cloud applications. If you want to try out this lab for free, I recommend the Oracle Cloud Free Tier with US$300 credits for a 30-day free trial. Free Tier also includes several “Always Free” services that are available for an unlimited time, even after your free credits expire.
