Monitoring progress of embedded R functions

By: Mark Hornick | Director, Advanced Analytics and Machine Learning

When you run R functions in Oracle Database, especially functions involving multiple R engines in parallel, you can monitor their progress using the Oracle R Enterprise datastore as a central location for progress notifications, or any intermediate status or results. In the following example, based on ore.groupApply, we illustrate instrumenting a simple function that builds a linear model to predict flight arrival delay based on a few other variables.

The function modelBuildWithStatus first eliminates incomplete cases from the data supplied in argument dat, then verifies that rows remain for building the model. If rows remain, the function builds a model and reports “success”; otherwise, it reports “no data.” In practice, the user would likely use this model in some way or save it in a datastore for future use, but for this example we just build the model and discard it, validating that a model can be built on the data.

For this example, we use flights data from the nycflights13 package. We can easily load the flights data.frame into Oracle Database as a table using ore.create. This creates a proxy object in R of class ore.frame that can be used for in-database computations.


# ore.connect (...)
ore.create(flights, table="FLIGHTS")

modelBuildWithStatus <-
  function(dat) {
    # keep only rows with no missing values
    dat <- dat[complete.cases(dat),]
    if (nrow(dat)>0L) {
      mod <- lm(arr_delay ~ distance + air_time + dep_delay, dat)
      "success"
    } else
      "no data"
  }

When we invoke this using ore.groupApply, the goal is to build one model per “unique carrier” or airline. Using the parallel argument, we specify the degree of parallelism and set it to 2.

res <- ore.groupApply(FLIGHTS[, c("carrier","distance", "arr_delay", "dep_delay", "air_time")],
                      FLIGHTS$carrier, modelBuildWithStatus, parallel=2L)

The result tells us the status of each execution. Once the result list has been pulled to the client as res.local (for example, with ore.pull), we can print it or select the carriers that had no data.

> res.local
[1] "success"
[1] "success"
[1] "success"
[1] "success"
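
The per-group status pattern can also be tried locally, without a database connection. The following is a minimal sketch, not the embedded execution itself: it uses base R's split and lapply as a stand-in for ore.groupApply, and the data frame flights.demo, the carrier code "ZZ", and the ".demo" names are hypothetical values made up for illustration.

```r
# Synthetic stand-in for the flights data (hypothetical values, three carriers).
# Carrier "ZZ" has no complete rows because arr_delay is always missing.
flights.demo <- data.frame(
  carrier   = rep(c("AA", "UA", "ZZ"), each = 5),
  distance  = c(500, 750, 1000, 1250, 1500, 400, 650, 900, 1150, 1400,
                300, 600, 900, 1200, 1500),
  air_time  = c(80, 112, 141, 178, 205, 66, 98, 127, 160, 190,
                50, 91, 130, 170, 210),
  dep_delay = c(5, -2, 10, 0, 3, 12, -4, 7, 1, 9, 2, 4, 6, 8, 10),
  arr_delay = c(7, -5, 12, 4, 1, 15, -1, 5, 6, 8, NA, NA, NA, NA, NA),
  stringsAsFactors = FALSE
)

# Same logic as modelBuildWithStatus: build a model if any complete rows remain.
modelBuildWithStatus.demo <- function(dat) {
  dat <- dat[complete.cases(dat), ]
  if (nrow(dat) > 0L) {
    mod <- lm(arr_delay ~ distance + air_time + dep_delay, dat)
    "success"
  } else {
    "no data"
  }
}

# Local analogue of grouping by carrier: one function call per group.
res.demo <- lapply(split(flights.demo, flights.demo$carrier),
                   modelBuildWithStatus.demo)

# Select the names of the groups that reported "no data".
no.data <- names(res.demo)[unlist(res.demo) == "no data"]
print(no.data)
```

With this synthetic data, only the hypothetical carrier "ZZ" is reported as having no data.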

To monitor the progress of each execution, we can identify the group of data being processed in each function invocation using the value from the carrier column. For this particular data set, we use the first two characters of the carrier’s symbol appended to “group.” to form a unique object name for storing in the datastore identified by job.name. (If we don’t do this, the value will form an invalid object name.) Note that since the carrier column contains uniform data, we need only the first value.

The general idea for monitoring progress is to save an object in the datastore named for each execution of the function on a group. We can then list the contents of the named datastore and compute a percentage complete, which is discussed later in this post. For the “success” case, we assign the value “SUCCESS” to the variable named by the string in nm that we created earlier. Using ore.save, this uniquely named object is stored in the datastore with the name in job.name. We use the append=TRUE flag to indicate that the various function executions will be sharing the same named datastore.
If there is no data left in dat, we assign “NO DATA” to the variable named in nm and save that. Notice in both cases, we’re still returning “success” or “no data” so these come back in the list returned by ore.groupApply. However, we can return other values instead, e.g., the model produced.

modelBuildWithMonitoring <-
  function(dat, job.name) {
    # object name for this group, e.g., "group.AA"
    nm <- paste("group.", substr(as.character(dat$carrier[1L]),1,2), sep="")
    dat <- dat[complete.cases(dat),]
    if (nrow(dat)>0L) {
      mod <- lm(arr_delay ~ distance + air_time + dep_delay, dat)
      assign(nm, "SUCCESS")
      ore.save(list=nm, name=job.name, append=TRUE)
      "success"
    } else {
      assign(nm, "NO DATA")
      ore.save(list=nm, name=job.name, append=TRUE)
      "no data"
    }
  }

When we use this function in ore.groupApply, we provide the job.name and ore.connect arguments as well. The ore.connect argument must be set to TRUE in order to use the datastore. As ore.groupApply executes, objects named for each carrier are progressively added to the datastore named by job.name. First, delete the datastore named “job1”, if it exists.

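if ("job1" %in% ore.datastore()$datastore.name) ore.delete(name="job1")
res <- ore.groupApply(FLIGHTS[, c("carrier","distance", "arr_delay", "dep_delay", "air_time")],
                      FLIGHTS$carrier, modelBuildWithMonitoring,
                      job.name="job1", parallel=2L, ore.connect=TRUE)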
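
The object-naming step inside modelBuildWithMonitoring can be tried in isolation with base R. This is a small sketch using a made-up carrier vector. It shows that a code such as "9E" is not a syntactic R name on its own, that prefixing it with "group." yields one, and that assign creates a variable with that dynamically built name.

```r
# A group's carrier column holds one repeated value, so the first element suffices.
carrier <- c("9E", "9E", "9E")   # illustrative values

# Build the object name exactly as in the function above.
nm <- paste("group.", substr(as.character(carrier[1L]), 1, 2), sep = "")
print(nm)

# make.names() changes names that are not syntactically valid and leaves valid ones alone.
print(make.names("9E"))   # altered, because a name cannot start with a digit
print(make.names(nm))     # unchanged

# assign() creates a variable whose name is the string held in nm.
assign(nm, "SUCCESS")
print(get(nm))
```

The value stored under the name can be any R object, so the same step works for status strings, partial results, or a fitted model.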

To see the progress during execution, we can use the following function, which takes a job name and the cardinality of the INDEX column to determine the percent complete. This function is invoked in a separate R engine connected to the same schema. If the job name is found, we print the percent complete, otherwise stop with an error message.

check.progress <- function(job.name, total.groups) {
  if ( job.name %in% ore.datastore()$datastore.name )
    print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
  else
    stop(paste("Job", job.name, "does not exist"))
}

To invoke this, compute the total number of groups and provide this and the job name to the function check.progress.

total.groups <- length(unique(FLIGHTS$carrier))
check.progress("job1", total.groups)

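
The percent-complete arithmetic is simply the number of objects saved so far divided by the number of groups. Here is a minimal local sketch of that calculation, in which a character vector of object names stands in for the rows returned by ore.datastoreSummary. The function name percent.complete and the sample names are illustrative only.

```r
# Format completed / total as a percentage with one decimal place.
percent.complete <- function(done.names, total.groups) {
  sprintf("%.1f%%", length(done.names) / total.groups * 100)
}

# Four of sixteen groups have reported so far.
done <- c("group.9E", "group.AA", "group.AS", "group.B6")
p.partial <- percent.complete(done, 16)
print(p.partial)   # "25.0%"

# Nothing has been saved yet.
p.none <- percent.complete(character(0), 16)
print(p.none)      # "0.0%"
```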

However, we really want a loop to report on the progress automatically. One simple approach is to set up a while loop with a sleep delay and stop when we reach 100%. To be self-contained, we include a simplification of the function above as a local function.

check.progress.loop <- function(job.name, total.groups, sleep.time=2) {
  check.progress <- function(job.name, total.groups) {
    if ( job.name %in% ore.datastore()$datastore.name )
      print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
    else
      paste("Job", job.name, "does not exist")
  }
  x <- ""  # defined up front in case the first check fails inside try()
  while(TRUE) {
    try(x <- check.progress(job.name,total.groups))
    if(x=="100.0%") break
    Sys.sleep(sleep.time)  # wait before polling again
  }
}

As before, this function is invoked in a separate R engine connected to the same schema.
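
A polling loop like this runs indefinitely if the job never reaches 100%, for example when a group fails before saving its object. One possible variation, sketched here in base R under stated assumptions, caps the number of polls. The names poll.until.done, max.polls, and make.fake.progress are hypothetical, and the fake progress function merely simulates a job so the sketch runs without a database.

```r
# Poll progress.fun() until it returns "100.0%" or max.polls is exhausted.
# Returns TRUE (invisibly) if the job completed, FALSE otherwise.
poll.until.done <- function(progress.fun, sleep.time = 0.1, max.polls = 50L) {
  for (i in seq_len(max.polls)) {
    # Treat an error from the progress check as "no reading this time".
    x <- tryCatch(progress.fun(), error = function(e) NA_character_)
    if (!is.na(x)) print(x)
    if (identical(x, "100.0%")) return(invisible(TRUE))
    Sys.sleep(sleep.time)
  }
  invisible(FALSE)
}

# Simulated job: each call reports one more completed group out of 'total'.
make.fake.progress <- function(total) {
  done <- 0L
  function() {
    done <<- min(done + 1L, total)
    sprintf("%.1f%%", done / total * 100)
  }
}

finished <- poll.until.done(make.fake.progress(4L), sleep.time = 0.01)
print(finished)
```

In a real session, progress.fun would wrap a datastore check such as the check.progress function above.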

Looking at the results, we can see the progress reported at one-second intervals. Since the models build quickly, it doesn’t take long to reach 100%. For functions that take longer to execute or where there are more groups to process, you may choose a longer sleep time. Following this, we look at the datastore “job1” using ore.datastore and its contents using ore.datastoreSummary.

> check.progress.loop("job1",total.groups,sleep.time=1)
[1] "6.9%"
[1] "96.6%"
[1] "100.0%"
> ore.datastore("job1")
  datastore.name object.count size       creation.date description
1           job1           16  592 2017-09-20 12:38:26        

> ore.datastoreSummary("job1")
   object.name     class size length row.count col.count
1     group.9E character   37      1        NA        NA
2     group.AA character   37      1        NA        NA
3     group.AS character   37      1        NA        NA
4     group.B6 character   37      1        NA        NA
5     group.DL character   37      1        NA        NA
6     group.EV character   37      1        NA        NA
7     group.F9 character   37      1        NA        NA
8     group.FL character   37      1        NA        NA
9     group.HA character   37      1        NA        NA
10    group.MQ character   37      1        NA        NA
11    group.OO character   37      1        NA        NA
12    group.UA character   37      1        NA        NA
13    group.US character   37      1        NA        NA
14    group.VX character   37      1        NA        NA
15    group.WN character   37      1        NA        NA
16    group.YV character   37      1        NA        NA

The same basic technique can be used to note progress in any long-running or complex embedded R function, e.g., in ore.tableApply or ore.doEval. At various points in the function, sequence-named objects can be added to a datastore. Moreover, those objects can contain incremental or partial results, or even debug output.
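
As a rough illustration of sequence-named checkpoints, the sketch below uses a plain R environment as a local stand-in for a datastore, so it runs without a database. The helper checkpoint, the "step.NNN" naming scheme, and long.task are assumptions made for this example. In an embedded function, the assign and ore.save(list=..., name=..., append=TRUE) pattern shown earlier would take the place of the environment.

```r
# Local stand-in for a named datastore (hypothetical; not an ORE object).
store <- new.env()

# Save 'value' under a zero-padded, sequence-based name such as "step.001".
checkpoint <- function(step, value, env = store) {
  assign(sprintf("step.%03d", step), value, envir = env)
}

# A multi-stage task that records a status message or partial result per stage.
long.task <- function() {
  x <- 1:10
  checkpoint(1L, "data prepared")
  s <- sum(x)
  checkpoint(2L, list(partial.sum = s))
  m <- s / length(x)
  checkpoint(3L, sprintf("mean computed: %.1f", m))
  m
}

result <- long.task()
print(sort(ls(store)))                   # names of the checkpoints recorded so far
print(get("step.003", envir = store))    # contents of the latest checkpoint
```

Zero-padding the step number keeps the object names in execution order when they are listed alphabetically.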

While we’ve focused on the R API for embedded R execution, the same functions could be invoked using the SQL API. However, monitoring would still be performed from an interactive R engine.
