Thursday Feb 07, 2013

New Series of Video Workshops

Contrary to my habit of posting How-To articles, today I'm not going to tell you how to do anything.  Instead, I'm going to ask you to go do something.  What I want you to do is check out the series of video tutorials I'm doing around pieces of the Hadoop ecosystem and Big Data analytics.  

The first entry is on YouTube and in the Big Data Playlist on Oracle Media Network.  It gives a brief overview of setting up Flume flows into HDFS.  Of course, if you're looking for a more detailed How-To, just check the archives -- I covered it a while back.

Some of the topics I'll cover in upcoming months include:

  • Hive User Defined Functions
  • Market Basket Analysis with Hive, R and Data Miner 
  • Customer Segmentation and Clustering in Hadoop
  • Classification and Targeting using Random Forests


Wednesday Jan 30, 2013

Parallel R: Quick Ways to Model More

Introduction

I am less and less often mistaken for a pirate when I mention the R language.  While I miss the excuse to wear an eyepatch, I'm glad more people are beginning to explore a statistical language I've been touting for years.  When it comes to plotting or running complex statistics in a single line of code, R is a great tool to have.  That said, there are plenty of pitfalls for the casual or new user: syntax, learning to write vectorized code, or even just knowing which "apply" function you really should choose.
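
As a tiny illustration of that last point (my example, not part of the workflow below), here are three ways to compute column means; the vectorized colMeans is both the shortest and usually the fastest:

> x <- matrix(rnorm(1e6), ncol=10)
means.loop  <- sapply(seq_len(ncol(x)), function(j) mean(x[, j]))  # apply-family, one column at a time
means.apply <- apply(x, 2, mean)                                   # generic apply over the column margin
means.fast  <- colMeans(x)                                         # vectorized, and the idiomatic choice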

  I want to explore a slightly less-often considered aspect of R development: parallelism.  Out of the box, R can seem very limited to someone used to working on compute clusters or even a multicore server.  However, there are a few tricks we can leverage to get the most out of R on everything from a personal workstation to a Hadoop cluster.

 R is Single-Threaded

The R interpreter is -- and likely always will be -- single-threaded.  This means loading data frames happens in a single thread.  So does building your linear model, or generating that pretty surface plot.  Even on my laptop, that leaves a lot of cores sitting idle while I model.  No matter how much my web browser might covet those cycles, I'd like to use them for work.

Rather than a complex multithreaded re-implementation, the R interpreter offers a number of ways to let users selectively apply parallelism.  Some of these approaches leverage MPI libraries and mirror that message-passing style.  Others allow a more implicit parallelism via "foreach" or "apply" constructs.  We'll focus on a pair of strategies using the parallelism that's been included in R since version 2.14: the parallel library.
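
As a quick taste of the "apply" flavor (a toy example of my own, not part of the workflow below), parallel's mclapply mirrors lapply but forks worker processes:

> library(parallel)
mclapply(1:8, function(x) x^2, mc.cores=2)  # squares computed by two forked workers; mc.cores > 1 requires a Unix-alike OS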

 Setting The Stage for Parallel Execution

We're going to need to load a few libraries into our R session before we can execute anything outside of a single thread.  We'll use doParallel and foreach because they let us focus on what to parallelize rather than how to coordinate our workers.

> data(iris)
library(parallel)
library(iterators)
library(doParallel)
library(foreach)

Knowing that calculations in R will be single-threaded, we want to use the parallel package to operate on logical subsets of the data simultaneously.  For example, I loaded a set of data about irises (R's built-in iris dataset) which contains a number of different species.  One way I might want to parallelize is to fit the same model to each species simultaneously.  For that, I'm going to have to split the data by species:

> species.split <- split(iris, iris$Species)

 This gives us a list we can iterate over -- or parallelize.  From here on out, it's simply a question of deciding what resources we want to leverage: local CPUs or remote hosts.
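
If you want to convince yourself the split worked, a quick check does the trick:

> names(species.split)        # one list element per species
sapply(species.split, nrow)   # row count in each subset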

FORKs and SOCKs

We're going to use the makeCluster function to bind together a set of computational resources.  But first we need to decide: do we want to use only local CPUs, or is it necessary to open up socket connections to other machines to distribute our workload?  In the former case we'll use makeCluster to create what's called a FORK cluster (in that it uses UNIX's fork call to create slaves).  In the latter, we'll create a SOCK cluster by opening up sockets to a list of remote hosts and starting slave processes on them.

Here's a FORK cluster which uses all my cores:

> cl <- makeCluster(detectCores(), type = "FORK")  # the default is a socket-based cluster, so ask for FORK explicitly
registerDoParallel(cl)

And here's a SOCK cluster across three nodes (password-less SSH to each host is required):

> hostlist <- c("10.0.0.1", "10.0.0.2", "10.0.0.3")
cl <- makeCluster(hostlist)
registerDoParallel(cl)

In each case, I call registerDoParallel to bind this cluster to the %dopar% operator.  This is the operator which will let us easily iterate in parallel.

Running in Parallel

Once we've got something to iterate over and a cluster with which to do it, modeling in parallel becomes straightforward.  Suppose I want to fit a model of sepal length as a linear combination of petal characteristics.  In that case, the code is simply:

> species.models <- foreach(i=species.split) %dopar% {
m <- lm(Sepal.Length ~ Petal.Width * Petal.Length, data=i)
return(m)
}
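
Since species.models is just a list of lm objects, inspecting the fits afterwards is ordinary R; for example, pulling the R-squared for each species:

> sapply(species.models, function(m) summary(m)$r.squared)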

But I'm not just restricted to fitting linear models on my little cluster.  I can run k-means clustering for several different k simultaneously using basically the same block:

> species.clusters <- foreach(i=2:5) %dopar% {
km <- kmeans(iris[, 1:4], i)  # use only the numeric columns; kmeans can't handle the Species factor
return(km)
}

When I'm done with my block, I can just call stopCluster(cl) to ensure my processes terminate and I'm not hogging resources.
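
In the prompt style of the snippets above, that cleanup is simply (registerDoSEQ is an optional extra of mine that points %dopar% back at a sequential backend):

> stopCluster(cl)
registerDoSEQ()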

Using Hadoop

Finally, there will be situations in which I need to deploy in parallel against much larger datasets -- specifically, datasets stored in HDFS.  Both Hive and Pig will let me run an R script as part of a streaming process.  In Hive, the TRANSFORM operator will send data to an R script.  In Pig, you can use the STREAM operator to send a whole bag to an R script.  However, you can't stream from within Pig's FOREACH blocks, so I occasionally use a UDF which invokes R scripts for me.

Regardless of the method you choose to send HDFS data to an R process, it's important to make sure your R script can consume data streaming from standard input.  I find the most expedient way of doing this is via the file function.  A typical script might start:

#! /usr/bin/env Rscript
#Connection to STDIN for reading a data frame
con <- file(description="stdin")
my.data.frame <- read.table(con, header=FALSE, sep=",")
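
The other half of that contract -- sketched here under the assumption that downstream Hive or Pig expects delimited text back, with my.results standing in for whatever data frame your script produces -- is writing results to standard output:

# emit results as comma-delimited rows on STDOUT for the streaming framework to collect
write.table(my.results, file=stdout(), sep=",", row.names=FALSE, col.names=FALSE, quote=FALSE)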

Summary

We've covered several ways to push R beyond the bounds of its single-threaded core.  There are forking and socket mechanisms for spreading our work around, not to mention tricks for leveraging the power of Hadoop Streaming.  In each case, however, one thing stands out: we must be smart as modelers and understand what can and should be done in parallel.


Friday Oct 05, 2012

Building Simple Workflows in Oozie

Introduction

More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation, match-merge operations, and a host of data munging tasks are usually needed before we can extract insights from our Big Data sources. Few people find data munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate the process. We want to codify our work into repeatable units and create workflows which we can leverage over and over again without having to write new code. In this article, we'll look at how to use Oozie to create a workflow for the parallel machine learning task I described on Cloudera's site.

Hive Actions: Prepping for Pig

In my parallel machine learning article, I use data from the National Climatic Data Center to build weather models on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations stretching from the 1930s to today. In reading that post, one might get the impression that the data came in handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want to retrain and test those models, which will require more parsing and projection. This is a good opportunity to start building up a workflow with Oozie.

I store the data from the NCDC in HDFS and create an external Hive table partitioned by year. This gives me the flexibility of Hive's query language when I want it, but lets me put the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.

CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column1 STRING, column2 STRING, ...)
PARTITIONED BY (yr string)
STORED AS ...
LOCATION '/user/oracle/weather/historic';

As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow. Similarly, the weather data requires parsing in order to be useful as a set of columns. Because of its long history, the weather data is broken up into fields of specific byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year, so writing a SerDe or a parser for transformation is simple. Once that's done, I want to select columns on which to train, classify certain features, and place the training data in an HDFS directory for my Pig script to access.

ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2011')
LOCATION '/user/oracle/weather/historic/yr=2011';

INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
w.weather_day, w.temp, w.dewp, w.weather FROM (
FROM historic_weather SELECT TRANSFORM(...)
USING '/path/to/hive/filters/ncdc_parser.py'
as stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;

Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should also add that to my workflow. Oozie is going to invoke these Hive statements using what's somewhat obviously referred to as a Hive action. Hive actions amount to Oozie running a script file containing our query language statements, so we can place them in a file called ncdc_parse.hql.

Starting Our Workflow

Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions as workflow jobs, but they can be started automatically, either periodically or when new data arrives in a specified location. To keep things simple we'll make a workflow job; coordinator jobs simply require another XML file for scheduling. The bare minimum for workflow XML defines a name, a starting point, and an end point:

<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
<start to="ParseNCDCData"/>
<end name="end"/>
</workflow-app>

To this we need to add an action, and within that we'll specify the Hive parameters. Also, keep in mind that actions require <ok> and <error> tags to direct the next action on success or failure.

<action name="ParseNCDCData">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<configuration>
<property>
<name>oozie.hive.defaults</name>
<value>/user/oracle/weather_ooze/hive-default.xml</value>
</property>
</configuration>
<script>ncdc_parse.hql</script>
</hive>
<ok to="WeatherMan"/>
<error to="end"/>
</action>

There are a couple of things to note here:
  1. I have to give the FQDN (or IP) and port of my JobTracker and NameNode.
  2. I have to include a hive-default.xml file.
  3. I have to include a script file.
  4. The hive-default.xml and script file must be stored in HDFS.
That last point is particularly important. Oozie doesn't make assumptions about where a given workflow is being run. You might submit workflows against different clusters, or have different hive-default.xml files on different clusters (e.g. MySQL or Postgres-backed metastores).

A quick way to ensure that all the assets end up in the right place in HDFS is just to make a working directory locally, build your workflow.xml in it, and copy the assets you'll need to it as you add actions to workflow.xml. At this point, our local directory should contain:

  1. workflow.xml
  2. hive-default.xml (make sure this file contains your metastore connection data)
  3. ncdc_parse.hql

Adding Pig to the Ooze

Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:

<action name="WeatherMan">
<pig>
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<script>weather_train.pig</script>
</pig>
<ok to="end"/>
<error to="end"/>
</action>

Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here. My pig script registers the Weka Jar and a chunk of jython. If those aren't also in HDFS, our action will fail from the outset -- but where do we put them? The Jython script goes into the working directory at the same level as the pig script, because pig attempts to load Jython files in the directory from which the script executes. However, that's not where our Weka jar goes.

While Oozie doesn't assume much, it does make an assumption about the Pig classpath. Anything under working_directory/lib gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Anything that uses a REGISTER statement cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the pig action with an <archive> tag.

Yes, that's as confusing as you think it is.

 You can get the exact rules for adding Jars to the distributed cache from Oozie's Pig Cookbook.
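
To make that concrete, here's a hypothetical sketch of the pig action with the Weka jar attached via <archive>; the HDFS path and the #weka.jar symlink name are made up for illustration, so check the cookbook above for the exact rules:

<action name="WeatherMan">
<pig>
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<script>weather_train.pig</script>
<archive>/user/oracle/shared_jars/weka.jar#weka.jar</archive>
</pig>
<ok to="end"/>
<error to="end"/>
</action>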

Making the Workflow Work

We've got a workflow defined and have collected all the components we'll need to run. But we can't run anything yet, because we still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the "request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties as follows:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
weatherRoot=weather_ooze
mapreduce.jobtracker.kerberos.principal=foo
dfs.namenode.kerberos.principal=foo
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather-ooze

While some of the pieces of the properties file are familiar (e.g., the JobTracker address), others take a bit of explaining. The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName). We're simply using them to simplify the directives for the Oozie job. The oozie.libpath piece is extremely important. This is a directory in HDFS which holds Oozie's shared libraries: a collection of Jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed and write the output to the output directory.
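
A quick way to confirm the shared libraries are in place is to list the directory we pointed oozie.libpath at:
hadoop fs -ls /user/oozie/share/lib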

We're finally ready to submit our job! After all that work we only need to do a few more things:

  1. Validate our workflow.xml
  2. Copy our working directory to HDFS
  3. Submit our job to the Oozie server
  4. Run our workflow
Let's do them in order. First validate the workflow:
oozie validate workflow.xml

Next, copy the working directory up to HDFS:
hadoop fs -put working_dir /user/oracle/working_dir

Now we submit the job to the Oozie server. We need to ensure that we've got the correct URL for the Oozie server, and we need to specify our job.properties file as an argument.
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit

We've submitted the job, but we don't see any activity on the JobTracker. All we got was this funny bit of output:
14-20120525161321-oozie-oracle

This is because submitting a job to Oozie creates an entry for the job and places it in PREP status. What we got back, in essence, is a ticket for our workflow to ride the Oozie train. We're responsible for redeeming our ticket and running the job.
oozie job -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle

Of course, if we really want to run the job from the outset, we can change the "-submit" argument above to "-run." This will prep and run the workflow immediately.
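
Using the same placeholders as the submit command above, that looks like:
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -run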

Takeaway

So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing, we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and quicker.

About

The Data Warehouse Insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.
