Introduction

Customers often need to perform analytics on their MySQL audit logs for compliance, monitoring, and troubleshooting. However, managing large volumes of audit data within MySQL can be inefficient and costly. To address this, an archival system is essential—one that reliably extracts audit logs, stores them in a cost-effective repository, and makes them available for further analysis.

Oracle Cloud Infrastructure (OCI) Object Storage is an ideal target for such archival, given its durability, scalability, and integration capabilities. Once audit logs are archived into Object Storage, organizations can leverage OCI Logging Analytics or any third-party/cloud logging analytics tool that can connect to Object Storage buckets for further insights.

This blog demonstrates how to build an archival pipeline for MySQL audit logs into OCI Object Storage using OCI Data Integration (OCI-DI).

Solution Architecture

Figure: Solution architecture

 

Business Requirements and Challenges

  • Ensure a complete archival system for MySQL audit logs.

  • Store logs in OCI Object Storage for durability and long-term retention.

  • Enable further analytics through OCI Logging Analytics or external BI/Logging analytics tools.

  • Efficiently manage large audit logs by tuning buffer sizes and controlling data extraction.

  • Support incremental extraction of logs from the last processed timestamp and ID to avoid duplicates.

 

Solution Overview

1. Audit Log Extraction with audit_log_read()

MySQL provides the audit_log_read() function for reading audit logs in JSON format. The reading process is controlled by the parameter audit_log_read_buffer_size, which determines the buffer size used for reading logs:

  • Default value: 32 KB

  • Maximum value: 4 MB

  • Scope: Global, Session

  • Dynamic: Yes (can be changed at runtime)

By tuning this parameter, customers can optimize log extraction and reduce the number of iterations needed for data loading.

Example:

SET GLOBAL audit_log_read_buffer_size = 2 * 1024 * 1024; -- Set buffer size to 2 MB 

This allows each call to audit_log_read() to fetch larger chunks of log data, reducing the number of read iterations needed during archival.
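
The read functions can also be exercised directly from a MySQL client before wiring them into the pipeline. A minimal sketch, assuming the audit log plugin and its read functions are available (as on MySQL Enterprise Edition and HeatWave MySQL):

SELECT audit_log_read_bookmark();                  -- JSON bookmark: timestamp and id of the most recent event
SELECT audit_log_read(audit_log_read_bookmark());  -- read a batch of events starting from that bookmark
SELECT audit_log_read('null');                     -- close the current read sequence when finished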

 

2. Archival Architecture in OCI

The archival solution leverages the following OCI components:

  • MySQL Database (source of audit logs)

  • OCI Object Storage (target archival repository)

  • OCI Data Integration (OCI-DI) (ETL/ELT engine to extract, transform, and load logs)

High-Level Flow:

  • Initialization:

    • Set up schema, audit tables, and stored functions in MySQL.

    • Define an initial date to begin extraction (if no prior records exist in the audit_data table).

  • Incremental Extraction:

    • First run fetches all records from the initial timestamp.

    • Subsequent runs extract only new records since the last processed timestamp and ID (see the audit_log_read() argument sketch after this list).

  • Archival to Object Storage:

    • Extracted logs are stored as Parquet files within designated bucket directories in Object Storage.

    • Logs can be further consumed by OCI Logging Analytics or external logging analytics tools.
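
For illustration, a minimal sketch of the two argument forms the pipeline passes to audit_log_read(); the timestamp, id, and max_array_length values here are placeholders, and the real values come from the audit_nextts table and the extraction query shown later:

-- First run: read forward from a fixed starting date
SELECT audit_log_read('{ "start": { "timestamp": "2024-08-23" }, "max_array_length": 1000 }');

-- Subsequent runs: read from the last processed timestamp and id (a bookmark)
SELECT audit_log_read('{ "timestamp": "2024-08-23 10:15:00", "id": 42, "max_array_length": 1000 }');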

 

3. DDL to Run on MySQL

    CREATE SCHEMA audit_archive;

    USE audit_archive;
     
    CREATE TABLE `audit_data` (
    `server_uuid` varchar(45) NOT NULL,
    `id` int NOT NULL,
    `ts` timestamp NOT NULL,
    `class` varchar(20) DEFAULT NULL,
    `event` varchar(80) DEFAULT NULL,
    `the_account` varchar(80) DEFAULT NULL,
    `login_ip` varchar(200) DEFAULT NULL,
    `login_os` varchar(200) DEFAULT NULL,
    `login_user` varchar(200) DEFAULT NULL,
    `login_proxy` varchar(200) DEFAULT NULL,
    `connection_id` varchar(80) DEFAULT NULL,
    `db` varchar(40) DEFAULT NULL,
    `status` int DEFAULT NULL,
    `connection_type` varchar(40) DEFAULT NULL,
    `connect_os` varchar(40) DEFAULT NULL,
    `pid` varchar(40) DEFAULT NULL,
    `_client_name` varchar(80) DEFAULT NULL,
    `_client_version` varchar(80) DEFAULT NULL,
    `program_name` varchar(80) DEFAULT NULL,
    `_platform` varchar(80) DEFAULT NULL,
    `command` varchar(40) DEFAULT NULL,
    `sql_command` varchar(40) DEFAULT NULL,
    `command_status` varchar(40) DEFAULT NULL,
    `query` varchar(40) DEFAULT NULL,
    `query_status` int DEFAULT NULL,
    `start_server_id` varchar(400) DEFAULT NULL,
    `server_os_version` varchar(100) DEFAULT NULL,
    `server_mysqlversion` varchar(100) DEFAULT NULL,
    `args` varchar(80) DEFAULT NULL,
    `account_host` varchar(80) DEFAULT NULL,
    `mysql_version` varchar(80) DEFAULT NULL,
    `the_os` varchar(80) DEFAULT NULL,
    `the_os_ver` varchar(80) DEFAULT NULL,
    `server_id` varchar(8) DEFAULT NULL,
    PRIMARY KEY (`server_uuid`,`id`,`ts`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    -- Following tables help track the incremental extraction of MySQL audit logs.
    
    CREATE TABLE `audit_nextts` (
      `id` int NOT NULL,
      `ts` timestamp NOT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    CREATE TABLE `audit_data_count` (
      `count_value` int DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    -- MySQL function to track the audit data count (curr_audit_data_count()) for each load.
    -- DELIMITER is switched so the multi-statement function body is sent to the server as one unit.
    DROP FUNCTION IF EXISTS curr_audit_data_count;
    DELIMITER $$
    CREATE FUNCTION curr_audit_data_count() RETURNS INT
    DETERMINISTIC
    BEGIN
        DECLARE count_value INT DEFAULT 0;

        -- Get the count of rows from the audit_data table
        SELECT COUNT(*) INTO count_value
        FROM audit_archive.audit_data;

        -- If count_value is 0, raise an error so the pipeline stops here
        IF count_value = 0 THEN
            SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No data found in audit_data table';
        END IF;

        -- Return the count
        RETURN count_value;
    END$$
    DELIMITER ;
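
Once the DDL has been run, a quick sanity check (a minimal sketch) confirms the objects exist; note that curr_audit_data_count() intentionally raises an error while audit_data is still empty:

SHOW TABLES FROM audit_archive;

-- Raises SQLSTATE '45000' until the first load populates audit_data
SELECT audit_archive.curr_audit_data_count();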

 

4. OCI Data Integration (OCI-DI) Pipeline

The archival pipeline in OCI-DI is built with four primary tasks that collectively orchestrate the end-to-end process of extracting, staging, and archiving logs into Object Storage. The steps below outline the role of each integration task in the pipeline.

1) IT_LOG_LOAD_MYSQL:

DESCRIPTION: This task performs either the initial load of audit logs into the audit_data table or an incremental load of new data based on the last processed timestamp and ID. The staging of audit log data is handled through the OCI-DI SQL interface using a Data Flow integration task; the SQL snippet below extracts the audit logs incrementally. The extra nextts_rownum column, together with the WHERE row_num > nextts_rownum predicate, skips the bookmarked event itself on incremental runs so the last archived record is not loaded twice.

SOURCE:

SELECT * FROM (SELECT
    ROW_NUMBER() OVER() AS row_num,
    @@server_uuid AS server_uuid, auditdata.*, nextts_tab.nextts_rownum
    FROM (
        SELECT
            (CASE
                WHEN (SELECT COUNT(*) FROM audit_archive.audit_nextts) > 0 THEN
                    (
                        SELECT CONCAT(
                            '{"timestamp": "',
                            DATE_FORMAT(ts, '%Y-%m-%d %H:%i:%s'),
                            '", "id": ',
                            id,
                            ', "max_array_length": 1000 }'
                        )
                        FROM audit_archive.audit_nextts                       
                    )
                ELSE
                    '{ "start": { "timestamp": "2024-08-23" }, "max_array_length": 1000 }'
            END) AS nextts,
    (CASE WHEN  (SELECT COUNT(*) FROM audit_archive.audit_nextts) > 0 THEN 1 ELSE 0 END) nextts_rownum
    ) nextts_tab,
    JSON_TABLE(
        AUDIT_LOG_READ(nextts),
        '$[*]'
        COLUMNS (
          id INT PATH '$.id',
          ts TIMESTAMP PATH '$.timestamp',
          class VARCHAR(20) PATH '$.class',
          event VARCHAR(80) PATH '$.event',
          the_account VARCHAR(80) PATH '$.account',
          login_ip VARCHAR(200) PATH '$.login.ip',
          login_os VARCHAR(200) PATH '$.login.os',
          login_user VARCHAR(200) PATH '$.login.user',
          login_proxy VARCHAR(200) PATH '$.login.proxy',
          connection_id VARCHAR(80) PATH '$.connection_id',
          db VARCHAR(40) PATH '$.connection_data.db',
          status INT PATH '$.connection_data.status',
          connection_type VARCHAR(40) PATH '$.connection_data.connection_type',
          connect_os VARCHAR(40) PATH '$.connection_data.connection_attributes._os',
          pid VARCHAR(40) PATH '$.connection_data.connection_attributes._pid',
          c_client_name VARCHAR(80) PATH '$.connection_data.connection_attributes._client_name',
          c_client_version VARCHAR(80) PATH '$.connection_data.connection_attributes._client_version',
          program_name VARCHAR(80) PATH '$.connection_data.connection_attributes.program_name',
          p_platform VARCHAR(80) PATH '$.connection_data.connection_attributes._platform',
          command VARCHAR(40) PATH '$.general_data.command',
          sql_command VARCHAR(40) PATH '$.general_data.sql_command',
          command_status VARCHAR(40) PATH '$.general_data.status',
          query VARCHAR(40) PATH '$.general_data.query',
          query_status INT PATH '$.general_data.status',
          start_server_id VARCHAR(400) PATH '$.startup_data.server_id',
          server_os_version VARCHAR(100) PATH '$.startup_data.os_version',
          server_mysqlversion VARCHAR(100) PATH '$.startup_data.mysql_version',
          args VARCHAR(80) PATH '$.startup_data.args',
          account_host VARCHAR(80) PATH '$.account.host',
          mysql_version VARCHAR(80) PATH '$.startup_data.mysql_version',
          the_os VARCHAR(80) PATH '$.startup_data.os',
          the_os_ver VARCHAR(80) PATH '$.startup_data.os_version',
          server_id VARCHAR(80) PATH '$.startup_data.server_id'
        )
    ) AS auditdata) test_tab
    WHERE row_num >  nextts_rownum
    

TARGET:

audit_archive.audit_data (overwrite)

 

2) IT_AUDIT_COUNT:

DESCRIPTION: This task loads the audit data count into the audit_data_count table using the custom MySQL function curr_audit_data_count(). The function raises a custom SQL error when the count is zero, so the pipeline skips the remaining steps if no new audit records were extracted.

SOURCE: 

SELECT audit_archive.curr_audit_data_count() AS count_value FROM DUAL
    

Note: for the first run, set the source to the following SQL instead:

SELECT COUNT(*) as count_value FROM audit_archive.audit_data;


TARGET: 

audit_archive.audit_data_count (overwrite)

 

3) IT_AUDIT_NEXTTS:

DESCRIPTION: Stores the timestamp and ID of the last processed record in the audit_nextts table, to be used as the starting point for the next pipeline run.

SOURCE:

SELECT id, ts FROM audit_archive.audit_data ORDER BY ts DESC, id DESC LIMIT 1

TARGET:

audit_archive.audit_nextts (overwrite)

 

4) IT_LOG_LOAD_OBS:

DESCRIPTION: Writes the audit_data table contents to an Object Storage directory.

SOURCE:

audit_archive.audit_data

TARGET:

Object Storage directory as the data entity (insert as Parquet files)


Below is a screenshot of the OCI Data Integration pipeline task, which orchestrates the extraction and archival of MySQL audit logs into Object Storage.

Figure: OCI Data Integration pipeline


The core logic for this approach is derived from the following MySQL developer blog post:
MySQL Audit Data Consolidation Made Simple
 

Conclusion

Archiving MySQL audit logs into OCI Object Storage provides a scalable, durable, and cost-effective way of handling large volumes of audit data. By leveraging OCI Data Integration, extracting and loading the logs is automated, and both initial and incremental loads are handled efficiently using the latest timestamp and ID. Once archived, the logs can be analyzed through OCI Logging Analytics or any compatible third-party tool, enabling organizations to meet compliance, monitoring, and troubleshooting requirements with greater ease and reliability.