Friday Jan 03, 2014

ORAAH - Enabling high performance R workloads on Hadoop


One of the features of Oracle R Advanced Analytics for Hadoop (ORAAH) is enabling Hadoop jobs written in the R language. R is a popular open-source language and environment for statistical computing and graphics. ORAAH enables R programmers to leverage a Hadoop cluster operating on data resident in HDFS files.

In this blog post, we examine the performance characteristics of ORAAH with an example and explain what makes ORAAH the fastest alternative available to run Hadoop-R jobs. We also compare the results with another popular Hadoop interface for R, rmr.

Credit to Vlad Sharanhovich and Anand Srinivasan for providing the content for this blog post.

In probability theory and statistics, covariance is a measure of how much two variables change together.

Variables that tend to show similar behavior exhibit positive covariance. Alternatively if the greater values of one variable correspond with the smaller values of another then the covariance between the variables is negative. We use covariance computation as the running example below that you can use to reproduce the results detailed here.

The tests were performed on a 6-node cluster running ORAAH version 2.3.1.


Cluster configuration:


  • 6 node cluster

  • BDA v2.3.1 (based on CDH 4.4)

  • 4 tasktrackers

  • CPU: Intel ® Xeon® CPU X5675  @ 3.07GHz

  • RAM: 47GB


ORAAH comes with a convenience function for data generation. We use this function to generate a 100 GB HDFS input file with numeric values for 200 variables (columns), as follows:


# Generate 100GB input dataset (using pre-release 2.4.0)
inputCsv <- orch.datagen(1e+11, numeric.col.count=200, parts=100)
# 15 mins, 53 sec
# 200 mappers

ORAAH supports 2 types of HDFS input: delimited text files and a binary RDATA representation (R's own binary representation). In many cases, RDATA representation provides much better I/O throughput compared to delimited text files.


# Converting into ORAAH native format
inputRdata <- hdfs.toRData(inputCsv, out.name="100G_200n_rd")
# 4 mins, 54 sec
# 400 mappers

Next, we write the mapper and reducer code for parallel/distributed covariance computation. Below, the mapper function accepts a data.frame representation of the input data and generates structured output with 3 components: a matrix, a vector of column sums, and input row count. The single reducer function merges the structured output generated from the mappers to produce the final covariance matrix.

The mapper and reducer functions are supplied as input to ORAAH's hadoop.run() function. This function additionally takes an HDFS file as input in the dataargument. The structured output from the mapper and reducer is defined in mapred.config data structure. Optionally, the Hadoop job can be given a name (in this case "cov") for traceability.

ORCH_cov
<- function(x) {
hadoop.run(
data = x,
mapper = function(k, v) {
m <- as.matrix(v)
cs <- colSums(m)
nr <- nrow(m)
mtm <- t(m) %*% m
l <- list(mat=mtm, colsum=cs, nrow=nr)
orch.keyval(NULL, orch.pack(l)),
reducer = function(k, v) {
mapres <- orch.unpack(v$val, as.list=T)
xy <- Reduce("+", lapply(mapres,function(x) x$mat))
csf <- Reduce("+", lapply(mapres,function(x) x$colsum))
nrf <- Reduce("+", lapply(mapres,function(x) x$nrow))
sts <- csf %*% t(csf)
m1 <- xy / (nrf -1)
m2 <- sts / (nrf * (nrf-1))
m3 <- 2 * sts / (nrf * (nrf-1))
covmat <- m1 + m2 - m3
orch.keyval(NULL, orch.pack(covmat))
},
config = new("mapred.config",
map.output = data.frame(key="none",
val="c"),
reduce.output = data.frame(key="none", val="c")),
job.name = "cov")
}

The execution times are shown below:


# Using text input
ORCH_cov(inputCsv)
# 7 mins, 19 sec
# 400 mappers / 45 reducers



# Using binary input
ORCH_cov(inputRdata)
# 4 mins, 18 sec
# 400 mappers / 45 reducers

A few points to observe:
1. Before an HDFS file can be used with ORAAH's hadoop.run() function, its metadata must be known. ORAAH automatically determines the data types of the columns in the HDFS file by sampling rows. The metadata is created during the hdfs.attach() call. This metadata enables ORAAH to generate highly optimized scan routines to read rows from the file.
2. ORAAH implements caching of input and output structures from mappers and reducers thereby lifting the burden on dealing with large data volumes from the R engine.
3. ORAAH's orch.pack() and orch.unpack() functions enable transfer of structured constructs between mappers and reducers, which further improves I/O throughput by eliminating the need to scan/parse string inputs.
4. ORAAH leverages R's own RDATA representation as the binary representation. The key to better execution performance of R jobs on Hadoop is managing I/O throughput and carefully bypassing R's inherent limitations with parsing strings.

We contrast ORAAH's performance by comparing it with an open source package called rmr (https://github.com/RevolutionAnalytics/RHadoop/wiki/rmr) repeating the covariance calculation on the same cluster and input data set. For this experiment, we used rmr version 2.3.0.

The covariance computation is written in rmr as shown below.


RMR2_cov <- function(x, input.format) {
mapreduce(
x,
input.format = input.format,
map = function(k, v) {
m <- as.matrix(v)
cs <- colSums(m)
nr <- nrow(m)
mtm <- t(m) %*% m
l <- list(mat=mtm, colsum=cs, nrow=nr)
keyval(1, paste0(deparse(l),
collapse="\n"))
},
reduce = function(k, v) {
mapres <- lapply(v, function(x)
eval(parse(text=x)))
xy <- Reduce("+", lapply(mapres, function(x) x$mat))
csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
sts <- csf %*% t(csf)
m1 <- xy / (nrf -1)
m2 <- sts / (nrf * (nrf-1))
m3 <- 2 * sts / (nrf * (nrf-1))
covmat <- m1 + m2 - m3
keyval(1, paste0(deparse(covmat),
collapse="\n"))
}
)
}

Notice that the code is somewhat similar to the version used with ORAAH with 3 key differences:
   i) Mapper output is serialized as a string
 ii) Reducer, thus, is forced to parse input strings
