Wednesday May 06, 2015

Experience using ORAAH on a customer business problem: some basic issues & solutions

We illustrate in this blog a few simple, practical solutions for problems which can arise when developing ORAAH mapreduce applications for the Oracle BDA. These problems were actually encountered during a recent POC engagement. The customer, an important player in the medical technologies market, was interested in building an analysis flow consisting of a sequence of data manipulation and transformation steps followed by multiple model generation. The data preparation included multiple types of merging, filtering, and variable generation based on complex search patterns, and represented, by far, the most time-consuming component of the flow. The original implementation on the customer's hardware required multiple days per flow to complete. Our ORAAH mapreduce-based implementation running on an X5-2 Starter Rack BDA reduced that time to between 4 and 20 minutes, depending on which flow was tested.

The points addressed in this blog are related to the fact that the data preparation was structured as a chain of tasks, where each task performed transformations on HDFS data generated by one or multiple upstream tasks. More precisely, we will consider the following:


  • Merging of HDFS data from multiple sources

  • Re-balancing and parts reduction for HDFS data

  • Getting unique levels for categorical variables from HDFS data

  • Partitioning the data for distributed mapreduce execution


'Merging data' from above is to be understood as row binding of multiple tables. Re-balancing and parts reduction address the fact that the HDFS data (generated by upstream jobs) may consist of very unequal parts (chunks); this would lead to performance losses when this data is further processed by other mapreduce jobs. The 3rd and 4th items are related: getting the unique levels of categorical variables was useful for the data partitioning process, namely for deciding how to generate the key-value pairs within the mapper functions.

1. Merging of HDFS data from multiple sources


The practical case here is that of a data transformation task for which the input consists of several, similarly structured HDFS data sets. As a reminder, data in HDFS is stored as a collection of flat files/chunks (part-00000, part-00001, etc.) under an HDFS directory, and the hdfs.* functions access the directory, not the 'part-xxxxx' chunks. Also, the hadoop.run()/hadoop.exec() functions work with a single input data object (an HDFS object identifier representing a directory in HDFS); the R rbind, cbind, merge, etc. operations cannot be invoked within mapreduce to bind two or several large tables.

For the case under consideration, each input (dataA_dfs, dataB_dfs, etc) consists of a different number of files/chunks


R> hdfs.ls("dataA_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00071"
R> hdfs.ls("dataB_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00035"


corresponding to the number of reducers used by the upstream mapreduce jobs which generated this data. As these multiple chunks from various HDFS directories need to be processed as a single input data set, they need to be moved into a single HDFS directory. The 'merge_hdfs_data' function below does just that: it creates a new HDFS directory and copies all the 'part-xxxxx' files from each source directory, renumbering the resulting parts along the way:

R> merge_hdfs_data <- function(SrcDirs,TrgtDir) {
  #cat(sprintf("merge_hdfs_files : Creating %s ...\n",TrgtDir))
  # Create the target HDFS directory (overwriting it if it already exists)
  hdfs.mkdir(TrgtDir,overwrite=TRUE)
  i <- 0
  for (srcD in SrcDirs) {
    # List the 'part-xxxxx' chunks of the source and retrieve its HDFS path
    fparts <- hdfs.ls(get(srcD),pattern="part")
    srcd <- (hdfs.describe(get(srcD)))[1,2]
    for (fpart in fparts) {
      #cat(sprintf("merge_hdfs_files : Copying %s/%s to %s ...\n",
      #            srcD,fpart,TrgtDir))
      i <- i+1
      # Copy the chunk into the target directory under a new, sequential part number
      hdfs.cp(paste(srcd,fpart,sep="/"),sprintf("%s/part-%05d",TrgtDir,i))
    }
  }
}


Merging of the dataA_dfs and dataB_dfs directories into a new data_merged_dfs directory is achieved through:

R> merge_hdfs_data(c("dataA_dfs","dataB_dfs"),"data_merged_dfs")
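
As a quick check (a sketch; the counts below are simply taken from the listings above), one can list the merged directory and verify that it now contains all of the copied parts:

R> merged_parts <- hdfs.ls("data_merged_dfs")
R> length(merged_parts)   # expect 108 'part-xxxxx' files (72 + 36)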

2. Data re-balancing / Reduction of the number of parts


Data stored in HDFS can suffer from two key problems that affect performance: too many small files, and files with very different numbers of records, especially files with very few records. The merged data produced by the function above consists of a number of files equal to the sum of all files from all input HDFS directories. Since the upstream mapreduce jobs generating the inputs were run with a high number of reducers (for faster execution), the resulting total number of files got large (100+). This created an impractical constraint for the subsequent analysis, as one cannot run a mapreduce application with fewer mappers than the number of parts (the reverse is not a problem: HDFS parts are splittable for processing by multiple mappers). Moreover, if the parts have very different numbers of records, the performance of the application will suffer, since different mappers will handle very different volumes of data.

The rebalance_data function below represents a simple way of addressing these issues. Every mapper splits its portion of the data into a user-defined number of parts (nparts) containing roughly the same number of records, and a key is associated with each part. In this implementation the number of reducers is set to the number of parts, so after shuffling each reducer collects the records corresponding to one particular key and writes them to the output. The overall output consists of nparts parts of roughly equal size. A basic mechanism for preserving the data types is also illustrated (see the map.output and reduce.output constructs below).

R> rebalance_data <- function(HdfsData,nmap,nparts)
{
  mapper_func <- function(k,v) {
    nlin <- nrow(v)
    if(nlin>0) {
      # assign each record to one of 'nparts' keys of roughly equal size
      idx.seq <- seq(1,nlin)
      kk <- ceiling(idx.seq/(nlin/nparts))
      orch.keyvals(kk,v)
    }
  }
  reducer_func <- function(k,v) {
    # each reducer writes out the records of one key, i.e. one output part
    if (nrow(v) > 0) { orch.keyvals(k=NULL,v) }
  }
  # build prototype data frames so that the output preserves the input column types
  dtypes.out <- sapply(hdfs.meta(HdfsData)$types,
                       function(x) ifelse(x=="character","\"a\"",
                                          ifelse(x=="logical","FALSE","0")))
  val.str <- paste0(hdfs.meta(HdfsData)$names,"=",dtypes.out,collapse=",")
  meta.map.str <- sprintf("data.frame(key=0,%s)",val.str)
  meta.red.str <- sprintf("data.frame(key=NA,%s)",val.str)

  config <- new("mapred.config",
                job.name      = "rebalance_data",
                map.output    = eval(parse(text=meta.map.str)),
                reduce.output = eval(parse(text=meta.red.str)),
                map.tasks     = nmap,
                reduce.tasks  = nparts,
                reduce.split  = 1e5)
  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_func,
                    reducer = reducer_func,
                    config = config,
                    export = orch.export(nparts=nparts), # make 'nparts' visible to the mappers
                    cleanup = TRUE
  )
  res
}

Before using this function, the data associated with the new data_merged_dfs directory needs to be attached to the ORAAH framework:

R> data_merged_dfs <- hdfs.attach("data_merged_dfs")

The invocation below uses 144 mappers for splitting the data into 4 parts:

R> x <- rebalance_data(data_merged_dfs,nmap=144,nparts=4)


The user may also want to save the resulting object permanently under a more convenient/recognizable name, for example 'data_rebalanced_dfs'. The path to the temporary object x is retrieved with the hdfs.describe() command and provided as the first argument to the hdfs.cp() command.

R> tmp_dfs_name <- hdfs.describe(x)[1,2]
R> hdfs.cp(tmp_dfs_name,"data_rebalanced_dfs",overwrite=TRUE)
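
As a quick sanity check (a sketch; the exact listing depends on the run), the new directory can be attached and listed to verify that it now holds the requested number of roughly equal parts:

R> data_rebalanced_dfs <- hdfs.attach("data_rebalanced_dfs")
R> hdfs.ls("data_rebalanced_dfs")   # expect 4 'part-xxxxx' files, one per reducer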

The choice of the number of parts is up to the user. A small number of parts avoids constraining from below the number of mappers for the downstream runs, but one should also consider other factors, such as the read/write performance related to the size of the data sets and the HDFS block size, which are not the topic of the present blog.

3. Getting unique levels


Determining the unique levels of categorical variables in a dataset is of basic interest for any data exploration procedure. If the data is distributed in HDFS, this determination requires an appropriate solution. For the application under consideration here, getting the unique levels serves another purpose; the unique levels are used to generate data splits better suited for distributed execution by the downstream mapreduce jobs. More details are available in the next section.

Depending on the categorical variables in question and the data characteristics, the determination of unique levels may require different solutions. The implementation below is a generic solution providing these levels for multiple variables bundled together in the input argument 'cols'. The mappers associate a key with each variable and collect the unique levels for each of these variables. The resulting arrays of values are packed in a text-stream-friendly format and provided as the value argument to orch.keyvals() - in this way complex data types can be safely passed between the mappers and reducers (via text-based Hadoop streams). The reducers unpack the strings, retrieve all the values associated with a particular key (variable) and re-calculate the unique levels, accounting now for all values of that variable.

R> get_unique_levels <- function(x, cols, nmap, nred) {
  mapper <- function(k, v) {
    # emit, for each variable, its local unique levels packed as a single value
    for (col in cols) {
      uvals <- unique(v[[col]])
      orch.keyvals(col, orch.pack(uvals))
    }
  }
  reducer <- function(k, v) {
    # unpack the per-mapper level sets for this variable and merge them
    lvals <- orch.unpack(v$val)
    uvals <- unique(unlist(lvals))
    orch.keyval(k, orch.pack(uvals))
  }
  config <- new("mapred.config",
                job.name      = "get_unique_levels",
                map.output    = data.frame(key="a",val="packed"),
                reduce.output = data.frame(key="a",val="packed"),
                map.tasks     = nmap,
                reduce.tasks  = nred)
  res <- hadoop.run(data = x,
                    mapper = mapper,
                    reducer = reducer,
                    config = config,
                    export = orch.export(cols=cols))
  # unpack the final set of levels for each variable into a named list
  out <- hdfs.get(res)
  resl <- lapply(out$val, function(x) orch.unpack(x)[[1]])
  names(resl) <- out$key
  resl
}

This implementation works fine provided that the number of levels for the categorical variables is much smaller than the number of records of the entire data. If some categorical variables have many levels, not far from the order of the total number of records, each mapper may return a large number of levels and each reducer may have to handle multiple large objects. An efficient solution for this case requires a different approach. However, if the column associated with one of these variables can fit in memory, a direct, rather crude calculation like the one below can run faster than the former implementation. Here the mappers extract the column with the values of the variable in question, the column is pulled into an in-memory object, and unique() is called to determine the unique levels.

R> get_unique_levels_sngl <- function(HdfsData,col,nmap)
{
  # each mapper simply emits the values of the requested column (no key needed)
  mapper_fun <- function(k,v) { orch.keyvals(key=NULL,v[[col]]) }
  config <- new("mapred.config",
                job.name      = "extract_col",
                map.output    = data.frame(key=NA,VAL=0),
                map.tasks     = nmap)
  x <- hadoop.run(data=HdfsData,
                  mapper=mapper_fun,
                  config=config,
                  export=orch.export(col=col),
                  cleanup=TRUE)
  # pull the extracted column into memory and compute the unique levels locally
  xl <- hdfs.get(x)
  res <- unique(xl$VAL)
  res
}

R> customers <- get_unique_levels_sngl(data_rebalanced_dfs,"CID",nmap=32)

We have thus obtained the unique levels of the categorical variable CID (customer id) from our data_rebalanced_dfs data.

4. Partitioning the data for mapreduce execution


Let's suppose that the user wants to execute some specific data manipulations at the CID level, like aggregations, variable transformations or new variable generation. Associating a key with every customer (CID level) would be a bad idea since there are many customers; our hypothesis was that the number of CID levels is not orders of magnitude below the total number of records. This would lead to an excessive number of reducers, with a terrible impact on performance. In such a case it would be better, for example, to bag customers into groups and distribute the execution at the group level. The user may want to set the number of these groups, ngrp, to something commensurate with the number of BDA cores available for parallelizing the task.

The example below illustrates how to do that at a basic level. The groups are generated within the encapsulating function myMRjob, before the hadoop.run() execution - the var.grps data frame has two columns: the CID levels and the group number (from 1 to ngrp) with which they are associated. This table is passed to the hadoop execution environment via orch.export() within hadoop.run(). The mapper_fun function extracts the group number as key and inserts the multiple key-value pairs into the output buffer. The reducer then gets a complete set of records for every customer associated with a particular key (group) and can proceed with the transformations/manipulations within a loop over customers or whatever programming construct is appropriate. Each reducer handles a roughly equal number of customers, because this is how the groups were generated; however, the number of records per customer is not constant and may introduce some imbalance.
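
For reference, the group assignment built inside myMRjob below (the var.grps data frame) can be sketched more directly as follows; the value of ngrp here is hypothetical, and the mapping is essentially the same as the one produced by the rep()/split() construction used in the function:

R> ngrp <- 32   # hypothetical; choose something commensurate with the available cores
R> var.grps <- data.frame(CUSTOMID=customers,
                          GRP=ceiling(seq_along(customers)/(length(customers)/ngrp)))
R> table(var.grps$GRP)   # roughly length(customers)/ngrp customers per group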

R> myMRjob <- function(HdfsData,var,ngrp,nmap,nred)
{
  mapper_fun <- function(k,v) {
    ....
    fltr <- <some_row_filtering>
    # use each record's customer group (looked up in var.grps) as the key
    cID <- which(names(v) %in% "CUSTOMID")
    kk <- var.grps[match(v[fltr,cID],var.grps$CUSTOMID),2]
    orch.keyvals(kk,v[fltr,,drop=FALSE])
  }
  reducer_fun <- function(k,v) { ... }
  config <- new("mapred.config", map.tasks = nmap, reduce.tasks = nred,....)

  var.grps <- data.frame(CUSTOMID=var,
    GRP=rep(1:ngrp,sapply(split(var,ceiling(seq_along(var)/(length(var)/ngrp))),length)))

  res <- hadoop.run(data = HdfsData,
                    mapper = mapper_fun,
                    reducer = reducer_fun,
                    config = config,
                    export = orch.export(var.grps=var.grps,ngrp=ngrp),
                    cleanup = TRUE
  )
  res
}

R> x <- myMRjob(HdfsData=data_rebalanced_dfs, var=customers, ngrp=.., nmap=.., nred=..)

Improved data partitioning solutions could be sought for cases where there are strong imbalances in the number of records per customer, or where large variations are observed between the reducers' completion times. This kind of optimization will be addressed in a later blog.

Monday May 19, 2014

Model cross-validation with ore.CV()

In this blog post we illustrate how to use Oracle R Enterprise for performing cross-validation of regression and classification models. We describe a new utility R function ore.CV that leverages features of Oracle R Enterprise and is available for download and use.

Predictive models are usually built on given data and verified on held-aside or unseen data. Cross-validation is a model improvement technique that avoids the limitations of a single train-and-test experiment by building and testing multiple models via repeated sampling from the available data. Its purpose is to offer better insight into how well the model would generalize to new data and to avoid over-fitting and deriving wrong conclusions from misleading peculiarities of the seen data.

In a k-fold cross-validation the data is partitioned into k (roughly) equal size subsets. One of the subsets is retained for testing and the remaining k-1 subsets are used for training. The process is repeated k times with each of the k subsets serving exactly once as testing data. Thus, all observations in the original data set are used for both training and testing.

The choice of k depends, in practice, on the size n of the data set. For large data, k=3 could be sufficient. For very small data, the extreme case k=n, leave-one-out cross-validation (LOOCV), uses a single observation from the original sample as testing data and the remaining observations as training data. Common choices are k=10 or k=5.
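
To make the fold mechanics concrete, here is a small plain-R sketch (independent of ore.CV, using the built-in iris data) of how a k-fold partition drives the repeated training and testing:

# plain-R sketch of k-fold partitioning (illustrative only)
k <- 5
n <- nrow(iris)
fold <- sample(rep(1:k, length.out = n))  # random fold label for each observation
for (i in 1:k) {
  train <- iris[fold != i, ]   # k-1 folds used for training
  test  <- iris[fold == i, ]   # the held-out fold used for testing
  # fit the model on 'train' and evaluate it on 'test' here
}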

For a select set of algorithms and cases, the function ore.CV performs cross-validation for models generated by ORE regression and classification functions using in-database data. ORE embedded R execution is leveraged to also support cross-validation for models built with vanilla R functions.

Usage

ore.CV(funType, function, formula, dataset, nFolds=<nb.folds>, fun.args=NULL, pred.args=NULL, pckg.lst=NULL)
  • funType - "regression" or "classification"
  • function - ORE predictive modeling functions for regression & classification or R function (regression only)
  • formula - object of class "formula"
  • dataset - name of the ore.frame
  • nFolds - number of folds
  • fun.args - list of supplementary arguments for 'function'
  • pred.args - list of supplementary arguments for 'predict'. Must be consistent with the model object/model generator 'function'.
  • pckg.lst - list of packages to be loaded by the DB R engine for embedded execution.
The set of functions supported for ORE include:
  • ore.lm
  • ore.stepwise
  • ore.neural
  • ore.glm
  • ore.odmDT
  • ore.odmSVM
  • ore.odmGLM
  • ore.odmNB
The set of functions supported for R include:
  • lm
  • glm
  • svm
Note: The 'ggplot' and 'reshape' packages are required on the R client side for data post-processing and plotting (classification CV).

Examples

In the following examples, we illustrate various ways to invoke ore.CV using some datasets we have seen in previous posts. The datasets can be created as ore.frame objects using:
 
IRIS <- ore.push(iris)
LONGLEY <- ore.push(longley)
library(rpart)
KYPHOSIS <- ore.push(kyphosis)
library(PASWR)
TITANIC3 <- ore.push(titanic3)
MTCARS <- ore.push(mtcars)
(A) Cross-validation for models generated with ORE functions.
 
# Basic specification
ore.CV("regression","ore.lm",Sepal.Length~.-Species,"IRIS",nFolds=5)
ore.CV("regression","ore.neural",Employed~GNP+Population+Year,
            "LONGLEY",nFolds=5)

#Specification of function arguments
ore.CV("regression","ore.stepwise",Employed~.,"LONGLEY",nFolds=5,
            fun.args= list(add.p=0.15,drop.p=0.15))
ore.CV("regression","ore.odmSVM",Employed~GNP+Population+Year,
             "LONGLEY",nFolds=5, fun.args="regression")

#Specification of function arguments and prediction arguments
ore.CV("classification","ore.glm",Kyphosis~.,"KYPHOSIS",nFolds=5,
             fun.args=list(family=binomial()),pred.args=list(type="response"))
ore.CV("classification","ore.odmGLM",Kyphosis~.,"KYPHOSIS",nFolds=5,
            fun.args= list(type="logistic"),pred.args=list(type="response"))
 
(B) Cross-validation for models generated with R functions via the ORE embedded execution mechanism.

ore.CV("regression","lm",mpg~cyl+disp+hp+drat+wt+qsec,"MTCARS",nFolds=3)
ore.CV("regression","svm",Sepal.Length~.-Species,"IRIS",nFolds=5,
             fun.args=list(type="eps-regression"), pckg.lst=c("e1071")) 


Restrictions

  • The signature of the model generator ‘function’ must be of the following type: function(formula,data,...). For example, functions like ore.stepwise, ore.odmGLM and lm are supported, but the R step(object,scope,...) function for AIC model selection via the stepwise algorithm does not satisfy this requirement (see the sketch after this list).
  • The model validation process requires the prediction function to return a (1-dimensional) vector with the predicted values. If the (default) returned object is different the requirement must be met by providing an appropriate argument through ‘pred.args’. For example, for classification with ore.glm or ore.odmGLM the user should specify pred.args=list(type="response").
  • Cross-validation of classification models via embedded R execution of vanilla R functions is not supported yet.
  • Remark: Cross-validation is not a technique intended for large data, as the cost of multiple model training and testing can become prohibitive. Moreover, with large data sets it is possible to produce effective sampled train and test data sets. The current ore.CV does not impose any restrictions on the size of the input, and the user working with large data should use good judgment when choosing the model generator and the number of folds.
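
To make the first two restrictions concrete, the sketch below (illustrative only, not part of ore.CV) shows the calling pattern the function relies on: the model generator is invoked as function(formula, data, ...), and its predict method must be able to return a plain vector of predictions, if necessary with the help of pred.args.

# sketch of the expected calling pattern (names and data are illustrative)
fit  <- lm(mpg ~ cyl + wt, data = mtcars)   # generator with signature function(formula, data, ...)
pred <- predict(fit, newdata = mtcars)      # yields a 1-dimensional vector of predictions
# for classification with ore.glm or ore.odmGLM, the analogous vector is obtained
# by passing pred.args=list(type="response") to ore.CV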

Output

The function ore.CV provides output on several levels: datastores containing the model results, plots, and text output.

Datastores

The results of each cross-validation run are saved in a datastore named dsCV_funTyp_data_Target_function_nFxx, where funTyp, data, function and nF (=nFolds) are as described above and Target is the left-hand side of the formula. For example, if one runs the ore.neural, ore.glm, and ore.odmNB-based cross-validation examples from above, the following three datastores are produced:
    
R> ds <- ore.datastore(pattern="dsCV")
R> print(ds)
                                        datastore.name object.count    size       creation.date description
1   dsCV_classification_KYPHOSIS_Kyphosis_ore.glm_nF5            10 4480326 2014-04-30 18:19:55        <NA>
2 dsCV_classification_TITANIC3_survived_ore.odmNB_nF5            10  592083 2014-04-30 18:21:35        <NA>
3     dsCV_regression_LONGLEY_Employed_ore.neural_nF5            10  497204 2014-04-30 18:16:35        <NA>
    
Each datastore contains the models and prediction tables for every fold. Each prediction table has 3 columns: the fold index, the target variable/class, and the predicted values. If we consider the example from above and examine the most recent datastore (the Naive Bayes classification CV), we would see:
    
R> ds.last <- ds$datastore.name[which.max(as.numeric(ds$creation.date))]
R> ore.datastoreSummary(name=ds.last)
   object.name     class   size length row.count col.count
1  model.fold1 ore.odmNB  66138      9        NA        NA
2  model.fold2 ore.odmNB  88475      9        NA        NA
3  model.fold3 ore.odmNB 110598      9        NA        NA
4  model.fold4 ore.odmNB 133051      9        NA        NA
5  model.fold5 ore.odmNB 155366      9        NA        NA
6   test.fold1 ore.frame   7691      3       261         3
7   test.fold2 ore.frame   7691      3       262         3
8   test.fold3 ore.frame   7691      3       262         3
9   test.fold4 ore.frame   7691      3       262         3
10  test.fold5 ore.frame   7691      3       262         3
    
    

Plots

The following plots are generated automatically by ore.CV and saved in an automatically generated OUTPUT directory:

  • Regression: ore.CV generates plots comparing the predicted vs. target values, as well as root mean square error (RMSE) and relative error (RERR) boxplots per fold. The example below is based on 5-fold cross-validation with the ore.lm regression model for Sepal.Length ~.-Species using the ore.frame IRIS dataset.
  • Classification: ore.CV outputs a multi-plot figure for classification metrics such as Precision, Recall and F-measure. Each metric is captured per target class (side-by-side barplots) and per fold (groups of barplots). The example below is based on 5-fold CV of the ore.odmSVM classification model for Species ~. using the ore.frame IRIS dataset.

Text output

For classification problems, the confusion tables for each fold are saved in an output file residing in the OUTPUT directory, together with a summary table displaying the precision, recall and F-measure metrics for every fold and predicted class.

file.show("OUTDIR/tbl_CV_classification_IRIS_Species_ore.odmSVM_nF5")
    
    Confusion table for fold 1 :           
                 setosa versicolor virginica
      setosa          9          0         0
      versicolor      0         12         1
      virginica       0          1         7
    Confusion table for fold 2 :            
                 setosa versicolor virginica
      setosa          9          0         0
      versicolor      0          8         1
      virginica       0          2        10
    Confusion table for fold 3 :           
                 setosa versicolor virginica
      setosa         11          0         0
      versicolor      0         10         2
      virginica       0          0         7
    Confusion table for fold 4 :            
                 setosa versicolor virginica
      setosa          9          0         0
      versicolor      0         10         0
      virginica       0          2         9
    Confusion table for fold 5 :            
                 setosa versicolor virginica
      setosa         12          0         0
      versicolor      0          5         1
      virginica       0          0        12
    Accuracy, Recall & F-measure table per {class,fold}
       fold      class TP  m  n Precision Recall F_meas
    1     1     setosa  9  9  9     1.000  1.000  1.000
    2     1 versicolor 12 13 13     0.923  0.923  0.923
    3     1  virginica  7  8  8     0.875  0.875  0.875
    4     2     setosa  9  9  9     1.000  1.000  1.000
    5     2 versicolor  8  9 10     0.889  0.800  0.842
    6     2  virginica 10 12 11     0.833  0.909  0.870
    7     3     setosa 11 11 11     1.000  1.000  1.000
    8     3 versicolor 10 12 10     0.833  1.000  0.909
    9     3  virginica  7  7  9     1.000  0.778  0.875
    10    4     setosa  9  9  9     1.000  1.000  1.000
    11    4 versicolor 10 10 12     1.000  0.833  0.909
    12    4  virginica  9 11  9     0.818  1.000  0.900
    13    5     setosa 12 12 12     1.000  1.000  1.000
    14    5 versicolor  5  6  5     0.833  1.000  0.909
    15    5  virginica 12 12 13     1.000  0.923  0.960
    
              
What's next

Several extensions of ore.CV are possible involving sampling, parallel model training and testing, support for vanilla R classifiers, post-processing and output. More material for future posts.