This post describes an interface between the Oracle Cloud Infrastructure (OCI) Streaming service and OCI Data Flow that processes data from Streaming in a distributed manner and persists it in OCI Object Storage. The data stored through the interface lets developers add intelligence to their applications and improve the user experience, for example by providing algorithm-driven suggestions and recommendations. The interface also allows algorithms created as part of this project, and other algorithms, to plug in, so both Oracle software-as-a-service developers and customer developers can build intelligent applications more easily without being experts in machine learning or recommendation systems.
OCI Data Flow is a fully managed Big Data service that runs Apache Spark applications at any scale with almost no administration. Spark has become the leading Big Data processing framework, and OCI Data Flow is the easiest way to run Spark in OCI because Spark developers don’t need to install or manage anything.
Business use case
The following use case uses a data integrator.
The interface provides an API, Service X, which helps ingest client data. The client service invokes this API using a publish and subscribe model, and the client data is ingested into Streaming service streams. Multiple partner and consumer apps require this data to provide personalized recommendation services to the user. These personalized suggestions are then shown on top of the client navigation data for the user.
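As a rough illustration of the ingestion path, the following sketch publishes client events to an OCI stream with the Python SDK. The stream OCID, messages endpoint, and event fields are placeholders for illustration, not values from the actual Service X API.

```python
import base64
import json

import oci

# Placeholder values; the real Service X configuration would supply these.
STREAM_OCID = "ocid1.stream.oc1..example"
MESSAGES_ENDPOINT = "https://cell-1.streaming.us-ashburn-1.oci.oraclecloud.com"

config = oci.config.from_file()  # default ~/.oci/config profile
stream_client = oci.streaming.StreamClient(config, service_endpoint=MESSAGES_ENDPOINT)

def publish_client_events(events):
    """Publish a batch of client events; the tenancy ID keys the partition."""
    entries = [
        oci.streaming.models.PutMessagesDetailsEntry(
            key=base64.b64encode(event["tenancy_id"].encode()).decode(),
            value=base64.b64encode(json.dumps(event).encode()).decode(),
        )
        for event in events
    ]
    details = oci.streaming.models.PutMessagesDetails(messages=entries)
    return stream_client.put_messages(STREAM_OCID, details)

publish_client_events([{"tenancy_id": "tenant-a", "user": "u1", "action": "view"}])
```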
This use case has the following requirements:
- The data integrator consumes the client data from the multiple streams (as pushed by the Service X API), parses it, groups it across different tenancies, and writes it to OCI Object Storage.
- The client data needs to be consumed every 30 seconds because of the high volume of data and users across tenants. This cadence lets the consuming partners rely on Object Storage events like a clock tick for microbatching.
- The data in Object Storage, based on security policies, is then available to consumers and partners for personalized recommendation algorithm training. Each file contains all the events of all the users in the tenancy for every slice of 30 seconds.
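To make the clock-tick idea concrete, here is a minimal sketch of how a partner might list the objects written for a tenancy's 30-second slices. The namespace, bucket, and prefix layout are illustrative assumptions, not part of the interface's actual contract.

```python
import oci

config = oci.config.from_file()
object_storage = oci.object_storage.ObjectStorageClient(config)

# Hypothetical layout: one object per tenancy per 30-second slice.
NAMESPACE = "example-namespace"
BUCKET = "client-events"
PREFIX = "tenant-a/2024-01-01/"

listing = object_storage.list_objects(NAMESPACE, BUCKET, prefix=PREFIX)
for obj in listing.data.objects:
    # Each object holds all events for every user in the tenancy
    # for one 30-second slice, ready for training pipelines.
    print(obj.name)
```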
Problem statement and approach
We plan to use OCI Data Flow as a consumer for the streams, process the data as it is received, and then persist the processed data to Object Storage. Data Flow supports this use case using a scheduling and polling mechanism.
The OCI security token used for the Data Flow and Object Storage integration expires every 24 hours, and we currently have no way to refresh the token or use a new token for an existing instance. We therefore use the OCI Data Flow control plane to monitor instances and implement an interleaving process between them.
In this approach, we use OCI Data Flow to build the integrator for the following reasons:
- Streams are internal to the interface and used for internal storage.
- Object Storage offers better management of data. We can apply a microbatching mechanism to consume data, and the 30-second file creation acts as a clock tick: partners can use the Object Storage creation event to time their microbatching. Object Storage also retains objects longer, with retention you can define using custom retention policies.
- Services and integrators belonging to the interface have write access. Other services and consumers have read-only access and access to events if required.
- OCI Data Flow uses Spark and can consume data from OCI Streaming using the oci-sdk, parse and group it, and write the resulting files to Object Storage. Spark allows this process to happen in a distributed manner, and given the expected data volume, distributing the work across multiple executors can boost performance.
- OCI Data Flow doesn’t currently support a streaming methodology, so we need a continuous microbatching mechanism that consumes from a stream every 30 seconds without the Data Flow instance being stopped. The Data Flow job would need to run at a frequent interval (every 30 seconds using the scheduling mechanism) to minimize unprocessed data so downstream apps can read the latest data from Object Storage. With the current support in Data Flow, we instead need a custom solution that runs the microbatches from the Data Flow entry point program (see the sketch after this list).
- The data processor running on Data Flow is managed and orchestrated using Airflow, which schedules and runs it using a directed acyclic graph (DAG). The Airflow DAG checks every 10 minutes whether the data processor is running. If the data processor has been running for more than 23 hours, a new instance is started and the previously running instance of that processor is deleted. This interleaving method allows you to always have a Data Flow job reading from the streams every 30 seconds without interruption (a DAG sketch appears below).
- Data Flow applications currently cannot run for longer than 24 hours. This limitation is caused by delegation tokens, which expire after 24 hours and cannot be refreshed. Data Flow loses the ability to communicate with Object Storage after 24 hours, which means we would have to start a new Data Flow instance.
- The stream details are defined in a configuration file with stream names, group cursor name, instance name, and partitions. The file is uploaded to the bucket where the data processor archives reside. The data processor uses this file, and changes to it dynamically update the processor, which then starts consuming from the new and updated stream details.
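Below is a minimal sketch of what such an entry point program could look like: it reads a hypothetical JSON configuration from Object Storage, polls a stream through a group cursor every 30 seconds, and writes each microbatch grouped by tenancy to Object Storage through Spark. All names (bucket, config keys, output path) are illustrative assumptions rather than the actual implementation.

```python
import base64
import json
import time

import oci
from pyspark.sql import SparkSession

POLL_INTERVAL_SECONDS = 30

spark = SparkSession.builder.appName("stream-to-object-storage").getOrCreate()

config = oci.config.from_file()  # on Data Flow, resource-principal auth would replace this
object_storage = oci.object_storage.ObjectStorageClient(config)

# Hypothetical config file uploaded next to the data processor archive.
cfg = json.loads(
    object_storage.get_object("example-namespace", "processor-bucket", "streams.json").data.content
)

stream_client = oci.streaming.StreamClient(config, service_endpoint=cfg["messages_endpoint"])

# Create a committed group cursor so offsets survive instance interleaving.
cursor_details = oci.streaming.models.CreateGroupCursorDetails(
    group_name=cfg["group_name"],
    instance_name=cfg["instance_name"],
    type="TRIM_HORIZON",
    commit_on_get=True,
)
cursor = stream_client.create_group_cursor(cfg["stream_ocid"], cursor_details).data.value

while True:
    response = stream_client.get_messages(cfg["stream_ocid"], cursor, limit=1000)
    cursor = response.headers["opc-next-cursor"]

    events = [json.loads(base64.b64decode(message.value)) for message in response.data]
    if events:
        # Group by tenancy and write one file set per 30-second slice.
        df = spark.createDataFrame(events)
        (df.write
           .mode("append")
           .partitionBy("tenancy_id")
           .json(f"oci://client-events@example-namespace/slices/{int(time.time())}"))

    time.sleep(POLL_INTERVAL_SECONDS)
```

In practice, a Data Flow run would likely authenticate with a resource principal signer (for example, `oci.auth.signers.get_resource_principals_signer()`) instead of a local config file, and the `limit` and executor counts would be sized to the expected message volume.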
Overall, given the timing and flexibility our use case needs, OCI Data Flow works well for it.
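The interleaving itself might look roughly like the following Airflow DAG sketch, which uses the OCI Python SDK to check the running Data Flow run every 10 minutes, start a replacement after 23 hours, and then cancel the old run. The application OCID, compartment OCID, and task structure are assumptions for illustration, not the actual DAG used by the interface.

```python
from datetime import datetime, timedelta, timezone

import oci
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder OCIDs for illustration.
APPLICATION_OCID = "ocid1.dataflowapplication.oc1..example"
COMPARTMENT_OCID = "ocid1.compartment.oc1..example"
MAX_RUN_AGE = timedelta(hours=23)

def interleave_data_processor():
    """Start a fresh Data Flow run before the 24-hour token expiry, then cancel the old one."""
    client = oci.data_flow.DataFlowClient(oci.config.from_file())

    runs = client.list_runs(
        COMPARTMENT_OCID,
        application_id=APPLICATION_OCID,
        lifecycle_state="IN_PROGRESS",
    ).data

    for run in runs:
        age = datetime.now(timezone.utc) - run.time_created
        if age < MAX_RUN_AGE:
            continue  # still healthy, nothing to do

        # Start the replacement first so stream consumption never pauses.
        client.create_run(
            oci.data_flow.models.CreateRunDetails(
                application_id=APPLICATION_OCID,
                compartment_id=COMPARTMENT_OCID,
                display_name="data-processor",
            )
        )
        # Cancel the old instance; the committed group cursor lets the new
        # run pick up where the old one stopped.
        client.delete_run(run.id)

with DAG(
    dag_id="data_processor_interleaver",
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(minutes=10),
    catchup=False,
) as dag:
    PythonOperator(task_id="interleave", python_callable=interleave_data_processor)
```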


Key takeaways
The interleaving process, using Data Flow integrated with Airflow, lets you always have a job reading from the streams every 30 seconds without interruption. The clock-tick mechanism ensures that Data Flow reads data at a continuous, periodic interval.
With OCI Data Flow consuming from OCI Streaming, your data processor can easily ingest and process streaming data, scaling executors dynamically based on the data load.
Every use case is different. The only way to know if Oracle Cloud Infrastructure is right for you is to try it. You can select either the Oracle Cloud Free Tier or a 30-day free trial, which includes US$300 in credit to get you started with a range of services, including compute, storage, and networking.