iii) Reducer output is once again a string, requiring client to reconstruct the covariance matrix

Not only is passing strings limiting from the R programmer's perspective, but also has a negative effect on I/O throughput. rmr also supports a proprietary binary representation of delimited text data.

Below we repeat the tests with both delimite and binary representations. Not only is the conversion to the binary representation more expensive, the resulting I/O throughput is not substantially better.


# Convert to RMR2 native format
inputRMR <- mapreduce(inputCsv, 
  input.format = make.input.format("csv",sep=","),
  map = function(k, v) keyval(NULL, v)
)
# 20 mins, 17 sec
# 400 mappers
#
# Using text input
RMR2_cov(inputCsv, make.input.format("csv",sep=","))
# 32 mins, 14 sec
# 400 mappers / 45 reducers
#
# Using binary input
RMR2_cov(inputRMR, "native")
# 17 mins, 18 sec
# 400 mappers / 45 reducers

To summarize ORAAH is 4x faster than rmr out of the box for a simple covariance calculation.





























 Text Input
 Binary Input
 Text to Binary Conversion
 ORAAH  7 min, 19 sec
 4 min, 18 sec
 4 min 54 sec
 rmr  32 min, 14 sec
 17 min, 18 sec
 20 min 17 sec

 4.4x faster
 4x faster
 4.14x faster

Wednesday Feb 06, 2013

Oracle R Enterprise 1.3 gives predictive analytics an in-database performance boost

Recently released Oracle R Enterprise 1.3 adds packages to R that enable even more in-database analytics. These packages provide horizontal, commonly used techniques that are blazingly fast in-database for large data. With Oracle R Enterprise 1.3, Oracle makes R even better and usable in enterprise settings. (You can download ORE 1.3 here and documentation here.)

When it comes to predictive analytics, scoring (predicting outcomes using a data mining model) is often a time critical operation. Scoring can be done online (real-time), e.g., while a customer is browsing a webpage or using a mobile app, where on-the-spot recommendations can be made based on current actions. Scoring can also be done offline (batch), e.g., predict which of your 100 million customers will respond to each of a dozen offers, e.g., where applications leverage results to identify which customers should be targeted with a particular ad campaign or special offer.

