
Best practices, news, tips and tricks - learn about Oracle's R Technologies for Oracle Database and Big Data

  • February 26, 2018

Scalable scoring with multiple models using Oracle R Enterprise Embedded R Execution

Mark Hornick
Director, Advanced Analytics and Machine Learning

At first glance, scoring data in batch with a machine learning model appears to be a straightforward endeavor: build the model, load the data, score using the model, do something with the results. This “something” can include writing the scores to a table, computing model evaluation/quality metrics, directly feeding a dashboard, etc. However, the task becomes a little more challenging when some of the details are filled in and hardware and software realities come into play. What are some of these details?

  • How much data needs to be scored?
  • Where are the data stored?
  • How many individual models are involved?
  • How large are the models?
  • Does the model scoring software scale as data volumes grow?
  • Does model scoring take advantage of parallelism?
  • How quickly are the results required?
  • Do the results need to be persisted?
  • What are our hardware limitations: RAM, CPUs, diskspace?

Note the distinction between parallelism and scalability. One does not necessarily imply the other. While Oracle R Enterprise and Oracle R Advanced Analytics for Hadoop provide parallel and scalable algorithms for both model building and scoring, we are focusing here on how third-party packages available from sites like CRAN can be used in a scalable and performant manner by leveraging the infrastructure provided in Oracle R Enterprise.

At a high level, the problem is to score a large volume of data with many models within the limits of available memory and compute resources.

Let’s select a few parameters to make the scenario specific. Suppose we have 100 models, each 500 MB in size, and a 20 million row data set with 50 columns to be scored. Often, memory limitations are our first concern, especially if using software not specifically designed to be scalable. Based on this, we’re looking at the following potential memory requirements:
  • Models: 100 models * 500 MB = 50 GB
  • Scoring data: 20 M rows * 50 columns * 16 bytes = 16 GB
  • Scores: 100 models * 20 M rows = 2 B scores (at 16 bytes) + 20 M IDs = 32.2 GB

If we take a brute force approach, we load everything into memory and execute serially. Just for the raw data noted above, this requires ~100 GB of RAM, plus enough additional RAM to perform the scoring itself - and don't forget the OS, database, and other software that may be running.
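As a quick sanity check, here is that arithmetic expressed in R. The 16 bytes per value and ~10 bytes per ID are rough assumptions for illustration, not measured figures:

# Back-of-the-envelope memory estimate mirroring the figures above
models_gb <- 100 * 500e6 / 1e9                    # 100 models * 500 MB = 50 GB
data_gb   <- 20e6 * 50 * 16 / 1e9                 # 20 M rows * 50 cols * 16 bytes = 16 GB
scores_gb <- (100 * 20e6 * 16 + 20e6 * 10) / 1e9  # 2 B scores + 20 M IDs ~= 32.2 GB
total_gb  <- models_gb + data_gb + scores_gb      # roughly 100 GB before any overhead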

While there are various approaches to this problem, in this blog we highlight using Oracle R Enterprise Embedded R Execution to achieve several goals:

  • scale to as much data as needed
  • limit memory requirements of the models themselves
  • create the final table in Oracle Database, thereby avoiding persisting intermediate results

But first, let's explore a few options. The first is the brute force approach characterized above:

Load all the data into memory
Load all models into memory
For each model i {
  Score data with model i
  Place scores in single in-memory table for all models
}
Write in-memory score table

We could choose to parallelize this approach on each model, but if we do so maximally (with 100 concurrently executing processes), each parallel engine needs roughly ~18 GB (500 MB model + 16 GB data + 640 MB for the scores with key) - about 1.8 TB in total - plus additional RAM for the processes supporting the scoring in parallel. So while we can speed up the elapsed execution time, it comes at a major cost in memory, which you may not have. (Of course, we could limit the number of concurrent engines to reduce this significantly, but this indicates the worst case scenario.)

What options might we consider to get this job done?

In the example above, the models take the majority of the space (unless you consider replicating the data in each concurrent engine). Perhaps we load only one model at a time, score with it, and then discard it before moving to the next model. Additionally, the scores themselves consume a lot of space, so let's purge those as well after writing them to a table.

Load all the data into memory
For each model i {
  Load model i
  Score data with model i
  Purge model
  Write scores with ID key to a table
  Purge scores
}
Join all score tables based on ID

This requires 16 GB for the data, 500 MB for the model, and 640 MB for the scores and keys. At ~18 GB, we're doing better than the ~100 GB of the non-parallel case and the ~1.8 TB of the worst-case parallel scenario, but this may still not scale, especially as the data grow. The problem with this approach is that it still requires all data to be loaded into memory at once, which by definition does not scale.

A variant of this approach is to process the data in smaller chunks, iterating serially over those chunks. Since we can choose the chunk size (in terms of number of rows), we could require as little as ~2 GB of RAM (processing a handful of rows at a time, which would be inefficient). If we read half the data at a time, this could be reduced to ~10 GB. Of course, with this approach, unless the script appends to an existing table, we will have multiple tables to union for each model - one per chunk. The union, however, is not necessarily a costly operation, but it can result in duplicating the data to materialize the result.

Until done {
  Load n rows of data into memory
  For each model i {
    Load model i
    Score data with model i
    Write scores with ID key to a table
  }
  Union all tables for model i (unneeded if data appended to single table)
}
Join all score tables based on ID (avoidable if scores updated in single table with ID key)
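Outside the database, a plain R sketch of this chunked approach might look like the following. Here read_chunk(), write_scores(), and model_files are hypothetical placeholders for your own I/O and saved models; the Embedded R Execution approach shown later removes the need to write this plumbing yourself:

# Hypothetical sketch of chunked, serial scoring in plain R.
# read_chunk() and write_scores() are placeholders for your own I/O;
# model_files is a character vector of paths to saved model .rds files.
chunk_size <- 100000
offset <- 0
repeat {
  dat <- read_chunk(offset, chunk_size)        # next chunk of rows, with an ID column
  if (nrow(dat) == 0) break
  for (m in model_files) {
    mod <- readRDS(m)                          # load one model at a time
    scores <- data.frame(ID = dat$ID, model = m, value = predict(mod, dat))
    write_scores(scores)                       # append to a scores table
    rm(mod, scores); gc()                      # release memory before the next model
  }
  offset <- offset + chunk_size
}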

A parallel variant of this performs the scoring of each chunk in parallel. We’ll need to decide on a degree of parallelism, which is based on the machine specs (RAM and CPUs) and expected resource availability.
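One simple way to choose that degree of parallelism is to bound it by both the available cores and the per-engine memory footprint estimated earlier. A sketch, where the available RAM and per-engine footprint are assumptions you would replace with your own figures:

# Rough degree-of-parallelism estimate: limited by cores and by how many
# per-engine footprints (model + data chunk + scores) fit in the RAM to spare.
library(parallel)
available_ram_gb <- 64               # assumption: RAM you are willing to dedicate
per_engine_gb    <- 0.5 + 2 + 0.1    # model + data chunk + scores, rough estimate
dop <- min(detectCores(), floor(available_ram_gb / per_engine_gb))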

So let’s see how this could be done using Oracle R Enterprise Embedded R Execution with a simple example you can easily reproduce on your own system.

To illustrate, we'll start with a simple linear model using R's lm function. However, you can substitute your favorite algorithm available from CRAN or elsewhere. Using the Longley data set, we predict the number of people employed, which results in a numeric vector.

> # build test model
> mod <- lm(Employed ~ ., data = longley)
> pred <- predict(mod, longley)
> head(pred)
    1947     1948     1949     1950     1951     1952 
60.05566 61.21601 60.12471 61.59711 62.91129 63.88831 
To simulate multiple models, we'll create three datastore entries labeled test_1, test_2, and test_3, each containing the same lm model created above. A datastore allows one or more R objects to be stored in the database by name.
> ore.save(mod, name="test_1")
> ore.save(mod, name="test_2")
> ore.save(mod, name="test_3")
> ore.datastore(pattern="test")   # list the contents
  datastore.name object.count  size       creation.date description
1         test_1            1  7346 2018-02-21 18:15:05        
2         test_2            1  7346 2018-02-21 18:15:05        
3         test_3            1  7346 2018-02-21 18:15:06  

> ds_models <- c("test_1", "test_2", "test_3")
Next, we'll define our function f, which takes a chunk of rows in argument dat, along with the set of datastore names corresponding to our models used for scoring. Our objective is to score with each model sequentially on the given chunk of data (while ultimately processing the chunks in parallel), and place the results from each model in a list. We free the model when finished with it and garbage collect the environment just for good measure. To complete the function, the prediction list is converted to a data.frame with an ID added from dat.
> f <- function (dat, ds_models) {
+     pred <- list()
+     for(m in ds_models) {
+         ore.load(m)
+         pred[[m]] <- predict(mod, dat)
+         rm(mod)
+         gc()
+     }
+     res <- as.data.frame(pred)
+     res$ID <- dat$ID
+     res
+ }
Let's test this function, but first we'll add an ID column based on the row names, as the function f expects.
> # provide ID column in data set
> longley2 <- data.frame(ID=rownames(longley), longley)
> head(longley2,3)
    ID GNP.deflator     GNP Unemployed Armed.Forces Population Year
1 1947         83.0 234.289      235.6        159.0    107.608 1947
2 1948         88.5 259.426      232.5        145.6    108.632 1948
3 1949         88.2 258.054      368.2        161.6    109.773 1949
  Employed
1   60.323
2   61.122
3   60.171
Now we're ready to test the function locally - a best practice before moving to Embedded R Execution. We see that the output is a data.frame with the prediction for each model in a column, followed by the ID. This facilitates comparison of scores from the multiple models. Note that we could store the scores in long format using a 3 column table (model_name, ID, value), but this would require roughly 3 times as much space.
> # test function locally
> dat <- head(longley2)
> testRes <- f(dat, ds_models)
> class(testRes)
[1] "data.frame"
> testRes
    test_1   test_2   test_3   ID
1 60.05566 60.05566 60.05566 1947
2 61.21601 61.21601 61.21601 1948
3 60.12471 60.12471 60.12471 1949
4 61.59711 61.59711 61.59711 1950
5 62.91129 62.91129 62.91129 1951
6 63.88831 63.88831 63.88831 1952
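As an aside, the long-format alternative mentioned above could be produced from a wide result like testRes using base R alone - a minimal sketch:

# Reshape the wide scores (one column per model) into long format
# (model_name, ID, value); this roughly triples the number of stored values.
long <- do.call(rbind, lapply(c("test_1", "test_2", "test_3"), function(m)
  data.frame(model_name = m, ID = testRes$ID, value = testRes[[m]])))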
We now create an ore.frame, LONGLEY, from our data.frame for use with ore.rowApply. The function ore.push suffices for a demo example. We then invoke ore.rowApply with the ore.frame and the function f. Further, we specify that the data should be processed in chunks of 4 rows, computed using 2 parallel R engines. We also pass the argument ds_models and set ore.connect=TRUE so that each R engine automatically establishes a connection to the database (this is needed to access the datastore). This invocation produces a list of data.frame objects, where each list element is produced by one execution of the R function. Since we have 16 rows, there will be 4 executions, run 2 at a time in parallel.
> LONGLEY <- ore.push(longley2)
> 
> # return list of individual execution results
> ore.rowApply(LONGLEY, f, rows=4, parallel=2, 
+              ds_models = ds_models, ore.connect=TRUE)
$`1`
    test_1   test_2   test_3   ID
1 62.91129 62.91129 62.91129 1951
2 63.88831 63.88831 63.88831 1952
3 65.15305 65.15305 65.15305 1953
4 63.77418 63.77418 63.77418 1954

$`2`
    test_1   test_2   test_3   ID
1 60.05566 60.05566 60.05566 1947
2 61.21601 61.21601 61.21601 1948
3 60.12471 60.12471 60.12471 1949
4 61.59711 61.59711 61.59711 1950

$`3`
    test_1   test_2   test_3   ID
1 68.81055 68.81055 68.81055 1959
2 69.64967 69.64967 69.64967 1960
3 68.98907 68.98907 68.98907 1961
4 70.75776 70.75776 70.75776 1962

$`4`
    test_1   test_2   test_3   ID
1 66.00470 66.00470 66.00470 1955
2 67.40161 67.40161 67.40161 1956
3 68.18627 68.18627 68.18627 1957
4 66.55206 66.55206 66.55206 1958

Although we have the scores, they're not in the form we need. We want a single table in the database that contains all these rows. We can do that with a minor addition to the ore.rowApply invocation. We specify the argument FUN.VALUE, providing a row from our previous run as an example, although we could explicitly construct it using the data.frame function. This FUN.VALUE informs Oracle Database what the new table should look like (column names and data types).
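For example, an equivalent FUN.VALUE could be built explicitly as a zero-row data.frame whose column names and types match what f returns - a minimal sketch, here assuming a character ID:

# One way to construct FUN.VALUE explicitly instead of reusing testRes[1,]
fun_value <- data.frame(test_1 = numeric(0), test_2 = numeric(0),
                        test_3 = numeric(0), ID = character(0),
                        stringsAsFactors = FALSE)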

Next we store the ore.frame result in variable res, and materialize a database table LONGLEY_SCORES. Note that when using FUN.VALUE, the ore.rowApply invocation constructs only the ore.frame specification. Not until the result is accessed, e.g., when ore.create is invoked in this case, does Oracle Database spawn the R engines and execute the function. Each time the ore.frame in res is accessed, the result will be recomputed. So if a result is to be used multiple times, it is best to materialize it as a database table.

> # return ore.frame with all individual results
> res <- ore.rowApply(LONGLEY, f, FUN.VALUE = testRes[1,], 
+                     rows=4, parallel=2, ds_models = ds_models, ore.connect=TRUE)
> ore.drop("LONGLEY_SCORES")
> ore.create(res, table="LONGLEY_SCORES") 
> 
> dim(LONGLEY_SCORES)
[1] 16  4
> head(LONGLEY_SCORES)
    test_1   test_2   test_3   ID
1 60.05566 60.05566 60.05566 1947
2 61.21601 61.21601 61.21601 1948
3 60.12471 60.12471 60.12471 1949
4 61.59711 61.59711 61.59711 1950
5 62.91129 62.91129 62.91129 1951
6 63.88831 63.88831 63.88831 1952
> 

In this blog post, we've discussed some issues that can arise when trying to perform scalable scoring with multiple models using Oracle R Enterprise Embedded R Execution. In particular, this situation arises when leveraging open source package algorithms, such as those from CRAN, that are not designed with parallelism or scalability in mind. The example above provides a template you can adapt and experiment with to meet your own project requirements.

When possible, leveraging algorithms designed for parallelism and scalability, such as those found in Oracle R Enterprise and Oracle R Advanced Analytics for Hadoop, can offer superior performance and scalability.

The next step for many enterprises is putting this solution into production, perhaps scheduled for regular execution. This can be accomplished using Oracle Database's DBMS_SCHEDULER package and the Oracle R Enterprise SQL interface to Embedded R Execution.
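For instance, the scoring function can be registered in the database as a named R script from the R client, which the SQL interface (and hence a scheduled job) can then invoke by name. A sketch, where "ScoreWithModels" is an arbitrary name chosen for illustration:

# Register the scoring function as a named R script in the database so it
# can be invoked by name from the SQL interface to Embedded R Execution.
try(ore.scriptDrop("ScoreWithModels"), silent = TRUE)  # drop any prior definition
ore.scriptCreate("ScoreWithModels", f)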
