MapReduce + Oracle = Tablefunctions

One of the things we did at OpenWorld was a hands-on session showing how to implement a MapReduce system on top of a generic Oracle Database. It is based on the original posts on this blog, which show a nice implementation of table functions, mappers and so on.

But then we thought: why not walk through the table function code and map it onto the MapReduce paradigm, to show everyone which constructs exist within Oracle and how you can build a data processing and analysis pipeline with them? So here is some of the code we used for exactly that at OpenWorld.

It goes like this: first we discuss the package header (the specification) and highlight the interesting pieces with code comments, then we discuss the body with the actual (simple) mapper and reducer code. The comments should make most of it self-explanatory.

Scenario

The scenario is deliberately simple: create a small table with a few records and loop over it. The reducer then performs an aggregation. The setup is as follows:

CREATE TABLE sls (salesman VARCHAR2(30), quantity NUMBER)
/

-- Seven sample sales rows for four salesmen
INSERT INTO sls VALUES ('Tom', 100);
INSERT INTO sls VALUES ('Chu', 200);
INSERT INTO sls VALUES ('Tom', 300);
INSERT INTO sls VALUES ('Mike', 100);
INSERT INTO sls VALUES ('Scott', 300);
INSERT INTO sls VALUES ('Tom', 250);
INSERT INTO sls VALUES ('Scott', 100);

COMMIT;
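As a reference point, the totals the map/reduce pipeline should eventually return for this data are exactly what a plain SELECT salesman, SUM(quantity) FROM sls GROUP BY salesman would give. The following Python sketch is only an illustration outside the database; SLS_ROWS and expected_totals are hypothetical names, and SLS_ROWS is an in-memory copy of the seven rows inserted above.

```python
from collections import defaultdict

# Hypothetical in-memory copy of the rows inserted into SLS above:
# (salesman, quantity)
SLS_ROWS = [
    ("Tom", 100), ("Chu", 200), ("Tom", 300), ("Mike", 100),
    ("Scott", 300), ("Tom", 250), ("Scott", 100),
]

def expected_totals(rows):
    """Plain GROUP BY salesman / SUM(quantity), used as a reference answer."""
    totals = defaultdict(int)
    for salesman, quantity in rows:
        totals[salesman] += quantity
    return dict(totals)

print(expected_totals(SLS_ROWS))
# {'Tom': 650, 'Chu': 200, 'Mike': 100, 'Scott': 400}
```

Whatever the degree of parallelism, the mapper/reducer pipeline should produce one row per salesman with these totals.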

Header

create or replace package oracle_map_reduce
is

-- The types we define here are similar to the input and output files
-- used in MapReduce code: they hold the data while the package runs.

-- The big advantage is that intermediate results do not need to be
-- written to disk.

    type sales_t is table of sls%rowtype;
    type sale_cur_t is ref cursor return sls%rowtype;
    type sale_rec_t is record (name varchar2(30), total number);
    type total_sales_t is table of sale_rec_t;

-- Next we declare the functions that do the work and make them known
-- to the outside world.

-- Note that both the mapper and the reducer are table functions!

-- Both mapper and reducer are pipelined and can execute in parallel.
-- The degree of parallelism is driven by the database, not scheduled
-- by the program itself.

    function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any);

-- The PIPELINED keyword tells the caller that this function acts as
-- a row source.
--
-- PARALLEL_ENABLE indicates that this function can be executed in
-- parallel by the parallel query framework.

    function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))

-- Finally, we cluster the input rows so that rows with the same key
-- arrive together (note that this does not drive the distribution over
-- the parallel slaves; that is done by the PARTITION clause shown in the
-- mapper and reducer declarations).

    cluster in_cur by (salesman);

end;
/

-- The header, as shown here, defines the signature of the package
-- and declares the types used in the package. The body of the
-- package, which follows, holds the mapper and reducer code.
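To make the difference between the two PARTITION clauses concrete, here is a small Python model. It is purely illustrative and is not how Oracle distributes rows internally: it assumes three hypothetical parallel slaves, uses round-robin as one arbitrary distribution for BY ANY, and uses zlib.crc32 as a stand-in hash for BY HASH. With BY ANY, a salesman's rows can be spread over several slaves, which is fine for the pass-through mapper; with BY HASH(salesman), each salesman lands on exactly one slave, which is what the reducer's aggregation needs.

```python
import zlib

# Hypothetical in-memory copy of the SLS rows: (salesman, quantity)
SLS_ROWS = [
    ("Tom", 100), ("Chu", 200), ("Tom", 300), ("Mike", 100),
    ("Scott", 300), ("Tom", 250), ("Scott", 100),
]

def partition_any(rows, n_slaves):
    """Model of PARTITION ... BY ANY: any row may go to any slave.
    Round-robin is just one arbitrary choice made here for illustration."""
    parts = [[] for _ in range(n_slaves)]
    for i, row in enumerate(rows):
        parts[i % n_slaves].append(row)
    return parts

def partition_hash(rows, n_slaves):
    """Model of PARTITION ... BY HASH(salesman): rows with the same salesman
    always go to the same slave. zlib.crc32 is a stand-in for Oracle's
    internal hash function."""
    parts = [[] for _ in range(n_slaves)]
    for row in rows:
        parts[zlib.crc32(row[0].encode()) % n_slaves].append(row)
    return parts

def slaves_per_salesman(parts):
    """Map each salesman to the set of slave indexes that received his rows."""
    seen = {}
    for idx, part in enumerate(parts):
        for salesman, _quantity in part:
            seen.setdefault(salesman, set()).add(idx)
    return seen

by_any = slaves_per_salesman(partition_any(SLS_ROWS, 3))
by_hash = slaves_per_salesman(partition_hash(SLS_ROWS, 3))
print(by_any["Tom"])   # {0, 2}: Tom's rows are split across slaves
print(by_hash["Tom"])  # a single slave index
```

CLUSTER BY is not modeled here: it only affects how rows are grouped within a slave, not which slave receives them.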

Body

create or replace package oracle_map_reduce
is
    type sales_t is table of sls%rowtype;
    type sale_cur_t is ref cursor return sls%rowtype;
    type sale_rec_t is record (name varchar2(30), total number);
    type total_sales_t is table of sale_rec_t;

    function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any);

    function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))

    cluster in_cur by (salesman);

end;
/

-- The part above is the header (package specification); what follows
-- is the body. Note the difference in the CREATE statement below
-- (CREATE PACKAGE BODY) as compared to the header (CREATE PACKAGE).

create or replace package body oracle_map_reduce
is

    function mapper(inp_cur in sys_refcursor) return sales_t
    pipelined parallel_enable (partition inp_cur by any)
    is
        sales_rec sls%ROWTYPE;
        -- construct a record to hold an entire row from the SLS table
        begin
            -- Loop over all rows delivered by the input cursor
            loop
                fetch inp_cur into sales_rec;
                exit when inp_cur%notfound;
                -- Fetch the next row into the record variable and
                -- end the loop when there are no more rows
                pipe row (sales_rec);
                -- PIPE ROW hands each row back in streaming fashion, as you
                -- would expect from a table. Together with PIPELINED in the
                -- declaration, this lets the table function return data as
                -- soon as it is produced instead of building the whole
                -- collection first.
            end loop;
            return;

-- The final RETURN (without a value) is mandatory: it tells the consumer
-- of the data (our reducer in this case) that all data has been sent.
-- After RETURN the row source is exhausted and no more data comes from
-- this function.

        end mapper;

-- In effect, the mapper above does nothing other than stream the
-- (partitioned) data over to the next step. In MapReduce the stream
-- would be written to a file and then redistributed to the reducers.

-- The reducer below computes and emits the sales figures
 

    function reducer(in_cur in sale_cur_t) return total_sales_t
    pipelined parallel_enable (partition in_cur by hash(salesman))
    -- The PARTITION BY clause indicates that all rows for a particular
    -- salesman must be sent to the same instance of the reducer function

    cluster in_cur by (salesman)

    -- The CLUSTER BY clause tells the parallel query framework to deliver
    -- all rows for a particular salesman together (adjacent to each other).

        IS

        sale_rec sls%ROWTYPE;
        total_sale_rec sale_rec_t;

        -- Two record variables: one for the input row, the other for the output

        begin

            total_sale_rec.total := 0;
            total_sale_rec.name := NULL;

            -- start from an empty output record (no salesman, total 0)
            loop
                fetch in_cur into sale_rec;
                exit when in_cur%notfound;

                -- IF/ELSIF logic: pipe out a salesman's total only once all
                -- of that salesman's rows have been processed

                if (total_sale_rec.name is null) then
                    -- The first row is arriving: set the salesman to that
                    -- input value and add the incoming quantity to the
                    -- initial total of 0

                    total_sale_rec.name := sale_rec.salesman;
                    total_sale_rec.total := total_sale_rec.total + 
                    sale_rec.quantity;

                elsif ( total_sale_rec.name <> sale_rec.salesman) then

                    -- We have switched salesman and are done with the previous
                    -- one (rows are partitioned and clustered, so that salesman
                    -- will not reappear). First pipe out the result for the
                    -- previous salesman, then reset the record for the new one.
                    pipe row (total_sale_rec);
                    total_sale_rec.name := sale_rec.salesman;
                    total_sale_rec.total := sale_rec.quantity;

                else

                    -- We get here while working on the same salesman: just add
                    -- to the total, then move on to the next record

                    total_sale_rec.total := total_sale_rec.total +
                    sale_rec.quantity;

                end if;
            end loop;

            -- Pipe out the last salesman, who has not been piped out yet
            -- because the loop above only pipes a row when the salesman
            -- changes. (With a single salesman, this is the only place his
            -- row is piped out.) Checking the name rather than the total
            -- still emits a salesman whose quantities sum to 0, and emits
            -- nothing when the input cursor was empty.

            if total_sale_rec.name is not null then
                pipe row (total_sale_rec);
            end if;

            return;

            -- Again, we are now done and have piped all rows to our consumer

        end reducer;
end oracle_map_reduce;
/
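The control flow of the mapper and reducer above can be mirrored in Python: a generator's yield plays the role of PIPE ROW, the end of the generator plays the role of the final RETURN, and sorting by salesman stands in for CLUSTER BY (Oracle only guarantees that rows for the same salesman arrive together, not that they are sorted). This is a sketch for illustration, not the Oracle implementation; SLS_ROWS is a hypothetical in-memory copy of the SLS table.

```python
from typing import Iterable, Iterator, Tuple

Row = Tuple[str, int]    # (salesman, quantity), like sls%ROWTYPE
Total = Tuple[str, int]  # (name, total), like sale_rec_t

def mapper(rows: Iterable[Row]) -> Iterator[Row]:
    # Each yield corresponds to PIPE ROW: the row goes to the consumer at once.
    for row in rows:
        yield row
    # Falling off the end of the generator corresponds to the final RETURN.

def reducer(clustered_rows: Iterable[Row]) -> Iterator[Total]:
    # Control-break aggregation; assumes rows for one salesman are adjacent.
    name, total = None, 0
    for salesman, quantity in clustered_rows:
        if name is None:            # first row: start the first salesman
            name, total = salesman, quantity
        elif name != salesman:      # salesman changed: emit the previous one
            yield (name, total)
            name, total = salesman, quantity
        else:                       # same salesman: accumulate
            total += quantity
    if name is not None:            # emit the last salesman (even a 0 total)
        yield (name, total)

# Hypothetical in-memory copy of the SLS rows
SLS_ROWS = [
    ("Tom", 100), ("Chu", 200), ("Tom", 300), ("Mike", 100),
    ("Scott", 300), ("Tom", 250), ("Scott", 100),
]

# sorted() by salesman stands in for CLUSTER in_cur BY (salesman)
clustered = sorted(mapper(SLS_ROWS), key=lambda r: r[0])
print(list(reducer(clustered)))
# [('Chu', 200), ('Mike', 100), ('Scott', 400), ('Tom', 650)]
```

Feeding the reducer unclustered rows would break the control-break logic (Tom would be emitted more than once), which is why the clustering step matters.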

Using this in a SQL Query

It took me a while, but once you see the SELECT statement, you start to see how you can build a set of predefined programs and then string them together to produce a result.

select *
from table(oracle_map_reduce.reducer(cursor(
          select * from table(oracle_map_reduce.mapper(cursor(
                 select * from sls))) map_result)));

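Each step pipelines its data to the next consumer, and because both functions are declared PARALLEL_ENABLE, the whole chain can run in parallel when the query itself is parallelized (for example with a PARALLEL hint or a parallel degree set on the SLS table). This makes the approach a good fit for heavy-duty in-database ETL (which is what we originally invented it for) and for anything that applies a lot of logic to records (such as analytical processing).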
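To see how the composed query and the PARTITION clauses fit together, the whole pipeline can be simulated in a single Python process. This is a hypothetical model, not Oracle's parallel execution: n_slaves stands for the degree of parallelism, rows are split arbitrarily across mapper instances (BY ANY), redistributed by a stand-in hash (zlib.crc32) to the reducer instances (BY HASH(salesman)), sorted per slave to obtain the clustering, and the reducer outputs are unioned.

```python
import zlib
from itertools import chain

# Hypothetical in-memory copy of the SLS rows: (salesman, quantity)
SLS_ROWS = [
    ("Tom", 100), ("Chu", 200), ("Tom", 300), ("Mike", 100),
    ("Scott", 300), ("Tom", 250), ("Scott", 100),
]

def mapper(rows):
    # Pass-through mapper, as in the PL/SQL version
    for row in rows:
        yield row

def reducer(clustered_rows):
    # Control-break aggregation over rows clustered by salesman
    name, total = None, 0
    for salesman, quantity in clustered_rows:
        if name is None:
            name, total = salesman, quantity
        elif name != salesman:
            yield (name, total)
            name, total = salesman, quantity
        else:
            total += quantity
    if name is not None:
        yield (name, total)

def run_pipeline(rows, n_slaves):
    """Simulates reducer(cursor(select * from table(mapper(cursor(...)))))
    with n_slaves hypothetical parallel slaves per stage."""
    # Stage 1: split the input arbitrarily (BY ANY); one mapper per slave
    map_inputs = [rows[i::n_slaves] for i in range(n_slaves)]
    mapped = chain.from_iterable(mapper(part) for part in map_inputs)

    # Stage 2: redistribute mapper output by a hash of the salesman
    # (BY HASH); crc32 stands in for Oracle's internal hash
    reduce_inputs = [[] for _ in range(n_slaves)]
    for row in mapped:
        reduce_inputs[zlib.crc32(row[0].encode()) % n_slaves].append(row)

    # Stage 3: cluster each slave's rows by salesman (sorting is one way to
    # get that adjacency), run one reducer per slave, union the outputs
    results = []
    for part in reduce_inputs:
        results.extend(reducer(sorted(part, key=lambda r: r[0])))
    return results

print(sorted(run_pipeline(SLS_ROWS, 3)))
# [('Chu', 200), ('Mike', 100), ('Scott', 400), ('Tom', 650)]
```

Whatever n_slaves is, each salesman appears exactly once with the same total a GROUP BY would give. Replacing the hash redistribution with an arbitrary split would let a salesman's rows reach two reducers, each emitting a partial total; that is the reason the reducer is declared with PARTITION in_cur BY HASH(salesman).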

About

The Data Warehouse Insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.
