Introduction

By enabling Oracle Cloud Infrastructure (OCI) notifications, you can receive alerts for data load completions, estimated load completion events, and prioritized data refreshes. This enables you to effectively monitor and manage custom ETL dependencies with real-time notifications.

You can enable notifications in two ways, depending on your needs: send notifications to email, or push pipeline statistics to OCI Object Storage buckets. You may choose either method, or use both, to best suit your workflow.

1. Set up and receive event notifications
2. Enable functions to push pipeline statistics notifications to OCI Object Storage buckets

Set up and receive event notifications

  • Navigate to Developer services and select Notifications.
  • Click Create Topic and name it Prioritized_Data_Refresh. Add a description, select your compartment (avoid creating it in the root or tenancy compartment), and click Create.
  • Select your topic.
  • Navigate to Subscriptions.
  • Click Create Subscription.
  • Enter your email address and click Create.
  • You will receive an email to confirm your subscription. Click Confirm subscription.
  • Navigate to Observability & Management in OCI and select Rules under Events Service.
  • Click Create Rule.
  • Provide the required details and configure the rule conditions and attributes to receive notifications upon completion of prioritized data refreshes.
  • Set your topic for notifications under Actions.
  • Click Save Changes.

The rule is now ready to use.


You will now receive notifications upon completion of prioritized refreshes.


Enable functions to push pipeline statistics notifications to OCI Object Storage buckets

NOTE: Create the artifacts described in this section within the same child compartment. Don’t create them in the root compartment, as Dynamic Groups and Policies don’t take effect there.

  • Navigate to User Profile and select the Identity Domain.
  • Under Dynamic groups, click Create dynamic group.
  • Enter the group name as FAW_DG and provide a description.
  • Enter the matching rule as:
    ANY {resource.compartment.id='<OCID of the compartment where the bucket and application are located>'}
  • Click Create.
  • Navigate to Identity & Security and select Policies.
  • Click Create Policy and enter the name as Function_Deploy_Invoke along with a description.
  • Click Show Manual Editor and enter the policies as provided.
  • Enter the listed policies:
    Allow group 'Default'/'All Tenant Users' to use cloud-shell in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to manage repos in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to read objectstorage-namespaces in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to manage logging-family in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to read metrics in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to manage functions-family in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to use virtual-network-family in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to use apm-domains in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to read vaults in compartment <Child Compartment Name>
    Allow group 'Default'/'All Tenant Users' to use keys in compartment <Child Compartment Name>
    Allow service faas to use apm-domains in compartment <Child Compartment Name>
    Allow service faas to read repos in compartment <Child Compartment Name> where request.operation='ListContainerImageSignatures'
    Allow service faas to {KEY_READ} in compartment <Child Compartment Name> where request.operation='GetKeyVersion'
    Allow service faas to {KEY_VERIFY} in compartment <Child Compartment Name> where request.operation='Verify'
    Allow dynamic-group FAW_DG to manage object-family in compartment <Child Compartment Name>
  • Click Create. Navigate to Storage and select Buckets.
  • Click Create Bucket, enter the bucket name as Prioritized_Data_Refresh, and select Enable object versioning and Emit object events.
  • Click Create bucket.
  • Navigate to Developer Services and select Applications.
  • Click Create Application and name it Prioritized_Data_Refresh in the same compartment.
  • Choose a VCN and subnet (private or public) as defined in your tenancy, and then click Create.
  • Select the created application.
  • Navigate to Functions under your created application and click Create in code editor.
    A code editor displays with a prompt to install the OCI Function CLI.
  • Click Install OCI Function CLI to open a CLI terminal where you can run your commands.
  • Minimize the Code Editor window, navigate to the Details tab under your application, and click View guide to the right of Cloud shell setup.

    This opens a tab with the commands you can use in your CLI terminal to set up the function.
  • Run the commands in the cloud shell that you previously minimized.

a: Complete steps a-c provided in the View Guide.

b: Complete step d to create a repository under Container Registry in Developer Services, where your function build images will be stored.

c: Replace the placeholder in the command as follows:
fn update context registry iad.ocir.io/objectstore_namespace/[repo-name-prefix] →
fn update context registry iad.ocir.io/objectstore_namespace/prioritized_data_refresh_repo


d: Select the Create repository on first push in root compartment option under Settings in the Container Registry to avoid having to create a repository before pushing to it.


e: Generate the authentication token. Go to the user account that’s being used, create an authentication token, and make a note of it.


f: Change the command to:
docker login iad.ocir.io -u 'objectstore_namespace/username@oracle.com' -p {Generated Auth token}

g: If the user's IAM domain is other than Default, include it in the docker login command:
docker login -u 'objectstore_namespace/username@oracle.com' iad.ocir.io →
docker login -u 'objectstore_namespace/OracleIdentityCloudService/username@oracle.com' iad.ocir.io


h: Initialize a new function project in the specified runtime environment. In this example, we write the function in Python:
fn init --runtime java hello-java → fn init --runtime python prioritized_data_refresh_func

i: Navigate to the newly created Python function project directory.
cd prioritized_data_refresh_func

j: List the files created under prioritized_data_refresh_func by running the ls command.


k: Open func.py using the following command.
nano func.py

l: In the func.py file, delete the existing code, and replace it with the new function code provided below for OCI.
import io
import json
from datetime import datetime

import oci


def handler(ctx, data: io.BytesIO = None):
    try:
        payload = json.loads(data.getvalue())
        eventTime = payload.get("eventTime", "")
        eventType = payload.get("eventType", "")
        data_section = payload.get("data", {})
        resourceName = data_section.get("resourceName", "")
        add_details = data_section.get("additionalDetails", {})
        requestType = add_details.get("requestType", "")
        status = add_details.get("status", "")

        # ---- Build human-readable event summary ----
        text_summary = (
            "Oracle Fusion Analytics Pipeline Event Summary\n"
            "------------------------------------------------\n"
            f"Event Time: {eventTime}\n"
            f"Event Type: {eventType}\n"
            f"Resource Name: {resourceName}\n"
            f"Request Type: {requestType}\n"
            f"Status: {status}\n"
            "\nThis notification was automatically generated by Oracle Fusion Analytics Pipeline.\n"
        )

        # ---- Alternatively, build a CSV-style summary ----
        csv_summary = (
            "eventTime,eventType,resourceName,requestType,status\n"
            f"{eventTime},{eventType},{resourceName},{requestType},{status}\n"
        )

        # ---- Upload to OCI Object Storage ----
        signer = oci.auth.signers.get_resource_principals_signer()
        object_storage = oci.object_storage.ObjectStorageClient(config={}, signer=signer)
        namespace = "<objectstore_namespace>"
        bucket_name = "Prioritized_Data_Refresh"
        timestamp = datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')
        object_name_text = f"pipeline_event_{timestamp}.txt"
        object_name_csv = f"pipeline_event_{timestamp}.csv"

        # Upload text summary
        object_storage.put_object(
            namespace,
            bucket_name,
            object_name_text,
            text_summary.encode('utf-8')
        )

        # Upload CSV summary
        object_storage.put_object(
            namespace,
            bucket_name,
            object_name_csv,
            csv_summary.encode('utf-8')
        )

        print(f"Event summaries uploaded as {object_name_text} and {object_name_csv}")
    except Exception as ex:
        print('ERROR: Failed to process or upload event data:', ex, flush=True)
        raise
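Before deploying, you can sanity-check the payload parsing and CSV formatting locally with plain Python, with no OCI connectivity required. This sketch mirrors the field names the handler reads; the sample event values (event type, resource name, and so on) are made up for illustration:

```python
import io
import json

# Illustrative event payload shaped like the one the handler parses.
# The eventType and other values here are made-up test data.
sample_event = {
    "eventTime": "2025-01-15T08:30:00Z",
    "eventType": "com.oraclecloud.example.prioritizeddatarefresh",
    "data": {
        "resourceName": "SalesOrders",
        "additionalDetails": {
            "requestType": "PRIORITIZED_REFRESH",
            "status": "SUCCESS",
        },
    },
}

# Simulate the BytesIO input the Functions runtime passes to the handler.
data = io.BytesIO(json.dumps(sample_event).encode("utf-8"))

# Same extraction logic as the handler.
payload = json.loads(data.getvalue())
data_section = payload.get("data", {})
add_details = data_section.get("additionalDetails", {})
csv_summary = (
    "eventTime,eventType,resourceName,requestType,status\n"
    f"{payload.get('eventTime', '')},{payload.get('eventType', '')},"
    f"{data_section.get('resourceName', '')},{add_details.get('requestType', '')},"
    f"{add_details.get('status', '')}\n"
)
print(csv_summary)
```

If the printed CSV line looks right, the deployed function should produce the same content for a real event, with the Object Storage upload layered on top.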

m: Open the requirements.txt file using the command:
nano requirements.txt

n: Add oci as a dependency since the code uses that library. Save and close the file.
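If you started from the fn init Python template, requirements.txt typically already lists fdk (used by the Functions runtime to invoke the handler); after this step the file would look something like:

```
fdk
oci
```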


o: Deploy the function to your application:
fn -v deploy --app Prioritized_Data_Refresh

p: To test your function, you can use the following command:
fn invoke Prioritized_Data_Refresh prioritized_data_refresh_func
If this command runs successfully, you should see the test function appear as a built image under the application. However, you do not need to run this command if you plan to attach this function as a subscription to a Topic.

  • To monitor the function for any errors during execution, enable log monitoring on the application. This will allow you to review logs and troubleshoot issues as needed.
  • Navigate to Monitoring under your application and click Enable log.
  • Create the logs under a Log group defined in your tenancy.
  • Click the Log Name once it’s created.
  • This takes you to the Observability & Management portal. Clicking on Explore Log allows you to search the logs with specific details, such as the time of execution and detailed errors.
  • To subscribe the function to your topic, create a new subscription on the topic, select Function as the protocol, and then choose the application and function containing your code.
  • Click Create.
  • To test the function without waiting for the actual pipeline trigger, you can use the Publish Message option associated with the topic you just created.

The message can be in any format such as a JSON payload or even a single-line statement depending on what your function requires as input. The title will be used as the subject of the email you receive, if your email address is subscribed to the topic.
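For this function, a test message along these lines covers every field the code reads; the values are illustrative, and a short Python snippet can generate the JSON to paste into the Publish Message dialog:

```python
import json

# Made-up test values; the structure matches what the deployed function parses.
message = {
    "eventTime": "2025-01-15T08:30:00Z",
    "eventType": "com.oraclecloud.example.prioritizeddatarefresh",
    "data": {
        "resourceName": "SalesOrders",
        "additionalDetails": {
            "requestType": "PRIORITIZED_REFRESH",
            "status": "SUCCESS",
        },
    },
}
print(json.dumps(message, indent=2))
```

After publishing, the function should write a matching .txt and .csv pair to the Prioritized_Data_Refresh bucket.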

  • Click Publish and validate the results. Navigate to the OCI bucket and review the uploaded files.

Call to Action

Now that you’ve read this post, try it yourself and let us know your results in the Oracle Analytics Community, where you can also ask questions and post ideas. If you have questions or need more details, you can always check the Help Center for Oracle Analytics.