Friday Jan 03, 2014

ORAAH - Enabling high performance R workloads on Hadoop


One of the features of Oracle R Advanced Analytics for Hadoop (ORAAH) is enabling Hadoop jobs written in the R language. R is a popular open-source language and environment for statistical computing and graphics. ORAAH enables R programmers to leverage a Hadoop cluster operating on data resident in HDFS files.

In this blog post, we examine the performance characteristics of ORAAH with an example and explain what makes ORAAH the fastest alternative available to run Hadoop-R jobs. We also compare the results with another popular Hadoop interface for R, rmr.

Credit to Vlad Sharanhovich and Anand Srinivasan for providing the content for this blog post.

In probability theory and statistics, covariance is a measure of how much two variables change together.

Variables that tend to show similar behavior exhibit positive covariance. Conversely, if the greater values of one variable correspond to the smaller values of the other, the covariance between the variables is negative. We use covariance computation as the running example below, which you can use to reproduce the results detailed here.
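To make the sign of the covariance concrete, here is a minimal base R sketch. The variables x, y_up, and y_down are made up for illustration and are not part of the benchmark data.

```r
# Illustrative data only: y_up rises with x, y_down falls with x.
set.seed(42)
x      <- rnorm(1000)
noise  <- rnorm(1000, sd = 0.1)
y_up   <-  2 * x + noise
y_down <- -2 * x + noise

# Positive covariance for the first pair, negative for the second.
c(up = cov(x, y_up), down = cov(x, y_down))
```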

The tests were performed on a 6-node cluster running ORAAH version 2.3.1.


Cluster configuration:


  • 6 node cluster

  • BDA v2.3.1 (based on CDH 4.4)

  • 4 tasktrackers

  • CPU: Intel® Xeon® CPU X5675 @ 3.07GHz

  • RAM: 47GB


ORAAH comes with a convenience function for data generation. We use this function to generate a 100 GB HDFS input file with numeric values for 200 variables (columns), as follows:


# Generate 100GB input dataset (using pre-release 2.4.0)
inputCsv <- orch.datagen(1e+11, numeric.col.count=200, parts=100)
# 15 mins, 53 sec
# 200 mappers

ORAAH supports 2 types of HDFS input: delimited text files and a binary RDATA representation (R's own binary representation). In many cases, RDATA representation provides much better I/O throughput compared to delimited text files.
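The difference between the two kinds of input can be tried on a local machine with base R alone. The sketch below is only an analogy: it uses write.csv()/read.csv() and saveRDS()/readRDS() on temporary local files, and it is an assumption that this resembles what ORAAH does on HDFS. Timings on a small local file will vary and say nothing about cluster throughput.

```r
# Local analogy only: delimited text versus R's binary serialization.
set.seed(7)
df <- as.data.frame(matrix(rnorm(2000 * 20), ncol = 20))

csv_path <- tempfile(fileext = ".csv")
rds_path <- tempfile(fileext = ".rds")
write.csv(df, csv_path, row.names = FALSE)
saveRDS(df, rds_path)

# Reading text requires parsing every field; reading binary does not.
t_csv <- system.time(from_csv <- read.csv(csv_path))["elapsed"]
t_rds <- system.time(from_rds <- readRDS(rds_path))["elapsed"]
c(csv = unname(t_csv), rds = unname(t_rds))

identical(from_rds, df)   # binary round trip reproduces the data exactly
all.equal(from_csv, df)   # text round trip matches within numeric tolerance
```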


# Converting into ORAAH native format
inputRdata <- hdfs.toRData(inputCsv, out.name="100G_200n_rd")
# 4 mins, 54 sec
# 400 mappers

Next, we write the mapper and reducer code for parallel/distributed covariance computation. Below, the mapper function accepts a data.frame representation of the input data and generates structured output with 3 components: a matrix, a vector of column sums, and input row count. The single reducer function merges the structured output generated from the mappers to produce the final covariance matrix.

The mapper and reducer functions are supplied as input to ORAAH's hadoop.run() function. This function additionally takes an HDFS file as input in the data argument. The structured output from the mapper and reducer is defined in the mapred.config data structure. Optionally, the Hadoop job can be given a name (in this case "cov") for traceability.

ORCH_cov <- function(x) {
  hadoop.run(
    data = x,
    # Mapper: emit partial results for one block of input rows
    mapper = function(k, v) {
      m <- as.matrix(v)
      cs <- colSums(m)     # column sums for this block
      nr <- nrow(m)        # row count for this block
      mtm <- t(m) %*% m    # cross-product matrix for this block
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      orch.keyval(NULL, orch.pack(l))
    },
    # Reducer: merge the partial results into the covariance matrix
    reducer = function(k, v) {
      mapres <- orch.unpack(v$val, as.list=TRUE)
      xy <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      orch.keyval(NULL, orch.pack(covmat))
    },
    config = new("mapred.config",
      map.output = data.frame(key="none", val="c"),
      reduce.output = data.frame(key="none", val="c")),
    job.name = "cov")
}
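The mapper and reducer arithmetic can be checked without a cluster. The following is a minimal base R sketch that splits a small made-up data.frame into chunks, computes the same three components per chunk, merges them, and compares the result with R's built-in cov(). The helper names chunk_summary and merge_summaries are hypothetical and not part of ORAAH. Note that m1 + m2 - m3 in the reducer simplifies to xy/(n-1) - sts/(n(n-1)), which is the form used here.

```r
# Per-chunk summary, mirroring the three components the mapper emits.
chunk_summary <- function(v) {
  m <- as.matrix(v)
  list(mat = crossprod(m), colsum = colSums(m), nrow = nrow(m))
}

# Merge step, mirroring the reducer arithmetic in simplified form.
merge_summaries <- function(mapres) {
  xy  <- Reduce("+", lapply(mapres, function(s) s$mat))
  csf <- Reduce("+", lapply(mapres, function(s) s$colsum))
  nrf <- Reduce("+", lapply(mapres, function(s) s$nrow))
  sts <- csf %*% t(csf)
  xy / (nrf - 1) - sts / (nrf * (nrf - 1))
}

# Made-up test data, split into 4 chunks to stand in for 4 mappers.
set.seed(1)
df     <- as.data.frame(matrix(rnorm(1000 * 5), ncol = 5))
chunks <- split(df, rep(1:4, length.out = nrow(df)))
covmat <- merge_summaries(lapply(chunks, chunk_summary))

# Should agree with the single-machine result up to rounding error.
all.equal(covmat, cov(df), check.attributes = FALSE)
```

Because only sums are exchanged, the merge is independent of how rows are assigned to chunks. One caveat of this one-pass formula is that it can lose precision when column means are large relative to their spread.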

The execution times are shown below:


# Using text input
ORCH_cov(inputCsv)
# 7 mins, 19 sec
# 400 mappers / 45 reducers



# Using binary input
ORCH_cov(inputRdata)
# 4 mins, 18 sec
# 400 mappers / 45 reducers

A few points to observe:
1. Before an HDFS file can be used with ORAAH's hadoop.run() function, its metadata must be known. ORAAH automatically determines the data types of the columns in the HDFS file by sampling rows. The metadata is created during the hdfs.attach() call. This metadata enables ORAAH to generate highly optimized scan routines to read rows from the file.
2. ORAAH implements caching of input and output structures from mappers and reducers, thereby lifting the burden of dealing with large data volumes from the R engine.
3. ORAAH's orch.pack() and orch.unpack() functions enable transfer of structured constructs between mappers and reducers, which further improves I/O throughput by eliminating the need to scan/parse string inputs.
4. ORAAH leverages R's own RDATA representation as the binary representation. The key to better execution performance of R jobs on Hadoop is managing I/O throughput and carefully bypassing R's inherent limitations with parsing strings.

We contrast ORAAH's performance with that of an open source package called rmr (https://github.com/RevolutionAnalytics/RHadoop/wiki/rmr), repeating the covariance calculation on the same cluster and input data set. For this experiment, we used rmr version 2.3.0.

The covariance computation is written in rmr as shown below.


RMR2_cov <- function(x, input.format) {
  mapreduce(
    x,
    input.format = input.format,
    map = function(k, v) {
      m <- as.matrix(v)
      cs <- colSums(m)
      nr <- nrow(m)
      mtm <- t(m) %*% m
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      keyval(1, paste0(deparse(l), collapse="\n"))
    },
    reduce = function(k, v) {
      mapres <- lapply(v, function(x) eval(parse(text=x)))
      xy <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf - 1)
      m2 <- sts / (nrf * (nrf - 1))
      m3 <- 2 * sts / (nrf * (nrf - 1))
      covmat <- m1 + m2 - m3
      keyval(1, paste0(deparse(covmat), collapse="\n"))
    }
  )
}

Notice that the code is somewhat similar to the version used with ORAAH, with 3 key differences:
  i) Mapper output is serialized as a string
 ii) Reducer, thus, is forced to parse input strings
iii) Reducer output is once again a string, requiring the client to reconstruct the covariance matrix

Not only is passing strings limiting from the R programmer's perspective, it also has a negative effect on I/O throughput. rmr also supports a proprietary binary representation of delimited text data.

Below we repeat the tests with both delimited and binary representations. Not only is the conversion to the binary representation more expensive, the resulting I/O throughput is not substantially better.
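The cost of string passing can be seen in a small base R sketch that round-trips the same structure two ways: through deparse()/parse() as in the rmr code above, and through base R's serialize()/unserialize(). Using serialize() here is an assumption chosen for illustration; it is not a statement about how orch.pack() is implemented. The list l is made-up sample data.

```r
# Made-up structure shaped like the mapper output.
l <- list(mat    = matrix(c(0.1, 1/3, 2, 4), nrow = 2),
          colsum = c(a = 1, b = 2),
          nrow   = 2L)

# Text route: R source code as a string, re-evaluated on the other side.
as_text   <- paste0(deparse(l), collapse = "\n")
from_text <- eval(parse(text = as_text))

# Binary route: a raw vector, no parsing of numeric literals.
as_bytes   <- serialize(l, connection = NULL)
from_bytes <- unserialize(as_bytes)

identical(from_bytes, l)   # binary round trip is exact
all.equal(from_text, l)    # text round trip matches within numeric tolerance
```

The text route has to format every number as decimal digits and parse it back, which is the work the reducer in the rmr version repeats for every mapper output.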


# Convert to RMR2 native format
inputRMR <- mapreduce(inputCsv, 
  input.format = make.input.format("csv",sep=","),
  map = function(k, v) keyval(NULL, v)
)
# 20 mins, 17 sec
# 400 mappers
#
# Using text input
RMR2_cov(inputCsv, make.input.format("csv",sep=","))
# 32 mins, 14 sec
# 400 mappers / 45 reducers
#
# Using binary input
RMR2_cov(inputRMR, "native")
# 17 mins, 18 sec
# 400 mappers / 45 reducers

To summarize, ORAAH is about 4x faster than rmr out of the box for a simple covariance calculation.


               Text Input       Binary Input     Text to Binary Conversion
ORAAH          7 min, 19 sec    4 min, 18 sec    4 min, 54 sec
rmr            32 min, 14 sec   17 min, 18 sec   20 min, 17 sec
ORAAH vs. rmr  4.4x faster      4x faster        4.14x faster
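The speedup figures follow directly from the measured times. A short base R sketch of the arithmetic, where secs is a hypothetical helper that converts minutes and seconds to seconds:

```r
# Hypothetical helper: convert minutes and seconds to total seconds.
secs <- function(min, sec) 60 * min + sec

oraah <- c(text = secs(7, 19),  binary = secs(4, 18),  convert = secs(4, 54))
rmr   <- c(text = secs(32, 14), binary = secs(17, 18), convert = secs(20, 17))

# Ratio of rmr time to ORAAH time for each test.
speedup <- round(rmr / oraah, 2)
speedup
# text: 4.41, binary: 4.02, convert: 4.14
```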
