Combining the DBMS_PARALLEL_EXECUTE package in Oracle Database 11gR2 with direct path inserts into partitioned tables is a useful pairing. I also use interval partitioning here, so with this pattern not only does the ETL load the partitions efficiently and flexibly, the database manages the table’s partitions too. Here I’ll show the raw first step of how it’s done, then go about generalizing it into the tools.

The figure below shows chunks of source data being processed by a pool of jobs that write into a partitioned target table in direct path. The parallel execute package keeps tables that define the tasks and chunks to be processed by the parallel job pool; you can query these through the data dictionary views (*_PARALLEL_EXECUTE_TASKS and *_PARALLEL_EXECUTE_CHUNKS). The package also supports resuming a task to reprocess failed chunks, which is useful. There is an introductory article on DBMS_PARALLEL_EXECUTE by Steven Feuerstein in the May/June issue of Oracle Magazine that is worth checking out.

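[Figure: chunks of source data processed by a pool of jobs writing in direct path into a partitioned target table]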

In the Oracle SQL grammar, the partition key value of the partition extension clause in an INSERT statement provides the critical information that lets us build a pattern for parallel direct path loads into partitioned tables.

[Figure: syntax diagram of the partition extension clause, showing the partition key value]

So if we make the chunking column from DBMS_PARALLEL_EXECUTE useful for identifying the partition key value above, then we have a winner. The parallel execute chunking identifier is a numeric value. In the example below the SALES table is partitioned by month, so the chunking identifier can use the form YYYYMM (i.e. 200812 for December 2008) to represent a month numerically, and this can be converted to a date for the partition key value in the INSERT statement using something like TO_DATE(200812, 'YYYYMM').
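To make the mapping concrete, here is a small sketch of the round trip between a month and its numeric chunk identifier. The column aliases month_start and chunk_id are only illustrative names, and the explicit TO_CHAR around the number just spells out the conversion that TO_DATE(200812, 'YYYYMM') would otherwise do implicitly.

```sql
-- Numeric chunk identifier -> first day of that month
select to_char(to_date(to_char(200812), 'YYYYMM'), 'YYYY-MM-DD') as month_start
from   dual;
-- month_start: 2008-12-01

-- Date -> numeric chunk identifier (the direction used when creating chunks)
select to_number(to_char(date '2008-12-25', 'YYYYMM')) as chunk_id
from   dual;
-- chunk_id: 200812
```

When only the year and month are supplied, TO_DATE defaults the day to the first of the month, which is why a single YYYYMM number is enough to pick out a monthly partition.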

The illustration here loads a partitioned SALES table that uses interval partitioning, so the database manages the addition of partitions for us.

CREATE TABLE sales
    ( prod_id        NUMBER(6)
    , cust_id        NUMBER
    , time_id        DATE
    , channel_id     CHAR(1)
    , promo_id       NUMBER(6)
    , quantity_sold  NUMBER(3)
    , amount_sold    NUMBER(10,2)
    )
  PARTITION BY RANGE (time_id)
  INTERVAL(NUMTOYMINTERVAL(1, 'MONTH'))
    ( PARTITION p0 VALUES LESS THAN (TO_DATE('1-1-2008', 'DD-MM-YYYY'))
  );

The source table I’m using mirrors the target table, without the partitions. I have also added some basic data for the demo; each batch of rows I added (one batch per month) will be processed as one chunk (imagine there was a lot of data in that partition).

CREATE TABLE src_sales
    ( prod_id        NUMBER(6)
    , cust_id        NUMBER
    , time_id        DATE
    , channel_id     CHAR(1)
    , promo_id       NUMBER(6)
    , quantity_sold  NUMBER(3)
    , amount_sold    NUMBER(10,2)
    );
begin
  -- One row per month, February to July 2010, on every iteration.
  -- DATE literals avoid any dependency on the session NLS_DATE_FORMAT.
  for c in 1..1000000 loop
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-02-01');
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-03-01');
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-04-01');
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-05-01');
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-06-01');
    insert into src_sales (prod_id,cust_id,time_id) values (1,1,DATE '2010-07-01');
    commit;
  end loop;
end;
/

To create the task and chunks for the execution we can use the DBMS_PARALLEL_EXECUTE APIs. In the call below we define the task with a name and, in this case, a SQL statement to identify the chunks (this is a demo example; be careful about performance here, as commonly an indexed numeric field is used):

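begin
  begin
    -- Drop any earlier run of the task so the script can be rerun
    DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => 'TASK_NAME');
  exception
    -- Demo shortcut: ignore the error raised when the task does not exist yet
    when others then null;
  end;
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => 'TASK_NAME');
  -- One chunk per distinct month; start and end id both hold the YYYYMM value
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(task_name => 'TASK_NAME',
    sql_stmt => 'select distinct to_number(to_char(time_id,''YYYYMM'')) startid, to_number(to_char(time_id,''YYYYMM'')) endid from src_sales',
    by_rowid => false);
end;
/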
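
Before running anything, it can be worth checking what the chunking SQL produced. The query below is a minimal sketch that assumes the task was created by the current user under the name TASK_NAME, as above. With the demo data it should list one chunk per distinct month, each in UNASSIGNED status until the task is run.

```sql
-- Chunks registered for the task, with the YYYYMM value in start_id and end_id
select chunk_id, status, start_id, end_id
from   user_parallel_execute_chunks
where  task_name = 'TASK_NAME'
order  by chunk_id;
```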

Then we have to define the meat of the task and the number of jobs to process the chunks. Note that I am using dynamic SQL, since the partition key value cannot be a bind variable and its value will change in each child task that the parallel execute engine executes (the engine itself passes start_id and end_id as bind variables to this block).

begin
  DBMS_PARALLEL_EXECUTE.RUN_TASK (task_name => 'TASK_NAME',
    sql_stmt => 'declare
      s varchar2(16000); vstart_id number := :start_id; vend_id number := :end_id;
      begin
        -- The partition key value must be a literal, so it is concatenated in.
        -- The APPEND hint has to follow the INSERT keyword directly to take effect.
        s := ''insert /*+ APPEND */ into SALES
           partition for (to_date(''||vstart_id||'', ''''YYYYMM''''))
           select PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD
           from SRC_SALES
           where time_id between to_date(:vstart_id, ''''YYYYMM'''') and last_day(to_date(:vend_id, ''''YYYYMM'''')) '';
        execute immediate s using vstart_id, vend_id;
        commit;
      end;',
    language_flag => DBMS_SQL.NATIVE, parallel_level => 2 );
end;
/
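
To see what each job actually executes, here is roughly what the dynamic SQL amounts to for the chunk whose start_id and end_id are both 201002. This is for reading only, not for running alongside the task, since it would load February a second time. The bind values are shown inline as literals purely for illustration, and the APPEND hint is written directly after the INSERT keyword, which is the only position where Oracle treats the comment as a hint.

```sql
-- Illustration: the statement built for chunk 201002 (February 2010)
insert /*+ APPEND */ into sales
  partition for (to_date(201002, 'YYYYMM'))
select prod_id, cust_id, time_id, channel_id, promo_id, quantity_sold, amount_sold
from   src_sales
where  time_id between to_date(201002, 'YYYYMM')
                   and last_day(to_date(201002, 'YYYYMM'));

-- A direct path insert must be committed before the same session reads the table
commit;
```

One thing to watch if you adapt this: LAST_DAY returns midnight on the last day of the month, so rows carrying a time of day on that last day fall outside the BETWEEN range. The demo rows all sit on the first of the month, so it does not matter here.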

Whilst the above is running, the parallel execution package spawns 2 jobs (since I specified a parallel level of 2). If I quickly look at the USER_PARALLEL_EXECUTE_CHUNKS view I see 6 chunks, since I had 6 distinct months of data. I can see below that the first two chunks are in ASSIGNED status and are being processed.

[Figure: USER_PARALLEL_EXECUTE_CHUNKS output with the first two chunks in ASSIGNED status]
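
If you would rather see a summary than the individual rows, a query along these lines can be rerun while the task is in flight. It is a sketch that assumes the same task name; the aliases chunks and jobs are just illustrative.

```sql
-- Chunk counts per status, and how many distinct jobs have picked chunks up
select status,
       count(*)                 as chunks,
       count(distinct job_name) as jobs
from   user_parallel_execute_chunks
where  task_name = 'TASK_NAME'
group  by status
order  by status;
```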

Checking the view again, I see 2 chunks are now in PROCESSED status and 2 are ASSIGNED. Note the start_id and end_id columns here: these are the bind variables passed to my PL/SQL block in the RUN_TASK routine above, and it is these I use for the PARTITION FOR key value.

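[Figure: USER_PARALLEL_EXECUTE_CHUNKS output with two chunks in PROCESSED status and two in ASSIGNED status]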

Finally all chunks are processed, and I have processed the data in parallel, writing in direct path into the partitioned target table.

[Figure: USER_PARALLEL_EXECUTE_CHUNKS output with all chunks processed]
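
A quick way to confirm the result is to count the loaded rows per month and to look at the partitions the database added on its own. This is a sketch against the SALES table defined earlier; month_id and row_count are illustrative aliases.

```sql
-- Rows loaded per month
select to_char(time_id, 'YYYYMM') as month_id, count(*) as row_count
from   sales
group  by to_char(time_id, 'YYYYMM')
order  by month_id;

-- Partitions on the target table; INTERVAL = YES marks those created automatically
select partition_name, high_value, interval
from   user_tab_partitions
where  table_name = 'SALES'
order  by partition_position;
```

The partitions created by interval partitioning get system-generated names (SYS_P followed by a number), which is another reason the PARTITION FOR (key value) form is convenient: the load never needs to know a partition name.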

I mentioned the resume capability earlier; this is also very useful. There is another state, PROCESSED_WITH_ERROR, that is flagged when the chunk being processed has failed for whatever reason. The RESUME_TASK procedures allow you to retry the task and reprocess just those chunks that failed.
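
As a sketch of how that might look, the query below lists the failed chunks with the error recorded for each, and the block after it retries the task until it finishes. The limit of two retries and the variable names are my own assumptions; adjust them to taste.

```sql
-- Chunks that failed, with the error recorded against each
select chunk_id, start_id, end_id, error_code, error_message
from   user_parallel_execute_chunks
where  task_name = 'TASK_NAME'
and    status    = 'PROCESSED_WITH_ERROR';

declare
  l_try    number := 0;   -- retries attempted so far
  l_status number;        -- current task status
begin
  l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS('TASK_NAME');
  -- Retry at most twice, stopping as soon as the task reports FINISHED
  while l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED loop
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.RESUME_TASK('TASK_NAME');
    l_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS('TASK_NAME');
  end loop;
end;
/
```

It is worth fixing the cause reported in error_message before resuming; otherwise the same chunks will simply fail again.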

Interesting stuff, combining some capabilities in 11gR2 of the database to boost processing and leverage those CPUs! Hopefully this sparks some other ideas out there. Next up I’ll take this into the data integration tools and illustrate how it can be commoditized so that you avoid the programming.