In this blog post, we explore where using Oracle R Enterprise pays huge dividends. When working with small data, R can be sufficient, even when pulling data from a database. However, depending on the algorithm, benefits of in-database computation can be seen in a few thousand rows. The time difference with 10s of thousands of rows makes an interactive session more interactive, whereas 100s of thousands of rows becomes a real productivity gain, and on millions (or billions) of rows, becomes a competitive advantage! In addition to performance benefits, ORE integrates R into the database enabling you to leave data in place.

We’ll look at a few proof points across Oracle R Enterprise features, including:

  • OREdm – a new package that provides R access to several in-database Oracle Data Mining algorithms (Attribute Importance, Decision Tree, Generalized Linear Models, K-Means, Naïve Bayes, Support Vector Machine).
  • OREpredict – a new package that enables scoring models built using select standard R algorithms in the database (glm, negbin, hclust, kmeans, lm, multinom, nnet, rpart).
  • Embedded R Execution – an ORE feature that allows running R under database control and boosts real performance of CRAN predictive analytics packages by providing faster access to data than occurs between the database and client, as well as leveraging a more powerful database machine with greater RAM and CPU resources.

OREdm

Pulling data out of a database for any analytical tool impedes interactive data analysis due to access latency, either directly when pulling data out of the database or indirectly via an IT process that involves requesting data to be staged in flat files. Such latencies can quickly become intolerable. On the R front, you’ll also need to consider whether the data will fit in memory. If flat files are involved, consideration needs to be given to how files will be stored, backed up, and secured.

Of course, model building and data scoring execution time is only part of the story. Consider a scenario A, the “build combined script,” where data is extracted from the database, and an R model built and persisted for later use. In the corresponding scenario B, the “score combined script”, data is pulled from the database, a previously built model loaded, data scored, and the scores written to the database. This is a typical scenario for use in, e.g., enterprise dashboards or within an application supporting campaign management or next-best-offer generation. In-database execution provides significant performance benefits, even for relatively small data sets as included below. Readers should be able to reproduce such results at these scales. We’ve also included a Big Data example by replicating the 123.5 million row ONTIME data set to 1 billion rows. Consider the following examples:

Linear Models: We compared R lm and ORE ore.lm in-database algorithm on the combined scripts. On datasets ranging from 500K to 1.5M rows with 3-predictors, in-database analytics showed an average 2x-3x performance improvement for build, and nearly 4x performance improvement for scoring. Notice in Figure 1 that the trend is significantly less for ore.lm than lm, indicating greater scalability for ore.lm.

Figure 1. Overall lm and ore.lm execution time for model building (A) and data scoring (B)

Figure 2 provides a more detailed view comparing data pull and model build time for build detail, followed by data pull, data scoring, and score writing for score detail. For model building, notice that while data pull is a significant part of lm’s total build time, the actual build time is still greater than ore.lm. A similar statement can be made in the case of scoring.

Figure 2. Execution time components for lm and ore.lm (excluding model write and load)

Naïve Bayes from the e1071 package: On 20-predictor datasets ranging from 50k to 150k rows, in-database ore.odmNB improved data scoring performance by a factor of 118x to 418x, while the full scenario B execution time yielded a 13x performance improvement, as depicted in Figure 3B. Using a non-parallel execution of ore.odmNB, we see the cross-over point where ore.odmNB overtakes R, but more importantly, the slope of the trend points to the greater scalability of ORE, as depicted in Figure 3A for the full scenario A execution time.


Figure 3. Overall naiveBayes and ore.odmNB execution time for model building (A) and data scoring (B)

K-Means clustering: Using 6 numeric columns from the ONTIME airline data set ranging from 1 million to 1 billion rows, we compare in-database ore.odmKMeans with R kmeans through embedded R execution with ore.tableApply. At 100 million rows, ore.odmKMeans demonstrates better performance than kmeans , and scalability at 1 billion rows. The performance results depicted in Figure 4 uses a log-log plot. The legend shows the function invoked and corresponding parameters, using subset of ONTIME data set d. While ore.odmKMeans scales linearly with number of rows, R kmeans does not. Further, R kmeans did not complete at 1 billion rows.

Figure 4: K-Means clustering model building on Big Data

OREpredict

With OREpredict, R users can also benefit from in-database scoring of R models. This becomes evident not only when considering the full “round trip” of pulling data from the database, scoring in R, and writing data back to the database, but also for the scoring itself.

