Tuesday Jul 26, 2016

Early detection of process anomalies with SPRT

Developed by Abraham Wald more than half a century ago, the Sequential Probability Ratio Test (SPRT) is a statistical technique for binary hypothesis testing (deciding between two hypotheses, H0 and H1) that is extensively used for system monitoring and early annunciation of signal drifting. SPRT is very popular for quality control and equipment surveillance applications, in industries and areas requiring highly sensitive, reliable and especially fast detection of degradation behavior and/or sensor malfunctions. Examples of SPRT-based applications include process anomaly detection for nuclear power plants, early fault prediction for wind turbines, anomaly surveillance for key satellite components in the aerospace industry, pro-active fault monitoring for enterprise servers, quality control studies in manufacturing, drug and vaccine safety surveillance, and construction and administration of computerized adaptive tests (CAT), among many others.

Conventional techniques for signal monitoring rely on simple tests based, for example, on control chart schemes with thresholds, mean values, etc., and are sensitive only to spikes exceeding some limits or to abrupt changes in the process mean. They generally trigger alarms just before failures or only after the process has drifted significantly. Tighter thresholds lead to high numbers of false alarms, and relaxed thresholds result in high numbers of missed alarms. Moreover, these techniques can perform very poorly in the presence of noise.

The popularity of using SPRT for surveillance applications derives from the following characteristics:

  • SPRT detects signal discrepancies at the earliest mathematically possible time after the onset of system disturbances and/or sensor degradations. This property results from the fact that SPRT examines the statistical qualities of the monitored signals and catches nuances induced by disturbances well in advance of any measurable changes in the mean values of the signals.

  • SPRT examines successive observations of a discrete process and is very cheap to compute. Thus, it is perfectly adapted for real time analysis with streaming data flows.

  • SPRT offers control over false-alarm and missed-alarm rates via user-defined parameters.

The mathematical expressions for SPRT were derived under two assumptions: (1) the samples follow an a priori known distribution function and (2) the samples are independent and identically distributed (i.i.d.). Let us consider a sequence of values {Yn} = y0, y1, ..., yn resulting from a stationary process satisfying these assumptions. Let us further consider the case where the signal data obeys a Gaussian distribution with mean µ0 and variance σ0². The normal signal behavior is referred to as the null hypothesis, H0. Alternative hypotheses express abnormal behavior. For example, one can formulate alternate hypotheses H1, H2, H3 and H4 to characterize signals with a larger or smaller mean or variance. More explicitly, the null and alternate hypotheses could be written as:

H0 :  mean µ0 , variance σ0²
H1 :  mean µ1 > µ0 , variance σ0²
H2 :  mean µ2 < µ0 , variance σ0²
H3 :  mean µ0 , variance σ3² = V σ0²
H4 :  mean µ0 , variance σ4² = (1/V) σ0²

The likelihood ratio (LR) is the ratio of probabilities for observing the sequence {Yn} under an alternate hypothesis Hi versus the null hypothesis H0:

LR(n) = P(y1, y2, ..., yn | Hi) / P(y1, y2, ..., yn | H0)

At each step of the {Yn} sequence, SPRT calculates a test index as the natural log of LR, referred to as LLR, and compares it to two stopping boundaries, log(A) and log(B):

LLR(n) = log( LR(n) )

If LLR is greater than or equal to the upper boundary log(B), then H0 is rejected and Hi is accepted. If LLR is less than or equal to the lower boundary log(A), then Hi is rejected and H0 is accepted. For as long as LLR remains between these two boundaries there is not enough evidence to reach a conclusion; the sampling of yn continues and LLR(n) is updated for a new comparison. The decision boundaries are derived from the false and missed alarm probabilities via the following equations:

A = β / (1 − α)        B = (1 − β) / α

where
α = probability of accepting Hi when H0 is true (false alarm probability)
β = probability of accepting H0 when Hi is true (missed alarm probability)
After a decision is reached (Hi or H0 is accepted), LLR is reset to zero, the sampling y1, y2, … is restarted, and a new sequence of calculations and comparisons is performed to validate Hi or H0 anew.
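A sketch of the boundary computation and the decision step in R (the helper name sprt_decision is illustrative, not part of any package):

```r
# SPRT stopping boundaries derived from the error probabilities.
alpha <- 0.05   # false alarm probability
beta  <- 0.05   # missed alarm probability
logA <- log(beta / (1 - alpha))   # lower boundary: accept H0
logB <- log((1 - beta) / alpha)   # upper boundary: accept Hi

# Decide based on the current value of the test index LLR(n).
sprt_decision <- function(llr) {
  if (llr >= logB) "accept Hi"
  else if (llr <= logA) "accept H0"
  else "continue sampling"
}
```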
The power of SPRT comes from the simplicity of the LLR(n) expression for some common distributions. For the normal distribution we are considering and under the i.i.d. assumption mentioned above, the P({Yn}|H0) and P({Yn}|H1) probabilities reduce to

P({Yn}|Hj) = Π(i=1..n) (1/(σ0 √(2π))) exp[ −(yi − µj)² / (2σ0²) ] ,   j = 0, 1

leading to the following expression for LLR:

LLR(n) = [(µ1 − µ0)/σ0²] · Σ(i=1..n) [ yi − (µ0 + µ1)/2 ]

For each new sample yn the update of this expression is trivial, with the cost of one multiplication and two additions:

LLR(n) = LLR(n−1) + [(µ1 − µ0)/σ0²] · [ yn − (µ0 + µ1)/2 ]

Similarly, the LLR(n) expression for testing H3 (with σ3² = V σ0²) against H0 is:

LLR(n) = −(n/2) log(V) + [(1 − 1/V)/(2σ0²)] · Σ(i=1..n) (yi − µ0)²

leading to an equally fast and cheap LLR update as the sampling of {Yn} continues.
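In R, these per-sample updates can be written as small helper functions (illustrative names, not part of the sprt.R script discussed below):

```r
# One-step LLR update for the mean-shift test (H1 vs H0):
# one multiplication and two additions per sample.
llr_mean_step <- function(llr, y, mu0, mu1, sigma0) {
  llr + (mu1 - mu0) / sigma0^2 * (y - (mu0 + mu1) / 2)
}

# One-step LLR update for the variance-shift test (H3 vs H0, sigma3^2 = V * sigma0^2).
llr_var_step <- function(llr, y, mu0, sigma0, V) {
  llr - 0.5 * log(V) + (1 - 1 / V) * (y - mu0)^2 / (2 * sigma0^2)
}
```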

Change of mean

The script sprt.R illustrates a simple implementation of these formulas and the SPRT decision process. A random sequence with 1000 points (t=1,…1000) is generated from a normal distribution with µ = 0 and σ = 1. The signal is modified between t=101 and t=200 with samples drawn from a normal distribution with µ = 1 and σ = 1.

R> vals <- rnorm(1000,mean=0,sd=1)

R> vals[101:200] <- rnorm(100,mean=1,sd=1)

The resulting signal is captured in the plot below, for t=1,..200

The SPRT LLR equations for a change of mean are used with the following null and alternate hypotheses:

H0 : µ0 = 0 and σ0 = 1
H1 : µ1 = 0.8 and σ1 = 1
The value µ1 = 0.8 is arbitrary, as we are not supposed to know whether the signal mean shifted and by how much. The mysprt(..) function (see the script) is used to calculate LLR(n), perform the comparisons, decide whether to accept the null or alternate hypothesis or keep sampling and updating LLR, and restart the LLR sequence from zero after a decision is taken. The false alarm and missed alarm probabilities α and β were both set to 0.05. The plot_sprt() function generates several plots, as shown below.
The first plot illustrates LLR(t) for t = 1,…,200. Starting on the left, one can see that LLR(1) is in the undecided region log(A) < LLR < log(B). At t = 12, LLR falls under log(A), confirming H0. LLR is reset to zero and a new LLR(n) sequence is calculated until, at t = 23, H0 is confirmed again. These sequences continue several times until t = 101, when the mean shift kicks in. Very quickly, at t = 105, LLR rises above log(B) and H1 is confirmed. LLR is reset, and at t = 119 H1 is confirmed again. H1 continues to be confirmed over several sequences until we reach t = 200, where mysprt() was stopped (arbitrarily, as in a real application it would continue running).

Over the t = 1,…,200 interval, the application of SPRT generated 26 LLR sequences (from restart to H1/H0 decision). The length of a sequence corresponds to the number of samples (time) needed to decide in favor of H1 or H0. The next two plots show the length of each successive sequence and the distribution (histogram) of the sequence lengths.

The decision time is generally short; for some sequences only 3 samples are necessary to validate H0 or H1, while for others up to 15 samples are required.
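The restart mechanics can be reproduced with a compact loop (a minimal sketch of what mysprt() implements; decision counts and sequence lengths will vary with the random seed):

```r
set.seed(123)
vals <- rnorm(1000, mean = 0, sd = 1)
vals[101:200] <- rnorm(100, mean = 1, sd = 1)   # mean shift

mu0 <- 0; mu1 <- 0.8; sigma0 <- 1
logA <- log(0.05 / 0.95); logB <- log(0.95 / 0.05)

llr <- 0; start <- 1
decisions <- character(0); seq.lengths <- integer(0)
for (t in 1:200) {
  # incremental LLR update for the mean-shift test
  llr <- llr + (mu1 - mu0) / sigma0^2 * (vals[t] - (mu0 + mu1) / 2)
  if (llr >= logB || llr <= logA) {
    decisions   <- c(decisions, if (llr >= logB) "H1" else "H0")
    seq.lengths <- c(seq.lengths, t - start + 1)
    llr <- 0; start <- t + 1    # restart a new LLR sequence
  }
}
table(decisions)
```

Before the shift the decisions are overwhelmingly H0; after t = 101 they switch to H1.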
The next plot illustrates LLR without restart, that is, an LLR updated while disregarding the log(A) and log(B) boundaries, with no stopping and no decisions. We see that as long as the samples come from the un-shifted signal (t = 1,…,100), the non-restarted LLR continues to drop. Once the signal changes, LLR reverses its trend and keeps climbing. A posteriori, it is very clear where the signal change occurred: at t ≈ 100.

Change of variance

In this section we discuss the SPRT results when detecting a change in variance. For this purpose the original signal is modified between t = 501 and t = 600 with samples drawn, this time, from a normal distribution with µ = 0 and σ = 1.4.

R> vals[501:600] <- rnorm(100,mean=0,sd=1.4)

The plot below represents the signal for t=401,..,600

We examined this subset of the signal with SPRT by calculating LLR with the following null and alternate hypotheses:

H0 : µ0 = 0 and σ0 = 1
H3 : µ3 = 0 and σ3 = 1.5
α and β were again set to 0.05. Note that we are constructing an alternate hypothesis with a σ3 value higher than the actual σ of the modified signal. The LLR sequences are illustrated below, together with the lengths of each sequence and the distribution of the sequence lengths.

One can see that the LLR sequences are longer, and quite a few require a number of samples of O(30-50) to decide in favor of H3 or H0. The variance-shift detection appears, for this case, to be a tougher problem. But the comparison with the mean-shift case is not apples to apples, and the {µ0, σ0, µi, σi, α, β} parameter space needs to be properly explored before drawing conclusions.
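A compact loop sketches the variance test as well (illustrative only; mysprt() in the script is the full implementation; σ3 = 1.5 gives V = 2.25):

```r
set.seed(321)
y <- c(rnorm(100, mean = 0, sd = 1),      # H0 regime
       rnorm(100, mean = 0, sd = 1.4))    # shifted-variance regime

V <- 1.5^2 / 1^2                          # sigma3^2 / sigma0^2
logA <- log(0.05 / 0.95); logB <- log(0.95 / 0.05)

llr <- 0; dec <- character(0)
for (t in seq_along(y)) {
  # incremental LLR update for the variance-shift test (mu0 = 0, sigma0 = 1)
  llr <- llr - 0.5 * log(V) + (1 - 1 / V) * y[t]^2 / 2
  if (llr >= logB) { dec <- c(dec, "H3"); llr <- 0 }
  else if (llr <= logA) { dec <- c(dec, "H0"); llr <- 0 }
}
```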

Practical aspects

For real-world processes, signal shifts occur in multiple ways, and SPRT should be applied for both mean and variance and for shifts in both directions (positive and negative). SPRT can also be run for multiple shift values and multiple false and missed alarm probabilities. The efficient monitoring of a large number of signals, with multiple SPRT tests per signal, requires task parallelism capabilities and flexible management of the various types of alarms which can be generated.

Often, SPRT is applied not to the raw signals but to the residuals between signals and signal predictions. The 'normal-operation' signal behavior is learned via specific techniques during a generally off-line training stage, after which the system is put in surveillance mode where departure from ‘normality’ is caught, on-line, by SPRT at the earliest stage of process shifts.

Mathematical expressions for LLR can also be derived for distributions other than Gaussian but the analytical complexity can escalate quickly.

Some of these topics, and especially how to put this technique into production using, for example, ORE's Embedded R Execution mechanism, will be expanded on in future blogs.

Wednesday Jul 13, 2016

Predicting Energy Demand using IoT

The Internet of Things (IoT) presents new opportunities for applying advanced analytics. Sensors are everywhere collecting data – on airplanes, trains, and cars, in semiconductor production machinery and the Large Hadron Collider, and even in our homes. One such sensor is the home energy smart meter, which can report household energy consumption every 15 minutes. This data enables energy companies to not only model each customer’s energy consumption patterns, but also to forecast individual usage. Across all customers, energy companies can compute aggregate demand, which enables more efficient deployment of personnel, redirection or purchase of energy, etc., often a few days or weeks out.

Building one predictive model per customer, when an energy company can have millions of customers, poses some interesting challenges. Consider an energy company with 1 million customers. Over the course of a single year, these smart meters will collect over 35 billion readings. Each customer, however, generates only about 35,000 readings. On most hardware, R can easily build a model on 35,000 readings. Note that if each model requires even only 10 seconds to build, doing this serially will require roughly 116 days to build all models. Since the results are needed a few days or weeks out, a delay of months makes this project a non-starter. If powerful hardware, such as Oracle Exadata, can be leveraged to compute these models in parallel, say with a degree of parallelism of 128, all models can be computed in less than one day.
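The arithmetic behind these numbers is easy to check (a quick sketch: 15-minute readings, 10 seconds per model, degree of parallelism 128):

```r
readings.per.customer <- 4 * 24 * 365                 # 15-minute readings for one year
customers <- 1e6
total.readings <- customers * readings.per.customer   # ~35 billion
serial.days    <- customers * 10 / (3600 * 24)        # one model at a time, 10 s each
parallel.hours <- customers * 10 / 128 / 3600         # degree of parallelism 128
c(total.readings, serial.days, parallel.hours)
```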

While users can leverage parallelism enabled by various R packages, there are several factors that need to be taken into account. For example, what happens if certain models fail? Will the models be stored as 1 million separate flat files – one per customer? For flat files, how will backup, recovery, and security be handled? How can these models be used for forecasting customer usage and where will the forecasts be stored? How can these R models be incorporated into a production environment where applications and dashboards normally work with SQL?

Using the Embedded R Execution capability of Oracle R Enterprise, Data Scientists can focus on the task of building a model for a single customer. This model is stored in the R Script Repository in Oracle Database. ORE enables invoking this script from a single function, i.e., ore.groupApply, relying on the database to spawn multiple R engines, load one partition of data from the database to the function produced by the Data Scientist, and then store the resulting model immediately in the R Datastore, again in Oracle Database. This greatly simplifies the process of computing and storing models. Moreover, standard database backup and recovery mechanisms already in place can be used to avoid having to devise separate special practices. Forecasting using these models is handled in an analogous way.

To put these R scripts into production, users can invoke the same R scripts produced by the Data Scientist from SQL, both for the model building and forecasting. The forecasts can be immediately available as a database table that can be read by applications and dashboards, or used in other SQL queries. In addition, these SQL statements that invoke the R functions can be scheduled for periodic execution using the DBMS_SCHEDULER package of Oracle Database.

Leveraging the built-in functionality of ORE, Data Scientists, application developers, and administrators do not have to reinvent complex code and testing strategies, often done for each new project. Instead, they benefit from Oracle's integration of R with Oracle Database - to easily design and implement R-based solutions for use with applications and dashboards, and scale to the enterprise.

Wednesday Mar 30, 2016

Real-time model scoring for streaming data - a prototype based on Oracle Stream Explorer and Oracle R Enterprise

Whether applied to manufacturing, financial services, energy, transportation, retail, government, security or other domains, real-time analytics is an umbrella term which covers a broad spectrum of capabilities (data integration, analytics, business intelligence) built on streaming input from multiple channels. Examples of such channels are: sensor data, log data, market data, click streams, social media and monitoring imagery.

Key metrics separating real-time analytics from more traditional, batch, off-line analytics are latency and availability. At one end of the analytics spectrum are complex, long-running batch analyses with slow response time and low availability requirements. At the other end are real-time, lightweight analytic applications with fast response time (O[ms]) and high availability (99.99..%). Another distinction is between the capability for responding to individual events and/or ordered sequences of events versus the capability for handling only event collections in micro batches without preservation of their ordered characteristics. The complexity of the analysis performed on the real-time data is also a big differentiator: capabilities range from simple filtering and aggregations to complex predictive procedures. The level of integration between the model generation and the model scoring functionalities also needs to be considered for real-time applications. Machine learning algorithms specially designed for online model building exist and are offered by some streaming data platforms, but their number is small. Practical solutions can be built by combining an off-line model generation platform with a data streaming platform augmented with scoring capabilities.

In this blog we describe a new prototype for real-time analytics integrating two components: Oracle Stream Explorer (OSX) and Oracle R Enterprise (ORE). Examples of target applications for this type of integration are equipment monitoring through sensors, anomaly detection, and failure prediction for large systems made of a high number of components.

The basic architecture is illustrated below:

ORE is used for model building, in batch mode, at low frequency, and OSX handles the high frequency streams and pushes data toward a scoring application, performs predictions in real time and returns results to consumer applications connected to the output channels.

OSX is a middleware platform for developing streaming data applications. These applications monitor and process large amounts of streaming data in real time, from a multitude of sources like sensors, social media, financial feeds, etc. Readers unfamiliar with OSX should visit Getting Started with Event Processing for OSX.

In OSX, streaming data flows into, through, and out of an application. The applications can be created, configured and deployed with pre-built components provided with the platform or built from customized adapters and event beans. The application in this case is a custom scoring application for real time data.  A thorough description of the application building process can be found in the following guide: Developing Applications for Event Processing with Oracle Stream Explorer.

In our solution prototype for streaming analytics, the model exchange between ORE and OSX is realized by converting the R models to a PMML representation. After that, JPMML - the Java Evaluator API for PMML - is leveraged for reading the model and building a custom OSX scoring application.
The end-to-end workflow is represented below:

and the subsequent sections of this blog will summarize the essential aspects.

Model Generation

As previously stated, the use cases targeted by this OSX-ORE integration prototype consist of systems made of a large number of different components. Each component type is abstracted by a different model. We leverage ORE's Embedded R Execution capability for data and task parallelism to generate a large number of models concurrently. This is accomplished, for example, with ore.groupApply():

res <- ore.groupApply(
   function(dat, frml) { mdl <- ... },
   ...)

Model representation in PMML

The model transfer between the model generator and the scoring engine is enabled by conversion to a PMML representation. PMML is an XML-based mature standard for model exchange. A model in PMML format is represented by a collection of XML elements, or PMML components, which completely describe the modeling flow. For example, the Data Dictionary component contains the definitions for all fields used by the model (attribute types, value ranges, etc.); the Data Transformations component describes the mapping functions between the raw data and its desired form for the modeling algorithms; the Mining Schema component assigns the active and target variables and enumerates the policies for missing data, outliers, and so on. Besides the specifications for the data mining algorithms together with the accompanying pre- and post-processing steps, PMML can also describe more complex modeling concepts like model composition, model hierarchies, model verification and field scoping; to find out more about PMML's structure and functionality go to General Structure. PMML representations have been standardized for several classes of data mining algorithms. Details are available at the same location.


In R the conversion/translation to PMML formats is enabled through the pmml package. The following algorithms are supported:

ada (ada)
arules (arules)
coxph (survival)
glm (stats)
glmnet (glmnet)
hclust (stats)
kmeans (stats)
ksvm (kernlab)
lm (stats)
multinom (nnet)
naiveBayes (e1071)
nnet (nnet)
randomForest (randomForest)
rfsrc (randomForestSRC)
rpart (rpart)
svm (e1071)

The r2pmml package offers complementary support for

gbm (gbm)
and a much better (performance-wise) conversion to PMML for randomForest. Check the details at converting_randomforest.

The conversion to PMML is done via the pmml() generic function, which dispatches the appropriate method for the supplied model depending on its class.

mdl <- randomForest(...)
pmld <- pmml(mdl)

Exporting the PMML model

In the current prototype, the PMML model is exported to the streaming platform as a physical XML file. A better solution is to leverage R's serialization interface, which supports a rich set of connections through pipes, URLs, sockets, etc.
The pmml objects can also be saved in ORE datastores within the database, and specific policies can be implemented to control access and usage.

ore.grant(name=dsname, type="datastore", user=...)

OSX Applications and the Event Processing Network (EPN)

The OSX workflow, implemented as an OSX application, consists of three logical steps: the pmml model is imported into OSX, a scoring application is created and scoring is performed on the input streams.

In OSX, applications are modeled as data flow graphs named Event Processing Networks (EPN). Data flows into, through, and out of EPNs. When raw data flows into an OSX application it is first converted into events. Events flow through the different stages of the application, where they are processed according to the specifics of the application. At the end, events are converted back to data in a format suitable for consumption by downstream applications.

The EPN for our prototype is basic:

Streaming data flows from the Input Adapters through Input Channels, reaches the Scoring Processor where the prediction is performed, flows through the Output Channel to an Output Adapter, and exits the application in the desired form. In our demo application the data is streamed out of a CSV file into the Input Adapter. The top adapters (left & right) on the EPN diagram represent connections to the Stream Explorer User Interface (UI). Their purpose is to demonstrate options for controlling the scoring process (for example, changing the model while the application is still running) and visualizing the predictions.

The JPMML-Evaluator

The Scoring Processor was implemented by leveraging the open source JPMML library, the Java Evaluator API for PMML. The methods of this class allow, among other things, pre-processing the active & target fields according to the DataDictionary and MiningSchema elements, evaluating the model for several classes of algorithms, and post-processing the results according to the Targets element.

JPMML offers support for:

Association Rules
Cluster Models
General Regression
k-Nearest Neighbors
Naïve Bayes
Neural Networks
Tree Models
Support Vector Machines
Ensemble Models
This covers most of the models that can be converted to PMML from R using the pmml() method, except for time series, sequence rules & text models.

The Scoring Processor

The Scoring Processor (see EPN) is implemented as a JAVA class with methods that automate scoring based on the PMML model. The important steps of this automation are enumerated below:

  • The PMML schema is loaded from the XML document:

  •     pmml = pmmlUtil.loadModel(pmmlFileName);

  • An instance of the Model Evaluator is created. In the example below we assume that we don't know what type of model we are dealing with so the instantiation is delegated to an instance of a ModelEvaluatorFactory class.

  •     ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance();
        ModelEvaluator<?> evaluator = modelEvaluatorFactory.newModelManager(pmml);

  • This Model Evaluator instance is queried for the field definitions. For the active fields:

  •     List<FieldName>  activeModelFields = evaluator.getActiveFields();

  • The subsequent data preparation performs several tasks: value conversions between the Java type system and the PMML type system, validation of these values according to the specifications in the Data Field element, handling of invalid, missing values and outliers as per the Mining Field element.

  •     FieldValue activeValue = evaluator.prepare(activeField, inputValue)
        pmmlArguments.put(activeField, activeValue);

  • The prediction is executed next:

  •     Map<FieldName, ?> results = evaluator.evaluate(pmmlArguments);

  • Once this is done, the mapping between the scoring results & other fields and the output events is performed. This needs to differentiate between the cases where the target values are Java primitive values or something more complex.

  •     FieldName targetName = evaluator.getTargetField();
        Object targetValue = results.get(targetName);
        if (targetValue instanceof Computable){ ….

    More details about this approach can be found at JPMML-Evaluator: Preparing arguments for evaluation and Java (JPMML) Prediction using R PMML model.

    The key aspect is that the JPMML Evaluator API provides the functionality for implementing the Scoring Processor independently of the actual model being used. The active variables, mappings, assignments, and predictor invocations are figured out automatically from the PMML representation. This approach allows flexibility for the scoring application. Suppose that several PMML models have been generated off-line for the same system component/equipment part, etc. Then, for example, an n-variable logistic regression model could be replaced by an m-variable decision tree model via the UI control by just pointing a Scoring Processor variable to the new PMML object. Moreover, the substitution can be executed via signal events sent through the UI Application Control (upper left of the EPN) without stopping and restarting the scoring application. This is practical because the real-time data keeps flowing in!

    Tested models

    The R algorithms listed below were tested and identical results were obtained for predictions based on the OSX PMML/JPMML scoring application and predictions in R.

    lm (stats)
    glm (stats)
    rpart (rpart)
    naiveBayes (e1071)
    nnet (nnet)
    randomForest (randomForest)

    The prototype is new, and other algorithms are currently being tested. The details will follow in a subsequent post.


    The OSX-ORE PMML/JPMML-based prototype for real-time scoring was developed together with Mauricio Arango | A-Team Cloud Solutions Architects. The work was presented at BIWA 2016.

    Wednesday Mar 23, 2016

    R Consortium Announces New Projects

    The R Consortium works with and provides support to the R Foundation and other organizations developing, maintaining and distributing R software and provides a unifying framework for the R user community. The R Consortium Infrastructure Steering Committee (ISC) supports projects that help the R community, whether through software development, developing new teaching materials, documenting best practices, promoting R to new audiences, standardizing APIs, or doing research.

    In the first open call for proposals, Oracle submitted three proposals, each of which has been accepted by the ISC: “R Implementation, Optimization and Tooling Workshops” which received a grant, and two working groups “Future-proof native APIs for R” and “Code Coverage Tool for R.” These were officially announced by the R Consortium here.

    R Implementation, Optimization and Tooling Workshops

    Following the successful first edition of the R Implementation, Optimization and Tooling (RIOT) Workshop, co-located with the ECOOP 2015 conference, the second edition of the workshop will be co-located with useR! 2016 and held on July 3rd at Stanford University. Similarly to last year's event, RIOT 2016 is a one-day workshop dedicated to exploring future directions for development of R language implementations and tools. The goals of the workshop include, but are not limited to, sharing experiences of developing different R language implementations and tools and evaluating their status, exploring possibilities to increase involvement of the R user community in constructing different R implementations, identifying R language development and tooling opportunities, and discussing future directions for the R language. The workshop will consist of a number of short talks and discussions and will bring together developers of R language implementations and tools. See this link for more information.

    Code Coverage Tool for R

    Code coverage helps to ensure greater software quality by reporting how thoroughly test suites cover the various code paths. Having a tool that supports the breadth of the R language across multiple platforms, and that is used by R package developers and R core teams, helps to improve software quality for the R Community. While a few code coverage tools exist for R, this Oracle-proposed ISC project aims to provide an enhanced tool that addresses feature and platform limitations of existing tools via an ISC-established working group. It also aims to promote the use of code coverage more systematically within the R ecosystem.

    Future-proof native APIs for R

    This project aims to develop a future-proof native API for R. The current native API evolved gradually, adding new functionality incrementally, as opposed to reflecting an overall design with one consistent API, which makes it harder than necessary to understand and use. As the R ecosystem evolves, the native API is becoming a bottleneck, preventing crucial changes to the GNUR runtime, while presenting difficulties for alternative implementations of the R language. The ISC recognizes this as critical to the R ecosystem and will create a working group to facilitate cooperation on this issue. This project's goal is to assess current native API usage, gather community input, and work toward a modern, future-proof, easy to understand, consistent and verifiable API that will make life easier for both users and implementers of the R language.

    Oracle is pleased to be a founding member of the R Consortium and to contribute to these and other projects that support the R community and ecosystem.

    Friday Feb 05, 2016

    Using SVD for Dimensionality Reduction

    SVD, or Singular Value Decomposition, is one of several techniques that can be used to reduce the dimensionality, i.e., the number of columns, of a data set. Why would we want to reduce the number of dimensions? In predictive analytics, more columns normally means more time required to build models and score data. If some columns have no predictive value, this means wasted time, or worse, those columns contribute noise to the model and reduce model quality or predictive accuracy.

    Dimensionality reduction can be achieved by simply dropping columns, for example, those that may show up as collinear with others or identified as not being particularly predictive of the target as determined by an attribute importance ranking technique. But it can also be achieved by deriving new columns based on linear combinations of the original columns. In both cases, the resulting transformed data set can be provided to machine learning algorithms to yield faster model build times, faster scoring times, and more accurate models.

    While SVD can be used for dimensionality reduction, it is often used in digital signal processing for noise reduction, image compression, and other areas.

    SVD is an algorithm that factors an m x n matrix, M, of real or complex values into three component matrices, where the factorization has the form USV*. U is an m x p matrix. S is a p x p diagonal matrix. V is an n x p matrix, with V* being the transpose of V, a p x n matrix, or the conjugate transpose if M contains complex values. The value p is called the rank. The diagonal entries of S are referred to as the singular values of M. The columns of U are typically called the left-singular vectors of M, and the columns of V are called the right-singular vectors of M.

    Consider the following visual representation of these matrices:

    One of the features of SVD is that given the decomposition of M into U, S, and V, one can reconstruct the original matrix M, or an approximation of it. The singular values in the diagonal matrix S can be used to understand the amount of variance explained by each of the singular vectors. In R, this can be achieved using the computation:
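With open-source R, a typical form of this computation (using the iris measurements, which also appear later in this post) is:

```r
svd.mod <- svd(as.matrix(iris[, 1:4]))
# cumulative proportion of variance explained by the leading singular vectors
var.explained <- cumsum(svd.mod$d^2) / sum(svd.mod$d^2)
plot(var.explained, type = "b",
     xlab = "number of singular vectors",
     ylab = "cumulative variance explained")
```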


    When plotted, this provides a visual understanding of the variance captured by the model. The figure below indicates that the first singular vector accounts for 96.5% of the variance, the first two together account for over 99.5%, and so on.

    As such, we can use this information to limit the number of vectors to the amount of variance we wish to capture. Reducing the number of vectors can help eliminate noise in the original data set when that data set is reconstructed using the subcomponents of U, S, and V.
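As a sketch of this rank-k reconstruction in base R (the matrix M and the choice of k here are illustrative):

```r
M <- matrix(rnorm(20), nrow = 5)   # example 5 x 4 matrix
s <- svd(M)                        # s$u, s$d, s$v
k <- 2                             # number of singular vectors to keep

# Rank-k approximation of M from the leading components
M.approx <- s$u[, 1:k] %*% diag(s$d[1:k]) %*% t(s$v[, 1:k])

# Keeping all components reconstructs M exactly, up to floating-point rounding
max(abs(M - s$u %*% diag(s$d) %*% t(s$v)))
```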

    ORE’s parallel, distributed SVD

    With Oracle R Enterprise’s parallel distributed implementation of R’s svd function, only the S and V components are returned. More specifically, the singular values on the diagonal of S are returned as the vector d. If we store the result of invoking svd on matrix dat in svd.mod, U can be derived from these using M as follows:

    svd.mod <- svd(dat)
    U <- dat %*% svd.mod$v %*% diag(1./svd.mod$d)
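To see why this recovers U, note that M = USV* implies MV = US, and hence U = MVS⁻¹. A quick base-R check of the identity (illustrative, using a small random matrix):

```r
M <- matrix(rnorm(12), nrow = 4)
s <- svd(M)
U.derived <- M %*% s$v %*% diag(1 / s$d)
max(abs(U.derived - s$u))   # ~ 0, up to floating-point rounding
```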

    So, how do we achieve dimensionality reduction using SVD? We can use the first k columns of V and the first k singular values of S to derive a reduced U with fewer columns.

    U.reduced <- dat %*% svd.mod$v[,1:k,drop=FALSE] %*% diag(1/(svd.mod$d)[1:k])

    This reduced U can now be used as a proxy for matrix dat with fewer columns.

    The function dimReduce introduced below accepts a matrix x, the number of columns desired k, and a request for any supplemental columns to return with the transformed matrix.

    dimReduce <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL) {
      colIdxs <- which(colnames(x) %in% supplemental.cols)
      sol <- svd(x[,-colIdxs])
      sol.U <- as.matrix(x[,-colIdxs]) %*% (sol$v)[,1:k,drop=FALSE] %*%
               diag(1/(sol$d)[1:k])
      res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
      names(res) <- c(names(sol.U@data), names(x[,colIdxs]))
      res
    }

    We will now use this function to reduce the iris data set.

    To prepare the iris data set, we first add a unique identifier, create the table IRIS2 in the database, and then assign row names to enable row indexing. We could also make ID the primary key using ore.exec with an ALTER TABLE statement, then refresh the ore.frame proxy object using ore.sync to reflect the change in primary key.

    dat <- iris
    dat$ID <- seq_len(nrow(dat))
    ore.create(dat, table="IRIS2")
    row.names(IRIS2) <- IRIS2$ID
    # ore.exec("alter table IRIS2 add constraint IRIS2 primary key (\"ID\")")
    # ore.sync(table = "IRIS2", use.keys = TRUE)

    Using the function defined above, dimReduce, we produce IRIS2.reduced with supplemental columns of ID and Species. This allows us to easily generate a confusion matrix later. You will find that IRIS2.reduced has 4 columns.

    IRIS2.reduced <- dimReduce(IRIS2, 2, supplemental.cols=c("ID","Species"))
    dim(IRIS2.reduced) # 150 4

    Next, we will build an rpart model to predict Species using first the original iris data set, and then the reduced data set so we can compare the confusion matrices of each. Note that to use R's rpart for model building, the data set IRIS2.reduced is pulled to the client.

    m1 <- rpart(Species~.,iris)
    res1 <- predict(m1,iris,type="class")
    #res1 setosa versicolor virginica
    # setosa 50 0 0
    # versicolor 0 49 5
    # virginica 0 1 45

    dat2 <- ore.pull(IRIS2.reduced)
    m2 <- rpart(Species~.-ID,dat2)
    res2 <- predict(m2,dat2,type="class")
    # res2 setosa versicolor virginica
    # setosa 50 0 0
    # versicolor 0 47 0
    # virginica 0 3 50

    Notice that the resulting models are comparable, but the model that used IRIS2.reduced actually has better overall accuracy, making just 3 mistakes instead of 6. Of course, a more accurate assessment of error would use cross validation; this is left as an exercise for the reader.

    We can build a similar model using the in-database decision tree algorithm, via ore.odmDT, and get the same results on this particular data set.

    m2.1 <- ore.odmDT(Species~.-ID, IRIS2.reduced)
    res2.1 <- predict(m2.1,IRIS2.reduced,type="class",supplemental.cols = "Species")
    table(res2.1$PREDICTION, res2.1$Species)
    # res2 setosa versicolor virginica
    # setosa 50 0 0
    # versicolor 0 47 0
    # virginica 0 3 50

    A more interesting example is based on the digit-recognizer data available on the Kaggle website. In this example, we first use Support Vector Machine as the algorithm, with default parameters, on split train and test samples of the original training data. This allows us to get an objective assessment of model accuracy. Then, we preprocess the train and test sets using the in-database SVD algorithm, reducing the original 784 pixel predictors to 40. The reduced number of variables specified is subject to experimentation. The degree of parallelism for SVD was set to 4.

    The results highlight that reducing data dimensionality can improve overall model accuracy, and that overall execution time can be significantly faster. Specifically, using ore.odmSVM for model building saw a 43% time reduction and a 4.2% increase in accuracy by preprocessing the train and test data using SVD.

    However, it should be noted that not all algorithms are necessarily aided by dimensionality reduction with SVD. In a second test on the same data using ore.odmRandomForest with 25 trees and defaults for other settings, accuracy of 95.3% was achieved using the original train and test sets, while with the SVD-reduced train and test sets accuracy was 93.7%. Model building time was reduced by 80% and scoring time by 54%, but once the SVD execution time is factored in, the straight random forest approach does better overall by a factor of two.


    For this scenario, we modify the dimReduce function introduced above and add another function dimReduceApply. In dimReduce, we save the model in an ORE Datastore so that the same model can be used to transform the test data set for scoring. In dimReduceApply, that same model is loaded for use in constructing the reduced U matrix.

    dimReduce <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL, dsname="svd.model") {
      colIdxs <- which(colnames(x) %in% supplemental.cols)
      if (length(colIdxs) > 0) {
        sol <- svd(x[,-colIdxs])
        sol.U <- as.matrix(x[,-colIdxs]) %*% (sol$v)[,1:k,drop=FALSE] %*%
                 diag(1/(sol$d)[1:k])
        res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
        names(res) <- c(names(sol.U@data), names(x[,colIdxs]))
      } else {
        sol <- svd(x)
        sol.U <- as.matrix(x) %*% (sol$v)[,1:k,drop=FALSE] %*%
                 diag(1/(sol$d)[1:k])
        res <- sol.U@data
      }
      ore.save(sol, name=dsname, overwrite=TRUE)
      res
    }

    dimReduceApply <- function(x, k=floor(ncol(x)/2), supplemental.cols=NULL, dsname="svd.model") {
      colIdxs <- which(colnames(x) %in% supplemental.cols)
      ore.load(name=dsname)   # restores the saved SVD result as 'sol'
      if (length(colIdxs) > 0) {
        sol.U <- as.matrix(x[,-colIdxs]) %*% (sol$v)[,1:k,drop=FALSE] %*%
                 diag(1/(sol$d)[1:k])
        res <- cbind(sol.U@data, x[,colIdxs,drop=FALSE])
        names(res) <- c(names(sol.U@data), names(x[,colIdxs]))
      } else {
        sol.U <- as.matrix(x) %*% (sol$v)[,1:k,drop=FALSE] %*%
                 diag(1/(sol$d)[1:k])
        res <- sol.U@data
      }
      res
    }

    Here is the script used for the digit data:

    # load data from file
    train <- read.csv("D:/datasets/digit-recognizer-train.csv")
    dim(train) # 42000 785

    train$ID <- 1:nrow(train) # assign row id
    ore.create(train,table="DIGIT_TRAIN") # create as table in the database
    dim(DIGIT_TRAIN) # 42000 786

    # Split the original training data into train and
    # test sets to evaluate model accuracy
    dt <- DIGIT_TRAIN
    ind <- sample(1:nrow(dt),nrow(dt)*.6)
    group <- as.integer(1:nrow(dt) %in% ind)

    row.names(dt) <- dt$ID
    sample.train <- dt[group==TRUE,]
    sample.test <- dt[group==FALSE,]
    dim(sample.train) # 25200 786
    dim(sample.test) # 16800 786
    # Create train table in database
    ore.create(sample.train, table="DIGIT_SAMPLE_TRAIN")
    # Create test table in database
    ore.create(sample.test, table="DIGIT_SAMPLE_TEST")

    # Add persistent primary key for row indexing
    # Note: could be done using row.names(DIGIT_SAMPLE_TRAIN) <- DIGIT_SAMPLE_TRAIN$ID
    ore.exec("alter table DIGIT_SAMPLE_TRAIN add constraint
    DIGIT_SAMPLE_TRAIN primary key (\"ID\")")
    ore.exec("alter table DIGIT_SAMPLE_TEST add constraint
    DIGIT_SAMPLE_TEST primary key (\"ID\")")
    ore.sync(table = c("DIGIT_SAMPLE_TRAIN","DIGIT_SAMPLE_TEST"), use.keys = TRUE)

    # SVM model
    m1.svm <- ore.odmSVM(label~.-ID, DIGIT_SAMPLE_TRAIN, type="classification")
    pred.svm <- predict(m1.svm, DIGIT_SAMPLE_TEST, supplemental.cols="label")
    cm <- with(pred.svm, table(label,PREDICTION))

    # Confusion Matrix and Statistics
    # label 0 1 2 3 4 5 6 7 8 9
    # 0 1633 0 4 2 3 9 16 2 7 0
    # 1 0 1855 12 3 2 5 4 2 23 3
    # 2 9 11 1445 22 26 8 22 30 46 10
    # 3 8 9 57 1513 2 57 16 16 41 15
    # 4 5 9 10 0 1508 0 10 4 14 85
    # 5 24 12 14 52 28 1314 26 6 49 34
    # 6 10 2 7 1 8 26 1603 0 6 0
    # 7 10 8 27 4 21 8 1 1616 4 70
    # 8 12 45 14 40 7 47 13 10 1377 30
    # 9 12 10 6 19 41 15 2 54 15 1447
    # Overall Statistics
    # Accuracy : 0.9114
    # 95% CI : (0.907, 0.9156)
    # No Information Rate : 0.1167
    # P-Value [Acc > NIR] : < 2.2e-16

    sample.train.reduced <- dimReduce(DIGIT_SAMPLE_TRAIN, 40, supplemental.cols=c("ID","label"))
    sample.test.reduced <- dimReduceApply(DIGIT_SAMPLE_TEST, 40, supplemental.cols=c("ID","label"))
    ore.create(sample.train.reduced, table="DIGIT_SAMPLE_TRAIN_REDUCED")
    ore.create(sample.test.reduced, table="DIGIT_SAMPLE_TEST_REDUCED")

    m2.svm <- ore.odmSVM(label~.-ID,
    DIGIT_SAMPLE_TRAIN_REDUCED, type="classification")
    pred2.svm <- predict(m2.svm, DIGIT_SAMPLE_TEST_REDUCED, supplemental.cols="label")
    cm <- with(pred2.svm, table(label,PREDICTION))
    # Confusion Matrix and Statistics
    # label 0 1 2 3 4 5 6 7 8 9
    # 0 1652 0 3 3 2 7 4 1 3 1
    # 1 0 1887 8 2 2 1 1 3 3 2
    # 2 3 4 1526 11 20 3 7 21 27 7
    # 3 0 3 29 1595 3 38 4 16 34 12
    # 4 0 4 8 0 1555 2 11 5 9 51
    # 5 5 6 2 31 6 1464 13 6 10 16
    # 6 2 1 5 0 5 18 1627 0 5 0
    # 7 2 6 22 7 10 2 0 1666 8 46
    # 8 3 9 9 34 7 21 9 7 1483 13
    # 9 5 2 8 17 30 10 3 31 20 1495
    # Overall Statistics
    # Accuracy : 0.9494
    # 95% CI : (0.946, 0.9527)
    # No Information Rate : 0.1144
    # P-Value [Acc > NIR] : < 2.2e-16

    # CASE 2 with Random Forest
    m2.rf <- ore.randomForest(label~.-ID, DIGIT_SAMPLE_TRAIN,ntree=25)
    pred2.rf <- predict(m2.rf, DIGIT_SAMPLE_TEST, supplemental.cols=c("label"),type="response")
    cm <- with(pred2.rf, table(label,prediction))
    # Confusion Matrix and Statistics
    # prediction
    # label 0 1 2 3 4 5 6 7 8 9
    # 0 1655 0 1 1 2 0 7 0 9 1
    # 1 0 1876 12 8 2 1 1 2 6 1
    # 2 7 4 1552 14 10 2 5 22 10 3
    # 3 9 5 33 1604 1 21 4 16 27 14
    # 4 1 4 3 0 1577 1 9 3 3 44
    # 5 9 6 2 46 3 1455 18 1 9 10
    # 6 13 2 3 0 6 14 1621 0 3 1
    # 7 1 6 31 5 16 3 0 1675 3 29
    # 8 3 7 15 31 11 20 8 4 1476 20
    # 9 9 2 7 23 32 5 1 15 12 1515
    # Overall Statistics
    # Accuracy : 0.9527
    # 95% CI : (0.9494, 0.9559)
    # No Information Rate : 0.1138
    # P-Value [Acc > NIR] : < 2.2e-16

    m1.rf <- ore.randomForest(label~.-ID, DIGIT_SAMPLE_TRAIN_REDUCED,ntree=25)
    pred1.rf <- predict(m1.rf, DIGIT_SAMPLE_TEST_REDUCED, supplemental.cols=c("label"), type="response")
    cm <- with(pred1.rf, table(label,prediction))
    # Confusion Matrix and Statistics
    # prediction
    # label 0 1 2 3 4 5 6 7 8 9
    # 0 1630 0 4 5 2 8 16 3 5 3
    # 1 0 1874 17 4 0 5 2 2 4 1
    # 2 15 2 1528 17 10 5 10 21 16 5
    # 3 7 1 32 1601 4 25 10 8 34 12
    # 4 2 6 6 3 1543 2 17 4 4 58
    # 5 9 1 5 45 12 1443 11 3 15 15
    # 6 21 3 8 0 5 15 1604 0 7 0
    # 7 5 11 33 7 17 6 1 1649 2 38
    # 8 5 13 27 57 14 27 9 12 1404 27
    # 9 10 2 6 22 52 8 5 41 12 1463
    # Overall Statistics
    # Accuracy : 0.9368
    # 95% CI : (0.9331, 0.9405)
    # No Information Rate : 0.1139
    # P-Value [Acc > NIR] : < 2.2e-16

    Execution Times

    The following numbers reflect the execution times for select operations of the above script. Hardware was a Lenovo Thinkpad with Intel i5 processor and 16 GB RAM.

    Tuesday Jan 12, 2016

    Learn, Share, and Network! Join us at BIWA Summit, Oracle HQ, January 26-28

    Join us at BIWA Summit held at Oracle Headquarters to learn about the latest in Oracle technology, customer experiences, and best practices, while sharing your experiences with colleagues, and networking with technology experts. BIWA Summit 2016, the Oracle Big Data + Analytics User Group Conference is joining forces with the NoCOUG SIG’s YesSQL Summit, Spatial SIG’s Spatial Summit and DWGL for the biggest BIWA Summit ever.  Check out the BIWA Summit’16 agenda.

    The BIWA Summit 2016 sessions and hands-on-labs are excellent opportunities for attendees to learn about Advanced Analytics / Predictive Analytics, R, Spatial Geo-location, Graph/Social Network Analysis, Big Data Appliance and Hadoop, Cloud, Big Data Discovery, OBIEE & Business Intelligence, SQL Patterns, SQL Statistical Functions, and more!
    Check out these R technology-related sessions:

    • Hands-on lab with Oracle R Enterprise "Scaling R to New Heights with Oracle Database"

    • Oracle R Enterprise 1.5 - Hot new features!

    • Large Scale Machine Learning with Big Data SQL, Hadoop and Spark

    • Improving Predictive Model Development Time with R and Oracle Big Data Discovery

    • Fiserv Case Study: Using Oracle Advanced Analytics for Fraud Detection in Online Payments

    • Machine Learning on Streaming Data via Integration of Oracle R Enterprise and Oracle Stream Explorer

    • Fault Detection using Advanced Analytics at CERN's Large Hadron Collider: Too Hot or Too Cold

    • Stubhub and Oracle Advanced Analytics

    • ...and more

    Monday Jan 04, 2016

    ORE Random Forest

    Random Forest is a popular ensemble learning technique for classification and regression, developed by Leo Breiman and Adele Cutler. By combining the ideas of “bagging” and random selection of variables, the algorithm produces a collection of decision trees with controlled variance, while avoiding overfitting – a common problem for decision trees. By constructing many trees, classification predictions are made by selecting the mode of classes predicted, while regression predictions are computed using the mean from the individual tree predictions.
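As a toy illustration of how the ensemble combines the individual tree outputs (this is not ORE code; the per-tree predictions are made up):

```r
# Hypothetical predictions from five trees for a single observation
tree.class <- c("setosa", "versicolor", "setosa", "setosa", "virginica")
tree.reg   <- c(5.1, 4.9, 5.3, 5.0, 5.2)

# Classification: majority vote, i.e., the mode of the predicted classes
votes <- table(tree.class)
names(votes)[which.max(votes)]   # "setosa"

# Regression: mean of the individual tree predictions
mean(tree.reg)                   # 5.1
```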

    Although the Random Forest algorithm provides high accuracy, performance and scalability can be issues for larger data sets. Oracle R Enterprise 1.5 introduces Random Forest for classification with three enhancements:

      •  ore.randomForest uses the ore.frame proxy for database tables so that data remain in the database server
      •  ore.randomForest executes in parallel for model building and scoring while using Oracle R Distribution or R’s randomForest package 4.6-10
      •  randomForest in Oracle R Distribution significantly reduces memory requirements of R’s algorithm, providing only the functionality required for use by ore.randomForest


    Consider the model build performance of randomForest for 500 trees (the default) and three data set sizes (10K, 100K, and 1M rows). The formula is

    DAYOFWEEK ~ DEPDELAY + DISTANCE + UNIQUECARRIER + DAYOFMONTH + MONTH

    using samples of the popular ONTIME domestic flight dataset.

    With ORE’s parallel, distributed implementation, ore.randomForest is an order of magnitude faster than the commonly used randomForest package. While the first plot uses the original execution times, the second uses a log scale to facilitate interpreting scalability.

    Memory vs. Speed
    ore.randomForest is designed for speed, relying on ORE embedded R execution for parallelism to achieve the order-of-magnitude speedup. However, the data set is loaded into memory for each parallel R engine, so high degrees of parallelism (DOP) will result in correspondingly higher memory use. Since Oracle R Distribution’s randomForest improves memory usage over R's randomForest (approximately 7X less), larger data sets can be accommodated. Users can specify the DOP using the ore.parallel global option.


    The ore.randomForest API:

    ore.randomForest(formula, data, ntree=500, mtry = NULL,
                    replace = TRUE, classwt = NULL, cutoff = NULL,
                    sampsize = if(replace) nrow(data) else ceiling(0.632*nrow(data)),
                    nodesize = 1L, maxnodes = NULL, confusion.matrix = FALSE,
                    na.action = na.fail, ...)

    To highlight two of the arguments: confusion.matrix is a logical value indicating whether to calculate the confusion matrix. Note that this confusion matrix is not based on OOB (out-of-bag) samples; it is the result of applying the built random forest model to the entire training data.

    Argument groups is the number of tree groups that the total number of trees are divided into during model build. The default is equal to the value of the option 'ore.parallel'. If system memory is limited, it is recommended to set this argument to a value large enough so that the number of trees in each group is small to avoid exceeding memory availability.

    Scoring with ore.randomForest follows other ORE scoring functions:

    predict(object, newdata,
            type = c("response", "prob", "vote", "all"),
            norm.votes = TRUE,
            supplemental.cols = NULL,
            cache.model = TRUE, ...)

    The arguments include:

      •  type: scoring output content – 'response', 'prob', 'vote', or 'all', corresponding to predicted values, a matrix of class probabilities, a matrix of vote counts, or the vote matrix together with predicted values, respectively.
      •  norm.votes: a logical value indicating whether the vote counts in the output vote matrix should be normalized. The argument is ignored if 'type' is 'response' or 'prob'.
      •  supplemental.cols: additional columns from the 'newdata' data set to include in the prediction result. This can be particularly useful for including a key column that can be related back to the original data set.
      •  cache.model: a logical value indicating whether the entire random forest model is cached in memory during prediction. While the default is TRUE, setting it to FALSE may be beneficial if memory is an issue.


    R> options(ore.parallel=8)
    R> df <- dd[complete.cases(dd),]   # dd is the ONTIME sample ore.frame
    R> mod <- ore.randomForest(DAYOFWEEK~DEPDELAY+DISTANCE+UNIQUECARRIER+DAYOFMONTH+MONTH,
    +                 df, ntree=100, groups=20)
    R> ans <- predict(mod, df, type="all", supplemental.cols="DAYOFWEEK")

    R> head(ans)
         1    2    3    4    5    6    7 prediction DAYOFWEEK
    1 0.09 0.01 0.06 0.04 0.70 0.05 0.05 5          5
    2 0.06 0.01 0.02 0.03 0.01 0.38 0.49 7          6
    3 0.11 0.03 0.16 0.02 0.06 0.57 0.05 6          6
    4 0.09 0.04 0.15 0.03 0.02 0.62 0.05 6          6
    5 0.04 0.04 0.04 0.01 0.06 0.72 0.09 6          6
    6 0.35 0.11 0.14 0.27 0.05 0.08 0.00 1          1

    Thursday Dec 24, 2015

    Oracle R Enterprise 1.5 Released

    We’re pleased to announce that Oracle R Enterprise (ORE) 1.5 is now available for download on all supported platforms with Oracle R Distribution 3.2.0 / R-3.2.0. ORE 1.5 introduces parallel distributed implementations of Random Forest, Singular Value Decomposition (SVD), and Principal Component Analysis (PCA) that operate on ore.frame objects. Performance enhancements are included for ore.summary summary statistics.

    In addition, ORE 1.5 enhances embedded R execution with CLOB/BLOB data column support to enable larger text and non-character data to be transferred between Oracle Database and R. CLOB/BLOB support is also enabled for functions ore.push and ore.pull. The ORE R Script Repository now supports finer grained R script access control across schemas. Similarly, the ORE Datastore enables finer grained R object access control across schemas. For ore.groupApply in embedded R execution, ORE 1.5 now supports multi-column partitioning of data using the INDEX argument. Multiple bug fixes are also included in this release.

    Here are the highlights for the new and upgraded features in ORE 1.5:

    Upgraded R version compatibility

    ORE 1.5 is certified with R-3.2.0 - both open source R and Oracle R Distribution. See the server support matrix for the complete list of supported R versions. R-3.2.0 brings improved performance and big in-memory data objects, and compatibility with more than 7000 community-contributed R packages.
    For supporting packages, ORE 1.5 includes one new package, randomForest, with upgrades to other packages:

    arules 1.1-9
    Cairo 1.5-8
    DBI 0.3-1
    png 0.1-7
    ROracle 1.2-1
    statmod 1.4.21
    randomForest 4.6-10

    Parallel and distributed algorithms

    While the Random Forest algorithm provides high accuracy, performance and scalability can be issues for large data sets. ORE 1.5 introduces Random Forest in Oracle R Distribution with two enhancements: first, a revision to reduce memory requirements of the open source randomForest algorithm; and second, the function ore.randomForest that executes in parallel for model building and scoring while using the underlying randomForest function either from Oracle R Distribution or R’s randomForest package 4.6-10. ore.randomForest uses ore.frame objects allowing data to remain in the database server.

    The functions svd and prcomp have been overloaded to execute in parallel and accept ore.frame objects. Users now get in-database execution of this functionality to improve scalability and performance – no data movement.

    Performance enhancements

    ore.summary performance enhancements support executions that are up to 30x faster than previous releases.

    Capability enhancements

    ore.grant and ore.revoke functions enable users to grant other users read access to their R scripts in the R script repository and individual datastores.

    The database data types CLOB and BLOB are now supported for embedded R execution invocations input and output, as well as for the functions ore.pull and ore.push.

    For embedded R execution ore.groupApply, users can now specify multiple columns for automatically partitioning data via the INDEX argument.

    For a complete list of new features, see the Oracle R Enterprise User's Guide. To learn more about Oracle R Enterprise, visit Oracle R Enterprise on Oracle's Technology Network, or review the variety of use cases on the Oracle R Technologies blog.

    Thursday Nov 19, 2015

    Using RStudio Shiny with ORE for interactive analysis and visualization - Part I

    Shiny, by RStudio, is a popular web application framework for R. It can be used, for example, for building flexible interactive analyses and visualization solutions without requiring web development skills or knowledge of JavaScript, HTML, CSS, etc. An overview of its capabilities, with numerous examples, is available on RStudio's Shiny web site.

    In this blog we illustrate a simple Shiny application for processing and visualizing data stored in Oracle Database for the special case where this data is wide and shallow. In this use case, analysts wish to interactively compute and visualize correlations between many variables (up to 40k) for genomics-related data where the number of observations is small, typically around 1000 cases. A similar use case was discussed in a recent blog Consolidating wide and shallow data with ORE Datastore, which addressed the performance aspects of reading and saving wide data from/to an Oracle R Enterprise (ORE) datastore. ORE allows users to store any type of R objects, including wide data.frames, directly in an Oracle Database using ORE datastores.

    Our shiny application, invoked at the top level via the shinyApp() command below, has two components: a user-interface (ui) definition myui and a server function myserver.

    R> library("shiny")
    R> shinyApp(ui = myui, server = myserver)

    The functionality is very basic. The user specifies the dataset to be analyzed, the sets of variables to correlate against each other, the correlation method (Pearson, Spearman, etc.), and the treatment of missing data, as handled by the 'method' and 'use' arguments of R's cor() function, respectively. The datasets are all wide (ncols > 1000) and have already been saved in an ORE datastore.
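As a quick reminder of the two cor() arguments the UI exposes (toy data; the numeric results depend on the random draw):

```r
set.seed(42)
x <- rnorm(100)
y <- 2 * x + rnorm(100)
y[c(3, 7)] <- NA                 # introduce missing values

cor(x, y, method = "pearson",  use = "complete.obs")   # drop rows with NAs
cor(x, y, method = "spearman", use = "complete.obs")   # rank-based correlation
```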

    Partial code for the myui user interface is below. The sidebarPanel section handles the input, and the correlation values are displayed graphically by plotOutput() within the mainPanel section. The output ID 'corrPlot' corresponds to the plot object rendered by the server component.

    R> myui <- fluidPage(...,
         sidebarPanel(
           selectInput(...,
                       label    = "Data Sets",
                       choices  = c(...,...,...),
                       selected = (...)),
           selectInput(...,
                       label    = "Correlation Type",
                       choices  = list("Pearson"="pearson", ...),
                       selected = "pearson"),
           selectInput(...,
                       label    = "Use",
                       choices  = c("everything","all.obs","complete.obs", ...),
                       selected = "everything"),
           textInput("yvars", label=..., value=...)),
         mainPanel( plotOutput("corrPlot") )
       )

    The server component consists of two functions:

  • The server function myserver(), passed as argument during the top level invocation of shinyApp(). myserver returns the output$corrPlot object (see the ui component) generated by shiny's rendering function renderPlot(). The plot object p is generated within renderPlot() by calling ore.doEval() for the embedded R execution of gener_corr_plot(). The ui input selections are passed to gener_corr_plot() through ore.doEval().

  • R> myserver <- function(input, output) {
         output$corrPlot <- renderPlot({
           p <- ore.pull(ore.doEval(gener_corr_plot,
                                    TblName=..., corrtyp=input$..., ...))
           p
         })
       }
  • A core function gener_corr_plot(), which combines loading data from the ORE datastore, invoking the R cor() function with all arguments specified in the ui and generating the plot for the resulting correlation values.

  • R> gener_corr_plot <- function(TblName, corrtyp="pearson", use="everything",
                                   xvars=..., yvars=..., ds.name=...) {
           ore.load(name=ds.name)   # load the saved data from the ORE datastore
           res <- cor(get(TblName)[,filtering_based_on_xvars],
                      ..., method=corrtyp, use=use)
           p <- qplot(y=c(res), ...)
           p
       }

    The result of invoking shinyApp() is illustrated in the figure below. The data picked for this display is DOROTHEA, a drug discovery dataset from the UCI Machine Learning Repository, and the plot shows the Pearson correlation between the first variable and the first 2000 variables of the dataset. The variables are labeled here generically, by their index number.

    For the type of data considered by the customer (wide tables with 10k-40k columns and ~ 1k rows) a complete iteration (data loading, correlation calculations between one variable and all others & plot rendering) takes from a few seconds up to a few dozen seconds, depending on the correlation method and the handling of missing values. This is considerably faster than the many minutes reported by customers running their code in memory with data loaded from CSV files.

    Several questions may come to mind when thinking about such Shiny-based applications. For example:

  • When and how to decouple the analysis from data reading?

  • How to build chains of reactive components for more complex applications?

  • What is an efficient rendering mechanism for multiple plotting methods and increasing data sizes?

  • What other options exist for handling data with more than 1000 columns (nested tables, for example)?

    We will address these in future posts. In this Part I we illustrated that it is easy to construct a Shiny-based interactive application for wide data by leveraging ORE's datastore capability and support for embedded R execution. Besides improved performance, this solution offers the security, auditing, backup and recovery capabilities of Oracle Database.

    Oracle R Distribution 3.2.0 Benchmarks

    We recently updated the Oracle R Distribution (ORD) benchmarks on ORD version 3.2.0. ORD is Oracle's free distribution of the open source R environment that adds support for dynamically loading the Intel Math Kernel Library (MKL) installed on your system. MKL provides faster performance by taking advantage of hardware-specific math library implementations.

    The benchmark results demonstrate the performance of Oracle R Distribution 3.2.0 with and without dynamically loaded MKL. We executed the community-based R-Benchmark-25 script, which consists of a set of tests that benefit from faster matrix computations. The tests were run on a 24 core Linux system with 3.07 GHz per CPU and 47 GB RAM. We previously executed the same scripts against ORD 2.15.1 and ORD 3.0.1 on similar hardware.

      Oracle R Distribution 3.2.0 Benchmarks (Time in seconds)

    Speedup = (Slower time / Faster Time) - 1

    The first graph focuses on shorter running tests.  We see significant performance improvements - SVD with ORD + MKL executes 20 times faster using 4 threads, and 30 times faster using 8 threads. For Cholesky Factorization, ORD + MKL is 15 and 27 times faster for 4 and 8 threads, respectively.

    In the second graph, we focus on the longer running tests. Principal component analysis is 30 and almost 38 times faster for ORD + MKL with 4 and 8 threads, respectively; matrix multiplication is 80 and 139 times faster; and linear discriminant analysis is almost 4 times faster.

    By using Oracle R Distribution with MKL, you will see a notable performance boost for many R applications. These improvements happened with the exact same R code, without requiring any linking steps or updating any packages. Oracle R Distribution for R-3.2.0 is available for download on Oracle's Open Source Software portal. Oracle offers support for users of Oracle R Distribution on Windows, Linux, AIX and Solaris 64 bit platforms.

    Monday Nov 16, 2015

    BIWA Summit 2016 - Oracle Big Data + Analytics User Group Conference

    BIWA Summit 2016, the Oracle Big Data + Analytics User Group Conference is joining forces with the NoCOUG SIG’s YesSQL Summit, Spatial SIG’s Spatial Summit and DWGL for the biggest BIWA Summit ever.  Check out the BIWA Summit’16 agenda so far.

    The BIWA Summit’16 sessions and hands-on-labs are excellent opportunities for attendees to learn about Advanced Analytics/Predictive Analytics, R, Spatial Geo-location, Graph/Social Network Analysis, Big Data Appliance and Hadoop, Cloud, Big Data Discovery, OBIEE & Business Intelligence, SQL Patterns, SQL Statistical Functions, and more!

    See you at BIWA’16!

    Friday Oct 23, 2015

    Oracle R Enterprise Performance on Intel® DC P3700 Series SSDs

    Solid-state drives (SSDs) are becoming increasingly popular in enterprise storage systems, providing large caches, permanent storage and low latency. A recent study aimed to characterize the performance of Oracle R Enterprise workloads on the Intel® P3700 SSD versus hard disk drives (HDDs), with IO-WAIT as the key metric of interest. The study showed that Intel® DC P3700 Series SSDs reduced I/O latency for Oracle R Enterprise workloads, most notably when saving objects to Oracle R Enterprise datastore and materializing scoring results.

    The test environment was a 2-node Red Hat Linux 6.5 cluster, each node containing a 2 TB Intel® DC P3700 Series SSD with an attached 2 TB HDD. As the primary objective was to identify applications that could benefit from SSDs as-is, test results show the performance gain without system modification or tuning. The tests for the study consisted of a set of single analytic workloads composed of I/O-, computational-, and memory-intensive tests. The tests were run both serially and in parallel with up to 100 jobs, a good representation of typical workloads for Oracle R Enterprise customers.

    The figures below show execution time results for datastore save and load, and materializing model scoring results to a database table. The datastore performance is notably improved for the larger test sets (100 million and 1 billion rows). 
    For these workloads, execution time was reduced by an average of 46% with a maximum of 67% compared to the HDD attached to the same cluster. 

    Figure 1: Saving and loading objects to and from the ORE datastore, HDD vs. Intel® P3700 Series SSD

    Figure 2: Model scoring and materializing results, HDD vs. Intel® P3700 Series SSD

    The entire set of test results shows that Intel® DC P3700 Series SSDs provide an average reduction of 30%, with a maximum of 67%, in execution time for I/O-heavy Oracle R Enterprise workloads. These results could potentially be improved by working with hardware engineers to tune the host kernel and other settings to optimize SSD performance. Intel® DC P3700 Series SSDs can improve storage I/O by reducing latency and increasing throughput, and it is recommended to explore system tuning options with your engineering team to achieve the best results.

    Thursday Sep 10, 2015

    Consolidating wide and shallow data with ORE Datastore

    Clinical trial data are often characterized by a relatively small set of participants (100s or 1000s) while the data collected and analyzed on each may be significantly larger (1000s or 10,000s). Genomic data alone can easily reach the higher end of this range. In talking with industry leaders, one of the problems pharmaceutical companies and research hospitals encounter is effectively managing such data. Storing data in flat files on myriad servers, perhaps even “closeted” when no longer actively needed, poses problems for data accessibility, backup, recovery, and security. While Oracle Database provides support for wide data using nested tables in a number of contexts, to take advantage of R native functions that handle wide data using data.frames, Oracle R Enterprise allows you to store wide data.frames directly in Oracle Database using Oracle R Enterprise datastores.

    With Oracle R Enterprise (ORE), a component of the Oracle Advanced Analytics option, the ORE datastore supports storing arbitrary R objects, including data.frames, in Oracle Database. In particular, users can load wide data from a file into R and store the resulting data.frame directly in the ORE datastore. From there, users can repeatedly load the data at much faster speeds than they can from flat files.

    The following benchmark results illustrate the performance of saving and loading data.frames of various dimensions. These tests were performed on an Oracle Exadata X5-2 half rack with ORE 1.4.1, ROracle 1.2-1, and R 3.2.0. Logging was turned off on the datastore table (see the performance tip below). The data.frame consists of numeric data.

    Comparing Alternatives

    When it comes to accessing data and saving data for use with R, there are several options, including: CSV file, .Rdata file, and the ORE datastore. Each comes with its own advantages.


    CSV

    “Comma separated value” or CSV files are generally portable, provide a common representation for exporting/importing data, and can be readily loaded into a range of applications. However, flat files need to be managed, and often have inadequate security, auditing, backup, and recovery. As we’ll see, CSV files have significantly slower read and write times compared to .Rdata and the ORE datastore.


    .Rdata

    R’s native .Rdata flat-file representation is generally efficient for reading/writing R objects since the objects are stored in serialized form, i.e., not converted to a textual representation as CSV data are. However, .Rdata flat files also need to be managed and often have inadequate security, auditing, backup, and recovery. While faster than CSV for reads and writes, .Rdata is slower than the ORE datastore. Being an R-specific format, access is limited to the R environment, which may or may not be a concern.

    ORE Datastore

    ORE’s datastore capability allows users to organize and manage all data in a single location – the Oracle Database. This centralized repository provides Oracle Database quality security, auditing, backup, and recovery. The ORE datastore, as you’ll see below, provides read and write performance that is significantly better than CSV and .Rdata. Of course, as with .Rdata being accessed through R, accessing the datastore is through Oracle Database.
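    The pattern behind the benchmark numbers below can be sketched in a few lines of R. This is an illustrative sketch, not the actual benchmark harness: the data.frame dimensions match the example later in this post, the file names are made up, and the ore.save()/ore.load() calls assume an active Oracle R Enterprise session established via ore.connect().

    ```r
    # Illustrative timing sketch: CSV vs. .Rdata vs. ORE datastore
    # for a wide data.frame (300 rows x 2000 numeric columns).
    dat <- data.frame(matrix(rnorm(300 * 2000), nrow = 300))

    system.time(write.csv(dat, "df.csv", row.names = FALSE))   # CSV write
    system.time(dat.csv <- read.csv("df.csv"))                 # CSV read

    system.time(save(dat, file = "df.Rdata"))                  # .Rdata write
    system.time(load("df.Rdata"))                              # .Rdata read

    # Requires an ORE session established via ore.connect()
    system.time(ore.save(dat, name = "MyDatastore"))           # datastore write
    system.time(ore.load("MyDatastore"))                       # datastore read
    ```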

    Let’s look at a few benchmark comparisons.

    First, consider the execution time for loading data using each of these approaches. For 2000 columns, we see that ore.load() is 124X faster than read.csv(), and over 3 times faster than R’s load() function for 5000 rows. At 20,000 rows, ore.load() is 198X faster than read.csv() and almost 4 times faster than load().

    Considering the time to save data, ore.save() is over 11X faster than write.csv() and over 8X faster than save() at 2000 rows, with that benefit continuing through 20000 rows.

    Looking at this across even wider data.frames, e.g., adding results for 4000 and 16000 columns, we see a similar performance benefit for the ORE datastore over save/load and write.csv/read.csv.

    If you are looking to consolidate data while gaining performance benefits along with security, backup, and recovery, the Oracle R Enterprise datastore may be a preferred choice.

    Example using ORE Datastore

    The ORE datastore functions ore.save() and ore.load() are similar to the corresponding R save() and load() functions.

    In the following example, we read a CSV data file, save it in the ORE datastore using ore.save(), and associate it with the name “MyDatastore”. Although not shown, multiple objects can be listed in the initial arguments. Note that any R objects can be included here, not just data.frames.

    From there, we list the contents of the datastore and see that “MyDatastore” is listed with the number of objects stored and the overall size. Next we can ask for a summary of the contents of “MyDatastore”, which includes the data.frame ‘dat’.

    Next we remove ‘dat’ and load the contents of the datastore, reconstituting ‘dat’ as a usable data.frame object. Lastly, we delete the datastore and see that the ORE datastore is empty.

    > dat <- read.csv("df.dat")
    > dim(dat)
    [1] 300 2000
    > ore.save(dat, name="MyDatastore")
    > ore.datastore()
    datastore.name object.count size creation.date description
    1 MyDatastore 1 4841036 2015-09-01 12:07:38
    > ore.datastoreSummary("MyDatastore")
    object.name class size length row.count col.count
    1 dat data.frame 4841036 2000 300 2000
    > rm(dat)
    > ore.load("MyDatastore")
    [1] "dat"
    > ore.delete("MyDatastore")
    [1] "MyDatastore"
    > ore.datastore()
    [1] datastore.name object.count size creation.date description
    <0 rows> (or 0-length row.names)

    Performance Tip

    The performance of saving R objects to the datastore can be increased by temporarily turning off logging on the table that serves as the datastore in the user’s schema: RQ$DATASTOREINVENTORY. This can be accomplished using the following SQL, which can also be invoked from R:


    ORE> ore.exec("alter table RQ$DATASTOREINVENTORY NOLOGGING")

    While turning off logging speeds up inserts and index creation, it does so by skipping the redo log, and as such has implications for database recovery. It can be used in combination with explicit backups before and after loading data.
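    As a sketch, the tip can be wrapped around a bulk save so that logging is restored afterwards (this assumes an active ORE session; pair the NOLOGGING window with explicit backups as noted above):

    ```r
    # Temporarily disable redo logging on the datastore table,
    # perform the bulk save, then re-enable logging.
    ore.exec("alter table RQ$DATASTOREINVENTORY NOLOGGING")
    ore.save(dat, name = "MyDatastore")
    ore.exec("alter table RQ$DATASTOREINVENTORY LOGGING")
    ```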

    Thursday Aug 06, 2015

    Oracle R Advanced Analytics for Hadoop on the Fast Lane: Spark-based Logistic Regression and MLP Neural Networks

    This is the first in a series of blogs exploring the capabilities of the newly released Oracle R Advanced Analytics for Hadoop 2.5.0, part of Oracle Big Data Connectors, which includes two new algorithm implementations that can take advantage of an Apache Spark cluster for significant performance gains in model build and scoring time. These algorithms are a redesigned version of the Multi-Layer Perceptron Neural Networks (orch.neural) and a brand new implementation of a Logistic Regression model (orch.glm2).

    Through large scale benchmarks we are going to see the improvements in performance that the new custom algorithms bring to enterprise Data Science when running on top of a Hadoop Cluster with an available Apache Spark infrastructure.

    In this first part, we are going to compare only model build performance and feasibility of the new algorithms against the same algorithms running on Map-Reduce, and we are not going to be concerned with model quality or precision. Model scoring, quality and precision are going to be part of a future Blog.

    The Documentation on the new Components can be found on the product itself (with help and sample code), and also on the ORAAH 2.5.0 Documentation Tab on OTN.

    Hardware and Software used for testing

    As a test bed, we are using an Oracle Big Data Appliance X3-2 cluster with 6 nodes. Each node consists of two Intel® Xeon® 6-core X5675 processors (3.07 GHz), for a total of 12 cores (24 threads), and 96 GB of RAM per node.

    The BDA nodes run Oracle Enterprise Linux release 6.5, Cloudera Hadoop Distribution 5.3.0 (that includes Apache Spark release 1.2.0). 

    Each node also is running Oracle R Distribution release 3.1.1 and Oracle R Advanced Analytics for Hadoop release 2.5.0.

    Dataset used for Testing

    For the test we are going to use a classic dataset that consists of arrival and departure information for all major airports in the USA.  The data is available online in different formats.  The most commonly used one contains 123 million records and has been used for many benchmarks, originally cited by the American Statistical Association for its Data Expo 2009.  We have augmented the data available in that file by downloading additional months of data from the official Bureau of Transportation Statistics website.  Our starting point is this new dataset, which contains 159 million records and has information up to September 2014.

    For smaller tests, we created a simple subset of this dataset of 1, 10 and 100 million records.  We also created a 1 billion-record dataset by appending the 159 million-record data over and over until we reached 1 billion records.

    Connecting to a Spark Cluster

    In release 2.5.0 we are introducing a new set of R commands that will allow the Data Scientist to request the creation of a Spark Context, in either YARN or Standalone modes.

    For this release, the Spark Context is exclusively used for accelerating the creation of Levels and Factor variables, the Model Matrix, the final solution to the Logistic and Neural Networks models themselves, and Scoring (in the case of GLM).

    The new commands are highlighted below:
    spark.connect(master, name = NULL, memory = NULL,  dfs.namenode = NULL)

    spark.connect() requires loading the ORCH library first to read the configuration of the Hadoop Cluster.

    The “master” argument can be specified as either “yarn-client”, to use YARN for resource allocation, or a direct reference to a Spark Master service and port, in which case Spark is used in Standalone Mode. 
    The “name” argument is optional, and it helps centralize logging of the Session on the Spark Master.  By default, the application name shown on the Spark Master is “ORCH”.
    The “memory” argument indicates the amount of memory per Spark Worker to dedicate to this Spark Context.

    Finally, “dfs.namenode” points to the Apache HDFS Namenode Server, in order to exchange information with HDFS.

    In summary, to establish a Spark connection, one could do:
    > spark.connect("yarn-client", memory="2g", dfs.namenode="my.namenode.server.com")
    Conversely, to disconnect the Session after the work is done, you can use spark.disconnect() without options.

    The command spark.connected() checks the status of the current Session and contains the information of the connection to the Server. It is automatically called by the new algorithms to check for a valid connection.

    ORAAH 2.5.0 introduces support for loading data to Spark cache from an HDFS file via the function hdfs.toRDD(). ORAAH dfs.id objects were also extended to support both data residing in HDFS and in Spark memory, and allow the user to cache the HDFS data to an RDD object for use with the new algorithms.
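    A minimal sketch of that flow is below, with illustrative server and file names, assuming the ORCH library is installed on the cluster:

    ```r
    library(ORCH)                                   # reads the Hadoop cluster configuration
    spark.connect("yarn-client", memory = "2g",
                  dfs.namenode = "my.namenode.server.com")

    ont <- hdfs.attach("/user/oracle/ontime_1bi")   # dfs.id backed by HDFS
    ont.rdd <- hdfs.toRDD(ont)                      # cache the data as a Spark RDD
    # ont.rdd is still a dfs.id object, so it can be passed to
    # orch.glm2() or orch.neural() in place of the HDFS-backed one.
    spark.disconnect()
    ```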

    For all the examples used in this Blog, we used the following command in the R Session:

    Oracle Distribution of R version 3.1.1 (--) -- "Sock it to Me"
    Copyright (C) The R Foundation for Statistical Computing
    Platform: x86_64-unknown-linux-gnu (64-bit)

    R is free software and comes with ABSOLUTELY NO WARRANTY.
    You are welcome to redistribute it under certain conditions.
    Type 'license()' or 'licence()' for distribution details.

    R is a collaborative project with many contributors.
    Type 'contributors()' for more information and
    'citation()' on how to cite R or R packages in publications.

    Type 'demo()' for some demos, 'help()' for on-line help, or
    'help.start()' for an HTML browser interface to help.
    Type 'q()' to quit R.

    You are using Oracle's distribution of R. Please contact
    Oracle Support for any problems you encounter with this
    distribution.

    [Workspace loaded from ~/.RData]

    > library(ORCH)
    Loading required package: OREbase

    Attaching package: ‘OREbase’

    The following objects are masked from ‘package:base’:

    cbind, data.frame, eval, interaction, order, paste, pmax, pmin, rbind, table

    Loading required package: OREstats
    Loading required package: MASS
    Loading required package: ORCHcore
    Loading required package: rJava
    Oracle R Connector for Hadoop 2.5.0 (rev. 307)
    Info: using native C base64 encoding implementation
    Info: Hadoop distribution is Cloudera's CDH v5.3.0
    Info: using auto-detected ORCH HAL v4.2
    Info: HDFS workdir is set to "/user/oracle"
    Warning: mapReduce checks are skipped due to "ORCH_MAPRED_CHECK"=FALSE
    Warning: HDFS checks are skipped due to "ORCH_HDFS_CHECK"=FALSE
    Info: Hadoop 2.5.0-cdh5.3.0 is up
    Info: Sqoop 1.4.5-cdh5.3.0 is up
    Info: OLH 3.3.0 is up
    Info: loaded ORCH core Java library "orch-core-2.5.0-mr2.jar"
    Loading required package: ORCHstats
    > # Spark Context Creation
    > spark.connect(master="spark://my.spark.server:7077", memory="24G",dfs.namenode="my.dfs.namenode.server")

    In this case, we are requesting 24 GB of RAM per node in Standalone Mode. Since our BDA has 6 nodes, the total RAM assigned to our Spark Context is 144 GB, which can be verified in the Spark Master screen shown below.

    GLM – Logistic Regression

    In this release, because of a totally overhauled computation engine, we created a new function called orch.glm2(), which executes the Logistic Regression model exclusively on Apache Spark.  The input data expected by the algorithm is an ORAAH dfs.id object, which means an HDFS CSV dataset, a Hive table made compatible by using the hdfs.fromHive() command, or an HDFS CSV dataset that has been cached into Apache Spark as an RDD object using the command hdfs.toRDD().

    A simple example of the new algorithm running on the ONTIME dataset with 1 billion records is shown below. The objective of the test model is the prediction of Cancelled Flights. The new model requires that factor variables be indicated with F() in the formula, and the default (and only family available in this release) is binomial().

    The R code and the output below assumes that the connection to the Spark Cluster is already done.

    > # Attaches the HDFS file for use within R
    > ont1bi <- hdfs.attach("/user/oracle/ontime_1bi")
    > # Checks the size of the Dataset
    > hdfs.dim(ont1bi)
     [1] 1000000000         30
    > # Testing the GLM Logistic Regression Model on Spark
    > # Formula definition: Cancelled flights (0 or 1) based on other attributes

    > form_oraah_glm2 <- CANCELLED ~ DISTANCE + ORIGIN + DEST + F(YEAR) + F(MONTH) +

    > # ORAAH GLM2 Computation from HDFS data (computing factor levels on its own)

    > system.time(m_spark_glm <- orch.glm2(formula=form_oraah_glm2, ont1bi))
     ORCH GLM: processed 6 factor variables, 25.806 sec
     ORCH GLM: created model matrix, 100128 partitions, 32.871 sec
     ORCH GLM: iter  1,  deviance   1.38433414089348300E+09,  elapsed time 9.582 sec
     ORCH GLM: iter  2,  deviance   3.39315388583931150E+08,  elapsed time 9.213 sec
     ORCH GLM: iter  3,  deviance   2.06855738812683250E+08,  elapsed time 9.218 sec
     ORCH GLM: iter  4,  deviance   1.75868100359263200E+08,  elapsed time 9.104 sec
     ORCH GLM: iter  5,  deviance   1.70023181759611580E+08,  elapsed time 9.132 sec
     ORCH GLM: iter  6,  deviance   1.69476890425481350E+08,  elapsed time 9.124 sec
     ORCH GLM: iter  7,  deviance   1.69467586045954760E+08,  elapsed time 9.077 sec
     ORCH GLM: iter  8,  deviance   1.69467574351380850E+08,  elapsed time 9.164 sec
    user  system elapsed
    84.107   5.606 143.591 

    > # Shows the general features of the GLM Model
    > summary(m_spark_glm)
                   Length Class  Mode   
    coefficients   846    -none- numeric
    deviance         1    -none- numeric
    solutionStatus   1    -none- character
    nIterations      1    -none- numeric
    formula          1    -none- character
    factorLevels     6    -none- list  

    A sample benchmark against the same models running on Map-Reduce is illustrated below.  The Map-Reduce models used the call orch.glm(formula, dfs.id, family=binomial()) and used as.factor() in the formula.

    We can see that the Spark-based GLM2 is capable of a large performance advantage over the model executing in Map-Reduce.

    Later in this Blog we are going to see the performance of the Spark-based GLM Logistic Regression on 1 billion records.

    Linear Model with Neural Networks

    For the MLP Neural Networks model, the same algorithm was adapted to execute using Spark caching.  The exact same code and function call will recognize whether there is a connection to a Spark Context and, if so, will execute the computations using it.

    In this case, the code for both the Map-Reduce and the Spark-based executions is exactly the same, with the exception of the spark.connect() call that is required for the Spark-based version to kick in.

    The objective of the Test Model is the prediction of Arrival Delays of Flights in minutes, so the model class is a Regression Model. The R code used to run the benchmarks and the output is below, and it assumes that the connection to the Spark Cluster is already done.

    > # Attaches the HDFS file for use within R
    > ont1bi <- hdfs.attach("/user/oracle/ontime_1bi")

    > # Checks the size of the Dataset
    > hdfs.dim(ont1bi)
     [1] 1000000000         30

    > # Testing Neural Model on Spark
    > # Formula definition: Arrival Delay based on other attributes

    > form_oraah_neu <- ARRDELAY ~ DISTANCE + ORIGIN + DEST + as.factor(MONTH) +
    +   as.factor(YEAR) + as.factor(DAYOFMONTH) + as.factor(DAYOFWEEK)

    > # Compute Factor Levels from HDFS data
    > system.time(xlev <- orch.getXlevels(form_oraah_neu, dfs.dat = ont1bi))
        user  system elapsed
    17.717   1.348  50.495

    > # Compute and Cache the Model Matrix from HDFS data, passing factor levels
    > system.time(Mod_Mat <- orch.prepare.model.matrix(form_oraah_neu, dfs.dat = ont1bi,xlev=xlev))
       user  system elapsed
    17.933   1.524  95.624

    > # Compute Neural Model from RDD cached Model Matrix
    > system.time(mod_neu <- orch.neural(formula=form_oraah_neu, dfs.dat=Mod_Mat, xlev=xlev, trace=T))
    Unconstrained Nonlinear Optimization
    L-BFGS (Limited-memory Broyden-Fletcher-Goldfarb-Shanno)
    Iter           Objective Value   Grad Norm        Step   Evals
      1   5.08900838381104858E+11   2.988E+12   4.186E-16       2
      2   5.08899723803646790E+11   2.987E+12   1.000E-04       3
      3   5.08788839748061768E+11   2.958E+12   1.000E-02       3
      4   5.07751213455999573E+11   2.662E+12   1.000E-01       4
      5   5.05395855303159180E+11   1.820E+12   3.162E-01       1
      6   5.03327619811536194E+11   2.517E+09   1.000E+00       1
      7   5.03327608118144775E+11   2.517E+09   1.000E+00       6
      8   4.98952182330299011E+11   1.270E+12   1.000E+00       1
      9   4.95737805642779968E+11   1.504E+12   1.000E+00       1
     10   4.93293224063758362E+11   8.360E+11   1.000E+00       1
     11   4.92873433106044373E+11   1.989E+11   1.000E+00       1
     12   4.92843500119498352E+11   9.659E+09   1.000E+00       1
     13   4.92843044802041565E+11   6.888E+08   1.000E+00       1
    Solution status             Optimal (objMinProgress)
    Number of L-BFGS iterations 13
    Number of objective evals   27
    Objective value             4.92843e+11
    Gradient norm               6.88777e+08
       user  system elapsed
    43.635   4.186  63.319 

    > # Checks the general information of the Neural Network Model
    > mod_neu
    Number of input units      845
    Number of output units     1
    Number of hidden layers    0
    Objective value            4.928430E+11
    Solution status            Optimal (objMinProgress)
    Output layer               number of neurons 1, activation 'linear'
    Optimization solver        L-BFGS
    Scale Hessian inverse      1
    Number of L-BFGS updates   20
    > mod_neu$nObjEvaluations
    [1] 27
    > mod_neu$nWeights
    [1] 846

    A sample benchmark against the same models running on Map-Reduce is illustrated below.  The Map-Reduce models used the exact same orch.neural() calls as the Spark-based ones, with only the Spark connection as a difference.

    We can clearly see that the larger the dataset, the larger the difference in speeds of the Spark-based computation compared to the Map-Reduce ones, reducing the times from many hours to a few minutes.

    This new performance makes it possible to run much larger problems and to test several models on 1 billion records, something that previously took half a day for a single model.

    Logistic and Deep Neural Networks with 1 billion records

    To prove that it is now feasible not only to run Logistic and Linear Model Neural Networks on large scale datasets, but also complex Multi-Layer Neural Network Models, we decided to test the same 1 billion record dataset against several different architectures.

    These tests were done to check for performance and feasibility of these types of models, and not for comparison of precision or quality, which will be part of a different Blog.

    The default activation function for all Multi-Layer Neural Network models was used, which is the bipolar sigmoid function, and the default output-layer activation, the linear function, was also used.

    As a reminder, the generic formulation for the number of weights to be computed for a Neural Network is as follows:

    Total number of weights = SUM, over all layers from the first hidden layer to the output, of [(number of inputs into the layer + 1) * number of neurons in the layer]

    In the simple example, we had [(3 inputs + 1 bias) * 2 neurons] + [(2 neurons + 1 bias) * 1 output] = 8 + 3 = 11 weights

    In our tests for the Simple Neural Network model (Linear Model), using the same formula, we can see that we were computing 846 weights, because it is using 845 inputs plus the Bias.

    Thus, to calculate the number of weights necessary for the Deep Multi-layer Neural Networks that we are about to Test below, we have the following:

    MLP 3 Layers (50,25,10) => [(845+1)*50]+[(50+1)*25]+[(25+1)*10]+[(10+1)*1] = 43,846 weights

    MLP 4 Layers (100,50,25,10) => [(845+1)*100]+[(100+1)*50]+[(50+1)*25]+[(25+1)*10]+[(10+1)*1] = 91,196 weights

    MLP 5 Layers (200,100,50,25,10) => [(845+1)*200]+[(200+1)*100]+[(100+1)*50]+[(50+1)*25]+ [(25+1)*10]+[(10+1)*1] = 195,896 weights
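    These totals follow directly from the formula above; a small plain-R helper makes the arithmetic explicit (the function name is ours, not part of ORAAH):

    ```r
    # (inputs + 1 bias) * neurons, summed from the first hidden
    # layer through the single-neuron linear output layer.
    mlp.weights <- function(n.inputs, hidden.sizes, n.outputs = 1) {
      sizes <- c(n.inputs, hidden.sizes, n.outputs)
      sum((head(sizes, -1) + 1) * tail(sizes, -1))
    }

    mlp.weights(845, c(50, 25, 10))            # 43846
    mlp.weights(845, c(100, 50, 25, 10))       # 91196
    mlp.weights(845, c(200, 100, 50, 25, 10))  # 195896
    ```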

    The times required to compute the GLM Logistic Regression Model that predicts Flight Cancellations on 1 billion records are included just as an illustration of the performance of the new Spark-based algorithms.

    The Neural Network Models are all predicting Arrival Delay of Flights, so they are either Linear Models (the first one, with no Hidden Layers) or Non-linear Models using the bipolar sigmoid activation function (the Multi-Layer ones).

    This demonstrates that the capability of building very complex and deep networks is available with ORAAH, making it possible to build networks with hundreds of thousands or millions of weights for more complex problems.

    Not only that, but a Logistic Model can be computed on 1 billion records in less than 2 and a half minutes, and a Linear Neural Model in almost 3 minutes.

    The R Output Listing of the Logistic Regression computation and of the MLP Neural Networks are below.

    > # Spark Context Creation
    > spark.connect(master="spark://my.spark.server:7077", memory="24G",dfs.namenode="my.dfs.namenode.server")

    > # Attaches the HDFS file for use with ORAAH
    > ont1bi <- hdfs.attach("/user/oracle/ontime_1bi")

    > # Checks the size of the Dataset
    > hdfs.dim(ont1bi)
    [1] 1000000000         30

    GLM - Logistic Regression

    > # Testing GLM Logistic Regression on Spark
    > # Formula definition: Cancellation of Flights in relation to other attributes
    > form_oraah_glm2 <- CANCELLED ~ DISTANCE + ORIGIN + DEST + F(YEAR) + F(MONTH) +

    > # ORAAH GLM2 Computation from RDD cached data (computing factor levels on its own)
    > system.time(m_spark_glm <- orch.glm2(formula=form_oraah_glm2, ont1bi))
     ORCH GLM: processed 6 factor variables, 25.806 sec
     ORCH GLM: created model matrix, 100128 partitions, 32.871 sec
     ORCH GLM: iter  1,  deviance   1.38433414089348300E+09,  elapsed time 9.582 sec
     ORCH GLM: iter  2,  deviance   3.39315388583931150E+08,  elapsed time 9.213 sec
     ORCH GLM: iter  3,  deviance   2.06855738812683250E+08,  elapsed time 9.218 sec
     ORCH GLM: iter  4,  deviance   1.75868100359263200E+08,  elapsed time 9.104 sec
     ORCH GLM: iter  5,  deviance   1.70023181759611580E+08,  elapsed time 9.132 sec
     ORCH GLM: iter  6,  deviance   1.69476890425481350E+08,  elapsed time 9.124 sec
     ORCH GLM: iter  7,  deviance   1.69467586045954760E+08,  elapsed time 9.077 sec
     ORCH GLM: iter  8,  deviance   1.69467574351380850E+08,  elapsed time 9.164 sec
    user  system elapsed
    84.107   5.606 143.591

    > # Checks the general information of the GLM Model
    > summary(m_spark_glm)
                   Length Class  Mode   
    coefficients   846    -none- numeric
    deviance         1    -none- numeric
    solutionStatus   1    -none- character
    nIterations      1    -none- numeric
    formula          1    -none- character
    factorLevels     6    -none- list 

    Neural Networks - Initial Steps

    For the Neural Models, we have to add the times for computing the Factor Levels plus the time for creating the Model Matrix to the Total elapsed time of the Model computation itself.

    > # Testing Neural Model on Spark
    > # Formula definition
    > form_oraah_neu <- ARRDELAY ~ DISTANCE + ORIGIN + DEST + as.factor(MONTH) +
    +   as.factor(YEAR) + as.factor(DAYOFMONTH) + as.factor(DAYOFWEEK)
    > # Compute Factor Levels from HDFS data
    > system.time(xlev <- orch.getXlevels(form_oraah_neu, dfs.dat = ont1bi))
      user  system elapsed
    12.598   1.431  48.765

    > # Compute and Cache the Model Matrix from cached RDD data
    > system.time(Mod_Mat <- orch.prepare.model.matrix(form_oraah_neu, dfs.dat = ont1bi,xlev=xlev))
      user  system elapsed
      9.032   0.960  92.953

    Neural Networks Model with 3 Layers of Neurons

    > # Compute DEEP Neural Model from RDD cached Model Matrix (passing xlevels)
    > # Three Layers, with 50, 25 and 10 neurons respectively.

    > system.time(mod_neu <- orch.neural(formula=form_oraah_neu, dfs.dat=Mod_Mat,
    +                                    xlev=xlev, hiddenSizes=c(50,25,10),trace=T))
    Unconstrained Nonlinear Optimization
    L-BFGS (Limited-memory Broyden-Fletcher-Goldfarb-Shanno)
    Iter           Objective Value   Grad Norm        Step   Evals
      0   5.12100202340115967E+11   5.816E+09   1.000E+00       1
      1   4.94849165811250305E+11   2.730E+08   1.719E-10       1
      2   4.94849149028958862E+11   2.729E+08   1.000E-04       3
      3   4.94848409777413513E+11   2.702E+08   1.000E-02       3
      4   4.94841423640935242E+11   2.437E+08   1.000E-01       4
      5   4.94825372589270386E+11   1.677E+08   3.162E-01       1
      6   4.94810879175052673E+11   1.538E+07   1.000E+00       1
      7   4.94810854064597107E+11   1.431E+07   1.000E+00       1
    Solution status             Optimal (objMinProgress)
    Number of L-BFGS iterations 7
    Number of objective evals   15
    Objective value             4.94811e+11
    Gradient norm               1.43127e+07
        user   system  elapsed
      91.024    8.476 1975.947

    > # Checks the general information of the Neural Network Model
    > mod_neu
    Number of input units      845
    Number of output units     1
    Number of hidden layers    3
    Objective value            4.948109E+11
    Solution status            Optimal (objMinProgress)
    Hidden layer [1]           number of neurons 50, activation 'bSigmoid'
    Hidden layer [2]           number of neurons 25, activation 'bSigmoid'
    Hidden layer [3]           number of neurons 10, activation 'bSigmoid'
    Output layer               number of neurons 1, activation 'linear'
    Optimization solver        L-BFGS
    Scale Hessian inverse      1
    Number of L-BFGS updates   20
    > mod_neu$nObjEvaluations
    [1] 15
    > mod_neu$nWeights
    [1] 43846

    Neural Networks Model with 4 Layers of Neurons

    > # Compute DEEP Neural Model from RDD cached Model Matrix (passing xlevels)
    > # Four Layers, with 100, 50, 25 and 10 neurons respectively.

    > system.time(mod_neu <- orch.neural(formula=form_oraah_neu, dfs.dat=Mod_Mat,
    +                                    xlev=xlev, hiddenSizes=c(100,50,25,10),trace=T))
    Unconstrained Nonlinear Optimization
    L-BFGS (Limited-memory Broyden-Fletcher-Goldfarb-Shanno)
    Iter           Objective Value   Grad Norm        Step   Evals
       0   5.15274440087001343E+11   7.092E+10   1.000E+00       1
       1   5.10168177067538818E+11   2.939E+10   1.410E-11       1
       2   5.10086354184862549E+11   5.467E+09   1.000E-02       2
       3   5.10063808510261475E+11   5.463E+09   1.000E-01       4
       4   5.07663007172408386E+11   5.014E+09   3.162E-01       1
       5   4.97115989230861267E+11   2.124E+09   1.000E+00       1
       6   4.94859162124700928E+11   3.085E+08   1.000E+00       1
       7   4.94810727630636963E+11   2.117E+07   1.000E+00       1
       8   4.94810490064279846E+11   7.036E+06   1.000E+00       1
    Solution status             Optimal (objMinProgress)
    Number of L-BFGS iterations 8
    Number of objective evals   13
    Objective value             4.9481e+11
    Gradient norm               7.0363e+06
        user   system  elapsed
    166.169   19.697 6467.703
    > # Checks the general information of the Neural Network Model
    > mod_neu
    Number of input units      845
    Number of output units     1
    Number of hidden layers    4
    Objective value            4.948105E+11
    Solution status            Optimal (objMinProgress)
    Hidden layer [1]           number of neurons 100, activation 'bSigmoid'
    Hidden layer [2]           number of neurons 50, activation 'bSigmoid'
    Hidden layer [3]           number of neurons 25, activation 'bSigmoid'
    Hidden layer [4]           number of neurons 10, activation 'bSigmoid'
    Output layer               number of neurons 1, activation 'linear'
    Optimization solver        L-BFGS
    Scale Hessian inverse      1
    Number of L-BFGS updates   20
    > mod_neu$nObjEvaluations
    [1] 13
    > mod_neu$nWeights
    [1] 91196

    Neural Networks Model with 5 Layers of Neurons

    > # Compute DEEP Neural Model from RDD cached Model Matrix (passing xlevels)
    > # Five Layers, with 200, 100, 50, 25 and 10 neurons respectively.

    > system.time(mod_neu <- orch.neural(formula=form_oraah_neu, dfs.dat=Mod_Mat,
    +                                    xlev=xlev, hiddenSizes=c(200,100,50,25,10),trace=T))

    Unconstrained Nonlinear Optimization
    L-BFGS (Limited-memory Broyden-Fletcher-Goldfarb-Shanno)
    Iter           Objective Value   Grad Norm        Step   Evals
      0   5.14697806831633850E+11   6.238E+09   1.000E+00       1

     6   4.94837221890043518E+11   2.293E+08   1.000E+00       1
     7   4.94810299190365112E+11   9.268E+06   1.000E+00       1
     8   4.94810277908935242E+11   8.855E+06   1.000E+00       1
    Solution status             Optimal (objMinProgress)
    Number of L-BFGS iterations 8
    Number of objective evals   16
    Objective value             4.9481e+11
    Gradient norm               8.85457e+06
         user    system   elapsed
      498.002    90.940 30473.421
    > # Checks the general information of the Neural Network Model
    > mod_neu
    Number of input units      845
    Number of output units     1
    Number of hidden layers    5
    Objective value            4.948103E+11
    Solution status            Optimal (objMinProgress)
    Hidden layer [1]           number of neurons 200, activation 'bSigmoid'
    Hidden layer [2]           number of neurons 100, activation 'bSigmoid'
    Hidden layer [3]           number of neurons 50, activation 'bSigmoid'
    Hidden layer [4]           number of neurons 25, activation 'bSigmoid'
    Hidden layer [5]           number of neurons 10, activation 'bSigmoid'
    Output layer               number of neurons 1, activation 'linear'
    Optimization solver        L-BFGS
    Scale Hessian inverse      1
    Number of L-BFGS updates   20
    > mod_neu$nObjEvaluations
    [1] 16
    > mod_neu$nWeights
    [1] 195896
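
    As a sanity check, the reported nWeights values follow from simple layer arithmetic: each fully connected layer contributes (inputs × neurons) weights plus one bias per neuron. A small illustrative helper (not part of ORAAH) reproduces both counts:

    ```r
    # Count the weights (including biases) of a fully connected network:
    # each layer contributes n_in * n_out weights plus n_out biases.
    count.weights <- function(n.inputs, hidden, n.outputs = 1) {
      sizes <- c(n.inputs, hidden, n.outputs)
      sum(head(sizes, -1) * tail(sizes, -1) + tail(sizes, -1))
    }

    count.weights(845, c(100, 50, 25, 10))       # 4 hidden layers ->  91196
    count.weights(845, c(200, 100, 50, 25, 10))  # 5 hidden layers -> 195896
    ```

    Both values match the nWeights reported by the models above, confirming that the 845 input units include all expanded factor levels.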

    Best Practices on logging level for using Apache Spark with ORAAH

    It is important to note that Apache Spark's log output is verbose by default, so after a few tests with different settings it is useful to turn the logging level down. A System Administrator typically does this by editing the file $SPARK_HOME/etc/log4j.properties (see Best Practices below).

    By default, that file is going to look something like this:

    # cat $SPARK_HOME/etc/log4j.properties

    # Set everything to be logged to the console
    log4j.rootCategory=INFO, console
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    # Settings to quiet third party logs that are too verbose

    A full log, shown below, provides useful information for the first tests and diagnostics, but produces too much output once the models themselves are running.

    > # Creates the Spark Context. Because the Memory setting is not specified,
    > # the defaults of 1 GB of RAM per Spark Worker is used
    > spark.connect("yarn-client", dfs.namenode="my.hdfs.namenode")
    15/02/18 13:05:44 WARN SparkConf:
    SPARK_JAVA_OPTS was detected (set to '-Djava.library.path=/usr/lib64/R/lib'). This is deprecated in Spark 1.0+.
    Please instead use:
    - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
    - ./spark-submit with --driver-java-options to set -X options for a driver
    - spark.executor.extraJavaOptions to set -X options for executors
    - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
    15/02/18 13:05:44 WARN SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Djava.library.path=/usr/lib64/R/lib' as a work-around.
    15/02/18 13:05:44 WARN SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Djava.library.path=/usr/lib64/R/lib' as a work-around.
    15/02/18 13:05:44 INFO SecurityManager: Changing view acls to: oracle
    15/02/18 13:05:44 INFO SecurityManager: Changing modify acls to: oracle
    15/02/18 13:05:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(oracle); users with modify permissions: Set(oracle)
    15/02/18 13:05:44 INFO Slf4jLogger: Slf4jLogger started
    15/02/18 13:05:44 INFO Remoting: Starting remoting
    15/02/18 13:05:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@my.spark.master:35936]
    15/02/18 13:05:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@my.spark.master:35936]
    15/02/18 13:05:46 INFO SparkContext: Added JAR /u01/app/oracle/product/12.2.0/dbhome_1/R/library/ORCHcore/java/orch-core-2.4.1-mr2.jar at with timestamp 1424264746100
    15/02/18 13:05:46 INFO SparkContext: Added JAR /u01/app/oracle/product/12.2.0/dbhome_1/R/library/ORCHcore/java/orch-bdanalytics-core-2.4.1- mr2.jar at with timestamp 1424264746101
    15/02/18 13:05:46 INFO RMProxy: Connecting to ResourceManager at my.hdfs.namenode/
    Utils: Successfully started service 'sparkDriver' on port 35936.
    SparkEnv: Registering MapOutputTracker
    SparkEnv: Registering BlockManagerMaster
    DiskBlockManager: Created local directory at /tmp/spark-local-
    MemoryStore: MemoryStore started with capacity 265.1 MB
    HttpFileServer: HTTP File server directory is /tmp/spark-7c65075f-850c-
    HttpServer: Starting HTTP Server
    Utils: Successfully started service 'HTTP file server' on port 11491.
    Utils: Successfully started service 'SparkUI' on port 4040.
    SparkUI: Started SparkUI at http://my.hdfs.namenode:4040
    15/02/18 13:05:46 INFO Client: Requesting a new application from cluster with 1 NodeManagers
    15/02/18 13:05:46 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    15/02/18 13:05:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/02/18 13:05:46 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB
    15/02/18 13:05:46 INFO Client: Setting up container launch context for our AM
    15/02/18 13:05:46 INFO Client: Preparing resources for our AM container
    15/02/18 13:05:46 INFO Client: Uploading resource file:/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/spark/lib/spark-assembly.jar -> hdfs://my.hdfs.namenode:8020/user/oracle/.sparkStaging/application_1423701785613_0009/spark-assembly.jar
    15/02/18 13:05:47 INFO Client: Setting up the launch environment for our AM container
    15/02/18 13:05:47 INFO SecurityManager: Changing view acls to: oracle
    15/02/18 13:05:47 INFO SecurityManager: Changing modify acls to: oracle
    15/02/18 13:05:47 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(oracle); users with modify permissions: Set(oracle)
    15/02/18 13:05:47 INFO Client: Submitting application 9 to ResourceManager
    15/02/18 13:05:47 INFO YarnClientImpl: Submitted application application_1423701785613_0009
    15/02/18 13:05:48 INFO Client: Application report for application_1423701785613_0009 (state: ACCEPTED)
    15/02/18 13:05:48 INFO Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.oracle
         start time: 1424264747559
         final status: UNDEFINED
         tracking URL: http://my.hdfs.namenode:8088/proxy/application_1423701785613_0009/
         user: oracle
    15/02/18 13:05:49 INFO Client: Application report for application_1423701785613_0009 (state: ACCEPTED)
    15/02/18 13:05:50 INFO Client: Application report for application_1423701785613_0009 (state: ACCEPTED)

    Please note that all those warnings are expected, and may vary depending on the release of Spark used.

    With the console logging level in the log4j.properties settings lowered from INFO to WARN, the request for a Spark Context would return the following:

    # cat $SPARK_HOME/etc/log4j.properties

    # Set everything to be logged to the console
    log4j.rootCategory=WARN, console
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    # Settings to quiet third party logs that are too verbose

    Now the R log shows only a few details about the Spark connection.

    > # Creates the Spark Context. Because the Memory setting is not specified,
    > # the defaults of 1 GB of RAM per Spark Worker is used
    > spark.connect(master="yarn-client", dfs.namenode="my.hdfs.server")
    15/04/09 23:32:11 WARN SparkConf:
    SPARK_JAVA_OPTS was detected (set to '-Djava.library.path=/usr/lib64/R/lib').
    This is deprecated in Spark 1.0+.

    Please instead use:
    - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
    - ./spark-submit with --driver-java-options to set -X options for a driver
    - spark.executor.extraJavaOptions to set -X options for executors
    - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)

    15/04/09 23:32:11 WARN SparkConf: Setting 'spark.executor.extraJavaOptions' to '-Djava.library.path=/usr/lib64/R/lib' as a work-around.
    15/04/09 23:32:11 WARN SparkConf: Setting 'spark.driver.extraJavaOptions' to '-Djava.library.path=/usr/lib64/R/lib' as a work-around.
    15/04/09 23:32:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

    Finally, with the Console logging option in the log4j.properties file set to ERROR instead of INFO or WARN, the request for a Spark Context would return nothing in case of success:

    # cat $SPARK_HOME/etc/log4j.properties

    # Set everything to be logged to the console
    log4j.rootCategory=ERROR, console
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

    # Settings to quiet third party logs that are too verbose

    This time there is no message returned back to the R Session, because we requested it to only return feedback in case of an error:

    > # Creates the Spark Context. Because the Memory setting is not specified,
    > # the defaults of 1 GB of RAM per Spark Worker is used
    > spark.connect(master="yarn-client", dfs.namenode="my.hdfs.server")

    In summary, it is practical to start any project with full logging, and a good idea to bring the logging level down to WARN or ERROR once the environment has been verified to work and the settings are stable.

    Wednesday Aug 05, 2015

    ROracle 1.2-1 released

    We are pleased to announce the latest update of the open source ROracle package, version 1.2-1, with enhancements and bug fixes. ROracle provides high performance and scalable interaction between R and Oracle Database. In addition to availability on CRAN, ROracle binaries specific to Windows and other platforms can be downloaded from the Oracle Technology Network. Users of ROracle, please take our brief survey. Your feedback is important and we want to hear from you!

    Latest enhancements in version 1.2-1 include:

    • Support for NATIVE, UTF8 and LATIN1 encoded data in query and results

    • enhancement 20603162 - CLOB/BLOB enhancement, see man page on attributes ore.type, ora.encoding, ora.maxlength, and ora.fractional_seconds_precision.

    • bug 15937661 – mapping of dbWriteTable BLOB, CLOB, NCLOB, NCHAR and NVARCHAR columns. Data frame mapping to Oracle Database type is provided.

    • bug 16017358 – proper handling of NULL extproc context when passed to ORE embedded R execution

    • bug 16907374 - ROracle creates time stamp column for R Date with dbWriteTable

    • ROracle now displays NCHAR, NVARCHAR2 and NCLOB data types defined for columns in the server using dbColumnInfo and dbGetInfo

    In addition, enhancements in the previous release of ROracle, version 1.1-12, include:

    • Add bulk_write parameter to specify number of rows to bind at a time to improve performance for dbWriteTable and DML operations

    • Date, Timestamp, Timestamp with time zone and Timestamp with local time zone data are maintained in R and Oracle's session time zone. Oracle's session time zone environment variable ORA_SDTZ and R's environment variable TZ must be the same for this to work; otherwise an error is reported when operating on any of these column types

    • bug 16198839 - Allow selecting data from time stamp with time zone and time stamp with local time zone without reporting error 1805

    • bug 18316008 - increases the bind limit from 4000 to 2GB for inserting data into BLOB, CLOB and 32K VARCHAR and RAW data types. Changes describe lengths to NA for all types except for CHAR, VARCHAR2, and RAW

    • and other performance improvements and bug fixes
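
    The time-zone requirement above can be sketched in R. The region name below is an assumed example value, and the environment variables should agree before the ROracle connection is established:

    ```r
    # Illustrative only: make R's TZ and Oracle's ORA_SDTZ agree before
    # connecting, so the timestamp column types listed above round-trip
    # without the mismatch error. Substitute your own region name.
    Sys.setenv(TZ = "America/Los_Angeles", ORA_SDTZ = "America/Los_Angeles")
    identical(Sys.getenv("TZ"), Sys.getenv("ORA_SDTZ"))   # must be TRUE
    ```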

    See the ROracle NEWS for the complete list of updates.

    We encourage ROracle users to post questions and provide feedback on the Oracle R Technology Forum and the ROracle survey.

    ROracle is not only a high performance interface to Oracle Database from R for direct use; it also supports database access for Oracle R Enterprise from the Oracle Advanced Analytics option to Oracle Database.

    Monday Jul 13, 2015

    BIWASummit 2016 "Call for Speakers" is open!

    Oracle BIWA Summit is an annual conference that provides attendees a concentrated three days of content focused on Big Data and Analytics. Once again, it will be held at the Oracle Headquarters Conference Center in Redwood Shores, CA. As part of the organizing committee, I invite you to submit session proposals, especially those involving Oracle's R technologies.

    BIWA Summit attendees want to hear about your use of Oracle technology. Proposals will be accepted through Monday evening November 2, 2015, at midnight EST.

    To submit your abstract, click here.

    This year's tracks include:

    Oracle BIWA Summit 2016 is organized and managed by the Oracle BIWA SIG, the Oracle Spatial SIG, and the Oracle Northern California User Group. The event attracts top BI, data warehousing, analytics, Spatial, IoT and Big Data experts.

    The three-day event includes keynotes from industry experts, educational sessions, hands-on labs, and networking events.

    Hot topics include:

    • Database, data warehouse and cloud, Big Data architecture

    • Deep dives and hands-on labs on existing Oracle BI, data warehouse, and analytics products

    • Updates on the latest Oracle products and technologies (e.g. Big Data Discovery, Oracle Visual Analyzer, Oracle Big Data SQL)

    • Novel and interesting use cases on everything – Spatial, Graph, Text, Data Mining, IoT, ETL, Security, Cloud

    • Working with Big Data (e.g., Hadoop, "Internet of Things", SQL, R, Sentiment Analysis)

    • Oracle Business Intelligence (OBIEE), Oracle Big Data Discovery, Oracle Spatial, and Oracle Advanced Analytics—Better Together

    I look forward to seeing you there!

    Tuesday Jun 30, 2015

    R Consortium Launched!

    The Linux Foundation announces the R Consortium to support R users globally.

    The R Consortium works with and provides support to the R Foundation and other organizations developing, maintaining and distributing R software and provides a unifying framework for the R user community.

    “Data science is pushing the boundaries of what is possible in business, science, and technology, where the R language and ecosystem is a major enabling force,” said Neil Mendelson, Vice President, Big Data and Advanced Analytics, Oracle. “The R Consortium is an important enabling body to support and help grow the R user community, which increasingly includes enterprise data scientists.”

    R is a key enabling technology for data science as evidenced by its dramatic rise in adoption over the past several years. We look forward to contributing to R's continued success through the R Consortium.

    Friday Jun 12, 2015

    Variable Selection with ORE varclus - Part 2

    In our previous post we talked about variable selection and introduced a technique based on hierarchical divisive clustering and implemented using the Oracle R Enterprise embedded execution capabilities. In this post we illustrate how to visualize the clustering solution, discuss stopping criteria and highlight some performance aspects.


    The clustering efficiency can be assessed, from a high-level perspective, through a visual representation of metrics related to variability. The plot.clusters() function, provided as an example in varclus_lib.R, takes the datastore name, the iteration number (nclust corresponds to the number of clusters after the final iteration) and an output directory, and generates a png output file with two plots.

    R> plot.clusters(dsname="datstr.MYDATA",nclust=6,

    unix> ls -1 out.varclus.MYDATA

    The upper plot focuses on the last iteration. The x axis represents the cluster id (1 to 6 for six clusters after the 6th and final iteration). The variation explained and the proportion of variation explained (Variation.Explained and Proportion.Explained from 'Clusters Summary') are rendered by the blue curve, with units on the left y axis, and the red curve, with units on the right y axis. Clusters 1, 2, 3, 4 and 6 are well represented by their first principal component. Cluster 5 contains variation which is not well captured by a single component (only 47.8% is explained, as already mentioned in Part 1). This can also be seen from the r2.own values for the variables of Cluster 5 (VAR20, VAR26, ..., VAR29), between 0.24 and 0.62, indicating that they are not well correlated with the 1st principal component score. For this kind of situation, domain expertise is needed to evaluate the results and decide the course of action: does it make sense to keep VAR20, VAR26, ..., VAR29 clustered together, with VAR27 as representative variable, or should Cluster 5 be further split by lowering eig2.trshld (below the corresponding Secnd.Eigenval value from the 'Clusters Summary' section)?

    The bottom plot illustrates the entire clustering sequence (all iterations). The x axis represents the iteration number, or the number of clusters after that iteration. The total variation explained and the proportion of total variation explained (Tot.Var.Explained and Prop.Var.Explained from 'Grand Summary') are rendered by the blue curve, with units on the left y axis, and the red curve, with units on the right y axis. One can see how Prop.Var.Explained tends to flatten below 90% (86.3% for the last iteration).

    For the case above a single cluster was 'weak' and there was no ambiguity about where to start examining the results or searching for issues. Below is the same output for a different problem, with 120 variables and 29 final clusters. For this case, the proportion of variation explained by the 1st component (red curve, upper plot) shows several 'weak' clusters: 23, 28, 27, 4, 7 and 19. Prop.Var.Explained is below 60% for these clusters. Which one should be examined first? A good choice could be Cluster 7, because it plays a more important role as measured by the absolute value of Variation.Explained. Here again, domain knowledge is required to examine these clusters and decide if, and for how long, one should continue the splitting process.

    Stopping criteria & number of variables

    As illustrated in the previous section, the number of final clusters can be raised or reduced by lowering or increasing the eig2.trshld parameter. For problems with many variables the user may want to stop the iterations early and inspect the clustering results and history before convergence, to gain a better understanding of the variable selection process. Early stopping is achieved through the maxclust argument, as discussed in the previous post, and can also be used if the user wants or needs to keep the number of selected variables below an upper limit.


    The clustering runtime is entirely dominated by the cost of the PCA analysis. The 1st split is the most expensive, as PCA is run on the entire data; the subsequent splits execute faster and faster as the PCAs handle clusters with fewer and fewer variables. For the 39 variables & 55k rows case presented, the entire run (splitting into 6 clusters, post-processing from the datastore, generation) took ~10s. The 120 variables & 55k rows case required ~54s. For a larger case with 666 variables & 64k rows, the execution completed in 112s and generated 128 clusters. These numbers were obtained on an Intel Xeon 2.9GHz OL6 machine. The customer ran cases with more than 600 variables & O[1e6] rows in 5-10 mins.

    Thursday Jun 04, 2015

    Variable Selection with ORE varclus - Part 1

    Variable selection, also known as feature or attribute selection, is an important technique for data mining and predictive analytics. It is used when the number of variables is large, and has received special attention from application areas where this number is very large (genomics, combinatorial chemistry, text mining, etc.). The underlying hypothesis for variable selection is that the data can contain many variables which are either irrelevant or redundant. Solutions are therefore sought for selecting subsets of these variables which can predict the output with an accuracy comparable to that of the complete input set.

    Variable selection serves multiple purposes: (1) it makes model generation faster and more cost-effective; (2) it simplifies model interpretation, as the model is based on a (much) smaller and more effective set of predictors; (3) it supports better generalization, because the elimination of irrelevant features can reduce model over-fitting.

    There are many approaches for feature selection differentiated by search techniques, validation methods or optimality considerations. In this blog we will describe a solution based on hierarchical and divisive variable clustering which generates disjoint groups of variables such that each group can be interpreted essentially as uni-dimensional and represented by a single variable from the original set.
    This solution was developed and implemented during a POC with a customer from the banking sector. The data consisted of tables with several hundred variables and O[1e5-1e6] observations. The customer wanted to build an analysis flow operating with a much smaller number of 'relevant' attributes, from the original set, which would best capture the variability expressed in the data.

    The procedure is iterative and starts from a single cluster containing all original variables. This cluster is divided into two clusters, and variables are assigned to one or the other of the two child clusters. At every iteration one particular cluster is selected for division, and the procedure continues until there are no more suitable candidates for division, or until the user decides to stop the procedure once n clusters have been generated (and n representative variables have been identified).

    The selection criterion for division is related to the variation contained in the candidate cluster, more precisely to how this variation is distributed among its principal components. PCA is performed on the initial (starting) cluster and on every cluster resulting from divisions. If the 2nd eigenvalue is large, it means that the variation is distributed between at least two principal axes or components. We do not look beyond the 2nd eigenvalue, and divide that cluster's variables into two groups depending on how they are associated with the first two axes of variability. The division process continues until every cluster has variables associated with only one principal component, i.e. until every cluster has a 2nd PCA eigenvalue less than a specified threshold. During the iterative process, the cluster picked for splitting is the one having the largest 2nd eigenvalue.
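
    The splitting test can be sketched in plain R. This is an illustrative toy, not the ore.varclus implementation: PCA on a cluster's variables, a check of the 2nd eigenvalue of the correlation matrix against the threshold, and a naive assignment of each variable to whichever of the first two axes it loads on most (the actual procedure refines this with a rotation, as described below).

    ```r
    # Toy sketch of the splitting criterion (not the ore.varclus source).
    # Four synthetic variables driven by two latent factors of unequal
    # strength, so the first two principal axes separate cleanly.
    set.seed(42)
    n  <- 200
    f1 <- rnorm(n); f2 <- rnorm(n)
    dat <- data.frame(A1 = f1 + rnorm(n, sd = 0.2),
                      A2 = f1 + rnorm(n, sd = 0.2),
                      B1 = 0.6 * f2 + rnorm(n, sd = 0.4),
                      B2 = 0.6 * f2 + rnorm(n, sd = 0.4))

    pca <- prcomp(dat, scale. = TRUE)
    eig <- pca$sdev^2          # eigenvalues of the correlation matrix
    eig2.trshld <- 1.0

    if (eig[2] > eig2.trshld) {
      # assign each variable to the axis it loads on most strongly
      grp <- ifelse(abs(pca$rotation[, 1]) >= abs(pca$rotation[, 2]), 1, 2)
      print(split(colnames(dat), grp))   # expect A1,A2 apart from B1,B2
    }
    ```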

    The assignment of variables to clusters is based on the matrix of factor loadings, i.e. the correlations between the original variables and the PCA factors. The factor loadings matrix is not used directly; a rotated matrix which improves separability is used instead. Details on the principle of factor rotations and the various types of rotations can be found in Choosing the Right Type of Rotation in PCA and EFA and Factor Rotations in Factor Analyses.
    The rotations are performed with the function GPFoblq() from the GPArotation package, a pre-requisite for ORE varclus.
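
    A minimal sketch of this rotation step, using synthetic data (the scaling of loadings into variable-component correlations is the standard construction; only the data and variable names below are made up):

    ```r
    # Hedged sketch: scale the first two PCA loading vectors by
    # sqrt(eigenvalue) to get variable-component correlations, then apply
    # an oblique quartimin rotation with GPFoblq() from GPArotation.
    library(GPArotation)

    set.seed(1)
    n  <- 300
    f1 <- rnorm(n); f2 <- rnorm(n)
    dat <- data.frame(V1 = f1 + rnorm(n, sd = 0.3),
                      V2 = f1 + rnorm(n, sd = 0.3),
                      V3 = 0.7 * f2 + rnorm(n, sd = 0.3),
                      V4 = 0.7 * f2 + rnorm(n, sd = 0.3))

    pca <- prcomp(dat, scale. = TRUE)
    # loadings scaled by sqrt(eigenvalue): correlations with the components
    L   <- pca$rotation[, 1:2] %*% diag(pca$sdev[1:2])
    rot <- GPFoblq(L, method = "quartimin")
    round(rot$loadings, 2)   # near-simple structure: V1,V2 vs V3,V4
    ```

    After rotation each variable loads strongly on exactly one axis, which is what makes the two-group assignment unambiguous.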

    The next sections will describe how to run the variable clustering algorithm and interpret the results.

    The ORE varclus scripts

    The present version of ORE varclus is implemented as a function, ore.varclus(), to be run in embedded execution mode. The driver script example, varclus_run.R, illustrates how to call this function with ore.doEval:

    R> clust.log <- ore.doEval(FUN.NAME="ore.varclus",

    The arguments passed to ore.varclus() are:

    ore.varclus() is implemented in the varclus_lib.R script. The script also contains examples of post-processing functions illustrating how to selectively extract results from the datastore and generate reports and plots. The current version of ore.varclus() supports only numerical attributes. Details on the usage of the post-processing functions are provided in the next section.

    The output


    We illustrate the output of ORE varclus for a particular dataset (MYDATA) containing 39 numeric variables and 54k observations. ore.varclus() saves the history of the entire cluster generation in a datastore specified via the dsname argument:

      datastore.name object.count  size       creation.date description
    1 datstr.MYDATA            13 30873 2015-05-28 01:03:42        <NA>

         object.name      class size length row.count col.count
    1  Grand.Summary data.frame  562      5         6         5
    2  clusters.ncl1       list 2790      1        NA        NA
    3  clusters.ncl2       list 3301      2        NA        NA
    4  clusters.ncl3       list 3811      3        NA        NA
    5  clusters.ncl4       list 4322      4        NA        NA
    6  clusters.ncl5       list 4833      5        NA        NA
    7  clusters.ncl6       list 5344      6        NA        NA
    8   summary.ncl1       list  527      2        NA        NA
    9   summary.ncl2       list  677      2        NA        NA
    10  summary.ncl3       list  791      2        NA        NA
    11  summary.ncl4       list  922      2        NA        NA
    12  summary.ncl5       list 1069      2        NA        NA
    13  summary.ncl6       list 1232      2        NA        NA    

    For this dataset the algorithm generated 6 clusters after 6 iterations, with a threshold eig2.trshld=1.00. The datastore contains several types of objects: clusters.nclX, summary.nclX and Grand.Summary. The suffix X indicates the iteration step. For example, clusters.ncl4 does not mean the 4th cluster; it is a list of objects (numbers and tables) related to the 4 clusters generated during the 4th iteration. summary.ncl4 contains summarizing information about each of the 4 clusters generated during the 4th iteration. Grand.Summary provides the same metrics but aggregated for all clusters per iteration. More details are provided below.

    The user can load and inspect each clusters.nclX or summary.nclX individually, to track, for example, how variables are assigned to clusters during the iterative process. Saving the results on a per-iteration basis becomes practical when the number of starting variables is in the several hundreds and many clusters are generated.
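
    For example, a single iteration's objects can be pulled back with the ORE datastore API (a hedged sketch assuming an already connected ORE session; the datastore and object names are the ones shown above):

    ```r
    # Assumes an active ORE session (established with ore.connect).
    # List the varclus datastore and load only the final iteration.
    ore.datastore(name = "datstr.MYDATA")         # the listing shown above
    ore.datastoreSummary(name = "datstr.MYDATA")  # per-object details
    ore.load(name = "datstr.MYDATA",
             list = c("summary.ncl6", "clusters.ncl6", "Grand.Summary"))
    ```

    Loading with an explicit list avoids pulling every iteration's objects into the R session at once.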

    Text based output

    varclus_lib.R contains a function write.clusters.to.file() which concatenates all the information from either a single iteration or multiple iterations and dumps it as formatted text for visual inspection. In the example below the results from the last two steps (5 and 6), specified via the clust.steps argument, are written to the file named via the fout argument.

    R> fclust <- "out.varclus.MYDATA/out.MYDATA.clusters"
    R> write.clusters.to.file(fout=fclust,

    The output now contains the info from summary.ncl5, clusters.ncl5, summary.ncl6, clusters.ncl6, and Grand.Summary, in that order. Below we show only the output corresponding to the 6th iteration, which contains the final results.

    The output starts with data collected from summary.ncl6, displayed as two sections, 'Clusters Summary' and 'Inter-Clusters Correlation'. The columns of 'Clusters Summary' are:

    The 'Inter-Clusters Correlation' matrix is the correlation matrix between the scores of (the data on) the 1st principal component of every cluster. It is a measure of how uncorrelated the clusters are when each is represented by its 1st principal component.

    Clustering step 6
    Clusters Summary :

      Cluster Members Variation.Explained Proportion.Explained Secnd.Eigenval Represent.Var
    1       1      13           11.522574            0.8863518   7.856187e-01         VAR25
    2       2       6            5.398123            0.8996871   3.874496e-01         VAR13
    3       3       6            5.851600            0.9752667   1.282750e-01          VAR9
    4       4       3            2.999979            0.9999929   2.112009e-05         VAR10
    5       5       5            2.390534            0.4781069   8.526650e-01         VAR27
    6       6       6            5.492897            0.9154828   4.951499e-01         VAR14

    Inter-Clusters Correlation :

                 Clust.1      Clust.2       Clust.3       Clust.4       Clust.5       Clust.6
    Clust.1  1.000000000  0.031429267  0.0915034534 -0.0045104029 -0.0341091948  0.0284033464
    Clust.2  0.031429267  1.000000000  0.0017441189 -0.0014435672 -0.0130659191  0.8048780461
    Clust.3  0.091503453  0.001744119  1.0000000000  0.0007563413 -0.0080611117 -0.0002118345
    Clust.4 -0.004510403 -0.001443567  0.0007563413  1.0000000000 -0.0008410022 -0.0022667776
    Clust.5 -0.034109195 -0.013065919 -0.0080611117 -0.0008410022  1.0000000000 -0.0107850694
    Clust.6  0.028403346  0.804878046 -0.0002118345 -0.0022667776 -0.0107850694  1.0000000000

    Cluster 1
                 Comp.1       Comp.2    r2.own     r2.next   r2.ratio var.idx
    VAR25 -0.3396562963  0.021849138 0.9711084 0.010593134 0.02920095      25
    VAR38 -0.3398365257  0.021560264 0.9710107 0.010590140 0.02929962      38
    VAR23 -0.3460431639  0.011946665 0.9689027 0.010689408 0.03143329      23
    VAR36 -0.3462378084  0.011635813 0.9688015 0.010685952 0.03153546      36
    VAR37 -0.3542777932 -0.001166427 0.9647680 0.010895771 0.03562009      37
    VAR24 -0.3543088809 -0.001225793 0.9647155 0.010898262 0.03567326      24
    VAR22 -0.3688379400 -0.026782777 0.9484384 0.011098450 0.05214028      22
    VAR35 -0.3689127408 -0.026900129 0.9484077 0.011093779 0.05217103      35
    VAR30 -0.0082726659  0.478137910 0.8723316 0.006303141 0.12847817      30
    VAR32  0.0007818601  0.489061629 0.8642301 0.006116234 0.13660543      32
    VAR31  0.0042646500  0.493099400 0.8605441 0.005992662 0.14029666      31
    VAR33  0.0076560545  0.497131056 0.8573146 0.005934929 0.14353729      33
    VAR34 -0.0802417381  0.198756967 0.3620001 0.007534643 0.64284346      34

    Cluster 2
               Comp.1      Comp.2    r2.own   r2.next  r2.ratio var.idx
    VAR13 -0.50390550 -0.03826113 0.9510065 0.6838419 0.1549652      13
    VAR3  -0.50384385 -0.03814382 0.9509912 0.6838322 0.1550089       3
    VAR18 -0.52832332 -0.09384185 0.9394948 0.6750884 0.1862204      18
    VAR11 -0.31655455  0.33594147 0.9387738 0.5500716 0.1360798      11
    VAR16 -0.34554284  0.26587848 0.9174539 0.5351907 0.1775913      16
    VAR39 -0.02733522 -0.90110241 0.7004025 0.3805168 0.4836249      39

    Cluster 3
                 Comp.1       Comp.2    r2.own      r2.next    r2.ratio var.idx
    VAR9  -4.436290e-01  0.010645774 0.9944599 0.0111098555 0.005602316       9
    VAR8  -4.440656e-01  0.009606151 0.9944375 0.0113484256 0.005626315       8
    VAR7  -4.355970e-01  0.028881014 0.9931890 0.0110602004 0.006887179       7
    VAR6  -4.544373e-01 -0.016395561 0.9914545 0.0114996393 0.008644956       6
    VAR21 -4.579777e-01 -0.027336302 0.9865562 0.0004552779 0.013449888      21
    VAR5   1.566362e-06  0.998972842 0.8915032 0.0093737140 0.109523464       5

    Cluster 4
                Comp.1        Comp.2    r2.own      r2.next     r2.ratio var.idx
    VAR10 7.067763e-01  0.0004592019 0.9999964 1.899033e-05 3.585911e-06      10
    VAR1  7.074371e-01 -0.0004753728 0.9999964 1.838949e-05 3.605506e-06       1
    VAR15 2.093320e-11  0.9999997816 0.9999859 2.350467e-05 1.408043e-05      15

    Cluster 5
                Comp.1       Comp.2    r2.own      r2.next  r2.ratio var.idx
    VAR27 -0.556396037 -0.031563215 0.6199740 0.0001684573 0.3800900      27
    VAR29 -0.532122723 -0.041330455 0.5586173 0.0001938785 0.4414683      29
    VAR28 -0.506440510 -0.002599593 0.5327290 0.0001494172 0.4673408      28
    VAR26 -0.389716922  0.198849850 0.4396647 0.0001887849 0.5604411      26
    VAR20  0.003446542  0.979209797 0.2395493 0.0076757755 0.7663329      20

    Cluster 6
                 Comp.1        Comp.2    r2.own   r2.next  r2.ratio var.idx
    VAR14 -0.0007028647  0.5771114183 0.9164991 0.7063442 0.2843495      14
    VAR4  -0.0007144334  0.5770967589 0.9164893 0.7063325 0.2843714       4
    VAR12 -0.5779762250 -0.0004781436 0.9164238 0.4914497 0.1643420      12
    VAR2  -0.5779925997 -0.0004993306 0.9164086 0.4914361 0.1643676       2
    VAR17 -0.5760772611  0.0009732350 0.9150015 0.4900150 0.1666686      17
    VAR19  0.0014223072  0.5778410825 0.9120741 0.7019736 0.2950272      19

    Grand Summary
      Nb.of.Clusters Tot.Var.Explained Prop.Var.Explained Min.Prop.Explained Max.2nd.Eigval
    1              1          11.79856          0.3025272          0.3025272       9.787173
    2              2          21.47617          0.5506711          0.4309593       5.778829
    3              3          27.22407          0.6980530          0.5491522       2.999950
    4              4          30.22396          0.7749735          0.6406729       2.389400
    5              5          32.60496          0.8360246          0.4781069       1.205769
    6              6          33.65571          0.8629668          0.4781069       0.852665

    The sections 'Cluster 1' ... 'Cluster 6' contain results collected from the clusters.ncl6 list from the datastore. Each cluster is described by a table where the rows are the variables and the columns correspond to: the loadings of the variable on the first two principal components of the cluster (Comp.1, Comp.2), the squared correlation of the variable with its own cluster component (r2.own), the highest squared correlation with another cluster's component (r2.next), the ratio (1-r2.own)/(1-r2.next) (r2.ratio, lower is better), and the variable index (var.idx).

    For example, from 'Clusters Summary', the first cluster (index 1) has 13 variables and is best represented by variable VAR25 which, from inspecting the 'Cluster 1' section, shows the highest r2.own = 0.9711084.
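As a quick consistency check (a plain R snippet, not part of the original output, assuming the conventional variable-clustering definition of the ratio), r2.ratio can be recomputed from r2.own and r2.next for the VAR25 row above:

```r
# r2.ratio = (1 - r2.own)/(1 - r2.next), checked against the VAR25 row
r2.own   <- 0.9711084
r2.next  <- 0.010593134
r2.ratio <- (1 - r2.own)/(1 - r2.next)
print(r2.ratio)   # ~0.0292009, matching the reported 0.02920095
```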

    The section 'Grand Summary' displays the results from the Grand.Summary table in the datastore. The rows correspond to the clustering iterations and the columns are defined as: the number of clusters at that iteration (Nb.of.Clusters), the total variance explained by the cluster components (Tot.Var.Explained), the proportion of the overall variance this represents (Prop.Var.Explained), the smallest per-cluster proportion explained (Min.Prop.Explained), and the largest second eigenvalue among the clusters (Max.2nd.Eigval).

    For example, for the final clusters (Nb.of.Clusters = 6) Min.Prop.Explained is 0.4781069. This corresponds to Cluster 5 - see the Proportion.Explained value from 'Clusters Summary'. It means that the variation in Cluster 5 is poorly captured by its first principal component (only 47.8%).
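Since the data set has 39 standardized variables (VAR1 through VAR39 appear across the clusters, so the total variance is 39), Prop.Var.Explained is simply Tot.Var.Explained divided by 39. A quick check in plain R against the Grand Summary:

```r
# Prop.Var.Explained = Tot.Var.Explained / 39 (39 standardized variables)
tot.var  <- c(11.79856, 21.47617, 27.22407, 30.22396, 32.60496, 33.65571)
prop.var <- tot.var/39
print(round(prop.var, 5))
# 0.30253 0.55067 0.69805 0.77497 0.83602 0.86297
```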

    As previously indicated, the representative variables, one per final cluster, are collected in the Represent.Var column from the 'Clusters Summary' section in the output text file. They can be retrieved from the summary.ncl6 object in the datastore as shown below:

    R> ore.load(list=c("summary.ncl6"),name=datstr.name)
    [1] "summary.ncl6"
    R> names(summary.ncl6)
    [1] "clusters.summary"      "inter.clusters.correl"
    R> names(summary.ncl6$clusters.summary)
    [1] "Cluster"  "Members"  "Variation.Explained"  "Proportion.Explained" "Secnd.Eigenval"     
    [6] "Represent.Var"      
    R> summary.ncl6$clusters.summary$Represent.Var
    [1] "VAR25" "VAR13" "VAR9"  "VAR10" "VAR27" "VAR14"

    In our next post we'll look at plots, performance and future developments for ORE varclus.

    Wednesday May 06, 2015

    Experience using ORAAH on a customer business problem: some basic issues & solutions

    We illustrate in this blog a few simple, practical solutions for problems which can arise when developing ORAAH mapreduce applications for the Oracle BDA. These problems were actually encountered during a recent POC engagement. The customer, an important player in the medical technologies market, was interested in building an analysis flow consisting of a sequence of data manipulation and transformation steps followed by multiple model generation. The data preparation included multiple types of merging, filtering, and variable generation based on complex search patterns, and represented by far the most time-consuming component of the flow. The original implementation on the customer's hardware required multiple days per flow to complete. Our ORAAH mapreduce-based implementation running on an X5-2 Starter Rack BDA reduced that time to between 4 and 20 minutes, depending on which flow was tested.

    The points which will be addressed in this blog are related to the fact that the data preparation was structured as a chain of tasks where each task performed transformations on HDFS data generated by one or more upstream tasks. More precisely, we will consider the:

    • Merging of HDFS data from multiple sources

    • Re-balancing and parts reduction for HDFS data

    • Getting unique levels for categorical variables from HDFS data

    • Partitioning the data for distributed mapreduce execution

    'Merging data' from above is to be understood as row binding of multiple tables. Re-balancing and parts reduction addresses the fact that the HDFS data (generated by upstream jobs) may consist of very unequal parts (chunks), which leads to performance losses when this data is further processed by other mapreduce jobs. The 3rd and 4th items are related: getting the unique levels of categorical variables was useful for the data partitioning process, namely for deciding how to generate the key-value pairs within the mapper functions.

    1. Merging of hdfs data from multiple sources

    The practical case here is that of a data transformation task for which the input consists of several, similarly structured HDFS data sets. As a reminder, data in HDFS is stored as a collection of flat files/chunks (part-00000, part-00001, etc) under an HDFS directory, and the hdfs.* functions access the directory, not the individual 'part-xxxxx' chunks. Also, the hadoop.run()/hadoop.exec() functions work with a single input data object (an HDFS object identifier representing a directory in HDFS); the R rbind, cbind, merge, etc. operations cannot be invoked within mapreduce to bind two or more large tables.

    For the case under consideration, each input (dataA_dfs, dataB_dfs, etc) consists of a different number of files/chunks

    R> hdfs.ls("dataA_dfs")
    [1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00071"
    R> hdfs.ls("dataB_dfs")
    [1] "__ORCHMETA__" "part-00000" "part-00001" .... "part-00035"

    corresponding to the number of reducers used by the upstream mapreduce jobs which generated this data. As these multiple chunks from various HDFS directories need to be processed as a single input data set, they need to be moved into a unique HDFS directory. The 'merge_hdfs_data' function below does just that, by creating a new HDFS directory and copying into it all the 'part-xxxxx' files from each source directory, with proper renumbering of the resulting parts:

    R> merge_hdfs_data <- function(SrcDirs,TrgtDir) {
      #cat(sprintf("merge_hdfs_files : Creating %s ...\n",TrgtDir))
      hdfs.mkdir(TrgtDir)
      i <- 0
      for (srcD in SrcDirs) {
        fparts <- hdfs.ls(get(srcD),pattern="part")
        srcd <- (hdfs.describe(get(srcD)))[1,2]
        for (fpart in fparts) {
          #cat(sprintf("merge_hdfs_files : Copying %s/%s to %s ...\n",
          #            srcd,fpart,TrgtDir))
          # copy the chunk into the target directory under a new,
          # sequential part number
          hdfs.cp(sprintf("%s/%s",srcd,fpart),
                  sprintf("%s/part-%05d",TrgtDir,i))
          i <- i+1
        }
      }
    }

    Merging of the dataA_dfs and dataB_dfs directories into a new data_merged_dfs directory is achieved through:

    R> merge_hdfs_data(c("dataA_dfs","dataB_dfs"),"data_merged_dfs")
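The renumbering logic itself is plain R and can be sanity-checked outside HDFS; the snippet below (a local illustration, independent of any hdfs.* calls) shows how sequential target part names are produced for chunks coming from two sources:

```r
# Two source directories contributing 3 and 2 part files respectively
src.parts <- list(c("part-00000","part-00001","part-00002"),
                  c("part-00000","part-00001"))
# Target names are numbered sequentially across all sources
trgt.parts <- sprintf("part-%05d",
                      seq(0, length.out=length(unlist(src.parts))))
print(trgt.parts)
# "part-00000" "part-00001" "part-00002" "part-00003" "part-00004"
```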

    2. Data re-balancing / Reduction of the number of parts

    Data stored in HDFS can suffer from two key problems that affect performance: too many small files, and files with very different numbers of records, especially files with very few records. The merged data produced by the function above consists of a number of files equal to the sum of all files from all input HDFS directories. Since the upstream mapreduce jobs generating the inputs were run with a high number of reducers (for faster execution), the resulting total number of files got large (100+). This created an impractical constraint for the subsequent analysis, as one cannot run a mapreduce application with fewer mappers than the number of parts (the reverse is fine: HDFS parts are splittable for processing by multiple mappers). Moreover, if the parts have very different numbers of records, the performance of the application suffers since different mappers handle very different volumes of data.

    The rebalance_data function below is a simple way of addressing these issues. Every mapper splits its portion of the data into a user-defined number of parts (nparts) containing quasi-equal numbers of records, and a key is associated with each part. In this implementation the number of reducers is set to the number of parts; after shuffling, each reducer collects the records corresponding to one particular key and writes them to the output. The overall output consists of nparts parts of quasi-equal size. A basic mechanism for preserving the data types is also illustrated (see the map.output and reduce.output constructs below).

    R> rebalance_data <- function(HdfsData,nmap,nparts) {
      mapper_func <- function(k,v) {
        nlin <- nrow(v)
        if(nlin>0) {
          idx.seq <- seq(1,nlin)
          # assign each record to one of nparts quasi-equal parts
          kk <- ceiling(idx.seq/(nlin/nparts))
          orch.keyvals(kk,v)
        }
      }
      reducer_func <- function(k,v) {
        if (nrow(v) > 0) { orch.keyvals(k=NULL,v) }
      }
      # rebuild the output metadata so that the column types are preserved:
      # character columns are represented by "a", numeric columns by 0
      dtypes.out <- sapply(hdfs.meta(HdfsData)$types,
                           function(x) ifelse(x=="character","\"a\"","0"))
      val.str <- paste0(hdfs.meta(HdfsData)$names,"=",dtypes.out,collapse=",")
      meta.map.str <- sprintf("data.frame(key=0,%s)",val.str)
      meta.red.str <- sprintf("data.frame(key=NA,%s)",val.str)

      config <- new("mapred.config",
                    job.name      = "rebalance_data",
                    map.output    = eval(parse(text=meta.map.str)),
                    reduce.output = eval(parse(text=meta.red.str)),
                    map.tasks     = nmap,
                    reduce.tasks  = nparts,
                    reduce.split  = 1e5)
      res <- hadoop.run(data = HdfsData,
                        mapper = mapper_func,
                        reducer = reducer_func,
                        config = config,
                        cleanup = TRUE)
      res
    }

    Before using this function, the data associated with the new data_merged_dfs directory needs to be attached to the ORAAH framework:

    R> data_merged_dfs <- hdfs.attach("data_merged_dfs")

    The invocation below uses 144 mappers for splitting the data into 4 parts:

    R> x <- rebalance_data(data_merged_dfs,nmap=144,nparts=4)

    The user may also want to save the resulting object permanently, under some convenient, recognizable name like 'data_rebalanced_dfs'. The path to the temporary object x is retrieved with the hdfs.describe() command and provided as the first argument to the hdfs.cp() command:

    R> tmp_dfs_name <- hdfs.describe(x)[1,2]
    R> hdfs.cp(tmp_dfs_name,"data_rebalanced_dfs",overwrite=TRUE)

    The choice of the number of parts is up to the user. It is better to keep the number of parts small, so as not to constrain from below the number of mappers for the downstream runs, but one should also consider other factors, like the read/write performance related to the size of the data sets, the HDFS block size, etc., which are not the topic of the present blog.
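The heart of rebalance_data is the key assignment in the mapper. The plain R snippet below (a local illustration only) shows that ceiling(idx.seq/(nlin/nparts)) splits nlin records into nparts groups whose sizes differ by at most one:

```r
nlin    <- 10         # records held by one mapper
nparts  <- 4          # desired number of output parts
idx.seq <- seq(1, nlin)
kk <- ceiling(idx.seq/(nlin/nparts))   # key assigned to each record
print(table(kk))
# kk
# 1 2 3 4
# 2 3 2 3
```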

    3. Getting unique levels

    Determining the unique levels of categorical variables in a dataset is of basic interest for any data exploration procedure. If the data is distributed in HDFS, this determination requires an appropriate solution. For the application under consideration here, getting the unique levels serves another purpose; the unique levels are used to generate data splits better suited for distributed execution by the downstream mapreduce jobs. More details are available in the next section.

    Depending on the categorical variables in question and the data characteristics, the determination of unique levels may require different solutions. The implementation below is a generic solution providing these levels for multiple variables bundled together in the input argument 'cols'. The mappers associate a key with each variable and collect the unique levels for each of these variables. The resulting arrays of values are packed in a text-stream-friendly format and provided as the value argument to orch.keyvals() - in this way complex data types can be safely passed between the mappers and reducers (via the text-based Hadoop streams). The reducers unpack the strings, retrieve all the values associated with a particular key (variable) and re-calculate the unique levels, accounting now for all values of that variable.

    R> get_unique_levels <- function(x, cols, nmap, nred) {
      mapper <- function(k, v) {
        for (col in cols) {
          uvals <- unique(v[[col]])
          # pack the levels so they survive the text-based Hadoop streams
          orch.keyvals(col, orch.pack(uvals))
        }
      }
      reducer <- function(k, v) {
        lvals <- orch.unpack(v$val)
        uvals <- unique(unlist(lvals))
        orch.keyval(k, orch.pack(uvals))
      }
      config <- new("mapred.config",
                    job.name      = "get_unique_levls",
                    map.output    = data.frame(key="a",val="packed"),
                    reduce.output = data.frame(key="a",val="packed"),
                    map.tasks     = nmap,
                    reduce.tasks  = nred)
      res <- hadoop.run(data = x,
                        mapper = mapper,
                        reducer = reducer,
                        config = config,
                        export = orch.export(cols=cols))
      resl <- (lapply((hdfs.get(res))$val,function(x){orch.unpack(x)}))[[1]]
      resl
    }

    This implementation works fine provided that the numbers of levels of the categorical variables are much smaller than the number of records of the entire data set. If some categorical variables have many levels, not far from the order of the total number of records, each mapper may return a large number of levels and each reducer may have to handle multiple large objects. An efficient solution for this case requires a different approach. However, if the column associated with such a variable can fit in memory, a direct, very crude calculation like the one below can run faster than the former implementation. Here the mappers extract the column with the values of the variable in question, the column is pulled into an in-memory object, and unique() is called to determine the unique levels.

    R> get_unique_levels_sngl <- function(HdfsData,col,nmap) {
      # mapper-only job: emit just the column of interest
      mapper_fun <- function(k,v) { orch.keyvals(key=NULL,v[[col]]) }
      config <- new("mapred.config",
                    job.name      = "extract_col",
                    map.output    = data.frame(key=NA,VAL=0),
                    map.tasks     = nmap)
      x <- hadoop.run(data=HdfsData,
                      mapper=mapper_fun,
                      config=config,
                      export=orch.export(col=col))
      # pull the single column into memory and get the unique levels directly
      xl <- hdfs.get(x)
      res <- unique(xl$VAL)
      res
    }

    R> customers <- get_unique_levels_sngl(data_rebalanced_dfs,"CID",nmap=32)

    We obtained thus the unique levels of the categorical variable CID (customer id) from our data_rebalanced_dfs data.
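The map/reduce logic above — per-chunk unique() in the mappers, then unique() of the union in the reducer — can be illustrated locally in plain R (a toy analogue, without the orch.pack()/orch.unpack() transport):

```r
# Toy data split into 3 "mapper" chunks
chunks <- split(c("A","B","A","C","B","B","A","D","C"),
                rep(1:3, each=3))
# Map step: unique levels per chunk
map.out <- lapply(chunks, unique)
# Reduce step: unique of the union of the per-chunk levels
levels.all <- unique(unlist(map.out))
print(sort(levels.all))
# "A" "B" "C" "D"
```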

    4. Partitioning the data for mapreduce execution

    Let's suppose that the user wants to execute some specific data manipulations at the CID level, like aggregations, variable transformations or new variable generation, etc. Associating a key with every customer (CID level) would be a bad idea since there are many customers - our hypothesis was that the number of CID levels is not orders of magnitude below the total number of records. This would lead to an excessive number of reducers, with a terrible impact on performance. In such a case it would be better, for example, to bag customers into groups and distribute the execution at the group level. The user may want to set the number of these groups, ngrp, to something commensurate with the number of BDA cores available for parallelizing the task.

    The example below illustrates how to do that at a basic level. The groups are generated within the encapsulating function myMRjob, before the hadoop.run() execution - the var.grps dataframe has two columns: the CID levels and the group number (from 1 to ngrp) with which they are associated. This table is passed to the hadoop execution environment via orch.export() within hadoop.run(). The mapper_fun function extracts the group number as key and inserts the multiple key-value pairs into the output buffer. Each reducer then gets a complete set of records for every customer associated with a particular key (group) and can proceed with the transformations/manipulations within a loop over customers or whatever programming construct is appropriate. Each reducer handles a quasi-equal number of customers, because this is how the groups were generated; however, the number of records per customer is not constant and may introduce some imbalances.
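The group assignment itself can be illustrated locally in plain R (a toy example; the customer IDs and the quasi-equal grouping rule below are illustrative assumptions):

```r
ngrp <- 3
customers <- sprintf("C%03d", 1:10)   # 10 hypothetical CID levels
# quasi-equal groups of customers, numbered 1..ngrp
var.grps <- data.frame(CUSTOMID = customers,
                       GRP = ceiling(seq_along(customers)/
                                     (length(customers)/ngrp)))
# inside a mapper: map each record's customer id to its group key
recs <- c("C004","C001","C009","C004")
kk <- var.grps[match(recs, var.grps$CUSTOMID), "GRP"]
print(kk)
# 2 1 3 2
```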

    R> myMRjob <- function(HdfsData,var,ngrp,nmap,nred) {
      mapper_fun <- function(k,v) {
        fltr <- <some_row_filtering>
        cID <- which(names(v) %in% "CUSTOMID")
        # key = group number associated with each record's customer
        kk <- var.grps[match(v[fltr,cID],var.grps$CUSTOMID),2]
        orch.keyvals(kk,v[fltr,])
      }
      reducer_fun <- function(k,v) { ... }
      config <- new("mapred.config", map.tasks = nmap, reduce.tasks = nred, ...)

      # quasi-equal groups of customers, numbered from 1 to ngrp
      var.grps <- data.frame(CUSTOMID=var,
                             GRP=ceiling(seq_along(var)/(length(var)/ngrp)))

      res <- hadoop.run(data = HdfsData,
                        mapper = mapper_fun,
                        reducer = reducer_fun,
                        config = config,
                        export = orch.export(var.grps=var.grps,ngrp=ngrp),
                        cleanup = TRUE)
      res
    }

    x <- myMRjob(HdfsData=data_rebalanced_dfs, var=customers, ngrp=..,nmap=..,nred=..)

    Improved data partitioning solutions could be sought for the cases where there are strong imbalances in the number of records per customer or if great variations are noticed between the reducer jobs completion times. This kind of optimization will be addressed in a later blog.

