You can use an Oracle Cloud Infrastructure (OCI) Function to process messages from an OCI Stream. When you publish new messages to your stream, your function receives those messages, processes their content, and writes results to OCI Logging. You perform all steps with the OCI Command Line Interface (OCI CLI) and Fn CLI.

What You Do

  • Create an OCI Stream.
  • Deploy an OCI Function that reads messages from the stream.
  • Set up Connector Hub to invoke the function when new stream messages arrive.
  • Publish messages to the stream and confirm that the function logs each one.

At the end, your OCI Function runs when new messages arrive in your stream. You also assign permissions so your resources interact securely through OCI IAM policies.


Prerequisites

Before you begin, install and configure OCI CLI and Fn CLI:


Create a Stream Pool and Stream

Start by provisioning a Stream Pool and creating a Stream:

  1. Configure an IAM policy that grants STREAM_POOL_CREATE and STREAM_CREATE permissions.
    • See OCI streaming policies for details.
    • Example policy:
      
      Allow any-user to manage stream-family in compartment id <compartment-name>
      
  2. Create a Stream Pool named blog-stream-pool.
  3. Create a Stream named blog-stream in the Stream Pool.

Create the Application and Function

Next, create an Application and Function to process messages.

  1. Confirm you have an IAM policy that permits use of Functions.
  2. Create a Functions Application called stream-application. Assign it to an available subnet in your VCN.
  3. Enable Logging for the application.
  4. Log in with OCI CLI and authenticate with OCIR.
  5. Create and deploy the Function:
    
    fn init --runtime go stream-function
    cd stream-function
    fn --verbose deploy --app stream-application
    fn list fn stream-application
    fn invoke stream-application stream-function
    

The hello-world Function is now ready. If you run into issues, see Functions troubleshooting . You will update this Function in a later step to process stream messages.


Connect the Stream to the Function

Set up OCI Connector Hub to forward messages from your stream to your function:

IAM Policies

  • Grant Service Connector permission to read and consume from the stream
    
    Allow any-user to {STREAM_READ, STREAM_CONSUME} in compartment id <compartment-name> where all {request.principal.type='serviceconnector', request.principal.compartment.id='<compartment-name>'}
    
  • Grant invocation permissions for the function:
    
    Allow any-user to use fn-invocation in compartment id  where all {request.principal.type='serviceconnector', request.principal.compartment.id=''}Allow any-user to use fn-function in compartment id  where all {request.principal.type='serviceconnector', request.principal.compartment.id=''}
    

Create the Connector

  1. Create a Connector named stream-connector.
    • Source: Streaming
    • Target: Functions
    • Source Stream Pool: blog-stream-pool
    • Source Stream: blog-stream
    • Target Function Application: stream-application
    • Target Function: stream-function
    • (Optional) Enable logs

Test the Connection

  1. Open blog-stream and select Produce test message.
  2. Enter any value in the data field and select Produce.

You check invocation metrics in stream-connector:

  • Messages read from source: The connector successfully received messages from the stream.
  • Messages written to target: The connector invoked the function with each message.
  • See errors at source or target? Enable logs and use Connector Hub troubleshooting .

Update Your Function to Process Messages

Update your OCI Function to process stream messages, de-serialize them, and log results to OCI Logging.

Details:

Example stream message:


{
"name": "Processing Function with Streaming Service",
"connector": "streaming"
}

Update go.mod

Add the Fn Events dependency:


module func
go 1.24.5

require (
    github.com/fnproject/fdk-go v0.1.1
    github.com/fnproject/fn-events v0.1.1
)

Update Function Code

Edit func.go:


package main

import (
    "context"
    "log"
    "reflect"

    "github.com/fnproject/fdk-go"
    fn_events "github.com/fnproject/fn-events"
)

type BlogMessage struct {
    Name      string `json:"name"`
    Connector string `json:"connector"`
}

type ExampleConnectorHubHandler struct {
}

func (h *ExampleConnectorHubHandler) Serve(
    ctx context.Context,
    batch fn_events.ConnectorHubBatch[fn_events.StreamingData[BlogMessage]],
) {
    for _, streamMessage := range batch.Batch {
        log.Printf("reading message: %v", streamMessage.Value)
    }
}

func main() {
    log.SetFlags(0) // Removes redundant timestamps from logs
    handler := fn_events.ConnectorHubHandlerStreaming(
        &ExampleConnectorHubHandler{},
        reflect.TypeOf(BlogMessage{}),
    )
    fdk.Handle(handler)
}

Deploy and Test

  1. Deploy the updated function:
    
    fn --verbose deploy --app stream-application
    
  2. Open blog-stream and select Produce test message.
    Paste the following JSON and select Produce
    
    { "name": "Processing Function with Streaming Service", "connector": "streaming"}
    
  3. Go to Function Invocation Logs in the stream-application Monitoring tab and select the log name.

You now see each stream message in OCI Logging.