Thursday Apr 04, 2013

Three Little Hive UDFs: Part 2


In our ongoing exploration of Hive UDFs, we've covered the basic row-wise UDF.  Today we'll move to the UDTF, which generates multiple rows for every row processed.  This UDF built its house from sticks: it's slightly more complicated than the basic UDF and allows us an opportunity to explore how Hive functions manage type checking.

 We'll step through some of the more interesting pieces, but as before the full source is available on GitHub here.

Extending GenericUDTF

 Our UDTF is going to produce pairwise combinations of elements in a comma-separated string.  So, for a string column "Apples, Bananas, Carrots" we'll produce three rows:


  • Apples, Bananas
  • Apples, Carrots
  • Bananas, Carrots


As with the UDF, the first few lines are a simple class extension with an annotation so that Hive can describe what the function does.

@Description(name = "pairwise", value = "_FUNC_(doc) - emits pairwise combinations of an input array")
public class PairwiseUDTF extends GenericUDTF {

private PrimitiveObjectInspector stringOI = null;

 We also declare a PrimitiveObjectInspector field, which we'll use to ensure that the input is a string.  Once this is done, we need to override methods for initialization, row processing, and cleanup.


  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("pairwise() takes exactly one argument");
    }
    // Check the category first so the cast is only evaluated for primitive arguments
    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
        || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() !=
            PrimitiveObjectInspector.PrimitiveCategory.STRING) {
      throw new UDFArgumentException("pairwise() takes a string as a parameter");
    }
    stringOI = (PrimitiveObjectInspector) args[0];

This UDTF is going to emit a struct for each output row, so the initialize method needs to return a StructObjectInspector object.  Note that the arguments to initialize come in as an array of ObjectInspector objects.  This allows us to handle arguments in a "normal" fashion but with the benefit of methods to broadly inspect type.  We only allow a single argument -- the string column to be processed -- so we check the length of the array and validate that the sole element is both a primitive and a string.
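The order of the two type checks matters, because the primitive category only makes sense once the argument is known to be a primitive. The following is a minimal sketch of that validation in plain Java. The Inspector class and both enums are hypothetical stand-ins written for this illustration; they are not Hive's ObjectInspector classes, and the sketch returns an error message instead of throwing UDFArgumentException.

```java
// Stand-in types for illustration only; these are NOT Hive's ObjectInspector classes.
class ArgCheckSketch {
    enum Category { PRIMITIVE, LIST, MAP, STRUCT }
    enum PrimitiveCategory { STRING, INT, DOUBLE }

    // Hypothetical minimal inspector: a category plus, for primitives, a primitive category.
    static final class Inspector {
        final Category category;
        final PrimitiveCategory primitiveCategory; // null when not a primitive

        Inspector(Category category, PrimitiveCategory primitiveCategory) {
            this.category = category;
            this.primitiveCategory = primitiveCategory;
        }
    }

    // Returns null when the arguments are acceptable, otherwise an error message.
    static String validate(Inspector[] args) {
        if (args.length != 1) {
            return "pairwise() takes exactly one argument";
        }
        // Short-circuit OR: the primitive category is only consulted for primitives.
        if (args[0].category != Category.PRIMITIVE
                || args[0].primitiveCategory != PrimitiveCategory.STRING) {
            return "pairwise() takes a string as a parameter";
        }
        return null;
    }

    public static void main(String[] unused) {
        Inspector str = new Inspector(Category.PRIMITIVE, PrimitiveCategory.STRING);
        Inspector num = new Inspector(Category.PRIMITIVE, PrimitiveCategory.INT);
        Inspector list = new Inspector(Category.LIST, null);
        System.out.println(validate(new Inspector[] {str}));      // accepted
        System.out.println(validate(new Inspector[] {num}));      // rejected: not a string
        System.out.println(validate(new Inspector[] {list}));     // rejected: not a primitive
        System.out.println(validate(new Inspector[] {str, str})); // rejected: two arguments
    }
}
```

Joining the two tests with a short-circuit OR means a non-primitive argument is rejected before anything tries to read its primitive category.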

The second half of the initialize method is more interesting: 

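    List<String> fieldNames = new ArrayList<String>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
    // Pack one name and one string inspector per output column (the names are illustrative)
    fieldNames.add("memberA"); fieldNames.add("memberB");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }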


Here we set up information about what the UDTF returns.  We need this in place before we start processing rows; otherwise Hive can't correctly build execution plans before submitting jobs to MapReduce.  The structures we're returning will be two strings per struct, which means we need a name and an ObjectInspector for each of the two fields.  We create two lists, one of strings for the names, the other of ObjectInspector objects for the types.  We pack them manually and then use a factory to get the StructObjectInspector that defines the actual return value.

Now we're ready to actually do some processing, so we override the process method.

  public void process(Object[] record) throws HiveException {
    final String document = (String) stringOI.getPrimitiveJavaObject(record[0]);
    if (document == null) {
      return; // nothing to emit for a null column value
    }
    String[] members = document.split(",");
    java.util.Arrays.sort(members);
    // Start j at i + 1 so each pair is emitted once, never in both orders
    for (int i = 0; i < members.length - 1; i++)
      for (int j = i + 1; j < members.length; j++)
        if (!members[i].equals(members[j]))
          forward(new Object[] {members[i], members[j]});
  }


This is simple pairwise expansion, so the logic isn't anything more than a nested for-loop.  There are, though, some interesting things to note.  First, to actually get a string object to operate on, we have to use an ObjectInspector and some typecasting.  This allows us to bail out early if the column value is null.  Once we have the string, splitting, sorting, and looping are textbook stuff.
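To try the expansion logic without a Hive installation, here is a minimal sketch in plain Java. PairwiseSketch and its pairs method are hypothetical names for this illustration, and the trim call is an added assumption so that inputs with spaces after the commas, such as "Apples, Bananas, Carrots", produce clean values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the pairwise expansion, with no Hive dependencies.
class PairwiseSketch {
    // Pairs every member with each later member, skipping pairs of equal values.
    static List<String[]> pairs(String document) {
        List<String[]> out = new ArrayList<>();
        if (document == null) {
            return out; // mirror the early exit for null column values
        }
        String[] members = document.split(",");
        for (int i = 0; i < members.length; i++) {
            members[i] = members[i].trim(); // assumption: ignore spaces around commas
        }
        Arrays.sort(members);
        for (int i = 0; i < members.length - 1; i++) {
            for (int j = i + 1; j < members.length; j++) {
                if (!members[i].equals(members[j])) {
                    out.add(new String[] {members[i], members[j]});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String[] pair : pairs("Apples, Bananas, Carrots")) {
            System.out.println(pair[0] + ", " + pair[1]);
        }
    }
}
```

Because the members are sorted first and the inner index always starts one past the outer index, a string with n distinct members yields n(n-1)/2 pairs, each in alphabetical order.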

The last notable piece is that the process method does not return anything.  Instead, we call forward to emit our newly created structs.  For those used to database internals, this follows the producer-consumer model of most RDBMSs.  For those used to MapReduce semantics, this is equivalent to calling write on the Context object.
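The push-style contract described above can be sketched in plain Java as follows. RowEmitter is a hypothetical stand-in for this illustration, not Hive's GenericUDTF, and a java.util.function.Consumer plays the role of the downstream collector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of push-style emission: process() returns nothing and hands rows to a collector.
// RowEmitter and its methods are hypothetical stand-ins, not Hive's GenericUDTF API.
class RowEmitter {
    private final Consumer<Object[]> collector;

    RowEmitter(Consumer<Object[]> collector) {
        this.collector = collector;
    }

    // Plays the role of forward(): pass one output row downstream.
    void forward(Object[] row) {
        collector.accept(row);
    }

    // One input row may produce zero, one, or many output rows.
    void process(String record) {
        if (record == null) {
            return;
        }
        for (String token : record.split(",")) {
            forward(new Object[] {token.trim()});
        }
    }

    public static void main(String[] args) {
        List<Object[]> sink = new ArrayList<>();
        RowEmitter emitter = new RowEmitter(sink::add);
        emitter.process("Apples, Bananas, Carrots"); // emits three rows
        emitter.process(null);                       // emits nothing
        System.out.println(sink.size() + " rows emitted");
    }
}
```

The caller decides where rows go by choosing the collector, which is why the producing method never needs to build or return a result set of its own.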

  public void close() throws HiveException {
    // do nothing
  }
}


If there were any cleanup to do, we'd take care of it here.  But this is simple emission, so our override doesn't need to do anything.

Using the UDTF

Once we've built our UDTF, we can access it via Hive by adding the jar and assigning it to a temporary function.  However, mixing the results of a UDTF with other columns from the base table requires that we use a LATERAL VIEW.

-- Add the jar
add jar /mnt/shared/market_basket_example/pairwise.jar;
-- Create a function
CREATE temporary function pairwise AS '';
-- View the pairwise expansion output
SELECT m1, m2, COUNT(*) FROM market_basket
LATERAL VIEW pairwise(basket) pwise AS m1,m2 GROUP BY m1,m2;
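For readers who want to check what this query computes, here is a minimal in-memory sketch in plain Java: expand every basket into pairs, then count how many times each pair occurs across all baskets. BasketPairCounts is a hypothetical name, the sample baskets are invented for illustration, and trimming whitespace around members is an added assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory analogue of LATERAL VIEW pairwise(...) followed by GROUP BY m1, m2 with COUNT(*).
class BasketPairCounts {
    static Map<String, Integer> countPairs(List<String> baskets) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String basket : baskets) {
            if (basket == null) {
                continue; // a null basket contributes no pairs
            }
            String[] m = basket.split(",");
            for (int i = 0; i < m.length; i++) {
                m[i] = m[i].trim(); // assumption: ignore spaces around commas
            }
            Arrays.sort(m);
            for (int i = 0; i < m.length - 1; i++) {
                for (int j = i + 1; j < m.length; j++) {
                    if (!m[i].equals(m[j])) {
                        counts.merge(m[i] + "," + m[j], 1, Integer::sum); // the GROUP BY key
                    }
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Invented sample data standing in for the basket column of market_basket.
        List<String> baskets = Arrays.asList("Apples, Bananas", "Bananas, Apples, Carrots", null);
        countPairs(baskets).forEach((pair, n) -> System.out.println(pair + " -> " + n));
    }
}
```

Sorting each basket before pairing is what lets "Apples, Bananas" and "Bananas, Apples" land in the same group.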


Tuesday Apr 02, 2013

User Defined Functions in Hive


User-defined Functions (UDFs) have a long history of usefulness in SQL-derived languages.  While query languages can be rich in their expressiveness, there's just no way they can anticipate all the things a developer wants to do.  Thus, the custom UDF has become commonplace in our data manipulation toolbox.

Apache Hive is no different in this respect from other SQL-like languages.  Hive allows extensibility via both Hadoop Streaming and compiled Java.  However, largely because of the underlying MapReduce paradigm, not all Hive UDFs are created equal.  Some UDFs are intended for "map-side" execution, while others are portable and can be run on the "reduce-side."  Moreover, UDF behavior via streaming requires that queries be formatted so as to direct script execution where we desire it.

 The intricacies of where and how a UDF executes may seem like minutiae, but we would be disappointed if the time spent coding a cumulative sum UDF produced a function that only executed on single rows.  To that end, I'm going to spend the rest of the week diving into the three primary types of Java-based UDFs in Hive.  You can find all of the sample code discussed here.

The Three Little UDFs

Hive provides three classes of UDFs that most users are interested in: UDFs, UDTFs, and UDAFs.  Broken down simply, the three classes can be explained as follows:

  • UDFs -- User Defined Functions; these operate row-wise, generally during map execution.  They're the simplest UDFs to write, but constrained in their functionality.
  • UDTFs -- User Defined Table-Generating Functions; these also execute row-wise, but they produce multiple rows of output (i.e., they generate a table).  The most common example of this is Hive's explode function.
  • UDAFs -- User Defined Aggregating Functions; these can execute on either the map-side or the reduce-side and are far more flexible than UDFs.  The challenge, however, is that in writing UDAFs we have to think about more than what to do with a single row, or even a group of rows: one also has to consider partial aggregation and serialization between map and reduce processes.
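The partial aggregation concern raised for UDAFs can be made concrete with a small sketch of a mean computed in two stages. MeanPartial is a hypothetical plain-Java class written for this illustration; it is not Hive's UDAF interface, and it leaves out the serialization step entirely.

```java
// Sketch of partial aggregation for a mean. MeanPartial is a hypothetical stand-in,
// not Hive's UDAF interface, and it skips serialization between processes.
class MeanPartial {
    long count;
    double sum;

    // Map side: fold one row into the partial state.
    void iterate(double value) {
        count++;
        sum += value;
    }

    // Reduce side: combine a partial state that was built elsewhere.
    void merge(MeanPartial other) {
        count += other.count;
        sum += other.sum;
    }

    // Final result, or null when no rows were seen.
    Double terminate() {
        return count == 0 ? null : sum / count;
    }

    public static void main(String[] args) {
        MeanPartial first = new MeanPartial();  // rows seen by one mapper
        first.iterate(1);
        first.iterate(2);
        MeanPartial second = new MeanPartial(); // rows seen by another mapper
        second.iterate(3);
        second.iterate(4);
        second.iterate(5);
        first.merge(second);
        System.out.println(first.terminate());
    }
}
```

The partial state carries the sum and the count rather than a running mean, because averaging the two partial means (1.5 and 4.0) would give 2.75 instead of the true mean of 3.0.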
Over the next few days, we'll walk through code for each of these function types, from simple to complex.  Along the way, we'll end up with a couple of useful functions you can use in your own Hive code (or improve upon). 


The Data Warehouse Insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.

