Thursday Aug 14, 2014

Selecting the most predictive variables – returning Attribute Importance results as a database table

Attribute Importance (AI) is a technique of Oracle Advanced Analytics (OAA) that ranks the relative importance of predictors given a categorical or numeric target for classification or regression models, respectively. OAA AI uses the minimum description length algorithm and produces importance scores such that predictors with positive scores help predict the target, while those with zero or negative scores do not, and may even contribute noise to a model, making it less accurate. OAA AI, however, considers predictors only pairwise with the target, so interactions among predictors are not addressed. OAA AI is a good first assessment of which predictors should be included in a classification or regression model, enabling what is sometimes called feature selection or variable selection.
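For a quick sense of the interface, here is a minimal sketch (not part of the original example, and assuming an ORE connection is already established) that ranks the predictors of Species in the iris data:

IRIS <- ore.push(iris)              # temporary in-database table with an ore.frame proxy
ai <- ore.odmAI(Species ~ ., IRIS)  # in-database Attribute Importance
ai$importance                       # importance score and rank for each predictor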

In my series on Oracle R Enterprise Embedded R Execution, I explored how structured table results could be returned from embedded R calls. In a subsequent post, I explored how to return select results from a principal components analysis (PCA) model as a table. In this post, I describe how you can work with results from an Attribute Importance model from ORE embedded R execution via an R function. This R function takes a table name and target variable name as input, places the predictor rankings in a named ORE datastore also specified as input, and returns a data.frame with the predictor variable name, rank, and importance value.

The function below implements this functionality. Notice that we dynamically sync the named table and get its ore.frame proxy object. From here, we invoke ore.odmAI using a formula generated dynamically from the targetName argument. We pull out the importance component of the result, explicitly assign the row names to a new column named variable, and then reorder the columns. Next, we nullify the row names since these are now redundant with the variable column.

The next three lines assign the result to a datastore. This is technically not necessary since the result is returned by this function, but if a user wanted to access this result without recomputing it, the user could simply retrieve the datastore object using another embedded R function. Writing such a function, one that loads the named datastore and returns its contents as an ore.frame in R or a database table in SQL, is left as an exercise for the reader.

Lastly, the resulting data.frame is returned.

rankPredictors <- function(tableName,targetName,dsName) {
  ore.sync(table=tableName)
  ore.attach()
  dat <- ore.get(tableName)
  formulaStr <- paste(targetName,".",sep="~")
  res <- ore.odmAI(as.formula(formulaStr),dat)
  res <- res$importance
  res$variable <- rownames(res)
  res <- res[,c("variable","rank","importance")]
  row.names(res) <- NULL
  resName <- paste(tableName,targetName,"AI",sep=".")
  assign(resName,res)
  ore.save(list=c(resName),name=dsName,overwrite=TRUE)
  res
}

To test this function, we invoke it explicitly with suitable arguments.

res <- rankPredictors ("IRIS","Species","/DS/Test1")
res

Here, you see the results.

> res
    variable rank importance
1  Petal.Width    1  1.1701851
2 Petal.Length    2  1.1494402
3 Sepal.Length    3  0.5248815
4  Sepal.Width    4  0.2504077

The contents of the datastore can be accessed as well.

ore.datastore(pattern="/DS")
ore.datastoreSummary(name="/DS/Test1")
ore.load("/DS/Test1")
IRIS.Species.AI
> ore.datastore(pattern="/DS")
  datastore.name object.count size       creation.date description
1      /DS/Test1            1  355 2014-08-14 16:38:46        <NA>
> ore.datastoreSummary(name="/DS/Test1")
      object.name      class size length row.count col.count
1 IRIS.Species.AI data.frame  355      3         4         3
> ore.load("/DS/Test1")
[1] "IRIS.Species.AI"
> IRIS.Species.AI
    variable rank importance
1  Petal.Width    1  1.1701851
2 Petal.Length    2  1.1494402
3 Sepal.Length    3  0.5248815
4  Sepal.Width    4  0.2504077
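
As a sketch of the exercise mentioned earlier, an embedded R function that loads the named datastore and returns the stored rankings might look like the following (the function and argument names here are illustrative, not part of the original example):

getRankedPredictors <- function(tableName, targetName, dsName) {
  objName <- paste(tableName, targetName, "AI", sep=".")
  ore.load(name=dsName, list=objName)   # requires ore.connect=TRUE under embedded R execution
  get(objName)                          # return the stored data.frame
}

Registered in the R script repository, such a helper could be invoked through ore.doEval or rqEval with the same FUN.VALUE specification shown below to surface the saved rankings as an ore.frame or SQL table.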

With the confidence that our R function is behaving correctly, we load it into the R Script Repository in Oracle Database.

ore.scriptDrop("rankPredictors")
ore.scriptCreate("rankPredictors",rankPredictors)

To test that the function behaves properly with embedded R execution, we invoke it first from R using ore.doEval, passing the desired parameters and returning the result as an ore.frame. This last part is enabled through the specification of the FUN.VALUE argument. Since we are using a datastore and the transparency layer, ore.connect is set to TRUE.

ore.doEval(
  FUN.NAME="rankPredictors",
  tableName="IRIS",
  targetName="Species",
  dsName="/AttributeImportance/IRIS/Species",
  FUN.VALUE=data.frame(variable=character(0)
      ,rank=numeric(0)
      ,importance=numeric(0)),
  ore.connect=TRUE
)

Notice we get the same result as above.

    variable rank importance
1  Petal.Width    1  1.1701851
2 Petal.Length    2  1.1494402
3 Sepal.Length    3  0.5248815
4  Sepal.Width    4  0.2504077

Again, we can view the datastore contents for the execution above. Notice our use of the “/” notation to organize our datastore content. While we can name datastores with any arbitrary string, this approach can help structure the retrieval of datastore contents.

ore.datastore(pattern="/AttributeImportance/IRIS")
ore.datastoreSummary(name="/AttributeImportance/IRIS/Species")

We have a single datastore matching our IRIS data set followed by the summary with the IRIS.Species.AI object, which is an R data.frame with 3 columns and 4 rows.

> ore.datastore(pattern="/AttributeImportance/IRIS")
                     datastore.name object.count size       creation.date description
1 /AttributeImportance/IRIS/Species            1  355 2014-08-14 16:55:40        <NA>
> ore.datastoreSummary(name="/AttributeImportance/IRIS/Species")
      object.name      class size length row.count col.count
1 IRIS.Species.AI data.frame  355      3         4         3

To execute this R script from SQL, use the ORE SQL API.

select * from table(rqEval(
  cursor(select 1 "ore.connect",
      'IRIS' "tableName",
      'Species' "targetName",
      '/AttributeImportance/IRIS/Species' "dsName"
      from dual),
  'select cast(''a'' as varchar2(50)) "variable",
  1 "rank",
  1 "importance"
  from dual',
  'rankPredictors'));

In summary, we’ve explored how to use ORE embedded R execution to extract model elements from an in-database algorithm and present them as an R data.frame, ore.frame, and SQL table.

The process used above can also serve as a template for working on your own embedded R execution projects:

+ Interactively develop an R script that does what you need and wrap it in a function
+ Validate that the R function behaves as expected
+ Store the function in the R Script Repository
+ Validate that the R interface to embedded R execution produces the desired results
+ Generate a SQL query that invokes the R function
+ Validate that the SQL interface to embedded R execution produces the desired results

Thursday Jun 05, 2014

Convert ddply {plyr} to Oracle R Enterprise, or use with Embedded R Execution

The plyr package contains a set of tools for partitioning a problem into smaller sub-problems that can be more easily processed. One function within {plyr} is ddply, which allows you to specify subsets of a data.frame and then apply a function to each subset. The result is gathered into a single data.frame. Such a capability is very convenient. The function ddply also has a .parallel option that, if TRUE, applies the function in parallel, using the backend provided by foreach, as sketched below.
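
As a quick illustration of that option (a sketch not from the original post, assuming the doParallel package is available as the foreach backend):

library(plyr)
library(doParallel)
registerDoParallel(cores = 2)       # register a foreach backend for ddply
dd <- data.frame(g = rep(1:3, each = 5), y = rnorm(15))
ddply(dd, "g", function(x) data.frame(mean.y = mean(x$y)), .parallel = TRUE)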

This type of functionality is available through Oracle R Enterprise using the ore.groupApply function. In this blog post, we show a few examples from Sean Anderson's "A quick introduction to plyr" to illustrate the corresponding functionality using ore.groupApply.

To get started, we'll create a demo data set and load the plyr package.


set.seed(1)
d <- data.frame(year = rep(2000:2014, each = 3),
        count = round(runif(45, 0, 20)))
dim(d)
library(plyr)

This first example takes the data frame, partitions it by year, and calculates the coefficient of variation of the count, returning a data frame.


# Example 1
res <- ddply(d, "year", function(x) {
  mean.count <- mean(x$count)
  sd.count <- sd(x$count)
  cv <- sd.count/mean.count
  data.frame(cv.count = cv)
  })

To illustrate the equivalent functionality in Oracle R Enterprise, using embedded R execution, we use the ore.groupApply function on the same data, but pushed to the database, creating an ore.frame. The function ore.push creates a temporary table in the database, returning a proxy object, the ore.frame.


D <- ore.push(d)
res <- ore.groupApply (D, D$year, function(x) {
  mean.count <- mean(x$count)
  sd.count <- sd(x$count)
  cv <- sd.count/mean.count
  data.frame(year=x$year[1], cv.count = cv)
  }, FUN.VALUE=data.frame(year=1, cv.count=1))

You'll notice the similarities in the first three arguments. With ore.groupApply, we augment the function to return the specific data.frame we want. We also specify the argument FUN.VALUE, which describes the resulting data.frame. From our previous blog posts, you may recall that by default, ore.groupApply returns an ore.list containing the results of each function invocation. To get a data.frame, we specify the structure of the result.

The results in both cases are the same; however, the ore.groupApply result is an ore.frame. In this case the data stays in the database until it's actually required. This can result in significant memory and time savings when data is large.


R> class(res)
[1] "ore.frame"
attr(,"package")
[1] "OREbase"
R> head(res)
  year  cv.count
1 2000 0.3984848
2 2001 0.6062178
3 2002 0.2309401
4 2003 0.5773503
5 2004 0.3069680
6 2005 0.3431743

To make the ore.groupApply execute in parallel, you can specify the argument parallel with either TRUE, to use default database parallelism, or a specific number, which serves as a hint to the database as to how many parallel R engines should be used.
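
For example, the call above could hint at a preferred degree of parallelism (a sketch; the actual number of R engines used also depends on database settings):

res <- ore.groupApply (D, D$year, function(x) {
  data.frame(year=x$year[1], cv.count = sd(x$count)/mean(x$count))
  }, FUN.VALUE=data.frame(year=1, cv.count=1), parallel=2L)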

The next ddply example uses the summarise function, which creates a new data.frame. In ore.groupApply, the year column is passed in with the data. Since no automatic creation of columns takes place, we explicitly set the year column in the data.frame result to the value of the first row, since all rows received by the function have the same year.


# Example 2
ddply(d, "year", summarise, mean.count = mean(count))

res <- ore.groupApply (D, D$year, function(x) {
  mean.count <- mean(x$count)
  data.frame(year=x$year[1], mean.count = mean.count)
  }, FUN.VALUE=data.frame(year=1, mean.count=1))

R> head(res)
  year mean.count
1 2000   7.666667
2 2001  13.333333
3 2002  15.000000
4 2003   3.000000
5 2004  12.333333
6 2005  14.666667

Example 3 uses the transform function with ddply, which modifies the existing data.frame. With ore.groupApply, we again construct the data.frame explicitly, which is returned as an ore.frame.


# Example 3

ddply(d, "year", transform, total.count = sum(count))

res <- ore.groupApply (D, D$year, function(x) {
  total.count <- sum(x$count)
  data.frame(year=x$year[1], count=x$count, total.count = total.count)
  }, FUN.VALUE=data.frame(year=1, count=1, total.count=1))

> head(res)
  year count total.count
1 2000     5          23
2 2000     7          23
3 2000    11          23
4 2001    18          40
5 2001     4          40
6 2001    18          40

In Example 4, the mutate function with ddply enables you to define new columns that build on columns just defined. Since the construction of the data.frame using ore.groupApply is explicit, you always have complete control over when and how to use columns.


# Example 4

ddply(d, "year", mutate, mu = mean(count), sigma = sd(count),
      cv = sigma/mu)

res <- ore.groupApply (D, D$year, function(x) {
  mu <- mean(x$count)
  sigma <- sd(x$count)
  cv <- sigma/mu
  data.frame(year=x$year[1], count=x$count, mu=mu, sigma=sigma, cv=cv)
  }, FUN.VALUE=data.frame(year=1, count=1, mu=1,sigma=1,cv=1))

R> head(res)
  year count        mu    sigma        cv
1 2000     5  7.666667 3.055050 0.3984848
2 2000     7  7.666667 3.055050 0.3984848
3 2000    11  7.666667 3.055050 0.3984848
4 2001    18 13.333333 8.082904 0.6062178
5 2001     4 13.333333 8.082904 0.6062178
6 2001    18 13.333333 8.082904 0.6062178

In Example 5, ddply is used to partition data on multiple columns before constructing the result. Realizing this with ore.groupApply involves creating an index column out of the concatenation of the columns used for partitioning. This example also allows us to illustrate using the ORE transparency layer to subset the data.


# Example 5

baseball.dat <- subset(baseball, year > 2000) # data from the plyr package
x <- ddply(baseball.dat, c("year", "team"), summarize,
           homeruns = sum(hr))

We first push the data set to the database to get an ore.frame. We then add the composite column and perform the subset, using the transparency layer. Since the results from database execution are unordered, we will explicitly sort these results and view the first 6 rows.


BB.DAT <- ore.push(baseball)
BB.DAT$index <- with(BB.DAT, paste(year, team, sep="+"))
BB.DAT2 <- subset(BB.DAT, year > 2000)
X <- ore.groupApply (BB.DAT2, BB.DAT2$index, function(x) {
  data.frame(year=x$year[1], team=x$team[1], homeruns=sum(x$hr))
  }, FUN.VALUE=data.frame(year=1, team="A", homeruns=1), parallel=FALSE)
res <- ore.sort(X, by=c("year","team"))

R> head(res)
  year team homeruns
1 2001  ANA        4
2 2001  ARI      155
3 2001  ATL       63
4 2001  BAL       58
5 2001  BOS       77
6 2001  CHA       63

Our next example is derived from the ggplot function documentation. This illustrates the use of ddply together with the ggplot2 package. We first create a data.frame with demo data and use ddply to create some statistics for each group (gp). We then use ggplot to produce the graph. We can take this same code, push the data.frame df to the database, and invoke it on the database server. The graph will be returned to the client window, as depicted below.


# Example 6 with ggplot2

library(ggplot2)
df <- data.frame(gp = factor(rep(letters[1:3], each = 10)),
                 y = rnorm(30))
# Compute sample mean and standard deviation in each group
library(plyr)
ds <- ddply(df, .(gp), summarise, mean = mean(y), sd = sd(y))

# Set up a skeleton ggplot object and add layers:
ggplot() +
  geom_point(data = df, aes(x = gp, y = y)) +
  geom_point(data = ds, aes(x = gp, y = mean),
             colour = 'red', size = 3) +
  geom_errorbar(data = ds, aes(x = gp, y = mean,
                               ymin = mean - sd, ymax = mean + sd),
             colour = 'red', width = 0.4)

DF <- ore.push(df)
ore.tableApply(DF, function(df) {
  library(ggplot2)
  library(plyr)
  ds <- ddply(df, .(gp), summarise, mean = mean(y), sd = sd(y))
  ggplot() +
    geom_point(data = df, aes(x = gp, y = y)) +
    geom_point(data = ds, aes(x = gp, y = mean),
               colour = 'red', size = 3) +
    geom_errorbar(data = ds, aes(x = gp, y = mean,
                                 ymin = mean - sd, ymax = mean + sd),
                  colour = 'red', width = 0.4)
})

But let's take this one step further. Suppose we wanted to produce multiple graphs, partitioned on some index column. We replicate the data three times and add some noise to the y values, just to make the graphs a little different. We also create an index column to form our three partitions. Note that we've also specified that this should be executed in parallel, allowing Oracle Database to control and manage the server-side R engines. The result of ore.groupApply is an ore.list that contains the three graphs. Each graph can be viewed by printing the list element.

df2 <- rbind(df,df,df)
df2$y <- df2$y + rnorm(nrow(df2))
df2$index <- c(rep(1,30), rep(2,30), rep(3,30))
DF2 <- ore.push(df2)
res <- ore.groupApply(DF2, DF2$index, function(df) {
  df <- df[,1:2]
  library(ggplot2)
  library(plyr)
  ds <- ddply(df, .(gp), summarise, mean = mean(y), sd = sd(y))
  ggplot() +
    geom_point(data = df, aes(x = gp, y = y)) +
    geom_point(data = ds, aes(x = gp, y = mean),
               colour = 'red', size = 3) +
    geom_errorbar(data = ds, aes(x = gp, y = mean,
                                 ymin = mean - sd, ymax = mean + sd),
                  colour = 'red', width = 0.4)
  }, parallel=TRUE)
res[[1]]
res[[2]]
res[[3]]

To recap, we've illustrated how various uses of ddply from the plyr package can be realized in ore.groupApply, which affords the user explicit control over the contents of the data.frame result in a straightforward manner. We've also highlighted how ddply can be used within an ore.groupApply call.

Sunday Apr 27, 2014

Step-by-step: Returning R statistical results as a Database Table


R provides a rich set of statistical functions that we may want to use directly from SQL. Many of these results can be readily expressed as structured table data for use with other SQL tables, or for use by SQL-enabled applications, e.g., dashboards or other statistical tools.

In this blog post, we illustrate in a sequence of five simple steps how to go from an R function to a SQL-enabled result. Taken from a recent "proof of concept" customer engagement, our example involves using the function princomp, which performs a principal components analysis on a given numeric data matrix and returns the results as an object of class princomp. The customer actively uses this R function to produce loadings used in subsequent computations and analysis. The loadings component is a matrix whose columns contain the eigenvectors.

The current process of pulling data from their Oracle Database, starting an R engine, invoking the R script, and placing the results back in the database was proving non-performant and unnecessarily complex. The goal was to leverage Oracle R Enterprise to streamline this process and allow the results to be immediately accessible through SQL.

As a best practice, here is a process that can get you from start to finish:

Step 1: Invoke from command line, understand results

If you're using a particular R function, chances are you are familiar with its results. However, you may not be familiar with their structure. We'll use an example from the R princomp documentation that uses the USArrests data set. We see that the class of the result is of type princomp, and the model prints the call and standard deviations of the components. To understand the underlying structure, we invoke the function str and see there are seven elements in the list, one of which is the matrix loadings.

mod <- princomp(USArrests, cor = TRUE)
class(mod)
mod
str(mod)


Results:

R> mod <- princomp(USArrests, cor = TRUE)
R> class(mod)
[1] "princomp"
R> mod
Call:
princomp(x = USArrests, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494

4 variables and 50 observations.

R> str(mod)
List of 7
$ sdev : Named num [1:4] 1.575 0.995 0.597 0.416
..- attr(*, "names")= chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ loadings: loadings [1:4, 1:4] -0.536 -0.583 -0.278 -0.543 0.418 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
.. ..$ : chr [1:4] "Comp.1" "Comap.2" "Comp.3" "Comp.4"
$ center : Named num [1:4] 7.79 170.76 65.54 21.23
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ scale : Named num [1:4] 4.31 82.5 14.33 9.27
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ n.obs : int 50
$ scores : num [1:50, 1:4] -0.986 -1.95 -1.763 0.141 -2.524 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:50] "1" "2" "3" "4" ...
.. ..$ : chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ call : language princomp(x = USArrests, cor = TRUE)
- attr(*, "class")= chr "princomp"


Step 2: Wrap script in a function, and invoke from ore.tableApply

Since we want to invoke princomp on database data, we first push the demo data, USArrests, to the database to create an ore.frame. Other data we wish to use will also be in database tables.

We'll use ore.tableApply (for the reasons cited in the previous blog post), providing the ore.frame as the first argument and simply returning from our function the model produced by princomp. We'll then look at its class, retrieve the result from the database, and check its class and structure once again.

Notice that we are able to obtain the exact same result we received using our local R engine as with the database R engine through embedded R execution.

dat <- ore.push(USArrests)
computePrincomp <- function(dat) princomp(dat, cor=TRUE)
res <- ore.tableApply(dat, computePrincomp)


class(res)
res.local <- ore.pull(res)
class(res.local)
str(res.local)
res.local
res


Results:

R> dat <- ore.push(USArrests)
R> computePrincomp <- function(dat) princomp(dat, cor=TRUE)
R> res <- ore.tableApply(dat, computePrincomp)
R> class(res)
[1] "ore.object"
attr(,"package")
[1] "OREembed"
R> res.local <- ore.pull(res)
R> class(res.local)
[1] "princomp"


R> str(res.local)
List of 7
$ sdev : Named num [1:4] 1.575 0.995 0.597 0.416
..- attr(*, "names")= chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ loadings: loadings [1:4, 1:4] -0.536 -0.583 -0.278 -0.543 0.418 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
.. ..$ : chr [1:4] "Comp.1" "Comap.2" "Comp.3" "Comp.4"
$ center : Named num [1:4] 7.79 170.76 65.54 21.23
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ scale : Named num [1:4] 4.31 82.5 14.33 9.27
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ n.obs : int 50
$ scores : num [1:50, 1:4] -0.986 -1.95 -1.763 0.141 -2.524 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:50] "1" "2" "3" "4" ...
.. ..$ : chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ call : language princomp(x = dat, cor = TRUE)
- attr(*, "class")= chr "princomp"

R> res.local
Call:
princomp(x = dat, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494

4 variables and 50 observations.
R> res
Call:
princomp(x = dat, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494


4 variables and 50 observations.


Step 3: Determine what results we really need

Since we are only interested in the loadings and any result we return needs to be a data.frame to turn it into a database row set (table result), we build the model, transform the loadings object into a data.frame, and return the data.frame as the function result. We then view the class of the result and its values.

Since we do this from the R API, we can simply print res to display the returned data.frame, as the print does an implicit ore.pull.

returnLoadings <- function(dat) {
                    mod <- princomp(dat, cor=TRUE)
                    dd <- dim(mod$loadings)
                    ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
                    ldgs$variables <- row.names(ldgs)
                    ldgs
                  }
res <- ore.tableApply(dat, returnLoadings)
class(res)
res

ore.create(USArrests,table="USARRESTS")


Results:

R> res <- ore.tableApply(dat, returnLoadings)
R> class(res)
[1] "ore.object"
attr(,"package")
[1] "OREembed"
R> res
             Comp.1     Comp.2     Comp.3     Comp.4 variables
Murder   -0.5358995  0.4181809 -0.3412327  0.64922780 Murder
Assault  -0.5831836  0.1879856 -0.2681484 -0.74340748 Assault
UrbanPop -0.2781909 -0.8728062 -0.3780158  0.13387773 UrbanPop
Rape     -0.5434321 -0.1673186  0.8177779  0.08902432 Rape


Step 4: Load script into the R Script Repository in the database

We're at the point of being able to load the script into the R Script Repository before invoking it from SQL. We can create the function from R or from SQL. In R,

ore.scriptCreate('princomp.loadings', returnLoadings)

or from SQL,

begin
--sys.rqScriptDrop('princomp.loadings');
sys.rqScriptCreate('princomp.loadings',
      'function(dat) {
        mod <- princomp(dat, cor=TRUE)
        dd <- dim(mod$loadings)
        ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
        ldgs$variables <- row.names(ldgs)
        ldgs
      }');
end;
/


Step 5: Invoke from SQL SELECT statement

Finally, we're able to invoke the function from SQL using the rqTableEval table function. We pass in a cursor with the data from our USARRESTS table. We have no parameters, so the next argument is NULL. To get the results as a table, we specify a SELECT string that defines the structure of the result. Note that the column names must be identical to what is returned in the R data.frame. The last parameter is the name of the function we want to invoke from the R script repository.

Invoking this, we see the result as a table from the SELECT statement.

select *
from table(rqTableEval( cursor(select * from USARRESTS),
                        NULL,
                       'select 1 as "Comp.1", 1 as "Comp.2", 1 as "Comp.3", 1 as "Comp.4", cast(''a'' as varchar2(12)) "variables" from dual',
                        'princomp.loadings'));


Results:

SQL> select *
from table(rqTableEval( cursor(select * from USARRESTS),NULL,
          'select 1 as "Comp.1", 1 as "Comp.2", 1 as "Comp.3", 1 as "Comp.4", cast(''a'' as varchar2(12)) "variables" from dual','princomp.loadings'));
    Comp.1     Comp.2     Comp.3     Comp.4  variables
---------- ---------- ---------- ---------- ------------
-.53589947  .418180865 -.34123273  .649227804 Murder
-.58318363  .187985604 -.26814843 -.74340748  Assault
-.27819087 -.87280619  -.37801579  .133877731 UrbanPop
-.54343209 -.16731864   .817777908 .089024323 Rape

As you see above, we have the loadings result returned as a SQL table.


In this example, we walked through the steps of moving from invoking an R function to obtain a specific result to producing that same result from SQL by invoking an R script at the database server under the control of Oracle Database.

Thursday Feb 13, 2014

Monitoring progress of embedded R functions

When you run R functions in the database, especially functions involving multiple R engines in parallel, you can monitor their progress using the Oracle R Enterprise datastore as a central location for progress notifications, or any intermediate status or results. In the following example, based on ore.groupApply, we illustrate instrumenting a simple function that builds a linear model to predict flight arrival delay based on a few other variables.

The function modelBuildWithStatus verifies that there are rows for building the model after eliminating incomplete cases from the data supplied in argument dat. If the result is not empty, the function builds a model and reports “success”; otherwise, it reports “no_data.” It’s likely that the user would like to use this model in some way or save it in a datastore for future use, but for this example, we just build the model and discard it, validating that a model can be built on the data.


modelBuildWithStatus <-
  function(dat) {
    dat <- dat[complete.cases(dat),]
    if (nrow(dat)>0L) {
      mod <- lm(ARRDELAY ~ DISTANCE + AIRTIME + DEPDELAY, dat);
      "success"
    } else
      "no_data"
    }

When we invoke this using ore.groupApply, the goal is to build one model per “unique carrier” or airline. Using an ORE 1.4 feature, we specify the degree of parallelism using the parallel argument, setting it to 2.


res <- ore.groupApply(ONTIME_S[, c("UNIQUECARRIER","DISTANCE", "ARRDELAY", "DEPDELAY", "AIRTIME")],
        ONTIME_S$UNIQUECARRIER,
        modelBuildWithStatus,
        parallel=2L)

res.local<-ore.pull(res)
res.local[unlist(res.local)=="no_data"]

The result tells us about the status of each execution. Below, we print the unique carriers that had no data.


R> res.local<-ore.pull(res)
R> res.local[unlist(res.local)=="no_data"]
$EA
[1] "no_data"

$`ML(1)`
[1] "no_data"

$`PA(1)`
[1] "no_data"

$PI
[1] "no_data"

$PS
[1] "no_data"

To monitor the progress of each execution, we can identify the group of data being processed in each function invocation using the value from the UNIQUECARRIER column. For this particular data set, we use the first two characters of the carrier’s symbol appended to “group.” to form a unique object name for storing in the datastore identified by job.name. (If we don’t do this, the value will form an invalid object name.) Note that since the UNIQUECARRIER column contains the same value for every row in a given chunk, we need only the first value.

The general idea for monitoring progress is to save an object in the datastore named for each execution of the function on a group. We can then list the contents of the named datastore and compute a percentage complete, which is discussed later in this post. For the “success” case, we assign the value “SUCCESS” to the variable named by the string in nm that we created earlier. Using ore.save, this uniquely named object is stored in the datastore with the name in job.name. We use the append=TRUE flag to indicate that the various function executions will be sharing the same named datastore.
If there is no data left in dat, we assign “NO DATA” to the variable named in nm and save that. Notice in both cases, we’re still returning “success” or “no data” so these come back in the list returned by ore.groupApply. However, we can return other values instead, e.g., the model produced.


modelBuildWithMonitoring <-
  function(dat, job.name) {
  nm <- paste("group.", substr(as.character(dat$UNIQUECARRIER[1L]),1,2), sep="")
  dat <- dat[complete.cases(dat),]
  if (nrow(dat)>0L) {
    mod <- lm(ARRDELAY ~ DISTANCE + AIRTIME + DEPDELAY, dat);
    assign(nm, "SUCCESS")
    ore.save(list=nm, name=job.name, append=TRUE)
    "success"
  } else {
    assign(nm, "NO DATA")
    ore.save(list=nm, name=job.name, append=TRUE)
    "no data"
  }
}

When we use this function in ore.groupApply, we provide the job.name and ore.connect arguments as well. The variable ore.connect must be set to TRUE in order to use the datastore. As the ore.groupApply executes, the datastore named by job.name progressively accumulates objects named for each carrier. First, delete the datastore named “job1”, if it exists.


ore.delete(name="job1")

res <- ore.groupApply(ONTIME_S[, c("UNIQUECARRIER","DISTANCE", "ARRDELAY", "DEPDELAY", "AIRTIME")],
        ONTIME_S$UNIQUECARRIER,
        modelBuildWithMonitoring,
        job.name="job1", parallel=2L, ore.connect=TRUE)

To see the progress during execution, we can use the following function, which takes a job name and the cardinality of the INDEX column to determine the percent complete. This function is invoked in a separate R engine connected to the same schema. If the job name is found, we print the percent complete, otherwise stop with an error message.


check.progress <- function(job.name, total.groups) {
  if ( job.name %in% ore.datastore()$datastore.name )
    print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
  else
    stop(paste("Job", job.name, " does not exist"))
}

To invoke this, compute the total number of groups and provide this and the job name to the function check.progress.

total.groups <- length(unique(ONTIME_S$UNIQUECARRIER))
check.progress("job1",total.groups)

However, we really want a loop to report on the progress automatically. One simple approach is to set up a while loop with a sleep delay. When we reach 100%, stop. To be self-contained, we include a simplification of the function above as a local function.


check.progress.loop <- function(job.name, total.groups, sleep.time=2) {
  check.progress <- function(job.name, total.groups) {
    if ( job.name %in% ore.datastore()$datastore.name )
      print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
    else
      paste("Job", job.name, " does not exist")
  }
  while(1) {
    try(x <- check.progress(job.name,total.groups))
    Sys.sleep(sleep.time)
    if(x=="100.0%") break
  }
}

As before, this function is invoked in a separate R engine connected to the same schema.


check.progress.loop("job1",total.groups,sleep.time=1)

Looking at the results, we can see the progress reported at one second intervals. Since the models build quickly, it doesn’t take long to reach 100%. For functions that take longer to execute or where there are more groups to process, you may choose a longer sleep time. Following this, we look at the datastore “job1” using ore.datastore and its contents using ore.datastoreSummary.


R> check.progress.loop("job1",total.groups,sleep.time=1)
[1] "6.9%"
[1] "96.6%"
[1] "100.0%"

R> ore.datastore(name="job1")
  datastore.name object.count size       creation.date description
1           job1           29 1073 2014-02-13 22:03:20        <NA>
R> ore.datastoreSummary(name="job1")
   object.name     class size length row.count col.count
1     group.9E character   37      1        NA        NA
2     group.AA character   37      1        NA        NA
3     group.AQ character   37      1        NA        NA
4     group.AS character   37      1        NA        NA
5     group.B6 character   37      1        NA        NA
6     group.CO character   37      1        NA        NA
7     group.DH character   37      1        NA        NA
8     group.DL character   37      1        NA        NA
9     group.EA character   37      1        NA        NA
10    group.EV character   37      1        NA        NA
11    group.F9 character   37      1        NA        NA
12    group.FL character   37      1        NA        NA
13    group.HA character   37      1        NA        NA
14    group.HP character   37      1        NA        NA
15    group.ML character   37      1        NA        NA
16    group.MQ character   37      1        NA        NA
17    group.NW character   37      1        NA        NA
18    group.OH character   37      1        NA        NA
19    group.OO character   37      1        NA        NA
20    group.PA character   37      1        NA        NA
21    group.PI character   37      1        NA        NA
22    group.PS character   37      1        NA        NA
23    group.TW character   37      1        NA        NA
24    group.TZ character   37      1        NA        NA
25    group.UA character   37      1        NA        NA
26    group.US character   37      1        NA        NA
27    group.WN character   37      1        NA        NA
28    group.XE character   37      1        NA        NA
29    group.YV character   37      1        NA        NA

The same basic technique can be used to note progress in any long running or complex embedded R function, e.g., in ore.tableApply or ore.doEval. At various points in the function, sequence-named objects can be added to a datastore. Moreover, the contents of those objects can contain incremental or partial results, or even debug output.
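
For instance, a long-running ore.doEval function might checkpoint intermediate results like this (a sketch; the datastore name "job2" and object names are illustrative, not from the original post):

res <- ore.doEval(function() {
  part1 <- summary(rnorm(1e6))                 # first long-running step
  ore.save(part1, name="job2", append=TRUE)    # record a partial result
  part2 <- summary(runif(1e6))                 # second long-running step
  ore.save(part2, name="job2", append=TRUE)
  "done"
  }, ore.connect=TRUE)

A second R session could then poll ore.datastoreSummary(name="job2") to see which steps have completed.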

While we’ve focused on the R API for embedded R execution, the same functions could be invoked using the SQL API. However, monitoring would still be done from an interactive R engine.

Tuesday Feb 04, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 6

How can I use "group apply" to partition data over multiple columns for parallel execution?
How can I use R for statistical computations and return results as a database table?

In this blog post of our theme and variation series, we answer these two questions through several examples, highlighting both R and SQL interfaces.

So far in this blog series on Oracle R Enterprise embedded R execution we've covered:

Part 1: ore.doEval / rqEval
Part 2: ore.tableApply / rqTableEval
Part 3: ore.groupApply / “rqGroupApply”
Part 4: ore.rowApply / rqRowEval
Part 5: ore.indexApply

Using ore.groupApply for partitioning data on multiple columns

While the “group apply” functionality is quite powerful as it is, users sometimes want to partition data on multiple columns. Since ore.groupApply currently takes only a single column for the INDEX argument, users can create a new column that is the concatenation of the columns of interest, and provide this column to the INDEX argument. We’ll illustrate this first using the R API, and then the SQL API.

R API

We adapt an example from Part 3 to illustrate partitioning data on multiple columns. Instead of building a C5.0 model, we’ll use the same CHURN_TRAIN data set, but build an rpart model since it will produce rules on the partitions of data we’ve chosen for the example, namely, voice_mail_plan and international_plan. To understand the number of rows we can expect in each partition, we’ll use the R table function. We then add a new column that pastes together the two columns of interest to create a new column called “vmp_ip”.


library(C50)
data(churn)

ore.create(churnTrain, "CHURN_TRAIN")

table(CHURN_TRAIN$international_plan, CHURN_TRAIN$voice_mail_plan)
CT <- CHURN_TRAIN
CT$vmp_ip <- paste(CT$voice_mail_plan,CT$international_plan,sep="-")
head(CT)

Each invocation of the function “my.rpartFunction” will receive data from one of the partitions identified in vmp_ip. Since the source partition columns are constant within each partition, we set them to NULL. The character vectors are converted to factors and the model is built to predict churn and saved in an appropriately named datastore. Instead of returning TRUE as done in the previous example, we create a list to return the specific partition column values, the distribution of churn values, and the model itself.


ore.scriptDrop("my.rpartFunction")
ore.scriptCreate("my.rpartFunction",
  function(dat,datastorePrefix) {
    library(rpart)
    vmp <- dat[1,"voice_mail_plan"]
    ip <- dat[1,"international_plan"]
    datastoreName <- paste(datastorePrefix,vmp,ip,sep="_")
    dat$voice_mail_plan <- NULL
    dat$international_plan <- NULL
    dat$state <- as.factor(dat$state)
    dat$churn <- as.factor(dat$churn)
    dat$area_code <- as.factor(dat$area_code)
    mod <- rpart(churn ~ ., data = dat)
    ore.save(mod, name=datastoreName, overwrite=TRUE)
    list(voice_mail_plan=vmp,
        international_plan=ip,
        churn.table=table(dat$churn),
        rpart.model = mod)
  })

After loading the rpart library and setting the datastore prefix, we invoke ore.groupApply using the derived column vmp_ip as the input to argument INDEX. After building the models, we’ll look at the first entry in the list returned. Using ore.load, we can load the model for the case where the customer neither has the voice mail plan, nor the international plan.


library(rpart)

datastorePrefix="my.rpartModel"

res <- ore.groupApply( CT, INDEX=CT$vmp_ip,
      FUN.NAME="my.rpartFunction",
      datastorePrefix=datastorePrefix,
      ore.connect=TRUE)
res[[1]]
ore.load(name=paste(datastorePrefix,"no","no",sep="_"))
mod

SQL API

To invoke this from the SQL API, we use the same approach as covered in Part 3. While we could create the table CT from the ore.frame used above, the following instead illustrates creating the derived column in SQL and explicitly defining a VIEW.


CREATE OR REPLACE VIEW CT AS
  SELECT t.*, "voice_mail_plan" || '-' || "international_plan" as "vmp_ip"
  FROM CHURN_TRAIN t;

Next, we create a PL/SQL PACKAGE and FUNCTION for the invocation.


CREATE OR REPLACE PACKAGE churnPkg AS
  TYPE cur IS REF CURSOR RETURN CT%ROWTYPE;
END churnPkg;
/
CREATE OR REPLACE FUNCTION churnGroupEval(
  inp_cur churnPkg.cur,
  par_cur SYS_REFCURSOR,
  out_qry VARCHAR2,
  grp_col VARCHAR2,
  exp_txt CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH ("vmp_ip"))
CLUSTER inp_cur BY ("vmp_ip")
USING rqGroupEvalImpl;
/

Then, we can invoke the R function by name in the SELECT statement as follows:


select *
from table(churnGroupEval(
  cursor(select * from CT),
  cursor(select 1 as "ore.connect",' my.rpartModel2' as "datastorePrefix" from dual),
  'XML', 'state', 'my.rpartFunction'));

As another variation on this theme, suppose that you didn’t want to include all the columns from the source data set. To achieve this, you could create a view and define the PACKAGE from the view. However, you could also define a record that contains the specific columns of interest. This is a standard PL/SQL specification that can be used in combination with “group apply”.


CREATE OR REPLACE PACKAGE churnPkg2 AS
  TYPE rec IS RECORD ("vmp_ip" varchar2(8),
    "churn" varchar2(4),
    "state" varchar2(4),
    "account_length" NUMBER(38));
  TYPE cur IS REF CURSOR RETURN rec;
END churnPkg2;
/

If you don’t want to or cannot create a view, this allows you to specify the exact columns required for model building. Reducing the number of columns on input can improve performance, since only required data will be passed to the server-side R engine. Notice that we could have used this approach above, since we remove the source partition columns within the function anyway.

How to return results from R statistical functions as database table data

R provides a wide range of statistical and advanced analytics functions. While Oracle Database contains a wide range of statistical functions in SQL, R further extends this set. In this next topic, we illustrate how to return statistical results as a SQL table for use with other SQL queries or to feed SQL-based applications.

As our example, we’ll use the R principal components function princomp. Our goal is to return the loadings of the PCA model as a database table. For our data set, we’ll use the USArrests data set provided with R. We can view the results of princomp in the mod variable, which has class “princomp”. We then push this data to Oracle Database, getting an ore.frame object.


mod <- princomp(USArrests, cor = TRUE)
class(mod)
mod
dat <- ore.push(USArrests)

R> mod <- princomp(USArrests, cor = TRUE)
R> class(mod)
[1] "princomp"
R> mod
Call:
princomp(x = USArrests, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494

4 variables and 50 observations.
R> dat <- ore.push(USArrests)

In the first case considered, we use ore.tableApply to return simply the princomp object. When we do this we’re getting back a serialized object of type ore.object, but the actual princomp object still resides in the database. We can pull this object from the database to get a local princomp object, but this type of result cannot be directly returned as a SQL table because we need an object of class data.frame (which we’ll address later).


res <- ore.tableApply(dat,
      function(dat) {
        princomp(dat, cor=TRUE)
      })
class(res)
res.local <- ore.pull(res)
class(res.local)
str(res.local)
res.local
res

In the following output, we see the result is an ore.object that we pull from the database to get a princomp object. We examine the structure of the object and focus on the loadings element. In the example, we print res.local and res. Since res is an ore.object, it automatically gets pulled to the client before printing it.


R> res <- ore.tableApply(dat,
+ function(dat) {
+ princomp(dat, cor=TRUE)
+ })
R> class(res)
[1] "ore.object"
attr(,"package")
[1] "OREembed"
R> res.local <- ore.pull(res)
R> class(res.local)
[1] "princomp"
R> str(res.local)
List of 7
$ sdev : Named num [1:4] 1.575 0.995 0.597 0.416
..- attr(*, "names")= chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ loadings: loadings [1:4, 1:4] -0.536 -0.583 -0.278 -0.543 0.418 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
.. ..$ : chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ center : Named num [1:4] 7.79 170.76 65.54 21.23
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ scale : Named num [1:4] 4.31 82.5 14.33 9.27
..- attr(*, "names")= chr [1:4] "Murder" "Assault" "UrbanPop" "Rape"
$ n.obs : int 50
$ scores : num [1:50, 1:4] -0.986 -1.95 -1.763 0.141 -2.524 ...
..- attr(*, "dimnames")=List of 2
.. ..$ : chr [1:50] "1" "2" "3" "4" ...
.. ..$ : chr [1:4] "Comp.1" "Comp.2" "Comp.3" "Comp.4"
$ call : language princomp(x = dat, cor = TRUE)
- attr(*, "class")= chr "princomp"
R> res.local
Call:
princomp(x = dat, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494

4 variables and 50 observations.
R> res
Call:
princomp(x = dat, cor = TRUE)

Standard deviations:
   Comp.1    Comp.2    Comp.3    Comp.4
1.5748783 0.9948694 0.5971291 0.4164494

4 variables and 50 observations.

In this next case, we focus on the loadings component of the princomp object, which contains the matrix of variable loadings, that is, a matrix whose columns contain the eigenvectors. This is of class "loadings", still not a data.frame. To convert the loadings component to a data.frame, we determine the dimensions of the matrix and then construct a data.frame by accessing the cells of the loadings object. To get the variables associated with each row, we assign the row names of the loadings to the column variables. Finally, we return the loadings data.frame.


res <- ore.tableApply(dat,
      function(dat) {
        mod <- princomp(dat, cor=TRUE)
        dd <- dim(mod$loadings)
        ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
        ldgs$variables <- row.names(ldgs)
        ldgs
      })
class(res)
res

In the output below, notice that we still have an ore.object being returned, but it’s in the form of a data.frame.


R> res <- ore.tableApply(dat,
+ function(dat) {
+ mod <- princomp(dat, cor=TRUE)
+ dd <- dim(mod$loadings)
+ ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
+ ldgs$variables <- row.names(ldgs)
+ ldgs
+ })
R> class(res)
[1] "ore.object"
attr(,"package")
[1] "OREembed"
R> res
             Comp.1     Comp.2     Comp.3      Comp.4 variables
Murder   -0.5358995  0.4181809 -0.3412327  0.64922780    Murder
Assault  -0.5831836  0.1879856 -0.2681484 -0.74340748   Assault
UrbanPop -0.2781909 -0.8728062 -0.3780158  0.13387773  UrbanPop
Rape     -0.5434321 -0.1673186  0.8177779  0.08902432      Rape

We can address this last issue by specifying the FUN.VALUE argument to get an ore.frame result (left as an exercise to the reader). But our main goal is to enable returning the loadings from SQL as a database table. For that, we create the function in the R script repository and construct the appropriate SQL query. In preparation for the next example, we’ll create the table USARRESTS using the R data set.


ore.create(USArrests,table="USARRESTS")
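
Before switching to SQL, here is a sketch of the FUN.VALUE exercise mentioned above (assumed, not from the original post), which returns the loadings directly as an ore.frame:

res <- ore.tableApply(dat,
      function(dat) {
        mod <- princomp(dat, cor=TRUE)
        dd <- dim(mod$loadings)
        ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
        ldgs$variables <- row.names(ldgs)
        ldgs
      },
      FUN.VALUE=data.frame(Comp.1=numeric(0), Comp.2=numeric(0),
                           Comp.3=numeric(0), Comp.4=numeric(0),
                           variables=character(0)))
class(res)   # expected: "ore.frame" rather than "ore.object"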

Now, we’ll switch to SQL. We’re introducing the functions sys.rqScriptDrop and sys.rqScriptCreate, which are used within a BEGIN END PL/SQL block, to store the R function ‘princomp.loadings’.


begin
--sys.rqScriptDrop('princomp.loadings');
sys.rqScriptCreate('princomp.loadings',
      'function(dat) {
        mod <- princomp(dat, cor=TRUE)
        dd <- dim(mod$loadings)
        ldgs <- as.data.frame(mod$loadings[1:dd[1],1:dd[2]])
        ldgs$variables <- row.names(ldgs)
        ldgs
      }');
end;
/

The SELECT statement provides input data by selecting all data from USARRESTS. There are no arguments to pass, so the next parameter is NULL. The SELECT string describes the format of the result. Notice that the column names must match in name (including case) and type. The last parameter is the name of the function stored in the R script repository.


select *
from table(rqTableEval( cursor(select * from USARRESTS),NULL,
          'select 1 as "Comp.1", 1 as "Comp.2", 1 as "Comp.3", 1 as "Comp.4", cast(''a'' as varchar2(12)) "variables" from dual','princomp.loadings'));

SQL> select *
from table(rqTableEval( cursor(select * from USARRESTS),NULL,
          'select 1 as "Comp.1", 1 as "Comp.2", 1 as "Comp.3", 1 as "Comp.4", cast(''a'' as varchar2(12)) "variables" from dual','princomp.loadings'));
    Comp.1     Comp.2     Comp.3     Comp.4 variables
---------- ---------- ---------- ---------- ------------
-.53589947 .418180865 -.34123273 .649227804 Murder
-.58318363 .187985604 -.26814843 -.74340748 Assault
-.27819087 -.87280619 -.37801579 .133877731 UrbanPop
-.54343209 -.16731864 .817777908 .089024323 Rape

If you have interesting embedded R scenarios to share with the ORE community, please consider posting a comment.

Monday Jan 20, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 5


In the first four parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, ore.groupApply / “rqGroupApply”, and ore.rowApply / rqRowEval. In this blog post, we cover ore.indexApply. Note that there is no corresponding rqIndexEval – more on that later. The “index apply” function is also one of the parallel-enabled embedded R execution functions. It supports task-parallel execution, where one or more R engines perform the same or different calculations, or tasks. A number, associated with the index of the execution, is provided to the function. Any required data is expected to be explicitly generated or loaded within the function.

This functionality is valuable in a variety of settings, e.g., simulations, for taking advantage of high-performance computing hardware like Exadata.

As for “group apply” and “row apply”, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, with only the index passed to the function as the first argument. Oracle Database ensures that each R function execution completes, otherwise the ORE function returns an error. Output formats as supported by the other embedded R functions are possible for ore.indexApply, for example, returning an ore.list or combining data.frame data into an ore.frame.

The variation on embedded R execution for ore.indexApply involves passing as an argument the number of times the user-defined R function should be executed.

Let’s look at a simple example.

The following code specifies to execute the function five times in parallel.


res <- ore.indexApply(5,
      function(index) {
        paste("IndexApply:",index)
      },
    parallel=TRUE)
class(res)
res

Notice that the class of the result is an ore.list, and when we print res, we have 5 character vectors, each with the index that was passed to the user-defined function. As with other parallel embedded R functions, the number of concurrently executing R engines can be limited by specifying the degree of parallelism of the database. As we’ll see in ORE 1.4, the parallel argument can specify a preferred number of parallel R engines, as an upper bound.


> class(res)
[1] "ore.list"
attr(,"package")
[1] "OREbase"
> res
$`1`
[1] "IndexApply: 1"

$`2`
[1] "IndexApply: 2"

$`3`
[1] "IndexApply: 3"

$`4`
[1] "IndexApply: 4"

$`5`
[1] "IndexApply: 5"

Column-parallel use case

If we wanted to parallelize R’s summary function, we could compute the summary statistics on each column in parallel and combine them into a final result. The following example does exactly that. While we could generalize this example, we focus on the iris data set and computing summary statistics on the first four numeric columns. Since iris comes standard with R, there’s no need to load data from any other source; we simply access it. The first argument to ore.indexApply is 4, the number of columns we wish to summarize in parallel. The function takes one argument, index, which will be a value between 1 and 4, and will be used to select the column to summarize. We massage the result of summary into a data.frame and add the column name to the result. Note that the function returns a single row: the summary statistics for the column.


res <- NULL
res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      parallel=TRUE)
res

The result comes back as an ore.list object:


> res
$`1`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1  4.3     5.1    5.8 5.843     6.4  7.9 Sepal.Length

$`2`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1    2     2.8      3 3.057     3.3  4.4 Sepal.Width

$`3`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.          col
1    1     1.6   4.35 3.758     5.1  6.9 Petal.Length

$`4`
  Min. 1st Qu. Median  Mean 3rd Qu. Max.         col
1  0.1     0.3    1.3 1.199     1.8  2.5 Petal.Width

This is good, but it would be better if the result was returned as an ore.frame, especially since all the columns are the same. To enable this, we’ll do a slight variation on the result by specifying FUN.VALUE with the structure of the result defined.


res <- ore.indexApply(4,
      function(index) {
        ss <- summary(iris[,index])
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$col <- names(iris)[index]
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        col=character(0)),
      parallel=TRUE)
res

Now, the result comes back as an ore.frame.


> res
  Min. X1st.Qu. Median  Mean X3rd.Qu. Max.          col
1  0.1      0.3   1.30 1.199      1.8  2.5  Petal.Width
2  1.0      1.6   4.35 3.758      5.1  6.9 Petal.Length
3  4.3      5.1   5.80 5.843      6.4  7.9 Sepal.Length
4  2.0      2.8   3.00 3.057      3.3  4.4  Sepal.Width

Simulation use case

The ore.indexApply function can be used in simulations as well. In this next example we take multiple samples from a random normal distribution with the goal of comparing the distributions of the summary statistics. For this, we build upon the example above. We provide parameters such as the sample size, mean and standard deviation of the random numbers, and the number of simulations we want to perform. Each one of these simulations will occur in a separate R engine, in parallel, up to the degree of parallelism allowed by the database.

We specify num.simulations as the first parameter to ore.indexApply. The user-defined function receives the index as its first argument, along with three additional arguments. The function then sets the random seed based on the index. This allows each invocation to generate a different set of random numbers. Using rnorm, the function produces sample.size random normal values. We invoke summary on the vector of random numbers, and then prepare a data.frame result to be returned. We’re using the FUN.VALUE to get an ore.frame as the final result.


res <- NULL
sample.size = 1000
mean.val = 100
std.dev.val = 10
num.simulations = 1000

res <- ore.indexApply(num.simulations,
      function(index, sample.size=1000, mean=0, std.dev=1) {
        set.seed(index)
        x <- rnorm(sample.size, mean, std.dev)
        ss <- summary(x)
        attr.names <- attr(ss,"names")
        stats <- data.frame(matrix(ss,1,length(ss)))
        names(stats) <- attr.names
        stats$index <- index
        stats
      },
      FUN.VALUE=data.frame(Min.=numeric(0),
        "1st Qu."=numeric(0),
        Median=numeric(0),
        Mean=numeric(0),
        "3rd Qu."=numeric(0),
        Max.=numeric(0),
        index=numeric(0)),
      parallel=TRUE,
      sample.size=sample.size,
      mean=mean.val, std.dev=std.dev.val)
res
boxplot(ore.pull(res[,1:6]),
  main=sprintf("Boxplot of %d rnorm samples size %d, mean=%d, sd=%d",
        num.simulations, sample.size, mean.val, std.dev.val))

To get the distribution of samples, we invoke boxplot on the data.frame after pulling the result to the client.

Here are a couple of plots showing results for different parameters:


In both cases, we run 10,000 samples. The first graph uses a sample size of 10 and the second uses a sample size of 1000. From these results, it is clear that a larger sample size significantly reduces the variance in each of the summary statistics - confirming our Statistics 101 understanding.

Error reporting

As introduced above, Oracle Database ensures that each embedded R user-defined function execution completes, otherwise the ORE function returns an error. Of course, any side-effects of the user-defined function need to be manually cleaned up. Operations that produce files, create tables in the database, or result in completed database transactions through ROracle will remain intact. The ORE embedded R infrastructure will report errors as produced by the function as illustrated in the following example.

The code specifies to invoke 4 parallel R engines. If the index has value 3, the function attempts to load the non-existent package "abc123" (which produces an error); otherwise it returns the index value.


R> ore.indexApply(4,
+ function(index) {
+ if (index==3) {library(abc123)}
+ else {return(index)}
+ }
+ )
Error in .oci.GetQuery(conn, statement, data = data, prefetch = prefetch, :
ORA-12801: error signaled in parallel query server P000
ORA-20000: RQuery error
Error in library(abc123) : there is no package called 'abc123'
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 121
ORA-06512: at "RQSYS.RQGROUPEVALIMPL", line 118

Notice that the first reported error is an ORA-12801: error signaled in parallel query server. Then the ORA-20000: RQuery error indicates the error as returned by the R engine. Also interesting to note is that the ORA-06512 errors reveal the underlying implementation of ore.indexApply "RQSYS.RQGROUPEVALIMPL". Which leads us to the next topic.

No rqIndexEval?

“Index apply” is really a variation of “group apply” where the INDEX column is a numeric vector that is pushed to the database. With n distinct numbers, one number is provided to each function as its index. As a result, there is no corresponding rqIndexEval in the SQL API. The user would have to create a similar package and function as was illustrated in the blog post on “group apply.”
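
To make that relationship concrete, here is a sketch (assumed, not from the original post) of achieving the same effect as "index apply" with ore.groupApply by pushing a one-column index table:

n <- 5
IDX <- ore.push(data.frame(index = 1:n))      # one row per desired invocation
res <- ore.groupApply(IDX, IDX$index,
      function(x) paste("GroupApply as IndexApply:", x$index[1]))
class(res)   # ore.list with n elements, one per index value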

Thursday Jan 09, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 4

In the first three parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, and ore.groupApply / “rqGroupApply”. In this blog post, we’ll cover the next in our theme and variation series involving ore.rowApply and rqRowEval. The “row apply” function is also one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on disjoint chunks of data. This functionality is essential to enable scalable model scoring/predictions on large data sets and for taking advantage of high-performance computing hardware like Exadata.

As for ore.groupApply, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically chunking and passing data to parallel executing R engines. Oracle Database ensures that R function executions for all chunks of rows complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result. However, we’ll also show how data.frame results from each execution can be combined into a single ore.frame. This feature works for the return values of other embedded R functions as well.

The variation on embedded R execution for ore.rowApply involves passing not only an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame, but also the number of rows that should be passed to each invocation of the user-defined R function. The last chunk, of course, may have fewer rows than specified.

Let’s look at an example. We’re going to use the C50 package to score churn data (i.e., predict which customers are likely to churn) using the C5.0 decision tree models we built in the previous blog post with ore.groupApply. (Well, almost. We need to rebuild the models to take into account the full data set levels.) The goal is to score the customers in parallel leveraging the power of a high performance computing platform, such as Exadata.


library(C50)
data(churn)

ore.create(churnTest, "CHURN_TEST")

myFunction <- function(dat, xlevels, datastorePrefix) {
  library(C50)
  state <- dat[1,"state"]
  datastoreName <- paste(datastorePrefix,state,sep="_")
  dat$state <- NULL
  for (j in names(xlevels))
    dat[[j]] <- factor(dat[[j]], levels = xlevels[[j]])
  ore.load(name=datastoreName)
  res <- data.frame(pred=predict(mod,dat, type="class"),
        actual=dat$churn,
        state=state)
  res
}

xlevels <- ore.getXlevels(~ ., CHURN_TEST[,-1])
scoreList <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE)
score.MA <- ore.pull(scoreList$MA)
table(score.MA$actual, score.MA$pred)

A few points to highlight:

• Instead of computing the levels using the as.factor function inside the user-defined function, we’ll use ore.getXlevels, which returns the levels for each factor column. We don’t need this for the state column, so we exclude it (“-1”). In the previous post we noted that factor data is passed as character columns in the data.frame. Computing the levels first can ensure that all possible levels are provided during model building, even if there are no rows with some of the level values.
• When building models where some levels were missing (due to using as.factor on each partition of data), scoring can fail if the test data has unknown level values. For this reason, the models built in Part 3 need to be rebuilt using the approach above with ore.getXlevels. This is left as an exercise for the reader.
• We assign the function to the variable “myFunction” to facilitate reuse (see below).
• We construct the datastore name to be the same as when we were building the models, i.e., appending the state value to the datastore prefix separated by an ‘_’.
• The for loop iterates over the levels passed in as xlevels, creating a factor using the provided levels and assigning it back to the data.frame.
• Loading the datastore by name, we have access to the variable mod, which contains the model for the particular state.
• The result is constructed as a data.frame with the prediction and the actual values.
• Three additional arguments are passed: the datastore prefix, the pre-computed levels, and ore.connect=TRUE, which is required because the function uses a datastore.
• The results are stored as a list of ore.frames. We can pull the scores for MA and compute a confusion matrix using table.

This is fine. However, we likely don’t want to have a list of separate ore.frames as the result. We’d prefer to have a single ore.frame with all the results. This can be accomplished using the FUN.VALUE argument. Whenever a data.frame is the result of the user-defined R function, and if the structure of that data.frame is the same across all invocations of the group apply or row apply, you can combine them into a single result by defining the structure as follows:

scores <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)));
head(scores)
scores.local <- ore.pull(scores)
table(scores.local[scores.local$state=="MA",c("actual","pred")])

scores.MA <- scores[scores$state=="MA",c("actual","pred")]
table(scores.MA$actual, scores.MA$pred)

A few important points to highlight:

• FUN.VALUE is set to a data.frame that describes the format of the result. By providing this argument, you will get back a single ore.frame, not an ore.list object.
• The group apply completes instantaneously because it is only defining the ore.frame, not actually performing the scoring. Not until the values are needed does the result get computed. We invoke head on the ore.frame in scores to highlight this.
• We can pull the scores to the client to invoke table as before, but subselecting for state MA. However, we can also do this computation in the database using the transparency layer. First, we filter the rows for MA in scores.MA, and then invoke table on the two columns. Note: ORE requires passing the two columns explicitly to the overloaded function table.
• To do this in parallel, add the argument parallel=TRUE to the ore.groupApply call.

Wait! What happened to ore.rowApply?

Above, we showed how to score with multiple models using ore.groupApply. But what if we had customers from a single state that we wanted to score in parallel? We can use ore.rowApply and rqRowEval to invoke a function on chunks of data (rows) at a time, from 1 row up to the total number of rows. (Note that chunk sizes close to the total number of rows provide little or no benefit from parallelism.)


scores <- ore.rowApply(
  CHURN_TEST[CHURN_TEST$state=="MA",],
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels,
  ore.connect=TRUE, parallel=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)),
  rows=200)
scores
table(scores$actual, scores$pred)

A few points to highlight:

• Since we want to perform the scoring in parallel by state, we filter the rows for MA. This will ensure that all rows processed can use the same predictive model.
• We set the rows argument to 200. CHURN_TEST has 1667 rows, so this will result in nine executions of myFunction: the first eight receive 200 rows each and the last receives 67 rows.
• We also set parallel=TRUE above since we want the scoring performed in parallel.
• The invocation of ore.rowApply returns immediately. Not until we print scores do we incur the cost of executing the underlying query. However, also note that each time we access scores, for example in the following call to table, we incur the cost of executing the query. If the result will be used many times in subsequent operations, you may want to create a table with the result using ore.create (see the short sketch after this list).
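
For example, a minimal sketch along these lines, where the table name CHURN_SCORES is illustrative:

ore.create(scores, table="CHURN_SCORES")       # materialize the scores as a database table
table(CHURN_SCORES$actual, CHURN_SCORES$pred)  # subsequent operations reuse the stored table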

In SQL, we can do the same, but we’ll need to store the function in the R script repository (perhaps called "myScoringFunction") and also store xlevels in a datastore (perhaps called "myXLevels"). While we can pass complex objects in the R interface to embedded R functions, we cannot do that in SQL. Instead, we must pass the name of a datastore. Since the xlevels are in a datastore, the user-defined R function needs to be modified to take this other datastore name and load that datastore to have access to xlevels. This set of changes is left to the reader as an exercise.
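
As a rough starting point for that exercise, the modified function might look like the sketch below; the argument name xlevelsDatastore and the assumption that the datastore "myXLevels" holds an object named xlevels are illustrative. The function would then be stored in the R script repository under the name "myScoringFunction":

myScoringFunction <- function(dat, datastorePrefix, xlevelsDatastore) {
  library(C50)
  ore.load(name=xlevelsDatastore)       # assumed to restore an object named xlevels
  state <- dat[1,"state"]
  datastoreName <- paste(datastorePrefix,state,sep="_")
  dat$state <- NULL
  for (j in names(xlevels))
    dat[[j]] <- factor(dat[[j]], levels = xlevels[[j]])
  ore.load(name=datastoreName)          # restores the model object mod
  data.frame(pred=predict(mod,dat,type="class"),
        actual=dat$churn,
        state=state)
}
ore.scriptCreate("myScoringFunction", myScoringFunction)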


select * from table(rqRowEval(
  cursor(select /*+ parallel(t, 4) */ *
        from CHURN_TEST t
        where "state" = 'MA'),
  cursor(select 1 as "ore.connect",
        'myC5.0model3' as "datastorePrefix",
        'myXLevels' as "xlevelsDatastore"
        from dual),
  'select ''aaa'' "pred",''aaa'' "actual" , ''aa'' "state" from dual',
    200, 'myScoringFunction'));

A few points to highlight:

• The input cursor specifies a parallel hint on the input data cursor and filtering data for MA as well.
• Several arguments are being passed, including the new argument to our function myXLevels.
• The output form is specified in the SQL string. Care must be taken to ensure that the column names, ordering, and the length of character strings match the returned data.frame.

Map Reduce

The “row apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the scoring and outputs a data.frame value (no key required). There is no reducer, or the reducer is simply a pass-through.

Memory and performance considerations

Unlike with group apply, the rows argument in row apply ensures an upper bound on the number of rows (and hence memory requirement). The value of rows should be chosen to balance memory and parallel performance. The usual measures can be taken regarding setting memory limits on the R engine – as noted in Part 2.

There may be instances where setting rows = 1 makes sense. For example, if the computation per row is intensive (i.e., takes a long time), sending one row per R engine may be appropriate. Experiment with a range of values for rows to determine the best value for your particular scenario.
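
For instance, a small hypothetical timing loop along these lines, reusing myFunction and xlevels from the example above, can help choose a chunk size:

for (r in c(50, 200, 500)) {
  elapsed <- system.time(
    ore.pull(ore.rowApply(                       # ore.pull forces the scoring to run
      CHURN_TEST[CHURN_TEST$state=="MA",],
      myFunction,
      datastorePrefix="myC5.0model3", xlevels=xlevels,
      ore.connect=TRUE, parallel=TRUE,
      FUN.VALUE=data.frame(pred=character(0),
            actual=character(0),
            state=character(0)),
      rows=r)))["elapsed"]
  cat("rows =", r, "elapsed (sec) =", elapsed, "\n")
}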

Sunday Jan 05, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 3

In the first two parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval / rqEval and ore.tableApply / rqTableEval. In this blog post, we’ll cover the next in our theme and variation series involving ore.groupApply and the corresponding definitions required for SQL execution. The “group apply” function is one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on different partitions of data. This functionality is essential to enable the building of potentially 10s or 100s of thousands of predictive models, e.g., one per customer, and for taking advantage of high-performance computing hardware like Exadata.

Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically partitioning and passing data to parallel executing R engines. It ensures that all R function executions for all partitions complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result.

The variation on embedded R execution for ore.groupApply involves passing not only an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame, but also an INDEX argument that specifies the name of a column by which the rows will be partitioned for processing by a user-defined R function.

Let’s look at an example. We’re going to use the C50 package to build a C5.0 decision tree model on the churn data set from C50. The goal is to build one churn model on the data for each state.


library(C50)
data(churn)

ore.create(churnTrain, "CHURN_TRAIN")

modList <- ore.groupApply(
  CHURN_TRAIN,
  INDEX=CHURN_TRAIN$state,
    function(dat) {
      library(C50)
      dat$state <- NULL
      dat$churn <- as.factor(dat$churn)
      dat$area_code <- as.factor(dat$area_code)
      dat$international_plan <- as.factor(dat$international_plan)
      dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
      C5.0(churn ~ ., data = dat, rules = TRUE)
    });
mod.MA <- ore.pull(modList$MA)
summary(mod.MA)

A few points to highlight:
• As noted in Part 2 of this series, to use the CRAN package C50 on the client, we first load the library, and then the churn data set.
• Since the data is a data.frame, we’ll create a table in the database with this data. Notice that if you compare the results of str(churnTrain) with str(CHURN_TRAIN), you will see that the factor columns have been retained. This becomes relevant later.
• The function ore.groupApply will return a list of models stored as ore.object instances. The first argument is the ore.frame CHURN_TRAIN and the second argument indicates to partition the data on column state such that the user-defined function is invoked on each partition of the data.
• The next argument specifies the function, which could alternatively have been the function name if the FUN.NAME argument were used and the function saved explicitly in the R script repository. The function’s first argument (whatever its name) will receive one partition of data, e.g., all data associated with a single state.
• Regarding the user-defined function body, we explicitly load the package we’re using, C50, so the function body has access to it. Recall that this user-defined R function will execute at the database server in a separate R engine from the client.
• Since we don’t need to know which state we’re working with and we don’t want this included in the model, we delete the column from the data.frame.
• Although the ore.frame has factor columns defined, when the data is loaded into the user-defined embedded R function, factors appear as character vectors. As a result, we need to convert them back to factors explicitly.
• The model is built and returned from the function.
• The result from ore.groupApply is a list containing the results from the execution of the user-defined function on each partition of the data. In this case, it will be one C5.0 model per state.
• To view the model, we first use ore.pull to retrieve it from the database and then invoke summary on it. The class of mod.MA is “C5.0”.

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Previously, we showed doing this using the SQL API; however, we can also do it using the R API. Here, we also modify the function to store the resulting models in an ORE datastore named by state:


ore.scriptCreate("myC5.0Function",
  function(dat,datastorePrefix) {
    library(C50)
    datastoreName <- paste(datastorePrefix,dat[1,"state"],sep="_")
    dat$state <- NULL
    dat$churn <- as.factor(dat$churn)
    dat$area_code <- as.factor(dat$area_code)
    dat$international_plan <- as.factor(dat$international_plan)
    dat$voice_mail_plan <- as.factor(dat$voice_mail_plan)
    mod <- C5.0(churn ~ ., data = dat, rules = TRUE)
    ore.save(mod, name=datastoreName)
    TRUE
  })

Just for comparison, we could invoke this from the R API as follows:


res <- ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model", ore.connect=TRUE)
res
res <- ore.pull(res)
all(as.logical(res) == TRUE)

Since we’re using a datastore, we need to connect to the database by setting ore.connect to TRUE. We also pass the datastorePrefix. The result res is an ore.list of logical values. To test if all are TRUE, we first pull the result and then use the R all function.

Back to the SQL API… Now that we can refer to the function by name in the SQL API, we invoke it to place one model per datastore, each named with the given prefix and state.


select *
from table(churnGroupEval(
  cursor(select * from CHURN_TRAIN),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

There’s one thing missing, however. We don’t have the function churnGroupEval. There is no generic “rqGroupEval” in the API – we need to define our own table function that matches the data provided. Due to this and the parallel nature of the implementation, we need to create a PL/SQL FUNCTION and supporting PACKAGE:


CREATE OR REPLACE PACKAGE churnPkg AS
  TYPE cur IS REF CURSOR RETURN CHURN_TRAIN%ROWTYPE;
END churnPkg;
/
CREATE OR REPLACE FUNCTION churnGroupEval(
  inp_cur churnPkg.cur,
  par_cur SYS_REFCURSOR,
  out_qry VARCHAR2,
  grp_col VARCHAR2,
  exp_txt CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH ("state"))
CLUSTER inp_cur BY ("state")
USING rqGroupEvalImpl;
/

To create this function for any particular data set, change the %ROWTYPE table in the package (here CHURN_TRAIN) and the partitioning column in the PARTITION BY and CLUSTER BY clauses (here "state"). There are other variants, but this will get you quite far.

To validate that our datastores were created, we invoke ore.datastore(). This returns the datastores present and we will see 51 such entries – one for each state and the District of Columbia.
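
For example, a quick check might look like the following sketch, using the datastore prefix from the R API invocation above:

ds <- ore.datastore(pattern="myC5.0model")   # datastores whose names begin with our prefix
nrow(ds)                                     # expect 51: one per state plus the District of Columbia
head(ds$datastore.name)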

Parallelism

Above, we mentioned that “group apply” supports data parallelism. By default, parallelism is turned off. To enable it, the parallel argument to ore.groupApply needs to be set to TRUE.


ore.groupApply( CHURN_TRAIN, INDEX=CHURN_TRAIN$state,
          FUN.NAME="myC5.0Function",
          datastorePrefix="myC5.0model",
          ore.connect=TRUE,
          parallel=TRUE
)

In the case of the SQL API, the parallel hint can be provided with the input cursor. This indicates that a degree of parallelism up to 4 should be enabled.


select *
from table(churnGroupEval(
  cursor(select /*+ parallel(t,4) */ * from CHURN_TRAIN t),
  cursor(select 1 as "ore.connect", 'myC5.0model2' as "datastorePrefix" from dual),
  'XML', 'state', 'myC5.0Function'));

Map Reduce

The “group apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the partitioning by outputting the INDEX value as key and the data.frame as value. Then, each reducer receives the rows associated with one key. In our example above, INDEX was the column state and so each reducer would receive rows associated with a single state.

Memory and performance considerations

While the data is partitioned by the INDEX column, it is still possible that a given partition is quite large, such that either the partition of data will not fit in the R engine memory or the user-defined embedded R function will not be able to execute to completion. The usual remedial measures can be taken regarding setting memory limits – as noted in Part 2.

If the partitions are not balanced, you would have to configure the system’s memory for the largest partition. This will also have implications for performance, obviously, since smaller partitions of data will likely complete faster than larger ones.
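
For example, a quick sketch of checking partition balance for the churn data, computed in the database via the transparency layer:

partition.sizes <- table(CHURN_TRAIN$state)  # row counts per state
max(partition.sizes)                         # the largest partition drives the memory configuration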

The blog post Managing Memory Limits and Configuring Exadata for Embedded R Execution discusses how to instrument your code to understand the memory usage of your R function. This is done in the context of ore.indexApply (to be discussed later in this blog series), but the approach is analogous for “group apply.”

Friday Jan 03, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 2

In part 1 of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval and rqEval. In this blog post, we’ll cover the next in our theme and variation series involving ore.tableApply and rqTableEval.

The variation on embedded R execution for ore.tableApply involves passing an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame. The rqTableEval function in SQL allows users to specify a data cursor to be delivered to your embedded R function as a data.frame.

Let’s look at a few examples.


R API

In the following example, we’re using ore.tableApply to build a Naïve Bayes model on the iris data set. Naïve Bayes is found in the e1071 package, which must be installed on both the client and database server machine R engines.

library(e1071)
mod <- ore.tableApply(
ore.push(iris),
function(dat) {
library(e1071)
dat$Species <- as.factor(dat$Species)
naiveBayes(Species ~ ., dat)
})
class(mod)
mod

A few points to highlight:
• To use the CRAN package e1071 on the client, we first load the library.
• The iris data set is pushed to the database to create an ore.frame, which is supplied as the first argument to ore.tableApply. Normally this argument would be an ore.frame corresponding to a table that already exists in Oracle Database. Note that we could equivalently have assigned dat <- ore.push(iris) beforehand and passed dat as the argument.
• The embedded R function is supplied as the second argument to ore.tableApply as a function object. Recall from Part 1 that we could have alternatively assigned this function to a variable and passed the variable as an argument, or stored the function in the R script repository and passed the argument FUN.NAME with the assigned function name.
• The user-defined embedded R function takes dat as its first argument which will contain a data.frame derived from the ore.frame supplied.
• The model itself is returned from the function.
• The result of the ore.tableApply execution will be an ore.object (see the short example after this list).
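
For example, a short sketch of working with the returned ore.object on the client:

nb.mod <- ore.pull(mod)              # deserialize the model on the client
class(nb.mod)                        # "naiveBayes"
head(predict(nb.mod, iris[, 1:4]))   # score locally using the client-side copy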

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Recall that the call to sys.rqScriptCreate must be wrapped in a BEGIN-END PL/SQL block.


begin
sys.rqScriptCreate('myNaiveBayesModel',
'function(dat) {
library(e1071)
dat$Species <- as.factor(dat$Species)
naiveBayes(Species ~ ., dat)
}');
end;
/

Invoking the function myNaiveBayesModel occurs in a SQL SELECT statement as shown below. The first argument to rqTableEval specifies a cursor that retrieves the IRIS table. Note that the IRIS table could have been created earlier using ore.create(iris,"IRIS"). The second argument, NULL, indicates that no arguments are supplied to the function.

The function returns an R object of type naiveBayes, but as a serialized object that is chunked into a table. This likely is not useful to most users.


select *
from table(rqTableEval(cursor(select * from IRIS), NULL, NULL, 'myNaiveBayesModel'));

If we want to keep the model in a more usable form, we can store it in an ORE datastore in Oracle Database. For this, we require a change to the user-defined R function and the SQL invocation.


begin
sys.rqScriptCreate('myNaiveBayesModel',
'function(dat) {
library(e1071)
dat$Species <- as.factor(dat$Species)
mod <- naiveBayes(Species ~ ., dat)
ore.save(mod, name="myNaiveBayesDatastore")
TRUE
}');
end;
/
select *
from table(rqTableEval(cursor(select * from IRIS), cursor(select 1 as "ore.connect" from dual), 'XML', 'myNaiveBayesModel'));

In this version, we’ve stored the model in the datastore named ‘myNaiveBayesDatastore’ and returned TRUE to have a simple value that can show up as the result of the function execution. In the SQL query, we changed the third parameter to ‘XML’ to return an XML string containing “TRUE”. The name of the datastore could also be passed as an argument as follows:


begin
sys.rqScriptCreate('myNaiveBayesModel',
'function(dat, datastoreName) {
library(e1071)
dat$Species <- as.factor(dat$Species)
mod <- naiveBayes(Species ~ ., dat)
ore.save(mod, name=datastoreName)
TRUE
}');
end;
/
select *
from table(rqTableEval(
cursor(select * from IRIS),
cursor(select 'myNaiveBayesDatastore' "datastoreName", 1 as "ore.connect" from dual),
'XML',
'myNaiveBayesModel'));

Memory considerations with ore.tableApply and rqTableEval

The input data provided as the first argument to a user-defined R function invoked using ore.tableApply or rqTableEval is physically being moved from Oracle Database to the database server R engine. It’s important to realize that R’s memory limitations still apply. If your database server machine has 32 GB RAM and your data table is 64 GB, ORE will not be able to load the data into the R function’s dat argument.
You may see errors like:


Error : vector memory exhausted (limit reached)

or

ORA-28579: network error during callback from external procedure agent

See the blog post on Managing Memory Limits and Configuring Exadata for Embedded R Execution where we discuss setting memory limits for the database server R engine. This can be necessary to load reasonably sized data tables.

Parallelism

As with ore.doEval / rqEval, user-defined R functions invoked using ore.tableApply / rqTableEval are not executed in parallel, i.e., a single R engine is used to execute the user-defined R function.

Invoking certain ORE advanced analytics functions

In the current ORE release, some advanced analytics functions, like ore.lm or ore.glm, which use the embedded R execution framework, cannot be used within other embedded R calls such as ore.doEval / rqEval and ore.tableApply / rqTableEval.

You can expect to see an error like the following:


ORA-28580: recursive external procedures are not supported

In the next post in this series, I’ll discuss ore.groupApply and the corresponding definitions required for SQL execution, since there is no rqGroupApply function. I’ll also cover the relationship of various “group apply” constructs to map-reduce paradigm.

Wednesday Oct 02, 2013

Managing Memory Limits and Configuring Exadata for Embedded R Execution

An R engine can consume significant memory resources in the course of running R scripts. R users who work with Oracle R Enterprise Embedded R Execution on sizable data, especially application designers and database administrators (DBAs), have a vested interest in understanding and controlling the memory demands of R script execution to help ensure that sufficient memory resources are available for both their application and Oracle Database. ORE Embedded R Execution enables running R scripts managed by Oracle Database, both through R and SQL APIs. The SQL API enables seamless integration with database-based applications, data-parallel and task-parallel R script execution, and ease of production deployment.

To provide greater control over R memory consumption, Oracle R Enterprise provides a privileged SQL function for configuring a database server with R memory limits. In this blog post, we provide a discussion of R memory usage and garbage collection, and how this SQL function can be used to limit the amount of memory consumed by individual R engines started as part of ORE’s embedded R execution framework. We follow with an example involving memory limit calculations on Exadata and some recommendations for DBAs to consider when configuring Exadata for embedded R execution. Note that such calculations and configuration settings are applicable to non-Exadata (single instance or custom RAC) environments as well. At the end, there is a “tip of the day” for R memory management.

Garbage Collection as a concept

For those familiar with languages like C, memory is explicitly managed by the programmer through invocations of functions to allocate and free memory (malloc, calloc, free). Failing to free memory when finished with it results in “memory leaks” that can cause a process to consume (or exhaust) memory unnecessarily, often resulting in a program or system crash.

To alleviate programmers from this burden, languages like R and Java rely on garbage collection. “Garbage” is memory that is no longer being used, i.e., no longer referenceable, within your program. With garbage collection, programmers avoid dealing with memory management. The underlying system determines what memory is used or available, and frees memory periodically. Garbage collection, however, is not a panacea. Garbage collection can take time to process, e.g., on the order of seconds, which can make response time for certain functions unpredictable – although modern garbage collection mechanisms have largely mitigated this drawback. In addition, when garbage collection occurs is essentially non-deterministic, depending on heuristics set up by the language implementation. This means that memory may be retained longer than necessary.

Memory in R

In R, memory can be characterized along two dimensions: memory allocated for vectors and arrays (referred to as Vcells), and memory allocated for objects such as lists (referred to as “cons” cells or Ncells). Invoking R’s garbage collection function, gc(), reports usage for both.

The function gc() returns a matrix with rows Ncells, which corresponds to the cons cells, and Vcells, which corresponds to vector heap memory. The Ncells are 56 bytes/cell (49.2*1024*1024 bytes / 920,477 cells) on a 64-bit machine, and Vcells are ~8 bytes/cell (22.6*1024*1024 bytes / 2,956,944 cells). The “used” column indicates the number of cells allocated, along with their corresponding megabytes. The column “gc trigger” indicates at what point garbage collection will kick in. The column “max used” indicates the maximum space used since the last call to gc(reset=TRUE), or since R started if gc(reset=TRUE) wasn’t invoked.

As an example of affecting Ncells, consider the following example where we initialize a list as a sequence of 100K numbers. We see that roughly 5.4 MB of RAM were consumed for the 100K cells.

For Vcells, we create a vector of 1M elements. This consumes roughly 3.8 MB of RAM for the 1M cells. R optimizes for integers.

The same test with floats consumes 7.6 MB of RAM for the 1M cells of floats.
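
These examples can be reproduced with a short sketch like the following; the reported megabytes are approximate and will vary by platform and R version:

gc(reset=TRUE)                 # reset the "max used" counters
x <- as.list(seq(100000))      # a list of 100K numbers: roughly 5.4 MB of Ncells
gc()

gc(reset=TRUE)
v <- 1:1000000                 # 1M integers: roughly 3.8 MB of Vcells (integers use 4 bytes)
gc()

gc(reset=TRUE)
vf <- as.numeric(1:1000000)    # 1M doubles: roughly 7.6 MB of Vcells (doubles use 8 bytes)
gc()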

How does R’s garbage collector use VSize and NSize?

We’ll discuss VSize, as NSize is analogous. The garbage collector recovers memory that is no longer in use, determining when to perform garbage collection and how much memory to recover. Looking at heap memory for Vcells, there are a few key quantities: Min_VSize, VSizeInUse, R_VSize, and Max_VSize. The R_VSize serves as the gc() trigger. The Min_VSize and Max_VSize are the specified lower and upper memory limits. Min_VSize is the minimal size for the vector heap as well as its initial value. From there, R grows or shrinks the vector heap depending on memory demands. However, it doesn’t exceed the Max_VSize limit nor go below the Min_VSize limit. VSizeInUse reflects the memory currently used by R objects, while R_VSize is how much memory can be requested without triggering gc(). As you would expect: Min_VSize <= R_VSize <= Max_VSize and VSizeInUse < R_VSize.


Limiting memory on the database server R engine

Oracle R Enterprise provides the SQL function sys.rqconfigset to set memory limits. Use of this function requires the sys privilege and the setting is applied only to embedded R engines. Consider the following examples:

sys.rqconfigset('MIN_VSIZE', '10M') -- min heap 10MB, default 32MB
sys.rqconfigset('MAX_VSIZE', '100M') -- max heap 100MB, default 4GB
sys.rqconfigset('MIN_NSIZE', '500K') -- min number cons cells 500x1024, default 1M
sys.rqconfigset('MAX_NSIZE', '2M') -- max number cons cells 2M, default 20M

Note that either numeric or string values can be provided to sys.rqconfigset. Default constants are defined as follows:


#define RQET_DEF_MINVSZ 33554432 /* RQER DEFault MIN_VSiZe 32Mb */
#define RQET_DEF_MAXVSZ 4294967296 /* RQER DEFault MAX_VSiZe 4Gb */
#define RQET_DEF_MINNSZ 1048576 /* RQER DEFault MIN_NSiZe 1M */
#define RQET_DEF_MAXNSZ 20971520 /* RQER DEFault MAX_NSiZe 20M */

Getting memory settings and usage through an embedded R engine

To obtain the current settings (the defaults, unless they have been changed), query the table sys.rq_config with the following SQL statement when connected to the database server.

select name, value from sys.rq_config;

This can be done from within an embedded R function invocation using, for example:

getMemorySettings <- function() {
  library(ROracle)               # provides the Extproc() driver available in embedded R
  con <- dbConnect(Extproc())    # connect back to the invoking database session
  rs <- dbSendQuery(con, "select name, value from sys.rq_config")
  dat <- fetch(rs)
  dat
}
ore.doEval(getMemorySettings, ore.connect=TRUE)

To obtain the current memory usage within an individual embedded R engine, instrumenting your embedded R function with gc() and returning the results of gc() will provide this insight:

getMemoryUse <- function() {
  gc.dat <- gc()                         # current usage, gc trigger, and any limits for this R engine
  list(pid=Sys.getpid(), gc.dat=gc.dat)  # include the R engine's process id
}
ore.doEval(getMemoryUse)

Note that whenever memory limits are in place for an R engine, the gc() result from an embedded R call will also include those limits in an additional “limit (Mb)” column. Here, we’ve also included the process id of the R engine in the result.

An example of computing memory limits

Consider you have an Exadata X2-2 that has 1152 GB RAM (~1.2 TB) and your DBA allocates you a maximum of 60 GB RAM for parallel R engines per Exadata node. If we set the degree of parallelism at 32, to enable 32 R engines to execute concurrently, this allows 1.875 GB RAM / R engine. If we allocate 2/3 of this for Vcells, we would allocate ~1.25 GB for the MAX_VSIZE. The remaining 1/3, or 625 MB, would translate into 11.6M cells for MAX_NSIZE.

60 GB allocated to R engines per Exadata node

DOP=32

60GB / 32 R engines = 1.875 GB / R Engine

~2/3 for Vcells = 1.25 GB; 1.25 GB / 8 Bytes/Cell = 156.25M Cells

~1/3 for Ncells = 625 MB; 625 MB / 54 Bytes/Cell = 11.6M Cells

sys.rqconfigset('MAX_VSIZE', '1250M')
sys.rqconfigset('MAX_NSIZE', '11600K')

While this example focuses on parallel execution, such as for ore.groupApply, ore.rowApply, and ore.indexApply (or SQL rqRowEval and “rqGroupEval”), the same type of analysis applies to non-parallel embedded R functions, like ore.doEval and ore.tableApply (or SQL rqEval and rqTableEval).

Consider an example that builds a randomForest model using the ore.doEval function. We can compute the amount of RAM consumed by the function by invoking gc() at the beginning and end of the function and subtracting the max used “(Mb)” columns as depicted here:
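
A minimal sketch of this instrumentation, assuming the randomForest package is installed on the database server R engine, might look like:

measureMemoryUse <- function() {
  library(randomForest)                          # assumed installed on the server R engine
  gc.start <- gc(reset=TRUE)                     # reset and capture "max used" at the start
  mod <- randomForest(Species ~ ., data = iris)  # the work whose memory footprint we measure
  gc.end <- gc()
  # The last column of the gc() matrix is the "(Mb)" value for "max used";
  # the difference between end and start is the memory consumed by the function body.
  gc.end[, ncol(gc.end)] - gc.start[, ncol(gc.start)]
}
ore.doEval(measureMemoryUse)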

The result is that 1.4 MB were consumed for Ncells and 11.1 MB for Vcells. This can similarly be done for ore.indexApply to see the amount of RAM consumed by each embedded R function execution and to sum up the actual usage for each of the embedded R engines (assuming they run fully concurrently).

Generating such numbers on real data gives users a sense of how much memory embedded R jobs may require.

For DBAs

When configuring a database on Exadata for parallel R engines, consider the following options. Below, we contrast two scenarios: one where each embedded R function executes quickly, e.g., in 10s of seconds, and there are many such executions; and one where each execution is long-running and there are fewer of them. Note that these settings must be considered in the context of other Exadata uses:

· Set parallel_degree_policy to MANUAL. This allows ORE to choose when to apply parallelism, as opposed to setting it to AUTO which allows Oracle Database to decide.

· Set parallel_min_servers to the number of parallel slave processes to be started when the database instances start, e.g., 64, which is the number of parallel slave processes per Exadata node. This avoids incurring the time required to start these processes as needed to service R engines, and is particularly important when individual embedded R function execution time is short, e.g., 10s of seconds. If embedded R function execution time is long, the percentage of time for starting up the parallel slave will not dominate the overall execution time.

· Set parallel_max_servers to the maximum number of parallel slave processes that should be allowed per Exadata node, e.g., 128. This ensures that no more than parallel_max_servers will be active at one time, and in turn corresponds to the maximum number of R engines that can be active at one time.

· To avoid overloading the CPUs if the parallel_max_servers limit is reached, set the hidden parameter _parallel_statement_queuing to TRUE. This parameter is turned off by setting parallel_degree_policy to MANUAL. The _parallel_statement_queuing parameter allows for queuing of parallel requests when they exceed the parallel_server_target, which should be set to a value between parallel_min_servers and parallel_max_servers, e.g., 96. Once the parallel_server_target is reached, an embedded R execution will be allowed to execute in parallel using the remaining available parallel servers. If none are available, parallel requests will be queued. This ensures that parallel requests will be run in parallel, as opposed to being forced to serial execution, and be able to take advantage of parallel slaves as they become available. This can dramatically improve overall embedded R execution completion time. Note that parallel_max_servers cannot be changed during database operation, but the parallel_server_target can be to tune Exadata performance. Note that queuing effectively takes no CPU resources.

· To minimize RAC or cluster overhead for fast-executing individual embedded R functions, set parallel_force_local to TRUE to keep all parallel servers allocated and running on the same database server node. With this setting, when starting an embedded R execution with DOP 32, all 32 R engines will run on the same Exadata node. A new embedded R execution, also with DOP 32, may be started on a different node. If the embedded R functions are long-running, the setup time is proportionately small, so spreading the I/O over multiple nodes will not adversely impact overall performance. Having parallel slaves span multiple Exadata nodes results in communication / handshaking across nodes, which requires more resources. If the embedded R functions are fast, this overhead can adversely impact overall performance. When all parallel slaves are local, fewer resources are used.

· Where applicable, set application tables and their indexes to DOP 1 to reinforce the ability of ORE to determine when to use parallelism and not be overridden by table or index settings of DEFAULT or a specific degree of parallelism.

R memory management tip

There are many optimizations for making more efficient use of memory. To close, here is a tip that can reduce memory consumption significantly and avoid unnecessary replication of data.

If you know the size of your result in advance, pre-allocate the memory required, whether a vector, list, or matrix, as opposed to building up the result incrementally such as using cbind for adding columns to a matrix or data.frame. For example:

num.rows <- 1000
num.cols <- 2000
myFunction <- function(col) { col:(num.rows + col - 1) }  # produces a vector of values
myMatrix <- matrix(NA, num.rows, num.cols)                # pre-allocate the required memory

for (col in 1:num.cols) {
  myMatrix[, col] <- myFunction(col)
}

A note of thanks to Qin Wang and Martin Farber for their input on this blog post.
