More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation, match-merge operations, and a host of data munging tasks are usually needed before we can extract insights from our Big Data sources. Few people find data munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate the process. We want to codify our work into repeatable units and create workflows which we can leverage over and over again without having to write new code. In this article, we'll look at how to use Oozie to create a workflow for the parallel machine learning task I described on Cloudera's site.
In my parallel machine learning article, I use data from the National Climatic Data Center to build weather models on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations stretching from the 1930s to today. In reading that post, one might get the impression that the data came in handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want to retrain and test those models, which will require more parsing and projection. This is a good opportunity to start building up a workflow with Oozie.
I store the data from the NCDC in HDFS and create an external Hive table partitioned by year. This gives me the flexibility of Hive's query language when I want it, but lets me put the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.
CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column1 string, column2 string)
PARTITIONED BY (yr string)
STORED AS ...
LOCATION '...'
As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow. Similarly, the weather data requires parsing in order to be useful as a set of columns. Because of its long history, the weather data is broken up into fields of specific byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year, so writing a SerDe or a parser for the transformation is simple. Once that's done, I want to select the columns on which to train, classify certain features, and place the training data in an HDFS directory for my Pig script to access.
ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2010');

INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
       w.weather_day, w.temp, w.dewp, w.weather
FROM (
  FROM historic_weather
  SELECT TRANSFORM(...)
  USING 'ncdc_parser.py' -- the parsing script's name here is illustrative
  AS stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;
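Hive streams each row of the table to the TRANSFORM script on stdin and reads tab-separated columns back from stdout. A minimal sketch of what ncdc_parser.py could look like follows; the field names come from the query above, but the byte offsets are placeholders for illustration, not the real NCDC layout:

#!/usr/bin/env python
# Hypothetical fixed-width parser for the NCDC observation records.
# The (start, end) byte offsets below are illustrative placeholders only.
import sys

FIELDS = [
    ('stn', 0, 6), ('wban', 7, 12),
    ('weather_year', 14, 18), ('weather_month', 18, 20), ('weather_day', 20, 22),
    ('temp', 24, 30), ('dewp', 35, 41), ('weather', 102, 108),
]

for line in sys.stdin:
    # Slice each fixed-width field out of the raw record and strip padding.
    values = [line[start:end].strip() for _, start, end in FIELDS]
    # Hive expects tab-delimited columns back on stdout.
    print('\t'.join(values))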
Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should also add that to my workflow. Oozie is going to invoke these Hive statements using what's somewhat obviously referred to as a Hive action. Hive actions amount to Oozie running a script file containing our query language statements, so we can place the ALTER TABLE and INSERT statements in a file called ncdc_parse.hql.
Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions as workflow jobs, but they can be started automatically, either periodically or when new data arrives in a specified location. To keep things simple, we'll make a workflow job; coordinator jobs simply require an additional XML file for scheduling (a sketch appears just after the skeleton below). The bare minimum for workflow XML defines a name, a starting point, and an end point:
<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
  <start to="ParseNCDCData"/>
  <end name="end"/>
</workflow-app>
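Incidentally, if we later want the periodic scheduling, the extra XML for a coordinator job might look something like this minimal sketch (I'm assuming the uri:oozie:coordinator:0.2 schema; the frequency, dates, and application path are placeholders):

<coordinator-app name="WeatherManCoord" frequency="${coord:days(1)}"
                 start="2012-01-01T00:00Z" end="2013-01-01T00:00Z"
                 timezone="UTC" xmlns="uri:oozie:coordinator:0.2">
  <action>
    <workflow>
      <app-path>${nameNode}/user/${user.name}/weather_ooze</app-path>
    </workflow>
  </action>
</coordinator-app>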
To the workflow skeleton we need to add an action, and within it we'll specify the Hive parameters. Keep in mind that actions require <ok> and <error> tags to direct the next action on success or failure.
<action name="ParseNCDCData"><hive xmlns="uri:oozie:hive-action:0.2"><job-tracker>localhost:8021</job-tracker><name-node>localhost:8020</name-node><configuration><property><name>oozie.hive.defaults</name><value>/user/oracle/weather_ooze/hive-default.xml</value></property></configuration><script>ncdc_parse.hql</script></hive><ok to="WeatherMan"/><error to="end"/></action>
A couple of things are worth noting here. First, the hive-default.xml and the script file must be stored in HDFS along with the rest of the workflow's assets. Second, because we pass hive-default.xml in as a property, we can point the same action at different defaults files on different clusters (e.g. MySQL or Postgres-backed metastores).
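For instance, for a MySQL-backed metastore, the relevant chunk of hive-default.xml would look something like the sketch below; the host, database, and credential values are placeholders for your environment:

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://metastore.host:3306/hive_metastore</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>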
A quick way to ensure that all the assets end up in the right place in HDFS is just to make a working directory locally, build your workflow.xml in it, and copy in the assets you'll need as you add actions to the workflow.
At this point, our local directory should contain:
workflow.xml
hive-default.xml (make sure this file contains your metastore connection data)
ncdc_parse.hql
Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:
<action name="WeatherMan"><pig><job-tracker>localhost:8021</job-tracker><name-node>localhost:8020</name-node><script>weather_train.pig</script></pig><ok to="end"/><error to="end"/></action>
Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here.
My Pig script registers the Weka Jar and a chunk of Jython. If those aren't also in HDFS, our action will fail from the outset -- but where do we put them? The Jython script goes into the working directory at the same level as the Pig script, because Pig attempts to load Jython files from the directory in which the script executes. However, that's not where our Weka Jar goes.
While Oozie doesn't assume much, it does make an assumption about the Pig classpath: anything under working_directory/lib gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Conversely, anything that uses a REGISTER statement cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the Pig action with an <archive> tag.
Yes, that's as confusing as you think it is.
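Concretely, once the Weka Jar is sitting in its own HDFS directory, the Pig action picks it up with an archive element along these lines (a sketch; the Jar path is a placeholder, and the #weka.jar fragment names the symlink the script can REGISTER):

<pig>
  ...
  <script>weather_train.pig</script>
  <archive>/user/oracle/jars/weka.jar#weka.jar</archive>
</pig>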
You can get the exact rules for adding Jars to the distributed cache from Oozie's Pig Cookbook.
We've got a workflow defined and have collected all the components we'll need to run it. But we can't run anything yet, because we still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the "request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties.
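A minimal sketch of its contents follows; the host names, ports, and paths are placeholders to adjust for your cluster:

# illustrative values only -- point these at your own cluster
nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
weatherRoot=weather_ooze

# HDFS directory holding Oozie's shared libraries for Hive, Pig, etc.
oozie.libpath=${nameNode}/user/oozie/share/lib

# where the workflow.xml lives, and where output should land
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather_output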
While some pieces of the properties file are familiar (e.g., the JobTracker address), others take a bit of explaining. The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName); we're simply using them to simplify the directives for the Oozie job. The oozie.libpath piece is extremely important: it points to a directory in HDFS which holds Oozie's shared libraries, a collection of Jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed, and write the output to the output directory.
We're finally ready to submit our job! After all that work we only need to do a few more things: validate the workflow, copy the working directory to HDFS, submit the job, and run it. First, validate workflow.xml with the Oozie client:

oozie validate workflow.xml
Next, copy the working directory up to HDFS:

hadoop fs -put working_dir /user/oracle/working_dir
Then, submit the job to the Oozie server, passing our job.properties file as an argument:
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit
Submitting the job doesn't run it; it returns a job ID and leaves the workflow in PREP status. What we got back, in essence, is a ticket for our workflow to ride the elephant. Ticket in hand, we start the job:
oozie job -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle
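If we want to check on the workflow as it runs, the same job ID works with the Oozie client's -info flag:

oozie job -oozie http://url.to.oozie.server:port_number/ -info 14-20120525161321-oozie-oracle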
So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing, we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and quicker.