Wednesday May 06, 2015

Experience using ORAAH on a customer business problem: some basic issues & solutions

We illustrate in this blog a few simple, practical solutions to problems that can arise when developing ORAAH mapreduce applications for the Oracle BDA. These problems were encountered during a recent POC engagement. The customer, an important player in the medical technologies market, was interested in building an analysis flow consisting of a sequence of data manipulation and transformation steps followed by the generation of multiple models. The data preparation included multiple types of merging, filtering, and variable generation based on complex search patterns, and represented by far the most time-consuming component of the flow. The original implementation on the customer's hardware required multiple days per flow to complete. Our ORAAH mapreduce-based implementation running on an X5-2 Starter Rack BDA reduced that time to between 4 and 20 minutes, depending on which flow was tested.

The points addressed in this blog stem from the fact that the data preparation was structured as a chain of tasks, where each task performed transformations on HDFS data generated by one or more upstream tasks. More precisely, we will consider:


  • Merging of HDFS data from multiple sources

  • Re-balancing and parts reduction for HDFS data

  • Getting unique levels for categorical variables from HDFS data

  • Partitioning the data for distributed mapreduce execution


'Merging data' above is to be understood as row binding of multiple tables. Re-balancing and parts reduction address the fact that HDFS data generated by upstream jobs may consist of very unequal parts (chunks); this leads to performance losses when the data is further processed by other mapreduce jobs. The 3rd and 4th items are related: getting the unique levels of categorical variables was useful for the data partitioning process, namely for deciding how to generate the key-value pairs within the mapper functions.

1. Merging of HDFS data from multiple sources


The practical case here is a data transformation task whose input consists of several similarly structured HDFS data sets. As a reminder, data in HDFS is stored as a collection of flat files/chunks (part-00000, part-00001, etc.) under an HDFS directory, and the hdfs.* functions access the directory, not the individual 'part-xxxxx' chunks. Also, the hadoop.run()/hadoop.exec() functions work with a single input data object (an HDFS object identifier representing a directory in HDFS), and R operations such as rbind, cbind or merge cannot be invoked within mapreduce to bind two or more large tables.

For the case under consideration, each input (dataA_dfs, dataB_dfs, etc) consists of a different number of files/chunks


R> hdfs.ls("dataA_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00071"
R> hdfs.ls("dataB_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00035"


corresponding to the number of reducers used by the upstream mapreduce jobs that generated this data. As these chunks from various HDFS directories need to be processed as a single input, they need to be moved into a single HDFS directory. The 'merge_hdfs_data' function below does just that: it creates a new HDFS directory and copies into it all the part-xxxxx files from each source directory, renumbering the resulting parts sequentially:

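R> merge_hdfs_data <- function(SrcDirs,TrgtDir) {
  #cat(sprintf("merge_hdfs_files : Creating %s ...\n",TrgtDir))
  # create (or overwrite) the target HDFS directory
  hdfs.mkdir(TrgtDir,overwrite=TRUE)
  i <- 0
  for (srcD in SrcDirs) {
    # SrcDirs holds the names of R variables referencing HDFS objects
    fparts <- hdfs.ls(get(srcD),pattern="part")
    # full HDFS path of the source directory
    srcd <- (hdfs.describe(get(srcD)))[1,2]
    for (fpart in fparts) {
      #cat(sprintf("merge_hdfs_files : Copying %s/%s to %s ...\n",
      #            srcD,fpart,TrgtDir))
      i <- i+1
      # copy the chunk under a new, sequential part number
      hdfs.cp(paste(srcd,fpart,sep="/"),sprintf("%s/part-%05d",TrgtDir,i))
    }
  }
  invisible(i)  # number of parts copied
}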


Merging of the dataA_dfs and dataB_dfs directories into a new data_merged_dfs directory is achieved through:

R> merge_hdfs_data(c("dataA_dfs","dataB_dfs"),"data_merged_dfs")
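The renaming scheme can be checked without a cluster. The sketch below uses a hypothetical local helper, plan_merge (not an ORAAH function), that takes the part-file names listed for each source directory and computes the sequential target names, mirroring the counter i in merge_hdfs_data, which is incremented before each copy so the target parts start at part-00001. The part counts mirror the hdfs.ls() listings shown earlier.

```r
# Hypothetical local helper, not an ORAAH function: plans the copy of every
# part file of several source directories into one target directory.
plan_merge <- function(src_parts, trgt_dir) {
  # src_parts: named list, one character vector of part names per directory
  src  <- rep(names(src_parts), lengths(src_parts))
  part <- unlist(src_parts, use.names = FALSE)
  data.frame(from = paste(src, part, sep = "/"),
             # parts are renumbered sequentially across all sources, from 1
             to = sprintf("%s/part-%05d", trgt_dir, seq_along(part)),
             stringsAsFactors = FALSE)
}

parts <- list(dataA_dfs = sprintf("part-%05d", 0:71),
              dataB_dfs = sprintf("part-%05d", 0:35))
plan <- plan_merge(parts, "data_merged_dfs")
nrow(plan)      # 108 parts in total
tail(plan, 1)
```

The merged directory simply holds the union of all source parts, which is why the part count grows with every upstream reducer, the issue addressed in the next section.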

2. Data re-balancing / Reduction of the number of parts


Data stored in HDFS can suffer from two key problems that affect performance: too many small files, and files with very different numbers of records, especially those with very few records. The merged data produced by the function above consists of a number of files equal to the sum of the files in all input HDFS directories. Since the upstream mapreduce jobs generating the inputs were run with a high number of reducers (for faster execution), the resulting total number of files was large (100+). This created an impractical constraint for the subsequent analysis, as one cannot run a mapreduce application with fewer mappers than the number of parts (the reverse is possible, since HDFS parts are splittable for processing by multiple mappers). Moreover, if the parts have very different numbers of records, the performance of the application suffers, since different mappers handle very different volumes of data.

The rebalance_data function below is a simple way of addressing these issues. Every mapper splits its portion of the data into a user-defined number of parts (nparts) containing roughly the same number of records, and associates a key with each part. In this implementation the number of reducers is set to the number of parts. After shuffling, each reducer collects the records corresponding to one particular key and writes them to the output. The overall output thus consists of nparts parts of roughly equal size. A basic mechanism for preserving the data types is also illustrated (see the map.output and reduce.output constructs below).
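The key rule applied by mapper_func in rebalance_data, ceiling(idx.seq/(nlin/nparts)), can be checked locally. This base-R sketch (the helper names are hypothetical) assigns keys for several simulated mappers of very unequal size and counts the records each of the nparts reducers would receive after the shuffle.

```r
# Same key rule as mapper_func: rows 1..nlin of one mapper get keys 1..nparts
assign_part_keys <- function(nlin, nparts) {
  idx.seq <- seq_len(nlin)
  ceiling(idx.seq / (nlin / nparts))
}

# Hypothetical simulation: mapper_rows holds the number of records seen by
# each mapper; returns the number of records each reducer (key) collects.
simulate_rebalance <- function(mapper_rows, nparts) {
  keys <- unlist(lapply(mapper_rows, assign_part_keys, nparts = nparts))
  as.vector(table(factor(keys, levels = seq_len(nparts))))
}

assign_part_keys(10, 4)                  # 1 1 2 2 2 3 3 4 4 4
simulate_rebalance(c(10, 7, 1000), 4)    # 253 255 254 255
```

Within one mapper the per-key counts differ by at most one record, so even very unequal input chunks produce output parts of roughly equal size.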

R> rebalance_data <- function(HdfsData,nmap,nparts)
{
  mapper_func <- function(k,v) {
    nlin <- nrow(v)
    if(nlin>0) {
      idx.seq <- seq(1,nlin)
      kk <- ceiling(idx.seq/(nlin/nparts))
      orch.keyvals(kk,v)
    }
  }
  reducer_func <- function(k,v) {
    if (nrow(v) > 0) { orch.keyvals(k=NULL,v) }
  }
  dtypes.out <- sapply(hdfs.meta(HdfsData)$types,
                       function(x) ifelse(x=="character","\"a\"",
                                          ifelse(x=="logical","FALSE","0")))
  val.str <- paste0(hdfs.meta(HdfsData)$names,"=",dtypes.out,collapse=",")
  meta.map.str <- sprintf("data.frame(key=0,%s)",val.str)
  meta.red.str <- sprintf("data.frame(key=NA,%s)",val.str)

  config <- new("mapred.config",
                job.name      = "rebalance_data",
                map.output    = eval(parse(text=meta.map.str)),
                reduce.output = eval(parse(text=meta.red.str)),
                map.tasks     = nmap,
                reduce.tasks  = nparts,
                reduce.split  = 1e5)
  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_func,
                    reducer = reducer_func,
                    config = config,
                    cleanup = TRUE
  )
  res
}
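The type-preserving prototypes passed to map.output and reduce.output can also be built and inspected locally. In this sketch, build_prototype is a hypothetical helper and the cols/types vectors stand in for what hdfs.meta() returns; it applies the same mapping as rebalance_data (character to "a", logical to FALSE, anything else to 0). The stringsAsFactors=FALSE argument is an addition so that character columns stay character on R versions before 4.0.

```r
# Hypothetical helper mirroring the dtypes.out / val.str logic of rebalance_data.
# 'cols' and 'types' stand in for hdfs.meta(HdfsData)$names and $types.
build_prototype <- function(cols, types, key_value) {
  dtypes.out <- ifelse(types == "character", "\"a\"",
                       ifelse(types == "logical", "FALSE", "0"))
  val.str <- paste0(cols, "=", dtypes.out, collapse = ",")
  # stringsAsFactors=FALSE added (assumption) to keep character columns as character
  txt <- sprintf("data.frame(key=%s,%s,stringsAsFactors=FALSE)", key_value, val.str)
  eval(parse(text = txt))
}

map.proto <- build_prototype(c("CID", "AMOUNT", "FLAG"),
                             c("character", "numeric", "logical"), "0")
red.proto <- build_prototype(c("CID", "AMOUNT", "FLAG"),
                             c("character", "numeric", "logical"), "NA")
str(map.proto)
```

The map prototype carries a numeric key (the part number), while the reduce prototype uses an NA key because the reducers emit key=NULL.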

Before using this function, the data associated with the new data_merged_dfs directory needs to be attached to the ORAAH framework:

R> data_merged_dfs <- hdfs.attach("data_merged_dfs")

The invocation below uses 144 mappers for splitting the data into 4 parts:

R> x <- rebalance_data(data_merged_dfs,nmap=144,nparts=4)


The user may also want to save the resulting object permanently under a convenient, recognizable name, for example 'data_rebalanced_dfs'. The path to the temporary object x is retrieved with the hdfs.describe() command and provided as the first argument to the hdfs.cp() command.

R> tmp_dfs_name <- hdfs.describe(x)[1,2]
R> hdfs.cp(tmp_dfs_name,"data_rebalanced_dfs",overwrite=TRUE)

The choice of the number of parts is up to the user. It is better to have few parts, so as not to impose a high lower bound on the number of mappers for downstream runs, but one should also consider other factors, like the read/write performance related to the size of the data sets, the HDFS block size, etc., which are not the topic of the present blog.

3. Getting unique levels


Determining the unique levels of categorical variables in a dataset is of basic interest for any data exploration procedure. If the data is distributed in HDFS, this determination requires a distributed solution. For the application under consideration here, getting the unique levels serves another purpose: the unique levels are used to generate data splits better suited for distributed execution by the downstream mapreduce jobs. More details are given in the next section.

Depending on the categorical variables in question and the data characteristics, the determination of unique levels may require different solutions. The implementation below is a generic solution providing the levels for multiple variables, bundled together in the input argument 'cols'. The mappers associate a key with each variable and collect the unique levels of each of these variables. The resulting arrays of values are packed in a text-stream-friendly format and provided as the value argument to orch.keyvals(); in this way complex data types can be safely passed between mappers and reducers (via text-based Hadoop streams). The reducers unpack the strings, retrieve all values associated with a particular key (variable) and re-calculate the unique levels, now accounting for all values of that variable.

R> get_unique_levels <- function(x, cols, nmap, nred) {
  mapper <- function(k, v) {
    for (col in cols) {
      uvals <- unique(v[[col]])
      orch.keyvals(col, orch.pack(uvals))
    }
  }
  reducer <- function(k, v) {
    lvals <- orch.unpack(v$val)
    uvals <- unique(unlist(lvals))
    orch.keyval(k, orch.pack(uvals))
  }
  config <- new("mapred.config",
                job.name      = "get_unique_levels",
                map.output    = data.frame(key="a",val="packed"),
                reduce.output = data.frame(key="a",val="packed"),
                map.tasks     = nmap,
                reduce.tasks  = nred)
  res <- hadoop.run(data = x,
                    mapper = mapper,
                    reducer = reducer,
                    config = config,
                    export = orch.export(cols=cols))
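  # one output row per variable (key): unpack its levels and name them by variable
  out  <- hdfs.get(res)
  resl <- lapply(out$val, function(x) { orch.unpack(x) })
  names(resl) <- out$key
  resl
}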
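The map/shuffle/reduce logic of get_unique_levels can be simulated in memory to see how partial levels from different mappers are combined. In this sketch each element of chunks plays the role of the data seen by one mapper; the helper name and the toy data are hypothetical, and the final sort() is added only for readability.

```r
# Local simulation of get_unique_levels(): each element of 'chunks' plays the
# role of the data seen by one mapper (hypothetical toy data below).
local_unique_levels <- function(chunks, cols) {
  # map: for every chunk, emit one (column name -> unique values) pair per column
  mapped <- unlist(lapply(unname(chunks), function(v)
    lapply(setNames(cols, cols), function(col) unique(v[[col]]))),
    recursive = FALSE)
  # shuffle: group the emitted values by key (the column name)
  grouped <- split(mapped, names(mapped))
  # reduce: unique levels over all values received for that column
  lapply(grouped, function(lvals) sort(unique(unlist(lvals, use.names = FALSE))))
}

d1 <- data.frame(CID = c("c1", "c2", "c1"), STATE = c("MA", "NY", "MA"),
                 stringsAsFactors = FALSE)
d2 <- data.frame(CID = c("c3", "c2"), STATE = c("MA", "CA"),
                 stringsAsFactors = FALSE)
local_unique_levels(list(d1, d2), c("CID", "STATE"))
```

Each mapper already removes duplicates locally, so the reducers only merge short vectors, which is why this approach works well when the number of levels is small compared with the number of records.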

This implementation works well provided that the number of levels of the categorical variables is much smaller than the number of records in the entire data. If some categorical variables have many levels, approaching the order of the total number of records, each mapper may return a large number of levels and each reducer may have to handle multiple large objects. An efficient solution for this case requires a different approach. However, if the column associated with one of these variables can fit in memory, a direct, very crude calculation like the one below can run faster than the former implementation. Here the mappers extract the column with the values of the variable in question, the column is pulled into an in-memory object, and unique() is called to determine the unique levels.

R> get_unique_levels_sngl <- function(HdfsData,col,nmap)
{
  mapper_fun <- function(k,v) { orch.keyvals(key=NULL,v[[col]]) }
  config <- new("mapred.config",
                job.name      = "extract_col",
                map.output    = data.frame(key=NA,VAL=0),
                map.tasks     = nmap)
  x <- hadoop.run(data=HdfsData,
                  mapper=mapper_fun,
                  config=config,
                  export=orch.export(col=col),
                  cleanup=TRUE)
  # pull the extracted column into memory and compute its unique levels
  xl <- hdfs.get(x)
  unique(xl$VAL)
}

R> data_rebalanced_dfs <- hdfs.attach("data_rebalanced_dfs")
R> customers <- get_unique_levels_sngl(data_rebalanced_dfs,"CID",nmap=32)

We thus obtained the unique levels of the categorical variable CID (customer id) from our data_rebalanced_dfs data.

4. Partitioning the data for mapreduce execution


Let's suppose that the user wants to execute some specific data manipulations at the CID level, like aggregations, variable transformations or new variable generation. Associating a key with every customer (CID level) would be a bad idea, since there are many customers: our hypothesis was that the number of CID levels is not orders of magnitude below the total number of records. This would lead to an excessive number of reducers, with a terrible impact on performance. In such a case it is better, for example, to bag customers into groups and distribute the execution at the group level. The user may want to set the number of these groups, ngrp, to something commensurate with the number of BDA cores available for parallelizing the task.

The example below illustrates how to do that at a basic level. The groups are generated within the encapsulating function myMRjob, before the hadoop.run() execution: the var.grps data frame has two columns, the CID levels and the group number (from 1 to ngrp) with which they are associated. This table is passed to the Hadoop execution environment via orch.export() within hadoop.run(). The mapper_fun function extracts the group number as key and inserts the multiple key-value pairs into the output buffer. Each reducer then gets a complete set of records for every customer associated with a particular key (group) and can proceed with the transformations/manipulations within a loop over customers or whatever programming construct is appropriate. Each reducer handles a quasi-equal number of customers, because this is how the groups were generated. However, the number of records per customer is not constant and may introduce some imbalances.

R> myMRjob <- function(HdfsData,var,ngrp,nmap,nred)
{
  mapper_fun <- function(k,v) {
    ....
    fltr <- <some_row_filtering>
    cID <- which(names(v) %in% "CUSTOMID")
    kk <- var.grps[match(v[fltr,cID],var.grps$CUSTOMID),2]
    orch.keyvals(kk,v[fltr,,drop=FALSE])
  }
  reducer_fun <- function(k,v) { ... }
  config <- new("mapred.config", map.tasks = nmap, reduce.tasks = nred,....)

  var.grps <- data.frame(CUSTOMID=var,
    GRP=rep(1:ngrp,sapply(split(var,ceiling(seq_along(var)/(length(var)/ngrp))),length)))

  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_fun,
                    reducer = reducer_fun,
                    config = config,
                    export = orch.export(var.grps=var.grps,ngrp=ngrp),
                    cleanup = TRUE
  )
  res
}

R> x <- myMRjob(HdfsData=data_rebalanced_dfs, var=customers, ngrp=..,nmap=..,nred=..)
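The grouping step of myMRjob and the key lookup done in mapper_fun do not depend on Hadoop, so they can be tried on a small vector of ids. The helper names make_groups and lookup_group and the toy ids are hypothetical; the grouping expression is the one used for var.grps, which assumes there are at least ngrp customers.

```r
# Same grouping expression as in myMRjob(), wrapped in a hypothetical helper
make_groups <- function(var, ngrp) {
  data.frame(CUSTOMID = var,
             GRP = rep(1:ngrp, sapply(split(var, ceiling(seq_along(var) / (length(var) / ngrp))),
                                      length)),
             stringsAsFactors = FALSE)
}

# The lookup performed in mapper_fun: customer id -> group number (the key)
lookup_group <- function(ids, var.grps) {
  var.grps[match(ids, var.grps$CUSTOMID), 2]
}

toy_ids  <- sprintf("C%03d", 1:10)   # toy ids; assumes length(var) >= ngrp
var.grps <- make_groups(toy_ids, 3)
table(var.grps$GRP)                  # group sizes differ by at most one
lookup_group(c("C010", "C001", "C005"), var.grps)
```

Customers are assigned to groups in contiguous blocks of the levels vector, so group sizes (in customers) are balanced, while the number of records per group still depends on how many records each customer has.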

Improved data partitioning solutions could be sought for cases with strong imbalances in the number of records per customer, or if large variations are noticed in the completion times of the reduce tasks. This kind of optimization will be addressed in a later blog.

Thursday Jan 09, 2014

Invoking R scripts via Oracle Database: Theme and Variation, Part 4

In the first three parts of Invoking R scripts via Oracle Database: Theme and Variation, we introduced features of Oracle R Enterprise embedded R execution involving the functions ore.doEval / rqEval, ore.tableApply / rqTableEval, and ore.groupApply / “rqGroupApply”. In this blog post, we’ll cover the next in our theme and variation series involving ore.rowApply and rqRowEval. The “row apply” function is also one of the parallel-enabled embedded R execution functions. It supports data-parallel execution, where one or more R engines perform the same R function, or task, on disjoint chunks of data. This functionality is essential to enable scalable model scoring/predictions on large data sets and for taking advantage of high-performance computing hardware like Exadata.

As with ore.groupApply, Oracle Database handles the management and control of potentially multiple R engines at the database server machine, automatically chunking and passing data to parallel executing R engines. Oracle Database ensures that R function executions for all chunks of rows complete, or the ORE function returns an error. The result from the execution of each user-defined embedded R function is gathered in an ore.list. This list remains in the database until the user requires the result. However, we’ll also show how data.frame results from each execution can be combined into a single ore.frame. This feature works for return values of other embedded R functions as well.

The variation on embedded R execution for ore.rowApply involves passing not only an ore.frame to the function such that the first parameter of your embedded R function receives a data.frame, but also the number of rows that should be passed to each invocation of the user-defined R function. The last chunk, of course, may have fewer rows than specified.

Let’s look at an example. We’re going to use the C50 package to score churn data (i.e., predict which customers are likely to churn) using the C5.0 decision tree models we built in the previous blog post with ore.groupApply. (Well, almost. We need to rebuild the models to take into account the full data set levels.) The goal is to score the customers in parallel leveraging the power of a high performance computing platform, such as Exadata.


library(C50)
data(churn)

ore.create(churnTest, "CHURN_TEST")

myFunction <- function(dat, xlevels, datastorePrefix) {
  library(C50)
  state <- dat[1,"state"]
  datastoreName <- paste(datastorePrefix,state,sep="_")
  dat$state <- NULL
  for (j in names(xlevels))
    dat[[j]] <- factor(dat[[j]], levels = xlevels[[j]])
  ore.load(name=datastoreName)
  res <- data.frame(pred=predict(mod,dat, type="class"),
        actual=dat$churn,
        state=state)
  res
}

xlevels <- ore.getXlevels(~ ., CHURN_TEST[,-1])
scoreList <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE)
score.MA <- ore.pull(scoreList$MA)
table(score.MA$actual, score.MA$pred)

A few points to highlight:

• Instead of computing the levels using the as.factor function inside the user-defined function, we’ll use ore.getXlevels, which returns the levels for each factor column. We don’t need this for the state column, so we exclude it (“-1”). In the previous post we noted that factor data is passed as character columns in the data.frame. Computing the levels first can ensure that all possible levels are provided during model building, even if there are no rows with some of the level values.
• When building models where some levels were missing (due to using as.factor on each partition of data), scoring can fail if the test data has unknown level values. For this reason, the models built in Part 3 need to be rebuilt using the approach above with ore.getXlevels. This is left as an exercise for the reader.
• The function is assigned to the variable “myFunction” to facilitate reuse (see below).
• We construct the datastore name to be the same as when we were building the models, i.e., appending the state value to the datastore prefix separated by an ‘_’.
• The for loop iterates over the levels passed in as xlevels, creating a factor using the provided levels and assigning it back to the data.frame.
• Loading the datastore by name, we have access to the variable mod, which contains the model for the particular state.
• The result is constructed as a data.frame with the predicted values, the actual values and the state.
• Three arguments are passed: the datastore prefix, the pre-computed levels, and ore.connect=TRUE, indicating that we need to connect to the database because we’re using a datastore.
• The results are stored as a list of ore.frames. We can pull the scores for MA and compute a confusion matrix using table.
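Why the levels must be fixed up front can be seen on a single toy partition in plain R. The column name and values below are illustrative only, and xlevels is written by hand here to play the role that ore.getXlevels plays in the example above; the for loop is the same one used in myFunction.

```r
# Toy partition in which one level ("yes") happens to be absent; the
# column name and values are illustrative only.
partition <- data.frame(international_plan = c("no", "no", "no"),
                        stringsAsFactors = FALSE)

# Levels derived from this partition alone miss "yes"
chunk.levels <- levels(as.factor(partition$international_plan))

# Levels computed once over the full data (the role ore.getXlevels plays)
xlevels <- list(international_plan = c("no", "yes"))

# Same loop as in myFunction: re-create each factor with the full level set
for (j in names(xlevels))
  partition[[j]] <- factor(partition[[j]], levels = xlevels[[j]])

chunk.levels                            # "no"
levels(partition$international_plan)    # "no" "yes"
```

A model trained on a chunk whose factor only knows "no" cannot handle "yes" at scoring time, whereas factors built from the full level set stay consistent across all partitions.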

This is fine. However, we likely don’t want to have a list of separate ore.frames as the result. We’d prefer to have a single ore.frame with all the results. This can be accomplished using the FUN.VALUE argument. Whenever a data.frame is the result of the user-defined R function, and if the structure of that data.frame is the same across all invocations of the group apply or row apply, you can combine them into a single result by defining the structure as follows:

scores <- ore.groupApply(
  CHURN_TEST,
  INDEX=CHURN_TEST$state,
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels, ore.connect=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)))
head(scores)
scores.local <- ore.pull(scores)
table(scores.local[scores.local$state=="MA",c("actual","pred")])

scores.MA <- scores[scores$state=="MA",c("actual","pred")]
table(scores.MA$actual, scores.MA$pred)

A few important points to highlight:

• FUN.VALUE is set to a data.frame that describes the format of the result. By providing this argument, you will get back a single ore.frame, not an ore.list object.
• The group apply completes instantaneously because it is only defining the ore.frame, not actually performing the scoring. Not until the values are needed does the result get computed. We invoke head on the ore.frame in scores to highlight this.
• We can pull the scores to the client to invoke table as before, but subselecting for state MA. However, we can also do this computation in the database using the transparency layer. First, we filter the rows for MA in scores.MA, and then invoke table on the two columns. Note: ORE requires passing the two columns explicitly to the overloaded function table.
• To do this in parallel, add the argument parallel=TRUE to the ore.groupApply call.
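As a rough in-memory analogy of what FUN.VALUE buys (this is not ORE code, and group_apply_local, the toy data and the constant "scoring" function are all hypothetical), the sketch below splits a data.frame by an index, applies a function per group, checks that every result has the declared columns, and stacks the results into one data.frame instead of a list.

```r
# Hypothetical local analogue of ore.groupApply(..., FUN.VALUE = ...), not ORE code:
# split by INDEX, apply FUN per group, check the declared columns, stack results.
group_apply_local <- function(dat, INDEX, FUN, FUN.VALUE) {
  pieces <- lapply(split(dat, INDEX), FUN)
  for (p in pieces)
    stopifnot(identical(names(p), names(FUN.VALUE)))
  if (length(pieces) == 0) return(FUN.VALUE)
  out <- do.call(rbind, unname(pieces))
  rownames(out) <- NULL
  out
}

# Toy data and a stand-in "scoring" function that always predicts "no"
dat <- data.frame(state = c("MA", "NY", "MA", "NY"),
                  churn = c("yes", "no", "no", "no"),
                  stringsAsFactors = FALSE)
score_fun <- function(d) data.frame(pred = rep("no", nrow(d)), actual = d$churn,
                                    state = d$state, stringsAsFactors = FALSE)
proto <- data.frame(pred = character(0), actual = character(0),
                    state = character(0), stringsAsFactors = FALSE)

scores.local <- group_apply_local(dat, dat$state, score_fun, proto)
table(scores.local[scores.local$state == "MA", c("actual", "pred")])
```

The requirement is the same as for FUN.VALUE: results can only be stacked into one table when every invocation returns the same columns.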

Wait! What happened to ore.rowApply?

Above, we showed how to score with multiple models using ore.groupApply. But what if we had customers from a single state that we wanted to score in parallel? We can use ore.rowApply and rqRowEval to invoke a function on chunks of rows at a time, where the chunk size can range from 1 to the total number of rows. (Obviously, values close to the total number of rows leave little or no benefit from parallelism.)


scores <- ore.rowApply(
  CHURN_TEST[CHURN_TEST$state=="MA",],
  myFunction,
  datastorePrefix="myC5.0model3",xlevels=xlevels,
  ore.connect=TRUE, parallel=TRUE,
  FUN.VALUE=data.frame(pred=character(0),
        actual=character(0),
        state=character(0)),
  rows=200)
scores
table(scores$actual, scores$pred)

A few points to highlight:

• Since we want to score the customers of a single state in parallel, we filter the rows for MA. This ensures that all rows processed can use the same predictive model.
• We set the rows argument to 200, so each invocation of myFunction receives at most 200 rows. For example, the full CHURN_TEST table has 1667 rows, which would result in nine executions of myFunction: the first eight receiving 200 rows each and the last receiving 67 rows. The MA subset used here is smaller, so it yields fewer chunks.
• We also set parallel=TRUE above since we want the scoring performed in parallel.
• The invocation of ore.rowApply returns immediately. Not until we print scores do we incur the cost of executing the underlying query. However, also note that each time we access scores, for example in the following call to table, we incur the cost of executing the query. If the result will be used many times in subsequent operations, you may want to create a table with the result using ore.create.
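The chunk arithmetic and the row-apply pattern can be reproduced in plain R. In this sketch chunk_sizes and row_apply_local are hypothetical helpers (not ORE functions), and the built-in iris data set stands in for a database table; chunk_sizes assumes at least one row.

```r
# Number of rows each invocation receives when n rows (n > 0) are processed
# in chunks of at most 'rows' rows (the last chunk may be smaller)
chunk_sizes <- function(n, rows) {
  k <- ceiling(n / rows)
  c(rep(rows, k - 1), n - rows * (k - 1))
}

# Hypothetical local analogue of ore.rowApply: split rows into chunks, call
# FUN on each chunk, and stack the data.frame results
row_apply_local <- function(dat, FUN, rows) {
  idx <- seq_len(nrow(dat))
  chunks <- split(idx, ceiling(idx / rows))
  out <- do.call(rbind, lapply(chunks, function(i) FUN(dat[i, , drop = FALSE])))
  rownames(out) <- NULL
  out
}

chunk_sizes(1667, 200)   # eight chunks of 200, then 67
row_apply_local(iris, function(d) data.frame(n = nrow(d)), rows = 40)$n  # 40 40 40 30
```

Because no chunk ever exceeds rows rows, the rows value directly bounds the data each R engine must hold, which is the memory argument made below.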

In SQL, we can do the same, but we’ll need to store the function in the R script repository (perhaps called "myScoringFunction") and also store xlevels in a datastore (perhaps called "myXLevels"). While we can pass complex objects in the R interface to embedded R functions, we cannot do that in SQL. Instead, we must pass the name of a datastore. Since the xlevels are in a datastore, the user-defined R function needs to be modified to take this other datastore name and load that datastore to have access to xlevels. This set of changes is left to the reader as an exercise.


select * from table(rqRowEval(
  cursor(select /*+ parallel(t, 4) */ *
        from CHURN_TEST t
        where "state" = 'MA'),
  cursor(select 1 as "ore.connect",
        'myC5.0model3' as "datastorePrefix",
        'myXLevels' as "xlevelsDatastore"
        from dual),
  'select ''aaa'' "pred",''aaa'' "actual" , ''aa'' "state" from dual',
    200, 'myScoringFunction'));

A few points to highlight:

• The input cursor specifies a parallel hint on the input data cursor and filtering data for MA as well.
• Several arguments are passed, including the new argument xlevelsDatastore, which gives our function the name of the myXLevels datastore.
• The output form is specified in the SQL string. Care must be taken to ensure that the column names, ordering, and the length of character strings match the returned data.frame.

Map Reduce

The “row apply” functionality can be thought of in terms of the map-reduce paradigm where the mapper performs the scoring and outputs a data.frame value (no key required). There is no reducer, or the reducer is simply a pass-through.

Memory and performance considerations

Unlike with group apply, the rows argument in row apply ensures an upper bound on the number of rows (and hence memory requirement). The value of rows should be chosen to balance memory and parallel performance. The usual measures can be taken regarding setting memory limits on the R engine – as noted in Part 2.

There may be instances where setting rows = 1 makes sense. For example, if the computation per row is intensive (i.e., takes a long time), sending one row per R engine may be appropriate. Experiment with a range of values for rows to determine the best value for your particular scenario.

About

The place for best practices, tips, and tricks for applying Oracle R Enterprise, Oracle R Distribution, ROracle, and Oracle R Advanced Analytics for Hadoop in both traditional and Big Data environments.
