In-Database MapReduce (Map-Reduce)

The Map-Reduce model has become a popular way for programmers to describe and implement parallel programs. Custom map-reduce programs are often used to process large data sets in parallel. This post shows how to implement Map-Reduce programs within the Oracle database using parallel Pipelined Table Functions and parallel operations.

The Theory

Pipelined Table Functions were introduced in Oracle 9i as a way of embedding procedural logic within a data flow. At a logical level, a Table Function is a function that can appear in the FROM clause and thus behaves like a table returning a stream of rows. Table Functions can also take a stream of rows as input. Because Pipelined Table Functions are embedded in the data flow, they allow data to be 'streamed' to a SQL statement, avoiding intermediate materialization in most cases. Additionally, Pipelined Table Functions can be parallelized.

To parallelize a Table Function the programmer specifies a key to repartition the input data. Table Functions can be implemented natively in PL/SQL, Java, and C. You can find more information and examples about Table Functions and the functionality mentioned above at the following URL:

http://download.oracle.com/docs/cd/B10501_01/appdev.920/a96624/08_subs.htm#19677
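As a small, self-contained illustration, here is a sketch of a parallel-enabled Pipelined Table Function implemented in PL/SQL: it appears in the FROM clause, consumes a stream of rows through a ref cursor, and is partitioned by ANY so any parallel slave can process any input row. The type names, the function name, and the ticks source table are purely illustrative and are not part of the word count example developed below.

-- SQL object and collection types describing the rows the function returns
CREATE TYPE tick_row AS OBJECT (symbol VARCHAR2(10), price NUMBER);
/
CREATE TYPE tick_tab AS TABLE OF tick_row;
/
-- a pipelined, parallel-enabled table function: it consumes a stream of rows
-- through a ref cursor and pipes each processed row back to the SQL engine
CREATE OR REPLACE FUNCTION pass_ticks (p_rows IN SYS_REFCURSOR)
  RETURN tick_tab PIPELINED
  PARALLEL_ENABLE (PARTITION p_rows BY ANY)
IS
  l_symbol VARCHAR2(10);
  l_price  NUMBER;
BEGIN
  LOOP
    FETCH p_rows INTO l_symbol, l_price;
    EXIT WHEN p_rows%NOTFOUND;
    -- procedural logic would go here; rows are streamed, not materialized
    PIPE ROW (tick_row(l_symbol, l_price));
  END LOOP;
  RETURN;
END;
/

-- the function appears in the FROM clause like a table
SELECT * FROM TABLE(pass_ticks(CURSOR(SELECT symbol, price FROM ticks)));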

Pipelined Table Functions have been used by customers for several releases and are a core part of Oracle's extensibility infrastructure. Both external users and Oracle Development have used Table Functions as an efficient and easy way of extending the database kernel.

Within Oracle itself, Table Functions are used to implement a number of features in Oracle Spatial and Oracle Warehouse Builder. Oracle Spatial uses them for spatial joins and several spatial data mining operations, and Oracle Warehouse Builder allows end users to leverage Table Functions to parallelize procedural logic in data flows, such as the Match-Merge algorithm and other row-by-row processing algorithms.

Step-by-Step Example

All examples are available in plain text in this file: omr.sql.

To illustrate how parallelism and Pipelined Table Functions can be used to write a Map-Reduce algorithm inside the Oracle database, we describe how to implement the canonical map-reduce example: a word count. For those unfamiliar with the example, the goal of word count is to return all distinct words within a set of documents as well as a count of how often each word occurs within that set of documents.

The procedural code in this word count example is implemented in PL/SQL but, as noted above, Oracle allows you to implement the procedural logic in the language of your choice (PL/SQL, Java, or C).

Step 1 - Setting up the Environment

We will be working with a set of documents. These documents can either be files outside of the database or be stored as SecureFiles/CLOB columns within the database. In this example the documents are stored in a table, which effectively mirrors a file system.

In this case we are going to create a table within the database using the following definition:

CREATE TABLE documents (a CLOB)
  LOB(a) STORE AS SECUREFILE(TABLESPACE sysaux);

Each row in this table corresponds to a single document. We populate this table with a very simple corpus of three documents containing the text shown here:

INSERT INTO documents VALUES ('abc def');
INSERT INTO documents VALUES ('def ghi');
INSERT INTO documents VALUES ('ghi jkl');
commit;

Both the map function and the reduce table function are going to live in a package, keeping the code nice and tidy. To show the steps to be taken, we will take snippets from the overall package and show those in the sections that follow. The actual package also contains a set of types, which are required for the code to work. All code was tested on Oracle Database 11g (11.1.0.6).

Download the full code in omr.sql.

The following figures show the package being deployed.

[Figure: CreatePackageHeader (creating the package specification)]

[Figure: CreatePackageBody (creating the package body)]
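To make the structure more concrete, here is a sketch of what the package specification could look like. The type, function, and package names below are illustrative and may differ from the actual code in omr.sql; they simply show the shape of the mapper and reducer declarations.

CREATE OR REPLACE PACKAGE oracle_map_reduce AS

  -- types describing the mapper output: one word per row
  TYPE word_t       IS RECORD (word VARCHAR2(4000));
  TYPE word_list_t  IS TABLE OF word_t;
  -- a strongly typed ref cursor over the mapper output; HASH partitioning
  -- in the reducer requires a strongly typed cursor
  TYPE word_cur_t   IS REF CURSOR RETURN word_t;

  -- types describing the reducer output: a word and its count
  TYPE wordcnt_t      IS RECORD (word VARCHAR2(4000), word_count NUMBER);
  TYPE wordcnt_list_t IS TABLE OF wordcnt_t;

  -- the mapper tokenizes documents; any parallel slave may process any document
  FUNCTION mapper (doc IN SYS_REFCURSOR, sep IN VARCHAR2)
    RETURN word_list_t PIPELINED
    PARALLEL_ENABLE (PARTITION doc BY ANY);

  -- the reducer counts words; its input is repartitioned and clustered by word
  FUNCTION reducer (in_cur IN word_cur_t)
    RETURN wordcnt_list_t PIPELINED
    PARALLEL_ENABLE (PARTITION in_cur BY HASH (word))
    CLUSTER in_cur BY (word);

END oracle_map_reduce;
/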

Step 2 - Creating the Mapper and the Reducer

First we need to create a generic function to "map" (as in map-reduce) or tokenize a document. Note that the goal is not to show the best map function, but how this will work in principle in the database. This specific map function is very basic and better implementations may be found elsewhere. 
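A minimal sketch of such a mapper, as it might appear in the package body (again, names are illustrative and the tested version is in omr.sql); it assumes each document fits in a VARCHAR2 buffer and splits it on a single-character separator:

  FUNCTION mapper (doc IN SYS_REFCURSOR, sep IN VARCHAR2)
    RETURN word_list_t PIPELINED
    PARALLEL_ENABLE (PARTITION doc BY ANY)
  IS
    l_doc  CLOB;
    l_rest VARCHAR2(32767);
    l_pos  PLS_INTEGER;
    l_out  word_t;
  BEGIN
    LOOP
      FETCH doc INTO l_doc;
      EXIT WHEN doc%NOTFOUND;
      l_rest := l_doc;  -- assumes the document fits in a VARCHAR2
      WHILE l_rest IS NOT NULL LOOP
        l_pos := INSTR(l_rest, sep);
        IF l_pos = 0 THEN
          l_out.word := l_rest;
          l_rest     := NULL;
        ELSE
          l_out.word := SUBSTR(l_rest, 1, l_pos - 1);
          l_rest     := SUBSTR(l_rest, l_pos + 1);
        END IF;
        IF l_out.word IS NOT NULL THEN
          PIPE ROW (l_out);  -- emit one row per word
        END IF;
      END LOOP;
    END LOOP;
    RETURN;
  END mapper;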

You can get the word counts by using only the mapper and letting the SQL aggregation engine do the counting. A query and its result would look like this:

[Figure: FirstSelect (the query and its result)]
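With the illustrative names from the package sketch above and a single space as the separator, such a query might look like this (the counts shown are what the three sample documents would produce):

SELECT t.word, COUNT(*) AS word_count
FROM   TABLE(oracle_map_reduce.mapper(CURSOR(SELECT a FROM documents), ' ')) t
GROUP  BY t.word
ORDER  BY t.word;

-- expected result: abc 1, def 2, ghi 2, jkl 1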

The aggregation is done in SQL, no reducer required.

Of course, you could write your own aggregation Table Function to count the occurrences of words in a document. That is what you would do if you were writing the map-reduce program without leveraging the Oracle aggregation engine as we did before. This aggregation Table Function is the reducer of the map-reduce program unit.

The Table Function specifies that its input be partitioned by word and can (to use the Oracle execution engine's sort) ask for the data to be ordered or clustered by word. We show a sample count program here to complete the example.
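A sketch of such a reducer, as it might appear in the package body (illustrative, matching the package sketch above): because the input is clustered by word, the function only has to count a run of identical words and emit the total when the word changes.

  FUNCTION reducer (in_cur IN word_cur_t)
    RETURN wordcnt_list_t PIPELINED
    PARALLEL_ENABLE (PARTITION in_cur BY HASH (word))
    CLUSTER in_cur BY (word)
  IS
    l_row     word_t;
    l_current VARCHAR2(4000);
    l_count   NUMBER := 0;
    l_out     wordcnt_t;
  BEGIN
    LOOP
      FETCH in_cur INTO l_row;
      EXIT WHEN in_cur%NOTFOUND;
      IF l_row.word = l_current THEN
        l_count := l_count + 1;            -- same word, keep counting
      ELSE
        IF l_current IS NOT NULL THEN
          l_out.word       := l_current;   -- word changed, emit the total
          l_out.word_count := l_count;
          PIPE ROW (l_out);
        END IF;
        l_current := l_row.word;
        l_count   := 1;
      END IF;
    END LOOP;
    IF l_current IS NOT NULL THEN          -- flush the last word
      l_out.word       := l_current;
      l_out.word_count := l_count;
      PIPE ROW (l_out);
    END IF;
    RETURN;
  END reducer;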

Step 3 - In-Database Map-Reduce

After you have completed both the mapper and the reducer, you are ready to run the full map-reduce in the database. Running a query that chains these Table Functions gives us a parallel workload over the documents, doing what a typical map-reduce program does.

[Figure: SecondSelect (the full map-reduce query and its result)]
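With the illustrative names used above, the end-to-end query could be sketched as follows; the mapper feeds the reducer, and a parallel hint (or a parallel degree on the documents table) lets the Parallel Execution framework spread the work across slaves:

SELECT *
FROM   TABLE(oracle_map_reduce.reducer(
         CURSOR(SELECT t.word
                FROM   TABLE(oracle_map_reduce.mapper(
                         CURSOR(SELECT /*+ PARALLEL(d, 4) */ a FROM documents d),
                         ' ')) t)));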

Summary

Oracle Table Functions are a proven technology, used by many internal and external parties to extend Oracle Database 11g.

Oracle Table Functions are a robust, scalable way to implement Map-Reduce within the Oracle database while leveraging the scalability of the Oracle Parallel Execution framework. Using them in combination with SQL provides an efficient and simple mechanism for database developers to build Map-Reduce functionality within an environment they understand, using languages they know.

Download the code here: omr.sql. For the example, I ran this in the OE sample schema (as you can see in the SQL screenshots). No special privileges are required.

Comments:

Many people out there doing work with VLDB and large clusters of data may have heard of Google's research on MapReduce. JP, this is a nice little nifty article to bring some of it down to earth for people to play with. For the research minded, that want to see the real deal - here: http://labs.google.com/papers/mapreduce-osdi04.pdf Another great discussion on this topic is being held by Curt Monash on a webinar on Oct 15: http://www.asterdata.com/masteringmapreduce/ The webinar(s) are sponsored by Aster, but it gets people in the mode of understanding this important concept. -Greg

Posted by Greg Partenach on October 01, 2009 at 07:17 AM PDT #

Hi Greg, Thanks for the feedback... I guess with Openworld going on the other event might get less press... :-) Anyways, the main point of the post was to make sure people understand some of the basics around doing some of this in a relational database. JP

Posted by jean-pierre.dijcks on October 01, 2009 at 07:51 AM PDT #

Oracle likes to point out that Oracle can push down selects, projects, and even simple joins into the Exadata storage layer. Can table functions (i.e. "Map functions" from your article) be similarly pushed down into the Exadata storage layer. If not, is that on the road-map (it seems like a fairly easy and straightforward thing to do)?

Posted by Daniel Abadi on October 06, 2009 at 06:41 AM PDT #

MapReduce mainly focuses on parallel processing, hence the use of the table function. So pushdown is not the focus of this post. In general, where appropriate, we look at pushing down to the storage layer.

Posted by Jean-Pierre on October 07, 2009 at 05:23 AM PDT #

if I understand this correctly, what you are saying is that Oracle's implementation provides the same flexibility and benefits that other in-database MapReduce implementations provide (such as materializing a stream of data through a pipelined function) and the Map and Reduce functions are written within a package for modularity (in your example, map and reduce are in PL/SQL but there is no restriction that they have to be in PL/SQL).

Posted by amrith kumar on October 08, 2009 at 09:39 PM PDT #

Spot on Amrith!

Posted by jean-pierre.dijcks on October 09, 2009 at 12:30 AM PDT #

Daniel, Actually, Oracle doesn't push down selects to storage but portions (filtration, projection, join filtration, etc) of select statement processing. You'll notice Jean-Pierre's omr.sql has no WHERE clause in it. There is basically nothing to offload here. The I/O, on the other hand, will be serviced extremely fast with Exadata and the flow of data will be extremely efficient into the database due to the balanced nature of Exadata architecture. At the risk of speaking for Jean-Pierre, I'm not sure this is really an Exadata post per se. This code will work on Oracle Database on any platform.

Posted by Kevin Closson on October 21, 2009 at 12:28 AM PDT #

Correct Kevin, this was really intended to show the basics of using a parallel construct (table functions) to build a MapReduce proc in Oracle.

Posted by jean-pierre.dijcks on October 21, 2009 at 01:00 AM PDT #

Hi Jean-Pierre, Thanks for the post on the usage of Pipelined Table Functions. I have myself been taken with this feature, and put it to good use whenever the opportunity arises to leverage it. Rather a naive question: any pointers on how to trace or confirm whether the PARALLEL option is actually used by the SQL engine when executing your table function? Regards, Prince

Posted by Prince on October 21, 2009 at 06:59 PM PDT #

Hi Prince, The easiest way is to look at the execution plan (explain plan or v$sql_plan). Another way would be to go after v$pq_sesstat and have a look there. That is of course mostly high-level stuff, so one of the guys here suggested doing something like this within the table function: select count(*) from v$px_process where sid = SYS_CONTEXT('USERENV','SESSIONID'); If it returns 1 it apparently runs within a slave, so it should then be running in parallel. Haven't tried this myself, but the source is reliable :-) JP

Posted by jean-pierre.dijcks on October 22, 2009 at 07:52 AM PDT #

Do you think it is arguable that SQL with parallel query provides MapReduce functionality even without pipelined functions? If the functionality of that ... um ... function could be expressed in pure SQL, which it probably could with a little fiddling, then you'd still have the same parallelism etc.

Posted by David aldridge on November 04, 2009 at 06:46 PM PST #

Hi David, Yes you could. MapReduce in the end is a programming construct that allows you to leverage parallel processing in a certain form. SQL will allow for massive parallel processing as well. It is all a matter of looking beyond hype and finding a solution you are comfortable with. BTW, we used a table function to show how you can do procedural things in parallel so more complex - not as easily expressed in SQL - operations can be solved as well. Thanks, JP

Posted by jean-pierre.dijcks on November 05, 2009 at 12:45 AM PST #

Nice post. But why doesn't MapReduce support indexing?

Posted by swapnil on January 25, 2010 at 02:44 AM PST #
