## Friday Feb 05, 2016

### Using SVD for Dimensionality Reduction

SVD, or Singular Value Decomposition, is one of several techniques that can be used to reduce the dimensionality, i.e., the number of columns, of a data set. Why would we want to reduce the number of dimensions? In predictive analytics, more columns normally means more time required to build models and score data. If some columns have no predictive value, this means wasted time, or worse, those columns contribute noise to the model and reduce model quality or predictive accuracy.

Dimensionality reduction can be achieved by simply dropping columns, for example, those that may show up as collinear with others or identified as not being particularly predictive of the target as determined by an attribute importance ranking technique. But it can also be achieved by deriving new columns based on linear combinations of the original columns. In both cases, the resulting transformed data set can be provided to machine learning algorithms to yield faster model build times, faster scoring times, and more accurate models.

While SVD can be used for dimensionality reduction, it is often used in digital signal processing for noise reduction, image compression, and other areas.

SVD is an algorithm that factors an m x n matrix, M, of real or complex values into three component matrices, where the factorization has the form USV*. U is an m x p matrix. S is a p x p diagonal matrix. V is an n x p matrix, with V* being the transpose of V, a p x n matrix, or the conjugate transpose if M contains complex values. The value p is called the rank. The diagonal entries of S are referred to as the singular values of M. The columns of U are typically called the left-singular vectors of M, and the columns of V are called the right-singular vectors of M.
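As a quick base-R sanity check (standalone, not using ORE; the matrix values are arbitrary), the factorization and the reconstruction property can be demonstrated with R's svd function:

```r
# Base-R sketch: factor a small 5 x 4 matrix M and verify M = U S V'
set.seed(42)
M <- matrix(rnorm(20), nrow = 5, ncol = 4)
s <- svd(M)                     # s$u: 5 x 4, s$d: singular values, s$v: 4 x 4
M.rec <- s$u %*% diag(s$d) %*% t(s$v)
all.equal(M, M.rec)             # reconstruction matches up to floating point
```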

Consider the following visual representation of these matrices:

One of the features of SVD is that given the decomposition of M into U, S, and V, one can reconstruct the original matrix M, or an approximation of it. The singular values in the diagonal matrix S can be used to understand the amount of variance explained by each of the singular vectors. In R, this can be achieved using the computation:

cumsum(S^2/sum(S^2))

When plotted, this provides a visual understanding of the variance captured by the model. The figure below indicates that the first singular vector accounts for 96.5% of the variance, the first two together account for over 99.5%, and so on.

As such, we can use this information to limit the number of vectors to the amount of variance we wish to capture. Reducing the number of vectors can help eliminate noise in the original data set when that data set is reconstructed using the subcomponents of U, S, and V.
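A base-R sketch of this selection (the 99% threshold and the data are arbitrary choices for illustration):

```r
# Pick the smallest k whose singular vectors capture at least 99% of the variance
set.seed(1)
M <- matrix(rnorm(500), nrow = 100, ncol = 5)
S <- svd(M)$d
var.explained <- cumsum(S^2 / sum(S^2))   # the computation from the text
k <- which(var.explained >= 0.99)[1]      # number of singular vectors to retain
```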

### ORE’s parallel, distributed SVD

With Oracle R Enterprise’s parallel, distributed implementation of R’s svd function, only the S and V components are returned. More specifically, the singular values of S are returned as the vector d. If we store the result of invoking svd on matrix dat in svd.mod, U can be derived from these components and M as follows:

svd.mod <- svd(dat)
U <- dat %*% svd.mod$v %*% diag(1/svd.mod$d)

So, how do we achieve dimensionality reduction using SVD? We can use the first k columns of V and the first k singular values of S to produce a reduced U with fewer columns.

U.reduced <- dat %*% svd.mod$v[,1:k,drop=FALSE] %*% diag(svd.mod$d[1:k])

This reduced U can now be used as a proxy for matrix dat with fewer columns.
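As a base-R check of the underlying relationship (arbitrary data, not from the post): right-multiplying dat %*% V[,1:k] by diag(1/d[1:k]) recovers exactly the first k columns of U, while the diag(d[1:k]) form used above yields the same columns scaled by the squared singular values; either serves as a reduced proxy for dat:

```r
# Verify: dat %*% V[,1:k] %*% diag(1/d[1:k]) equals U[,1:k]
set.seed(7)
dat <- matrix(rnorm(60), nrow = 10, ncol = 6)
s <- svd(dat)
k <- 2
U.k <- dat %*% s$v[, 1:k, drop = FALSE] %*% diag(1 / s$d[1:k])
all.equal(U.k, s$u[, 1:k, drop = FALSE])   # TRUE up to floating point
```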

The function dimReduce introduced below accepts a matrix x, the desired number of columns k, and the names of any supplemental columns to return with the transformed matrix.

dimReduce <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL) {
  colIdxs <- which(colnames(x) %in% supplemental.cols)
  sol <- svd(x[,-colIdxs])
  sol.U <- as.matrix(x[,-colIdxs]) %*% sol$v[,1:k,drop=FALSE] %*%
           diag(sol$d[1:k])
  res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
  names(res) <- c(names(sol.U@data), names(x[,colIdxs]))
  res
}

We will now use this function to reduce the iris data set.

To prepare the iris data set, we first add a unique identifier, create the database table IRIS2 in the database, and then assign row names to enable row indexing. We could also make ID the primary key using ore.exec with the ALTER TABLE statement. Refreshing the ore.frame proxy object using ore.sync reflects the change in primary key.

dat <- iris
dat$ID <- seq_len(nrow(dat))
ore.drop(table="IRIS2")
ore.create(dat, table="IRIS2")
row.names(IRIS2) <- IRIS2$ID
# ore.exec("alter table IRIS2 add constraint IRIS2 primary key (\"ID\")")
# ore.sync(table = "IRIS2", use.keys = TRUE)
IRIS2[1:5,]

Using the function defined above, dimReduce, we produce IRIS2.reduced with supplemental columns of ID and Species. This allows us to easily generate a confusion matrix later. You will find that IRIS2.reduced has 4 columns.

IRIS2.reduced <- dimReduce(IRIS2, 2, supplemental.cols=c("ID","Species"))
dim(IRIS2.reduced) # 150 4

Next, we will build an rpart model to predict Species using first the original iris data set, and then the reduced data set so we can compare the confusion matrices of each. Note that to use R's rpart for model building, the data set IRIS2.reduced is pulled to the client.

library(rpart)
m1 <- rpart(Species~.,iris)
res1 <- predict(m1,iris,type="class")
table(res1,iris$Species)
#res1 setosa versicolor virginica
# setosa 50 0 0
# versicolor 0 49 5
# virginica 0 1 45

dat2 <- ore.pull(IRIS2.reduced)
m2 <- rpart(Species~.-ID, dat2)
res2 <- predict(m2, dat2, type="class")
table(res2, dat2$Species)
# res2 setosa versicolor virginica
# setosa 50 0 0
# versicolor 0 47 0
# virginica 0 3 50

Notice that the resulting models are comparable, but that the model that used IRIS2.reduced actually has better overall accuracy, making just 3 mistakes instead of 6. Of course, a more accurate assessment of error would be to use cross validation, however, this is left as an exercise for the reader.
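A minimal client-side sketch of such a cross-validation (the 5-fold split and seed are arbitrary choices, not from the original analysis):

```r
# Sketch: 5-fold cross-validated misclassification rate for rpart on iris
library(rpart)
set.seed(123)
folds <- sample(rep(1:5, length.out = nrow(iris)))   # random fold assignment
errs <- sapply(1:5, function(f) {
  fit  <- rpart(Species ~ ., data = iris[folds != f, ])
  pred <- predict(fit, iris[folds == f, ], type = "class")
  mean(pred != iris$Species[folds == f])             # per-fold error
})
mean(errs)   # cross-validated error estimate
```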

We can build a similar model using the in-database decision tree algorithm, via ore.odmDT, and get the same results on this particular data set.

m2.1 <- ore.odmDT(Species~.-ID, IRIS2.reduced)
res2.1 <- predict(m2.1,IRIS2.reduced,type="class",supplemental.cols = "Species")
table(res2.1$PREDICTION, res2.1$Species)
# res2 setosa versicolor virginica
# setosa 50 0 0
# versicolor 0 47 0
# virginica 0 3 50

A more interesting example is based on the digit-recognizer data available on the Kaggle website. In this example, we first use Support Vector Machine as the algorithm, with default parameters, on split train and test samples of the original training data. This allows us to get an objective assessment of model accuracy. Then, we preprocess the train and test sets using the in-database SVD algorithm and reduce the original 785 predictors to 40. The reduced number of variables is subject to experimentation. The degree of parallelism for SVD was set to 4.

The results highlight that reducing data dimensionality can improve overall model accuracy, and that overall execution time can be significantly faster. Specifically, using ore.odmSVM for model building saw a 43% time reduction and a 4.2% increase in accuracy by preprocessing the train and test data using SVD.

However, it should be noted that not all algorithms are necessarily aided by dimensionality reduction with SVD. In a second test on the same data using ore.randomForest with 25 trees and defaults for other settings, accuracy of 95.3% was achieved using the original train and test sets. With the SVD-reduced train and test sets, accuracy was 93.7%. While model building time was reduced by 80% and scoring time by 54%, once the SVD execution time is factored in, the straight random forest approach does better overall by a factor of two.

### Details

For this scenario, we modify the dimReduce function introduced above and add another function dimReduceApply. In dimReduce, we save the model in an ORE Datastore so that the same model can be used to transform the test data set for scoring. In dimReduceApply, that same model is loaded for use in constructing the reduced U matrix.

dimReduce <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL, dsname="svd.model") {
  colIdxs <- which(colnames(x) %in% supplemental.cols)
  if (length(colIdxs) > 0) {
    sol <- svd(x[,-colIdxs])
    sol.U <- as.matrix(x[,-colIdxs]) %*% sol$v[,1:k,drop=FALSE] %*%
             diag(sol$d[1:k])
    res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
  } else {
    sol <- svd(x)
    sol.U <- as.matrix(x) %*% sol$v[,1:k,drop=FALSE] %*%
             diag(sol$d[1:k])
    res <- sol.U@data
  }
  ore.save(sol, name=dsname, overwrite=TRUE)
  res
}

dimReduceApply <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL, dsname="svd.model") {
  colIdxs <- which(colnames(x) %in% supplemental.cols)
  ore.load(dsname)   # restores the saved svd result 'sol'
  if (length(colIdxs) > 0) {
    sol.U <- as.matrix(x[,-colIdxs]) %*% sol$v[,1:k,drop=FALSE] %*%
             diag(sol$d[1:k])
    res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
  } else {
    sol.U <- as.matrix(x) %*% sol$v[,1:k,drop=FALSE] %*%
             diag(sol$d[1:k])
    res <- sol.U@data
  }
  res
}

Here is the script used for the digit data:

dim(train) # 42000 786

train$ID <- 1:nrow(train) # assign row id
ore.drop(table="DIGIT_TRAIN")
ore.create(train,table="DIGIT_TRAIN") # create as table in the database
dim(DIGIT_TRAIN) # 42000 786

# Split the original training data into train and
# test sets to evaluate model accuracy
set.seed(0)
dt <- DIGIT_TRAIN
ind <- sample(1:nrow(dt),nrow(dt)*.6)
group <- as.integer(1:nrow(dt) %in% ind)

row.names(dt) <- dt$ID
sample.train <- dt[group==TRUE,]
sample.test <- dt[group==FALSE,]
dim(sample.train) # 25200 786
dim(sample.test) # 16800 786
# Create train table in database
ore.create(sample.train, table="DIGIT_SAMPLE_TRAIN")
# Create test table in database
ore.create(sample.test, table="DIGIT_SAMPLE_TEST")

# Add persistent primary key for row indexing
# Note: could be done using row.names(DIGIT_SAMPLE_TRAIN) <- DIGIT_SAMPLE_TRAIN$ID
ore.exec("alter table DIGIT_SAMPLE_TRAIN add constraint DIGIT_SAMPLE_TRAIN primary key (\"ID\")")
ore.exec("alter table DIGIT_SAMPLE_TEST add constraint DIGIT_SAMPLE_TEST primary key (\"ID\")")
ore.sync(table = c("DIGIT_SAMPLE_TRAIN","DIGIT_SAMPLE_TEST"), use.keys = TRUE)

# SVM model
m1.svm <- ore.odmSVM(label~.-ID, DIGIT_SAMPLE_TRAIN, type="classification")
pred.svm <- predict(m1.svm, DIGIT_SAMPLE_TEST,
supplemental.cols=c("ID","label"),type="class")
cm <- with(pred.svm, table(label,PREDICTION))

library(caret)
confusionMatrix(cm)
# Confusion Matrix and Statistics
#
# PREDICTION
# label 0 1 2 3 4 5 6 7 8 9
# 0 1633 0 4 2 3 9 16 2 7 0
# 1 0 1855 12 3 2 5 4 2 23 3
# 2 9 11 1445 22 26 8 22 30 46 10
# 3 8 9 57 1513 2 57 16 16 41 15
# 4 5 9 10 0 1508 0 10 4 14 85
# 5 24 12 14 52 28 1314 26 6 49 34
# 6 10 2 7 1 8 26 1603 0 6 0
# 7 10 8 27 4 21 8 1 1616 4 70
# 8 12 45 14 40 7 47 13 10 1377 30
# 9 12 10 6 19 41 15 2 54 15 1447
#
# Overall Statistics
#
# Accuracy : 0.9114
# 95% CI : (0.907, 0.9156)
# No Information Rate : 0.1167
# P-Value [Acc > NIR] : < 2.2e-16
#...

options(ore.parallel=4)
sample.train.reduced <- dimReduce(DIGIT_SAMPLE_TRAIN, 40, supplemental.cols=c("ID","label"))
sample.test.reduced <- dimReduceApply(DIGIT_SAMPLE_TEST, 40, supplemental.cols=c("ID","label"))
ore.drop(table="DIGIT_SAMPLE_TRAIN_REDUCED")
ore.create(sample.train.reduced,table="DIGIT_SAMPLE_TRAIN_REDUCED")
ore.drop(table="DIGIT_SAMPLE_TEST_REDUCED")
ore.create(sample.test.reduced,table="DIGIT_SAMPLE_TEST_REDUCED")

m2.svm <- ore.odmSVM(label~.-ID,
DIGIT_SAMPLE_TRAIN_REDUCED, type="classification")
pred2.svm <- predict(m2.svm, DIGIT_SAMPLE_TEST_REDUCED,
supplemental.cols=c("label"),type="class")
cm <- with(pred2.svm, table(label,PREDICTION))
confusionMatrix(cm)
# Confusion Matrix and Statistics
#
# PREDICTION
# label 0 1 2 3 4 5 6 7 8 9
# 0 1652 0 3 3 2 7 4 1 3 1
# 1 0 1887 8 2 2 1 1 3 3 2
# 2 3 4 1526 11 20 3 7 21 27 7
# 3 0 3 29 1595 3 38 4 16 34 12
# 4 0 4 8 0 1555 2 11 5 9 51
# 5 5 6 2 31 6 1464 13 6 10 16
# 6 2 1 5 0 5 18 1627 0 5 0
# 7 2 6 22 7 10 2 0 1666 8 46
# 8 3 9 9 34 7 21 9 7 1483 13
# 9 5 2 8 17 30 10 3 31 20 1495
#
# Overall Statistics
#
# Accuracy : 0.9494
# 95% CI : (0.946, 0.9527)
# No Information Rate : 0.1144
# P-Value [Acc > NIR] : < 2.2e-16
#...

# CASE 2 with Random Forest
m2.rf <- ore.randomForest(label~.-ID, DIGIT_SAMPLE_TRAIN,ntree=25)
pred2.rf <- predict(m2.rf, DIGIT_SAMPLE_TEST, supplemental.cols=c("label"),type="response")
cm <- with(pred2.rf, table(label,prediction))
confusionMatrix(cm)
# Confusion Matrix and Statistics
#
# prediction
# label 0 1 2 3 4 5 6 7 8 9
# 0 1655 0 1 1 2 0 7 0 9 1
# 1 0 1876 12 8 2 1 1 2 6 1
# 2 7 4 1552 14 10 2 5 22 10 3
# 3 9 5 33 1604 1 21 4 16 27 14
# 4 1 4 3 0 1577 1 9 3 3 44
# 5 9 6 2 46 3 1455 18 1 9 10
# 6 13 2 3 0 6 14 1621 0 3 1
# 7 1 6 31 5 16 3 0 1675 3 29
# 8 3 7 15 31 11 20 8 4 1476 20
# 9 9 2 7 23 32 5 1 15 12 1515
#
# Overall Statistics
#
# Accuracy : 0.9527
# 95% CI : (0.9494, 0.9559)
# No Information Rate : 0.1138
# P-Value [Acc > NIR] : < 2.2e-16
#...

m1.rf <- ore.randomForest(label~.-ID, DIGIT_SAMPLE_TRAIN_REDUCED,ntree=25)
pred1.rf <- predict(m1.rf, DIGIT_SAMPLE_TEST_REDUCED,
supplemental.cols=c("label"),type="response")
cm <- with(pred1.rf, table(label,prediction))
confusionMatrix(cm)
# Confusion Matrix and Statistics
#
# prediction
# label 0 1 2 3 4 5 6 7 8 9
# 0 1630 0 4 5 2 8 16 3 5 3
# 1 0 1874 17 4 0 5 2 2 4 1
# 2 15 2 1528 17 10 5 10 21 16 5
# 3 7 1 32 1601 4 25 10 8 34 12
# 4 2 6 6 3 1543 2 17 4 4 58
# 5 9 1 5 45 12 1443 11 3 15 15
# 6 21 3 8 0 5 15 1604 0 7 0
# 7 5 11 33 7 17 6 1 1649 2 38
# 8 5 13 27 57 14 27 9 12 1404 27
# 9 10 2 6 22 52 8 5 41 12 1463
#
# Overall Statistics
#
# Accuracy : 0.9368
# 95% CI : (0.9331, 0.9405)
# No Information Rate : 0.1139
# P-Value [Acc > NIR] : < 2.2e-16
#...

### Execution Times

The following numbers reflect the execution times for select operations of the above script. Hardware was a Lenovo Thinkpad with Intel i5 processor and 16 GB RAM.

## Thursday Sep 10, 2015

### Consolidating wide and shallow data with ORE Datastore

Clinical trial data are often characterized by a relatively small set of participants (100s or 1000s), while the data collected and analyzed on each participant may be significantly wider (1000s or 10,000s of variables). Genomic data alone can easily reach the higher end of this range. In talking with industry leaders, one of the problems pharmaceutical companies and research hospitals encounter is effectively managing such data. Storing data in flat files on myriad servers, perhaps even “closeted” when no longer actively needed, poses problems for data accessibility, backup, recovery, and security. While Oracle Database provides support for wide data using nested tables in a number of contexts, to take advantage of R native functions that handle wide data using data.frames, Oracle R Enterprise allows you to store wide data.frames directly in Oracle Database using Oracle R Enterprise datastores.

With Oracle R Enterprise (ORE), a component of the Oracle Advanced Analytics option, the ORE datastore supports storing arbitrary R objects, including data.frames, in Oracle Database. In particular, users can load wide data from a file into R and store the resulting data.frame directly in the ORE datastore. From there, users can repeatedly load the data at much faster speeds than they can from flat files.

The following benchmark results illustrate the performance of saving and loading data.frames of various dimensions. These tests were performed on an Oracle Exadata X5-2 half rack, with ORE 1.4.1, ROracle 1.2-1, and R 3.2.0. Logging is turned off on the datastore table (see the performance tip below). The data.frame consists of numeric data.

### Comparing Alternatives

When it comes to accessing data and saving data for use with R, there are several options, including: CSV file, .Rdata file, and the ORE datastore. Each comes with its own advantages.

### CSV

“Comma separated value” or CSV files are generally portable, provide a common representation for exporting/importing data, and can be readily loaded into a range of applications. However, flat files need to be managed and often have inadequate security, auditing, backup, and recovery. As we’ll see, CSV files provide significantly slower read and write times compared to .Rdata and ORE datastore.

### .Rdata

R’s native .Rdata flat file representation is generally efficient for reading/writing R objects since the objects are in serialized form, i.e., not converted to a textual representation as CSV data are. However, .Rdata flat files also need to be managed and often have inadequate security, auditing, backup, and recovery. While faster than CSV read and write times, .Rdata is slower than ORE datastore. Being an R-specific format, access is limited to the R environment, which may or may not be a concern.

### ORE Datastore

ORE’s datastore capability allows users to organize and manage all data in a single location – the Oracle Database. This centralized repository provides Oracle Database quality security, auditing, backup, and recovery. The ORE datastore, as you’ll see below, provides read and write performance that is significantly better than CSV and .Rdata. Of course, as with .Rdata being accessed through R, accessing the datastore is through Oracle Database.

Let’s look at a few benchmark comparisons.

First, consider the execution time for loading data using each of these approaches. For 2000 columns, we see that ore.load() is 124X faster than read.csv(), and over 3 times faster than R’s load() function for 5000 rows. At 20,000 rows, ore.load() is 198X faster than read.csv() and almost 4 times faster than load().

Considering the time to save data, ore.save() is over 11X faster than write.csv() and over 8X faster than save() at 5000 rows, with that benefit continuing through 20000 rows.

Looking at this across even wider data.frames, e.g., adding results for 4000 and 16000 columns, we see a similar performance benefit for the ORE datastore over save/load and write.csv/read.csv.
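These comparisons can be reproduced in outline with system.time(); the sketch below uses a small arbitrary data.frame, and the ore.save() line (commented out) assumes an active ORE connection, so it is not part of the runnable portion:

```r
# Hypothetical timing harness for the save paths compared above
dat <- as.data.frame(matrix(rnorm(1000 * 50), nrow = 1000))
t.csv   <- system.time(write.csv(dat, file.path(tempdir(), "dat.csv"), row.names = FALSE))
t.rdata <- system.time(save(dat, file = file.path(tempdir(), "dat.Rdata")))
# t.ds  <- system.time(ore.save(dat, name = "bench", overwrite = TRUE))  # requires ORE session
rbind(csv = t.csv, rdata = t.rdata)[, "elapsed"]   # elapsed seconds per method
```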

If you are looking to consolidate data while gaining performance benefits along with security, backup, and recovery, the Oracle R Enterprise datastore may be a preferred choice.

### Example using ORE Datastore

The ORE datastore functions ore.save() and ore.load() are similar to the corresponding R save() and load() functions.

In the following example, we read a CSV data file, save it in the ORE datastore using ore.save(), and associate it with the name “MyDatastore”. Although not shown, multiple objects can be listed in the initial arguments. Note that any R objects can be included here, not just data.frames.

From there, we list the contents of the datastore and see that “MyDatastore” is listed with the number of objects stored and the overall size. Next we can ask for a summary of the contents of “MyDatastore”, which includes the data.frame ‘dat’.

Next we remove ‘dat’ and load the contents of the datastore, reconstituting ‘dat’ as a usable data.frame object. Lastly, we delete the datastore and see that the ORE datastore is empty.

>
> dim(dat)
[1] 300 2000
>
> ore.save(dat, name="MyDatastore")
> ore.datastore()
datastore.name object.count size creation.date description
1 MyDatastore 1 4841036 2015-09-01 12:07:38
>
> ore.datastoreSummary("MyDatastore")
object.name class size length row.count col.count
1 dat data.frame 4841036 2000 300 2000
>
> rm(dat)
>
> ore.load("MyDatastore")
[1] "dat"
>
> ore.delete("MyDatastore")
[1] "MyDatastore"
>
> ore.datastore()
[1] datastore.name object.count size creation.date description
<0 rows> (or 0-length row.names)
>

### Performance Tip

The performance of saving R objects to the datastore can be increased by temporarily turning off logging on the table that serves as the datastore in the user’s schema, RQ$DATASTOREINVENTORY. This can be accomplished using the following SQL, which can also be invoked from R:

SQL> alter table RQ$DATASTOREINVENTORY NOLOGGING;

ORE> ore.exec("alter table RQ$DATASTOREINVENTORY NOLOGGING")

While turning off logging speeds up inserts and index creation, it means the redo log is not written, which has implications for database recovery. It can be used in combination with explicit backups taken before and after loading data.
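One hedged way to script this (assuming an active ORE session; the data.frame and datastore name are illustrative) is to disable logging only around the bulk save and then restore it:

```r
# Sketch: toggle NOLOGGING around a bulk datastore save (requires an ORE session)
ore.exec("alter table RQ$DATASTOREINVENTORY NOLOGGING")
ore.save(dat, name = "MyDatastore", overwrite = TRUE)   # the bulk operation
ore.exec("alter table RQ$DATASTOREINVENTORY LOGGING")   # restore logging afterward
```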

## Wednesday May 06, 2015

### Experience using ORAAH on a customer business problem: some basic issues & solutions

We illustrate in this blog a few simple, practical solutions for problems that can arise when developing ORAAH mapreduce applications for the Oracle BDA. These problems were actually encountered during a recent POC engagement. The customer, an important player in the medical technologies market, was interested in building an analysis flow consisting of a sequence of data manipulation and transformation steps followed by multiple model generation. The data preparation included multiple types of merging, filtering, and variable generation based on complex search patterns, and represented by far the most time-consuming component of the flow. The original implementation on the customer's hardware required multiple days per flow to complete. Our ORAAH mapreduce-based implementation running on an X5-2 Starter Rack BDA reduced that time to between 4 and 20 minutes, depending on which flow was tested.

The points addressed in this blog relate to the fact that the data preparation was structured as a chain of tasks, where each task performed transformations on HDFS data generated by one or more upstream tasks. More precisely, we will consider the:

• Merging of HDFS data from multiple sources

• Re-balancing and parts reduction for HDFS data

• Getting unique levels for categorical variables from HDFS data

• Partitioning the data for distributed mapreduce execution

'Merging data' above is to be understood as row binding of multiple tables. Re-balancing and parts reduction addresses the fact that HDFS data (generated by upstream jobs) may consist of very unequal parts (chunks), which leads to performance losses when this data is further processed by other mapreduce jobs. The 3rd and 4th items are related: getting the unique levels of categorical variables was useful for the data partitioning process, namely for how to generate the key-value pairs within the mapper functions.

### 1. Merging of hdfs data from multiple sources

The practical case here is that of a data transformation task whose input consists of several similarly structured HDFS data sets. As a reminder, data in HDFS is stored as a collection of flat files/chunks (part-00000, part-00001, etc.) under an HDFS directory, and the hdfs.* functions access the directory, not the 'part-xxxxx' chunks. Also, the hadoop.run()/hadoop.exec() functions work with a single input data object (an HDFS object identifier representing a directory in HDFS); R's rbind, cbind, merge, etc. operations cannot be invoked within mapreduce to bind two or more large tables.

For the case under consideration, each input (dataA_dfs, dataB_dfs, etc) consists of a different number of files/chunks

R> hdfs.ls("dataA_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00071"
R> hdfs.ls("dataB_dfs")
[1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00035"

corresponding to the number of reducers used by the upstream mapreduce jobs which generated this data. As these multiple chunks from various HDFS directories need to be processed as a single input, they must be moved into a single HDFS directory. The merge_hdfs_data function below does just that, creating a new HDFS directory and copying all the part-xxxxx files from each source directory while renumbering the resulting parts:

R> merge_hdfs_data <- function(SrcDirs, TrgtDir) {
     # cat(sprintf("merge_hdfs_files : Creating %s ...\n", TrgtDir))
     hdfs.mkdir(TrgtDir, overwrite=TRUE)
     i <- 0
     for (srcD in SrcDirs) {
       fparts <- hdfs.ls(get(srcD), pattern="part")
       srcd <- (hdfs.describe(get(srcD)))[1,2]
       for (fpart in fparts) {
         # cat(sprintf("merge_hdfs_files : Copying %s/%s to %s ...\n", srcD, fpart, TrgtDir))
         i <- i + 1
         hdfs.cp(paste(srcd, fpart, sep="/"), sprintf("%s/part-%05d", TrgtDir, i))
       }
     }
   }

Merging of the dataA_dfs and dataB_dfs directories into a new data_merged_dfs directory is achieved through:

R> merge_hdfs_data(c("dataA_dfs","dataB_dfs"),"data_merged_dfs")

### 2. Data re-balancing / Reduction of the number of parts

Data stored in HDFS can suffer from two key problems that affect performance: too many small files, and files with very different numbers of records, especially files with very few records. The merged data produced by the function above consists of a number of files equal to the sum of all files from all input HDFS directories. Since the upstream mapreduce jobs generating the inputs were run with a high number of reducers (for faster execution), the resulting total number of files got large (100+). This created an impractical constraint for the subsequent analysis, as one cannot run a mapreduce application with a number of mappers less than the number of parts (the reverse is not a problem: HDFS parts are splittable for processing by multiple mappers). Moreover, if the parts have very different numbers of records, application performance suffers, since different mappers will handle very different volumes of data.

The rebalance_data function below is a simple way of addressing these issues. Every mapper splits its portion of the data into a user-defined number of parts (nparts), each containing roughly the same number of records. A key is associated with each part. In this implementation the number of reducers is set to the number of parts. After shuffling, each reducer collects the records corresponding to one particular key and writes them to the output. The overall output consists of nparts parts of quasi-equal size. A basic mechanism for preserving the data types is illustrated (see the map.output and reduce.output constructs below).

R> rebalance_data <- function(HdfsData, nmap, nparts)
   {
     mapper_func <- function(k,v) {
       nlin <- nrow(v)
       if (nlin > 0) {
         idx.seq <- seq(1, nlin)
         kk <- ceiling(idx.seq/(nlin/nparts))
         orch.keyvals(kk, v)
       }
     }
     reducer_func <- function(k,v) {
       if (nrow(v) > 0) { orch.keyvals(k=NULL, v) }
     }
     dtypes.out <- sapply(hdfs.meta(HdfsData)$types,
                          function(x) ifelse(x=="character","\"a\"",
                                      ifelse(x=="logical","FALSE","0")))
     val.str <- paste0(hdfs.meta(HdfsData)$names,"=",dtypes.out,collapse=",")
     meta.map.str <- sprintf("data.frame(key=0,%s)",val.str)
     meta.red.str <- sprintf("data.frame(key=NA,%s)",val.str)

     config <- new("mapred.config",
                   job.name      = "rebalance_data",
                   map.output    = eval(parse(text=meta.map.str)),
                   reduce.output = eval(parse(text=meta.red.str)),
                   map.tasks     = nmap,
                   reduce.tasks  = nparts,
                   reduce.split  = 1e5)
     res <- hadoop.run(HdfsData,
                       mapper  = mapper_func,
                       reducer = reducer_func,
                       config  = config,
                       export  = orch.export(nparts=nparts),
                       cleanup = TRUE)
     res
   }

Before using this function, the data associated with the new data_merged_dfs directory needs to be attached to the ORAAH framework:

R> data_merged_dfs <- hdfs.attach("data_merged_dfs")

The invocation below uses 144 mappers for splitting the data into 4 parts:

R> x <- rebalance_data(data_merged_dfs,nmap=144,nparts=4)

The user may also want to save the resulting object permanently, under some convenient, recognizable name like 'data_rebalanced_dfs'. The path to the temporary object x is retrieved with the hdfs.describe() command and provided as the first argument to the hdfs.cp() command.

R> tmp_dfs_name <- hdfs.describe(x)[1,2]
R> hdfs.cp(tmp_dfs_name,"data_rebalanced_dfs",overwrite=TRUE)

The choice of the number of parts is up to the user. It is better to have few parts, to avoid constraining from below the number of mappers for downstream runs, but one should also consider other factors, like the read/write performance related to the size of the data sets, the HDFS block size, etc., which are not the topic of the present blog.

### 3. Getting unique levels

Determining the unique levels of categorical variables in a dataset is of basic interest for any data exploration procedure. If the data is distributed in HDFS, this determination requires an appropriate solution. For the application under consideration here, getting the unique levels serves another purpose; the unique levels are used to generate data splits better suited for distributed execution by the downstream mapreduce jobs. More details are available in the next section.

Depending on the categorical variables in question and the data characteristics, the determination of unique levels may require different solutions. The implementation below is a generic solution providing these levels for multiple variables bundled together in the input argument 'cols'. The mappers associate a key with each variable and collect the unique levels for each of these variables. The resulting arrays of values are packed in a text-stream-friendly format and provided as the value argument to orch.keyvals() - in this way complex data types can be safely passed between the mappers and reducers (via text-based Hadoop streams). The reducers unpack the strings, retrieve all the values associated with a particular key (variable), and re-calculate the unique levels, now accounting for all values of that variable.

R> get_unique_levels <- function(x, cols, nmap, nred) {
     mapper <- function(k, v) {
       for (col in cols) {
         uvals <- unique(v[[col]])
         orch.keyvals(col, orch.pack(uvals))
       }
     }
     reducer <- function(k, v) {
       lvals <- orch.unpack(v$val)
       uvals <- unique(unlist(lvals))
       orch.keyval(k, orch.pack(uvals))
     }
     config <- new("mapred.config",
                   job.name      = "get_unique_levls",
                   map.output    = data.frame(key="a",val="packed"),
                   reduce.output = data.frame(key="a",val="packed"),
                   map.tasks     = nmap,
                   reduce.tasks  = nred)
     res <- hadoop.run(x,
                       mapper  = mapper,
                       reducer = reducer,
                       config  = config,
                       export  = orch.export(cols=cols))
     resl <- (lapply((hdfs.get(res))$val, function(x) orch.unpack(x)))[[1]]
     resl
   }

This implementation works fine provided that the number of levels for the categorical variables is much smaller than the number of records in the entire data set. If some categorical variables have many levels, not far from the order of the total number of records, each mapper may return a large number of levels and each reducer may have to handle multiple large objects. An efficient solution for this case requires a different approach. However, if the column associated with one of these variables can fit in memory, a direct, very crude calculation like the one below can run faster than the former implementation. Here the mappers extract the column with the values of the variable in question, the column is pulled into an in-memory object, and unique() is called to determine the unique levels.

R> get_unique_levels_sngl <- function(HdfsData, col, nmap)
   {
     mapper_fun <- function(k,v) { orch.keyvals(key=NULL, v[[col]]) }
     config <- new("mapred.config",
                   job.name   = "extract_col",
                   map.output = data.frame(key=NA,VAL=0),
                   map.tasks  = nmap)
     x <- hadoop.run(HdfsData,
                     mapper  = mapper_fun,
                     config  = config,
                     export  = orch.export(col=col),
                     cleanup = TRUE)
     xl <- hdfs.get(x)
     res <- unique(xl$VAL)
     res
   }

R> customers <- get_unique_levels_sngl(data_rebalanced_dfs, "CID", nmap=32)

We have thus obtained the unique levels of the categorical variable CID (customer id) from our data_rebalanced_dfs data.

### 4. Partitioning the data for mapreduce execution

Let's suppose that the user wants to execute some specific data manipulations at the CID level, like aggregations, variable transformations, or new variable generation. Associating a key with every customer (CID level) would be a bad idea, since there are many customers - our hypothesis was that the number of CID levels is not orders of magnitude below the total number of records. This would lead to an excessive number of reducers with a terrible impact on performance. In such a case it is better, for example, to bag customers into groups and distribute the execution at the group level. The user may want to set the number of these groups, ngrp, to something commensurate with the number of BDA cores available for parallelizing the task.

The example below illustrates how to do that at a basic level. The groups are generated within the encapsulating function myMRjob, before the hadoop.run() execution; the var.grps data frame has two columns: the CID levels and the group number (from 1 to ngrp) with which they are associated. This table is passed to the hadoop execution environment via orch.export() within hadoop.run(). The mapper_fun function extracts the group number as key and inserts the multiple key-value pairs into the output buffer. The reducer then receives a complete set of records for every customer associated with a particular key (group) and can proceed with the transformations/manipulations within a loop over customers or whatever programming construct is appropriate. Each reducer handles a quasi-equal number of customers because this is how the groups were generated. However, the number of records per customer is not constant and may introduce some imbalance.

R> myMRjob <- function(HdfsData, var, ngrp, nmap, nred)
{
    mapper_fun <- function(k, v) {
        ....
        fltr <- <some_row_filtering>
        cID <- which(names(v) %in% "CUSTOMID")
        kk <- var.grps[match(v[fltr, cID], var.grps$CUSTOMID), 2]
        orch.keyvals(kk, v[fltr, , drop=FALSE])
    }
    reducer_fun <- function(k, v) { ... }

    var.grps <- data.frame(CUSTOMID=var,
        GRP=rep(1:ngrp, sapply(split(var, ceiling(seq_along(var)/(length(var)/ngrp))), length)))

    config <- new("mapred.config",
                  map.tasks    = nmap,
                  reduce.tasks = nred,
                  ...)

    res <- hadoop.run(HdfsData,
                      mapper  = mapper_fun,
                      reducer = reducer_fun,
                      config  = config,
                      export  = orch.export(var.grps=var.grps, ngrp=ngrp),
                      cleanup = TRUE)
    res
}

x <- myMRjob(HdfsData=data_balanced_dfs, var=customers, ngrp=.., nmap=.., nred=..)
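The group-assignment expression used to build var.grps can be tried out in plain R, independently of ORCH. A minimal sketch, with made-up customer IDs, shows that the levels are bagged into quasi-equal groups:

```r
## Bagging levels into ngrp quasi-equal groups (plain R; the IDs are hypothetical)
var  <- sprintf("C%03d", 1:10)   # 10 made-up customer IDs
ngrp <- 3
## chunk the sequence of levels into ngrp consecutive pieces and count each piece
grp.sizes <- sapply(split(var, ceiling(seq_along(var)/(length(var)/ngrp))), length)
var.grps  <- data.frame(CUSTOMID = var, GRP = rep(1:ngrp, grp.sizes))
table(var.grps$GRP)   # group sizes differ by at most one
```

Because the groups are consecutive slices of the level vector, every level gets exactly one group and the group sizes are balanced up to a difference of one.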

Improved data partitioning solutions could be sought for the cases where there are strong imbalances in the number of records per customer or if great variations are noticed between the reducer jobs completion times. This kind of optimization will be addressed in a later blog.

## Monday May 19, 2014

### Model cross-validation with ore.CV()

In this blog post we illustrate how to use Oracle R Enterprise for performing cross-validation of regression and classification models. We describe a new utility R function ore.CV that leverages features of Oracle R Enterprise and is available for download and use.

Predictive models are usually built on given data and verified on held-aside or unseen data. Cross-validation is a model improvement technique that avoids the limitations of a single train-and-test experiment by building and testing multiple models via repeated sampling from the available data. Its purpose is to offer better insight into how well the model would generalize to new data and to avoid over-fitting and deriving wrong conclusions from misleading peculiarities of the seen data.

In a k-fold cross-validation the data is partitioned into k (roughly) equal size subsets. One of the subsets is retained for testing and the remaining k-1 subsets are used for training. The process is repeated k times with each of the k subsets serving exactly once as testing data. Thus, all observations in the original data set are used for both training and testing.

The choice of k depends, in practice, on the size n of the data set. For large data, k=3 could be sufficient. At the other extreme, for very small data, leave-one-out cross-validation (LOOCV) with k=n uses a single observation from the original sample as testing data and the remaining observations as training data. Common choices are k=10 or k=5.
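Conceptually, the fold assignment behind k-fold cross-validation can be sketched in a few lines of plain R (an illustration only, not the internals of ore.CV):

```r
## Plain-R sketch of k-fold splitting (illustration only, not ore.CV internals)
set.seed(42)
k <- 5
n <- nrow(iris)
folds <- sample(rep(1:k, length.out = n))  # random fold label 1..k for each row
for (i in 1:k) {
    test  <- iris[folds == i, ]   # held-aside fold
    train <- iris[folds != i, ]   # remaining k-1 folds
    # fit the model on train, score on test, accumulate the error metric ...
}
```

Each row serves exactly once as test data, and the fold sizes are as equal as n allows.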

For a select set of algorithms and cases, the function ore.CV performs cross-validation for models generated by ORE regression and classification functions using in-database data. ORE embedded R execution is leveraged to support cross-validation also for models built with vanilla R functions.

Usage

ore.CV(funType, function, formula, dataset, nFolds=<nb.folds>, fun.args=NULL, pred.args=NULL, pckg.lst=NULL)
• funType - "regression" or "classification"
• function - ORE predictive modeling functions for regression & classification or R function (regression only)
• formula - object of class "formula"
• dataset - name of the ore.frame
• nFolds - number of folds
• fun.args - list of supplementary arguments for 'function'
• pred.args - list of supplementary arguments for 'predict'. Must be consistent with the model object/model generator 'function'.
• pckg.lst - list of packages to be loaded by the DB R engine for embedded execution.
The set of functions supported for ORE include:
• ore.lm
• ore.stepwise
• ore.neural
• ore.glm
• ore.odmDT
• ore.odmSVM
• ore.odmGLM
• ore.odmNB
The set of functions supported for R include:
• lm
• glm
• svm
Note: The 'ggplot' and 'reshape' packages are required on the R client side for data post-processing and plotting (classification CV).

Examples

In the following examples, we illustrate various ways to invoke ore.CV using some datasets we have seen in previous posts. The datasets can be created as ore.frame objects using:

IRIS <- ore.push(iris)
LONGLEY <- ore.push(longley)
library(rpart)
KYPHOSIS <- ore.push(kyphosis)
library(PASWR)
TITANIC3 <- ore.push(titanic3)
MTCARS <- ore.push(mtcars)
(A) Cross-validation for models generated with ORE functions.

# Basic specification
ore.CV("regression","ore.lm",Sepal.Length~.-Species,"IRIS",nFolds=5)
ore.CV("regression","ore.neural",Employed~GNP+Population+Year,
"LONGLEY",nFolds=5)

#Specification of function arguments
ore.CV("regression","ore.stepwise",Employed~.,"LONGLEY",nFolds=5,
fun.args=list(add.p=0.1,drop.p=0.1))
ore.CV("regression","ore.odmSVM",Employed~GNP+Population+Year,
"LONGLEY",nFolds=5, fun.args="regression")

#Specification of function arguments and prediction arguments
ore.CV("classification","ore.glm",Kyphosis~.,"KYPHOSIS",nFolds=5,
fun.args=list(family=binomial()),pred.args=list(type="response"))
ore.CV("classification","ore.odmGLM",Kyphosis~.,"KYPHOSIS",nFolds=5,
fun.args= list(type="logistic"),pred.args=list(type="response"))

(B) Cross-validation for models generated with R functions via the ORE embedded execution mechanism.

ore.CV("regression","lm",mpg~cyl+disp+hp+drat+wt+qsec,"MTCARS",nFolds=3)
ore.CV("regression","svm",Sepal.Length~.-Species,"IRIS",nFolds=5,
fun.args=list(type="eps-regression"), pckg.lst=c("e1071"))

Restrictions

• The signature of the model generator 'function' must be of the following type: function(formula,data,...). For example, functions like ore.stepwise, ore.odmGLM, and lm are supported, but the R step(object,scope,...) function for AIC-based stepwise model selection does not satisfy this requirement.
• The model validation process requires the prediction function to return a (1-dimensional) vector with the predicted values. If the (default) returned object is different, the requirement must be met by providing an appropriate argument through 'pred.args'. For example, for classification with ore.glm or ore.odmGLM the user should specify pred.args=list(type="response").
• Cross-validation of classification models via embedded R execution of vanilla R functions is not supported yet.
• Remark: Cross-validation is not a technique intended for large data, as the cost of multiple model trainings and testings can become prohibitive. Moreover, with large data sets it is often possible to produce effective train and test sets by sampling. The current ore.CV does not impose any restrictions on the size of the input, and users working with large data should use good judgment when choosing the model generator and the number of folds.
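When a modeler does not have the required function(formula, data, ...) signature, a thin wrapper can sometimes provide it. The wrapper below, my.wlm, is a hypothetical name used only for illustration; it delegates to lm while fixing extra arguments inside the wrapper:

```r
## Hypothetical wrapper giving a modeler the required function(formula, data, ...) signature
my.wlm <- function(formula, data, ...) {
    # delegate to lm, fixing case weights inside the wrapper
    lm(formula, data = data, weights = rep(1, nrow(data)), ...)
}
fit <- my.wlm(mpg ~ wt + hp, data = mtcars)
coef(fit)
```

Assuming the wrapper is visible to the execution environment, such a function could then be passed to ore.CV by name like any other supported modeler.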

Output

The function ore.CV provides output on several levels: datastores to contain model results, plots, and text output.

Datastores

The results of each cross-validation run are saved into a datastore named dsCV_funTyp_data_Target_function_nFxx where funTyp, function, and nF(=nFolds) have been described above, data is the name of the dataset, and Target is the left-hand side of the formula. For example, if one runs the ore.neural, ore.glm, and ore.odmNB-based cross-validation examples from above, the following three datastores are produced:

R> ds <- ore.datastore(pattern="dsCV")
R> print(ds)
datastore.name object.count size creation.date description
1 dsCV_classification_KYPHOSIS_Kyphosis_ore.glm_nF5 10 4480326 2014-04-30 18:19:55 <NA>
2 dsCV_classification_TITANIC3_survived_ore.odmNB_nF5 10 592083 2014-04-30 18:21:35 <NA>
3 dsCV_regression_LONGLEY_Employed_ore.neural_nF5 10 497204 2014-04-30 18:16:35 <NA>

Each datastore contains the models and prediction tables for every fold. Every prediction table has 3 columns: the fold index, the target variable/class, and the predicted values. If we consider the example from above and examine the most recent datastore (the Naive Bayes classification CV), we would see:

R> ds.last <- ds$datastore.name[which.max(as.numeric(ds$creation.date))]
R> ore.datastoreSummary(name=ds.last)
object.name class size length row.count col.count
1 model.fold1 ore.odmNB 66138 9 NA NA
2 model.fold2 ore.odmNB 88475 9 NA NA
3 model.fold3 ore.odmNB 110598 9 NA NA
4 model.fold4 ore.odmNB 133051 9 NA NA
5 model.fold5 ore.odmNB 155366 9 NA NA
6 test.fold1 ore.frame 7691 3 261 3
7 test.fold2 ore.frame 7691 3 262 3
8 test.fold3 ore.frame 7691 3 262 3
9 test.fold4 ore.frame 7691 3 262 3
10 test.fold5 ore.frame 7691 3 262 3

Plots

The following plots are generated automatically by ore.CV and saved in an automatically generated OUTPUT directory:

• Regression: ore.CV generates plots of predicted vs. target values and boxplots of the root mean square error (RMSE) and relative error (RERR) per fold. The example below is based on 5-fold cross-validation with the ore.lm regression model for Sepal.Length ~ . -Species using the ore.frame IRIS dataset.
• Classification: ore.CV outputs a multi-plot figure for classification metrics like Precision, Recall, and F-measure. Each metric is captured per target class (side-by-side barplots) and per fold (groups of barplots). The example below is based on the 5-fold CV of the ore.odmSVM classification model for Species ~ . using the ore.frame IRIS dataset.

Text output

For classification problems, the confusion tables for each fold are saved in an output file residing in the OUTPUT directory, together with a summary table displaying the precision, recall, and F-measure metrics for every fold and predicted class.
file.show("OUTDIR/tbl_CV_classification_IRIS_Species_ore.odmSVM_nF5")

Confusion table for fold 1 :
setosa versicolor virginica
setosa          9          0         0
versicolor      0         12         1
virginica       0          1         7
Confusion table for fold 2 :
setosa versicolor virginica
setosa          9          0         0
versicolor      0          8         1
virginica       0          2        10
Confusion table for fold 3 :
setosa versicolor virginica
setosa         11          0         0
versicolor      0         10         2
virginica       0          0         7
Confusion table for fold 4 :
setosa versicolor virginica
setosa          9          0         0
versicolor      0         10         0
virginica       0          2         9
Confusion table for fold 5 :
setosa versicolor virginica
setosa         12          0         0
versicolor      0          5         1
virginica       0          0        12
Precision, Recall & F-measure table per {class,fold}
fold      class TP  m  n Precision Recall F_meas
1     1     setosa  9  9  9     1.000  1.000  1.000
2     1 versicolor 12 13 13     0.923  0.923  0.923
3     1  virginica  7  8  8     0.875  0.875  0.875
4     2     setosa  9  9  9     1.000  1.000  1.000
5     2 versicolor  8  9 10     0.889  0.800  0.842
6     2  virginica 10 12 11     0.833  0.909  0.870
7     3     setosa 11 11 11     1.000  1.000  1.000
8     3 versicolor 10 12 10     0.833  1.000  0.909
9     3  virginica  7  7  9     1.000  0.778  0.875
10    4     setosa  9  9  9     1.000  1.000  1.000
11    4 versicolor 10 10 12     1.000  0.833  0.909
12    4  virginica  9 11  9     0.818  1.000  0.900
13    5     setosa 12 12 12     1.000  1.000  1.000
14    5 versicolor  5  6  5     0.833  1.000  0.909
15    5  virginica 12 12 13     1.000  0.923  0.960
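The per-class figures in the summary table follow directly from each fold's confusion table. Judging from the numbers, rows correspond to the predicted class (m = row sum) and columns to the actual class (n = column sum); for fold 1 above, the metrics can be recomputed in plain R:

```r
## Recomputing precision, recall, and F-measure from the fold-1 confusion table
ct <- matrix(c(9,  0, 0,
               0, 12, 1,
               0,  1, 7), nrow = 3, byrow = TRUE,
             dimnames = list(predicted = c("setosa","versicolor","virginica"),
                             actual    = c("setosa","versicolor","virginica")))
TP        <- diag(ct)            # correctly classified counts per class
precision <- TP / rowSums(ct)    # TP over all predicted as that class (m)
recall    <- TP / colSums(ct)    # TP over all actually in that class (n)
F_meas    <- 2 * precision * recall / (precision + recall)
round(cbind(TP, m = rowSums(ct), n = colSums(ct), precision, recall, F_meas), 3)
```

This reproduces the fold-1 rows of the summary table, e.g., precision 12/13 = 0.923 for versicolor and 7/8 = 0.875 for virginica.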

What's next
Several extensions of ore.CV are possible, involving sampling, parallel model training and testing, support for vanilla R classifiers, and richer post-processing and output. More material for future posts.