Thursday Apr 04, 2013

Three Little Hive UDFs: Part 1

Introduction

In our ongoing series of posts explaining the ins and outs of Hive User Defined Functions, we're starting with the simplest case.  Of the three little UDFs, today's entry built a straw house: simple, easy to put together, but limited in applicability.  We'll walk through the important parts of the code, but you can grab the whole source from GitHub here.

Extending UDF

The first few lines of interest are very straightforward:

@Description(name = "moving_avg", value = "_FUNC_(x, n) - Returns the moving mean of a set of numbers over a window of n observations")
@UDFType(deterministic = false, stateful = true)

public class UDFSimpleMovingAverage extends UDF

We're extending the UDF class with some decoration.  The decoration is important for usability and functionality.  The Description decorator lets us give Hive some information to show users about how to use our UDF and what its method signature will be.  The UDFType decoration tells Hive what sort of behavior to expect from our function.

A deterministic UDF will always return the same output given a particular input.  A square-root computing UDF will always return the same square root for 4, so we can say it is deterministic; a call to get the system time would not be.  The stateful annotation of the UDFType decoration is relatively new to Hive (e.g., CDH4 and above).  The stateful directive allows Hive to keep some static variables available across rows.  The simplest example of this is a "row-sequence," which maintains a static counter that increments with each row processed.
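To make the distinction concrete, here is a minimal sketch in plain Java with no Hive dependencies.  The class and method names are hypothetical, and the counter is an instance field rather than the static one a real row-sequence UDF would keep; the point is only to contrast a function whose answer depends on its input alone with one whose answer depends on what came before.

```java
/** Plain-Java illustration only; the class and method names are hypothetical, not Hive's API. */
final class StatefulVsDeterministic {
    // State carried from one call to the next, like a row-sequence counter.
    private long rowCounter = 0;

    /** Deterministic: the same input always yields the same output. */
    static double squareRoot(double x) {
        return Math.sqrt(x);
    }

    /** Stateful: the result depends on how many rows came before this one. */
    long nextRowNumber() {
        rowCounter++;
        return rowCounter;
    }

    public static void main(String[] args) {
        StatefulVsDeterministic seq = new StatefulVsDeterministic();
        System.out.println(squareRoot(4.0));      // same answer on every call
        System.out.println(seq.nextRowNumber());  // first row
        System.out.println(seq.nextRowNumber());  // second row, different answer
    }
}
```

Calling squareRoot twice with the same argument gives the same value, while two identical calls to nextRowNumber do not, which is why Hive needs to be told about the second kind of function.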

  Since square-root and row-counting aren't terribly interesting, we'll use the stateful annotation to build a simple moving average function.  We'll return to the notion of a moving average later when we build a UDAF, so as to compare the two approaches.

  private DoubleWritable result = new DoubleWritable();
  private static ArrayDeque<Double> window;
  int windowSize;

  public UDFSimpleMovingAverage() {
    result.set(0);
  }

 The above code is basic initialization.  We make a double in which to hold the result, but it needs to be of class DoubleWritable so that MapReduce can properly serialize the data.  We use a deque to hold our sliding window, and we need to keep track of the window's size.  Finally, we implement a constructor for the UDF class.

  public DoubleWritable evaluate(DoubleWritable v, IntWritable n) {
    double sum = 0.0;
    double moving_average;
    double residual;
    if (window == null) {
      window = new ArrayDeque<Double>();
    }

Here's the meat of the class: the evaluate method.  This method will be called on each row by the map tasks.  For any given row, we can't say whether or not our sliding window exists, so we initialize it if it's null.

    // slide the window
    if (window.size() == n.get())
        window.pop();

    window.addLast(new Double(v.get()));

    // compute the average
    for (Iterator<Double> i = window.iterator(); i.hasNext();)
        sum += i.next().doubleValue();

Here we deal with the deque and compute the sum of the window's elements.  Deques are essentially double-ended queues, so they make excellent sliding windows.  If the window is full, we pop the oldest element and add the current value.

moving_average = sum/window.size();
    result.set(moving_average);

return result;

Computing the moving average without weighting is simply dividing the sum of our window by its size.  We then set that value in our Writable variable and return it.  The value is then emitted as part of the map task executing the UDF function.
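Since the snippets above are spread across several paragraphs, it may help to see the sliding-window logic in one piece.  What follows is a minimal sketch in plain Java, stripped of the Hive and Writable types so it can run on its own; the class name SlidingWindowAverage and its add method are assumptions made for illustration, not part of the UDF source.

```java
import java.util.ArrayDeque;

/** Hive-free sketch of the sliding-window logic; class and method names are illustrative. */
final class SlidingWindowAverage {
    private final ArrayDeque<Double> window = new ArrayDeque<>();
    private final int size;

    SlidingWindowAverage(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.size = size;
    }

    /** Adds one observation and returns the mean of the observations currently in the window. */
    double add(double value) {
        // Slide the window: drop the oldest observation once the window is full.
        if (window.size() == size) {
            window.removeFirst();
        }
        window.addLast(value);

        // Re-sum the window on every call, as the UDF does.
        double sum = 0.0;
        for (double x : window) {
            sum += x;
        }
        return sum / window.size();
    }

    public static void main(String[] args) {
        SlidingWindowAverage avg = new SlidingWindowAverage(3);
        for (double x : new double[] {1, 2, 3, 4, 5}) {
            System.out.println(avg.add(x));
        }
    }
}
```

Until the window fills, the average is taken over however many observations have arrived so far, which matches the behavior of the evaluate method above.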

Going Further

The stateful annotation made it simple for us to compute a moving average since we could keep the deque static.  However, how would we compute a moving average if there were no notion of state between Hadoop tasks?  At the end of the series we'll examine a UDAF that does this, but the algorithm ends up being much different.  In the meantime, I challenge the reader to think about what sort of approach is needed to compute the window.


Tuesday Apr 02, 2013

User Defined Functions in Hive

Introduction

User-defined Functions (UDFs) have a long history of usefulness in SQL-derived languages.  While query languages can be rich in their expressiveness, there's just no way they can anticipate all the things a developer wants to do.  Thus, the custom UDF has become commonplace in our data manipulation toolbox.

Apache Hive is no different in this respect from other SQL-like languages.  Hive allows extensibility via both Hadoop Streaming and compiled Java.  However, largely because of the underlying MapReduce paradigm, not all Hive UDFs are created equal.  Some UDFs are intended for "map-side" execution, while others are portable and can be run on the "reduce-side."  Moreover, UDF behavior via streaming requires that queries be formatted so as to direct script execution where we desire it.

The intricacies of where and how a UDF executes may seem like minutiae, but we would be disappointed if the time spent coding a cumulative sum UDF produced a function that only executed on single rows.  To that end, I'm going to spend the rest of the week diving into the three primary types of Java-based UDFs in Hive.  You can find all of the sample code discussed here.

The Three Little UDFs

Hive provides three classes of UDFs that most users are interested in: UDFs, UDTFs, and UDAFs.  Broken down simply, the three classes can be explained as such:

  • UDFs -- User Defined Functions; these operate row-wise, generally during map execution.  They're the simplest UDFs to write, but constrained in their functionality.
  • UDTFs -- User Defined Table-Generating Functions; these also execute row-wise, but they produce multiple rows of output (i.e., they generate a table).  The most common example of this is Hive's explode function.
  • UDAFs -- User Defined Aggregating Functions; these can execute on either the map-side or the reduce-side and are far more flexible than UDFs.  The challenge, however, is that in writing UDAFs we have to think not just about what to do with a single row, or even a group of rows.  Here, one has to consider partial aggregation and serialization between map and reduce processes.

Over the next few days, we'll walk through code for each of these function types, from simple to complex.  Along the way, we'll end up with a couple of useful functions you can use in your own Hive code (or improve upon).
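To give a feel for what partial aggregation means before we get to a real UDAF, here is a small sketch of an average computed from mergeable partial states.  It is plain Java with no Hive dependencies; the class PartialAverage is hypothetical, and while the method names echo the iterate, merge, and terminate steps of a Hive UDAF evaluator, this is not Hive's API.

```java
/** Partial state for an average; names are illustrative and this is not Hive's API. */
final class PartialAverage {
    private double sum;
    private long count;

    /** Map side: fold one row into this task's partial state. */
    void iterate(double value) {
        sum += value;
        count++;
    }

    /** Reduce side: combine another task's partial state into this one. */
    void merge(PartialAverage other) {
        sum += other.sum;
        count += other.count;
    }

    /** Final step: turn the combined state into a result (NaN if no rows were seen). */
    double terminate() {
        return count == 0 ? Double.NaN : sum / count;
    }

    public static void main(String[] args) {
        PartialAverage taskA = new PartialAverage();
        taskA.iterate(1.0);
        taskA.iterate(2.0);

        PartialAverage taskB = new PartialAverage();
        taskB.iterate(3.0);
        taskB.iterate(4.0);
        taskB.iterate(5.0);

        taskA.merge(taskB);
        System.out.println(taskA.terminate());
    }
}
```

The key design point is that each task ships a sum and a count rather than a finished average: averages of averages are wrong when the tasks see different numbers of rows, but sums and counts can always be added together.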


Tuesday Mar 05, 2013

Hadoop Cluster: Build vs. Buy (part II)

About a year ago we compared (with an update here) building your own Hadoop cluster against a Big Data Appliance, focusing purely on hardware and software cost. We thought it could use an update, and luckily an analyst firm did one for us. This time it covers not only the hardware and software costs but also ventures much further into other costs.

Read all about it in ESG's Getting Real About Big Data, Build vs Buy (Feb 2013) here.

Some highlights from the report:

  • Oracle Big Data Appliance ($450k) is 39% less costly than "build your own" ($733k)
  • OBDA reduces time-to-market by 33% vs "build"

But the report is not just about those numbers. It also covers a number of very interesting things, like 3 Hadoop Myths, the importance of big data in the near future and the priority customers give to improving their analytics footprint.

Enjoy the read!


Wednesday Feb 20, 2013

Looking for tools to solve your big data problems?

Look no further, Infosys today announced its Infosys BigDataEdge developer platform to drive value from your big data stack. 

By empowering business users to rapidly develop insights from vast amounts of structured and unstructured data, better business decisions can be made in near real-time. With Infosys BigDataEdge, enterprises can reduce the time taken to extract information by up to 40 percent and generate insights up to eight times faster.

Read More.

Thursday Feb 07, 2013

New Series of Video Workshops

Contrary to my habit of posting How-To articles, today I'm not going to tell you how to do anything.  Instead, I'm going to ask you to go do something.  What I want you to do is check out the series of video tutorials I'm doing around pieces of the Hadoop ecosystem and Big Data analytics.  

The first entry is on YouTube and in the Big Data Playlist on Oracle Media Network.  It gives a brief overview of setting up Flume flows into HDFS.  Of course, if you're looking for a more detailed How-To, just check the archives -- I covered it a while back.

Some of the topics I'll cover in upcoming months include:

  • Hive User Defined Functions
  • Market Basket Analysis with Hive, R and Data Miner 
  • Customer Segmentation and Clustering in Hadoop
  • Classification and Targeting using Random Forests


Wednesday Jan 30, 2013

Parallel R: Quick Ways Model More

Introduction

I am less and less often mistaken for a pirate when I mention the R language.  While I miss the excuse to wear an eyepatch, I'm glad more people are beginning to explore a statistical language I've been touting for years.  When it comes to plotting or running complex statistics in a single line of code, R is a great tool to have.  That said, there are plenty of pitfalls for the casual or new user: syntax, learning to write vectorized code, or even just knowing which "apply" function you really should choose.

  I want to explore a slightly less-often considered aspect of R development: parallelism.  Out of the box, R can seem very limited to someone used to working on compute clusters or even a multicore server.  However, there are a few tricks we can leverage to get the most out of R on everything from a personal workstation to a Hadoop cluster.

 R is Single-Threaded

The R interpreter is -- and likely always will be -- single-threaded.  This means loading data frames is done in a single thread.  So is building your linear model, or generating that pretty surface plot.  Even on my laptop, that's a lot of threads to not use for modeling.  No matter how much my web browser might covet those cycles, I'd like to use them for work.

Rather than a complex multithreaded re-implementation, the R interpreter offers a number of ways to allow users to selectively apply parallelism.  Some of these approaches leverage MPI libraries and mirror that message passing approach.  Others allow a more implicit parallelism via "foreach" or "apply" constructs.  We'll just focus on a pair of strategies using the parallelism that has been included in R since version 2.14.0: the parallel package.

 Setting The Stage for Parallel Execution

We're going to need to load a few libraries into our R session before we can execute anything outside of our single thread.  We'll use the doParallel and foreach packages because they allow us to focus on what to parallelize rather than how to coordinate our threads.

> data(iris)
library(parallel)
library(iterators)
library(doParallel)
library(foreach)

Knowing that calculations in R will be single-threaded, we want to use the parallel package to operate on logical subsets of the data simultaneously.  For example, I loaded the iris data set, which contains measurements for a number of different species.  One way I might want to parallelize is to fit the same model to each species simultaneously.  For that, I'm going to have to split the data by species:

> species.split <- split(iris, iris$Species)

 This gives us a list we can iterate over -- or parallelize.  From here on out, it's simply a question of deciding what resources we want to leverage: local CPUs or remote hosts.

FORKs and SOCKs

We're going to use the makeCluster function to bind together a set of computational resources.  But first we need to decide: do we want to use only local CPUs, or is it necessary to open up socket connections to other machines to distribute our workload?  In the former case we'll use makeCluster to create what's called a FORK cluster (in that it uses UNIX's fork call to create slaves).  In the latter, we'll create a SOCK cluster by opening up sockets to a list of remote hosts and starting slave processes on them.

Here's a FORK cluster which uses all my cores:

> cl <- makeCluster(detectCores(), type = "FORK")
registerDoParallel(cl)

And here's a SOCK cluster across three nodes (password-less SSH is required):

> hostlist <- c("10.0.0.1", "10.0.0.2", "10.0.0.3")
cl <- makeCluster(hostlist)
registerDoParallel(cl)

In each case, I call registerDoParallel to bind this cluster to the %dopar% operator.  This is the operator which will let us easily iterate in parallel.

Running in Parallel

Once we've got something to iterate over and a cluster with which to do it, modeling in parallel becomes straightforward.  Suppose I want to fit a model of sepal length as a linear combination of petal characteristics.  In that case, the code is simply:

> species.models <- foreach(i=species.split) %dopar% {
m<-lm(i$Sepal.Length ~ i$Petal.Width*i$Petal.Length);
return(m)
}

But I'm not just restricted to fitting linear models on my little cluster.  I can run k-means clustering for several different k simultaneously using basically the same block:

> species.clusters<- foreach(i=2:5) %dopar% {
km <- kmeans(iris[, 1:4], i);  # numeric columns only; Species is a factor
return(km)
}

When I'm done with my block, I can just call stopCluster(cl) to ensure my processes terminate and I'm not hogging resources.

Using Hadoop

Finally, there will be situations in which I need to deploy in parallel against much larger datasets -- specifically, datasets stored in HDFS.  Both Hive and Pig will let me run an R script as part of a streaming process.  In Hive, the TRANSFORM operator will send data to an R script.  In Pig, you can use the STREAM operator to send a whole bag to an R script.  However, you can't stream from within Pig's FOREACH blocks, so I occasionally use a UDF which invokes R scripts for me.

Regardless of the method you choose to send HDFS data to an R process, it's important to make sure your R script can consume data streaming from standard input.  I find the most expedient way of doing this is via the file function.  A typical script might start:

#! /usr/bin/env Rscript
#Connection to STDIN for reading a data frame
con <- file(description="stdin")
my.data.frame <- read.table(con, header=FALSE, sep=",")

Summary

We've covered several ways to push R beyond the bounds of its single-threaded core.  There are forking and socket mechanisms for spreading our work around, not to mention tricks for leveraging the power of Hadoop Streaming.  In each case, however, one thing stands out: we must be smart as modelers and understand what can and should be done in parallel.


Monday Jan 28, 2013

First Oracle BIWA Data Scientists Certified

For those who attended the BIWA Summit a few weeks ago, you would have seen the data scientist certification. BIWA just listed the first batch of data scientists it certified:

Instructor Level Certificate  - Brendan Tierney

Oracle Data Scientist Certificate 
Don Ferguson, CherryRoad Technologies
Jorge Anicama, IBM (GBS)
Tim Vlamis, Vlamis Software Solutions
Vijayalakshmi Muthukrishnan, Motorola
Sicheng Liu, Deloitte Consulting
Avik Bhattacharya, Printpack Inc.
Ari Kaplan, Ariball
Paul Mitchell, Oracle

Associate Level 
Suresh Anand, Sashatech LLC

Participation Certificate 
Ahmed Kopap
Ekine Akuiyibo
Khader Mohiuddin

For more on the program, see: http://oraclebiwasig.blogspot.com/2013/01/oracle-data-scientist-at-biwa-summit.html


Friday Jan 25, 2013

Announcing: OTN Big Data Developer Day

Announcing the first Big Data Developer Day. A full day with two tracks and hands-on on all things Big Data at Oracle!!

An influx of new data types combined with new approaches for analyzing data is creating untapped growth opportunities that have the potential to transform your business. Oracle is the first vendor to provide a complete and integrated set of enterprise-ready products to address the full spectrum of big data business requirements. Jumpstart your understanding of big data in the enterprise by attending this complimentary one-day hands-on workshop. You will learn from technical experts how to:

  • Write MapReduce on Oracle’s Big Data Platform
  • Manage a Big Data environment
  • Access Oracle NoSQL Database
  • Manage Oracle NoSQL DB Cluster
  • Use data from a Hadoop Cluster with Oracle
  • Develop analytics on big data

Register today to learn these skills which you can immediately put to use within your organization.

For more information and to sign up (space is limited!) click the link here.

So if you are in the Bay Area, do come and learn the coolest new technologies.

Monday Jan 21, 2013

A short Big Data and Engineered Systems Customer Video

Apart from the 2012 Government Big Data Award, hear from Thomson Reuters in this video:

Friday Jan 18, 2013

Big Data Appliance X3-2 Updates


Hello world. Wow, time went by too fast. Happy new year, and here is the long-overdue update on the new Big Data Appliance and the software updates.

Big Data Appliance X3-2

Both the software and the hardware of the Big Data Appliance got a refresh.

Hardware Update

A good place to start is to quickly review the hardware differences (no price changes!). On a per node basis the following is a comparison between old and new (X3-2) hardware:

Component     Big Data Appliance v1                      Big Data Appliance X3-2
CPU           2 x 6-Core Intel® Xeon® 5675 (3.06 GHz)    2 x 8-Core Intel® Xeon® E5-2660 (2.2 GHz)
Memory        48GB                                       64GB expandable to 512GB
Disk          12 x 3TB High Capacity SAS                 12 x 3TB High Capacity SAS
InfiniBand    40Gb/sec                                   40Gb/sec
Ethernet      10Gb/sec                                   10Gb/sec
KVM           1 KVM Switch                               N/A (removed)

For all the details on the environmentals and other useful information, review the data sheet for Big Data Appliance X3-2. For those wondering what we did with the 2RU we now have left from the KVM, that is open space, at the top of the rack.

The higher core count gives a BDA X3-2 more parallel compute power while saving some 30% in energy and heat.

Software Update

As we did with hardware, a good place to start is a quick overview of the software changes in the table below:

Component                      Big Data Appliance v1.1.x Software Stack    Big Data Appliance V2.0.1 Software Stack
Linux                          Oracle Linux 5.6                            Oracle Linux 5.8 with UEK
JDK                            1.6                                         1.6u35
Cloudera CDH                   CDH 3u4                                     CDH 4.1.x
Cloudera Manager               CM 3                                        CM 4.1
Oracle Enterprise Manager      N/A                                         Big Data Appliance Plug-In for Enterprise Manager
R                              Open Source R                               Oracle R Distribution 2.x
Big Data Connectors *          Big Data Connectors 1.1.x                   Big Data Connectors 2.0.x
Oracle NoSQL Database CE **    NoSQL DB 1.x                                NoSQL DB 2.x

* Oracle Big Data Connectors is a separately licensed product which can be pre-installed and pre-configured on BDA
** Oracle NoSQL DB 2.x will be pre-installed in a future update to Mammoth but can be applied manually today

Apart from the version updates, bug fixes and a great number of performance improvements across the entire system, the biggest updates are the inclusion of CDH 4.1.2 and the default setup of highly available name nodes for Hadoop, the Enterprise Manager management of the BDA, the uptake of the Oracle R Distribution and the updates to Oracle NoSQL Database. In a nutshell these updates deliver the following improvements:

Cloudera CDH 4.1.x

The latest versions of CDH and CM deliver:

  • Higher overall performance
  • Highly available name nodes with the BDA using failover quorum processes instead of an external HA filer solution
  • Vastly expanded management capabilities via CM 4

On top of this, BDA now has both Zookeeper and Oozie configured out of the box.

Oracle Enterprise Manager

The new Big Data Appliance Plug-In for Enterprise Manager delivers the first end-to-end management of the Hadoop cluster from hardware metrics to software and Hadoop metrics. To achieve the end-to-end management of the system Enterprise Manager delivers all the system metrics users are used to from the Exadata Plug-In for Enterprise Manager. Enterprise Manager enables a seamless transition between the Hardware and high level software monitoring and the expanded Hadoop monitoring and diagnostics from Cloudera Manager. This combination of functionality makes operations for a BDA simpler and allows operations staff to seamlessly switch between their Exadata, Big Data Appliance and other Oracle Engineered systems.

Oracle R Distribution

The big difference between Oracle R Distribution and the open source R distribution is that Oracle R Distribution can dynamically load the math kernel libraries for CPUs from both Intel and AMD. This increases performance of basic calculations, which in turn increases the performance of the overall R calculations because more math is off-loaded into the CPUs.

Oracle NoSQL Database 2.x

A great number of new features have been added to NoSQL DB 2.x. Most of these are in both the Community Edition and the Enterprise Edition. Charles Lamb has a nice concise post describing what is new here.

Big Data Connectors

To close out, Big Data Connectors got a refresh focused on performance, so download the new products and give them a go via this download page. For more information, read the data sheet here.

Tuesday Nov 20, 2012

Winner of the 2012 Government Big Data Solutions Award

Hot off the press:

The winner of the 2012 Government Big Data Solutions Award is the National Cancer Institute!! Read all the details on CTOLabs.com.

A short excerpt to whet your appetite: "... This solution, based on the Oracle Big Data Appliance with the Cloudera Distribution of Apache Hadoop (CDH), leverages capabilities available from the Big Data community today in pioneering ways that can serve a broad range of researchers. The promising approach of this solution is repeatable across many other Big Data challenges for bioinformatics, making this approach worthy of its selection as the 2012 Government Big Data Solution Award."

Read the entire post.

Congrats to the entire team!!

Monday Oct 15, 2012

The Oldest Big Data Problem: Parsing Human Language

There's a new whitepaper up on Oracle Technology Network which details the use of Digital Reasoning Systems' Synthesys software on Oracle Big Data Appliance.  Digital Reasoning's approach is inherently "big data friendly," as it leverages multiple components of the Hadoop ecosystem.  Moreover, the paper addresses the oldest big data problem of them all: extracting knowledge from human text.

  You can find the paper here.

  From the Executive Summary:

There is a wealth of information to be extracted from natural language, but that extraction is challenging. The volume of human language we generate constitutes a natural Big Data problem, while its complexity and nuance requires a particular expertise to model and mine. In this paper we illustrate the impressive combination of Oracle Big Data Appliance and Digital Reasoning Synthesys software. The combination of Synthesys and Big Data Appliance makes it possible to analyze tens of millions of documents in a matter of hours. Moreover, this powerful combination achieves four times greater throughput than conducting the equivalent analysis on a much larger cloud-deployed Hadoop cluster.


Friday Oct 12, 2012

Read how a customer uses Oracle NoSQL Database

For those who have had the pleasure to be in SF for Oracle Openworld, you might have seen or heard about this story already. If you did not, here is a great story on how to use Oracle NoSQL Database.

Apart from all the cool technology, I'm just excited that this is a company founded by a football international and dealing with sports data, games and other cool things. Like an all things cool combo in one place.


Friday Oct 05, 2012

Building Simple Workflows in Oozie

Introduction

More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation, match-merge operations, and a host of data munging tasks are usually needed before we can extract insights from our Big Data sources. Few people find data munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate the process. We want to codify our work into repeatable units and create workflows which we can leverage over and over again without having to write new code. In this article, we'll look at how to use Oozie to create a workflow for the parallel machine learning task I described on Cloudera's site.

Hive Actions: Prepping for Pig

In my parallel machine learning article, I use data from the National Climatic Data Center to build weather models on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations stretching from the 1930s to today. In reading that post, one might get the impression that the data came in handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want to retrain and test those models, which will require more parsing and projection. This is a good opportunity to start building up a workflow with Oozie.

I store the data from the NCDC in HDFS and create an external Hive table partitioned by year. This gives me the flexibility of Hive's query language when I want it, but lets me put the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.

CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column 1, column2)
PARTITIONED BY (yr string)
STORED AS ...
LOCATION '/user/oracle/weather/historic';

As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow. Similarly, the weather data requires parsing in order to be useful as a set of columns. Because of its long history, the weather data is broken up into fields of specific byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year, so writing a SerDe or a parser for transformation is simple. Once that's done, I want to select columns on which to train, classify certain features, and place the training data in an HDFS directory for my Pig script to access.

ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2010')
LOCATION '/user/oracle/weather/historic/yr=2010';

INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
w.weather_day, w.temp, w.dewp, w.weather FROM (
FROM historic_weather SELECT TRANSFORM(...)
USING '/path/to/hive/filters/ncdc_parser.py'
as stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;

Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should also add that to my workflow. Oozie is going to invoke these Hive statements using what's somewhat obviously referred to as a Hive action. Hive actions amount to Oozie running a script file containing our query language statements, so we can place them in a file called ncdc_parse.hql.

Starting Our Workflow

Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions of Workflow jobs, but they can be automatically started either periodically or when new data arrives in a specified location. To keep things simple we'll make a workflow job; coordinator jobs simply require another XML file for scheduling. The bare minimum for workflow XML defines a name, a starting point, and an end point:

<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
<start to="ParseNCDCData"/>
<end name="end"/>
</workflow-app>

To this we need to add an action, and within that we'll specify the Hive parameters. Also, keep in mind that actions require <ok> and <error> tags to direct the next action on success or failure.

<action name="ParseNCDCData">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<configuration>
<property>
<name>oozie.hive.defaults</name>
<value>/user/oracle/weather_ooze/hive-default.xml</value>
</property>
</configuration>
<script>ncdc_parse.hql</script>
</hive>
<ok to="WeatherMan"/>
<error to="end"/>
</action>

There are a couple of things to note here:

  1. I have to give the FQDN (or IP) and port of my JobTracker and NameNode.
  2. I have to include a hive-default.xml file.
  3. I have to include a script file.
  4. The hive-default.xml and script file must be stored in HDFS.

That last point is particularly important. Oozie doesn't make assumptions about where a given workflow is being run. You might submit workflows against different clusters, or have a different hive-default.xml on each cluster (e.g. MySQL or Postgres-backed metastores).

 

A quick way to ensure that all the assets end up in the right place in HDFS is just to make a working directory locally, build your workflow.xml in it, and copy the assets you'll need to it as you add actions to workflow.xml. At this point, our local directory should contain:

  1. workflow.xml
  2. hive-default.xml (make sure this file contains your metastore connection data)
  3. ncdc_parse.hql

 

Adding Pig to the Ooze

Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:

<action name="WeatherMan">
<pig>
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<script>weather_train.pig</script>
</pig>
<ok to="end"/>
<error to="end"/>
</action>

 

Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here. My Pig script registers the Weka jar and a chunk of Jython. If those aren't also in HDFS, our action will fail from the outset -- but where do we put them? The Jython script goes into the working directory at the same level as the Pig script, because Pig attempts to load Jython files in the directory from which the script executes. However, that's not where our Weka jar goes.

While Oozie doesn't assume much, it does make an assumption about the Pig classpath. Anything under working_directory/lib gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Anything that uses a REGISTER statement cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the Pig action with an <archive> tag.

Yes, that's as confusing as you think it is.

 You can get the exact rules for adding Jars to the distributed cache from Oozie's Pig Cookbook.

Making the Workflow Work

We've got a workflow defined and have collected all the components we'll need to run. But we can't run anything yet, because we still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the "request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties as follows:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
weatherRoot=weather_ooze
mapreduce.jobtracker.kerberos.principal=foo
dfs.namenode.kerberos.principal=foo
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather-ooze

 

While some of the pieces of the properties file are familiar (e.g., JobTracker address), others take a bit of explaining. The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName). We're simply using them to simplify the directives for the Oozie job. The oozie.libpath piece is extremely important. This is a directory in HDFS which holds Oozie's shared libraries: a collection of jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed and write the output to the output directory.

We're finally ready to submit our job! After all that work we only need to do a few more things:

  1. Validate our workflow.xml
  2. Copy our working directory to HDFS
  3. Submit our job to the Oozie server
  4. Run our workflow

Let's do them in order. First, validate the workflow:
oozie validate workflow.xml

Next, copy the working directory up to HDFS:
hadoop fs -put working_dir /user/oracle/working_dir

Now we submit the job to the Oozie server. We need to ensure that we've got the correct URL for the Oozie server, and we need to specify our job.properties file as an argument.
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit

We've submitted the job, but we don't see any activity on the JobTracker? All I got was this funny bit of output:
14-20120525161321-oozie-oracle

This is because submitting a job to Oozie creates an entry for the job and places it in PREP status. What we got back, in essence, is a ticket for our workflow to ride the Oozie train. We're responsible for redeeming our ticket and running the job.
oozie job -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle

Of course, if we really want to run the job from the outset, we can change the "-submit" argument above to "-run." This will prep and run the workflow immediately.

 

Takeaway

So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing, we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and quicker.


Wednesday Sep 26, 2012

See a Big Data Appliance in the wild

It is a news day today, check out the pictures of the BDA getting wheeled into Enkitec...

Kerry's Blog Post

Always fun to see happy faces when the new toy arrives. Looking forward to more Enkitec news in the coming weeks (starting at Openworld of course).


About

The data warehouse insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.
