Monday Jan 20, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 5


In the first four parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, ore.groupApply / “rqGroupApply”, and ore.rowApply / rqRowEval. In this blog post, we cover ore.indexApply. Note that there is no corresponding rqIndexEval – more on that later. The “index apply” function is also one of the parallel-enabled embedded R execution functions. It supports task-parallel execution, where one or more R engines perform the same or different calculations, or tasks. A number corresponding to the index of the execution is provided to the function as its first argument. Any required data is expected to be explicitly generated or loaded within the function.

This functionality is valuable in a variety of settings, e.g., simulations, and for taking advantage of high-performance computing hardware like Exadata.

As with “group apply” and “row apply”, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, with only the index passed to the function as the first argument. Oracle Database ensures that each R function execution completes; otherwise, the ORE function returns an error. The output formats supported by the other embedded R functions are also available with ore.indexApply, for example, returning an ore.list or combining data.frame results into an ore.frame.

The variation on embedded R execution for ore.indexApply involves passing as an argument the number of times the user-defined R function should be executed.

Let’s look at a simple example.

The following code executes the function five times in parallel.


res <- ore.indexApply(5,
      function(index) {
        paste("IndexApply:",index)
      },
      parallel=TRUE)
class(res)
res

Notice that the class of the result is an ore.list, and when we print res, we have 5 character vectors, each with the index that was passed to the user-defined function. As with other parallel embedded R functions, the number of concurrently executing R engines can be limited by specifying the degree of parallelism of the database. As we’ll see in ORE 1.4, the parallel argument can specify a preferred number of parallel R engines, as an upper bound.


> class(res)
[1] "ore.list"
attr(,"package")
[1] "OREbase"
> res
$`1`
[1] "IndexApply: 1"

$`2`
[1] "IndexApply: 2"

$`3`
[1] "IndexApply: 3"

$`4`
[1] "IndexApply: 4"

$`5`
[1] "IndexApply: 5"

Column-parallel use case

If we wanted to parallelize R’s summary function, we could compute the summary statistics on each column in parallel and combine them into a final result. The following example does exactly that. While we could generalize this example, we focus on the iris data set, computing summary statistics on its first four numeric columns. Since iris comes standard with R, there’s no need to load data from any other source; we simply access it. The first argument to ore.indexApply is 4, the number of columns we wish to summarize in parallel. The function takes one argument, index, which will be a value between 1 and 4 and is used to select the column to summarize. We massage the result of summary into a data.frame and add the column name to the result. Note that the function returns a single row: the summary statistics for the column.


res <- NULL
res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      parallel=TRUE)
res

The result comes back as an ore.list object:


> res
$`1`
   Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1   4.3     5.1    5.8 5.843     6.4  7.9 Sepal.Length

$`2`
   Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1     2     2.8      3 3.057     3.3  4.4 Sepal.Width

$`3`
   Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1     1     1.6   4.35 3.758     5.1  6.9 Petal.Length

$`4`
   Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1   0.1     0.3    1.3 1.199     1.8  2.5 Petal.Width

This is good, but it would be better if the result were returned as an ore.frame, especially since all the results have the same columns. To enable this, we do a slight variation on the example by specifying FUN.VALUE with the structure of the result defined.


res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        col=character(0)),
      parallel=TRUE)
res

Now, the result comes back as an ore.frame.


> res
  Min. X1st.Qu. Median  Mean X3rd.Qu. Max.          col
1  0.1      0.3   1.30 1.199      1.8  2.5  Petal.Width
2  1.0      1.6   4.35 3.758      5.1  6.9 Petal.Length
3  4.3      5.1   5.80 5.843      6.4  7.9 Sepal.Length
4  2.0      2.8   3.00 3.057      3.3  4.4  Sepal.Width

Simulation use case

The ore.indexApply function can be used in simulations as well. In this next example, we take multiple samples from a random normal distribution with the goal of comparing the distribution of the summary statistics. For this, we build upon the example above. We provide parameters such as the sample size, the mean and standard deviation of the random numbers, and the number of simulations we want to perform. Each of these simulations will occur in a separate R engine, in parallel, up to the degree of parallelism allowed by the database.

We specify num.simulations as the first parameter to ore.indexApply. The user-defined function receives the index as its first argument, followed by three named arguments. The function sets the random seed based on the index so that each invocation generates a different set of random numbers. Using rnorm, the function produces sample.size random normal values. We invoke summary on the vector of random numbers and then prepare a data.frame result to be returned. We use the FUN.VALUE argument to get an ore.frame as the final result.


res <- NULL
sample.size = 1000
mean.val = 100
std.dev.val = 10
num.simulations = 1000

res <- ore.indexApply(num.simulations,
      function(index, sample.size=1000, mean=0, std.dev=1) {
        set.seed(index)
        x <- rnorm(sample.size, mean, std.dev)
        ss <- summary(x)
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$index <- index
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        index=numeric(0)),
      parallel=TRUE,
      sample.size=sample.size,
      mean=mean.val, std.dev=std.dev.val)
res
boxplot(ore.pull(res[,1:6]),
  main=sprintf("Boxplot of %d rnorm samples size %d, mean=%d, sd=%d",
        num.simulations, sample.size, mean.val, std.dev.val))

To get the distribution of samples, we invoke boxplot on the data.frame after pulling the result to the client.

Here are a couple of plots showing results for different parameters:


In both cases, we run 10,000 samples. The first graph uses a sample size of 10 and the second uses a sample size of 1000. From these results, it is clear that a larger sample size significantly reduces the variance in each of the summary statistics - confirming our Statistics 101 understanding.
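
One way to quantify this observation is to compute the spread of each summary statistic across the simulations. A minimal sketch, reusing the res object from the simulation above:

stats.local <- ore.pull(res[,1:6])   # pull the simulated statistics
sapply(stats.local, sd)              # standard deviation of each statistic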

Error reporting

As introduced above, Oracle Database ensures that each embedded R user-defined function execution completes; otherwise, the ORE function returns an error. In that event, any side effects of the user-defined function need to be cleaned up manually: operations that produce files, create tables in the database, or complete database transactions through ROracle will remain intact. The ORE embedded R infrastructure reports errors produced by the function, as illustrated in the following example.

The code requests 4 parallel R engines. If the index has the value 3, the function attempts to load the nonexistent package "abc123" (which produces an error); otherwise, it returns the index value.


R> ore.indexApply(4,
+ function(index) {
+ if (index==3) {library(abc123)}
+ else {return(index)}
+ }
+ )
Error in .oci.GetQuery(conn, statement, data = data, prefetch = prefetch, :
ORA-12801: error signaled in parallel query server P000
ORA-20000: RQuery error
Error in library(abc123) : there is no package called 'abc123'
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 121
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 118

Notice that the first reported error is ORA-12801: error signaled in parallel query server. The ORA-20000: RQuery error then indicates the error as returned by the R engine. Also interesting to note is that the ORA-06512 errors reveal the underlying implementation of ore.indexApply, RQSYS.RQGROUPEVALIMPL, which leads us to the next topic.

No rqIndexEval?

“Index apply” is really a variation of “group apply” where the INDEX column is a numeric vector that is pushed to the database. With n distinct numbers, one number is provided to each function as its index. As a result, there is no corresponding rqIndexEval in the SQL API. The user would have to create a similar package and function as was illustrated in the blog post on “group apply.”
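
As a starting point, here is a hedged sketch of that approach: create a one-column table of index values and define a "group eval" table function over it, patterned on the churnGroupEval example from that post. The names INDEX_TABLE, indexPkg, and indexGroupEval are hypothetical.

-- Hypothetical one-column table holding the index values 1..n
CREATE TABLE INDEX_TABLE AS
  SELECT ROWNUM AS "idx" FROM dual CONNECT BY ROWNUM <= 4;

CREATE OR REPLACE PACKAGE indexPkg AS
  TYPE cur IS REF CURSOR RETURN INDEX_TABLE%ROWTYPE;
END indexPkg;
/
CREATE OR REPLACE FUNCTION indexGroupEval(
  inp_cur indexPkg.cur,
  par_cur SYS_REFCURSOR,
  out_qry VARCHAR2,
  grp_col VARCHAR2,
  exp_txt CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH ("idx"))
CLUSTER inp_cur BY ("idx")
USING rqGroupEvalImpl;
/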

Thursday Jan 09, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 4

In the first three parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, and ore.groupApply / “rqGroupApply”. In this blog post, we’ll cover the next in our theme and variation series involving ore.rowApply and rqRowEval. The “row apply” function is also one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on disjoint chunks of data. This functionality is essential to enable scalable model scoring/predictions on large data sets and for taking advantage of high-performance computing hardware like Exadata.

As with ore.groupApply, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically chunking and passing data to the parallel executing R engines. Oracle Database ensures that R function executions for all chunks of rows complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result. However, we’ll also show how data.frame results from each execution can be combined into a single ore.frame. This feature works for return values of other embedded R functions as well.

The variation on embedded R execution for ore.rowApply involves passing not only an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame, but also the number of rows that should be passed to each invocation of the user-defined R function. The last chunk, of course, may have fewer rows than specified.

Let’s look at an example. We’re going to use the C50 package to score churn data (i.e., predict which customers are likely to churn) using the C5.0 decision tree models we built in the previous blog post with ore.groupApply. (Well, almost. We need to rebuild the models to take into account the full data set levels.) The goal is to score the customers in parallel leveraging the power of a high performance computing platform, such as Exadata.


library(C50)
data(churn)

ore.create(churnTest, "CHURN_TEST")

myFunction <- function(dat, xlevels, datastorePrefix) {
  library(C50)
  state <- dat[1,"state"]
  datastoreName <- paste(datastorePrefix,state,sep="_")
  dat$state <- NULL
  for (j in names(xlevels))
    dat[[j]] <- factor(dat[[j]], levels = xlevels[[j]])
  ore.load(name=datastoreName)
  res <- data.frame(pred=predict(mod,dat, type="class"),
        actual=dat$churn,
        state=state)
  res
}

xlevels <- ore.getXlevels(~ ., CHURN_TEST[,-1])
scoreList <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE)
score.MA <- ore.pull(scoreList$MA)
table(score.MA$actual, score.MA$pred)

A few points to highlight:

• Instead of computing the levels using the as.factor function inside the user-defined function, we’ll use ore.getXlevels, which returns the levels for each factor column. We don’t need this for the state column, so we exclude it (“-1”). In the previous post we noted that factor data is passed as character columns in the data.frame. Computing the levels first can ensure that all possible levels are provided during model building, even if there are no rows with some of the level values.
• When building models where some levels were missing (due to using as.factor on each partition of data), scoring can fail if the test data has unknown level values. For this reason, the models built in Part 3 need to be rebuilt using the approach above with ore.getXlevels. This is left as an exercise for the reader.
• Assign the function to the variable “myFunction” to facilitate reuse (see below).
• We construct the datastore name to be the same as when we were building the models, i.e., appending the state value to the datastore prefix separated by an ‘_’.
• The for loop iterates over the levels passed in as xlevels, creating a factor using the provided levels and assigning it back to the data.frame.
• Loading the datastore by name, we have access to the variable mod, which contains the model for the particular state.
• The result is constructed as a data.frame with the prediction and the actual values.
• Three arguments are passed to the function: the datastore prefix, the pre-computed levels, and ore.connect=TRUE, since using a datastore requires a database connection.
• The results are stored as a list of ore.frames. We can pull the scores for MA and compute a confusion matrix using table.

This is fine. However, we likely don’t want to have a list of separate ore.frames as the result. We’d prefer to have a single ore.frame with all the results. This can be accomplished using the FUN.VALUE argument. Whenever a data.frame is the result of the user-defined R function, and if the structure of that data.frame is the same across all invocations of the group apply or row apply, you can combine them into a single result by defining the structure as follows:

scores <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)));
head(scores)
scores.local <- ore.pull(scores)
table(scores.local[scores.local$state=="MA",c("actual","pred")])

scores.MA <- scores[scores$state=="MA",c("actual","pred")]
table(scores.MA$actual, scores.MA$pred)

A few important points to highlight:

• FUN.VALUE is set to a data.frame that describes the format of the result. By providing this argument, you will get back a single ore.frame, not an ore.list object.
• The group apply completes instantaneously because it is only defining the ore.frame, not actually performing the scoring. Not until the values are needed does the result get computed. We invoke head on the ore.frame in scores to highlight this.
• We can pull the scores to the client to invoke table as before, but subselecting for state MA. However, we can also do this computation in the database using the transparency layer. First, we filter the rows for MA in scores.MA, and then invoke table on the two columns. Note: ORE requires passing the two columns explicitly to the overloaded function table.
• To do this in parallel, add the argument parallel=TRUE to the ore.groupApply call.

Wait! What happened to ore.rowApply?

Above, we showed how to score with multiple models using ore.groupApply. But what if we had customers from a single state that we wanted to score in parallel? We can use ore.rowApply and rqRowEval to invoke a function on chunks of data (rows) at a time, from 1 to the total number of rows. (Note that values close to the total row count yield few chunks and hence little benefit from parallelism.)


scores <- ore.rowApply(
  CHURN_TEST[CHURN_TEST$state=="MA",],
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels,
  ore.connect=TRUE, parallel=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)),
  rows=200)
scores
table(scores$actual, scores$pred)

A few points to highlight:

• Since we want to perform the scoring in parallel by state, we filter the rows for MA. This will ensure that all rows processed can use the same predictive model.
• We set the rows argument to 200. CHURN_TEST has 1667 rows, so this will result in nine executions of myFunction: the first eight receive 200 rows each and the last receives 67 rows.
• We also set parallel=TRUE above since we want the scoring performed in parallel.
• The invocation of ore.rowApply returns immediately. Not until we print scores do we incur the cost of executing the underlying query. However, also note that each time we access scores, for example in the following call to table, we incur the cost of executing the query. If the result will be used many times in subsequent operations, you may want to create a table with the result using ore.create, as sketched below.
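
For example, a minimal sketch of persisting the result (the table name CHURN_SCORES is hypothetical):

# Persist the lazily-defined scores as a database table so subsequent
# operations do not re-execute the underlying query
ore.create(scores, table="CHURN_SCORES")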

In SQL, we can do the same, but we’ll need to store the function in the R script repository (perhaps called "myScoringFunction") and also store xlevels in a datastore (perhaps called "myXLevels"). While we can pass complex objects in the R interface to embedded R functions, we cannot do that in SQL. Instead, we must pass the name of a datastore. Since the xlevels are in a datastore, the user-defined R function needs to be modified to take this other datastore name and load that datastore to have access to xlevels. This set of changes is left to the reader as an exercise; a sketch of one possible modification follows the SQL example below.


select * from table(rqRowEval(
  cursor(select /*+ parallel(t, 4) */ *
        from CHURN_TEST t
        where "state" = 'MA'),
  cursor(select 1 as "ore.connect",
        'myC5.0model3' as "datastorePrefix",
        'myXLevels' as "xlevelsDatastore"
        from dual),
  'select ''aaa'' "pred",''aaa'' "actual" , ''aa'' "state" from dual',
    200, 'myScoringFunction'));

A few points to highlight:

• The input cursor specifies a parallel hint on the input data cursor and filtering data for MA as well.
• Several arguments are being passed, including the new argument to our function myXLevels.
• The output form is specified in the SQL string. Care must be taken to ensure that the column names, ordering, and the length of character strings match the returned data.frame.
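
As a starting point for that exercise, here is a sketch of one possible modification of the scoring function. It assumes xlevels was saved beforehand, e.g., via ore.save(xlevels, name="myXLevels"), and that the loaded datastore contains the variable xlevels.

myScoringFunction <- function(dat, datastorePrefix, xlevelsDatastore) {
  library(C50)
  state <- dat[1,"state"]
  datastoreName <- paste(datastorePrefix, state, sep="_")
  dat$state <- NULL
  ore.load(name=xlevelsDatastore)   # loads the variable xlevels
  for (j in names(xlevels))
    dat[[j]] <- factor(dat[[j]], levels = xlevels[[j]])
  ore.load(name=datastoreName)      # loads the variable mod
  data.frame(pred=predict(mod, dat, type="class"),
        actual=dat$churn,
        state=state)
}

This function would then be stored in the R script repository under the name "myScoringFunction", as in the rqRowEval call above.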

Map Reduce

The “row apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the scoring and outputs a data.frame value (no key required). There is no reducer, or the reducer is simply a pass-through.

Memory and performance considerations

Unlike with group apply, the rows argument in row apply ensures an upper bound on the number of rows (and hence memory requirement). The value of rows should be chosen to balance memory and parallel performance. The usual measures can be taken regarding setting memory limits on the R engine – as noted in Part 2.

There may be instances where setting rows = 1 makes sense. For example, if the computation per row is intensive (i.e., takes a long time), sending one row per R engine may be appropriate. Experiment with a range of values for rows to determine the best value for your particular scenario.
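
For instance, a sketch of such an experiment, reusing the scoring example above (ore.pull forces execution, so the elapsed time reflects the actual scoring):

for (r in c(50, 200, 500)) {
  t <- system.time(ore.pull(ore.rowApply(
        CHURN_TEST[CHURN_TEST$state=="MA",],
        myFunction,
        datastorePrefix="myC5.0model3", xlevels=xlevels,
        ore.connect=TRUE, parallel=TRUE,
        FUN.VALUE=data.frame(pred=character(0),
              actual=character(0),
              state=character(0)),
        rows=r)))
  cat("rows =", r, ": elapsed", t["elapsed"], "sec\n")
}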

Sunday Jan 05, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 3

In the first two parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval / rqEval and ore.tableApply / rqTableEval. In this blog post, we’ll cover the next in our theme and variation series involving ore.groupApply and the corresponding definitions required for SQL execution. The “group apply” function is one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on different partitions of data. This functionality is essential to enable the building of potentially tens or hundreds of thousands of predictive models, e.g., one per customer, and for taking advantage of high-performance computing hardware like Exadata.

Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically partitioning and passing data to parallel executing R engines. It ensures that all R function executions for all partitions complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result.

The variation on embedded R execution for ore.groupApply involves passing not only an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame, but also an INDEX argument that specifies the name of a column by which the rows will be partitioned for processing by a user-defined R function.

Let’s look at an example. We’re going to use the C50 package to build a C5.0 decision tree model on the churn data set from C50. The goal is to build one churn model on the data for each state.


library(C50)
data(churn)

ore.create(churnTrain, "CHURN_TRAIN")

modList <- ore.groupApply(
  CHURN_TRAIN,
  INDEX=CHURN_TRAIN$state,
    function(dat) {
      library(C50)
      dat$state <- NULL
      dat$churn <- as.factor(dat$churn)
      dat$area_code <- as.factor(dat$area_code)
      dat$international_plan <- as.factor(dat$international_plan)
      dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
      C5.0(churn ~ ., data = dat, rules = TRUE)
    });
mod.MA <- ore.pull(modList$MA)
summary(mod.MA)

A few points to highlight:
• As noted in Part 2 of this series, to use the CRAN package C50 on the client, we first load the library, and then the churn data set.
• Since the data is a data.frame, we’ll create a table in the database with this data. Notice that if you compare the results of str(churnTrain) with str(CHURN_TRAIN), you will see that the factor columns have been retained. This becomes relevant later.
• The function ore.groupApply will return a list of models stored as ore.object instances. The first argument is the ore.frame CHURN_TRAIN and the second argument indicates to partition the data on column state such that the user-defined function is invoked on each partition of the data.
• The next argument specifies the function, which could alternatively have been the function name if the FUN.NAME argument were used and the function saved explicitly in the R script repository. The function’s first argument (whatever its name) will receive one partition of data, e.g., all data associated with a single state.
• Regarding the user-defined function body, we explicitly load the package we’re using, C50, so the function body has access to it. Recall that this user-defined R function will execute at the database server in a separate R engine from the client.
• Since we don’t need to know which state we’re working with and we don’t want this included in the model, we delete the column from the data.frame.
• Although the ore.frame retained the factor columns, when the data is loaded into the user-defined embedded R function, factors appear as character vectors. As a result, we need to convert them back to factors explicitly.
• The model is built and returned from the function.
• The result from ore.groupApply is a list containing the results from the execution of the user-defined function on each partition of the data. In this case, it will be one C5.0 model per state.
• To view the model, we first use ore.pull to retrieve it from the database and then invoke summary on it. The class of mod.MA is “C5.0”.

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Previously, we showed doing this using the SQL API; we can also do it using the R API. Here, we also modify the function to store the resulting models in an ORE datastore by state name:


ore.scriptCreate("myC5.0Function",
  function(dat,datastorePrefix) {
    library(C50)
    datastoreName <- paste(datastorePrefix,dat[1,"state"],sep="_")
    dat$state <- NULL
    dat$churn <- as.factor(dat$churn)
    dat$area_code <- as.factor(dat$area_code)
    dat$international_plan <- as.factor(dat$international_plan)
    dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
    mod <- C5.0(churn ~ ., data = dat, rules = TRUE)
    ore.save(mod, name=datastoreName)
    TRUE
  })

Just for comparison, we could invoke this from the R API as follows:


res <- ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model", ore.connect=TRUE)
res
res <- ore.pull(res)
all(as.logical(res) == TRUE)

Since we’re using a datastore, we need to connect to the database by setting ore.connect to TRUE. We also pass the datastorePrefix. The result res is an ore.list of logical values. To test whether all are TRUE, we first pull the result and use the R all function.

Back to the SQL API: now that we can refer to the function in the SQL API, we invoke the function, which places one model per datastore, each named with the given prefix and state.


select *
from table(churnGroupEval(
  cursor(select * from CHURN_TRAIN),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

There’s one thing missing, however. We don’t have the function churnGroupEval. There is no generic “rqGroupEval” in the API – we need to define our own table function that matches the data provided. Due to this and the parallel nature of the implementation, we need to create a PL/SQL FUNCTION and supporting PACKAGE:


CREATE OR REPLACE PACKAGE churnPkg AS
  TYPE cur IS REF CURSOR RETURN CHURN_TRAIN%ROWTYPE;
END churnPkg;
/
CREATE OR REPLACE FUNCTION churnGroupEval(
  inp_cur churnPkg.cur,
  par_cur SYS_REFCURSOR,
  out_qry VARCHAR2,
  grp_col VARCHAR2,
  exp_txt CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH ("state"))
CLUSTER inp_cur BY ("state")
USING rqGroupEvalImpl;
/

The pieces specific to this data set, namely the CHURN_TRAIN%ROWTYPE return type and the "state" column in the PARTITION and CLUSTER clauses, are the parameters that need to be changed to create this function for any particular data set. There are other variants, but this will get you quite far.

To validate that our datastores were created, we invoke ore.datastore(). This returns the datastores present and we will see 51 such entries – one for each state and the District of Columbia.
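
A minimal sketch of that check:

ds <- ore.datastore()   # returns a data.frame describing existing datastores
nrow(ds)                # expect 51 entries, one per state plus D.C.
head(ds)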

Parallelism

Above, we mentioned that “group apply” supports data parallelism. By default, parallelism is turned off. To enable it, the parallel argument of ore.groupApply needs to be set to TRUE.


ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model",
          ore.connect=TRUE,
          parallel=TRUE
)

In the case of the SQL API, the parallel hint can be provided in the input cursor. The hint below indicates that a degree of parallelism of up to 4 should be enabled.


select *
from table(churnGroupEval(
  cursor(select /*+ parallel(t,4) */ * from CHURN_TRAIN t),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

Map Reduce

The “group apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the partitioning by outputting the INDEX value as key and the data.frame as value. Then, each reducer receives the rows associated with one key. In our example above, INDEX was the column state and so each reducer would receive rows associated with a single state.

Memory and performance considerations

While the data is partitioned by the INDEX column, it is still possible that a given partition is quite large, such that either the partition of data will not fit in the R engine memory or the user-defined embedded R function will not be able to execute to completion. The usual remedial measures can be taken regarding setting memory limits – as noted in Part 2.

If the partitions are not balanced, you would have to configure the system’s memory for the largest partition. This will also have implications for performance, obviously, since smaller partitions of data will likely complete faster than larger ones.
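
A quick way to anticipate this is to inspect the partition sizes up front; a minimal sketch using the churn example:

state.counts <- table(ore.pull(CHURN_TRAIN$state))
sort(state.counts, decreasing=TRUE)[1:5]   # the five largest partitions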

The blog post Managing Memory Limits and Configuring Exadata for Embedded R Execution discusses how to instrument your code to understand the memory usage of your R function. This is done in the context of ore.indexApply (to be discussed later in this blog series), but the approach is analogous for “group apply.”

Friday Jan 03, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 2

In part 1 of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval and rqEval. In this blog post, we’ll cover the next in our theme and variation series involving ore.tableApply and rqTableEval.

The variation on embedded R execution for ore.tableApply involves passing an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame. The rqTableEval function in SQL allows users to specify a data cursor to be delivered to your embedded R function as a data.frame.

Let’s look at a few examples.


R API

In the following example, we’re using ore.tableApply to build a Naïve Bayes model on the iris data set. Naïve Bayes is found in the e1071 package, which must be installed on both the client and database server machine R engines.

library(e1071)
mod <- ore.tableApply(
  ore.push(iris),
  function(dat) {
    library(e1071)
    dat$Species <- as.factor(dat$Species)
    naiveBayes(Species ~ ., dat)
  })
class(mod)
mod

A few points to highlight:
• To use the CRAN package e1071 on the client, we first load the library.
• The iris data set is pushed to the database to create an ore.frame, which is the first argument to ore.tableApply. In practice, this would be an ore.frame corresponding to a table that exists in Oracle Database. If not obvious, note that we could have previously assigned dat <- ore.push(iris) and passed dat as the argument as well.
• The embedded R function is supplied as the second argument to ore.tableApply as a function object. Recall from Part 1 that we could have alternatively assigned this function to a variable and passed the variable as an argument, or stored the function in the R script repository and passed the argument FUN.NAME with the assigned function name.
• The user-defined embedded R function takes dat as its first argument which will contain a data.frame derived from the ore.frame supplied.
• The model itself is returned from the function.
• The result of the ore.tableApply execution will be an ore.object.

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Recall that the call to sys.rqScriptCreate must be wrapped in a BEGIN-END PL/SQL block.


begin
  sys.rqScriptCreate('myNaiveBayesModel',
    'function(dat) {
      library(e1071)
      dat$Species <- as.factor(dat$Species)
      naiveBayes(Species ~ ., dat)
    }');
end;
/

Invoking the function myNaiveBayesModel occurs in a SQL SELECT statement as shown below. The first argument to rqTableEval specifies a cursor that retrieves the IRIS table. Note that the IRIS table could have been created earlier using ore.create(iris,"IRIS"). The second argument, NULL, indicates that no arguments are supplied to the function.

The function returns an R object of type naiveBayes, but as a serialized object that is chunked into a table. This likely is not useful to most users.


select *
from table(rqTableEval(cursor(select * from IRIS), NULL, NULL, 'myNaiveBayesModel'));

If we want to keep the model in a more usable form, we can store it in an ORE datastore in Oracle Database. For this, we require a change to the user-defined R function and the SQL invocation.


begin
  sys.rqScriptCreate('myNaiveBayesModel',
    'function(dat) {
      library(e1071)
      dat$Species <- as.factor(dat$Species)
      mod <- naiveBayes(Species ~ ., dat)
      ore.save(mod, name="myNaiveBayesDatastore")
      TRUE
    }');
end;
/

select *
from table(rqTableEval(
  cursor(select * from IRIS),
  cursor(select 1 as "ore.connect" from dual),
  'XML',
  'myNaiveBayesModel'));

Note the changes: we’ve stored the model in the datastore named ‘myNaiveBayesDatastore’, and we’ve returned TRUE to have a simple value that can show up as the result of the function execution. In the SQL query, we changed the third parameter to ‘XML’ to return an XML string containing “TRUE”. The name of the datastore could also be passed as an argument as follows:


begin
  sys.rqScriptCreate('myNaiveBayesModel',
    'function(dat, datastoreName) {
      library(e1071)
      dat$Species <- as.factor(dat$Species)
      mod <- naiveBayes(Species ~ ., dat)
      ore.save(mod, name=datastoreName)
      TRUE
    }');
end;
/

select *
from table(rqTableEval(
  cursor(select * from IRIS),
  cursor(select 'myNaiveBayesDatastore' "datastoreName", 1 as "ore.connect" from dual),
  'XML',
  'myNaiveBayesModel'));

Memory considerations with ore.tableApply and rqTableEval

The input data provided as the first argument to a user-defined R function invoked using ore.tableApply or rqTableEval is physically being moved from Oracle Database to the database server R engine. It’s important to realize that R’s memory limitations still apply. If your database server machine has 32 GB RAM and your data table is 64 GB, ORE will not be able to load the data into the R function’s dat argument.
You may see errors like:


Error : vector memory exhausted (limit reached)

or

ORA-28579: network error during callback from external procedure agent

See the blog post on Managing Memory Limits and Configuring Exadata for Embedded R Execution where we discuss setting memory limits for the database server R engine. This can be necessary to load reasonably sized data tables.

Parallelism

As with ore.doEval / rqEval, user-defined R functions invoked using ore.tableApply / rqTableEval are not executed in parallel, i.e., a single R engine is used to execute the user-defined R function.

Invoking certain ORE advanced analytics functions

In the current ORE release, some advanced analytics functions, like ore.lm or ore.glm, which use the embedded R execution framework, cannot be used within other embedded R calls such as ore.doEval / rqEval and ore.tableApply / rqTableEval.

You can expect to see an error like the following:


ORA-28580: recursive external procedures are not supported

In the next post in this series, I’ll discuss ore.groupApply and the corresponding definitions required for SQL execution, since there is no rqGroupApply function. I’ll also cover the relationship of various “group apply” constructs to map-reduce paradigm.

ORAAH - Enabling high performance R workloads on Hadoop


One of the features of Oracle R Advanced Analytics for Hadoop (ORAAH) is enabling Hadoop jobs written in the R language. R is a popular open-source language and environment for statistical computing and graphics. ORAAH enables R programmers to leverage a Hadoop cluster operating on data resident in HDFS files.

In this blog post, we examine the performance characteristics of ORAAH with an example and explain what makes ORAAH the fastest alternative available to run Hadoop-R jobs. We also compare the results with another popular Hadoop interface for R, rmr.

Credit to Vlad Sharanhovich and Anand Srinivasan for providing the content for this blog post.

In probability theory and statistics, covariance is a measure of how much two variables change together.

Variables that tend to show similar behavior exhibit positive covariance. Alternatively, if the greater values of one variable correspond with the smaller values of another, then the covariance between the variables is negative. We use covariance computation as the running example below, which you can use to reproduce the results detailed here.

The tests were performed on a 6-node cluster running ORAAH version 2.3.1.


Cluster configuration:


  • 6 node cluster

  • BDA v2.3.1 (based on CDH 4.4)

  • 4 tasktrackers

  • CPU: Intel® Xeon® CPU X5675 @ 3.07GHz

  • RAM: 47GB


ORAAH comes with a convenience function for data generation. We use this function to generate a 100 GB HDFS input file with numeric values for 200 variables (columns), as follows:


# Generate 100GB input dataset (using pre-release 2.4.0)
inputCsv <- orch.datagen(1e+11, numeric.col.count=200, parts=100)
# 15 mins, 53 sec
# 200 mappers

ORAAH supports two types of HDFS input: delimited text files and RDATA, R's own binary representation. In many cases, the RDATA representation provides much better I/O throughput compared to delimited text files.


# Converting into ORAAH native format
inputRdata <- hdfs.toRData(inputCsv, out.name="100G_200n_rd")
# 4 mins, 54 sec
# 400 mappers

Next, we write the mapper and reducer code for parallel/distributed covariance computation. Below, the mapper function accepts a data.frame representation of the input data and generates structured output with 3 components: a matrix, a vector of column sums, and input row count. The single reducer function merges the structured output generated from the mappers to produce the final covariance matrix.
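
Before running at scale, the algebra behind this decomposition can be sanity-checked locally on a small matrix. A minimal sketch:

# Recover the sample covariance matrix from X'X, the column sums, and
# the row count (the same quantities the mappers emit and the reducer
# combines)
set.seed(1)
m   <- matrix(rnorm(5 * 4), nrow=5, ncol=4)
xtx <- t(m) %*% m     # sum of the mappers' crossproduct matrices
cs  <- colSums(m)     # sum of the mappers' column sums
n   <- nrow(m)        # sum of the mappers' row counts
covmat <- (xtx - (cs %*% t(cs)) / n) / (n - 1)
all.equal(covmat, cov(m))   # TRUE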

The mapper and reducer functions are supplied as input to ORAAH's hadoop.run() function. This function additionally takes an HDFS file as input in the data argument. The structured output from the mapper and reducer is defined in the mapred.config data structure. Optionally, the Hadoop job can be given a name (in this case "cov") for traceability.

ORCH_cov <- function(x) {
  hadoop.run(
    data = x,
    mapper = function(k, v) {
      m <- as.matrix(v)
      cs <- colSums(m)
      nr <- nrow(m)
      mtm <- t(m) %*% m
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      orch.keyval(NULL, orch.pack(l))
    },
    reducer = function(k, v) {
      mapres <- orch.unpack(v$val, as.list=T)
      xy <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      orch.keyval(NULL, orch.pack(covmat))
    },
    config = new("mapred.config",
      map.output = data.frame(key="none", val="c"),
      reduce.output = data.frame(key="none", val="c")),
    job.name = "cov")
}

The execution times are shown below:


# Using text input
ORCH_cov(inputCsv)
# 7 mins, 19 sec
# 400 mappers / 45 reducers



# Using binary input
ORCH_cov(inputRdata)
# 4 mins, 18 sec
# 400 mappers / 45 reducers

A few points to observe:
1. Before an HDFS file can be used with ORAAH's hadoop.run() function, its metadata must be known. ORAAH automatically determines the data types of the columns in the HDFS file by sampling rows. The metadata is created during the hdfs.attach() call. This metadata enables ORAAH to generate highly optimized scan routines to read rows from the file.
2. ORAAH implements caching of input and output structures from mappers and reducers, thereby lifting from the R engine the burden of dealing with large data volumes.
3. ORAAH's orch.pack() and orch.unpack() functions enable transfer of structured constructs between mappers and reducers, which further improves I/O throughput by eliminating the need to scan/parse string inputs.
4. ORAAH leverages R's own RDATA representation as the binary representation. The key to better execution performance of R jobs on Hadoop is managing I/O throughput and carefully bypassing R's inherent limitations with parsing strings.

We contrast ORAAH's performance by comparing it with an open source package called rmr (https://github.com/RevolutionAnalytics/RHadoop/wiki/rmr) repeating the covariance calculation on the same cluster and input data set. For this experiment, we used rmr version 2.3.0.

The covariance computation is written in rmr as shown below.


RMR2_cov <- function(x, input.format) {
  mapreduce(
    x,
    input.format = input.format,
    map = function(k, v) {
      m <- as.matrix(v)
      cs <- colSums(m)
      nr <- nrow(m)
      mtm <- t(m) %*% m
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      keyval(1, paste0(deparse(l), collapse="\n"))
    },
    reduce = function(k, v) {
      mapres <- lapply(v, function(x) eval(parse(text=x)))
      xy <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      keyval(1, paste0(deparse(covmat), collapse="\n"))
    }
  )
}

Notice that the code is somewhat similar to the ORAAH version, with three key differences:
   i) Mapper output is serialized as a string
  ii) Reducer, thus, is forced to parse input strings
 iii) Reducer output is once again a string, requiring the client to reconstruct the covariance matrix

Not only is passing strings limiting from the R programmer's perspective, but it also has a negative effect on I/O throughput. rmr also supports a proprietary binary representation of delimited text data.

Below we repeat the tests with both the delimited text and binary representations. Not only is the conversion to the binary representation more expensive, but the resulting I/O throughput is still not substantially better.


# Convert to RMR2 native format
inputRMR <- mapreduce(inputCsv, 
  input.format = make.input.format("csv",sep=","),
  map = function(k, v) keyval(NULL, v)
)
# 20 mins, 17 sec
# 400 mappers
#
# Using text input
RMR2_cov(inputCsv, make.input.format("csv",sep=","))
# 32 mins, 14 sec
# 400 mappers / 45 reducers
#
# Using binary input
RMR2_cov(inputRMR, "native")
# 17 mins, 18 sec
# 400 mappers / 45 reducers

To summarize, ORAAH is 4x faster than rmr out of the box for a simple covariance calculation, as the following table shows.

                 Text Input       Binary Input     Text-to-Binary Conversion
ORAAH            7 min, 19 sec    4 min, 18 sec    4 min, 54 sec
rmr              32 min, 14 sec   17 min, 18 sec   20 min, 17 sec
ORAAH speedup    4.4x faster      4x faster        4.14x faster

Thursday Jan 02, 2014

Invoking R scripts via Oracle Database: Theme and Variation


Oracle R Enterprise provides several ways for you to invoke R scripts through Oracle Database. From the same R script you can get structured data, an XML representation of R objects and images, and even PNG images via a BLOB column in a database table. This series of blog posts will take you through the various ways you can interact with R scripts and Oracle Database. In this first post, we explore the benefits of embedded R execution and usage of the ore.doEval and rqEval functions. Subsequent posts will detail the use of the other embedded R execution functions, and in the case of data- and task-parallel capabilities, how these relate to the MapReduce paradigm.

Introduction


In Oracle R Enterprise, we use the phrase “embedded R execution” to characterize the storing of R scripts in Oracle Database – using the ORE R script repository – and invoking that script in one or more database-side R engines.

This is a powerful capability for several reasons:


  • enable data- and task-parallel execution of user-defined R functions that correspond to special cases of Hadoop Map-Reduce jobs

  • leverage a more powerful database server machine for R engine execution – both RAM and CPU

  • transfer data between Oracle Database and R engine much faster than to a separate client R engine

  • invoke user-defined R functions from SQL and retrieve results in various forms depending on application requirements: tables, XML, PNG BLOB

  • leverage open source CRAN packages at the database server machine

  • schedule R scripts for automatic execution via SQL with Oracle Database DBMS_SCHEDULER PL/SQL package


Users can interactively develop R scripts using their favorite R IDE, and then deploy the script as an R function to the database where it can be invoked either from R or SQL. Embedded R execution facilitates application use of R scripts with better performance and throughput than using a client-side R engine. Executing R scripts from SQL enables integration of R script results with OBIEE, Oracle BI Publisher, and other SQL-enabled tools for structured data, R objects, and images.

Table 1 provides a summary of the embedded R execution functions and the R script repository functions available. The function f refers to the user-defined R code, or script, that is provided as either an R function object or a named R function in the database R script repository. Functions for creating entries in the R script repository are also included in Table 1.


ore.doEval / rqEval
Executes f with no automatic transfer of data.

ore.tableApply / rqTableEval
Executes f passing all rows of the provided input ore.frame as the first parameter of f. The first parameter is provided as a data.frame.

ore.groupApply / “rqGroupEval” (must be explicitly defined as a function by the user)
Executes f by partitioning data according to an “index” column’s values. Each data partition is provided as a data.frame in the first parameter of f. Supports parallel execution of each f invocation in a pool of database server-side R engines.

ore.rowApply / rqRowEval
Executes f passing a specified number of rows (a “chunk”) of the provided input ore.frame. Each chunk is provided as a data.frame in the first parameter of f. Supports parallel execution of each f invocation in a pool of database server-side R engines.

ore.indexApply / N/A
Executes f with no automatic transfer of data, but provides the index of the invocation, 1 through n, where n is the number of functions to invoke. Supports parallel execution of each f invocation in a pool of database server-side R engines.

ore.scriptCreate / sys.rqScriptCreate
Loads the provided R function into the R script repository with the provided name.

ore.scriptDrop / sys.rqScriptDrop
Removes the named R function from the R script repository.

Table 1: Embedded R Execution and R Script Repository function summary


Using ore.doEval and rqEval


The first embedded R functions we cover are also the simplest. The R function ore.doEval and the SQL function rqEval do not automatically receive any data from the database. They simply execute the function f provided. Any needed data is either generated within f or explicitly retrieved from a data source such as Oracle Database, other databases, or flat files.

R API


In the R interface, users can specify an R function as an argument to ore.doEval, or use the name of a function stored in the R script repository. For example, the function RandomRedDots below returns a data.frame with two columns and plots 100 random normal values. Invoking the function through the database server requires minimal specification with ore.doEval.

RandomRedDots <- function(divisor=100){
  id <- 1:10
  plot(1:100, rnorm(100), pch = 21, bg = "red", cex = 2)
  data.frame(id=id, val=id / divisor)
}
ore.doEval(RandomRedDots)


Here is the result, where the image is displayed at the client, but generated by the database server R engine that executed the function f.


We can provide arguments to f as well. To override the divisor argument, provide it as an argument to ore.doEval. Note that any number of parameters, including more complex R objects such as models, can be passed as arguments this way.

ore.doEval(RandomRedDots, divisor=50)


Behind the scenes: when passing the function itself (as above), ORE implicitly stores the function in the R script repository before executing it. When finished executing, the function is dropped from the repository. If we want to store this function explicitly in the repository, we can use ore.scriptCreate:

ore.scriptCreate("myRandomRedDots",
RandomRedDots)


Now, the function can be invoked by name:

ore.doEval(FUN.NAME="myRandomRedDots",divisor=50)


The return value of f is a data.frame; however, if we capture the result in a variable, we’ll notice two things: the class of the return value is ore.object, and the image does not display.

res <- ore.doEval(FUN.NAME="myRandomRedDots", divisor=50)
class(res)
class(res)



To get back the data.frame, we must invoke ore.pull to pull the result to the client R engine.

res.local <- ore.pull(res)
class(res.local)


To return an ore.frame instead of an ore.object, specify the FUN.VALUE argument, which describes the structure of the result.

res.of <- ore.doEval(FUN.NAME="myRandomRedDots", divisor=50,
                     FUN.VALUE=data.frame(id=1, val=1))
class(res.of)


SQL API


Now, we’ll look at executing the same R function f using the SQL interface of embedded R execution, while pointing out a few significant differences in the API. The first difference is that the R functions are defined as strings, not R objects. This should be no surprise since we’ll be using a SQL interface like SQL Developer or SQL*Plus. Also, the R function string cannot be passed directly in the rqEval function, but must first be stored in the R script repository. The call to sys.rqScriptCreate must be wrapped in a BEGIN-END PL/SQL block.

begin
  sys.rqScriptCreate('myRandomRedDots2',
    'function(divisor=100, numDots=100){
      id <- 1:10
      plot(1:numDots, rnorm(numDots), pch = 21, bg = "red", cex = 2)
      data.frame(id = id, val = id / divisor)
    }');
end;
/


Invoking the function myRandomRedDots2 occurs in a SQL SELECT statement as shown below. The first NULL argument to rqEval indicates that no arguments are supplied to the function myRandomRedDots2. In the SQL API, we can ask for the data.frame returned by f to appear as a SQL table. For this, the second parameter can take a SQL string that describes the column names and data types that correspond to the returned data.frame. You can provide a prototype row using the dual dummy table; however, the select statement can be based on an existing table or view as well.

select *
from table(rqEval(NULL, 'select 1 id, 1 val from dual', 'myRandomRedDots2'));


To pass parameters in SQL, we can replace the first NULL argument with a cursor that specifies a single row of scalar values. Multiple arguments can be specified as shown below. Note that argument names are case sensitive, so it is best to include column names in double quotes. Note also that the first argument is a cursor whereas the second parameter is a string. The former provides data values, whereas the latter is parsed to determine the structure of the result.

select *
from table(rqEval(cursor(select 50 "divisor", 500 "numDots" from dual),
                  'select 1 id, 1 val from dual',
                  'myRandomRedDots2'));


When specifying a table structure for the result as above, any image data is discarded. To get back both structured data and images, we replace the second argument with ‘XML’. This instructs the database to generate an XML string, first with any structured or semi-structured R objects, followed by the image or images generated by the R function f. Images are returned as a base 64 encoding of the PNG representation.

select *
from table(rqEval(cursor(select 50 "divisor", 500 "numDots" from dual),
                  'XML',
                  'myRandomRedDots2'));


Advanced features


To establish a connection to Oracle Database within the R function f, a special argument ore.connect can be set to TRUE. This uses the credentials of the user who invoked the embedded R function ore.doEval or rqEval to establish a connection and also automatically load the ORE package. This capability can be useful to explicitly use the ORE Transparency Layer or to save and load objects with ORE R object datastores.

RandomRedDots <- function(divisor=100, datastore.name="myDatastore"){
  id <- 1:10
  plot(1:100, rnorm(100), pch = 21, bg = "red", cex = 2)
  ore.load(datastore.name)  # contains numeric variable myVar
  data.frame(id=id, val=id / divisor, num=myVar)
}

ore.doEval(RandomRedDots, datastore.name="datastore_1", ore.connect=TRUE)


Notice the additions: we pass the name of a datastore to load, and that datastore is expected to contain a variable myVar. Arguments prefixed with ‘ore.’ are control arguments and are not passed to f. Other control arguments include: ore.drop, which if set to TRUE converts a one-column input data.frame to a vector; ore.graphics, which if set to TRUE starts a graphics driver to look for images being returned from f; and ore.png.*, which provides additional parameters for the PNG graphics device. The ore.png.* control arguments include (replace * with): width, height, units, pointsize, bg, res, type, etc.
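
For example, a hedged sketch using the ore.png.* control arguments (values are illustrative):

ore.doEval(RandomRedDots, divisor=50,
           datastore.name="datastore_1",
           ore.connect=TRUE,
           ore.png.width=700, ore.png.height=500)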

In the next post, we will explore ore.tableApply and rqTableEval.
