Monday Jan 20, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 5


In the first four parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, ore.groupApply / “rqGroupApply”, and ore.rowApply / rqRowEval. In this blog post, we cover ore.indexApply. Note that there is no corresponding rqIndexEval; more on that later. The “index apply” function is also one of the parallel-enabled embedded R execution functions. It supports task-parallel execution, where one or more R engines perform the same or different calculations, or tasks. A number, the index of the execution, is passed to the function. Any data the function needs must be explicitly generated or loaded within the function.

This functionality is valuable in a variety of settings, such as simulations, and for taking advantage of high-performance computing hardware like Exadata.

As with “group apply” and “row apply”, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, with only the index passed to the function as the first argument. Oracle Database ensures that each R function execution completes; otherwise, the ORE function returns an error. The output formats supported by the other embedded R functions are also available for ore.indexApply, for example, returning an ore.list or combining data.frame data into an ore.frame.

The variation on embedded R execution for ore.indexApply involves passing as an argument the number of times the user-defined R function should be executed.

Let’s look at a simple example.

The following code executes the function five times in parallel.


res <- ore.indexApply(5,
      function(index) {
        paste("IndexApply:",index)
      },
    parallel=TRUE)
class(res)
res

Notice that the class of the result is an ore.list, and when we print res, we have 5 character vectors, each with the index that was passed to the user-defined function. As with other parallel embedded R functions, the number of concurrently executing R engines can be limited by specifying the degree of parallelism of the database. As we’ll see in ORE 1.4, the parallel argument can specify a preferred number of parallel R engines, as an upper bound.


> class(res)
[1] "ore.list"
attr(,"package")
[1] "OREbase"
> res
$`1`
[1] "IndexApply: 1"

$`2`
[1] "IndexApply: 2"

$`3`
[1] "IndexApply: 3"

$`4`
[1] "IndexApply: 4"

$`5`
[1] "IndexApply: 5"

Column-parallel use case

If we wanted to parallelize R’s summary function, we could compute the summary statistics on each column in parallel and combine them into a final result. The following example does exactly that. While we could generalize this example, we focus on the iris data set and compute summary statistics on its first four columns, which are numeric. Since iris comes standard with R, there’s no need to load data from any other source; we simply access it. The first argument to ore.indexApply is 4, the number of columns we wish to summarize in parallel. The function takes one argument, index, which will be a value between 1 and 4 and is used to select the column to summarize. We massage the result of summary into a data.frame and add the column name to the result. Note that the function returns a single row: the summary statistics for the column.


res <- NULL
res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      parallel=TRUE)
res

The result comes back as an ore.list object:


> res
$`1`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1  4.3     5.1    5.8 5.843     6.4  7.9 Sepal.Length

$`2`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1    2     2.8      3 3.057     3.3  4.4 Sepal.Width

$`3`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1    1     1.6   4.35 3.758     5.1  6.9 Petal.Length

$`4`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1  0.1     0.3    1.3 1.199     1.8  2.5 Petal.Width

This is good, but it would be better if the result were returned as an ore.frame, especially since every element has the same columns. To enable this, we make a slight variation: we specify FUN.VALUE as an empty data.frame that defines the structure of the result.


res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        col=character(0)),
      parallel=TRUE)
res

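Now, the result comes back as an ore.frame. Notice that the column names given as "1st Qu." and "3rd Qu." in FUN.VALUE appear as X1st.Qu. and X3rd.Qu., and that the rows do not come back in index order.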


> res
  Min. X1st.Qu. Median  Mean X3rd.Qu. Max.          col
1  0.1      0.3   1.30 1.199      1.8  2.5  Petal.Width
2  1.0      1.6   4.35 3.758      5.1  6.9 Petal.Length
3  4.3      5.1   5.80 5.843      6.4  7.9 Sepal.Length
4  2.0      2.8   3.00 3.057      3.3  4.4  Sepal.Width
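
The column-parallel example could be generalized beyond the first four columns of iris. The following is a minimal local sketch of that idea, not ORE code: base R's lapply stands in for ore.indexApply so that it runs without a database connection, and do.call(rbind, ...) plays the role that FUN.VALUE plays above. The helper name summarize_column and the variable numeric.cols are hypothetical, introduced only for illustration.

```r
# Local stand-in for the ORE example: lapply replaces ore.indexApply,
# so no database connection is required.
# summarize_column is a hypothetical helper, not part of ORE.
summarize_column <- function(index, data) {
  ss <- summary(data[, index])
  # One row holding the six summary statistics
  stats <- data.frame(matrix(as.numeric(ss), nrow = 1))
  names(stats) <- names(ss)          # Min., 1st Qu., Median, ...
  stats$col <- names(data)[index]    # remember which column this row describes
  stats
}

# Find the numeric columns instead of hard-coding the count
numeric.cols <- which(vapply(iris, is.numeric, logical(1)))

# One invocation per numeric column, each receiving a column index
pieces <- lapply(numeric.cols, summarize_column, data = iris)

# Stack the single-row results into one data.frame
combined <- do.call(rbind, unname(pieces))
rownames(combined) <- NULL
combined
```

With ORE, the same function body could presumably be handed to ore.indexApply with length(numeric.cols) as the first argument, but that variant is untested here.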

Simulation use case

The ore.indexApply function can be used in simulations as well. In this next example we take multiple samples from a normal distribution with the goal of comparing the distribution of the summary statistics. For this, we build upon the example above. We provide parameters such as the sample size, the mean and standard deviation of the random numbers, and the number of simulations we want to perform. Each of these simulations occurs in a separate R engine, in parallel, up to the degree of parallelism allowed by the database.

We specify num.simulations as the first parameter to ore.indexApply. The user-defined function receives the index plus three additional arguments (sample.size, mean, and std.dev), which we pass by name to ore.indexApply. The function then sets the random seed based on the index. This allows each invocation to generate a different set of random numbers. Using rnorm, the function produces sample.size random normal values. We invoke summary on the vector of random numbers, and then prepare a data.frame result to be returned. We use FUN.VALUE to get an ore.frame as the final result.


res <- NULL
sample.size = 1000
mean.val = 100
std.dev.val = 10
num.simulations = 1000

res <- ore.indexApply(num.simulations,
      function(index, sample.size=1000, mean=0, std.dev=1) {
        set.seed(index)
        x <- rnorm(sample.size, mean, std.dev)
        ss <- summary(x)
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$index <- index
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        index=numeric(0)),
      parallel=TRUE,
      sample.size=sample.size,
      mean=mean.val, std.dev=std.dev.val)
res
boxplot(ore.pull(res[,1:6]),
  main=sprintf("Boxplot of %d rnorm samples size %d, mean=%d, sd=%d",
        num.simulations, sample.size, mean.val, std.dev.val))

To get the distribution of samples, we invoke boxplot on the data.frame after pulling the result to the client.

Here are a couple of plots showing results for different parameters:


In both cases, we run 10,000 samples. The first graph uses a sample size of 10 and the second uses a sample size of 1000. From these results, it is clear that a larger sample size significantly reduces the variance in each of the summary statistics, confirming our Statistics 101 understanding.
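
The effect of sample size can also be checked numerically. The following is a rough local sketch, not ORE code: sapply stands in for ore.indexApply, only the sample mean is tracked, and the number of simulations is reduced to 200 to keep it quick. The function name simulate_means and its argument names are hypothetical. The observed spread of the sample means is compared with the textbook standard error, sd / sqrt(n).

```r
# Local stand-in for the simulation: sapply replaces ore.indexApply.
# simulate_means is a hypothetical helper, not part of ORE.
simulate_means <- function(num.simulations, sample.size,
                           mean.val = 100, std.dev.val = 10) {
  sapply(seq_len(num.simulations), function(index) {
    set.seed(index)  # same seeding scheme as in the ORE example
    mean(rnorm(sample.size, mean.val, std.dev.val))
  })
}

means.small <- simulate_means(200, sample.size = 10)
means.large <- simulate_means(200, sample.size = 1000)

# Observed spread of the sample mean next to the theoretical standard error
c(observed = sd(means.small), theory = 10 / sqrt(10))
c(observed = sd(means.large), theory = 10 / sqrt(1000))
```

The observed values should land near the theoretical ones, with the spread for sample size 1000 roughly a tenth of the spread for sample size 10.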

Error reporting

As introduced above, Oracle Database ensures that each embedded R user-defined function execution completes; otherwise, the ORE function returns an error. Of course, any side effects of the user-defined function need to be cleaned up manually. Operations that produce files, create tables in the database, or result in completed database transactions through ROracle will remain intact. The ORE embedded R infrastructure reports errors as produced by the function, as illustrated in the following example.

The code requests four executions of the function. When the index has the value 3, the function attempts to load the nonexistent package "abc123", which produces an error; otherwise, it returns the index value.


R> ore.indexApply(4,
+ function(index) {
+ if (index==3) {library(abc123)}
+ else {return(index)}
+ }
+ )
Error in .oci.GetQuery(conn, statement, data = data, prefetch = prefetch, :
ORA-12801: error signaled in parallel query server P000
ORA-20000: RQuery error
Error in library(abc123) : there is no package called 'abc123'
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 121
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 118

Notice that the first reported error is ORA-12801: error signaled in parallel query server. The ORA-20000: RQuery error that follows carries the error as returned by the R engine. It is also interesting to note that the ORA-06512 errors reveal the underlying implementation of ore.indexApply, "RQSYS.RQGROUPEVALIMPL", which leads us to the next topic.
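
Because one failing index aborts the whole call, it may sometimes be preferable to catch errors inside the user-defined function and return them as data. The following is a minimal local sketch of that general R technique using tryCatch; it is not something the ORE documentation is quoted as recommending here. lapply stands in for ore.indexApply, the helper name safe_task is hypothetical, and the package "abc123" is assumed not to be installed, as in the example above.

```r
# Local stand-in: lapply replaces ore.indexApply.
# safe_task is a hypothetical helper, not part of ORE.
safe_task <- function(index) {
  tryCatch({
    # "abc123" is assumed not to be installed, so this fails for index 3
    if (index == 3) library("abc123", character.only = TRUE)
    list(index = index, error = NA_character_)
  },
  error = function(e) {
    # Return the error message as data instead of aborting
    list(index = index, error = conditionMessage(e))
  })
}

res <- lapply(1:4, safe_task)

# Keep only the invocations that reported an error
failed <- Filter(function(r) !is.na(r$error), res)
failed
```

The trade-off is that the caller must now inspect the result for failures, since the overall call no longer signals an error on its own.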

No rqIndexEval?

“Index apply” is really a variation of “group apply” where the INDEX column is a numeric vector that is pushed to the database. With n distinct numbers, one number is provided to each function invocation as its index. As a result, there is no corresponding rqIndexEval in the SQL API. The user would have to create a similar package and function as was illustrated in the blog post on “group apply.”
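
The relationship between “index apply” and “group apply” can be illustrated locally. The following sketch is an analogy in base R, not the actual ORE implementation: a one-column data.frame named INDEX is split into one group per distinct value, and the function is invoked once per group with that value. The function name index_apply_via_groups is hypothetical.

```r
# Local analogy only: split + lapply stand in for "group apply".
# index_apply_via_groups is a hypothetical helper, not part of ORE.
index_apply_via_groups <- function(times, FUN, ...) {
  idx <- data.frame(INDEX = seq_len(times))   # one row per invocation
  groups <- split(idx, idx$INDEX)             # one group per distinct index
  # Each group holds a single row; its INDEX value becomes the first argument
  lapply(groups, function(g) FUN(g$INDEX, ...))
}

res <- index_apply_via_groups(5, function(index) paste("IndexApply:", index))
res[["3"]]
```

The list elements are named "1" through "5", mirroring the ore.list returned in the first example.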
