Sunday Jan 05, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 3

In the first two parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval / rqEval and ore.tableApply / rqTableEval. In this blog post, we’ll cover the next installment in our theme and variation series, involving ore.groupApply and the corresponding definitions required for SQL execution. The “group apply” function is one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on different partitions of data. This functionality is essential for building potentially tens or hundreds of thousands of predictive models, e.g., one per customer, and for taking advantage of high-performance computing hardware like Exadata.

Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically partitioning and passing data to parallel executing R engines. It ensures that all R function executions for all partitions complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result.

The variation on embedded R execution for ore.groupApply involves passing not only an ore.frame to the function, such that the first parameter of your embedded R function receives a data.frame, but also an INDEX argument that specifies the column by which the rows will be partitioned for processing by the user-defined R function.

Let’s look at an example. We’re going to use the C50 package to build a C5.0 decision tree model on the churn data set from C50. The goal is to build one churn model on the data for each state.


library(C50)
data(churn)

ore.create(churnTrain, "CHURN_TRAIN")

modList <- ore.groupApply(
  CHURN_TRAIN,
  INDEX=CHURN_TRAIN$state,
  function(dat) {
    library(C50)
    dat$state <- NULL
    dat$churn <- as.factor(dat$churn)
    dat$area_code <- as.factor(dat$area_code)
    dat$international_plan <- as.factor(dat$international_plan)
    dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
    C5.0(churn ~ ., data = dat, rules = TRUE)
  })
mod.MA <- ore.pull(modList$MA)
summary(mod.MA)

A few points to highlight:
• As noted in Part 2 of this series, to use the CRAN package C50 on the client, we first load the library, and then the churn data set.
• Since the data is a data.frame, we’ll create a table in the database with this data. Notice that if you compare the results of str(churnTrain) with str(CHURN_TRAIN) (see the short snippet after this list), you will see that the factor columns have been retained. This becomes relevant later.
• The function ore.groupApply will return a list of models stored as ore.object instances. The first argument is the ore.frame CHURN_TRAIN and the second argument indicates to partition the data on column state such that the user-defined function is invoked on each partition of the data.
• The next argument specifies the function, which could alternatively have been the function name if the FUN.NAME argument were used and the function saved explicitly in the R script repository. The function’s first argument (whatever its name) will receive one partition of data, e.g., all data associated with a single state.
• Regarding the user-defined function body, we explicitly load the package we’re using, C50, so the function body has access to it. Recall that this user-defined R function will execute at the database server in a separate R engine from the client.
• Since we don’t need to know which state we’re working with and we don’t want this included in the model, we delete the column from the data.frame.
• Although the ore.frame defines these columns as factors, when the data is passed to the user-defined embedded R function, the factors appear as character vectors. As a result, we need to convert them back to factors explicitly.
• The model is built and returned from the function.
• The result from ore.groupApply is a list containing the results from the execution of the user-defined function on each partition of the data. In this case, it will be one C5.0 model per state.
• To view the model, we first use ore.pull to retrieve it from the database and then invoke summary on it. The class of mod.MA is “C5.0”.
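For reference, the comparison and the model inspection mentioned above look like this on the client (output omitted):

str(churnTrain)    # local data.frame: state, churn, area_code, etc. are factors
str(CHURN_TRAIN)   # ore.frame: the corresponding columns are also typed as factors
class(mod.MA)      # "C5.0"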

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Previously, we showed how to do this using the SQL API; we can also do it using the R API. Here, we also modify the function to store the resulting models in an ORE datastore named by state:


ore.scriptCreate("myC5.0Function",
  function(dat,datastorePrefix) {
    library(C50)
    datastoreName <- paste(datastorePrefix,dat[1,"state"],sep="_")
    dat$state <- NULL
    dat$churn <- as.factor(dat$churn)
    dat$area_code <- as.factor(dat$area_code)
    dat$international_plan <- as.factor(dat$international_plan)
    dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
    mod <- C5.0(churn ~ ., data = dat, rules = TRUE)
    ore.save(mod, name=datastoreName)
    TRUE
  })

Just for comparison, we could invoke this from the R API as follows:


res <- ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model", ore.connect=TRUE)
res
res <- ore.pull(res)
all(as.logical(res) == TRUE)

Since we’re using a datastore, we need to connect to the database by setting ore.connect to TRUE. We also pass the datastorePrefix. The result res is an ore.list of logical values. To test whether all are TRUE, we first pull the result and then use the R all function.

Back to the SQL API… Now that we can refer to the function by name in the SQL API, we invoke it to place one model per datastore, each named with the given prefix and state.


select *
from table(churnGroupEval(
  cursor(select * from CHURN_TRAIN),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

There’s one thing missing, however. We don’t have the function churnGroupEval. There is no generic “rqGroupEval” in the API – we need to define our own table function that matches the data provided. Due to this and the parallel nature of the implementation, we need to create a PL/SQL FUNCTION and supporting PACKAGE:


CREATE OR REPLACE PACKAGE churnPkg AS
  TYPE cur IS REF CURSOR RETURN CHURN_TRAIN%ROWTYPE;
END churnPkg;
/
CREATE OR REPLACE FUNCTION churnGroupEval(
  inp_cur churnPkg.cur,
  par_cur SYS_REFCURSOR,
  out_qry VARCHAR2,
  grp_col VARCHAR2,
  exp_txt CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH ("state"))
CLUSTER inp_cur BY ("state")
USING rqGroupEvalImpl;
/

The items specific to this data set – the package and function names, the CHURN_TRAIN%ROWTYPE cursor type, and the "state" grouping column – are what need to be changed to create such a function for any particular data set. There are other variants, but this will get you quite far.

To validate that our datastores were created, we invoke ore.datastore(). This returns the datastores present and we will see 51 such entries – one for each state and the District of Columbia.
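For example, assuming the datastorePrefix 'myC5.0model' used in the R API call above (the SQL invocation used 'myC5.0model2'), something along these lines lists the matching datastores and retrieves one of the saved models:

ore.datastore(pattern = "myC5.0model")   # list datastores whose names match the prefix
ore.load(name = "myC5.0model_MA")        # loads the object 'mod' saved for state MA
summary(mod)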

Parallelism

Above, we mentioned that “group apply” supports data parallelism. By default, parallelism is turned off. To enable parallelism, the parallel argument to ore.groupApply needs to be set to TRUE.


ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model",
          ore.connect=TRUE,
          parallel=TRUE
)

In the case of the SQL API, the parallel hint can be provided with the input cursor. This indicates that a degree of parallelism up to 4 should be enabled.


select *
from table(churnGroupEval(
  cursor(select /*+ parallel(t,4) */ * from CHURN_TRAIN t),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

Map Reduce

The “group apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the partitioning by outputting the INDEX value as key and the data.frame as value. Then, each reducer receives the rows associated with one key. In our example above, INDEX was the column state and so each reducer would receive rows associated with a single state.
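As a rough local-R analogue (a sketch only, ignoring the database, parallelism, and memory management that ore.groupApply provides), the same partition-then-apply pattern looks like this:

library(C50)
data(churn)

# "Map": split the rows by the key (state); "Reduce": build one model per partition
localModels <- lapply(split(churnTrain, churnTrain$state), function(dat) {
  dat$state <- NULL
  C5.0(churn ~ ., data = dat, rules = TRUE)
})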

Memory and performance considerations

While the data is partitioned by the INDEX column, it is still possible that a given partition is quite large, such that either the partition of data will not fit in the R engine memory or the user-defined embedded R function will not be able to execute to completion. The usual remedial measures can be taken regarding setting memory limits – as noted in Part 2.

If the partitions are not balanced, you would have to configure the system’s memory for the largest partition. This will also have implications for performance, obviously, since smaller partitions of data will likely complete faster than larger ones.

The blog post Managing Memory Limits and Configuring Exadata for Embedded R Execution discusses how to instrument your code to understand the memory usage of your R function. This is done in the context of ore.indexApply (to be discussed later in this blog series), but the approach is analogous for “group apply.”

Wednesday Oct 02, 2013

Managing Memory Limits and Configuring Exadata for Embedded R Execution

An R engine can consume significant memory resources in the course of running R scripts. R users who work with Oracle R Enterprise Embedded R Execution on sizable data, especially application designers and database administrators (DBAs), have a vested interest in understanding and controlling the memory demands of R script execution to help ensure that sufficient memory resources are available for both their application and Oracle Database. ORE Embedded R Execution enables running R scripts managed by Oracle Database, both through R and SQL APIs. The SQL API enables seamless integration with database-based applications, data-parallel and task-parallel R script execution, and ease of production deployment.

To provide greater control over R memory consumption, Oracle R Enterprise provides a privileged SQL function for configuring a database server with R memory limits. In this blog post, we discuss R memory usage and garbage collection, and how this SQL function can be used to limit the amount of memory consumed by individual R engines started as part of ORE’s embedded R execution framework. We follow with an example involving memory limit calculations on Exadata and some recommendations for DBAs to consider when configuring Exadata for embedded R execution. Note that such calculations and configuration settings are applicable to non-Exadata (single instance or custom RAC) environments as well. At the end, there is a “tip of the day” for R memory management.

Garbage Collection as a concept

For those familiar with languages like C, memory is explicitly managed by the programmer through invocations of functions to allocate and free memory (malloc, calloc, free). Failing to free memory when finished with it results in “memory leaks” that can cause a process to consume (or exhaust) memory unnecessarily, often resulting in a program or system crash.

To relieve programmers of this burden, languages like R and Java rely on garbage collection. “Garbage” is memory that is no longer being used, i.e., no longer referenceable, within your program. With garbage collection, programmers avoid dealing with memory management explicitly: the underlying system determines what memory is in use or available, and frees memory periodically. Garbage collection, however, is not a panacea. It takes time to run, e.g., on the order of seconds, which can make the response time of certain functions unpredictable – although modern garbage collection mechanisms have largely mitigated this drawback. In addition, when garbage collection occurs is essentially non-deterministic, depending on heuristics set up by the language implementation. This means that memory may be retained longer than necessary.
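As a minimal illustration in R:

x <- numeric(1e6)   # allocate a large vector
rm(x)               # drop the only reference; the memory becomes garbage
gc()                # collection also happens automatically as memory demands grow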

Memory in R

In R, memory can be characterized along two dimensions: memory allocated for vectors and arrays (referred to as Vcells), and memory allocated for objects such as lists (referred to as “cons” cells or Ncells). When invoking R’s garbage collection function, gc(), you’ll see results like these:
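For example, invoking gc() might produce output along these lines (exact counts vary by session; the values below are the ones used in the discussion that follows):

gc()
#           used (Mb) gc trigger (Mb) max used (Mb)
# Ncells  920477 49.2    1770749 94.6   940480 50.2
# Vcells 2956944 22.6    8388608 64.0  3061469 23.4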

The function gc() returns a matrix with rows Ncells, which corresponds to the cons cells, and Vcells, which corresponds to vector heap memory. The Ncells are 56 bytes/cell (49.2*1024*1024/920477) on a 64-bit machine, and Vcells are ~8 bytes/cell (22.6*1024*1024/2956944). The “used” column indicates the number of cells allocated, along with their corresponding megabytes. The column “gc trigger” indicates at what point garbage collection will kick in. The column “max used” indicates the maximum space used since the last call to gc(reset=TRUE), or since R started if gc(reset=TRUE) wasn’t invoked.

As an example of affecting Ncells, consider the following example where we initialize a list as a sequence of 100K numbers. We see that roughly 5.4 MB of RAM were consumed for the 100K cells.
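As a sketch of how this can be observed (not the original code; the exact megabytes reported vary by platform and R version):

gc(reset=TRUE)                  # reset the "max used" statistics
x <- as.list(seq_len(100000))   # a list of 100K elements
gc()                            # Ncells usage grows by roughly 5 MB (~56 bytes per cell)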

For Vcells, we create a vector of 1M elements. Because R stores whole numbers as 4-byte integers, this consumes roughly 3.8 MB of RAM for the 1M cells.

The same test with doubles (floating-point values, 8 bytes each) consumes roughly 7.6 MB of RAM for the 1M cells.
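A sketch for the Vcells case (again not the original code; rep() is used because recent R versions may store a plain sequence such as 1:1000000 compactly):

gc(reset=TRUE)
v.int <- rep(1L, 1000000)    # integer vector: 4 bytes/element, ~3.8 MB of Vcells
gc()

gc(reset=TRUE)
v.num <- rep(1.0, 1000000)   # double vector: 8 bytes/element, ~7.6 MB of Vcells
gc()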

How does R’s garbage collector use VSize and NSize?

We’ll discuss VSize, as NSize is analogous. The garbage collector recovers memory that is no longer in use, determining when to perform garbage collection and how much memory to recover. For the Vcells heap, there are four key quantities: Min_VSize, VSizeInUse, R_VSize, and Max_VSize. R_VSize serves as the gc() trigger. Min_VSize and Max_VSize are the specified lower and upper memory limits; Min_VSize is the minimal size for the vector heap as well as its initial value. From there, R grows or shrinks the vector heap depending on memory demands, but it does not exceed the Max_VSize limit nor go below the Min_VSize limit. VSizeInUse reflects the memory currently used by R objects, and R_VSize is how much memory can be requested without triggering gc(). As you would expect: Min_VSize <= R_VSize <= Max_VSize and VSizeInUse < R_VSize.


Limiting memory on the database server R engine

Oracle R Enterprise provides the SQL function sys.rqconfigset to set memory limits. Use of this function requires the sys privilege and the setting is applied only to embedded R engines. Consider the following examples:

sys.rqconfigset('MIN_VSIZE', '10M') -- min heap 10MB, default 32MB
sys.rqconfigset('MAX_VSIZE', '100M') -- max heap 100MB, default 4GB
sys.rqconfigset('MIN_NSIZE', '500K') -- min number cons cells 500x1024, default 1M
sys.rqconfigset('MAX_NSIZE', '2M') -- max number cons cells 2M, default 20M

Note that either numeric or string values can be provided to sys.rqconfigset. Default constants are defined as follows:


#define RQET_DEF_MINVSZ 33554432 /* RQER DEFault MIN_VSiZe 32Mb */
#define RQET_DEF_MAXVSZ 4294967296 /* RQER DEFault MAX_VSiZe 4Gb */
#define RQET_DEF_MINNSZ 1048576 /* RQER DEFault MIN_NSiZe 1M */
#define RQET_DEF_MAXNSZ 20971520 /* RQER DEFault MAX_NSiZe 20M */

Getting memory settings and usage through an embedded R engine

To obtain the current set of values, you can query the table sys.rq_config with the following SQL statement when connected to the database server.

select name, value from sys.rq_config;

This can be done from within an embedded R function invocation using, for example:

getMemorySettings <- function() {
  library(ROracle)
  con <- dbConnect(Extproc())
  rs <- dbSendQuery(con, "select name, value from sys.rq_config")
  dat <- fetch(rs)
  dat
}
ore.doEval(getMemorySettings, ore.connect=TRUE)

To obtain the current memory usage within an individual embedded R engine, instrumenting your embedded R function with gc() and returning the results of gc() will provide this insight:

getMemoryUse <- function() {
  gc.dat <- gc()
  list(pid = Sys.getpid(), gc.dat = gc.dat)
}
ore.doEval(getMemoryUse)

Note that when memory limits are in place for an R engine, the result from an embedded R call will also include those limits, reported by gc() in an additional “limit (Mb)” column. Here, we’ve also included the process id of the R engine in the result.

An example of computing memory limits

Consider you have an Exadata X2-2 that has 1152 GB RAM (~1.2 TB) and your DBA allocates you a maximum of 60 GB RAM for parallel R engines per Exadata node. If we set the degree of parallelism at 32, to enable 32 R engines to execute concurrently, this allows 1.875 GB RAM per R engine. If we allocate 2/3 of this for Vcells, we would allocate ~1.25 GB for MAX_VSIZE. The remaining 1/3, or 625 MB, translates into roughly 11.2M cells for MAX_NSIZE at 56 bytes per cell.

60 GB allocated to R engines per Exadata node

DOP=32

60GB / 32 R engines = 1.875 GB / R Engine

~2/3 for Vcells = 1.25 GB; 1.25 GB / 8 Bytes/Cell = 156.25M Cells

~1/3 for Ncells = 625 MB; 625 MB / 56 Bytes/Cell ≈ 11.2M Cells

sys.rqconfigset('MAX_VSIZE', '1250M')
sys.rqconfigset('MAX_NSIZE', '11200K')

While this example focuses on parallel execution, such as for ore.groupApply, ore.rowApply, and ore.indexApply (or SQL rqRowEval and “rqGroupEval”), the same type of analysis applies to non-parallel embedded R functions, like ore.doEval and ore.tableApply (or SQL rqEval and rqTableEval).

Consider an example that builds a randomForest model using the ore.doEval function. We can compute the amount of RAM consumed by the function by invoking gc() at the beginning and end of the function and subtracting the “max used” (Mb) values, as sketched below.
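The following is a sketch of such instrumentation, not the original example; the randomForest call on the iris data set is a stand-in for illustration:

measureMemory <- function() {
  library(randomForest)
  gc.start <- gc(reset = TRUE)                    # snapshot memory statistics
  mod <- randomForest(Species ~ ., data = iris)   # build the model
  gc.end <- gc()
  mb.cols <- which(colnames(gc.end) == "(Mb)")    # the last "(Mb)" column pairs with "max used"
  list(model = mod,
       mb.used = gc.end[, max(mb.cols)] - gc.start[, max(mb.cols)])
}
res <- ore.pull(ore.doEval(measureMemory))
res$mb.used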

In the original randomForest example, 1.4 MB were consumed for Ncells and 11.1 MB for Vcells. This can similarly be done for ore.indexApply to see the amount of RAM consumed by each embedded R function execution and to sum up the actual usage for each of the embedded R engines (assuming they run fully concurrently).

Generating such numbers on real data gives users a sense of how much memory embedded R jobs may require.

For DBAs

When configuring a database on Exadata for parallel R engines, consider the following options; a consolidated example of these settings appears after the list. We contrast the case where any single embedded R function executes quickly, e.g., in tens of seconds, and there are many such executions, with the case where fewer, longer-running functions execute on fewer parallel R engines. Note that these settings must be considered in the context of other Exadata uses:

· Set parallel_degree_policy to MANUAL. This allows ORE to choose when to apply parallelism, as opposed to setting it to AUTO which allows Oracle Database to decide.

· Set parallel_min_servers to the number of parallel slave processes to be started when the database instances start, e.g., 64, which is the number of parallel slave processes per Exadata node. This avoids incurring the time required to start these processes as needed to service R engines, and is particularly important when individual embedded R function execution time is short, e.g., 10s of seconds. If embedded R function execution time is long, the percentage of time for starting up the parallel slave will not dominate the overall execution time.

· Set parallel_max_servers to the maximum number of parallel slave processes that should be allowed per Exadata node, e.g., 128. This ensures that no more than parallel_max_servers will be active at one time, and in turn corresponds to the maximum number of R engines that can be active at one time.

· To avoid overloading the CPUs if the parallel_max_servers limit is reached, set the hidden parameter _parallel_statement_queuing to TRUE. Statement queuing is normally disabled when parallel_degree_policy is set to MANUAL, which is why this parameter must be set explicitly. It allows parallel requests to be queued when they would exceed parallel_servers_target, which should be set to a value between parallel_min_servers and parallel_max_servers, e.g., 96. Until parallel_servers_target is reached, an embedded R execution is allowed to execute in parallel using the available parallel servers; once no more are available under that target, further parallel requests are queued. This ensures that parallel requests run in parallel, as opposed to being forced to serial execution, and can take advantage of parallel slaves as they become available, which can dramatically improve overall embedded R execution completion time. Note that parallel_max_servers cannot be changed during database operation, but parallel_servers_target can be, to tune Exadata performance. Queuing itself consumes effectively no CPU resources.

· To minimize RAC or cluster overhead for fast-executing individual embedded R functions, set parallel_force_local to TRUE to keep all parallel servers allocated and running on the same database server node. With this setting, if an embedded R execution is started with DOP 32, all 32 R engines will run on the same Exadata node; a new embedded R execution, also with DOP 32, may be started on a different node. If the embedded R functions are long-running, the setup time is proportionately small, so spreading the I/O over multiple nodes will not adversely impact overall performance. Having parallel slaves span multiple Exadata nodes requires communication and handshaking across nodes, which consumes more resources; if the embedded R functions are fast, this overhead can adversely impact overall performance. When all parallel slaves are local, fewer resources are used.

· Where applicable, set application tables and their indexes to DOP 1 to reinforce the ability of ORE to determine when to use parallelism and not be overridden by table or index settings of DEFAULT or a specific degree of parallelism.
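As a consolidated sketch, these settings might be applied as follows. The values are examples only (taken from the list above), the hidden parameter requires quoting, and any change should be validated with your DBA for your environment:

-- example values only; adjust to your environment
alter system set parallel_degree_policy = MANUAL;
alter system set parallel_min_servers = 64;
alter system set parallel_max_servers = 128 scope=spfile;   -- takes effect at the next instance startup
alter system set parallel_servers_target = 96;
alter system set "_parallel_statement_queuing" = TRUE;
alter system set parallel_force_local = TRUE;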

R memory management tip

There are many optimizations that make more efficient use of memory. To close, here is a tip that can reduce memory consumption significantly and avoid unnecessary replication of data.

If you know the size of your result in advance, pre-allocate the memory required, whether a vector, list, or matrix, as opposed to building up the result incrementally such as using cbind for adding columns to a matrix or data.frame. For example:

num.rows <- 1000
num.cols <- 2000
myFunction <- function(col) { col:(num.rows+col-1) }   # produces a vector of values
myMatrix <- matrix(NA, num.rows, num.cols)              # pre-allocate the required memory

for(col in 1:num.cols) {
  myMatrix[,col] <- myFunction(col)
}

A note of thanks to Qin Wang and Martin Farber for their input on this blog post.

Friday Feb 03, 2012

What is R?

For many in the Oracle community, the addition of R through Oracle R Enterprise could leave them wondering "What is R?"

R has been receiving a lot of attention recently, although it’s been around for over 15 years. R is an open-source language and environment for statistical computing and data visualization, supporting data manipulation and transformations, as well as sophisticated graphical displays. It's being taught in colleges and universities in courses on statistics and advanced analytics - even replacing more traditional statistical software tools. Corporate data analysts and statisticians often know R and use it in their daily work, either writing their own R functionality, or leveraging the more than 3400 open source packages. The Comprehensive R Archive Network (CRAN) open source packages support a wide range of statistical and data analysis capabilities. They also focus on analytics specific to individual fields, such as bioinformatics, finance, econometrics, medical image analysis, and others (see CRAN Task Views).

So why do statisticians and data analysts use R?

Well, R is a statistics language similar to SAS or SPSS. It’s a powerful, extensible environment, and as noted above, it has a wide range of statistics and data visualization capabilities. It’s easy to install and use, and it’s free – downloadable from the CRAN R project website.

In contrast, statisticians and data analysts typically don’t know SQL and are not familiar with database tasks. R provides statisticians and data analysts access to a wide range of analytical capabilities in a natural statistical language, allowing them to remain highly productive. For example, writing R functions is simple and can be done quickly. Functions can be made to return R objects that can be easily passed to and manipulated by other R functions. By comparison, traditional statistical tools can make the implementation of functions cumbersome, such that programmers resort to macro-oriented programming constructs instead.

So why do we need anything else?

R was conceived as a single-user tool and is not multi-threaded. The client and server components are bundled together as a single executable, much like Excel.

R is limited by the memory and processing power of the machine where it runs, but in addition, being single threaded, it cannot automatically leverage the CPU capacity on a user’s multi-processor laptop without special packages and programming.

However, there is another issue that limits R’s scalability…

R’s approach to passing data between function invocations results in data duplication – this chews up memory faster. So inherently, R is not good for big data, or depending on the machine and tasks, even gigabyte-sized data sets.

This is where Oracle R Enterprise comes in. As we'll continue to discuss in this blog, Oracle R Enterprise lifts this memory and computational constraint found in R today by executing requested R calculations on data in the database, using the database itself as the computational engine. Oracle R Enterprise allows users to further leverage Oracle's engineered systems, like Exadata, Big Data Appliance, and Exalytics, for enterprise-wide analytics, as well as reporting tools like Oracle Business Intelligence Enterprise Edition dashboards and BI Publisher documents.




