Best practices, news, tips and tricks - learn about Oracle's R Technologies for Oracle Database and Big Data

  • August 2, 2012

Data Parallelism Using Oracle R Enterprise

Modern computer
processors are adequately optimized for many statistical calculations,
but large data operations may require hours or days to return a result. 
Oracle R Enterprise (ORE), a set of R packages designed to process large data computations in Oracle Database, can run many R operations in parallel, significantly
reducing processing time. ORE supports parallelism through the
transparency layer, where the database is used as a computational
engine, and embedded R execution, where R scripts can be executed in a
data parallel manner.

The backbone of parallel computing is breaking down a resource-intensive
computation into chunks that can be performed independently, while
maintaining a framework that allows for the results of those independent
computations to be combined. 
Writing parallel code is typically trickier than writing serial code,
but this is simplified using ORE, as there is no need for the user to
create worker instances or combine results. Using the transparency
layer, users simply execute their ORE code and the database implicitly
manages the entire process, returning results for further processing.
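
To make the split, compute, and combine pattern concrete, here is a minimal sketch in base R using the parallel package. It is not ORE code: the mtcars data set, the per-chunk function, and the choice of two worker processes are all assumptions made for illustration. With ORE, the database performs the equivalent steps for you.

```r
library(parallel)  # ships with base R

# Split: break the data into independent chunks (here, by number of cylinders).
chunks <- split(mtcars, mtcars$cyl)

# Apply: each chunk is processed independently, so the work can run in parallel.
# mc.cores > 1 forks workers on Unix-alikes; Windows only supports mc.cores = 1.
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L
partial <- mclapply(chunks,
                    function(d) c(n = nrow(d), sum_mpg = sum(d$mpg)),
                    mc.cores = n_cores)

# Combine: add up the partial counts and sums, then derive the overall mean.
totals <- Reduce(`+`, partial)
overall_mean <- totals[["sum_mpg"]] / totals[["n"]]
```

The combined result matches mean(mtcars$mpg) computed serially, which is the property any parallel decomposition has to preserve.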

With ORE,
each R function invocation that operates on an ORE object, such as
ore.frame, is translated to a SQL statement behind the scenes. This SQL,
which may be stacked after several function invocations, undergoes
optimization and parallelization when parsed and executed. This
technique enables deferred evaluation, but that's a topic for
another blog. Depending on the resource requirements of the statement,
the database decides if it should leverage parallel execution. 

For embedded R
execution, database degree of parallelism settings help determine the
number of parallel R engines to start.  When data parallel functions
execute in parallel, each unit of work is sent to a different R external
process, or extproc, at the database server.
The results are automatically collated and returned as R-proxy objects,
e.g., ore.frame objects, in the R interface and SQL objects in the SQL
interface, which can be processed further in R or by SQL functions.

The SQL functions enable the operationalization or productization of R
scripts as part of a database-based application, in what we refer to as
"lights out" mode.

In the ORE Transparency Layer,
where the database executes SQL generated from overloaded R functions,
parallelism is automatic, assuming the database or table is configured
for parallelism. Parallel computations in the transparency layer are
ideal for bigger data where functionality exists in the database.

Using Embedded R Script Execution, parallelism is enabled for row, group and index operations if specified using a function parameter or parallel cursor hint:

  • ore.groupApply and rqGroupEval* split the data into grouped partitions and invoke the R function on each partition in a separate engine at the database server.
  • ore.rowApply and rqRowEval split the data into row chunks and invoke the R function on each chunk in a separate engine at the database server.
  • ore.indexApply runs an R function x times, with each iteration of the function invoked in a separate engine at the database server.
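
The three partitioning styles in the list above can be mimicked in base R. This is only a local analogue of what the ORE functions do at the database server: the toy data frame, the chunk size of 4 rows, and the per-piece functions are assumptions chosen for illustration, and everything here runs serially in a single R session.

```r
dat <- data.frame(g = rep(c("a", "b", "c"), times = c(4, 3, 3)), x = 1:10)

# Group partitioning (the idea behind ore.groupApply): one piece per value of g.
by_group <- lapply(split(dat, dat$g), function(d) sum(d$x))

# Row chunking (the idea behind ore.rowApply): pieces of at most 4 rows each.
chunk_id <- ceiling(seq_len(nrow(dat)) / 4)
by_rows <- lapply(split(dat, chunk_id), function(d) nrow(d))

# Index iteration (the idea behind ore.indexApply): call the function 3 times,
# passing only the iteration index rather than any data.
by_index <- lapply(seq_len(3), function(i) i^2)
```

In each case the pieces are independent of one another, which is what allows the database to hand them to separate R engines.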

With embedded R
execution, the expectation is that the database server machine has
greater RAM and CPU capacity than the user's client machine.  So
executing R scripts at the server will inherently allow larger data sets
to be processed by an individual R engine.

In addition, users can include contributed R packages in their embedded R scripts. Consider an example using a sample of the airline on-time performance data
from the Research and Innovative Technology Administration (RITA), which
coordinates the U.S. Department of Transportation (DOT) research
programs. The data sample consists of 220K records of U.S. domestic
commercial flights between 1987 and 2008. 

We use the R
interface to embedded R to partition the airline data table (ONTIME_S)
by the DAYOFWEEK variable, fit a linear model using the
biglm package, and then combine the results. Note: To run this example, the biglm package must be installed on both the database server and client machine.

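res <- ore.groupApply(ONTIME_S,
               INDEX = ONTIME_S$DAYOFWEEK,
               parallel = TRUE,
               function(dat) {
                 # Packages must be loaded inside the function,
                 # because it runs in an R engine at the server
                 library(biglm)
                 library(ORE)
                 ore.connect("rquser", "orcl", "localhost", "rquser")
                 biglm(ARRDELAY ~ DEPDELAY + DISTANCE, dat)
               })
R> summary(res$Monday)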
Large data regression model: biglm(ARRDELAY ~ DEPDELAY + DISTANCE, dat)
Sample size =  31649
               Coef    (95%     CI)     SE     p
(Intercept)  0.5177  0.2295  0.8058 0.1441 3e-04
DEPDELAY     0.9242  0.9178  0.9305 0.0032 0e+00
DISTANCE    -0.0014 -0.0017 -0.0011 0.0002 0e+00

The call to
ore.groupApply uses Oracle Database to partition the ONTIME_S table by
the categories in the DAYOFWEEK variable.  Each category is sent to an R
engine at the database server machine to apply the R function in
parallel.  The individual category results are combined in the returned
result.  Using embedded R alleviates the typical memory problems
associated with running R serially because we are fitting only a single
partition, or day of the week, in memory of an R engine. Using a Linux
server with 8 GB RAM and 4 CPUs, fitting the model in parallel by
setting parallel = TRUE in the call to ore.groupApply, reduces the
processing time from approximately 30 seconds to 10 seconds.
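
As a rough local illustration of how per-partition fits come back as one named result, the following base R sketch fits one model per group and collects the coefficients. The stand-ins are assumptions, not part of the ORE example: mtcars replaces ONTIME_S, lm() replaces biglm(), and cyl replaces DAYOFWEEK.

```r
# Fit one model per group; each fit only ever sees its own partition of the data.
fits <- lapply(split(mtcars, mtcars$cyl),
               function(dat) lm(mpg ~ wt, data = dat))

# The result is a named list, so a single group's model can be inspected
# by name, much like res$Monday in the ORE example.
fit_4cyl <- fits[["4"]]

# Combine: one row of coefficients per group.
coef_table <- do.call(rbind, lapply(fits, coef))
```

Because each fit needs only its own partition in memory, the peak memory use is bounded by the largest group rather than by the whole table.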

If the goal is to
integrate the model results as an operationalized process, we can use
rqGroupEval, the SQL interface equivalent to ore.groupApply.  We create a
script to set up the structure of the input and grouping column and
then run the script in SQL. The nature of pipelined table functions
requires that we explicitly represent the type of the result, captured
in the package, and create a function that includes the column used for
partitioning explicitly.

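-- setup
SQL> CREATE PACKAGE airlinePkg AS
  2  TYPE cur IS REF CURSOR RETURN ONTIME_S%ROWTYPE;
  3  END airlinePkg;
  4  /
Package created.

SQL> CREATE FUNCTION ontimeGroupEval(
  inp_cur  airlinePkg.cur,
  par_cur  SYS_REFCURSOR,
  out_qry  VARCHAR2,
  grp_col  VARCHAR2,
  exp_txt  CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH (DAYOFWEEK))
CLUSTER inp_cur BY (DAYOFWEEK)
USING rqGroupEvalImpl;
/

-- model build
alter table ONTIME_S parallel;

SQL> begin
 sys.rqScriptCreate('GroupingExample',
 'function(dat) {
      library(biglm)
      library(ORE)
      ore.connect("rquser", "orcl", "localhost", "rquser")
      result <- biglm(ARRDELAY ~ DISTANCE + DEPDELAY, dat)
      result}');
end;
/

create table ONTIME_LM as
select *
  from table(ontimeGroupEval(
         cursor(select /*+ parallel(ONTIME_S)*/ *
         from ONTIME_S),
         NULL, NULL, 'DAYOFWEEK', 'GroupingExample'));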

We use a
parallel hint on the cursor that is the input to our rqGroupEval
function to enable Oracle Database to use parallel R engines.  In this
case, using the same Linux server, the processing time is reduced from
approximately 25 seconds to 7 seconds as we used 7 parallel R engines
(one for each day of the week) across a single server.  Of course, a
real-world scenario may utilize hundreds of parallel engines across many
servers, returning results on large amounts of data in a short period of time.

Additional details on how parallel execution works in Oracle Database can be found here.
We encourage you to download Oracle software for evaluation from the
Oracle Technology Network. See these links for R-related software: Oracle R Distribution, Oracle R Enterprise, ROracle, Oracle R Connector for Hadoop.  As always, we welcome comments and questions on the Oracle R Forum.

*To enable execution of an R script in the SQL interface, ORE provides
variants of ore.doEval, ore.tableApply, ore.rowApply and ore.groupApply in SQL. These
functions are rqEval, rqTableEval, rqRowEval and rqGroupEval.
The ore.groupApply feature does not have a direct parallel in the SQL
interface. We refer to rqGroupEval as a concept; however, there is
specific code required to enable this feature. This is highlighted in
the second example.
