Friday Jan 03, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 2

In part 1 of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution, focusing on the functions ore.doEval and rqEval. In this blog post, we cover the next variation in the series, involving ore.tableApply and rqTableEval.

The variation on embedded R execution for ore.tableApply involves passing an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame. The rqTableEval function in SQL allows users to specify a data cursor to be delivered to your embedded R function as a data.frame.

Let’s look at a few examples.


R API

In the following example, we’re using ore.tableApply to build a Naïve Bayes model on the iris data set. Naïve Bayes is found in the e1071 package, which must be installed on both the client and database server machine R engines.

library(e1071)
mod <- ore.tableApply(
  ore.push(iris),
  function(dat) {
    library(e1071)
    dat$Species <- as.factor(dat$Species)
    naiveBayes(Species ~ ., dat)
  })
class(mod)
mod

A few points to highlight:
• To use the CRAN package e1071 on the client, we first load the library.
• The iris data set is pushed to the database to create an ore.frame, which is supplied as the first argument to ore.tableApply. In practice, this argument would typically be an ore.frame corresponding to a table that already exists in Oracle Database. Note that we could equally have assigned dat <- ore.push(iris) first and passed dat as the argument.
• The embedded R function is supplied as the second argument to ore.tableApply as a function object. Recall from Part 1 that we could alternatively have assigned this function to a variable and passed the variable as an argument, or stored the function in the R script repository and passed the argument FUN.NAME with the assigned function name (see the sketch following this list).
• The user-defined embedded R function takes dat as its first argument, which will contain a data.frame derived from the ore.frame supplied.
• The model itself is returned from the function.
• The result of the ore.tableApply execution will be an ore.object.
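
As a minimal sketch of these variations (the variable name buildNB is illustrative), the same call could be written as:

buildNB <- function(dat) {
  library(e1071)
  dat$Species <- as.factor(dat$Species)
  naiveBayes(Species ~ ., dat)
}

# Pass the function via a variable...
mod <- ore.tableApply(ore.push(iris), buildNB)

# ...or, assuming the function has been stored in the R script repository under
# the name 'myNaiveBayesModel' (as done later in this post via sys.rqScriptCreate),
# reference it by name:
mod <- ore.tableApply(ore.push(iris), FUN.NAME = "myNaiveBayesModel")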

SQL API

We can invoke the function through the SQL API by storing the function in the R script repository. Recall that the call to sys.rqScriptCreate must be wrapped in a BEGIN-END PL/SQL block.


begin
  sys.rqScriptCreate('myNaiveBayesModel',
    'function(dat) {
       library(e1071)
       dat$Species <- as.factor(dat$Species)
       naiveBayes(Species ~ ., dat)
     }');
end;
/

The function myNaiveBayesModel is invoked in a SQL SELECT statement as shown below. The first argument to rqTableEval specifies a cursor that retrieves the IRIS table. Note that the IRIS table could have been created earlier using ore.create(iris, "IRIS"). The second argument, NULL, indicates that no arguments are supplied to the function.

The third argument, also NULL, specifies that the result is returned as a serialized R object. The function returns an R object of type naiveBayes, but as a serialized object that is chunked into a table, which is likely not useful to most users.


select *
from table(rqTableEval(cursor(select * from IRIS), NULL, NULL, 'myNaiveBayesModel'));

If we want to keep the model in a more usable form, we can store it in an ORE datastore in Oracle Database. For this, we require a change to the user-defined R function and the SQL invocation.


begin
  sys.rqScriptCreate('myNaiveBayesModel',
    'function(dat) {
       library(e1071)
       dat$Species <- as.factor(dat$Species)
       mod <- naiveBayes(Species ~ ., dat)
       ore.save(mod, name="myNaiveBayesDatastore")
       TRUE
     }');
end;
/
select *
from table(rqTableEval(cursor(select * from IRIS), cursor(select 1 as "ore.connect" from dual), 'XML', 'myNaiveBayesModel'));

In this version of the function, we store the model in the datastore named 'myNaiveBayesDatastore' using ore.save, and return TRUE to provide a simple value as the result of the function execution. In the SQL query, the second cursor supplies the special control argument ore.connect, set to 1, which establishes a database connection within the embedded R function so that ore.save can write to the datastore. We also changed the third parameter to 'XML' to return an XML string containing "TRUE". The name of the datastore could instead be passed as an argument, as follows:


begin
sys.rqScriptCreate('myNaiveBayesModel',
'function(dat, datastoreName) {
library(e1071)
dat$Species <- as.factor(dat$Species)
mod <- naiveBayes(Species ~ ., dat)
ore.save(mod, name=datastoreName)
TRUE
}');
end;
/
select *
from table(rqTableEval(
cursor(select * from IRIS),
cursor(select 'myNaiveBayesDatastore' "datastoreName", 1 as "ore.connect" from dual),
'XML',
'myNaiveBayesModel'));
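
As a minimal sketch (the scoring data is illustrative), the stored model can later be restored by name from an ORE client session and used to score data:

# Restore objects saved in the datastore; this recreates 'mod' in the workspace
ore.load("myNaiveBayesDatastore")

# Score data with the restored naiveBayes model (requires e1071 on the client)
library(e1071)
predict(mod, iris[, 1:4])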

Memory considerations with ore.tableApply and rqTableEval

The input data provided as the first argument to a user-defined R function invoked using ore.tableApply or rqTableEval is physically moved from Oracle Database to the database server R engine. It's important to realize that R's memory limitations still apply. If your database server machine has 32 GB RAM and your data table is 64 GB, ORE will not be able to load the data into the R function's dat argument.

You may see errors such as:


Error : vector memory exhausted (limit reached)

or

ORA-28579: network error during callback from external procedure agent

See the blog post on Managing Memory Limits and Configuring Exadata for Embedded R Execution where we discuss setting memory limits for the database server R engine. This can be necessary to load reasonably sized data tables.

Parallelism

As with ore.doEval / rqEval, user-defined R functions invoked using ore.tableApply / rqTableEval are not executed in parallel, i.e., a single R engine is used to execute the user-defined R function.

Invoking certain ORE advanced analytics functions

In the current ORE release, some advanced analytics functions, like ore.lm or ore.glm, which use the embedded R execution framework, cannot be used within other embedded R calls such as ore.doEval / rqEval and ore.tableApply / rqTableEval.

You can expect to see an error like the following:


ORA-28580: recursive external procedures are not supported

In the next post in this series, I'll discuss ore.groupApply and the corresponding definitions required for SQL execution, since there is no rqGroupApply function. I'll also cover the relationship of the various "group apply" constructs to the map-reduce paradigm.

ORAAH - Enabling high performance R workloads on Hadoop


One of the features of Oracle R Advanced Analytics for Hadoop (ORAAH) is enabling Hadoop jobs written in the R language. R is a popular open-source language and environment for statistical computing and graphics. ORAAH enables R programmers to leverage a Hadoop cluster operating on data resident in HDFS files.

In this blog post, we examine the performance characteristics of ORAAH with an example and explain what makes ORAAH the fastest alternative available to run Hadoop-R jobs. We also compare the results with another popular Hadoop interface for R, rmr.

Credit to Vlad Sharanhovich and Anand Srinivasan for providing the content for this blog post.

In probability theory and statistics, covariance is a measure of how much two variables change together.

Variables that tend to show similar behavior exhibit positive covariance. Alternatively, if the greater values of one variable correspond with the smaller values of the other, then the covariance between the variables is negative. We use covariance computation as the running example below, which you can use to reproduce the results detailed here.

The tests were performed on a 6-node cluster running ORAAH version 2.3.1.


Cluster configuration:


  • 6 node cluster

  • BDA v2.3.1 (based on CDH 4.4)

  • 4 tasktrackers

  • CPU: Intel® Xeon® X5675 @ 3.07GHz

  • RAM: 47GB


ORAAH comes with a convenience function for data generation. We use this function to generate a 100 GB HDFS input file with numeric values for 200 variables (columns), as follows:


# Generate 100GB input dataset (using pre-release 2.4.0)
inputCsv <- orch.datagen(1e+11, numeric.col.count=200, parts=100)
# 15 mins, 53 sec
# 200 mappers

ORAAH supports 2 types of HDFS input: delimited text files and RDATA, R's own binary serialization format. In many cases, the RDATA representation provides much better I/O throughput than delimited text files.


# Converting into ORAAH native format
inputRdata <- hdfs.toRData(inputCsv, out.name="100G_200n_rd")
# 4 mins, 54 sec
# 400 mappers

Next, we write the mapper and reducer code for parallel/distributed covariance computation. Below, the mapper function accepts a data.frame representation of the input data and generates structured output with 3 components: a matrix, a vector of column sums, and input row count. The single reducer function merges the structured output generated from the mappers to produce the final covariance matrix.
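
This decomposition works because the sample covariance matrix can be assembled from these partial results alone: with xy the sum of the per-mapper cross-product matrices t(m) %*% m, csf the element-wise sum of the column-sum vectors, and nrf the total row count, the covariance matrix is

cov = xy / (nrf - 1) - (csf %*% t(csf)) / (nrf * (nrf - 1))

which is exactly what the reducer below computes (its m1 + m2 - m3 terms simplify to this expression).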

The mapper and reducer functions are supplied as input to ORAAH's hadoop.run() function. This function additionally takes an HDFS file as input in the data argument. The structured output from the mapper and reducer is defined in the mapred.config data structure. Optionally, the Hadoop job can be given a name (in this case "cov") for traceability.

ORCH_cov <- function(x) {
  hadoop.run(
    data = x,
    mapper = function(k, v) {
      # Each mapper emits partial results for its chunk of the input:
      # cross-product matrix, column sums, and row count
      m   <- as.matrix(v)
      cs  <- colSums(m)
      nr  <- nrow(m)
      mtm <- t(m) %*% m
      l   <- list(mat=mtm, colsum=cs, nrow=nr)
      orch.keyval(NULL, orch.pack(l))
    },
    reducer = function(k, v) {
      # The single reducer combines the partial results into the covariance matrix
      mapres <- orch.unpack(v$val, as.list=TRUE)
      xy  <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      orch.keyval(NULL, orch.pack(covmat))
    },
    config = new("mapred.config",
                 map.output    = data.frame(key="none", val="c"),
                 reduce.output = data.frame(key="none", val="c")),
    job.name = "cov")
}

The execution times are shown below:


# Using text input
ORCH_cov(inputCsv)
# 7 mins, 19 sec
# 400 mappers / 45 reducers



# Using binary input
ORCH_cov(inputRdata)
# 4 mins, 18 sec
# 400 mappers / 45 reducers

A few points to observe:
1. Before an HDFS file can be used with ORAAH's hadoop.run() function, its metadata must be known. ORAAH automatically determines the data types of the columns in the HDFS file by sampling rows. The metadata is created during the hdfs.attach() call (see the sketch following this list). This metadata enables ORAAH to generate highly optimized scan routines to read rows from the file.
2. ORAAH implements caching of input and output structures from mappers and reducers, thereby lifting the burden of dealing with large data volumes from the R engine.
3. ORAAH's orch.pack() and orch.unpack() functions enable transfer of structured constructs between mappers and reducers, which further improves I/O throughput by eliminating the need to scan/parse string inputs.
4. ORAAH leverages R's own RDATA representation as the binary representation. The key to better execution performance of R jobs on Hadoop is managing I/O throughput and carefully bypassing R's inherent limitations with parsing strings.
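
As a minimal sketch of point 1 (the HDFS path is illustrative), an existing delimited file is attached as follows:

library(ORCH)

# Attach an existing HDFS CSV file; ORAAH samples rows to determine column
# data types and records the metadata used for its optimized scan routines.
inputCsv <- hdfs.attach("/user/oracle/datasets/100G_200n_csv")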

We contrast ORAAH's performance with that of the popular open source package rmr (https://github.com/RevolutionAnalytics/RHadoop/wiki/rmr), repeating the covariance calculation on the same cluster and input data set. For this experiment, we used rmr version 2.3.0.

The covariance computation is written in rmr as shown below.


RMR2_cov <- function(x, input.format) {
  mapreduce(
    x,
    input.format = input.format,
    map = function(k, v) {
      m   <- as.matrix(v)
      cs  <- colSums(m)
      nr  <- nrow(m)
      mtm <- t(m) %*% m
      l   <- list(mat=mtm, colsum=cs, nrow=nr)
      # Mapper output must be serialized as a string
      keyval(1, paste0(deparse(l), collapse="\n"))
    },
    reduce = function(k, v) {
      # The reducer parses the string inputs back into R objects
      mapres <- lapply(v, function(x) eval(parse(text=x)))
      xy  <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      # Reducer output is again a string that the client must reconstruct
      keyval(1, paste0(deparse(covmat), collapse="\n"))
    }
  )
}

Notice that the code is quite similar to the ORAAH version, with 3 key differences:
   i) Mapper output is serialized as a string
  ii) The reducer is therefore forced to parse input strings
 iii) Reducer output is once again a string, requiring the client to reconstruct the covariance matrix

Not only is passing strings limiting from the R programmer's perspective, but it also has a negative effect on I/O throughput. rmr also supports a proprietary binary representation of delimited text data.

Below we repeat the tests with both the delimited text and binary representations. Not only is the conversion to the binary representation more expensive, but the resulting I/O throughput is also not substantially better.


# Convert to RMR2 native format
inputRMR <- mapreduce(inputCsv, 
  input.format = make.input.format("csv",sep=","),
  map = function(k, v) keyval(NULL, v)
)
# 20 mins, 17 sec
# 400 mappers
#
# Using text input
RMR2_cov(inputCsv, make.input.format("csv",sep=","))
# 32 mins, 14 sec
# 400 mappers / 45 reducers
#
# Using binary input
RMR2_cov(inputRMR, "native")
# 17 mins, 18 sec
# 400 mappers / 45 reducers

To summarize, ORAAH is roughly 4x faster than rmr out of the box for a simple covariance calculation, as shown in the table below:

                 Text Input        Binary Input      Text-to-Binary Conversion
 ORAAH           7 min, 19 sec     4 min, 18 sec     4 min, 54 sec
 rmr             32 min, 14 sec    17 min, 18 sec    20 min, 17 sec
 ORAAH speedup   4.4x faster       4x faster         4.14x faster
