Monitoring progress of embedded R functions

When you run R functions in the database, especially functions involving multiple R engines in parallel, you can monitor their progress using the Oracle R Enterprise datastore as a central location for progress notifications, intermediate status, or partial results. In the following example, based on ore.groupApply, we illustrate instrumenting a simple function that builds a linear model to predict flight arrival delay from a few other variables.

The function modelBuildWithStatus verifies that rows remain for building the model after eliminating incomplete cases from the argument dat. If the data is not empty, the function builds a model and reports "success"; otherwise, it reports "no_data." In practice, the user would likely use this model in some way or save it in a datastore for future use, but for this example, we simply build the model and discard it, validating that a model can be built on the data.


modelBuildWithStatus <-
  function(dat) {
    # Remove incomplete cases; lm() requires complete rows
    dat <- dat[complete.cases(dat),]
    if (nrow(dat) > 0L) {
      mod <- lm(ARRDELAY ~ DISTANCE + AIRTIME + DEPDELAY, dat)
      "success"
    } else
      "no_data"
  }

When we invoke this using ore.groupApply, the goal is to build one model per "unique carrier," or airline. With an ORE 1.4 feature, we specify the degree of parallelism via the parallel argument, setting it to 2.


res <- ore.groupApply(ONTIME_S[, c("UNIQUECARRIER","DISTANCE", "ARRDELAY", "DEPDELAY", "AIRTIME")],
        ONTIME_S$UNIQUECARRIER,
        modelBuildWithStatus,
        parallel=2L)

res.local<-ore.pull(res)
res.local[unlist(res.local)=="no_data"]

The result tells us the status of each execution. Below, we print the unique carriers that had no data.


R> res.local<-ore.pull(res)
R> res.local[unlist(res.local)=="no_data"]
$EA
[1] "no_data"

$`ML(1)`
[1] "no_data"

$`PA(1)`
[1] "no_data"

$PI
[1] "no_data"

$PS
[1] "no_data"

To monitor the progress of each execution, we identify the group of data being processed in each function invocation using the value of the UNIQUECARRIER column. Since ore.groupApply partitions the data by this column, all rows in dat share the same carrier value, so we need only the first one. For this particular data set, we append the first two characters of the carrier's symbol to "group." to form a unique object name for storing in the datastore identified by job.name. (We truncate to two characters because some carrier codes, such as ML(1), contain characters that are invalid in an R object name.)
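
For instance, here is a quick standalone illustration of the name construction (the carrier code "ML(1)" appears in the output above):


carrier <- "ML(1)"   # parentheses are invalid in an R object name
nm <- paste("group.", substr(carrier, 1, 2), sep="")
nm                   # [1] "group.ML" -- a valid object name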

The general idea for monitoring progress is to save an object in the datastore for each execution of the function on a group. We can then list the contents of the named datastore and compute a percentage complete, as discussed later in this post. For the "success" case, we assign the value "SUCCESS" to the variable named by the string in nm that we constructed above. Using ore.save, this uniquely named object is stored in the datastore under the name in job.name. We use the append=TRUE flag to indicate that the various function executions share the same named datastore.

If no data remains in dat after removing incomplete cases, we assign "NO DATA" to the variable named in nm and save that instead. Notice that in both cases we still return "success" or "no data", so these come back in the list returned by ore.groupApply. We could, of course, return other values instead, e.g., the model produced.


modelBuildWithMonitoring <-
  function(dat, job.name) {
    # Form a unique, valid object name from the first two characters of the carrier
    nm <- paste("group.", substr(as.character(dat$UNIQUECARRIER[1L]), 1, 2), sep="")
    dat <- dat[complete.cases(dat),]
    if (nrow(dat) > 0L) {
      mod <- lm(ARRDELAY ~ DISTANCE + AIRTIME + DEPDELAY, dat)
      assign(nm, "SUCCESS")
      # Record this group's status in the shared datastore
      ore.save(list=nm, name=job.name, append=TRUE)
      "success"
    } else {
      assign(nm, "NO DATA")
      ore.save(list=nm, name=job.name, append=TRUE)
      "no data"
    }
  }

When we use this function in ore.groupApply, we provide the job.name and ore.connect arguments as well. The control argument ore.connect must be set to TRUE to use the datastore, since datastore access requires a database connection. As ore.groupApply executes, the datastore named by job.name accumulates one object per carrier. First, delete the datastore named "job1", if it exists.


if ("job1" %in% ore.datastore()$datastore.name)   # delete only if it exists
  ore.delete(name="job1")

res <- ore.groupApply(ONTIME_S[, c("UNIQUECARRIER","DISTANCE", "ARRDELAY", "DEPDELAY", "AIRTIME")],
        ONTIME_S$UNIQUECARRIER,
        modelBuildWithMonitoring,
        job.name="job1", parallel=2L, ore.connect=TRUE)

To see the progress during execution, we can use the following function, which takes a job name and the total number of groups (the cardinality of the grouping, or INDEX, column) to determine the percent complete. This function is invoked in a separate R engine connected to the same schema. If the job name is found, we print the percent complete; otherwise, we stop with an error message.


check.progress <- function(job.name, total.groups) {
  if ( job.name %in% ore.datastore()$datastore.name )
    # One object is saved per completed group, so the object count
    # divided by the total number of groups gives percent complete
    print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
  else
    stop(paste("Job", job.name, "does not exist"))
}
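
Since check.progress runs in its own R session, that session must first connect to the same schema. A minimal sketch, with placeholder connection details:


library(ORE)
# Placeholder credentials -- substitute your own user, password, and database
ore.connect(user="rquser", password="rquser", sid="orcl",
            host="localhost", all=TRUE)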

To invoke this, compute the total number of groups and pass it, along with the job name, to check.progress.


total.groups <- length(unique(ONTIME_S$UNIQUECARRIER))
check.progress("job1", total.groups)

However, we really want a loop that reports progress automatically. One simple approach is to set up a while loop with a sleep delay, stopping when we reach 100%. To keep the example self-contained, we include a simplified version of the function above as a local function.


check.progress.loop <- function(job.name, total.groups, sleep.time=2) {
  # Simplified local variant: returns a message, rather than stopping,
  # when the datastore has not been created yet
  check.progress <- function(job.name, total.groups) {
    if ( job.name %in% ore.datastore()$datastore.name )
      print(sprintf("%.1f%%", nrow(ore.datastoreSummary(name=job.name))/total.groups*100L))
    else
      paste("Job", job.name, "does not exist")
  }
  x <- ""   # guards the test below if the first check fails
  while (TRUE) {
    try(x <- check.progress(job.name, total.groups))
    Sys.sleep(sleep.time)
    if (x == "100.0%") break   # print() returns the string it displays
  }
}

As before, this function is invoked in a separate R engine connected to the same schema.


check.progress.loop("job1",total.groups)

Looking at the results, we see progress reported at one-second intervals. Since the models build quickly, it doesn't take long to reach 100%. For functions that take longer to execute, or where there are more groups to process, you may choose a longer sleep time. Following this, we look at the datastore "job1" using ore.datastore and its contents using ore.datastoreSummary.


R> check.progress.loop("job1",total.groups,sleep.time=1)
[1] "6.9%"
[1] "96.6%"
[1] "100.0%"

R> ore.datastore(name="job1")
  datastore.name object.count size       creation.date description
1           job1           29 1073 2014-02-13 22:03:20
R> ore.datastoreSummary(name="job1")
   object.name     class size length row.count col.count
1     group.9E character   37      1        NA        NA
2     group.AA character   37      1        NA        NA
3     group.AQ character   37      1        NA        NA
4     group.AS character   37      1        NA        NA
5     group.B6 character   37      1        NA        NA
6     group.CO character   37      1        NA        NA
7     group.DH character   37      1        NA        NA
8     group.DL character   37      1        NA        NA
9     group.EA character   37      1        NA        NA
10    group.EV character   37      1        NA        NA
11    group.F9 character   37      1        NA        NA
12    group.FL character   37      1        NA        NA
13    group.HA character   37      1        NA        NA
14    group.HP character   37      1        NA        NA
15    group.ML character   37      1        NA        NA
16    group.MQ character   37      1        NA        NA
17    group.NW character   37      1        NA        NA
18    group.OH character   37      1        NA        NA
19    group.OO character   37      1        NA        NA
20    group.PA character   37      1        NA        NA
21    group.PI character   37      1        NA        NA
22    group.PS character   37      1        NA        NA
23    group.TW character   37      1        NA        NA
24    group.TZ character   37      1        NA        NA
25    group.UA character   37      1        NA        NA
26    group.US character   37      1        NA        NA
27    group.WN character   37      1        NA        NA
28    group.XE character   37      1        NA        NA
29    group.YV character   37      1        NA        NA

The same basic technique can be used to note progress in any long-running or complex embedded R function, e.g., in ore.tableApply or ore.doEval. At various points in the function, sequentially named objects can be added to a datastore. Moreover, the contents of those objects can contain incremental or partial results, or even debug output.
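
As a minimal sketch of that idea (the function body and the datastore name "job2" are illustrative, not from the examples above), a function run through ore.doEval might checkpoint each step like this:


res <- ore.doEval(function(job.name) {
    for (i in 1:3) {
      # The object's contents can carry partial results or debug output
      partial <- sprintf("step %d finished at %s", i, Sys.time())
      nm <- paste("checkpoint.", i, sep="")   # checkpoint.1, checkpoint.2, ...
      assign(nm, partial)
      ore.save(list=nm, name=job.name, append=TRUE)
    }
    "done"
  },
  job.name="job2", ore.connect=TRUE)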

While we’ve focused on the R API for embedded R execution, the same functions could be invoked using the SQL API. However, monitoring would still be done from an interactive R engine.
