Wednesday May 06, 2015

Experience using ORAAH on a customer business problem: some basic issues & solutions

We illustrate in this blog a few simple, practical solutions for problems which can arise when developing ORAAH mapreduce applications for the Oracle BDA. These problems were actually encountered during a recent POC engagement. The customer, an important player in the medical technologies market, was interested in building an analysis flow consisting of a sequence of data manipulation and transformation steps followed by the generation of multiple models. The data preparation included multiple types of merging, filtering, and variable generation based on complex search patterns, and represented, by far, the most time-consuming component of the flow. The original implementation on the customer's hardware required multiple days per flow to complete. Our ORAAH mapreduce-based implementation running on an X5-2 Starter Rack BDA reduced that time to between 4 and 20 minutes, depending on which flow was tested.

The points addressed in this blog stem from the fact that the data preparation was structured as a chain of tasks, where each task performed transformations on HDFS data generated by one or more upstream tasks. More precisely, we will consider:

  • Merging of HDFS data from multiple sources

  • Re-balancing and parts reduction for HDFS data

  • Getting unique levels for categorical variables from HDFS data

  • Partitioning the data for distributed mapreduce execution

'Merging data' above is to be understood as row binding of multiple tables. Re-balancing and parts reduction address the fact that HDFS data (generated by upstream jobs) may consist of very unequal parts (chunks), which leads to performance losses when this data is further processed by other mapreduce jobs. The 3rd and 4th items are related: getting the unique levels of categorical variables was useful for the data partitioning process, namely for deciding how to generate the key-value pairs within the mapper functions.

1. Merging of HDFS data from multiple sources

The practical case here is that of a data transformation task for which the input consists of several similarly structured HDFS data sets. As a reminder, data in HDFS is stored as a collection of flat files/chunks (part-00000, part-00001, etc.) under an HDFS directory, and the hdfs.* functions access the directory, not the 'part-xxxxx' chunks. Also, the hadoop.* functions work with single input data objects (an HDFS object identifier representing a directory in HDFS); R rbind, cbind, merge, etc. operations cannot be invoked within mapreduce to bind two or more large tables.

For the case under consideration, each input (dataA_dfs, dataB_dfs, etc) consists of a different number of files/chunks

[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00071"
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00035"

corresponding to the number of reducers used by the upstream mapreduce jobs which generated this data. As these multiple chunks from various HDFS directories need to be processed as a single input, they need to be moved into a single HDFS directory. The 'merge_hdfs_data' function below does just that, by creating a new HDFS directory and copying all the part-xxxxx files from each source directory, renumbering the resulting parts accordingly:

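R> merge_hdfs_data <- function(SrcDirs,TrgtDir) {
  #cat(sprintf("merge_hdfs_files : Creating %s ...\n",TrgtDir))
  hdfs.mkdir(TrgtDir)
  i <- 0
  for (srcD in SrcDirs) {
    # List the part-xxxxx files under the HDFS directory of this source object
    fparts <- hdfs.ls(get(srcD),pattern="part")
    srcd <- (hdfs.describe(get(srcD)))[1,2]
    for (fpart in fparts) {
      #cat(sprintf("merge_hdfs_files : Copying %s/%s to %s ...\n",srcd,fpart,TrgtDir))
      # Copy the part under a new part number that is unique in the target directory
      hdfs.cp(paste(srcd,fpart,sep="/"),sprintf("%s/part-%05d",TrgtDir,i))
      i <- i+1
    }
  }
}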

Merging of the dataA_dfs and dataB_dfs directories into a new data_merged_dfs directory is achieved through:

R> merge_hdfs_data(c("dataA_dfs","dataB_dfs"),"data_merged_dfs")
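
The renumbering logic can be tried without a cluster. The sketch below is a local-filesystem analogue only: plain directories stand in for HDFS, base R file functions stand in for the hdfs.* calls, and the name merge_part_files and the toy directories are made up for the illustration.

```r
# Local-filesystem analogue of merging part files from several directories.
# Plain directories stand in for HDFS; no ORAAH calls are used.
merge_part_files <- function(src_dirs, target_dir) {
  dir.create(target_dir, recursive = TRUE, showWarnings = FALSE)
  i <- 0
  for (src in src_dirs) {
    # sort() keeps the original part order within each source directory
    parts <- sort(list.files(src, pattern = "^part-"))
    for (p in parts) {
      # Copy the part under the next free part number in the target directory
      dest <- file.path(target_dir, sprintf("part-%05d", i))
      file.copy(file.path(src, p), dest)
      i <- i + 1
    }
  }
  list.files(target_dir)
}

# Two toy source directories with 3 and 2 parts
root <- tempfile("merge_demo_")
dirA <- file.path(root, "dataA")
dirB <- file.path(root, "dataB")
dir.create(dirA, recursive = TRUE)
dir.create(dirB, recursive = TRUE)
for (j in 0:2) writeLines(paste("A", j), file.path(dirA, sprintf("part-%05d", j)))
for (j in 0:1) writeLines(paste("B", j), file.path(dirB, sprintf("part-%05d", j)))

merged <- merge_part_files(c(dirA, dirB), file.path(root, "merged"))
print(merged)
```

The merged directory ends up with one consecutively numbered part per input part, which is the property the downstream job relies on.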

2. Data re-balancing / Reduction of the number of parts

Data stored in HDFS can suffer from two key problems that will affect performance: too many small files, and files with very different numbers of records, especially those with very few records. The merged data produced by the function above consists of a number of files equal to the sum of all files from all input HDFS directories. Since the upstream mapreduce jobs generating the inputs were run with a high number of reducers (for faster execution), the resulting total number of files got large (100+). This created an impractical constraint for the subsequent analysis, as one cannot run a mapreduce application with a number of mappers less than the number of parts (the reverse is possible: HDFS parts are splittable for processing by multiple mappers). Moreover, if the parts have very different numbers of records, the performance of the application will suffer since different mappers will handle very different volumes of data.

The rebalance_data function below represents a simple way of addressing these issues. Every mapper splits its portion of the data into a user-defined number of parts (nparts) containing roughly the same number of records. A key is associated with each part. In this implementation the number of reducers is set to the number of parts. After shuffling, each reducer collects the records corresponding to one particular key and writes them to the output. The overall output consists of nparts parts of roughly equal size. A basic mechanism for preserving the data types is illustrated (see the map.output and reduce.output constructs below).

R> rebalance_data <- function(HdfsData,nmap,nparts)
{
  mapper_func <- function(k,v) {
    nlin <- nrow(v)
    if(nlin>0) {
      idx.seq <- seq(1,nlin)
      # Key = part number (1..nparts) assigned to each record of this chunk
      kk <- ceiling(idx.seq/(nlin/nparts))
      orch.keyvals(kk,v)
    }
  }
  reducer_func <- function(k,v) {
    if (nrow(v) > 0) { orch.keyvals(k=NULL,v) }
  }
  # Template values carrying the column types: "a" for character columns, 0 otherwise
  dtypes.out <- sapply(hdfs.meta(HdfsData)$types,
                       function(x) ifelse(x=="character","\"a\"","0"))
  val.str <- paste0(hdfs.meta(HdfsData)$names,"=",dtypes.out,collapse=",")
  map.out.str <- sprintf("data.frame(key=0,%s)",val.str)
  red.out.str <- sprintf("data.frame(key=NA,%s)",val.str)

  config <- new("mapred.config",
                job.name      = "rebalance_data",
                map.output    = eval(parse(text=map.out.str)),
                reduce.output = eval(parse(text=red.out.str)),
                map.tasks     = nmap,
                reduce.tasks  = nparts,
                reduce.split  = 1e5)
  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_func,
                    reducer = reducer_func,
                    config = config,
                    cleanup = TRUE)
  res
}

Before using this function, the data associated with the new data_merged_dfs directory needs to be attached to the ORAAH framework:

R> data_merged_dfs <- hdfs.attach("data_merged_dfs")

The invocation below uses 144 mappers for splitting the data into 4 parts:

R> x <- rebalance_data(data_merged_dfs,nmap=144,nparts=4)

The user may also want to save the resulting object, permanently, under some convenient/recognizable name like 'data_rebalanced_dfs' for example. The path to the temporary object x is retrieved with the hdfs.describe() command and provided as first argument to the hdfs.cp() command.

R> tmp_dfs_name <- hdfs.describe(x)[1,2]
R> hdfs.cp(tmp_dfs_name,"data_rebalanced_dfs",overwrite=TRUE)

The choice of the number of parts is up to the user. It is better to have few parts, so as not to constrain from below the number of mappers for the downstream runs, but one should also consider other factors such as the read/write performance related to the size of the data sets, the HDFS block size, etc., which are beyond the scope of this blog.
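
The key assignment used by the mappers can be checked locally in plain R. The sketch below applies the same ceiling() expression to three chunks of very unequal size and counts how many rows each key (future part) would receive after the shuffle. The chunk sizes and the helper name assign_part_keys are hypothetical, chosen only for the illustration.

```r
# Key assignment used inside the mapper: row i of an nlin-row chunk gets
# key ceiling(i / (nlin / nparts)), so keys run from 1 to nparts.
assign_part_keys <- function(nlin, nparts) {
  idx.seq <- seq_len(nlin)
  ceiling(idx.seq / (nlin / nparts))
}

# Three mapper chunks of very unequal size (hypothetical row counts)
chunk_sizes <- c(1000, 37, 5003)
nparts <- 4

# Each mapper assigns keys independently of the others
keys <- unlist(lapply(chunk_sizes, assign_part_keys, nparts = nparts))

# After the shuffle, each reducer receives all rows sharing one key
rows_per_part <- table(keys)
print(rows_per_part)
```

Because every chunk is cut into nparts nearly equal slices, the per-key totals differ by at most a few rows regardless of how unbalanced the input chunks were.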

3. Getting unique levels

Determining the unique levels of categorical variables in a dataset is of basic interest for any data exploration procedure. If the data is distributed in HDFS, this determination requires an appropriate solution. For the application under consideration here, getting the unique levels serves another purpose; the unique levels are used to generate data splits better suited for distributed execution by the downstream mapreduce jobs. More details are available in the next section.

Depending on the categorical variables in question and the data characteristics, the determination of unique levels may require different solutions. The implementation below is a generic solution providing these levels for multiple variables bundled together in the input argument 'cols'. The mappers associate a key with each variable and collect the unique levels for each of these variables. The resulting arrays of values are packed in a text-stream-friendly format and provided as the value argument to orch.keyvals(); in this way complex data types can be safely passed between the mappers and reducers (via text-based Hadoop streams). The reducers unpack the strings, retrieve all the values associated with a particular key (variable) and re-calculate the unique levels, now accounting for all values of that variable.

R> get_unique_levels <- function(x, cols, nmap, nred) {
  mapper <- function(k, v) {
    for (col in cols) {
      # One key per variable; the value is the packed vector of levels seen in this chunk
      uvals <- unique(v[[col]])
      orch.keyvals(col, orch.pack(uvals))
    }
  }
  reducer <- function(k, v) {
    # Unpack all partial results for this variable and de-duplicate them again
    lvals <- orch.unpack(v$val)
    uvals <- unique(unlist(lvals))
    orch.keyval(k, orch.pack(uvals))
  }
  config <- new("mapred.config",
                job.name      = "get_unique_levels",
                map.output    = data.frame(key="a",val="packed"),
                reduce.output = data.frame(key="a",val="packed"),
                map.tasks     = nmap,
                reduce.tasks  = nred)
  res <- hadoop.run(data = x,
                    mapper = mapper,
                    reducer = reducer,
                    config = config,
                    export = orch.export(cols=cols))
  resl <- (lapply((hdfs.get(res))$val,function(x){orch.unpack(x)}))[[1]]
  resl
}

This implementation works fine provided that the number of levels of the categorical variables is much smaller than the number of records in the entire data set. If some categorical variables have many levels, not far from the order of the total number of records, each mapper may return a large number of levels and each reducer may have to handle multiple large objects. An efficient solution for this case requires a different approach. However, if the column associated with one of these variables can fit in memory, a direct, very crude calculation like the one below can run faster than the former implementation. Here the mappers extract the column with the values of the variable in question, the column is pulled into an in-memory object, and unique() is called to determine the unique levels.

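R> get_unique_levels_sngl <- function(HdfsData,col,nmap)
{
  # Map-only job: each mapper emits the values of the requested column
  mapper_fun <- function(k,v) { orch.keyvals(key=NULL,v[[col]]) }
  config <- new("mapred.config",
                job.name      = "extract_col",
                map.output    = data.frame(key=NA,VAL=0),
                map.tasks     = nmap)
  x <- hadoop.run(data = HdfsData,
                  mapper = mapper_fun,
                  config = config,
                  export = orch.export(col=col))
  # Pull the extracted column into memory and de-duplicate it there
  xl <- hdfs.get(x)
  res <- unique(xl$VAL)
  res
}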

R> customers <- get_unique_levels_sngl(data_rebalanced_dfs,"CID",nmap=32)

We thus obtained the unique levels of the categorical variable CID (customer id) from our data_rebalanced_dfs data.
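
The two-stage logic of the generic solution (unique levels per chunk, then unique levels of the union per variable) can be checked on a single machine. In the sketch below the data, the column names and the chunking are made up, and base R serialize()/unserialize() are used only as a stand-in for orch.pack()/orch.unpack().

```r
# Toy data set with two categorical columns (hypothetical values)
set.seed(1)
dat <- data.frame(
  CID  = sample(sprintf("C%03d", 1:50), 1000, replace = TRUE),
  TYPE = sample(c("a", "b", "c"), 1000, replace = TRUE),
  stringsAsFactors = FALSE
)
cols <- c("CID", "TYPE")

# Four row chunks play the role of the mapper inputs
chunks <- split(dat, rep(1:4, length.out = nrow(dat)))

# "Map": each chunk emits one (column name, packed unique values) pair per column
map_out <- unlist(lapply(chunks, function(v) {
  lapply(cols, function(col) {
    list(key = col, val = serialize(unique(v[[col]]), NULL))
  })
}), recursive = FALSE)

# "Reduce": per key, unpack all partial results and take unique() again
keys <- vapply(map_out, function(kv) kv$key, character(1))
levels_by_col <- lapply(setNames(cols, cols), function(col) {
  packed <- map_out[keys == col]
  unique(unlist(lapply(packed, function(kv) unserialize(kv$val))))
})

str(levels_by_col)
```

The second unique() in the reduce step is what removes the levels that several chunks reported independently.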

4. Partitioning the data for mapreduce execution

Let's suppose that the user wants to execute some specific data manipulations at the CID level, like aggregations, variable transformations, generation of new variables, etc. Associating a key with every customer (CID level) would be a bad idea since there are many customers: our hypothesis was that the number of CID levels is not orders of magnitude below the total number of records. This would lead to an excessive number of reducers with a terrible impact on performance. In such a case it would be better, for example, to bag customers into groups and distribute the execution at the group level. The user may want to set the number of these groups, ngrp, to something commensurate with the number of BDA cores available for parallelizing the task.

The example below illustrates how to do that at a basic level. The groups are generated within the encapsulating function myMRjob, before the execution: the var.grps data frame has two columns, the CID levels and the group number (from 1 to ngrp) with which they are associated. This table is passed to the Hadoop execution environment via orch.export() within hadoop.run(). The mapper_fun function extracts the group number as key and inserts the multiple key-value pairs into the output buffer. The reducer then gets a complete set of records for every customer associated with a particular key (group) and can proceed with the transformations/manipulations within a loop over customers or whatever programming construct is appropriate. Each reducer handles a quasi-equal number of customers because this is how the groups were generated. However, the number of records per customer is not constant and may introduce some imbalances.

R> myMRjob <- function(HdfsData,var,ngrp,nmap,nred)
{
  mapper_fun <- function(k,v) {
    fltr <- <some_row_filtering>
    cID <- which(names(v) %in% "CUSTOMID")
    # Key = group number of the customer each (filtered) record belongs to
    kk <- var.grps[match(v[fltr,cID],var.grps$CUSTOMID),2]
    orch.keyvals(kk,v[fltr,])
  }
  reducer_fun <- function(k,v) { ... }
  config <- new("mapred.config", map.tasks = nmap, reduce.tasks = nred,....)

  # Lookup table: customer id and its group number (1..ngrp), groups of quasi-equal size
  var.grps <- data.frame(CUSTOMID=var,
                         GRP=rep_len(seq_len(ngrp),length(var)))

  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_fun,
                    reducer = reducer_fun,
                    config = config,
                    export = orch.export(var.grps=var.grps,ngrp=ngrp),
                    cleanup = TRUE)
  res
}

R> x <- myMRjob(HdfsData=data_rebalanced_dfs, var=customers, ngrp=..,nmap=..,nred=..)
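
The grouping step can be explored in plain R before running it on the cluster. The sketch below builds a lookup table of customers and group numbers, derives the key of each record with match() as the mapper does, and then counts customers and records per group. The data are synthetic, and the round-robin assignment with rep_len() is just one simple way to obtain groups of quasi-equal size, not necessarily the one used in the engagement.

```r
# Toy data: customer ids with unequal numbers of records (hypothetical values)
set.seed(42)
customers <- sprintf("C%04d", 1:103)
records <- data.frame(CUSTOMID = sample(customers, 5000, replace = TRUE),
                      AMOUNT   = runif(5000),
                      stringsAsFactors = FALSE)

ngrp <- 8
# Lookup table: customer id -> group number in 1..ngrp (round-robin assignment)
var.grps <- data.frame(CUSTOMID = customers,
                       GRP = rep_len(seq_len(ngrp), length(customers)),
                       stringsAsFactors = FALSE)

# What the mapper does: look up the group of each record and use it as the key
kk <- var.grps$GRP[match(records$CUSTOMID, var.grps$CUSTOMID)]

# What each reducer sees: all records of the customers in one group
by_group <- split(records, kk)
customers_per_group <- table(var.grps$GRP)
records_per_group <- vapply(by_group, nrow, integer(1))

print(customers_per_group)
print(records_per_group)
```

Comparing the two printed tables shows the effect mentioned above: the number of customers per group is nearly constant, while the number of records per group varies with how many records each customer has.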

Improved data partitioning solutions could be sought for the cases where there are strong imbalances in the number of records per customer or if great variations are noticed between the reducer jobs completion times. This kind of optimization will be addressed in a later blog.

Thursday Feb 12, 2015

Pain Point #6: “We need to build 10s of thousands of models fast to meet business objectives”

The last pain point in this series on Addressing Analytic Pain Points involves one aspect of what I call massive predictive modeling. Increasingly, enterprise customers are building a greater number of models. In past decades, producing a handful of production models per year may have been considered a significant accomplishment. With the advent of powerful computing platforms, parallel and distributed algorithms, as well as the wealth of data – Big Data – we see enterprises building hundreds and thousands of models in targeted ways.

For example, consider the utility sector with data being collected from household smart meters. Whether water, gas, or electricity, utility companies can make more precise demand projections by modeling individual customer consumption behavior. Aggregating this behavior across all households can provide more accurate forecasts, since individual household patterns are considered, not just generalizations about all households, or even different household segments.

The concerns associated with this form of massive predictive modeling include: (i) dealing effectively with Big Data from the hardware, software, network, storage and Cloud, (ii) algorithm and infrastructure scalability and performance, (iii) production deployment, and (iv) model storage, backup, recovery and security. Some of these I’ve explored under previous pain points blog posts.

Oracle Advanced Analytics (OAA) and Oracle R Advanced Analytics for Hadoop (ORAAH) both provide support for massive predictive modeling. From the Oracle R Enterprise component of OAA, users leverage embedded R execution to run user-defined R functions in parallel, both from R and from SQL. OAA provides the infrastructure to allow R users to focus on their core R functionality while allowing Oracle Database to handle spawning of R engines, partitioning data and providing data to their R function across parallel R engines, aggregating results, etc. Data parallelism is enabled using the “groupApply” and “rowApply” functions, while task parallelism is enabled using the “indexApply” function. The Oracle Data Mining component of OAA provides "on-the-fly" models, also called "predictive queries," where the model is automatically built on partitions of the data and scoring using those partitioned models is similarly automated.

ORAAH enables the writing of mapper and reducer functions in R where corresponding ORE functionality can be achieved on the Hadoop cluster. For example, to emulate “groupApply”, users write the mapper to partition the data and the reducer to build a model on the resulting data. To emulate “rowApply”, users can simply use the mapper to perform, e.g., data scoring and passing the model to the environment of the mapper. No reducer is required.
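
The two patterns can be mimicked on a single machine to see what each stage is responsible for. The sketch below uses only base R and the built-in mtcars data; split() stands in for the partitioning done by the mapper and shuffle, and the lapply() calls stand in for the reducers or mappers. It illustrates the pattern only and does not use the ORE or ORAAH APIs.

```r
# groupApply-style: "map" partitions the rows by the grouping column,
# "reduce" builds one model per partition
partitions <- split(mtcars, mtcars$cyl)
models <- lapply(partitions, function(df) lm(mpg ~ wt, data = df))

# rowApply-style: the model is passed into the function applied to each
# chunk of rows; no reduce stage is needed
score_chunk <- function(chunk, model) {
  cbind(chunk, pred = unname(predict(model, newdata = chunk)))
}
global_model <- lm(mpg ~ wt, data = mtcars)
row_chunks <- split(mtcars, rep(1:4, length.out = nrow(mtcars)))
scored <- do.call(rbind, lapply(row_chunks, score_chunk, model = global_model))

# One set of coefficients per group
print(sapply(models, coef))
```

In the Hadoop setting the model passed to score_chunk would be made available to the mapper's environment, as described above.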

Tuesday Feb 18, 2014

Low-Rank Matrix Factorization in Oracle R Advanced Analytics for Hadoop

This guest post from Arun Kumar, a graduate student in the Department of Computer Sciences at the University of Wisconsin-Madison, describes work done during his internship in the Oracle Advanced Analytics group.

Oracle R Advanced Analytics For Hadoop (ORAAH), a component of Oracle’s Big Data Connectors software suite, is a collection of statistical and predictive techniques implemented on Hadoop infrastructure. In this post, we introduce and explain techniques for a popular machine learning task that has diverse applications ranging from predicting ratings in recommendation systems to feature extraction in text mining, namely matrix completion and factorization. Training, scoring, and prediction phases for matrix completion and factorization are available in ORAAH. The models generated can also be transparently loaded into R for ad-hoc inspection. In this blog post, we describe implementation specifics of these two techniques available in ORAAH.


Consider an e-commerce company that displays products to potential customers on its webpage and collects data about views, purchases, ratings (e.g., 1 to 5 stars), etc. Increasingly, such online retailers are using machine learning techniques to predict in advance which products a customer is likely to rate highly and recommend such products to the customers in the hope that they might purchase them. Users build a statistical model based on the past history of ratings by all customers on all products. One popular model to generate predictions from such a hyper-sparse matrix is the latent factor model, also known as the low-rank matrix factorization model (LMF).

The setup is the following – we are given a large dataset of past ratings (potentially in the billions), say, with the schema (Customer ID, Product ID, Rating). Here, Customer ID refers to a distinct customer, Product ID refers to a distinct product, and Rating refers to a rating value, e.g., 1 to 5. Conceptually, this dataset represents a large matrix D with m rows (number of customers) and n columns (number of products), where the entries are the available ratings. Notice that this matrix is likely to be extremely sparse, i.e., many ratings could be missing since most customers typically rate only a few products. Thus, the task here is matrix completion – we need to predict the missing ratings so that they can be used for downstream processing such as displaying the top recommendations for each customer.

The LMF model assumes that the ratings matrix can be approximately generated as a product of two factor matrices, L and R, which are much smaller than D (lower rank). The idea is that the product L x R will approximately reconstruct the existing ratings and also automatically predict the missing ratings in D. More precisely, for each available rating (i,j,v) in D, we have (L x R) [i,j] ≈ v, while for each missing rating (i',j') in D, the predicted rating is (L x R) [i',j']. The model has a parameter r, which dictates the rank of the factor matrices, i.e., L is m x r, while R is r x n.
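
The shapes involved are easy to see on a toy example. The sketch below uses random factor matrices with made-up dimensions; it shows that a single predicted rating is the dot product of one row of L and one column of R, and compares the number of values stored in the factors with the size of the full matrix for a hypothetical larger problem.

```r
set.seed(7)
m <- 6; n <- 5; r <- 2                          # toy dimensions (hypothetical)
L <- matrix(rnorm(m * r), nrow = m, ncol = r)   # m x r
R <- matrix(rnorm(r * n), nrow = r, ncol = n)   # r x n
D_hat <- L %*% R                                # m x n matrix of all predictions

# A single predicted rating only needs row i of L and column j of R
i <- 3; j <- 4
pred_ij <- sum(L[i, ] * R[, j])

# Storage needed for a larger (hypothetical) problem: full matrix vs. factors
m_big <- 1e6; n_big <- 1e5; r_big <- 50
storage <- c(full = m_big * n_big, factors = r_big * (m_big + n_big))
print(storage)
```

The factors grow with r x (m + n) rather than m x n, which is why they stay manageable even when D itself is far too large to materialize.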

Matrix Completion in ORAAH

LMF can be invoked out-of-the-box using the routine orch.lmf. An execution based on the above example is shown below. The dataset of ratings is in a CSV file on HDFS with the schema above (named “retail_ratings” here).

input <- hdfs.attach("retail_ratings")
fit <- orch.lmf(input)

# Export the model into R memory
lr <- orch.export.fit(fit)

# Compute the prediction for the point (100, 50)

# First column of lr$L contains the userid; the remaining columns hold the factors
rank <- ncol(lr$L) - 1
userid <- lr$L[,1] == 100 # find row corresponding to user id 100
L <- lr$L[, 2:(rank+1)]

# First column of lr$R contains the itemid
itemid <- lr$R[,1] == 50 # find row corresponding to item id 50
R <- lr$R[, 2:(rank+1)]

# dot product as sum of terms obtained through component wise multiplication
pred <- sum(L[userid,] * R[itemid,])

The factor matrices can be transparently loaded into R for further inspection and for ad-hoc predictions of specific customer ratings using R. The algorithm we use for training the LMF model is called Incremental Gradient Descent (IGD), which has been shown to be one of the fastest algorithms for this task [1, 2].
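
To give a feel for what an incremental gradient method does on this problem, here is a minimal single-machine sketch in base R. It is not ORAAH's implementation: the synthetic data, the step size, the regularization weight and the number of passes are all assumptions chosen for the illustration, and none of the parallel partitioning described in [2] is attempted.

```r
set.seed(123)
m <- 30; n <- 20; r <- 2

# Synthetic rank-r ratings matrix, with roughly 40% of the entries observed
D <- matrix(rnorm(m * r), m, r) %*% matrix(rnorm(r * n), r, n)
obs <- which(matrix(runif(m * n) < 0.4, m, n), arr.ind = TRUE)
ratings <- data.frame(i = obs[, "row"], j = obs[, "col"], v = D[obs])

# RMSE of the current factors on a set of (i, j, v) ratings
rmse <- function(L, R, ratings) {
  pred <- rowSums(L[ratings$i, , drop = FALSE] * t(R[, ratings$j, drop = FALSE]))
  sqrt(mean((ratings$v - pred)^2))
}

# Small random starting factors
L <- matrix(rnorm(m * r, sd = 0.1), m, r)
R <- matrix(rnorm(r * n, sd = 0.1), r, n)
rmse_start <- rmse(L, R, ratings)

step <- 0.02     # step size (hypothetical value)
lambda <- 0.01   # L2 regularization weight (hypothetical value)
for (epoch in 1:50) {
  for (t in sample(nrow(ratings))) {         # visit the ratings in random order
    i <- ratings$i[t]; j <- ratings$j[t]
    err <- sum(L[i, ] * R[, j]) - ratings$v[t]
    Li <- L[i, ]                             # keep the old row for the R update
    # One gradient step using this single rating only
    L[i, ] <- L[i, ] - step * (err * R[, j] + lambda * L[i, ])
    R[, j] <- R[, j] - step * (err * Li + lambda * R[, j])
  }
}
rmse_end <- rmse(L, R, ratings)
cat("training RMSE before:", rmse_start, " after:", rmse_end, "\n")
```

Each update touches only one row of L and one column of R, which is the property that makes it possible to process non-conflicting blocks of ratings in parallel.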

The entire set of arguments for the function orch.lmf along with a brief description of each and their default values is given in the table below. The latin parameter configures the degree of parallelism for executing IGD for LMF on Hadoop [2]. ORAAH sets this automatically based on the dimensions of the problem and the memory available to each Mapper. Each Mapper fits its partition of the model in memory, and the multiple partitions run in parallel to learn different parts of the model. The last five parameters configure IGD and need to be tuned by the user to a given dataset since they can impact the quality of the model obtained.

ORAAH also provides routines for predicting ratings as well as for evaluating the model (computing the error of the model on a given labeled dataset) on a large scale over HDFS-resident datasets. The routine for prediction of ratings is predict, and for evaluating is orch.evaluate. Use help(orch.lmf) for online documentation, and demo(orch_lmf_jellyfish) for a fully working example including model fit, evaluation, and prediction.

Other Matrix Factorization Tasks

While LMF is primarily used for matrix completion tasks, it can also be used for other matrix factorization tasks that arise in text mining, computer vision, and bio-informatics, e.g., dimension reduction and feature extraction. In these applications, the input data matrix need not necessarily be sparse. Although many zeros might be present, they are not treated as missing values. The goal here is simply to obtain a low-rank factorization D ≈ L x R as accurately as possible, i.e., the product L x R should recover all entries in D, including the zeros. Typically, such applications use a Non-Negative Matrix Factorization (NMF) approach due to non-negativity constraints on the factor matrix entries. However, many of these applications often do not need non-negativity in the factor matrices. Using NMF algorithms for such applications leads to poorer-quality solutions. Our implementation of matrix factorization for such NMF-style tasks can be invoked out-of-the-box in ORAAH using the routine orch.nmf, which has the same set of arguments as LMF.

Experimental Results & Comparison with Apache Mahout

We now present an empirical evaluation of the performance, quality, and scalability of the ORAAH LMF tool based on IGD and compare it to the most widely used off-the-shelf tool for LMF on Hadoop – an implementation of the ALS algorithm from Apache Mahout [3].

All our experiments are run on an Oracle Big Data Appliance Hadoop cluster with nine nodes, each with Intel Xeon X5675 12-core 3.07GHz processors, 48 GB RAM, and 20 TB disk. We use 256MB HDFS blocks and 10 reducers for MapReduce jobs.

We use two standard public datasets for recommendation tasks – MovieLens10M (referred to as MLens) and Netflix – for the performance and quality comparisons. To study scalability aspects, we use several synthetic datasets of different sizes by changing the number of rows, number of columns, and/or number of ratings. The table below presents the data set statistics.

Results: Performance and Quality

We first present an end-to-end overview of the performance and quality achieved by our implementation and Mahout on MLens and Netflix. The rank parameter was set at 50 (a typical choice for such tasks) and the other parameters for both tools were chosen using a grid search. The quality of the factor matrices was determined using the standard measure of root mean square error (RMSE) [2]. We use a 70%-15%-15% Wold holdout of the datasets, i.e., 70% for training, 15% for testing, and 15% for validation of generalization error. The training was performed until 0.1% convergence, i.e., until the fractional decrease in the training RMSE after every iteration reached 0.1%. The table below presents the results.

1. ORAAH LMF is faster than Mahout LMF in overall training runtime on both datasets – 1.8x faster on MLens and 2.3x faster on Netflix.
2. The per-iteration runtime of ORAAH LMF is much lower than that of Mahout LMF – between 4.4x and 5.4x lower.
3. Although ORAAH LMF runs more iterations than Mahout LMF, the huge difference in the per-iteration runtimes makes the overall runtime smaller for ORAAH LMF.
4. The training quality (training RMSE) achieved is comparable across both tools on both datasets. Similarly, the generalization quality is also comparable. Thus, ORAAH LMF can offer state-of-the-art quality along with faster performance.
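
The evaluation protocol described in this section (70%-15%-15% split of the ratings, RMSE as quality measure, stopping at 0.1% fractional decrease of the training RMSE) can be written down in a few lines of base R. The sketch below is only an illustration of those three ingredients: the number of ratings and the sequence of RMSE values are made up and are not measurements from the experiments.

```r
set.seed(2014)
n_ratings <- 1000

# Assign each rating to training (70%), testing (15%) or validation (15%)
fold <- sample(rep(c("train", "test", "valid"),
                   times = round(n_ratings * c(0.70, 0.15, 0.15))))
split_sizes <- table(fold)

# Root mean square error between actual and predicted ratings
rmse <- function(actual, predicted) sqrt(mean((actual - predicted)^2))

# Stop once the fractional decrease in training RMSE falls to 0.1% or less
converged <- function(rmse_prev, rmse_curr, tol = 0.001) {
  (rmse_prev - rmse_curr) / rmse_prev <= tol
}

# Hypothetical per-iteration training RMSE values, for illustration only
train_rmse <- c(1.20, 1.00, 0.92, 0.90, 0.8995, 0.8994)
stop_at <- which(converged(head(train_rmse, -1), tail(train_rmse, -1)))[1] + 1

print(split_sizes)
cat("training would stop after iteration", stop_at, "\n")
```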

Results: Scalability

The ability to scale along all possible dimensions of the data is key to big data analytics. Both ORAAH LMF and Mahout LMF are able to scale to billions of ratings by parallelizing and distributing computations on Hadoop. But we now show that, unlike Mahout LMF, ORAAH LMF is also able to scale to hundreds of millions of customers (m) and products (n), and also scales well with the rank parameter (r, which affects the size of the factor matrices). The figure below presents the scalability results along these three dimensions – m, n, and r.

1. Figures (A) and (B) plot the results for the Syn-row and Syn-col datasets, respectively (r = 2). ORAAH LMF scales linearly with both number of rows (m) and number of columns (n), while Mahout LMF does not show up on either plot because it crashes at all these values of m. In fact, we verified that Mahout LMF does not scale beyond even m = 20 M! The situation is similar with n. This is because Mahout LMF assumes that the factor matrices L and R fit entirely in the memory of each Mapper. In contrast, ORAAH LMF uses a clever partitioning scheme on all matrices ([2]) and can thus scale seamlessly on all dataset dimensions.
2. Figure (C) shows the impact of the rank parameter r. ORAAH LMF scales linearly with r and the per-iteration runtime roughly doubles between r = 20 and r = 100. However, the per-iteration runtime of Mahout LMF varies quadratically with r, and in fact, increases by a factor of 40x between r = 20 and r = 100! Thus, ORAAH LMF is also able to scale better with r.
3. Finally, on the tera-scale dataset Syn-tera with 1 billion rows, 10 million columns, and 20 billion ratings, ORAAH LMF (for r = 2) finishes an iteration in just under 2 hours!


The matrix factorization features in ORAAH were implemented and benchmarked by Arun Kumar during his summer internship at Oracle under the guidance of Vaishnavi Sashikanth. He is pursuing his PhD in computer science at the University of Wisconsin-Madison. This work is the result of a collaboration between Oracle and the research group of Dr. Christopher Ré, who is now at Stanford University. Anand Srinivasan helped integrate these features into ORAAH.


[1] Towards a Unified Architecture for in-RDBMS Analytics. Xixuan Feng, Arun Kumar, Benjamin Recht, and Christopher Ré. ACM SIGMOD 2012.

[2] Parallel Stochastic Gradient Algorithms for Large-Scale Matrix Completion. Benjamin Recht and Christopher Ré. Mathematical Programming Computation 2013.

[3] Apache Mahout.

Friday Jan 03, 2014

ORAAH - Enabling high performance R workloads on Hadoop

One of the features of Oracle R Advanced Analytics for Hadoop (ORAAH) is enabling Hadoop jobs written in the R language. R is a popular open-source language and environment for statistical computing and graphics. ORAAH enables R programmers to leverage a Hadoop cluster operating on data resident in HDFS files.

In this blog post, we examine the performance characteristics of ORAAH with an example and explain what makes ORAAH the fastest alternative available to run Hadoop-R jobs. We also compare the results with another popular Hadoop interface for R, rmr.

Credit to Vlad Sharanhovich and Anand Srinivasan for providing the content for this blog post.

In probability theory and statistics, covariance is a measure of how much two variables change together.

Variables that tend to show similar behavior exhibit positive covariance. Alternatively if the greater values of one variable correspond with the smaller values of another then the covariance between the variables is negative. We use covariance computation as the running example below that you can use to reproduce the results detailed here.

The tests were performed on a 6-node cluster running ORAAH version 2.3.1.

Cluster configuration:

  • 6 node cluster

  • BDA v2.3.1 (based on CDH 4.4)

  • 4 tasktrackers

  • CPU: Intel ® Xeon® CPU X5675  @ 3.07GHz

  • RAM: 47GB

ORAAH comes with a convenience function for data generation. We use this function to generate a 100 GB HDFS input file with numeric values for 200 variables (columns), as follows:

# Generate 100GB input dataset (using pre-release 2.4.0)
inputCsv <- orch.datagen(1e+11, numeric.col.count=200, parts=100)
# 15 mins, 53 sec
# 200 mappers

ORAAH supports 2 types of HDFS input: delimited text files and a binary RDATA representation (R's own binary representation). In many cases, RDATA representation provides much better I/O throughput compared to delimited text files.

# Converting into ORAAH native format
inputRdata <- hdfs.toRData(inputCsv,"100G_200n_rd")
# 4 mins, 54 sec
# 400 mappers

Next, we write the mapper and reducer code for parallel/distributed covariance computation. Below, the mapper function accepts a data.frame representation of the input data and generates structured output with 3 components: a matrix, a vector of column sums, and input row count. The single reducer function merges the structured output generated from the mappers to produce the final covariance matrix.

The mapper and reducer functions are supplied as input to ORAAH's hadoop.run() function. This function additionally takes an HDFS file as input in the data argument. The structured output from the mapper and reducer is defined in the mapred.config data structure. Optionally, the Hadoop job can be given a name (in this case "cov") for traceability.

ORCH_cov <- function(x) {
  hadoop.run(
    data = x,
    mapper = function(k, v) {
      # Partial results for this chunk: cross-product matrix, column sums, row count
      m <- as.matrix(v)
      cs <- colSums(m)
      nr <- nrow(m)
      mtm <- t(m) %*% m
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      orch.keyval(NULL, orch.pack(l))
    },
    reducer = function(k, v) {
      # Add up the partial results from all mappers and form the covariance matrix
      mapres <- orch.unpack(v$val, as.list=TRUE)
      xy <- Reduce("+", lapply(mapres,function(x) x$mat))
      csf <- Reduce("+", lapply(mapres,function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres,function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf -1)
      m2 <- sts / (nrf * (nrf-1))
      m3 <- 2 * sts / (nrf * (nrf-1))
      covmat <- m1 + m2 - m3
      orch.keyval(NULL, orch.pack(covmat))
    },
    config = new("mapred.config",
      map.output = data.frame(key="none", val="c"),
      reduce.output = data.frame(key="none", val="c"),
      job.name = "cov"))
}

The execution times are shown below:

# Using text input
ORCH_cov(inputCsv)
# 7 mins, 19 sec
# 400 mappers / 45 reducers

# Using binary input
ORCH_cov(inputRdata)
# 4 mins, 18 sec
# 400 mappers / 45 reducers

A few points to observe:
1. Before an HDFS file can be used with ORAAH's hadoop.run() function, its metadata must be known. ORAAH automatically determines the data types of the columns in the HDFS file by sampling rows. The metadata is created during the hdfs.attach() call. This metadata enables ORAAH to generate highly optimized scan routines to read rows from the file.
2. ORAAH implements caching of input and output structures from mappers and reducers, thereby lifting the burden of dealing with large data volumes from the R engine.
3. ORAAH's orch.pack() and orch.unpack() functions enable transfer of structured constructs between mappers and reducers, which further improves I/O throughput by eliminating the need to scan/parse string inputs.
4. ORAAH leverages R's own RDATA representation as the binary representation. The key to better execution performance of R jobs on Hadoop is managing I/O throughput and carefully bypassing R's inherent limitations with parsing strings.
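
The arithmetic of the mapper/reducer pair can be verified on a single machine. With XtX the sum of the per-chunk cross-products, s the vector of column sums and n the total row count, the reducer's m1 + m2 - m3 simplifies to XtX/(n-1) - (s s')/(n(n-1)), which is the sample covariance. The sketch below uses random data and a made-up chunking in base R, and compares the combined result with cov().

```r
set.seed(99)
X <- matrix(rnorm(1000 * 5), ncol = 5)

# "Map": each chunk of rows yields t(m) %*% m, the column sums and the row count
chunks <- split(seq_len(nrow(X)), rep(1:4, length.out = nrow(X)))
mapres <- lapply(chunks, function(rows) {
  m <- X[rows, , drop = FALSE]
  list(mat = t(m) %*% m, colsum = colSums(m), nrow = nrow(m))
})

# "Reduce": add the partial results and apply the simplified reducer formula
xy  <- Reduce("+", lapply(mapres, function(x) x$mat))
csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
sts <- csf %*% t(csf)
covmat <- xy / (nrf - 1) - sts / (nrf * (nrf - 1))

# Compare with the in-memory computation on the full matrix
print(all.equal(covmat, cov(X)))
```

Only a p x p matrix, a length-p vector and a count leave each mapper, so the amount of data shuffled to the reducer does not depend on the number of input rows.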

We contrast ORAAH's performance by comparing it with an open source package called rmr, repeating the covariance calculation on the same cluster and input data set. For this experiment, we used rmr version 2.3.0.

The covariance computation is written in rmr as shown below.

RMR2_cov <- function(x, input.format) {
  mapreduce(
    x,
    input.format = input.format,
    map = function(k, v) {
      m <- as.matrix(v)
      cs <- colSums(m)
      nr <- nrow(m)
      mtm <- t(m) %*% m
      l <- list(mat=mtm, colsum=cs, nrow=nr)
      # Serialize the partial results as a single string
      keyval(1, paste0(deparse(l), collapse="\n"))
    },
    reduce = function(k, v) {
      # Parse each string back into an R list
      mapres <- lapply(v, function(x) eval(parse(text=x)))
      xy <- Reduce("+", lapply(mapres, function(x) x$mat))
      csf <- Reduce("+", lapply(mapres, function(x) x$colsum))
      nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))
      sts <- csf %*% t(csf)
      m1 <- xy / (nrf -1)
      m2 <- sts / (nrf * (nrf-1))
      m3 <- 2 * sts / (nrf * (nrf-1))
      covmat <- m1 + m2 - m3
      keyval(1, paste0(deparse(covmat), collapse="\n"))
    })
}

Notice that the code is somewhat similar to the version used with ORAAH with 3 key differences:
   i) Mapper output is serialized as a string
 ii) Reducer, thus, is forced to parse input strings
iii) Reducer output is once again a string, requiring client to reconstruct the covariance matrix

Not only is passing strings limiting from the R programmer's perspective, but it also has a negative effect on I/O throughput. rmr also supports a proprietary binary representation of delimited text data.
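
The difference between the two routes can be seen on a small object in base R. The sketch below sends the same list once as a string (deparse() on the sending side, parse() and eval() on the receiving side) and once through base R serialize()/unserialize(), which is used here only as a stand-in for a structured, non-text transfer; it is not the mechanism behind orch.pack(). The list contents are made up.

```r
# A small structured result of the kind a mapper might emit (hypothetical values)
l <- list(mat = matrix(c(1.5, 2, 3, 4.25), 2, 2), colsum = c(3.5, 7.25), nrow = 2L)

# String route: deparse() on the sending side, parse() + eval() on the receiving side
as_text   <- paste0(deparse(l), collapse = "\n")
from_text <- eval(parse(text = as_text))

# Binary route: serialize() to a raw vector and unserialize() it again
as_raw   <- serialize(l, NULL)
from_raw <- unserialize(as_raw)

cat(as_text, "\n")
print(all.equal(from_text, l))
print(identical(from_raw, l))
```

With the string route the receiver has to run the R parser on every value it gets, and numeric values survive only to the precision that deparse() writes out.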

Below we repeat the tests with both delimited and binary representations. Not only is the conversion to the binary representation more expensive, the resulting I/O throughput is not substantially better.

# Convert to RMR2 native format
inputRMR <- mapreduce(inputCsv,
  input.format = make.input.format("csv",sep=","),
  map = function(k, v) keyval(NULL, v))
# 20 mins, 17 sec
# 400 mappers

# Using text input
RMR2_cov(inputCsv, make.input.format("csv",sep=","))
# 32 mins, 14 sec
# 400 mappers / 45 reducers

# Using binary input
RMR2_cov(inputRMR, "native")
# 17 mins, 18 sec
# 400 mappers / 45 reducers

To summarize, ORAAH is 4x faster than rmr out of the box for a simple covariance calculation.

                 Text Input        Binary Input      Text to Binary Conversion
 ORAAH           7 min, 19 sec     4 min, 18 sec     4 min, 54 sec
 rmr             32 min, 14 sec    17 min, 18 sec    20 min, 17 sec
 ORAAH vs. rmr   4.4x faster       4x faster         4.14x faster
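
The speedup figures follow directly from the timings reported above. The short check below converts each timing to seconds and divides; the helper name to_sec is made up for the illustration.

```r
# Convert "min, sec" timings from the runs above into seconds
to_sec <- function(min, sec) 60 * min + sec

oraah <- c(text = to_sec(7, 19),  binary = to_sec(4, 18),  convert = to_sec(4, 54))
rmr   <- c(text = to_sec(32, 14), binary = to_sec(17, 18), convert = to_sec(20, 17))

# Ratio of rmr time to ORAAH time for each column of the table
speedup <- rmr / oraah
print(round(speedup, 2))
```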

