## Sunday Apr 07, 2013

### Introduction

In the final installment of our series on Hive UDFs, we're going to tackle the least intuitive of the three types: the User Defined Aggregating Function.  While they're challenging to implement, UDAFs are necessary if we want functions for which the distinction between map-side and reduce-side operations is opaque to the user.  A user writing a query would rather focus on the data they're trying to compute, not on which part of the plan is running a given function.

The UDAF also provides a valuable opportunity to consider some of the nuances of distributed programming and parallel database operations.  Since each task in a MapReduce job operates in a bit of a vacuum (e.g. Map task A does not know what data Map task B has), a UDAF has to explicitly account for more operational states than a simple UDF.  We'll return to the notion of a simple Moving Average function, but ask yourself: how do we compute a moving average if we don't have state or order around the data?

As before, the code is available on github, but we'll excerpt the important parts here.

### Prefix Sum: Moving Average without State

In order to compute a moving average without state, we're going to need a specialized parallel algorithm.  For moving averages, the "trick" is to use a prefix sum, effectively keeping a table of running totals for quick computation (and recomputation) of our moving average.  A full discussion of prefix sums for moving averages is beyond the length of a blog post, but John Jenq provides an excellent discussion of the technique as applied to CUDA implementations.
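To make the trick concrete before we dive into the Hive plumbing, here's a minimal standalone sketch (plain Java with illustrative names, not the UDAF code itself) of computing a moving average from a table of running totals:

```java
import java.util.Arrays;

// Minimal standalone sketch of the prefix-sum trick (not the UDAF itself).
public class PrefixSumSketch {
    // Returns the moving average at each index for a window of size w.
    static double[] movingAverage(double[] values, int w) {
        // Build the running-total (prefix sum) table.
        double[] prefix = new double[values.length];
        double running = 0;
        for (int i = 0; i < values.length; i++) {
            running += values[i];
            prefix[i] = running;
        }
        // Any window's total is the difference of two prefix sums.
        double[] ma = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            double windowTotal = (i >= w) ? prefix[i] - prefix[i - w] : prefix[i];
            ma[i] = windowTotal / w;
        }
        return ma;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(movingAverage(new double[]{1, 2, 3, 4}, 2)));
        // [0.5, 1.5, 2.5, 3.5]
    }
}
```

Note that, like the class we're about to excerpt, entries earlier than a full window are still divided by the window size; the point is that any window total falls out of a single subtraction.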

What we'll cover here is the necessary implementation of a pair of classes to store and operate on our prefix sum entry within the UDAF.

```java
public class PrefixSumMovingAverage {
    static class PrefixSumEntry implements Comparable
    {
        int period;
        double value;
        double prefixSum;
        double subsequenceTotal;
        double movingAverage;

        public int compareTo(Object other)
        {
            PrefixSumEntry o = (PrefixSumEntry)other;
            if (period < o.period)
                return -1;
            if (period > o.period)
                return 1;
            return 0;
        }
    }
```

Here we have the definition of our moving average class and the static inner class which serves as an entry in our table.  What's important here are the variables we define for each entry in the table: the time index or period of the value (its order), the value itself, the prefix sum, the subsequence total, and the moving average itself.  Every entry in our table requires not just the current value to compute the moving average, but also the sum of entries in our moving average window.  It's the pair of these two values which allows prefix sum methods to work their magic.

```java
    // class variables
    private int windowSize;
    private ArrayList<PrefixSumEntry> entries;

    public PrefixSumMovingAverage()
    {
        windowSize = 0;
    }

    public void reset()
    {
        windowSize = 0;
        entries = null;
    }

    public boolean isReady()
    {
        return (windowSize > 0);
    }
```

The above are simple initialization routines: a constructor, a method to reset the table, and a boolean method indicating whether or not the object has a prefix sum table on which to operate.  From here, there are three important methods to examine: add, merge, and serialize.  The first is intuitive: as we scan rows in Hive, we want to add them to our prefix sum table.  The latter two are important because of partial aggregation.

We cannot say ahead of time where this UDAF will run, and partial aggregation may be required.  That is, it's entirely possible that some values may run through the UDAF during a map task, but then be passed to a reduce task to be combined with other values.  The serialize method will allow Hive to pass the partial results from the map side to the reduce side.  The merge method allows reducers to combine the results of partial aggregations from the map tasks.
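The idea is easiest to see with the simplest possible aggregate.  In this toy sketch (plain Java with illustrative names, not Hive code), two "map tasks" each compute a partial sum and a "reduce task" merges them:

```java
// Toy illustration of partial aggregation using a plain sum (not Hive code).
public class PartialAggSketch {
    static double sum(double[] xs) {
        double s = 0;
        for (double x : xs) s += x;
        return s;
    }

    public static void main(String[] args) {
        // Two "map tasks" each see half the data...
        double partialA = sum(new double[]{1, 2, 3});
        double partialB = sum(new double[]{4, 5});
        // ...and the "reduce task" merges the partial results.
        double merged = partialA + partialB;
        System.out.println(merged); // 15.0
    }
}
```

A sum merges with a single addition, but a moving average cannot be combined that simply, which is why our UDAF will have to serialize and merge its entire prefix sum table.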

```java
    @SuppressWarnings("unchecked")
    public void add(int period, double v)
    {
        // Add a new entry to the list and update table
        PrefixSumEntry e = new PrefixSumEntry();
        e.period = period;
        e.value = v;
        entries.add(e);
        // do we need to ensure this is sorted?
        //if (needsSorting(entries))
        Collections.sort(entries);
        // update the table
        // prefixSums first
        double prefixSum = 0;
        for (int i = 0; i < entries.size(); i++)
        {
            PrefixSumEntry thisEntry = entries.get(i);
            prefixSum += thisEntry.value;
            thisEntry.prefixSum = prefixSum;
            entries.set(i, thisEntry);
        }
```

The first part of the add method is simple: we add the element to the list and update our table's prefix sums.

```java
        // now do the subsequence totals and moving averages
        for (int i = 0; i < entries.size(); i++)
        {
            double subsequenceTotal;
            double movingAverage;
            PrefixSumEntry thisEntry = entries.get(i);
            PrefixSumEntry backEntry = null;
            if (i >= windowSize)
                backEntry = entries.get(i - windowSize);
            if (backEntry != null)
            {
                subsequenceTotal = thisEntry.prefixSum - backEntry.prefixSum;
            }
            else
            {
                subsequenceTotal = thisEntry.prefixSum;
            }
            movingAverage = subsequenceTotal / (double) windowSize;
            thisEntry.subsequenceTotal = subsequenceTotal;
            thisEntry.movingAverage = movingAverage;
            entries.set(i, thisEntry);
        }
    }
```

In the second half of the add method, we compute our moving averages based on the prefix sums.  It's here you can see the hinge on which the algorithm swings: `thisEntry.prefixSum - backEntry.prefixSum` -- that offset between the current table entry and its nth predecessor makes the whole thing work.

```java
    public ArrayList<DoubleWritable> serialize()
    {
        ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>();

        result.add(new DoubleWritable(windowSize));
        if (entries != null)
        {
            for (PrefixSumEntry i : entries)
            {
                result.add(new DoubleWritable(i.period));
                result.add(new DoubleWritable(i.value));
            }
        }
        return result;
    }
```

The serialize method needs to package the results of our algorithm to pass to another instance of the same algorithm, and it needs to do so in a type that Hadoop can serialize.  In the case of a method like sum, this would be relatively simple: we would only need to pass the sum up to this point.  However, because we cannot be certain whether this instance of our algorithm has seen all the values, or seen them in the correct order, we actually need to serialize the whole table.  To do this, we create a list of DoubleWritables, pack the window size at its head, and then add each period and value.  This gives us a structure that's easy to unpack and merge with other lists of the same construction.
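The layout is easier to picture with plain Doubles standing in for DoubleWritables; this illustrative sketch (hypothetical helper name) packs the same [windowSize, period, value, period, value, ...] shape:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the serialized layout using plain Doubles instead of DoubleWritables.
public class SerializeSketch {
    // Pack: window size first, then (period, value) pairs.
    static List<Double> pack(int windowSize, int[] periods, double[] values) {
        List<Double> out = new ArrayList<Double>();
        out.add((double) windowSize);
        for (int i = 0; i < periods.length; i++) {
            out.add((double) periods[i]);
            out.add(values[i]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> packed = pack(3, new int[]{1, 2}, new double[]{10.0, 20.0});
        System.out.println(packed); // [3.0, 1.0, 10.0, 2.0, 20.0]
    }
}
```

Because every list carries its window size up front and its pairs in a fixed stride, two lists built this way can be unpacked and merged without any shared state.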

```java
    @SuppressWarnings("unchecked")
    public void merge(List<DoubleWritable> other)
    {
        if (other == null)
            return;

        // if this is an empty buffer, just copy in other
        // but deserialize the list
        if (windowSize == 0)
        {
            windowSize = (int) other.get(0).get();
            entries = new ArrayList<PrefixSumEntry>();
            // we're serialized as period, value, period, value
            for (int i = 1; i < other.size(); i += 2)
            {
                PrefixSumEntry e = new PrefixSumEntry();
                e.period = (int) other.get(i).get();
                e.value = other.get(i + 1).get();
                entries.add(e);
            }
        }
```

Merging results is perhaps the most complicated thing we need to handle.  First, we check the case in which there was no partial result passed -- just return and continue.  Second, we check to see if this instance of PrefixSumMovingAverage already has a table.  If it doesn't, we can simply unpack the serialized result and treat it as our window.

```java
        // if we already have a buffer, we need to add these entries
        else
        {
            // we're serialized as period, value, period, value
            for (int i = 1; i < other.size(); i += 2)
            {
                PrefixSumEntry e = new PrefixSumEntry();
                e.period = (int) other.get(i).get();
                e.value = other.get(i + 1).get();
                entries.add(e);
            }
        }
```

The third case is the non-trivial one: if this instance has a table and receives a serialized table, we must merge them together.  Consider a Reduce task: as it receives outputs from multiple Map tasks, it needs to merge all of them together to form a larger table.  Thus, merge will be called many times to add these results and reassemble a larger time series.

```java
        // sort and recompute
        Collections.sort(entries);
        // update the table
        // prefixSums first
        double prefixSum = 0;
        for (int i = 0; i < entries.size(); i++)
        {
            PrefixSumEntry thisEntry = entries.get(i);
            prefixSum += thisEntry.value;
            thisEntry.prefixSum = prefixSum;
            entries.set(i, thisEntry);
        }
    }
```

This part should look familiar; it's just like the add method.  Now that we have new entries in our table, we need to sort by period and recompute the moving averages.  In fact, the rest of the merge method is exactly like the add method, so we might consider putting sorting and recomputing in a separate method.
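That refactoring might look something like the following sketch: a simplified, standalone version with illustrative names, not the actual class, in which a single recompute helper could be shared by add and merge.

```java
import java.util.ArrayList;
import java.util.Collections;

// Sketch of pulling "sort and recompute" into one helper shared by add() and merge().
// Simplified standalone version; class and field names are illustrative.
public class RecomputeSketch {
    static class Entry implements Comparable<Entry> {
        int period;
        double value;
        double prefixSum;
        double movingAverage;
        Entry(int p, double v) { period = p; value = v; }
        public int compareTo(Entry o) { return Integer.compare(period, o.period); }
    }

    static void recompute(ArrayList<Entry> entries, int windowSize) {
        Collections.sort(entries);           // order by period
        double prefixSum = 0;                // rebuild the prefix sums...
        for (Entry e : entries) {
            prefixSum += e.value;
            e.prefixSum = prefixSum;
        }
        for (int i = 0; i < entries.size(); i++) {   // ...then the averages
            Entry e = entries.get(i);
            double back = (i >= windowSize) ? entries.get(i - windowSize).prefixSum : 0;
            e.movingAverage = (e.prefixSum - back) / windowSize;
        }
    }

    public static void main(String[] args) {
        ArrayList<Entry> entries = new ArrayList<Entry>();
        entries.add(new Entry(3, 3.0));   // deliberately out of order,
        entries.add(new Entry(1, 1.0));   // as merged partials would be
        entries.add(new Entry(2, 2.0));
        recompute(entries, 2);
        for (Entry e : entries)
            System.out.println(e.period + ": " + e.movingAverage);
        // 1: 0.5
        // 2: 1.5
        // 3: 2.5
    }
}
```

Since recompute only depends on the sorted (period, value) pairs, it doesn't matter whether the entries arrived via add or via merge.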

### Orchestrating Partial Aggregation

We've got a clever little algorithm for computing moving average in parallel, but Hive can't do anything with it unless we create a UDAF that understands how to use our algorithm.  At this point, we need to start writing some real UDAF code.  As before, we extend a generic class, in this case GenericUDAFEvaluator.

```java
public static class GenericUDAFMovingAverageEvaluator extends GenericUDAFEvaluator {

    // input inspectors for PARTIAL1 and COMPLETE
    private PrimitiveObjectInspector periodOI;
    private PrimitiveObjectInspector inputOI;
    private PrimitiveObjectInspector windowSizeOI;

    // input inspectors for PARTIAL2 and FINAL
    // list for MAs and one for residuals
    private StandardListObjectInspector loi;
```

As in the case of a UDTF, we create ObjectInspectors to handle type checking.  However, notice that we have inspectors for different states: PARTIAL1, PARTIAL2, COMPLETE, and FINAL.  These correspond to the different states in which our UDAF may be executing.  Since our serialized prefix sum table isn't the same input type as the values our add method takes, we need different type checking for each.

```java
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {

        super.init(m, parameters);

        // initialize input inspectors
        if (m == Mode.PARTIAL1 || m == Mode.COMPLETE)
        {
            assert(parameters.length == 3);
            periodOI = (PrimitiveObjectInspector) parameters[0];
            inputOI = (PrimitiveObjectInspector) parameters[1];
            windowSizeOI = (PrimitiveObjectInspector) parameters[2];
        }
```

Here's the beginning of our overridden initialization function.  We check the parameters for two modes, PARTIAL1 and COMPLETE.  In these modes we assume that the arguments to our UDAF are the same ones the user passes in a query: the period, the input, and the size of the window.  If the UDAF instance is consuming the results of our partial aggregation, we need a different ObjectInspector.  Specifically, this one:

```java
        else
        {
            loi = (StandardListObjectInspector) parameters[0];
        }
```

Similar to the UDTF, we also need type checking on the output types -- but for both partial and full aggregation. In the case of partial aggregation, we're returning lists of DoubleWritables:

```java
        // init output object inspectors
        if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
            // The output of a partial aggregation is a list of doubles representing the
            // moving average being constructed.
            // the first element in the list will be the window size
            //
            return ObjectInspectorFactory.getStandardListObjectInspector(
                PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
        }
```

But in the case of FINAL or COMPLETE, we're dealing with the types that will be returned to the Hive user, so we need to return a different output.  We're going to return a list of structs that contain the period, moving average, and residuals (since they're cheap to compute).

```java
        else {
            // The output of FINAL and COMPLETE is a full aggregation: a list of
            // structs representing the time series as
            // (period, moving_average, residual) entries.
            ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
            foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
            foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
            foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
            ArrayList<String> fname = new ArrayList<String>();
            fname.add("period");
            fname.add("moving_average");
            fname.add("residual");
            return ObjectInspectorFactory.getStandardListObjectInspector(
                ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi));
        }
    }
```

Next come methods to control what happens when a Map or Reduce task is finished with its data.  In the case of partial aggregation, we need to serialize the data.  In the case of full aggregation, we need to package the result for Hive users.

```java
    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        // return an ArrayList where the first parameter is the window size
        MaAgg myagg = (MaAgg) agg;
        return myagg.prefixSum.serialize();
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        // final return value goes here
        MaAgg myagg = (MaAgg) agg;

        if (myagg.prefixSum.tableSize() < 1)
        {
            return null;
        }
        else
        {
            ArrayList<DoubleWritable[]> result = new ArrayList<DoubleWritable[]>();
            for (int i = 0; i < myagg.prefixSum.tableSize(); i++)
            {
                double residual = myagg.prefixSum.getEntry(i).value - myagg.prefixSum.getEntry(i).movingAverage;
                DoubleWritable[] entry = new DoubleWritable[3];
                entry[0] = new DoubleWritable(myagg.prefixSum.getEntry(i).period);
                entry[1] = new DoubleWritable(myagg.prefixSum.getEntry(i).movingAverage);
                entry[2] = new DoubleWritable(residual);
                result.add(entry);
            }
            return result;
        }
    }
```

We also need to provide instruction on how Hive should merge the results of partial aggregation.  Fortunately, we already handled this in our PrefixSumMovingAverage class, so we can just call that.

```java
    @SuppressWarnings("unchecked")
    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        // if we're merging two separate sets we're creating one table that's doubly long
        if (partial != null)
        {
            MaAgg myagg = (MaAgg) agg;
            List<DoubleWritable> partialMovingAverage = (List<DoubleWritable>) loi.getList(partial);
            myagg.prefixSum.merge(partialMovingAverage);
        }
    }
```

Of course, merging and serializing isn't very useful unless the UDAF has logic for iterating over values.  The iterate method handles this and -- as one would expect -- relies entirely on the PrefixSumMovingAverage class we created.

```java
    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {

        assert (parameters.length == 3);

        if (parameters[0] == null || parameters[1] == null || parameters[2] == null)
        {
            return;
        }

        MaAgg myagg = (MaAgg) agg;

        // Parse out the window size just once if we haven't done so before.
        // We need a window of at least 1, otherwise there's no window.
        if (!myagg.prefixSum.isReady())
        {
            int windowSize = PrimitiveObjectInspectorUtils.getInt(parameters[2], windowSizeOI);
            if (windowSize < 1)
            {
                throw new HiveException(getClass().getSimpleName() + " needs a window size >= 1");
            }
            myagg.prefixSum.allocate(windowSize);
        }

        // Add the current data point and compute the average
        int p = PrimitiveObjectInspectorUtils.getInt(parameters[0], periodOI);
        double v = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI);
        myagg.prefixSum.add(p, v);
    }
```

### Aggregation Buffers: Connecting Algorithms with Execution

One might notice that the code for our UDAF references an object of type AggregationBuffer quite a lot.  This is because the AggregationBuffer is the interface which allows us to connect our custom PrefixSumMovingAverage class to Hive's execution framework.  While it doesn't constitute a great deal of code, it's the glue that binds our logic to Hive's execution framework.  We implement it as such:

```java
    // Aggregation buffer definition and manipulation methods
    static class MaAgg implements AggregationBuffer {
        PrefixSumMovingAverage prefixSum;
    };

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        MaAgg result = new MaAgg();
        reset(result);
        return result;
    }
```

### Using the UDAF

The goal of a good UDAF is that, no matter how complicated it was for us to implement, it remains simple for our users.  For all that code and parallel thinking, using the UDAF is very straightforward:

```sql
ADD JAR /mnt/shared/hive_udfs/dist/lib/moving_average_udf.jar;
CREATE TEMPORARY FUNCTION moving_avg AS 'com.oracle.hadoop.hive.ql.udf.generic.GenericUDAFMovingAverage';

-- get the moving average for a single tail number
SELECT TailNum, moving_avg(timestring, delay, 4) FROM ts_example
WHERE TailNum='N967CA' GROUP BY TailNum LIMIT 100;
```

Here we're applying the UDAF to get the moving average of arrival delay for a particular plane.  It's a really simple query for all the work we did underneath.  We can do a bit more and leverage Hive's ability to handle complex types as columns; here's a query which creates a table of time series as arrays.

```sql
-- create a set of moving averages for every plane starting with N
-- Note: this UDAF blows up unpleasantly in heap; there will be data volumes
-- for which you need to throw excessive amounts of memory at the problem
CREATE TABLE moving_averages AS
SELECT TailNum, moving_avg(timestring, delay, 4) AS timeseries
FROM ts_example
WHERE TailNum LIKE 'N%' GROUP BY TailNum;
```

### Summary

We've covered all manner of UDFs: from simple class extensions which can be written very easily, to very complicated UDAFs which require us to think about distributed execution and plan orchestration done by query engines.  With any luck, the discussion has provided you with the confidence to go out and implement your own UDFs -- or at least pay some attention to the complexities of the ones in use every day.

## Thursday Apr 04, 2013

### Introduction

In our ongoing exploration of Hive UDFs, we've covered the basic row-wise UDF.  Today we'll move to the UDTF, which generates multiple rows for every row processed.  This UDF built its house from sticks: it's slightly more complicated than the basic UDF and allows us an opportunity to explore how Hive functions manage type checking.

We'll step through some of the more interesting pieces, but as before the full source is available on github here.

### Extending GenericUDTF

Our UDTF is going to produce pairwise combinations of elements in a comma-separated string.  So, for a string column "Apples, Bananas, Carrots" we'll produce three rows:

• Apples, Bananas
• Apples, Carrots
• Bananas, Carrots
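Before wiring this into Hive, the expansion itself can be sketched in plain Java (the class and method names here are illustrative, not part of the UDTF):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone sketch of the pairwise expansion the UDTF will perform.
public class PairwiseSketch {
    static List<String[]> pairwise(String csv) {
        String[] members = csv.split(",");
        for (int i = 0; i < members.length; i++) members[i] = members[i].trim();
        Arrays.sort(members);
        List<String[]> pairs = new ArrayList<String[]>();
        for (int i = 0; i < members.length - 1; i++)
            for (int j = i + 1; j < members.length; j++)
                if (!members[i].equals(members[j]))          // skip duplicate members
                    pairs.add(new String[]{members[i], members[j]});
        return pairs;
    }

    public static void main(String[] args) {
        for (String[] p : pairwise("Apples, Bananas, Carrots"))
            System.out.println(p[0] + ", " + p[1]);
        // Apples, Bananas
        // Apples, Carrots
        // Bananas, Carrots
    }
}
```

Sorting first means every pair comes out in a canonical order, so (Bananas, Apples) and (Apples, Bananas) can't both appear.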

As with the UDF, the first few lines are a simple class extension with an annotation so that Hive can describe what the function does.

```java
@Description(name = "pairwise", value = "_FUNC_(doc) - emits pairwise combinations of an input array")
public class PairwiseUDTF extends GenericUDTF {

    private PrimitiveObjectInspector stringOI = null;
```

We also create an object of PrimitiveObjectInspector, which we'll use to ensure that the input is a string.  Once this is done, we need to override methods for initialization, row processing, and cleanup.

```java
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentException("pairwise() takes exactly one argument");
        }

        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
            || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() !=
                PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentException("pairwise() takes a string as a parameter");
        }

        stringOI = (PrimitiveObjectInspector) args[0];
```

This UDTF is going to return an array of structs, so the initialize method needs to return a StructObjectInspector object.  Note that the arguments come in to initialize as an array of ObjectInspector objects.  This allows us to handle arguments in a "normal" fashion but with the benefit of methods to broadly inspect type.  We only allow a single argument -- the string column to be processed -- so we check the length of the array and validate that the sole element is both a primitive and a string.

The second half of the initialize method is more interesting:

```java
        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
        fieldNames.add("memberA");
        fieldNames.add("memberB");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
```

Here we set up information about what the UDTF returns.  We need this in place before we start processing rows; otherwise Hive can't correctly build execution plans before submitting jobs to MapReduce.  The structs we're returning will contain two strings each, which means we'll need ObjectInspector objects for both the values and the names of the fields.  We create two lists: one of strings for the names, the other of ObjectInspector objects for the values.  We pack them manually and then use a factory to get the StructObjectInspector which defines the actual return value.

Now we're ready to actually do some processing, so we override the process method.

```java
    @Override
    public void process(Object[] record) throws HiveException {
        final String document = (String) stringOI.getPrimitiveJavaObject(record[0]);

        if (document == null) {
            return;
        }

        String[] members = document.split(",");
        java.util.Arrays.sort(members);
        for (int i = 0; i < members.length - 1; i++)
            for (int j = i + 1; j < members.length; j++)
                if (!members[i].equals(members[j]))
                    forward(new Object[] {members[i], members[j]});
    }
```

This is simple pairwise expansion, so the logic isn't anything more than a nested for-loop.  There are, though, some interesting things to note.  First, to actually get a string object to operate on, we have to use an ObjectInspector and some typecasting.  This allows us to bail out early if the column value is null.  Once we have the string, splitting, sorting, and looping is textbook stuff.

The last notable piece is that the process method does not return anything.  Instead, we call forward to emit our newly created structs.  For those familiar with database internals, this follows the producer-consumer model of most RDBMSs.  For those familiar with MapReduce semantics, it's equivalent to calling write on the Context object.

```java
    @Override
    public void close() throws HiveException {
        // do nothing
    }
```

If there were any cleanup to do, we'd take care of it here.  But this is simple emission, so our override doesn't need to do anything.

### Using the UDTF

Once we've built our UDTF, we can access it via Hive by adding the jar and assigning it to a temporary function.  However, mixing the results of a UDTF with other columns from the base table requires that we use a LATERAL VIEW.

```sql
-- Add the jar
add jar /mnt/shared/market_basket_example/pairwise.jar;

-- Create a function
CREATE TEMPORARY FUNCTION pairwise AS 'com.oracle.hive.udtf.PairwiseUDTF';

-- view the pairwise expansion output
SELECT m1, m2, COUNT(*) FROM market_basket
LATERAL VIEW pairwise(basket) pwise AS m1, m2
GROUP BY m1, m2;
```

## Tuesday Apr 02, 2013

### Introduction

User-defined Functions (UDFs) have a long history of usefulness in SQL-derived languages.  While query languages can be rich in their expressiveness, there's just no way they can anticipate all the things a developer wants to do.  Thus, the custom UDF has become commonplace in our data manipulation toolbox.

Apache Hive is no different in this respect from other SQL-like languages.  Hive allows extensibility via both Hadoop Streaming and compiled Java.  However, largely because of the underlying MapReduce paradigm, not all Hive UDFs are created equal.  Some UDFs are intended for "map-side" execution, while others are portable and can be run on the "reduce-side."  Moreover, UDF behavior via streaming requires that queries be formatted so as to direct script execution where we desire it.

The intricacies of where and how a UDF executes may seem like minutiae, but we would be disappointed if time spent coding a cumulative-sum UDF produced a function that only executed on single rows.  To that end, I'm going to spend the rest of the week diving into the three primary types of Java-based UDFs in Hive.  You can find all of the sample code discussed here.

### The Three Little UDFs

Hive provides three classes of UDFs that most users are interested in: UDFs, UDTFs, and UDAFs.  Broken down simply, the three classes can be explained as such:

• UDFs -- User Defined Functions; these operate row-wise, generally during map execution.  They're the simplest UDFs to write, but constrained in their functionality.
• UDTFs -- User Defined Table-Generating Functions; these also execute row-wise, but they produce multiple rows of output (i.e., they generate a table).  The most common example of this is Hive's explode function.
• UDAFs -- User Defined Aggregating Functions; these can execute on either the map-side or the reduce-side and are far more flexible than UDFs.  The challenge, however, is that in writing UDAFs we have to think not just about what to do with a single row, but also about how to combine results across groups of rows.  Here, one has to consider partial aggregation and serialization between map and reduce processes.

Over the next few days, we'll walk through code for each of these function types, from simple to complex.  Along the way, we'll end up with a couple of useful functions you can use in your own Hive code (or improve upon).