Consider an lm model built using a dataset with 4-predictors and 1 million to 5 million rows. Pulling data from the database, scoring, and writing the results back to the database shows a pure R-based approach taking 4x - 9x longer than in-database scoring using ore.predict with that same R model. Notice in Figure 5 that the slope of the trend is dramatically less for ore.predict than predict, indicating greater scalability. When considering the scoring time only, ore.predict was 20x faster than predict in R for 5M rows. In ORE 1.3, ore.predict is recommended and will provide speedup over R for numeric predictors.

Figure 5. Overall lm execution time using R predict vs. ORE ore.predict

For rpart, we see a similar result. On a 20-predictor, 1 million to 5 million row data set, ore.predict resulted in a 6x – 7x faster execution. In Figure 5, we again see that the slope of the trend is dramatically less for ore.predict than predict, indicating greater scalability. When considering the scoring time only, ore.predict was 123x faster than predict in R for 5 million rows.

Figure 6. Overall rpart Execution Time using R predict vs. ORE ore.predict

This scenario is summarized in Figure 7. In the client R engine, we have the ORE packages installed. There, we invoke the pure R-based script, which requires pulling data from the database. We also invoke the ORE-based script that keeps the data in the database.

Figure 7. Summary of OREpredict performance gains

To use a real world data set, we again consider the ONTIME airline data set with 123.5 million rows. We will build lm models with varying number of coefficients derived by converting categorical data to multiple columns. The variable p corresponds to the number of coefficients resulting from the transformed formula and is dependent on the number of distinct values in the column. For example, DAYOFWEEK has 7 values, so with DEPDELAY, p=9. In Figure 8, you see that using an lm model with embedded R for a single row (e.g., one-off or real-time scoring), has much more overhead (as expected given that an R engine is being started) compared to ore.predict, which shows subsecond response time through 40 coefficients at 0.54 seconds, and the 106 coefficients at 1.1 seconds. Here are the formulas describing the columns included in the analysis:

  • ARRDELY ~ DEPDELAY (p=2)
  • ARRDELY ~ DEPDELAY + DAYOFWEEK (p=8)
  • ARRDELY ~ DEPDELAY + DAYOFWEEK + MONTH (p=19)
  • ARRDELY ~ DEPDELAY + DAYOFWEEK + MONTH + YEAR (p=40)
  • ARRDELY ~ DEPDELAY + DAYOFWEEK + MONTH + YEAR (p=106)


Figure 8. Comparing performance of ore.predict with Embedded R Execution for lm

Compare this with scoring the entire ONTIME table of 123.5 million rows. We see that ore.predict outperforms embedded R until about 80 coefficients, when embedded R becomes the preferred choice. 

Data Movement between R and Database: Embedded R Execution

One advantage of R is its community and CRAN packages. The goal for Oracle R Enterprise with CRAN packages is to enable reuse of these packages while:

  • Leveraging the parallelization and efficient data processing capabilities of Oracle Database
  • Minimizing data transfer and communication overhead between R and the database
  • Leveraging R as a programming language for writing custom analytics

There are three ways in which we’ll explore the performance of pulling data.

1) Using ore.pull at a separate client R engine to pull data from the database

2) Using Embedded R Execution and ore.pull within an embedded R script from a database-spawned R engine

3) Using Embedded R Execution functions for data-parallelism and task-parallelism to pass database data to the embedded R script via function parameter

With ORE Embedded R Execution (ERE), the database delivers data-parallelism and task-parallelism, and reduces data access latency due to optimized data transfers into R. Essentially, R runs under the control of the database. As illustrated in Figure 9, loading data at the database server is 12x faster than loading data from the database to a separate R client. Embedded R Execution also provides a 13x advantage when using ore.pull invoked at the database server within an R closure (function) compared with a separate R client. The data load from database to R client is depicted as 1x – the baseline for comparison with embedded R execution data loading.

Figure 9. Summary of Embedded R Execution data load performance gains

Data transfer rates are displayed in Figure 10, for a table with 11 columns and 5 million to 15 million rows of data. Loading data via ORE embedded R execution using server-side ore.pull or through the framework with, e.g., ore.tableApply (one of the embedded R execution functions) is dramatically faster than a non-local client load via ore.pull. The numbers shown reflect MB/sec data transfer rates, so a bigger bar is better!

