More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation, match-merge operations, and a host of data munging tasks are usually needed before we can extract insights from our Big Data sources. Few people find data munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate the process. We want to codify our work into repeatable units and create workflows that we can reuse over and over again without having to write new code. In this article, we'll look at how to use Oozie to create a workflow for the parallel machine learning task I described on Cloudera's site.
Hive Actions: Prepping for Pig
In my parallel machine learning article, I use data from the National Climatic Data Center to build weather models on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations stretching from the 1930s to today. In reading that post, one might get the impression that the data came in handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want to retrain and test those models, which will require more parsing and projection. This is a good opportunity to start building up a workflow with Oozie.
I store the data from the NCDC in HDFS and create an external Hive table partitioned by year. This gives me the flexibility of Hive's query language when I want it, but lets me put the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.
CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column1, column2)
PARTITIONED BY (yr string)
STORED AS ...
As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow. Similarly, the weather data requires parsing in order to be useful as a set of columns. Because of its long history, the weather data is broken up into fields of specific byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year, so writing a SerDe or a parser for transformation is simple. Once that's done, I want to select columns on which to train, classify certain features, and place the training data in an HDFS directory for my Pig script to access.
ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2010');
INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
w.weather_day, w.temp, w.dewp, w.weather FROM (
FROM historic_weather SELECT TRANSFORM(...)
AS stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;
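Hive's TRANSFORM clause streams each row through an external program that reads lines on stdin and writes tab-separated columns on stdout. Here is a minimal sketch of what such a fixed-width parser could look like. The function name and every byte offset are invented for illustration (the real NCDC layout isn't spelled out in this article), and the eighth column, weather, is left out because its derivation isn't described here.

```shell
# Hypothetical fixed-width parser for use behind Hive's TRANSFORM clause.
# The offsets below are illustrative only; they are NOT the real NCDC layout.
parse_weather() {
  awk '{
    stn  = substr($0,  1, 6)   # station ID
    wban = substr($0,  7, 5)   # WBAN number
    yr   = substr($0, 12, 4)   # observation year
    mo   = substr($0, 16, 2)   # observation month
    dy   = substr($0, 18, 2)   # observation day
    temp = substr($0, 20, 6)   # temperature, space-padded
    dewp = substr($0, 26, 6)   # dew point, space-padded
    gsub(/ /, "", temp)        # strip the padding from the numeric fields
    gsub(/ /, "", dewp)
    # Emit tab-separated columns, one output row per input row
    printf "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", stn, wban, yr, mo, dy, temp, dewp
  }'
}
```

To try it on a single made-up record: printf '%s\n' '0101009999920100115  24.5  18.0' | parse_weather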
Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should also add that to my workflow. Oozie is going to invoke these Hive statements using what's somewhat obviously referred to as a Hive action. Hive actions amount to Oozie running a script file containing our query language statements, so we can place them all in a single script file.
Starting Our Workflow
Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions as workflow jobs, but they can be automatically started either periodically or when new data arrives in a specified location. To keep things simple we'll make a workflow job; coordinator jobs simply require another XML file for scheduling. The bare minimum for workflow XML defines a name, a starting point, and an end point:
<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
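Filled out to a complete file, that skeleton might look like the sketch below, which creates the local working directory and writes workflow.xml into it. The start and end elements follow the Oozie workflow schema as I understand it; sending start straight to end is just a placeholder until we add actions.

```shell
# Create a local working directory and write a skeleton workflow.xml into it.
WORK_DIR="${WORK_DIR:-working_dir}"
mkdir -p "$WORK_DIR"

# The quoted heredoc delimiter makes the shell copy the XML verbatim.
cat > "$WORK_DIR/workflow.xml" <<'EOF'
<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
    <!-- starting point: no actions yet, so go straight to the end node -->
    <start to="end"/>
    <!-- end point -->
    <end name="end"/>
</workflow-app>
EOF
```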
To this we need to add an action, and within that we'll specify the Hive parameters. Also, keep in mind that actions require <ok> and <error> tags to direct the next action on success or failure.
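As a rough sketch, a workflow.xml with a single Hive action could be written out like this. The action name ParseNCDCData, the script name weather_parse.hql, and the kill node named fail are all hypothetical, and I'm assuming an older Oozie release that finds the Hive settings through the oozie.hive.defaults property. The jobTracker and nameNode variables are assumed to be supplied at submission time; literal host:port values work too.

```shell
WORK_DIR="${WORK_DIR:-working_dir}"
mkdir -p "$WORK_DIR"

# Quoted delimiter: ${jobTracker} and friends must reach Oozie unexpanded.
cat > "$WORK_DIR/workflow.xml" <<'EOF'
<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
    <start to="ParseNCDCData"/>

    <!-- "ParseNCDCData" and "weather_parse.hql" are hypothetical names -->
    <action name="ParseNCDCData">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <!-- assumption: this Oozie version reads Hive settings from here -->
                <property>
                    <name>oozie.hive.defaults</name>
                    <value>hive-default.xml</value>
                </property>
            </configuration>
            <script>weather_parse.hql</script>
        </hive>
        <ok to="end"/>       <!-- next node on success -->
        <error to="fail"/>   <!-- next node on failure -->
    </action>

    <kill name="fail">
        <message>Hive action failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>
EOF
```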
There are a couple of things to note here:
- I have to give the FQDN (or IP) and port of my JobTracker and NameNode.
- I have to include a hive-default.xml file.
- I have to include a script file.
- The hive-default.xml and script file must be stored in HDFS.
That last point is particularly important. Oozie doesn't make assumptions about where a given workflow is being run. You might submit workflows against different clusters, or have different hive-default.xml files on different clusters (e.g. MySQL or Postgres-backed metastores).
A quick way to ensure that all the assets end up in the right place in HDFS is to make a working directory locally, build your workflow.xml in it, and copy in the assets you'll need as you add actions to workflow.xml. At this point, our local directory should contain:
- workflow.xml
- hive-default.xml (make sure this file contains your metastore connection data)
- the Hive script file
Adding Pig to the Ooze
Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:
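A minimal sketch of such an action is below, wrapped in a small shell function so the fragment can be printed and pasted into workflow.xml. The function name and the action name TrainWeatherModels are hypothetical, and the fragment assumes the workflow already has an end node named end and a kill node named fail. To chain it after the Hive step, the Hive action's <ok> tag would point at this action's name.

```shell
# Print a Pig action fragment for pasting into workflow.xml.
# "TrainWeatherModels" is a hypothetical action name; the "end" and "fail"
# targets are assumed to exist elsewhere in the workflow.
print_pig_action() {
  cat <<'EOF'
<action name="TrainWeatherModels">
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <script>weather_train.pig</script>
    </pig>
    <ok to="end"/>
    <error to="fail"/>
</action>
EOF
}

print_pig_action
```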
Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here. My Pig script registers the Weka Jar and a chunk of Jython. If those aren't also in HDFS, our action will fail from the outset -- but where do we put them? The Jython script goes into the working directory at the same level as the Pig script, because Pig attempts to load Jython files in the directory from which the script executes. However, that's not where our Weka Jar goes.
While Oozie doesn't assume much, it does make an assumption about the Pig classpath. Anything under working_directory/lib gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Anything that uses a REGISTER statement cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the Pig action explicitly in workflow.xml.
Yes, that's as confusing as you think it is.
You can get the exact rules for adding Jars to the distributed cache from Oozie's Pig Cookbook.
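One way to attach a Jar that lives outside lib is an <archive> element placed after <script> inside the <pig> block; its sibling <file> element is the other candidate, so treat the choice of <archive> here as an assumption and check the cookbook for your version. The helper name and the HDFS path in this sketch are hypothetical.

```shell
# Print the element that ships one extra Jar with a Pig action.
# $1 is the Jar's HDFS path; the part after '#' is the link name the Jar
# gets in the task's working directory.
archive_element() {
  jar_path="$1"
  jar_name="${jar_path##*/}"   # strip the directory part, keep the file name
  printf '<archive>%s#%s</archive>\n' "$jar_path" "$jar_name"
}

# Hypothetical HDFS location for the Weka Jar
archive_element /user/oracle/pig_jars/weka.jar
```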
Making the Workflow Work
We've got a workflow defined and have collected all the components we'll need to run. But we can't run anything yet, because we still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the "request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties as follows:
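Here is a sketch of what that file could contain, written into the working directory with a heredoc. Every value is a placeholder: the host names, ports, shared-library path, and output directory name are assumptions to swap for your own cluster's settings. The name outputDir is a user-chosen variable, while oozie.libpath and oozie.wf.application.path are Oozie's own property names.

```shell
WORK_DIR="${WORK_DIR:-working_dir}"
mkdir -p "$WORK_DIR"

# Quoted delimiter: the ${...} references are resolved by Oozie, not the shell.
cat > "$WORK_DIR/job.properties" <<'EOF'
# Cluster endpoints (placeholder hosts and ports)
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default

# Name of the workflow directory under the user's HDFS home (placeholder)
weatherRoot=working_dir

# HDFS directory holding Oozie's shared libraries (placeholder path)
oozie.libpath=${nameNode}/user/oozie/share/lib

# Where workflow.xml lives, and where output is written
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather_output
EOF
```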
While some of the pieces of the properties file are familiar (e.g., JobTracker address), others take a bit of explaining. The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName). We're simply using them to simplify the directives for the Oozie job. The oozie.libpath piece is extremely important. This is a directory in HDFS which holds Oozie's shared libraries: a collection of Jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed and write the output to the output directory.
We're finally ready to submit our job! After all that work we only need to do a few more things:
- Validate our workflow.xml
- Copy our working directory to HDFS
- Submit our job to the Oozie server
- Run our workflow
Let's do them in order. First validate the workflow:
oozie validate workflow.xml
Next, copy the working directory up to HDFS:
hadoop fs -put working_dir /user/oracle/working_dir
Now we submit the job to the Oozie server. We need to ensure that we've got the correct URL for the Oozie server, and we need to specify our job.properties file as an argument.
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit
We've submitted the job, but we don't see any activity on the JobTracker? All I got was this funny bit of output:
job: 14-20120525161321-oozie-oracle
This is because submitting a job to Oozie creates an entry for the job and places it in PREP status. What we got back, in essence, is a ticket for our workflow to ride the Oozie train. We're responsible for redeeming our ticket and running the job.
oozie job -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle
Of course, if we really want to run the job from the outset, we can change the "-submit" argument above to "-run". This will prep and run the workflow immediately.
So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing, we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and quicker.