Wednesday Oct 09, 2013

Big Data Openworld Sessions now available for Download/Viewing

For those who did go to Openworld, the session catalog now has the download links to the session materials online. You can now refresh your memory and share your experience with the rest of your organization. For those who did not go, here is your chance to look over some of the materials.

On the big data side, here are some of the highlights:

There are many other sessions; simply look for Solutions => Big Data and Business Analytics, and you will find a wealth of interesting content around big data, Hadoop and analytics.

Sentry Meetup at Strata + Hadoop World 2013

Meetup Details and Exact Location Here

Join us for the inaugural Apache Sentry meetup at Oracle's offices in NYC, on the evening of the last day of Strata + Hadoop World 2013 in New York. 

(@ Oracle Offices, 120 Park Ave, 26th Floor -- Note: Bring your ID and check in with security in the lobby!)

We'll kick-off the meetup with the following presentation:

Getting Serious about Security with Sentry

Shreepadma Venugopalan - Lead Engineer for Sentry
Arvind Prabhakar - Engineering Manager for Sentry 
Jacco Draaijer - Development Manager for Oracle Big Data

Apache Hadoop offers strong support for authentication and coarse grained authorization - but this is not necessarily enough to meet the demands of enterprise applications and compliance requirements. Providing fine-grained access to data will enable organizations to store more sensitive information in Hadoop; only those users with the appropriate privileges will ever see that sensitive data.

Cloudera and Oracle are taking the lead on Sentry - a new open source authorization module that integrates with Hadoop-based SQL query engines. Key developers for the project will provide details on its implementation, including:

-Motivations for the project
-Key requirements that Sentry satisfies
-Utilizing Sentry in your applications
-Future plans

Thursday Jul 18, 2013

Practical HDFS Permissions


Documentation and most discussions are quick to point out that HDFS provides OS-level permissions on files and directories.  However, there is less readily-available information about what the effects of OS-level permissions are on accessing data in HDFS via higher-level abstractions such as Hive or Pig.  To provide a bit of clarity, I decided to run through the effects of permissions on different interactions with HDFS.

The Setup

In this scenario, we have three users: oracle, dan, and not_dan. The oracle user has captured some data in an HDFS directory. The directory has 750 permissions: read/write/execute for oracle, read/execute for dan (through the directory's group), and no access for not_dan. One of the files in the directory has 700 permissions, meaning that only the oracle user can read it. Each user will try to do the following tasks:

  • List the contents of the directory
  • Count the lines in a subset of files including the file with 700 permissions
  • Run a simple Hive query over the directory
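To make the octal notation concrete, here is a minimal Java sketch that decodes mode bits the way the scenario describes them. The ModeBits class and its method names are hypothetical helpers written for this illustration, not part of Hadoop, and the mapping of oracle, dan, and not_dan to owner, group, and other is taken from the setup above.

```java
// Hypothetical helper for illustration only; this is not a Hadoop class.
class ModeBits {
    enum Who { OWNER, GROUP, OTHER }

    // Extract the three rwx bits that apply to the given class of user.
    static int triplet(int mode, Who who) {
        switch (who) {
            case OWNER: return (mode >> 6) & 7;
            case GROUP: return (mode >> 3) & 7;
            default:    return mode & 7;
        }
    }

    static boolean canRead(int mode, Who who)    { return (triplet(mode, who) & 4) != 0; }
    static boolean canWrite(int mode, Who who)   { return (triplet(mode, who) & 2) != 0; }
    static boolean canExecute(int mode, Who who) { return (triplet(mode, who) & 1) != 0; }

    // The listing error shown in this post reports access=READ_EXECUTE,
    // so this sketch requires both bits on the directory.
    static boolean canList(int dirMode, Who who) {
        return canRead(dirMode, who) && canExecute(dirMode, who);
    }

    public static void main(String[] args) {
        int dirMode = 0750;   // octal literal: the shared directory
        int fileMode = 0700;  // octal literal: the restricted file
        // Assumed mapping from the scenario: oracle = OWNER, dan = GROUP, not_dan = OTHER
        for (Who who : Who.values()) {
            System.out.println(who + ": list directory=" + canList(dirMode, who)
                    + ", read restricted file=" + canRead(fileMode, who));
        }
    }
}
```

Under these assumptions, only the owner can read the restricted file, while both the owner and the group can list the directory.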

Listing Files

Each user issues the command

hadoop fs -ls /user/shared/moving_average|more

And what do they see:

[oracle@localhost ~]$ hadoop fs -ls /user/shared/moving_average|more

Found 564 items

Obviously, the oracle user can see all the files in its own directory.

[dan@localhost oracle]$ hadoop fs -ls /user/shared/moving_average|more
Found 564 items

Similarly, since dan has group read access, that user can also list all the files. The user without group read permissions, however, receives an error.

[not_dan@localhost oracle]$ hadoop fs -ls /user/shared/moving_average|more

ls: Permission denied: user=not_dan, access=READ_EXECUTE,


Counting Rows in the Shell

In this test, each user pipes a set of HDFS files into a unix command and counts rows.  Recall, one of the files has 700 permissions.

The oracle user, again, can see all the available data:

[oracle@localhost ~]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l

The user with partial permissions receives an error on the console, but can access the data they have permissions on.  Naturally, the user without permissions only receives the error.

[dan@localhost oracle]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l
cat: Permission denied: user=dan, access=READ, inode="/user/shared/moving_average/FlumeData.1374082184056":oracle:shared_hdfs:-rw-------
[not_dan@localhost oracle]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l
cat: Permission denied: user=not_dan, access=READ_EXECUTE, inode="/user/shared/moving_average":oracle:shared_hdfs:drwxr-x---

Permissions on Hive

In this final test, the oracle user defines an external Hive table over the shared directory.  Each user issues a simple COUNT(*) query against the directory.  Interestingly, the results are not the same as piping the datastream to the shell.

The oracle user's query runs correctly, while both dan and not_dan's queries fail:

As dan

Job Submission failed with exception ' /user/shared/moving_average/FlumeData.1374082184056 does not exist)'

As not_dan

Job Submission failed with exception '
(Permission denied: user=not_dan, access=READ_EXECUTE,

So, what's going on here? In each case, the query fails, but for different reasons. In the case of not_dan, the query fails because the user has no permissions on the directory. However, the query issued by dan fails because of a FileNotFound exception. Because dan does not have read permissions on one of the files, Hive cannot find all the files necessary to build the underlying MapReduce job. Thus, the query fails before being submitted to the JobTracker. The rule, then, becomes simple: to issue a Hive query, a user must have read permissions on all files read by the query. If a user has permissions on one set of partition directories but not another, they can issue queries against the readable partitions, but not against the entire table.
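The rule can be expressed as a pre-flight check. The sketch below is a simplified model in plain Java, not Hive or HDFS code: the ReadableCheck class, its Entry type, and the second file name and its mode are invented for the example, while the first path, owner, group, and mode come from the error message shown earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model for illustration; none of these types exist in Hive or HDFS.
class ReadableCheck {
    static class Entry {
        final String path, owner, group;
        final int mode;
        Entry(String path, String owner, String group, int mode) {
            this.path = path; this.owner = owner; this.group = group; this.mode = mode;
        }
    }

    // Pick the owner, group, or other bits, then test the read bit.
    static boolean canRead(Entry e, String user, List<String> userGroups) {
        int bits;
        if (user.equals(e.owner)) bits = (e.mode >> 6) & 7;
        else if (userGroups.contains(e.group)) bits = (e.mode >> 3) & 7;
        else bits = e.mode & 7;
        return (bits & 4) != 0;
    }

    // Returns every path the user cannot read; a non-empty result means a
    // query over the whole directory would be expected to fail.
    static List<String> blockers(List<Entry> files, String user, List<String> userGroups) {
        List<String> blocked = new ArrayList<String>();
        for (Entry e : files) {
            if (!canRead(e, user, userGroups)) blocked.add(e.path);
        }
        return blocked;
    }

    static List<Entry> exampleFiles() {
        return Arrays.asList(
            // path, owner, group, and mode (-rw-------) as reported in the error message
            new Entry("/user/shared/moving_average/FlumeData.1374082184056", "oracle", "shared_hdfs", 0600),
            // hypothetical second file that the group may read
            new Entry("/user/shared/moving_average/FlumeData.1374082184057", "oracle", "shared_hdfs", 0640));
    }

    public static void main(String[] args) {
        List<String> groups = Arrays.asList("shared_hdfs");
        System.out.println("oracle blocked by: " + blockers(exampleFiles(), "oracle", groups));
        System.out.println("dan blocked by:    " + blockers(exampleFiles(), "dan", groups));
    }
}
```

Running a check like this against the directories behind a table, before handing the table to users, is one way to apply the testing advice in practice.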


In a nutshell, the OS-level permissions of HDFS behave just as we would expect in the shell. However, problems can arise when tools like Hive or Pig try to construct MapReduce jobs. As a best practice, permissions structures should be tested against the tools which will access the data. This ensures that users can read what they are allowed to, in the manner that they need to.


Wednesday Jul 17, 2013

Oracle: Big Data at Work

There is a lot of hype around big data, but here at Oracle we try to help customers implement big data solutions to solve real business problems. For those of you interested in understanding more about how you can put big data to work at your organization, consider joining these events:

San Jose | August 5 - 6
Marriott San Jose
301 S Market St, San Jose, California 95113
Event Registration Page
Chicago | August 7 - 8
The Westin Michigan Avenue
909 N Michigan Ave, Chicago, IL 60611

New York | August 12 - 13
Marriott Marquis Times Square
1535 Broadway, New York, NY 10036
Event Registration Page


Friday May 10, 2013

Streaming data to and from Hadoop and NoSQL Database

A quick update on some of the integration components needed to build things like M2M (machine-to-machine communication) and on integrating fast-moving data (events) with Hadoop and NoSQL Database. The Oracle Event Processing (OEP) product now gives you:

OEP Data Cartridge for Hadoop (the real doc is here)

OEP Data Cartridge for NoSQL Database (the real doc is here)

The fun with these products is that you can now model (in a UI!!) how to interact with these products. For example you can sink data into Hadoop without impacting the stream logic and stream performance and you can do a quick CQL (the OEP language) lookup to our NoSQL DB to resolve for example a customer profile or status lookup.

More to come, but very interesting and really something cool on making products work together out of the box.

Friday May 03, 2013

Videos: How to build out an end-to-end Big Data System

For those interested in understanding how to actually build a big data solution including things like NoSQL Database, Hadoop, MapReduce, Hive, Pig and Analytics (data mining, R) have a look at the big data videos Marty did:

  • Video 1: Using Big Data to Improve the Customer Experience
  • Video 2: Using Big Data to Deliver a Personalized Service
  • Video 3: Using Big Data and NoSQL to Manage On-line Profiles
  • Video 4: Oracle Big Data and Hadoop to Process Log Files
  • Video 5: Integrate All your data with Big Data Connectors
  • Video 6: Maximizing business impact with Big Data

Happy watching and learning.

Thursday Apr 11, 2013

Big Data Appliance - more flexibility, same rapid time to value


This week Oracle announced the availability (yes, you can buy and use these systems right away) of Big Data Appliance X3-2 Starter Rack and Big Data Appliance X3-2 In-Rack Expansion. You can read the press release here. For those who are interested in the operating specs, it is best to look at the data sheet on OTN.

So what does this mean? In effect this means that you can now start any big data project with an appliance. Whether you are looking to try your hand at your first project with Hadoop, or whether you are building your enterprise Hadoop solution with a large number of nodes, you can now get the benefits of Oracle Big Data Appliance. By leveraging Big Data Appliance for all your big data needs (be it Hadoop or Oracle NoSQL Database) you always get:

  • Reduced risk by having the best of Oracle and Cloudera engineering available in an easy to consume appliance
  • Faster time to value by not spending weeks or months building and tuning your own Hadoop system
  • No cost creep for the cluster as your system is set up and configured for a known cost

If you want to start your first implementation on Hadoop, you can now start with the BDA Starter Rack: 6 servers which you can fully deploy for HDFS and MapReduce capabilities (of course we also support, for example, HBase). All the services are pre-configured, so you have highly available NameNodes, automatic failover and a balanced approach to leveraging the 6 servers as Hadoop nodes. As your project grows (and you need more compute power and space to store data) you simply add nodes in chunks of 6 using the In-Rack Expansion, filling up the rack.

Once full you can either add another Starter Rack or add Full Racks to the system. As you do that, Mammoth - the install, configure and patch utility for BDA - ensures that your service nodes are in the appropriate place. For example, once you have 2 cabinets assigned to a single cluster, Mammoth will move the second NameNode to the second Rack for higher fault tolerance.

This new release of Big Data Appliance (the software parts of it) now also includes Cloudera CDH 4.2 and Cloudera Manager 4.5. On top of that, you can now create multiple clusters on a single BDA Full Rack using just Mammoth, which means you can patch and update individual clusters on that Full Rack. As you add nodes to a cluster, Mammoth will allow you to choose where to add nodes, how to grow a set of clusters, etc.

Last, but not least, there is more flexibility in how to acquire a Big Data Appliance Full Rack, as it is now a part of Oracle's Infrastructure as a Service offering, allowing for a smooth capital outflow for your Big Data Appliance.

Sunday Apr 07, 2013

Three Little Hive UDFs: Part 3


In the final installment in our series on Hive UDFs, we're going to tackle the least intuitive of the three types: the User Defined Aggregating Function. While they're challenging to implement, UDAFs are necessary if we want functions for which the distinction between map-side and reduce-side operations is opaque to the user. Someone writing a query would usually prefer to focus on the data they're trying to compute, not on which part of the plan is running a given function.

The UDAF also provides a valuable opportunity to consider some of the nuances of distributed programming and parallel database operations.  Since each task in a MapReduce job operates in a bit of a vacuum (e.g. Map task A does not know what data Map task B has), a UDAF has to explicitly account for more operational states than a simple UDF.  We'll return to the notion of a simple Moving Average function, but ask yourself: how do we compute a moving average if we don't have state or order around the data?  

As before, the code is available on github, but we'll excerpt the important parts here.

 Prefix Sum: Moving Average without State

In order to compute a moving average without state, we're going to need a specialized parallel algorithm. For moving average, the "trick" is to use a prefix sum, effectively keeping a table of running totals for quick computation (and recomputation) of our moving average. A full discussion of prefix sums for moving averages is beyond the length of a blog post, but John Jenq provides an excellent discussion of the technique as applied to CUDA implementations.
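As a rough illustration of the idea, independent of Hive, the sketch below builds a table of running totals and reads each window's sum as the difference of two table entries. The class and method names are made up for this example, and early entries are divided by the full window size, which is a simplification.

```java
// Stand-alone illustration; class and method names are hypothetical.
class PrefixSumSketch {
    // prefix[i] is the sum of values[0] through values[i].
    static double[] prefixSums(double[] values) {
        double[] prefix = new double[values.length];
        double running = 0;
        for (int i = 0; i < values.length; i++) {
            running += values[i];
            prefix[i] = running;
        }
        return prefix;
    }

    // The sum of the last `window` values ending at i is prefix[i] - prefix[i - window].
    // Entries with fewer than `window` predecessors use prefix[i] and are still
    // divided by the full window size (a simplification).
    static double[] movingAverage(double[] values, int window) {
        if (window < 1) throw new IllegalArgumentException("window must be >= 1");
        double[] prefix = prefixSums(values);
        double[] result = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            double total = (i >= window) ? prefix[i] - prefix[i - window] : prefix[i];
            result[i] = total / window;
        }
        return result;
    }

    public static void main(String[] args) {
        double[] ma = movingAverage(new double[] {1, 2, 3, 4, 5}, 2);
        System.out.println(java.util.Arrays.toString(ma)); // [0.5, 1.5, 2.5, 3.5, 4.5]
    }
}
```

Because every window sum is a single subtraction, the averages can be recomputed cheaply whenever new values are added to the table.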

What we'll cover here is the necessary implementation of a pair of classes to store and operate on our prefix sum entry within the UDAF.

class PrefixSumMovingAverage {
    static class PrefixSumEntry implements Comparable
    {
        int period;
        double value;
        double prefixSum;
        double subsequenceTotal;
        double movingAverage;

        // order entries by period so the table can be sorted chronologically
        public int compareTo(Object other)
        {
            PrefixSumEntry o = (PrefixSumEntry)other;
            if (period < o.period)
                return -1;
            if (period > o.period)
                return 1;
            return 0;
        }
    }


Here we have the definition of our moving average class and the static inner class which serves as an entry in our table. What's important here are some of the variables we define for each entry in the table: the time-index or period of the value (its order), the value itself, the prefix sum, the subsequence total, and the moving average itself. Every entry in our table requires not just the current value to compute the moving average, but also the sum of entries in our moving average window. It's the pair of these two values which allows prefix sum methods to work their magic.

    //class variables
    private int windowSize;
    private ArrayList<PrefixSumEntry> entries;

    public PrefixSumMovingAverage()
    {
        windowSize = 0;
    }

    public void reset()
    {
        windowSize = 0;
        entries = null;
    }

    public boolean isReady()
    {
        return (windowSize > 0);
    }

The above are simple initialization routines: a constructor, a method to reset the table, and a boolean method on whether or not the object has a prefix sum table on which to operate. From here, there are three important methods to examine: add, merge, and serialize. The first is intuitive: as we scan rows in Hive, we want to add them to our prefix sum table. The other two are important because of partial aggregation.

We cannot say ahead of time where this UDAF will run, and partial aggregation may be required.  That is, it's entirely possible that some values may run through the UDAF during a map task, but then be passed to a reduce task to be combined with other values.  The serialize method will allow Hive to pass the partial results from the map side to the reduce side.  The merge method allows reducers to combine the results of partial aggregations from the map tasks.
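To see why merging works, consider two map-side partial results that each hold part of a time series. The sketch below is plain Java with hypothetical names (Point, mergePartials): it combines the two partial tables, sorts by period, and only then computes the averages, giving the same answer a single task would have produced from the whole series.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Stand-alone illustration of partial aggregation; all names are hypothetical.
class PartialMergeSketch {
    static class Point {
        final int period;
        final double value;
        Point(int period, double value) { this.period = period; this.value = value; }
    }

    // Combine two partial tables and restore chronological order.
    static List<Point> mergePartials(List<Point> a, List<Point> b) {
        List<Point> all = new ArrayList<Point>(a);
        all.addAll(b);
        Collections.sort(all, new Comparator<Point>() {
            public int compare(Point x, Point y) { return Integer.compare(x.period, y.period); }
        });
        return all;
    }

    // Moving average over a table that is already sorted by period.
    static double[] movingAverage(List<Point> sorted, int window) {
        double[] prefix = new double[sorted.size()];
        double running = 0;
        for (int i = 0; i < sorted.size(); i++) {
            running += sorted.get(i).value;
            prefix[i] = running;
        }
        double[] result = new double[sorted.size()];
        for (int i = 0; i < result.length; i++) {
            double total = (i >= window) ? prefix[i] - prefix[i - window] : prefix[i];
            result[i] = total / window;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Point> mapTaskA = new ArrayList<Point>();
        List<Point> mapTaskB = new ArrayList<Point>();
        // each task saw an arbitrary, interleaved slice of the series
        mapTaskA.add(new Point(1, 1)); mapTaskA.add(new Point(3, 3)); mapTaskA.add(new Point(5, 5));
        mapTaskB.add(new Point(4, 4)); mapTaskB.add(new Point(2, 2));
        double[] ma = movingAverage(mergePartials(mapTaskA, mapTaskB), 2);
        System.out.println(java.util.Arrays.toString(ma));
    }
}
```

The averages are deliberately not computed inside each partial: a partial table may be missing neighbors that another task holds, so only the merged, sorted table gives meaningful windows.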

  public void add(int period, double v)
  {
    //Add a new entry to the list and update table
    PrefixSumEntry e = new PrefixSumEntry();
    e.period = period;
    e.value = v;
    entries.add(e);
    // do we need to ensure this is sorted?
    //if (needsSorting(entries))
    Collections.sort(entries);
    // update the table
    // prefixSums first
    double prefixSum = 0;
    for(int i = 0; i < entries.size(); i++)
    {
        PrefixSumEntry thisEntry = entries.get(i);
        prefixSum += thisEntry.value;
        thisEntry.prefixSum = prefixSum;
        entries.set(i, thisEntry);
    }

 The first part of the add task is simple: we add the element to the list and update our table's prefix sums.

    // now do the subsequence totals and moving averages
    for(int i = 0; i < entries.size(); i++)
    {
        double subsequenceTotal;
        double movingAverage;
        PrefixSumEntry thisEntry = entries.get(i);
        PrefixSumEntry backEntry = null;
        if (i >= windowSize)
            backEntry = entries.get(i-windowSize);
        if (backEntry != null)
            subsequenceTotal = thisEntry.prefixSum - backEntry.prefixSum;
        else
            subsequenceTotal = thisEntry.prefixSum;
        movingAverage = subsequenceTotal/(double)windowSize;
        thisEntry.subsequenceTotal = subsequenceTotal;
        thisEntry.movingAverage = movingAverage;
        entries.set(i, thisEntry);
    }
  }


In the second half of the add function, we compute our moving averages based on the prefix sums. It's here you can see the hinge on which the algorithm swings: thisEntry.prefixSum - backEntry.prefixSum -- that offset between the current table entry and its nth predecessor makes the whole thing work.

public ArrayList<DoubleWritable> serialize()
{
    ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>();
    // the window size goes first, followed by (period, value) pairs
    result.add(new DoubleWritable(windowSize));
    if (entries != null)
    {
        for (PrefixSumEntry i : entries)
        {
            result.add(new DoubleWritable(i.period));
            result.add(new DoubleWritable(i.value));
        }
    }
    return result;
}


The serialize method needs to package the results of our algorithm to pass to another instance of the same algorithm, and it needs to do so in a type that Hadoop can serialize. In the case of a method like sum, this would be relatively simple: we would only need to pass the sum up to this point. However, because we cannot be certain whether this instance of our algorithm has seen all the values, or seen them in the correct order, we actually need to serialize the whole table. To do this, we create a list of DoubleWritables, pack the window size at its head, and then each period and value. This gives us a structure that's easy to unpack and merge with other lists with the same construction.
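The flat layout described above (window size first, then alternating period and value) can be sketched without any Hadoop types. In this hypothetical TableCodec, plain Double objects stand in for DoubleWritable, and the method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the serialized table; Double replaces DoubleWritable.
class TableCodec {
    // Layout: [windowSize, period0, value0, period1, value1, ...]
    static List<Double> pack(int windowSize, int[] periods, double[] values) {
        if (periods.length != values.length) throw new IllegalArgumentException("length mismatch");
        List<Double> packed = new ArrayList<Double>();
        packed.add((double) windowSize);
        for (int i = 0; i < periods.length; i++) {
            packed.add((double) periods[i]);
            packed.add(values[i]);
        }
        return packed;
    }

    static int windowSize(List<Double> packed) {
        return (int) packed.get(0).doubleValue();
    }

    static int entryCount(List<Double> packed) {
        return (packed.size() - 1) / 2;
    }

    // Periods sit at the odd indexes, values at the even indexes after the head.
    static int period(List<Double> packed, int entry) {
        return (int) packed.get(1 + 2 * entry).doubleValue();
    }

    static double value(List<Double> packed, int entry) {
        return packed.get(2 + 2 * entry);
    }

    public static void main(String[] args) {
        List<Double> packed = pack(4, new int[] {10, 11}, new double[] {2.5, 7.0});
        System.out.println(packed); // [4.0, 10.0, 2.5, 11.0, 7.0]
    }
}
```

Storing periods as doubles works for this round trip because the int periods used here convert to double and back without loss.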

  public void merge(List<DoubleWritable> other)
  {
    if (other == null)
        return;
    // if this is an empty buffer, just copy in other
    // but deserialize the list
    if (windowSize == 0)
    {
        windowSize = (int)other.get(0).get();
        entries = new ArrayList<PrefixSumEntry>();
        // we're serialized as period, value, period, value
        for (int i = 1; i < other.size(); i+=2)
        {
            PrefixSumEntry e = new PrefixSumEntry();
            e.period = (int)other.get(i).get();
            e.value = other.get(i+1).get();
            entries.add(e);
        }
    }


Merging results is perhaps the most complicated thing we need to handle.  First, we check the case in which there was no partial result passed -- just return and continue.  Second, we check to see if this instance of PrefixSumMovingAverage already has a table.  If it doesn't, we can simply unpack the serialized result and treat it as our window.

    // if we already have a buffer, we need to add these entries
    else
    {
        // we're serialized as period, value, period, value
        for (int i = 1; i < other.size(); i+=2)
        {
            PrefixSumEntry e = new PrefixSumEntry();
            e.period = (int)other.get(i).get();
            e.value = other.get(i+1).get();
            entries.add(e);
        }
    }


The third case is the non-trivial one: if this instance has a table and receives a serialized table, we must merge them together.  Consider a Reduce task: as it receives outputs from multiple Map tasks, it needs to merge all of them together to form a larger table.  Thus, merge will be called many times to add these results and reassemble a larger time series.

    // sort and recompute
    Collections.sort(entries);
    // update the table
    // prefixSums first
    double prefixSum = 0;
    for(int i = 0; i < entries.size(); i++)
    {
        PrefixSumEntry thisEntry = entries.get(i);
        prefixSum += thisEntry.value;
        thisEntry.prefixSum = prefixSum;
        entries.set(i, thisEntry);
    }


This part should look familiar; it's just like the add method. Now that we have new entries in our table, we need to sort by period and recompute the moving averages. In fact, the rest of the merge method is exactly like the add method, so we might consider putting sorting and recomputing in a separate method.

Orchestrating Partial Aggregation

We've got a clever little algorithm for computing moving average in parallel, but Hive can't do anything with it unless we create a UDAF that understands how to use our algorithm.  At this point, we need to start writing some real UDAF code.  As before, we extend a generic class, in this case GenericUDAFEvaluator.

  public static class GenericUDAFMovingAverageEvaluator extends GenericUDAFEvaluator {
        // input inspectors for PARTIAL1 and COMPLETE
        private PrimitiveObjectInspector periodOI;
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector windowSizeOI;
        // input inspectors for PARTIAL2 and FINAL
        // list for MAs and one for residuals
        private StandardListObjectInspector loi;

 As in the case of a UDTF, we create ObjectInspectors to handle type checking.  However, notice that we have inspectors for different states: PARTIAL1, PARTIAL2, COMPLETE, and FINAL.  These correspond to the different states in which our UDAF may be executing.  Since our serialized prefix sum table isn't the same input type as the values our add method takes, we need different type checking for each.
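One way to keep the four modes straight is to note which kind of input each one consumes and which kind of output it produces. The enum below is a local stand-in written for this sketch (it mirrors the mode names but is not Hive's own type), and the helper names are hypothetical.

```java
// Local stand-in for illustration; Hive defines its own Mode type on GenericUDAFEvaluator.
class ModeSketch {
    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    // PARTIAL1 and COMPLETE read the original rows (period, value, window size).
    static boolean readsRawRows(Mode m) {
        return m == Mode.PARTIAL1 || m == Mode.COMPLETE;
    }

    // PARTIAL1 and PARTIAL2 emit a serialized partial result rather than the final answer.
    static boolean emitsPartialResult(Mode m) {
        return m == Mode.PARTIAL1 || m == Mode.PARTIAL2;
    }

    static String describe(Mode m) {
        String in = readsRawRows(m) ? "raw rows" : "serialized partial tables";
        String out = emitsPartialResult(m) ? "serialized partial table" : "final result";
        return m + ": " + in + " -> " + out;
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(describe(m));
        }
    }
}
```

Seen this way, the input inspectors depend on the first question and the output inspectors on the second, which is why the initialization code branches on the mode twice.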

        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // initialize input inspectors
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE)
            {
                assert(parameters.length == 3);
                periodOI = (PrimitiveObjectInspector) parameters[0];
                inputOI = (PrimitiveObjectInspector) parameters[1];
                windowSizeOI = (PrimitiveObjectInspector) parameters[2];
            }

Here's the beginning of our overridden initialization function. We check the parameters for two modes, PARTIAL1 and COMPLETE. Here we assume that the arguments to our UDAF are the same as the user passes in a query: the period, the input, and the size of the window. If the UDAF instance is consuming the results of our partial aggregation, we need a different ObjectInspector. Specifically, this one:

                loi = (StandardListObjectInspector) parameters[0];


Similar to the UDTF, we also need type checking on the output types -- but for both partial and full aggregation. In the case of partial aggregation, we're returning lists of DoubleWritables:

              // init output object inspectors
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                // The output of a partial aggregation is a list of doubles representing the
                // moving average being constructed.
                // the first element in the list will be the window size
                return ObjectInspectorFactory.getStandardListObjectInspector(


 But in the case of FINAL or COMPLETE, we're dealing with the types that will be returned to the Hive user, so we need to return a different output.  We're going to return a list of structs that contain the period, moving average, and residuals (since they're cheap to compute).

            else {
                // The output of FINAL and COMPLETE is a full aggregation, which is a
                // list of DoubleWritable structs holding the period, the moving
                // average, and the residual for each entry.
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();
                return ObjectInspectorFactory.getStandardListObjectInspector(
                 ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi) );
            }


Next come methods to control what happens when a Map or Reduce task is finished with its data.  In the case of partial aggregation, we need to serialize the data.  In the case of full aggregation, we need to package the result for Hive users.

    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      // return an ArrayList where the first parameter is the window size
      MaAgg myagg = (MaAgg) agg;
      return myagg.prefixSum.serialize();
    }

    public Object terminate(AggregationBuffer agg) throws HiveException {
      // final return value goes here
      MaAgg myagg = (MaAgg) agg;
      if (myagg.prefixSum.tableSize() < 1)
      {
        return null;
      }
      else
      {
        ArrayList<DoubleWritable[]> result = new ArrayList<DoubleWritable[]>();
        for (int i = 0; i < myagg.prefixSum.tableSize(); i++)
        {
            // residual: how far the observed value sits from its moving average
            double residual = myagg.prefixSum.getEntry(i).value - myagg.prefixSum.getEntry(i).movingAverage;
            DoubleWritable[] entry = new DoubleWritable[3];
            entry[0] = new DoubleWritable(myagg.prefixSum.getEntry(i).period);
            entry[1] = new DoubleWritable(myagg.prefixSum.getEntry(i).movingAverage);
            entry[2] = new DoubleWritable(residual);
            result.add(entry);
        }
        return result;
      }
    }


We also need to provide instruction on how Hive should merge the results of partial aggregation.  Fortunately, we already handled this in our PrefixSumMovingAverage class, so we can just call that.

    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        // if we're merging two separate sets we're creating one table that's doubly long
        if (partial != null)
        {
            MaAgg myagg = (MaAgg) agg;
            List<DoubleWritable> partialMovingAverage = (List<DoubleWritable>) loi.getList(partial);
            // hand the serialized table to PrefixSumMovingAverage.merge
            myagg.prefixSum.merge(partialMovingAverage);
        }
    }


Of course, merging and serializing isn't very useful unless the UDAF has logic for iterating over values. The iterate method handles this and -- as one would expect -- relies entirely on the PrefixSumMovingAverage class we created.

    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      assert (parameters.length == 3);
      if (parameters[0] == null || parameters[1] == null || parameters[2] == null)
      {
        return;
      }
      MaAgg myagg = (MaAgg) agg;
      // Parse out the window size just once if we haven't done so before. We need a window of at least 1,
      // otherwise there's no window.
      if (!myagg.prefixSum.isReady())
      {
        int windowSize = PrimitiveObjectInspectorUtils.getInt(parameters[2], windowSizeOI);
        if (windowSize < 1)
        {
            throw new HiveException(getClass().getSimpleName() + " needs a window size >= 1");
        }
        // (storing the window size in the buffer is omitted from this excerpt)
      }
      //Add the current data point and compute the average
      int p = PrimitiveObjectInspectorUtils.getInt(parameters[0], periodOI);
      double v = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI);
      myagg.prefixSum.add(p, v);
    }


Aggregation Buffers: Connecting Algorithms with Execution

One might notice that the code for our UDAF references an object of type AggregationBuffer quite a lot.  This is because the AggregationBuffer is the interface which allows us to connect our custom PrefixSumMovingAverage class to Hive's execution framework.  While it doesn't constitute a great deal of code, it's glue that binds our logic to Hive's execution framework.  We implement it as such:

    // Aggregation buffer definition and manipulation methods
    static class MaAgg implements AggregationBuffer {
        PrefixSumMovingAverage prefixSum;
    }

    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      MaAgg result = new MaAgg();
      return result;
    }


Using the UDAF

The goal of a good UDAF is that, no matter how complicated it was for us to implement, it is simple for our users to use. For all that code and parallel thinking, usage of the UDAF is very straightforward:

ADD JAR /mnt/shared/hive_udfs/dist/lib/moving_average_udf.jar;
-- get the moving average for a single tail number
SELECT TailNum,moving_avg(timestring, delay, 4) FROM ts_example WHERE TailNum='N967CA' GROUP BY TailNum LIMIT 100;

Here we're applying the UDAF to get the moving average of arrival delay for a single tail number. It's a really simple query for all that work we did underneath. We can do a bit more and leverage Hive's ability to handle complex types as columns. Here's a query which creates a table of time series as arrays.

-- create a set of moving averages for every plane starting with N
-- Note: this UDAF blows up unpleasantly in heap; there will be data volumes for which you need to throw
-- excessive amounts of memory at the problem
CREATE TABLE moving_averages AS
SELECT TailNum, moving_avg(timestring, delay, 4) as timeseries FROM ts_example



We've covered all manner of UDFs: from simple class extensions which can be written very easily, to very complicated UDAFs which require us to think about distributed execution and plan orchestration done by query engines.  With any luck, the discussion has provided you with the confidence to go out and implement your own UDFs -- or at least pay some attention to the complexities of the ones in use every day.


Thursday Apr 04, 2013

Three Little Hive UDFs: Part 2


In our ongoing exploration of Hive UDFs, we've covered the basic row-wise UDF.  Today we'll move to the UDTF, which generates multiple rows for every row processed.  This UDF built its house from sticks: it's slightly more complicated than the basic UDF and allows us an opportunity to explore how Hive functions manage type checking.

 We'll step through some of the more interesting pieces, but as before the full source is available on github here.

Extending GenericUDTF

 Our UDTF is going to produce pairwise combinations of elements in a comma-separated string.  So, for a string column "Apples, Bananas, Carrots" we'll produce three rows:


  • Apples, Bananas
  • Apples, Carrots
  • Bananas, Carrots
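The expansion itself can be sketched in plain Java before any Hive plumbing is involved. The PairwiseSketch class below is a hypothetical stand-alone helper; it trims whitespace around each element, which is an assumption about how the example string should be treated.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone illustration; not the UDTF itself.
class PairwiseSketch {
    // Returns one pair for every two distinct positions, in input order.
    static List<String[]> pairs(String commaSeparated) {
        String[] members = commaSeparated.split(",");
        for (int i = 0; i < members.length; i++) {
            members[i] = members[i].trim(); // assumption: surrounding spaces are not significant
        }
        List<String[]> result = new ArrayList<String[]>();
        for (int i = 0; i < members.length - 1; i++) {
            // start at i + 1 so each pair appears once and never in reverse
            for (int j = i + 1; j < members.length; j++) {
                result.add(new String[] {members[i], members[j]});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (String[] pair : pairs("Apples, Bananas, Carrots")) {
            System.out.println(pair[0] + ", " + pair[1]);
        }
    }
}
```

A string with n elements yields n(n-1)/2 pairs, so the three-element example produces the three rows listed above.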


As with the UDF, the first few lines are a simple class extension with a decorator so that Hive can describe what the function does.

@Description(name = "pairwise", value = "_FUNC_(doc) - emits pairwise combinations of an input array")
public class PairwiseUDTF extends GenericUDTF {

private PrimitiveObjectInspector stringOI = null;

 We also create an object of PrimitiveObjectInspector, which we'll use to ensure that the input is a string.  Once this is done, we need to override methods for initialization, row processing, and cleanup.


  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException
  {
    if (args.length != 1) {
      throw new UDFArgumentException("pairwise() takes exactly one argument");
    }

    // reject the argument unless it is both a primitive and a string
    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
        || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() !=
           PrimitiveObjectInspector.PrimitiveCategory.STRING) {
      throw new UDFArgumentException("pairwise() takes a string as a parameter");
    }

    stringOI = (PrimitiveObjectInspector) args[0];

This UDTF is going to return an array of structs, so the initialize method needs to return a StructObjectInspector object. Note that the arguments come in as an array of ObjectInspector objects. This allows us to handle arguments in a "normal" fashion but with the benefit of methods to broadly inspect type. We only allow a single argument -- the string column to be processed -- so we check the length of the array and validate that the sole element is both a primitive and a string.

The second half of the initialize method is more interesting: 

List<String> fieldNames = new ArrayList<String>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);


Here we set up information about what the UDTF returns. We need this in place before we start processing rows, otherwise Hive can't correctly build execution plans before submitting jobs to MapReduce. The structures we're returning will be two strings per struct, which means we'll need ObjectInspector objects for both the values and the names of the fields. We create two lists, one of strings for the name, the other of ObjectInspector objects. We pack them manually and then use a factory to get the StructObjectInspector which defines the actual return value.

Now we're ready to actually do some processing, so we override the process method.

  public void process(Object[] record) throws HiveException {
    final String document = (String) stringOI.getPrimitiveJavaObject(record[0]);
    if (document == null) {
      return; // nothing to expand for a null column value
    }
    String[] members = document.split(",");
    java.util.Arrays.sort(members);
    for (int i = 0; i < members.length - 1; i++)
      // start at i + 1 so each pair is emitted once, in sorted order
      for (int j = i + 1; j < members.length; j++)
        if (!members[i].equals(members[j]))
          forward(new Object[] {members[i], members[j]});
  }


This is simple pairwise expansion, so the logic isn't anything more than a nested for-loop.  There are, though, some interesting things to note.  First, to actually get a string object to operate on, we have to use an ObjectInspector and some typecasting.  This allows us to bail out early if the column value is null.  Once we have the string, splitting, sorting, and looping is textbook stuff.  

The last notable piece is that the process method does not return anything.  Instead, we call forward to emit our newly created structs.  From the context of those used to database internals, this follows the producer-consumer models of most RDBMSs.  From the context of those used to MapReduce semantics, this is equivalent to calling write on the Context object.
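To make the emit-style contract concrete outside Hive, here is a minimal plain-Java sketch.  It is not the UDTF itself: PairwiseSketch and its Consumer-based forward callback are hypothetical stand-ins for the UDTF class and its forward method, and it assumes each pair should be emitted once, in sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the UDTF: plain Java, no Hive classes involved.
final class PairwiseSketch {

  // Like process(), this returns nothing; every pair is handed to the callback.
  static void process(String document, Consumer<String[]> forward) {
    if (document == null) {
      return; // a null column value produces no output rows
    }
    String[] members = document.split(",");
    Arrays.sort(members);
    for (int i = 0; i < members.length - 1; i++) {
      for (int j = i + 1; j < members.length; j++) {
        if (!members[i].equals(members[j])) {
          forward.accept(new String[] {members[i], members[j]});
        }
      }
    }
  }

  // Convenience wrapper that collects the emitted rows as "a|b" strings.
  static List<String> pairs(String document) {
    List<String> out = new ArrayList<>();
    process(document, pair -> out.add(pair[0] + "|" + pair[1]));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(pairs("milk,bread,eggs"));
  }
}
```

For the basket "milk,bread,eggs" the callback receives three rows: bread/eggs, bread/milk and eggs/milk.  The caller decides what "forward" means, which is the same separation Hive relies on when it wires a UDTF into a query plan.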

  public void close() throws HiveException {
    // do nothing
  }


If there were any cleanup to do, we'd take care of it here.  But this is simple emission, so our override doesn't need to do anything.

Using the UDTF

Once we've built our UDTF, we can access it via Hive by adding the jar and assigning it to a temporary function.  However, mixing the results of a UDTF with other columns from the base table requires that we use a LATERAL VIEW.

-- Add the jar
add jar /mnt/shared/market_basket_example/pairwise.jar;
-- Create a function
CREATE temporary function pairwise AS '';
-- View the pairwise expansion output
SELECT m1, m2, COUNT(*) FROM market_basket
LATERAL VIEW pairwise(basket) pwise AS m1,m2 GROUP BY m1,m2;
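For intuition about what the query returns, the sketch below mimics it in plain Java: each basket row is expanded into its pairs (the LATERAL VIEW step) and the pairs are then counted (the GROUP BY step).  The class name, the sample baskets, and the sorted, once-per-pair expansion are assumptions made for illustration; nothing here touches Hive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of the LATERAL VIEW + GROUP BY query.
final class PairCountSketch {

  static Map<String, Integer> countPairs(List<String> baskets) {
    Map<String, Integer> counts = new TreeMap<>();
    for (String basket : baskets) {              // one iteration per base-table row
      String[] m = basket.split(",");
      Arrays.sort(m);
      for (int i = 0; i < m.length - 1; i++) {   // rows the UDTF would emit
        for (int j = i + 1; j < m.length; j++) {
          if (!m[i].equals(m[j])) {
            // COUNT(*) grouped by (m1, m2)
            counts.merge(m[i] + "," + m[j], 1, Integer::sum);
          }
        }
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<String> baskets = Arrays.asList("milk,bread", "bread,eggs,milk", "eggs,tea");
    countPairs(baskets).forEach((pair, n) -> System.out.println(pair + " -> " + n));
  }
}
```

With these three sample baskets, bread and milk appear together twice and every other pair once.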


Three Little Hive UDFs: Part 1


In our ongoing series of posts explaining the ins and outs of Hive User Defined Functions, we're starting with the simplest case.  Of the three little UDFs, today's entry built a straw house: simple, easy to put together, but limited in applicability.  We'll walk through important parts of the code, but you can grab the whole source from GitHub here.

Extending UDF

The first few lines of interest are very straightforward:

@Description(name = "moving_avg", value = "_FUNC_(x, n) - Returns the moving mean of a set of numbers over a window of n observations")
@UDFType(deterministic = false, stateful = true)
public class UDFSimpleMovingAverage extends UDF {

We're extending the UDF class with some decoration.  The decoration is important for usability and functionality.  The description decorator allows us to give Hive some information to show users about how to use our UDF and what its method signature will be.  The UDFType decoration tells Hive what sort of behavior to expect from our function.

 A deterministic UDF will always return the same output given a particular input.  A square-root computing UDF will always return the same square root for 4, so we can say it is deterministic; a call to get the system time would not be.  The stateful annotation of the UDFType decoration is relatively new to Hive (e.g., CDH4 and above).  The stateful directive allows Hive to keep some static variables available across rows.  The simplest example of this is a "row-sequence," which maintains a static counter which increments with each row processed.
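The distinction can be illustrated without Hive at all.  In the sketch below, which uses hypothetical class and method names and is not a real UDF, sqrt depends only on its argument, while nextRowNumber depends on a static counter and so returns something different on every call.

```java
// Hypothetical illustration only; these are not Hive UDF classes.
final class UdfBehaviorSketch {

  // State shared across calls, like the static counter behind a "row-sequence".
  private static long counter = 0;

  // Deterministic: the same input always yields the same output.
  static double sqrt(double x) {
    return Math.sqrt(x);
  }

  // Stateful and non-deterministic: the result depends on how many calls came before.
  static long nextRowNumber() {
    counter += 1;
    return counter;
  }

  public static void main(String[] args) {
    System.out.println(sqrt(4.0) == sqrt(4.0));                   // same answer every time
    System.out.println(nextRowNumber() + ", " + nextRowNumber()); // changes per call
  }
}
```

An optimizer is free to cache or reorder calls to the first kind of function but not the second, which is why Hive wants to be told which kind it is dealing with.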

  Since square-root and row-counting aren't terribly interesting, we'll use the stateful annotation to build a simple moving average function.  We'll return to the notion of a moving average later when we build a UDAF, so as to compare the two approaches.

  private DoubleWritable result = new DoubleWritable();
  private static ArrayDeque<Double> window;
  int windowSize;

  public UDFSimpleMovingAverage() {
  }


 The above code is basic initialization.  We make a double in which to hold the result, but it needs to be of class DoubleWritable so that MapReduce can properly serialize the data.  We use a deque to hold our sliding window, and we need to keep track of the window's size.  Finally, we implement a constructor for the UDF class.

 public DoubleWritable evaluate(DoubleWritable v, IntWritable n) {
    double sum = 0.0;
    double moving_average;
    double residual;
    if (window == null)
        window = new ArrayDeque<Double>();


Here's the meat of the class: the evaluate method.  This method will be called on each row by the map tasks.  For any given row, we can't say whether or not our sliding window exists, so we initialize it if it's null.

    // slide the window
    if (window.size() == n.get())
      window.pop();
    window.addLast(new Double(v.get()));
    // compute the average
    for (Iterator<Double> i = window.iterator(); i.hasNext();)
      sum += i.next();

Here we deal with the deque and compute the sum of the window's elements.  Deques are essentially double-ended queues, so they make excellent sliding windows.  If the window is full, we pop the oldest element and add the current value.

    moving_average = sum/window.size();
    result.set(moving_average);
    return result;
  }

Computing the moving average without weighting is simply dividing the sum of our window by its size.  We then set that value in our Writable variable and return it.  The value is then emitted as part of the map task executing the UDF function.
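The same sliding-window logic can be tried without Hive or Hadoop on the classpath.  The sketch below is a plain-Java approximation under stated assumptions: MovingAverageSketch is a hypothetical name, it uses primitive doubles instead of Writables, and it keeps the deque as an instance field rather than the static field used in the UDF.

```java
import java.util.ArrayDeque;

// Hypothetical plain-Java version of the sliding window; no Hive or Hadoop types.
final class MovingAverageSketch {
  private final ArrayDeque<Double> window = new ArrayDeque<>();
  private final int n;

  MovingAverageSketch(int n) {
    if (n < 1) {
      throw new IllegalArgumentException("window size must be at least 1");
    }
    this.n = n;
  }

  // Feed one observation and get the mean of the last n observations seen so far.
  double next(double v) {
    if (window.size() == n) {
      window.pop();          // drop the oldest value from the front
    }
    window.addLast(v);       // newest value goes on the back
    double sum = 0.0;
    for (double x : window) {
      sum += x;
    }
    return sum / window.size();
  }

  public static void main(String[] args) {
    MovingAverageSketch avg = new MovingAverageSketch(3);
    for (double v : new double[] {1, 2, 3, 4, 5}) {
      System.out.println(avg.next(v));
    }
  }
}
```

Feeding 1 through 5 into a window of three yields 1.0, 1.5, 2.0, 3.0 and 4.0.  Note that the first two results average fewer than three values, which is also how the UDF behaves until its window fills.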

Going Further

The stateful annotation made it simple for us to compute a moving average since we could keep the deque static.  However, how would we compute a moving average if there was no notion of state between Hadoop tasks? At the end of the series we'll examine a UDAF that does this, but the algorithm ends up being much different.  In the meantime, I challenge the reader to think about what sort of approach is needed to compute the window.


Tuesday Apr 02, 2013

User Defined Functions in Hive


User-defined Functions (UDFs) have a long history of usefulness in SQL-derived languages.  While query languages can be rich in their expressiveness, there's just no way they can anticipate all the things a developer wants to do.  Thus, the custom UDF has become commonplace in our data manipulation toolbox.

Apache Hive is no different in this respect from other SQL-like languages.  Hive allows extensibility via both Hadoop Streaming and compiled Java.  However, largely because of the underlying MapReduce paradigm, not all Hive UDFs are created equal.  Some UDFs are intended for "map-side" execution, while others are portable and can be run on the "reduce-side."  Moreover, UDF behavior via streaming requires that queries be formatted so as to direct script execution where we desire it.

 The intricacies of where and how a UDF executes may seem like minutiae, but we would be disappointed if the time spent coding a cumulative sum UDF yielded a function that only executed on single rows.  To that end, I'm going to spend the rest of the week diving into the three primary types of Java-based UDFs in Hive.  You can find all of the sample code discussed here.

The Three Little UDFs

Hive provides three classes of UDFs that most users are interested in: UDFs, UDTFs, and UDAFs.  Broken down simply, the three classes can be explained as such:

  • UDFs -- User Defined Functions; these operate row-wise, generally during map execution.  They're the simplest UDFs to write, but constrained in their functionality.
  • UDTFs -- User Defined Table-Generating Functions; these also execute row-wise, but they produce multiple rows of output (i.e., they generate a table).  The most common example of this is Hive's explode function.
  • UDAFs -- User Defined Aggregating Functions; these can execute on either the map-side or the reduce-side and are far more flexible than UDFs.  The challenge, however, is that in writing UDAFs we have to think not just about what to do with a single row, or even a group of rows.  Here, one has to consider partial aggregation and serialization between map and reduce processes.

Over the next few days, we'll walk through code for each of these function types, from simple to complex.  Along the way, we'll end up with a couple of useful functions you can use in your own Hive code (or improve upon).
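As a rough preview of how the three shapes differ, here is a plain-Java sketch.  Everything in it is hypothetical: the class and method names are illustrative, the real Hive base classes have richer interfaces, and the iterate/merge/terminate names are only loosely modeled on the steps an aggregation goes through.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical shapes only; the real Hive base classes have richer interfaces.
final class ThreeShapesSketch {

  // UDF shape: one row in, one value out.
  static int length(String s) {
    return s.length();
  }

  // UDTF shape: one row in, zero or more rows out.
  static List<String> explode(String csv) {
    return new ArrayList<>(Arrays.asList(csv.split(",")));
  }

  // UDAF shape: a partial result that can be built per task and merged later.
  static final class MeanPartial {
    double sum;
    long count;

    // Consume one row.
    void iterate(double v) {
      sum += v;
      count++;
    }

    // Combine with a partial result built elsewhere.
    void merge(MeanPartial other) {
      sum += other.sum;
      count += other.count;
    }

    // Produce the final answer once all partials are merged.
    double terminate() {
      return count == 0 ? Double.NaN : sum / count;
    }
  }

  public static void main(String[] args) {
    MeanPartial a = new MeanPartial();   // imagine this built by one map task
    MeanPartial b = new MeanPartial();   // and this by another
    a.iterate(1);
    a.iterate(2);
    b.iterate(6);
    a.merge(b);                          // the reduce side combines the partials
    System.out.println(a.terminate());
  }
}
```

The mean is carried as a (sum, count) pair rather than as a running average because two averages cannot be merged correctly without knowing how many rows each one covers.  That is the kind of partial-aggregation thinking a UDAF demands.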


Tuesday Mar 05, 2013

Hadoop Cluster: Build vs. Buy (part II)

About a year ago we did a comparison (with an update here) of building your own Hadoop cluster versus buying a Big Data Appliance, where we focused purely on the hardware and software cost. We thought it could use an update, and luckily an analyst firm did one for us. This time it covers not only the hardware and software costs but also ventures much further into the other costs involved.

Read all about it in ESG's Getting Real About Big Data, Build vs Buy (Feb 2013) here.

Some highlights from the report:

  • Oracle Big Data Appliance ($450k) is 39% less costly than "build your own" ($733k)
  • OBDA reduces time-to-market by 33% vs "build"

But the report is not just about those numbers; it covers a number of very interesting things, such as three Hadoop myths, the importance of big data in the near future, and the priority customers give to improving their analytics footprint.

Enjoy the read!

Wednesday Feb 20, 2013

Looking for tools to solve your big data problems?

Look no further: Infosys today announced its Infosys BigDataEdge developer platform to drive value from your big data stack.

By empowering business users to rapidly develop insights from vast amounts of structured and unstructured data, better business decisions can be made in near real-time. With Infosys BigDataEdge, enterprises can reduce the time taken to extract information by up to 40 percent and generate insights up to eight times faster.

Read More.

Thursday Feb 07, 2013

New Series of Video Workshops

Contrary to my habit of posting How-To articles, today I'm not going to tell you how to do anything.  Instead, I'm going to ask you to go do something.  What I want you to do is check out the series of video tutorials I'm doing around pieces of the Hadoop ecosystem and Big Data analytics.  

The first entry is on YouTube and in the Big Data Playlist on Oracle Media Network.  It gives a brief overview of setting up Flume flows into HDFS.  Of course, if you're looking for a more detailed How-To, just check the archives -- I covered it a while back.

Some of the topics I'll cover in upcoming months include:

  • Hive User Defined Functions
  • Market Basket Analysis with Hive, R and Data Miner 
  • Customer Segmentation and Clustering in Hadoop
  • Classification and Targeting using Random Forests


Wednesday Jan 30, 2013

Parallel R: Quick Ways Model More


I am less and less often mistaken for a pirate when I mention the R language.  While I miss the excuse to wear an eyepatch, I'm glad more people are beginning to explore a statistical language I've been touting for years.  When it comes to plotting or running complex statistics in a single line of code, R is a great tool to have.  That said, there are plenty of pitfalls for the casual or new user: syntax, learning to write vectorized code, or even just knowing which "apply" function you really should choose.

  I want to explore a slightly less-often considered aspect of R development: parallelism.  Out of the box, R can seem very limited to someone used to working on compute clusters or even a multicore server.  However, there are a few tricks we can leverage to get the most out of R on everything from a personal workstation to a Hadoop cluster.

 R is Single-Threaded

The R interpreter is -- and likely always will be -- single-threaded.  This means loading data frames is done in a single thread.  So is building your linear model, or generating that pretty surface plot.  Even on my laptop, that's a lot of threads to not use for modeling.  No matter how much my web browser might covet those cycles, I'd like to use them for work.

Rather than a complex multithreaded re-implementation, the R interpreter offers a number of ways to allow users to selectively apply parallelism.  Some of these approaches leverage MPI libraries and mirror that message passing approach.  Others allow a more implicit parallelism via "foreach" or "apply" constructs. We'll just focus on a pair of strategies using the parallelism that has shipped with R since version 2.14.0: the parallel library.

 Setting The Stage for Parallel Execution

We're going to need to load a few libraries into our R session before we can execute anything outside of our single thread.  We'll use the doParallel and foreach packages because they allow us to focus on what to parallelize rather than how to coordinate our threads.

> library(doParallel)  # also attaches foreach and parallel
> data(iris)

Knowing that calculations in R will be single-threaded, we want to use the parallel package to operate on logical subsets of the data simultaneously.  For example, I loaded the iris data set, which contains measurements for a number of different species.  One way I might want to parallelize is to fit the same model to each species simultaneously.  For that, I'm going to have to split the data by species:

> species.split <- split(iris, iris$Species)

 This gives us a list we can iterate over -- or parallelize.  From here on out, it's simply a question of deciding what resources we want to leverage: local CPUs or remote hosts.


We're going to use the makeCluster function to bind together a set of computational resources.  But first we need to decide: do we want to use only local CPUs, or is it necessary to open up socket connections to other machines to distribute our workload?  In the former case we'll use makeCluster to create what's called a FORK cluster (in that it uses UNIX's fork call to create worker processes).  In the latter, we'll create a SOCK cluster by opening up sockets to a list of remote hosts and starting worker processes on them.

Here's a FORK cluster which uses all my cores:

> cl <- makeCluster(detectCores(), type = "FORK")
> registerDoParallel(cl)

And here's a SOCK cluster across three nodes (password-less SSH is required):

> hostlist <- c("", "", "")
> cl <- makeCluster(hostlist)
> registerDoParallel(cl)

In each case, I call registerDoParallel to bind this cluster to the %dopar% operator.  This is the operator which will let us easily iterate in parallel.

Running in Parallel

Once we've got something to iterate over and a cluster with which to do it, modeling in parallel becomes straightforward.  Suppose I want to fit a model of sepal length as a linear combination of petal characteristics.  In that case, the code is simply:

> species.models <- foreach(i=species.split) %dopar% {
    m <- lm(i$Sepal.Length ~ i$Petal.Width*i$Petal.Length)
    m
  }

But I'm not just restricted to fitting linear models on my little cluster.  I can run k-means clustering for several different k simultaneously using basically the same block:

> species.clusters <- foreach(i=2:5) %dopar% {
    # cluster on the four numeric columns only; Species is a factor
    km <- kmeans(iris[, 1:4], i)
    km
  }

When I'm done with my block, I can just call stopCluster(cl) to ensure my processes terminate and I'm not hogging resources.

Using Hadoop

Finally, there will be situations in which I need to deploy in parallel against much larger datasets -- specifically, datasets stored in HDFS.  Both Hive and Pig will let me run an R script as part of a streaming process.  In Hive, the TRANSFORM operator will send data to an R script.  In Pig, you can use the STREAM operator to send a whole bag to an R script.  However, you can't stream from within Pig's FOREACH blocks, so I occasionally use a UDF which invokes R scripts for me.

Regardless of the method you choose to send HDFS data to an R process, it's important to make sure your R script can consume data streaming from standard input.  I find the most expedient way of doing this is via the file function.  A typical script might start:

#! /usr/bin/env Rscript
# Connection to STDIN for reading a data frame
con <- file(description="stdin")
# "input" is a placeholder name for the resulting data frame
input <- read.table(con, header=FALSE, sep=",")


We've covered several ways to push R beyond the bounds of its single-threaded core.  There are forking and socket mechanisms for spreading our work around, not to mention tricks for leveraging the power of Hadoop Streaming.  In each case, however, one thing stands out: we must be smart as modelers and understand what can and should be done in parallel.


The data warehouse insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.