Figure 10. Data load and write execution time with 11 columns

While this is impressive, let’s expand our data up to 1 billion rows. To create our 1 billion row data set (1.112 billion rows), we duplicated the 123.5 million row ONTIME dataset 9 times, replacing rows with year 1987 with years 2010 through 2033, and selecting 6 integer columns (YEAR, MONTH, DAYOFMONTH, ARRDELAY, DEPDELAY, DISTANCE) with bitmap index of columns (YEAR, MONTH, DAYOFMONTH). The full data set weighs in at ~53 GB.

In Figure 11, we see linear scalability for loading data into the client R engine. Times range from 2.8 seconds for 1 million rows, to 2700 seconds for 1 billion rows. While your typical user may not need to load 1 billion rows into R memory, this graph demonstrates the feasibility to do so.

Figure 11. Client Load of Data via ore.pull for Big Data

In Figure12, we look at how degree of parallelism (DOP) affects data load times involving ore.rowApply. This test addresses the question of how fast ORE can load 1 billion, e.g., when scoring data. The degree of parallelism corresponds to the number of R engines that are spawned for concurrent execution at the database server. The number of chunks the data is divided into is 1 for a single degree of parallelism, and 10 times the DOP for the remaining tests. For DOP of 160, the data was divided into 1600 chunks, i.e., 160 R engines were spawned, each processing 10 chunks. The graph on the left depicts that execution times improve for the 1 billion row data set through DOP of 160. As expected, at some point, the overhead of spawning additional R engines and partitioning the data outweighs the benefit. At its best time, processing 1 billion rows took 43 seconds.

Figure 12. Client Load of Data via ore.pull for Big Data

In the second graph of Figure 12, we contrast execution time for the “sweet spot” identified in the previous graph with varying number of rows. Using this DOP of 160, with 1600 chunks of data, we see that through 100 million rows, there is very little increase in execution time (between 6.4 and 8.5 seconds in actual time). While 1 billion rows took significantly more, it took only 43 seconds.

We can also consider data write at this scale. In Figure 13, we also depict linear scalability from 1 million through 1 billion rows using the ore.create function to creating database tables from R data. Actual times ranged from 2.6 seconds to roughly 2600 seconds.

Figure 13. Data Write using ore.create for Big Data

ORE supports data-parallelism to enable, e.g., building predictive models in parallel on partitions of the data. Consider a marketing firm that micro-segments customers and builds predictive models on each segment. ORE embedded R execution automatically partitions the data, spawns R engines according to the degree of parallelism specified, and executes the specified user R function. To address how efficiently ore.groupApply can process data, Figure 14 shows the total execution time to process the 123.5M rows from the ONTIME data with varying number of columns. The figure shows that ore.groupApply scales linearly as the number of columns increases. Three columns were selected based on their number of distinct values: TAILNUM 12861, DEST 352, and UNIQUECARRIER 29. For UNIQUECARRIER, all columns (total of 29 columns) could not be completed since 29 categories resulted in data too large for a single R engine.

Figure 14. Processing time for 123.5M rows via ore.groupApply

ORE also supports row-parallelism, where the same embedded R function can be invoked on chunks of rows. As with ore.groupApply, depending on the specified degree of parallelism, a different chunk of rows will be submitted to a dynamically spawned database server-side R engine. Figure 15 depicts a near linear execution time to process the 123.5M rows from ONTIME with varying number of columns. The chunk size can be specified, however, testing 3 chunk sizes (10k, 50k, and 100k rows) showed no significant difference in overall execution time, hence a single line is graphed.

Figure 15. Processing time for 123.5M rows via ore.rowApply for chunk sizes 10k-100k

All tests were performed on an Exadata X3-8. Except as noted, the client R session and database were actually on the same machine, so network latency for data read and write were minimum. Over a LAN or WAN, the benefits of in-database execution and ORE will be even more dramatic.

About

The place for best practices, tips, and tricks for applying Oracle R Enterprise, Oracle R Distribution, ROracle, and Oracle R Advanced Analytics for Hadoop in both traditional and Big Data environments.

Search

Archives
« April 2014
SunMonTueWedThuFriSat
  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
   
       
Today