Parallel Direct Path Inserts into Partitioned Tables

Combining the DBMS_PARALLEL_EXECUTE package introduced in 11gR2 of the Oracle database with direct path inserts into partitioned tables is a useful pairing. Another flexibility facet used here is interval partitioning: with this pattern, not only does the ETL load the partitions in an efficient, flexible way, but the database manages the creation of partitions in the target table too. Here I'll show the raw first step of how it's done, then go about generalizing it into the data integration tools.

The figure below shows chunks of source data being processed by a pool of jobs writing into a partitioned target table in direct path. The parallel execute package maintains tables that define the tasks and chunks to be processed by the parallel job pool; you can query these via the data dictionary views (*_PARALLEL_EXECUTE_TASKS and *_PARALLEL_EXECUTE_CHUNKS). The package also supports resuming a task to reprocess failed chunks, which is useful. There is an introductory article on DBMS_PARALLEL_EXECUTE worth checking out by Steven Feuerstein in the May/June issue of Oracle Magazine.

[Figure: chunks of source data processed by a pool of jobs writing direct path into a partitioned target table]

In the Oracle SQL grammar, the partition extension clause of the INSERT statement lets us name the target partition by its partition key value; this is the critical piece that enables a pattern for parallel direct path loads into partitioned tables.

[Figure: syntax diagram of the INSERT partition extension clause]
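For instance, a direct path insert addressing the December 2008 partition by key value would look like this (a minimal sketch against the SALES and SRC_SALES tables defined below):

INSERT /*+ APPEND */ INTO sales PARTITION FOR (TO_DATE('200812', 'YYYYMM'))
SELECT prod_id, cust_id, time_id, channel_id, promo_id, quantity_sold, amount_sold
FROM   src_sales
WHERE  time_id BETWEEN TO_DATE('200812', 'YYYYMM')
                   AND LAST_DAY(TO_DATE('200812', 'YYYYMM'));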

So if we make the chunking column from DBMS_PARALLEL_EXECUTE identify the partition key value above, then we have a winner. The parallel execute chunking identifier is a numeric value - in the example below the SALES table is partitioned by month, so we can use YYYYMM as the chunking identifier (i.e. 200812 for December 2008) to represent a month in numeric form, converting it to a date for the partition key value in the INSERT clause with something like TO_DATE(200812, 'YYYYMM').
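The round trip between a date and its numeric chunk identifier is easy to check (an illustrative query only):

select to_number(to_char(date '2008-12-15', 'YYYYMM')) as chunk_id,      -- 200812
       to_date(200812, 'YYYYMM')                       as partition_key  -- 01-DEC-2008
from   dual;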

The illustration here will load a partitioned SALES table that uses interval partitioning, so we get a table for which the database manages the addition of partitions.

CREATE TABLE sales
    ( prod_id        NUMBER(6)
    , cust_id        NUMBER
    , time_id        DATE
    , channel_id     CHAR(1)
    , promo_id       NUMBER(6)
    , quantity_sold  NUMBER(3)
    , amount_sold    NUMBER(10,2)
    )
  PARTITION BY RANGE (time_id)
  INTERVAL(NUMTOYMINTERVAL(1, 'MONTH'))
    ( PARTITION p0 VALUES LESS THAN (TO_DATE('1-1-2008', 'DD-MM-YYYY'))
  );
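After a load you can confirm that the database has created the monthly interval partitions with a quick dictionary query (a sanity check, not part of the pattern itself):

select partition_name, high_value
from   user_tab_partitions
where  table_name = 'SALES'
order  by partition_position;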

The source table I'm using mirrors the target table, minus the partitioning. I've also added some basic data for the demo - each batch of rows added below will be processed as one chunk (imagine it was a lot of data in that partition).

CREATE TABLE src_sales
    ( prod_id        NUMBER(6)
    , cust_id        NUMBER
    , time_id        DATE
    , channel_id     CHAR(1)
    , promo_id       NUMBER(6)
    , quantity_sold  NUMBER(3)
    , amount_sold    NUMBER(10,2)
    );
begin
  -- generate six months of demo data; each distinct month will become one chunk
  for c in 1..1000000 loop
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-02-01');
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-03-01');
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-04-01');
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-05-01');
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-06-01');
    insert into src_sales (prod_id, cust_id, time_id) values (1, 1, date '2010-07-01');
    commit;
  end loop;
end;
/

To create the task and chunks for the execution we use the DBMS_PARALLEL_EXECUTE APIs. In the calls below we define the task with a name and, in this case, a SQL statement that identifies the chunks (a demo example - be careful about performance here; commonly an indexed numeric column is used):

begin
  begin
    -- drop any previous definition of the task
    DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => 'TASK_NAME');
  exception when others then null;
  end;
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => 'TASK_NAME');
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(task_name => 'TASK_NAME',
    sql_stmt => 'select distinct to_number(to_char(time_id, ''YYYYMM'')) startid,
                        to_number(to_char(time_id, ''YYYYMM'')) endid
                 from src_sales',
    by_rowid => false);
end;
/
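With the task created you can see it and its chunk type in the dictionary views mentioned earlier, for example:

select task_name, chunk_type, status
from   user_parallel_execute_tasks
where  task_name = 'TASK_NAME';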

Then we define the meat of the task and the number of jobs to process the chunks. Note I am using dynamic SQL since the partition key value cannot be a bind variable - its value changes in each child task that the parallel execute engine runs (the engine itself passes start_id and end_id as bind variables to this block).

begin
  DBMS_PARALLEL_EXECUTE.RUN_TASK (task_name => 'TASK_NAME',
    sql_stmt =>'declare
      s varchar2(16000); vstart_id number := :start_id; vend_id number := :end_id;
      begin
        -- the partition key value cannot be a bind, so it is concatenated in;
        -- the filter values are bound as usual
        s := ''insert /*+ APPEND */ into SALES
           partition for (to_date(''||vstart_id||'', ''''YYYYMM''''))
           select PROD_ID, CUST_ID, TIME_ID, CHANNEL_ID, PROMO_ID, QUANTITY_SOLD, AMOUNT_SOLD
           from SRC_SALES
           where time_id between to_date(:vstart_id, ''''YYYYMM'''') and last_day(to_date(:vend_id, ''''YYYYMM''''))'';
        execute immediate s using vstart_id, vend_id;
        commit;
      end;',
    language_flag => DBMS_SQL.NATIVE, parallel_level => 2 );
end;
/

Whilst the above is running, the parallel execute package spawns two jobs (since I specified a parallel level of 2). If I quickly look at the USER_PARALLEL_EXECUTE_CHUNKS view I see six chunks, since I had six distinct months of data. Below, the first two chunks are in ASSIGNED status and are being processed.
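The check against the view is along these lines (pick whichever columns you need):

select chunk_id, status, start_id, end_id
from   user_parallel_execute_chunks
where  task_name = 'TASK_NAME'
order  by chunk_id;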

[Screenshot: USER_PARALLEL_EXECUTE_CHUNKS showing six chunks, the first two in ASSIGNED status]

Checking the view again, I see two chunks are now in PROCESSED status and two are ASSIGNED. Note the start_id and end_id columns here - these are the bind variables passed to my PL/SQL block in the RUN_TASK routine above, and it is these I use for the PARTITION FOR key value.

[Screenshot: USER_PARALLEL_EXECUTE_CHUNKS showing two chunks PROCESSED and two ASSIGNED, with start_id/end_id values]

Finally all chunks are processed and the data has been loaded in parallel, written direct path into the partitioned target table.

[Screenshot: USER_PARALLEL_EXECUTE_CHUNKS with all chunks in PROCESSED status]

I mentioned the resume capability earlier; this is also very useful. There is another status, PROCESSED_WITH_ERROR, that is flagged when the processing of a chunk has failed for whatever reason. The RESUME_TASK procedures let you retry the task and reprocess just those chunks that failed.
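A minimal sketch of the retry, assuming the task above and that the cause of the failures has been addressed:

begin
  -- reprocesses only those chunks not yet in PROCESSED status
  DBMS_PARALLEL_EXECUTE.RESUME_TASK(task_name => 'TASK_NAME');
end;
/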

Interesting stuff - combining some capabilities in 11gR2 of the database to boost processing and leverage those CPUs! Hopefully this sparks some other ideas out there. Next up I'll take this into the data integration tools and illustrate how it can be commoditized, avoiding the hand coding.

Comments:

This is very neat... but Oracle will automatically parallelize a direct-path insert by simply doing an "ALTER SESSION ENABLE PARALLEL DML". Cheers, Stewart

Posted by Stewart Bryson on December 03, 2010 at 12:31 PM PST #

Hi Stewart. Yep, of course. The DBMS_PARALLEL_EXECUTE package documentation and posts are primarily slated for large table updates; I'm sure there are other cases where old fashioned divide and conquer is useful. The approach outlined could be useful IF you can efficiently deliver each data chunk to the parallel slaves without the database having to compute which slave needs to process each row - it also lets you do other operations in parallel in other processes (but that's more just about naming the partition). Not the everyday case I guess, but these are ones I have heard over the past couple of years. Cheers David

Posted by David Allan on December 05, 2010 at 03:46 AM PST #

David - Nicely done. Your hint is missing the ever important "+" in the syntax. Your code: begin s:=''insert into SALES /*APPEND*/ ... Was this intended for the demo? Tim

Posted by Tim Trauernicht on December 15, 2010 at 12:34 AM PST #

Good spot, it was an editorial error on my part!

Posted by David Allan on December 15, 2010 at 12:50 AM PST #

I've updated the post, thanks.

Posted by David Allan on December 15, 2010 at 12:51 AM PST #

Hi,
I wanted to insert data into 4 different non-partitioned tables in parallel using dbms_parallel_execute. Is that possible?

create chunks by SQL: select distinct level_key, level_key from chunk_table;

level_key   table_name
---------   ------------
        1   sales_level1
        2   sales_level2
        4   sales_level3
        3   sales_level4

My run_task SQL should be as below

insert into table_name select /*+ parallel(stg, 8) */ * from staging_table stg where level_key=:start_id;

Thanks

Posted by guest on November 02, 2011 at 05:17 AM PDT #

Hi

Yes, you can do this. The start/end values have to be numbers, so you will have to look up the table name inside your PL/SQL block within the statement provided in the run_task call.

This block has your query to determine the chunks .....
begin
begin
DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => 'TASK_NAME');
exception when others then null;
end;
DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => 'TASK_NAME');
DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(task_name => 'TASK_NAME',
sql_stmt =>'select distinct level_key, level_key from chunk_table', by_rowid => false);
end;
/

Then the next block will construct and process the tasks......
begin
  DBMS_PARALLEL_EXECUTE.RUN_TASK (task_name => 'TASK_NAME',
    sql_stmt => 'declare
      s varchar2(16000); vstart_id number := :start_id; vend_id number := :end_id;
      v_table_name varchar2(30);
      begin
        -- look up the target table for this chunk
        select table_name into v_table_name from chunk_table where level_key = vstart_id;
        s := ''insert into ''||v_table_name||'' select /*+ PARALLEL(STG, 8) */ colx from STAGING_TABLE STG
              where level_key = :vstart_id'';
        execute immediate s using vstart_id;
        commit;
      end;',
    language_flag => DBMS_SQL.NATIVE, parallel_level => 2 );
end;
/

Hope this is what you are looking for.
Cheers
David

Posted by David on November 02, 2011 at 06:08 AM PDT #

hi --- thanks for sharing this info. .
I am trying to implement the parallel data load by following the technique mentioned by you.. the thing is it is taking more time than the stored procedure, cursor approach (provided the code as well) and when i ran the parallel process, it is not inserting any data... Really wasn't sure if i am missing any thing.... can you please let me know if the approach is correct or not... based on the o/p the select statement,which o/p's the name of the table , need to run the SQL with dynamic SQL and insert data into a single table... provided below was the proc... can you please point me to a direction if i am doing the parallel execute the right way?

drop table table_ASERCARE purge;
drop table table_MEDCARE purge;
DROP TABLE TABLE_XYCARE PURGE;
DROP TABLE TABLE_TIME PURGE;
DROP TABLE TABLE_LOCATION PURGE;
drop table table_group purge;
drop table tablex purge;

-- select distinct TABLE_NAME from ALL_TAB_COLS where TABLE_NAME like 'EMP%';

create table table_asercare (time number(30), location_number number(5), value number(5),catg_id number(5));
insert into table_asercare values (20110111, 01, 55, 1200);
insert into table_asercare values (20110131, 01, 31, 1223);
insert into table_asercare values (20120131, 15, 24,1224);
insert into table_ASERCARE values (20130131, 03, 555,1200);

-- Truncate table table_MEDCARE
create table table_medcare (time number(30), location_number number(5), value number(5),catg_id number(5));
insert into table_medcare values (20110113, 01, 23, 1200);
insert into table_medcare values (20110128, 02, 78, 1223);
insert into table_medcare values (20110130, 03, 100, 1224);
insert into table_medcare values (20120111, 04, 57, 1200);
insert into table_medcare values (20120221, 05, 64, 1223);
insert into table_MEDCARE values (20130321, 15, 48, 1224);

create table table_xycare (time number(30), location_number number(5), value number(5),catg_id number(5));
insert into table_xycare values (20100113, 01, 99, 1200);
insert into table_xycare values (20110128, 02, 90, 1223);
insert into table_XYCARE values (20130128, 03, 24, 1224);

create table table_LOCATION ( LOCATION_NUMBER number(5), LOCATION_NAME varchar2(50));
insert into table_LOCATION values (01, 'atlanta1');
insert into table_LOCATION values (02, 'atlanta2');
insert into table_LOCATION values (03, 'atlanta3');
insert into table_LOCATION values (04, 'atlanta4');
insert into table_LOCATION values (05, 'atlanta5');
insert into table_location values (15, 'atlanta15');

create table table_category (catg_id number(5), catg_name varchar2(30));
insert into table_category values (1200, 'EMS');
insert into table_category values (1223, 'LJM');
insert into table_category values (1224, 'LIO');

create table table_TIME (YEAR_MONTH_DATE number(30), YEAR_VAL number(4), MONTH_VAL number(2),DATE_VAL number(2));
insert into table_TIME values (20110111, 2011, 01,11 );
insert into table_TIME values (20110131, 2011, 01,31);
insert into table_TIME values (20120131, 2012, 01,31);
insert into table_TIME values (20130131, 2013, 01,31);
insert into table_TIME values (20110128, 2011, 01,28 );
insert into table_TIME values (20110130, 2011, 01,30 );
insert into table_TIME values (20120111, 2012, 01,11 );
insert into table_TIME values (20120221, 2012, 02,21 );
insert into table_TIME values (20130321, 2013, 03,21 );
insert into table_TIME values (20100113, 2010, 01,13 );
insert into table_TIME values (20130128, 2013, 01,28 );

--Truncate table table_group
CREATE TABLE table_group (group_key number,table_name VARCHAR2(30), group_name VARCHAR2(30), catg_name varchar2(30));

insert into table_group values (1,'table_ASERCARE', 'GROUP_ONE','EMS');
insert into table_group values (2,'table_MEDCARE', 'GROUP_ONE','LJM');
INSERT INTO TABLE_GROUP VALUES (3,'table_XYCARE', 'GROUP_TWO','LIO');

create table TABLEX (YEAR_VAL number(4) ,LOCATION_NAME varchar2(50),tablename VARCHAR2(30), cnt number ); --> Proc data will be inserted into this...

--------------------
begin
  begin
    DBMS_PARALLEL_EXECUTE.DROP_TASK(task_name => 'TASK_NAME');
  exception when others then null;
  end;
  DBMS_PARALLEL_EXECUTE.CREATE_TASK(task_name => 'TASK_NAME');
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL(task_name => 'TASK_NAME', sql_stmt => 'select distinct group_key, group_key from table_group', by_rowid => false);
end;
/
--------

begin
  DBMS_PARALLEL_EXECUTE.RUN_TASK (task_name => 'TASK_NAME',
    sql_stmt => 'declare
      s varchar2(16000); vstart_id number := :start_id; vend_id number := :end_id;
      table_name varchar2(30);
      begin
        select table_name into table_name from group_table where group_key = vstart_id;
        s := ''INSERT INTO tablex (YEAR_VAL, LOCATION_NAME, tablename, cnt)
          SELECT t.YEAR_VAL, l.location_name, :table_name, count(*) as cnt
          FROM ''||table_name||'' variable_table
             , table_time t
             , table_location l
             , table_group g
             , table_category ctg
          WHERE t.year_month_date = variable_table.TIME
            and variable_table.location_number = l.location_number
            and ctg.catg_id = variable_table.catg_id
            and ctg.catg_name = g.catg_name
            and g.group_key = :vstart_id
          GROUP BY t.YEAR_VAL, l.location_name, g.catg_name'';
        execute immediate s using vstart_id;
        commit;
      end;',
    language_flag => DBMS_SQL.NATIVE, parallel_level => 2 );
end;
/

-----------------------

***************************
Store Procedure*******************
CREATE OR REPLACE PROCEDURE ABC(
  GROUP_NAME_IN IN VARCHAR2 )
is
  type c1 is ref cursor;
  sql_stmt     VARCHAR2(200);
  v_sql        VARCHAR2(30000);
  c1_cv        c1;
  table_name_f VARCHAR2(30);
  c1_rec       TABLE_GROUP%rowtype;
BEGIN
  SQL_STMT := 'SELECT * FROM TABLE_GROUP WHERE GROUP_NAME = :i';
  OPEN c1_cv FOR SQL_STMT USING GROUP_NAME_IN;
  loop
    fetch c1_cv INTO c1_rec;
    exit when c1_cv%notfound;
    table_name_f := c1_rec.table_name;
    EXECUTE immediate
      'INSERT INTO tablex (YEAR_VAL, LOCATION_NAME, tablename, cnt)
       SELECT t.YEAR_VAL, l.location_name, :table_name, count(*) as cnt
       FROM ' || table_name_f || ' variable_table
          , table_time t
          , table_LOCATION l
       WHERE t.year_month_date = variable_table.TIME
         and variable_table.location_number = l.location_number
       GROUP BY t.YEAR_VAL, l.location_name' USING table_name_f;
    --dbms_output.put_line ( 'The SQL is'|| v_sql);
    COMMIT;
    --dbms_output.put_line ( c1_rec.table_name||','||c1_rec.group_name );
  end loop;
  CLOSE c1_cv;
END ABC;

Posted by guest on April 28, 2013 at 12:17 PM PDT #

Check the view USER_PARALLEL_EXECUTE_CHUNKS for errors; the ERROR_MESSAGE column will help you, and the STATUS column will indicate the state of the job - more than likely PROCESSED_WITH_ERROR.

Your SQL query selects from group_table but your table definition is called TABLE_GROUP... You also need to bind table_name when executing the SQL in the execute immediate, i.e. execute immediate s using table_name, vstart_id;

As for the performance, you will need to try a realistic workload and compare plans to see which implementation configuration is suitable.
Cheers
David

Posted by David on April 29, 2013 at 08:09 AM PDT #

Hi -- Thanks a lot for the info. Also regarding the performance, I will keep you posted once I run on huge data. Really appreciate you sharing the new 11g concepts and your knowledge.

Posted by guest on April 29, 2013 at 11:00 PM PDT #

Really a Nice article...parallel everything !!!

Posted by Tarun Agrawal on January 19, 2014 at 03:48 PM PST #
