Thursday Apr 04, 2013

Three Little Hive UDFs: Part 2

Introduction

In our ongoing exploration of Hive UDFs, we've covered the basic row-wise UDF.  Today we'll move to the UDTF, which generates multiple rows for every row processed.  This UDF built its house from sticks: it's slightly more complicated than the basic UDF and gives us an opportunity to explore how Hive functions manage type checking.

 We'll step through some of the more interesting pieces, but as before the full source is available on github here.

Extending GenericUDTF

 Our UDTF is going to produce pairwise combinations of elements in a comma-separated string.  So, for a string column "Apples, Bananas, Carrots" we'll produce three rows:

 

  • Apples, Bananas
  • Apples, Carrots
  • Bananas, Carrots

 

As with the UDF, the first few lines are a simple class extension with a decorator so that Hive can describe what the function does.

@Description(name = "pairwise", value = "_FUNC_(doc) - emits pairwise combinations of an input array")
public class PairwiseUDTF extends GenericUDTF {

private PrimitiveObjectInspector stringOI = null;

We also declare a PrimitiveObjectInspector field, which we'll use to ensure that the input is a string.  With that in place, we need to override methods for initialization, row processing, and cleanup.

@Override
  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("pairwise() takes exactly one argument");
    }

    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
        || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() !=
           PrimitiveObjectInspector.PrimitiveCategory.STRING) {
      throw new UDFArgumentException("pairwise() takes a string as a parameter");
    }

    stringOI = (PrimitiveObjectInspector) args[0];

This UDTF is going to return an array of structs, so the initialize method needs to return a StructObjectInspector object.  Note that the arguments arrive as an array of ObjectInspector objects.  This allows us to handle arguments in a "normal" fashion but with the benefit of methods to broadly inspect type.  We only allow a single argument -- the string column to be processed -- so we check the length of the array and validate that the sole element is both a primitive and a string.

The second half of the initialize method is more interesting: 

    List<String> fieldNames = new ArrayList<String>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
    fieldNames.add("memberA");
    fieldNames.add("memberB");
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
  }

Here we set up information about what the UDTF returns.  We need this in place before we start processing rows; otherwise Hive can't correctly build execution plans before submitting jobs to MapReduce.  The structures we're returning will contain two strings per struct, which means we'll need ObjectInspector objects for both the values and the names of the fields.  We create two lists, one of strings for the names, the other of ObjectInspector objects.  We pack them manually and then use a factory to get the StructObjectInspector which defines the actual return value.

Now we're ready to actually do some processing, so we override the process method.

@Override
  public void process(Object[] record) throws HiveException {
    final String document = (String) stringOI.getPrimitiveJavaObject(record[0]);

    if (document == null) {
      return;
    }

    String[] members = document.split(",");
    java.util.Arrays.sort(members);
    for (int i = 0; i < members.length - 1; i++)
      for (int j = i + 1; j < members.length; j++)
        if (!members[i].equals(members[j]))
          forward(new Object[] {members[i], members[j]});
  }

This is simple pairwise expansion, so the logic isn't anything more than a nested for-loop.  There are, though, some interesting things to note.  First, to actually get a string object to operate on, we have to use an ObjectInspector and some typecasting.  This allows us to bail out early if the column value is null.  Once we have the string, splitting, sorting, and looping is textbook stuff.  

The last notable piece is that the process method does not return anything.  Instead, we call forward to emit our newly created structs.  For those used to database internals, this follows the producer-consumer model of most RDBMSs.  For those used to MapReduce semantics, it is equivalent to calling write on the Context object.
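
To make that analogy concrete, here is a minimal, generic Mapper for comparison (it isn't part of the UDTF; the class name and key/value types are just placeholders): where a UDTF calls forward, a Mapper calls context.write.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ForwardAnalogyMapper extends Mapper<LongWritable, Text, Text, Text> {
  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Just as forward() emits a struct from process(), write() pushes a
    // record downstream instead of returning it from the method.
    context.write(new Text("memberA"), value);
  }
}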

@Override
  public void close() throws HiveException {
    // do nothing
  }

If there were any cleanup to do, we'd take care of it here.  But this is simple emission, so our override doesn't need to do anything.

Using the UDTF

Once we've built our UDTF, we can access it via Hive by adding the jar and assigning it to a temporary function.  However, mixing the results of a UDTF with other columns from the base table requires that we use a LATERAL VIEW.

-- add the jar
add jar /mnt/shared/market_basket_example/pairwise.jar;

-- create a temporary function
CREATE temporary function pairwise AS 'com.oracle.hive.udtf.PairwiseUDTF';

-- view the pairwise expansion output
SELECT m1, m2, COUNT(*) FROM market_basket
LATERAL VIEW pairwise(basket) pwise AS m1, m2
GROUP BY m1, m2;


Three Little Hive UDFs: Part 1

Introduction

In our ongoing series of posts explaining the ins and outs of Hive User Defined Functions, we're starting with the simplest case.  Of the three little UDFs, today's entry built a straw house: simple, easy to put together, but limited in applicability.  We'll walk through important parts of the code, but you can grab the whole source from github here.

Extending UDF

The first few lines of interest are very straightforward:

@Description(name = "moving_avg", value = "_FUNC_(x, n) - Returns the moving mean of a set of numbers over a window of n observations")
@UDFType(deterministic = false, stateful = true)

public class UDFSimpleMovingAverage extends UDF

We're extending the UDF class with some decoration.  The decoration is important for usability and functionality.  The description decorator allows us to give Hive some information to show users about how to use our UDF and what its method signature will be.  The UDFType decoration tells Hive what sort of behavior to expect from our function.

 A deterministic UDF will always return the same output given a particular input.  A square-root-computing UDF will always return the same square root for 4, so we can say it is deterministic; a call to get the system time would not be.  The stateful annotation of the UDFType decoration is relatively new to Hive (e.g., CDH4 and above).  The stateful directive allows Hive to keep some static variables available across rows.  The simplest example of this is a "row-sequence," which maintains a static counter that increments with each row processed.
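
To illustrate what the stateful annotation makes possible, a bare-bones row-sequence UDF might look something like the sketch below.  This is a hypothetical example rather than part of this series' sample code (hive-contrib ships a similar function).

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.io.LongWritable;

@Description(name = "row_seq", value = "_FUNC_() - returns a sequence number for each row processed")
@UDFType(deterministic = false, stateful = true)
public class UDFRowSequenceSketch extends UDF {

  private final LongWritable result = new LongWritable(0);

  public LongWritable evaluate() {
    // Because the function is marked stateful, Hive evaluates it once per row
    // and lets this counter persist from row to row within the task.
    result.set(result.get() + 1);
    return result;
  }
}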

  Since square-root and row-counting aren't terribly interesting, we'll use the stateful annotation to build a simple moving average function.  We'll return to the notion of a moving average later when we build a UDAF, so as to compare the two approaches.

  private DoubleWritable result = new DoubleWritable();
  private static ArrayDeque<Double> window;
  int windowSize;

  public UDFSimpleMovingAverage() {
    result.set(0);
  }

 The above code is basic initialization.  We make a double in which to hold the result, but it needs to be of class DoubleWritable so that MapReduce can properly serialize the data.  We use a deque to hold our sliding window, and we need to keep track of the window's size.  Finally, we implement a constructor for the UDF class.

  public DoubleWritable evaluate(DoubleWritable v, IntWritable n) {
    double sum = 0.0;
    double moving_average;
    double residual;
    if (window == null)
    {
        window = new ArrayDeque<Double>();
    }

Here's the meat of the class: the evaluate method.  This method will be called on each row by the map tasks.  For any given row, we can't say whether or not our sliding window exists, so we initialize it if it's null.

    //slide the window
    if (window.size() == n.get())
        window.pop();

    window.addLast(new Double(v.get()));

    // compute the average
    for (Iterator<Double> i = window.iterator(); i.hasNext();)
        sum += i.next().doubleValue();

Here we deal with the deque and compute the sum of the window's elements.  Deques are essentially double-ended queues, so they make excellent sliding windows.  If the window is full, we pop the oldest element and add the current value.

    moving_average = sum/window.size();
    result.set(moving_average);

    return result;
  }

Computing the moving average without weighting is simply a matter of dividing the sum of our window by its size.  We then set that value in our Writable variable and return it.  The value is then emitted as part of the map task executing the UDF.

Going Further

The stateful annotation made it simple for us to compute a moving average since we could keep the deque static.  However, how would we compute a moving average if there were no notion of state between Hadoop tasks? At the end of the series we'll examine a UDAF that does this, but the algorithm ends up being quite different.  In the meantime, I challenge the reader to think about what sort of approach is needed to compute the window.


Tuesday Apr 02, 2013

User Defined Functions in Hive

Introduction

User-defined Functions (UDFs) have a long history of usefulness in SQL-derived languages.  While query languages can be rich in their expressiveness, there's just no way they can anticipate all the things a developer wants to do.  Thus, the custom UDF has become commonplace in our data manipulation toolbox.

Apache Hive is no different in this respect from other SQL-like languages.  Hive allows extensibility via both Hadoop Streaming and compiled Java.  However, largely because of the underlying MapReduce paradigm, not all Hive UDFs are created equal.  Some UDFs are intended for "map-side" execution, while others are portable and can be run on the "reduce-side."  Moreover, UDF behavior via streaming requires that queries be formatted so as to direct script execution where we desire it.

 The intricacies of where and how a UDF executes may seem like minutiae, but we would be disappointed if time spent coding a cumulative-sum UDF produced something that only executed on single rows.  To that end, I'm going to spend the rest of the week diving into the three primary types of Java-based UDFs in Hive.  You can find all of the sample code discussed here.

The Three Little UDFs

Hive provides three classes of UDFs that most users are interested in: UDFs, UDTFs, and UDAFs.  Broken down simply, the three classes can be explained as such:

  • UDFs -- User Defined Functions; these operate row-wise, generally during map execution.  They're the simplest UDFs to write, but constrained in their functionality.
  • UDTFs -- User Defined Table-Generating Functions; these also execute row-wise, but they produce multiple rows of output (i.e., they generate a table).  The most common example of this is Hive's explode function.
  • UDAFs -- User Defined Aggregating Functions; these can execute on either the map-side or the reduce-side and are far more flexible than UDFs.  The challenge is that, in writing UDAFs, we have to think not just about what to do with a single row, or even a group of rows; we also have to consider partial aggregation and serialization between map and reduce processes.
Over the next few days, we'll walk through code for each of these function types, from simple to complex.  Along the way, we'll end up with a couple of useful functions you can use in your own Hive code (or improve upon). 
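
To give a flavor of the simplest category before the detailed posts, here is a hypothetical, bare-bones row-wise UDF (an upper-casing function; it is not part of the series' sample code):

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

@Description(name = "to_upper_example", value = "_FUNC_(str) - returns str in upper case")
public class UDFToUpperExample extends UDF {

  private final Text result = new Text();

  public Text evaluate(Text input) {
    // Row-wise: called once per input row, typically inside a map task.
    if (input == null) {
      return null;
    }
    result.set(input.toString().toUpperCase());
    return result;
  }
}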


Sunday Nov 04, 2012

Blueprints for Oracle NoSQL Database

I think that some of the most interesting analytic problems are graph problems.  I'm always interested in new ways to store and access graphs.  As such, I really like the work being done by Tinkerpop to create Open Source Software to make property graphs more accessible over a wide variety of datastores.  Since key-value stores like Oracle NoSQL Database are well-suited to storing property graphs, I decided to extend the Blueprints API to work with it.  Below I'll discuss some of the implementation details, but you can check out the finished product here: http://github.com/dwmclary/blueprints-oracle-nosqldb.

 What's in a Property Graph? 

In the most general sense, a graph is just a collection of vertices and edges.  Vertices and edges can have properties: weights, names, or any number of other traits.  In an undirected graph, edges connect vertices without direction.  A directed graph specifies that all edges have a head and a tail --- a direction.  A multi-graph allows multiple edges to connect two vertices.  A "property graph" encompasses all of these traits.

Key-Value Stores for Property Graphs

Key-Value stores like Oracle NoSQL Database tend to be ideal for implementing property graphs.  First, if any vertex or edge can have any number of traits, we can treat it as a hash map.  For example:

Vertex["name"] = "Mary"

Vertex["age"] = 28

Vertex["ID"] = 12345

 and so on.  This is a natural key-value relationship: the key "name" maps to the value "Mary."  Moreover, if we maintain two hash maps, one for vertex objects and one for edge objects, we've essentially captured the graph.  As such, any scalable key-value store is fertile ground for planting graphs.
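
In plain Java, that idea is nothing more exotic than a pair of nested maps.  The sketch below is purely illustrative (the class and method names are made up) and is not part of the Blueprints implementation itself:

import java.util.HashMap;
import java.util.Map;

public class PropertyGraphSketch {
  // One map of property maps for vertices, one for edges.
  private final Map<String, Map<String, Object>> vertices = new HashMap<String, Map<String, Object>>();
  private final Map<String, Map<String, Object>> edges = new HashMap<String, Map<String, Object>>();

  public void setVertexProperty(String vertexId, String key, Object value) {
    Map<String, Object> props = vertices.get(vertexId);
    if (props == null) {
      props = new HashMap<String, Object>();
      vertices.put(vertexId, props);
    }
    props.put(key, value);
  }

  public static void main(String[] args) {
    PropertyGraphSketch g = new PropertyGraphSketch();
    g.setVertexProperty("12345", "name", "Mary");
    g.setVertexProperty("12345", "age", 28);
  }
}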

Oracle NoSQL Database as a Scalable Graph Database

While Oracle NoSQL Database offers useful features like tunable consistency, what lends it to storing property graphs are the storage guarantees around its key structure.  Keys in Oracle NoSQL Database are divided into two parts: a major key and a minor key.  The storage guarantee is simple.  Major keys will be distributed across storage nodes, which could encompass a large number of servers.  However, all minor keys which are children of a given major key are guaranteed to be stored on the same storage node.  For example, the vertices:

/Personnel/Vertex/1 

and

/Personnel/Vertex/2

may be stored on different servers, but

/Personnel/Vertex/1-/name

and 

/Personnel/Vertex/1-/age

will always be on the same server.  This means that we can structure our graph database such that retrieving all the properties for a vertex or edge requires I/O from only a single storage node.  Moreover, Oracle NoSQL Database provides a storeIterator which allows us to iterate over a huge number of vertices and edges in a scalable fashion.  By storing the vertices and edges as major keys, we guarantee that they are distributed evenly across all storage nodes.  At the same time we can use a partial major key to iterate over all the vertices or edges (e.g. we search over /Personnel/Vertex to iterate over all vertices).
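
To make the key structure concrete, here is a rough Java sketch against the oracle.kv client API.  The store name, host, and values are hypothetical, and this illustrates major/minor paths only; it is not a piece of the Blueprints implementation.

import java.util.Arrays;
import java.util.Map;
import java.util.SortedMap;

import oracle.kv.Depth;
import oracle.kv.KVStore;
import oracle.kv.KVStoreConfig;
import oracle.kv.KVStoreFactory;
import oracle.kv.Key;
import oracle.kv.Value;
import oracle.kv.ValueVersion;

public class VertexKeyExample {
  public static void main(String[] args) {
    // Hypothetical store name and helper host.
    KVStore store = KVStoreFactory.getStore(new KVStoreConfig("kvstore", "localhost:5000"));

    // The major path identifies the vertex; minor paths hold its properties.
    Key nameKey = Key.createKey(Arrays.asList("Personnel", "Vertex", "1"), Arrays.asList("name"));
    Key ageKey  = Key.createKey(Arrays.asList("Personnel", "Vertex", "1"), Arrays.asList("age"));
    store.put(nameKey, Value.createValue("Mary".getBytes()));
    store.put(ageKey,  Value.createValue("28".getBytes()));

    // All minor keys under a major key live on one storage node, so fetching
    // every property of vertex 1 is a single-node read.
    SortedMap<Key, ValueVersion> props = store.multiGet(
        Key.createKey(Arrays.asList("Personnel", "Vertex", "1")),
        null, Depth.PARENT_AND_DESCENDANTS);

    for (Map.Entry<Key, ValueVersion> e : props.entrySet()) {
      System.out.println(e.getKey().getMinorPath() + " = "
          + new String(e.getValue().getValue().getValue()));
    }
    store.close();
  }
}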

Fork It!

The Blueprints API and Oracle NoSQL Database present a great way to get started using a scalable key-value database to store and access graph data.  However, a graph store isn't useful without a good graph to work on.  I encourage you to fork or pull the repository, store some data, and try using Gremlin or any other language to explore.


Monday Oct 15, 2012

The Oldest Big Data Problem: Parsing Human Language

There's a new whitepaper up on Oracle Technology Network which details the use of Digital Reasoning Systems' Synthesys software on Oracle Big Data Appliance.  Digital Reasoning's approach is inherently "big data friendly," as it leverages multiple components of the Hadoop ecosystem.  Moreover, the paper addresses the oldest big data problem of them all: extracting knowledge from human text.

  You can find the paper here.

  From the Executive Summary:

There is a wealth of information to be extracted from natural language, but that extraction is challenging. The volume of human language we generate constitutes a natural Big Data problem, while its complexity and nuance requires a particular expertise to model and mine. In this paper we illustrate the impressive combination of Oracle Big Data Appliance and Digital Reasoning Synthesys software. The combination of Synthesys and Big Data Appliance makes it possible to analyze tens of millions of documents in a matter of hours. Moreover, this powerful combination achieves four times greater throughput than conducting the equivalent analysis on a much larger cloud-deployed Hadoop cluster.


Friday Oct 05, 2012

Building Simple Workflows in Oozie

Introduction

More often than not, data doesn't come packaged exactly as we'd like it for analysis. Transformation, match-merge operations, and a host of data munging tasks are usually needed before we can extract insights from our Big Data sources. Few people find data munging exciting, but it has to be done. Once we've suffered that boredom, we should take steps to automate the process. We want to codify our work into repeatable units and create workflows which we can leverage over and over again without having to write new code. In this article, we'll look at how to use Oozie to create a workflow for the parallel machine learning task I described on Cloudera's site.

Hive Actions: Prepping for Pig

In my parallel machine learning article, I use data from the National Climatic Data Center to build weather models on a state-by-state basis. NCDC makes the data freely available as gzipped files of day-over-day observations stretching from the 1930s to today. In reading that post, one might get the impression that the data came in handy, ready-to-model files with convenient delimiters. The truth of it is that I need to perform some parsing and projection on the dataset before it can be modeled. If I get more observations, I'll want to retrain and test those models, which will require more parsing and projection. This is a good opportunity to start building up a workflow with Oozie.

I store the data from the NCDC in HDFS and create an external Hive table partitioned by year. This gives me the flexibility of Hive's query language when I want it, but lets me put the dataset in a directory of my choosing in case I want to treat the same data with Pig or MapReduce code.

CREATE EXTERNAL TABLE IF NOT EXISTS historic_weather(column1, column2)
PARTITIONED BY (yr string)
STORED AS ...
LOCATION '/user/oracle/weather/historic';

As new weather data comes in from NCDC, I'll need to add partitions to my table. That's an action I should put in the workflow. Similarly, the weather data requires parsing in order to be useful as a set of columns. Because of its long history, the weather data is broken up into fields of specific byte lengths: x bytes for the station ID, y bytes for the dew point, and so on. The delimiting is consistent from year to year, so writing a SerDe or a parser for transformation is simple. Once that's done, I want to select columns on which to train, classify certain features, and place the training data in an HDFS directory for my Pig script to access.

ALTER TABLE historic_weather ADD IF NOT EXISTS PARTITION (yr='2011')
LOCATION '/user/oracle/weather/historic/yr=2011';

INSERT OVERWRITE DIRECTORY '/user/oracle/weather/cleaned_history'
SELECT w.stn, w.wban, w.weather_year, w.weather_month,
w.weather_day, w.temp, w.dewp, w.weather FROM (
FROM historic_weather SELECT TRANSFORM(...)
USING '/path/to/hive/filters/ncdc_parser.py'
as stn, wban, weather_year, weather_month, weather_day, temp, dewp, weather
) w;

Since I'm going to prepare training directories with at least the same frequency that I add partitions, I should also add that to my workflow. Oozie is going to invoke these statements using what's somewhat obviously referred to as a Hive action. Hive actions amount to Oozie running a script file containing our query language statements, so we can place them in a file called ncdc_parse.hql.

Starting Our Workflow

Oozie offers two types of jobs: workflows and coordinator jobs. Workflows are straightforward: they define a set of actions to perform as a sequence or directed acyclic graph. Coordinator jobs can take all the same actions as workflow jobs, but they can be started automatically, either periodically or when new data arrives in a specified location. To keep things simple we'll make a workflow job; coordinator jobs simply require another XML file for scheduling. The bare minimum for workflow XML defines a name, a starting point, and an end point:

<workflow-app name="WeatherMan" xmlns="uri:oozie:workflow:0.1">
<start to="ParseNCDCData"/>
<end name="end"/>
</workflow-app>

To this we need to add an action, and within that we'll specify the Hive parameters. Also, keep in mind that actions require <ok> and <error> tags to direct the next action on success or failure.

<action name="ParseNCDCData">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<configuration>
<property>
<name>oozie.hive.defaults</name>
<value>/user/oracle/weather_ooze/hive-default.xml</value>
</property>
</configuration>
<script>ncdc_parse.hql</script>
</hive>
<ok to="WeatherMan"/>
<error to="end"/>
</action>

There are a couple of things to note here:
  1. I have to give the FQDN (or IP) and port of my JobTracker and NameNode.
  2. I have to include a hive-default.xml file.
  3. I have to include a script file.
  4. The hive-default.xml and script file must be stored in HDFS.
That last point is particularly important. Oozie doesn't make assumptions about where a given workflow is being run. You might submit workflows against different clusters, or have different hive-default.xml files on different clusters (e.g. MySQL or Postgres-backed metastores).

 

A quick way to ensure that all the assets end up in the right place in HDFS is just to make a working directory locally, build your workflow.xml in it, and copy the assets you'll need to it as you add actions to workflow.xml. At this point, our local directory should contain:

  1. workflow.xml
  2. hive-default.xml (make sure this file contains your metastore connection data)
  3. ncdc_parse.hql

 

Adding Pig to the Ooze

Adding our Pig script as an action is slightly simpler from an XML standpoint. All we do is add an action to workflow.xml as follows:

<action name="WeatherMan">
<pig>
<job-tracker>localhost:8021</job-tracker>
<name-node>localhost:8020</name-node>
<script>weather_train.pig</script>
</pig>
<ok to="end"/>
<error to="end"/>
</action>

 

Once we've done this, we'll copy weather_train.pig to our working directory. However, there's a bit of a "gotcha" here. My pig script registers the Weka Jar and a chunk of jython. If those aren't also in HDFS, our action will fail from the outset -- but where do we put them? The Jython script goes into the working directory at the same level as the pig script, because pig attempts to load Jython files in the directory from which the script executes. However, that's not where our Weka jar goes.

While Oozie doesn't assume much, it does make an assumption about the Pig classpath. Anything under working_directory/lib gets automatically added to the Pig classpath and no longer requires a REGISTER statement in the script. Anything that uses a REGISTER statement cannot be in the working_directory/lib directory. Instead, it needs to be in a different HDFS directory and attached to the pig action with an <archive> tag.

Yes, that's as confusing as you think it is.

 You can get the exact rules for adding Jars to the distributed cache from Oozie's Pig Cookbook.

Making the Workflow Work

We've got a workflow defined and have collected all the components we'll need to run. But we can't run anything yet, because we still have to define some properties about the job and submit it to Oozie. We need to start with the job properties, as this is essentially the "request" we'll submit to the Oozie server. In the same working directory, we'll make a file called job.properties as follows:

nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
weatherRoot=weather_ooze
mapreduce.jobtracker.kerberos.principal=foo
dfs.namenode.kerberos.principal=foo
oozie.libpath=${nameNode}/user/oozie/share/lib
oozie.wf.application.path=${nameNode}/user/${user.name}/${weatherRoot}
outputDir=weather-ooze

 

While some of the pieces of the properties file are familiar (e.g., the JobTracker address), others take a bit of explaining. The first is weatherRoot: this is essentially an environment variable for the script (as are jobTracker and queueName). We're simply using them to simplify the directives for the Oozie job. The oozie.libpath piece is extremely important. This is a directory in HDFS which holds Oozie's shared libraries: a collection of Jars necessary for invoking Hive, Pig, and other actions. It's a good idea to make sure this has been installed and copied up to HDFS. The last two lines are straightforward: run the application defined by workflow.xml at the application path listed and write the output to the output directory.

We're finally ready to submit our job! After all that work we only need to do a few more things:

  1. Validate our workflow.xml
  2. Copy our working directory to HDFS
  3. Submit our job to the Oozie server
  4. Run our workflow
Let's do them in order. First validate the workflow:
oozie validate workflow.xml

Next, copy the working directory up to HDFS:
hadoop fs -put working_dir /user/oracle/working_dir

Now we submit the job to the Oozie server. We need to ensure that we've got the correct URL for the Oozie server, and we need to specify our job.properties file as an argument.
oozie job -oozie http://url.to.oozie.server:port_number/ -config /path/to/working_dir/job.properties -submit

We've submitted the job, but we don't see any activity on the JobTracker. All we got was this funny bit of output:
14-20120525161321-oozie-oracle

This is because submitting a job to Oozie creates an entry for the job and places it in PREP status. What we got back, in essence, is a ticket for our workflow to ride the Oozie train. We're responsible for redeeming our ticket and running the job.
oozie job -oozie http://url.to.oozie.server:port_number/ -start 14-20120525161321-oozie-oracle

Of course, if we really want to run the job from the outset, we can change the "-submit" argument above to "-run." This will prep and run the workflow immediately.

 

Takeaway

So, there you have it: the somewhat laborious process of building an Oozie workflow. It's a bit tedious the first time out, but it does present a pair of real benefits to those of us who spend a great deal of time data munging. First, when new data arrives that requires the same processing, we already have the workflow defined and ready to run. Second, as we build up a set of useful action definitions over time, creating new workflows becomes quicker and quicker.


Wednesday Aug 01, 2012

Flume and Hive for Log Analytics

There's a lot to learn from log data, but to get the most value from it, that data needs to be easy to collect and analyze. Otherwise, time that could be used to learn from data is spent writing parsers and transport components. In this entry we'll simplify log collection and transport using JSON serialization and parts of the Hadoop ecosystem.

Logging everything in JSON is a great idea. As serialization formats go it's engineer-friendly: you and your favorite programming language can both read it. Moreover, having all of your log data structured as universal data structures makes getting started with analytics much simpler. To illustrate how much simpler, we'll take JSON logs written to a flat file, stream them into HDFS, and expose them via Hive for exploration and aggregation.

The Preliminaries

We're going to use three components to put our system together:

  • A flat file that's collecting JSON data. Assume entries look a bit like this:
    {"fieldA":"string data","fieldB":400,"fieldC":0.99}
  • Flume: the distributed log-collection service that's part of the Hadoop ecosystem
  • Hive and a SerDe for handling JSON data

The "Tail Table"

We'll begin by setting up the final destination for our log data. This requires we create a directory in HDFS to hold the log data and define a Hive table over it. Making the directory's easy:

hadoop fs -mkdir /user/oracle/tail_table

Similarly, defining the external table is straightforward in the Hive command line:

CREATE EXTERNAL TABLE IF NOT EXISTS tail_table(fieldA string, fieldB int, fieldC float)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.JsonSerde'
LOCATION '/user/oracle/tail_table';

This gives us a table which will read and respect the types of the values in our JSON records. If a field isn't present for a given record, a NULL value is returned for that column. Fields not included in the CREATE statement are ignored but still exist in the JSON. This allows the schema of the JSON to remain flexible while minimally impacting the Hive table.

Streaming Data with Flume OG

CDH 3u4 ships with two very different versions of Flume. The default is Flume 0.9.4, or Flume OG. It's great at streaming data into HDFS, but Flume OG has some requirements.

  • You must run Zookeeper to coordinate Flume nodes
  • You must run a Flume master to control Flume nodes
Those caveats aside, setting up Flume to stream data into our Hive table is remarkably simple. We only need to define a source which tails our JSON logs and a sink which writes these into the appropriate HDFS directory. We can set this up via the Flume master's web interface. Just navigate to the Flume master web interface at http://flumemaster.your.domain:35871 and click the config link. From here, select the Flume node you want to configure from the dropdown menu (i.e. the node which has the JSON log file). The rest is easy:
  • Set the source as: tail("/path/to/json.log")
  • Set the sink as: collectorSink("hdfs://namenode/user/oracle/tail_table", "logdata", 30)

This configuration will tail the log file and write a new message into HDFS with each new line. The collectorSink will commit data to our Hive table every 30 seconds.

Streaming Data with Flume NG

The other version of Flume which ships with CDH3 is Flume NG. Flume NG is significantly different from its predecessor. Our tail source from the previous section is gone, but so too are many of the restrictions.
  • Zookeeper is no longer a requirement
  • The master-slave architecture has been replaced by independent Flume agents
  • We can now use Avro RPCs to transfer data in multi-hop flows
That last point is a big advance for Flume. In Flume OG, transfer from application servers to our Hadoop cluster was a gray area. Either our application servers run Flume nodes connected to the Zookeeper instances and Flume masters for the Hadoop cluster, or logs must be transferred into the Hadoop cluster via another method. In Flume NG, we can run independent Flume agents on the application servers and the Hadoop cluster, relying on Avro RPC to handle forwarding.

For this type of multi-hop log transfer, we need a flume-ng-agent running on each application server and one on the Hadoop cluster. The application servers will have a flume.conf file which includes something like this:

app-agent.sources = tail
app-agent.channels = memoryChannel
app-agent.sinks = avro-forward-sink
app-agent.channels.memoryChannel.type = memory
app-agent.sources.tail.type = exec
app-agent.sources.tail.command = tail -f /path/to/json.log
app-agent.sources.tail.channels = memoryChannel
app-agent.sinks.avro-forward-sink.type = avro
app-agent.sinks.avro-forward-sink.channel = memoryChannel
app-agent.sinks.avro-forward-sink.hostname = 10.1.1.100
app-agent.sinks.avro-forward-sink.port = 10000

This sets up a source that runs "tail" and sinks that data via Avro RPC to 10.1.1.100 on port 10000.


The collecting Flume agent on the Hadoop cluster will need a flume.conf with an avro source and an HDFS sink.
hdfs-agent.sources = avro-collect
hdfs-agent.sinks = hdfs-write
hdfs-agent.channels = memoryChannel
hdfs-agent.channels.memoryChannel.type = memory
hdfs-agent.sources.avro-collect.type = avro
hdfs-agent.sources.avro-collect.bind = 10.1.1.100
hdfs-agent.sources.avro-collect.port = 10000
hdfs-agent.sources.avro-collect.channels = memoryChannel
hdfs-agent.sinks.hdfs-write.type = hdfs
hdfs-agent.sinks.hdfs-write.hdfs.path = hdfs://namenode/user/oracle/tail_table
hdfs-agent.sinks.hdfs-write.hdfs.rollInterval = 30
hdfs-agent.sinks.hdfs-write.channel = memoryChannel

On this side we've defined a source that reads Avro messages from port 10000 on 10.1.1.100 and writes the results into HDFS, rolling the file every 30 seconds. It's just like our setup in Flume OG, but now multi-hop forwarding is a snap.



Takeaway

No matter which version you deploy, the combination of Flume, Hive and JSON makes it straightforward to set up an end-to-end pipeline for consuming and analyzing serialized log data. With deployments this simple, you can spend more time focusing on your applications and analytics.


Wednesday Jun 27, 2012

FairScheduling Conventions in Hadoop

While scheduling and resource allocation control has been present in Hadoop since 0.20, a lot of people haven't discovered or utilized it in their initial investigations of the Hadoop ecosystem. We could chalk this up to many things:

  • Organizations are still determining what their dataflow and analysis workloads will comprise
  • Small deployments under test aren't likely to show the signs of strain that would send someone looking for resource allocation options
  • The default scheduling options -- the FairScheduler and the CapacityScheduler -- are not placed in the most prominent position within the Hadoop documentation.

However, for production deployments, it's wise to start with at least the foundations of scheduling in place so that you can tune the cluster as workloads emerge. To do that, we have to ask ourselves something about what the off-the-rack scheduling options are. We have some choices:

  • The FairScheduler, which will work to ensure resource allocations are enforced on a per-job basis.
  • The CapacityScheduler, which will ensure resource allocations are enforced on a per-queue basis.
  • Writing your own implementation of the abstract class org.apache.hadoop.mapred.TaskScheduler is an option, but usually overkill.

If you're going to have several concurrent users and leverage the more interactive aspects of the Hadoop environment (e.g. Pig and Hive scripting), the FairScheduler is definitely the way to go. In particular, we can do user-specific pools so that default users get their fair share, and specific users are given the resources their workloads require.

To enable fair scheduling, we're going to need to do a couple of things. First, we need to tell the JobTracker that we want to use fair scheduling and where we're going to be defining our allocations. We do this by adding the following to the mapred-site.xml file in HADOOP_HOME/conf:

<property>
<name>mapred.jobtracker.taskScheduler</name>
<value>org.apache.hadoop.mapred.FairScheduler</value>
</property>

<property>
<name>mapred.fairscheduler.allocation.file</name>
<value>/path/to/allocations.xml</value>
</property>

<property>
<name>mapred.fairscheduler.poolnameproperty</name>
<value>pool.name</value>
</property>

<property>
<name>pool.name</name>
<value>${user.name}</value>
</property>

What we've done here is simply tell the JobTracker that we'd like task scheduling to use the FairScheduler class rather than a single FIFO queue. Moreover, we're going to be defining our resource pools and allocations in a file called allocations.xml. For reference, the allocation file is read every 15 seconds or so, which allows for tuning allocations without having to take down the JobTracker.

Our allocation file is now going to look a little like this:

<?xml version="1.0"?>
<allocations>
<pool name="dan">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
<maxMaps>25</maxMaps>
<maxReduces>25</maxReduces>
<minSharePreemptionTimeout>300</minSharePreemptionTimeout>
</pool>
<mapreduce.job.user.name="dan">
<maxRunningJobs>6</maxRunningJobs>
</user>
<userMaxJobsDefault>3</userMaxJobsDefault>
<fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
</allocations>

In this case, I've explicitly set my username to have upper and lower bounds on the maps and reduces, and allotted myself double the number of running jobs. Now, if I run hive or pig jobs from either the console or via the Hue web interface, I'll be treated "fairly" by the JobTracker. There's a lot more tweaking that can be done to the allocations file, so it's best to dig down into the description and start trying out allocations that might fit your workload.


Friday Oct 14, 2011

A Closer Look at Oracle Big Data Appliance

Oracle Openworld just flew by… a lot of things happened in the big data space of course and you can read a lot of articles, blogs and other interesting materials all over.

What I thought I’d do here is to go through the big data appliance in a little more detail so everyone understands what the make-up of the machine is, what software we are putting on the machine and how it integrates with the Exadata machines.

Now, if you are bored reading, you can actually see and hear Todd and me discuss all this stuff using this link. This should be fun if you have never been to Openworld, as the interview is recorded at the OTN Lounge in the Howard street tent.

Oracle Big Data Appliance

The machine details are as follows:

  • 18 Nodes – Sun Servers
  • 2 CPUs per node, each with 6 cores (216 cores total)
  • 12 Disks per node (432 TB raw disk total)
  • Redundant InfiniBand Switches with 10GigE connectivity

To scale the machines, simply add a rack to the original full rack via InfiniBand. By leveraging InfiniBand we generally remove the network bottlenecks in the machine and between the machines. We chose InfiniBand over 10GigE connectivity because we do believe network capacity of 40Gb/sec is a valuable asset in a Hadoop cluster. We also think that using InfiniBand to connect the big data appliance to an Exadata machine will have a positive influence on the batch loads done into an Oracle system.


The software we are going to pre-install on the machine is:

  • Oracle Linux and Oracle Hotspot
  • Open-source distribution of Apache Hadoop
  • Oracle NoSQL Database Enterprise Edition (also available stand-alone)
  • Oracle Loader for Hadoop (also available stand-alone)
  • Open-source distribution of R (statistical package)
  • Oracle Data Integrator Application Adapter for Hadoop (also available stand-alone with ODI)

The goal of this software stack combined with the Sun hardware as an appliance is to create an enterprise class solution for Big Data that is:

  • Optimized and Complete - Everything you need to store and integrate your lower information density data
  • Integrated with Oracle Exadata - Analyze all your data
  • Easy to Deploy - Risk Free, Quick Installation and Setup
  • Single Vendor Support - Full Oracle support for the entire system and software set

As we get closer to the delivery date, you will see more detailed descriptions of the appliance, so stay tuned.

Thursday Sep 29, 2011

Added Session: Big Data Appliance

We added a new session to discuss Oracle Big Data Appliance. Here are the session details:

Oracle Big Data Appliance: Big Data for the Enterprise
Wednesday 10:15 AM
Marriott Marquis - Golden Gate C3 

Should be a fun session... see you all there!!

Monday Sep 19, 2011

Focus On: Big Data at Openworld

With Oracle Openworld rapidly approaching and many new things coming / being announced around big data at Openworld, I figured it is good to share some of the sessions that are interesting in the big data context.

All big data related sessions can be found here: Focus on Big Data.

A couple of important highlights to set the scene:

  • Sunday 5.30pm: Openworld Welcome Keynote featuring some of the announcements
  • Monday 2.00pm: Extreme Data Management - Are you Ready? by Andy Mendelsohn - this one promises to be a very fun session mixing fun and technical content

With those sessions under your belt, you are ready to dive into the details as listed in the Focus On document. And if you just want to look at the new machines, come visit us at the Engineered Systems Showcase in front of the keynote hall or come look around the Big Data area in the database demogrounds section!

See you all in San Francisco!

Friday Aug 05, 2011

Big Data: In-Memory MapReduce

Achieving the impossible often comes at a cost. In today's big data world that cost is often latency. Bringing real-time, deep analytics to big data requires improvements to the core of the big data platform. In this post we will discuss parallel analytics and how we can bring in-memory – or real-time – processing to very large data volumes.

The Parallel Platform that Runs in Memory

One of the often overlooked things we did in Oracle 11.2 is that we changed the behavior of data that resides in memory across a RAC cluster. Instead of restricting the data size to the size of the memory in a single node in the cluster, we now allow the database to see the memory as a large pool. We no longer do cache fusion to replicate all pieces of data in an object to all nodes.

cache_fusion_states

That cache fusion process is shown above. In state A, node 1 has acquired pieces of data into the buffer cache, as has node 2. These are different pieces of data. Cache fusion now kicks in and ensures that all data for that object is in the cache of each node.

In 11.2 we have changed this by allowing parallel execution to leverage the buffer cache as a grid. To do that we no longer do cache fusion, but instead pin (affinitize is the not so nice English word we use) data onto the memory of a node based on some internal algorithm. Parallel execution keeps track of where data lives and shuffles the query to that node (rather than using cache fusion to move data around).

inmemory_grid

The above shows how this works. P1 and P2 are chunks of data living in the buffer cache and are affinitized with a certain node. The parallel server processes on that node will execute the query and send the results back to the node that originated the query. With this in-memory grid we can process much larger data volumes.

In-Database MapReduce becomes In-Memory MapReduce

We talked about in-database MapReduce quite a bit on this blog, so I won’t repeat myself. If you are new to this topic, have a look at this post.

Because of how the database is architected, any code running within it that leverages parallelism can now use the data hosted in memory on the machine. Whether this is across the grid or not doesn't matter. So rather than having to figure out how to create a system that allows MapReduce code to run in memory, you just need to figure out how to write the code; Oracle will ensure that if the data fits, it will leverage memory instead of disk. That is shown in the following picture.

inmemory_MR

Now, is this big data? Well, if I look at an Exadata X2-8 machine today, we will have 2TB of memory to work with, and with Exadata Hybrid Columnar Compression (yes, the data resides in memory in compressed state) this means I should easily be able to run upwards of 10TB in memory.  As memory footprints grow, more data will fit within the memory and we can do more and more interesting analytics on that data, bringing at least real-time performance to some fairly complex analytics.

More at Oracle Openworld

For those of you who are going to San Francisco (with or without flowers in your hat), expect to see and hear a lot more on big data and MapReduce! See you there!

Monday Jun 27, 2011

Big Data Accelerator

For everyone who does not regularly listen to earnings calls, Oracle's Q4 call was interesting (as it mostly is). One of the announcements in the call was the Big Data Accelerator from Oracle (Seeking Alpha link here - slightly tweaked for correctness shown below):

 "The big data accelerator includes some of the standard open source software, HDFS, the file system and a number of other pieces, but also some Oracle components that we think can dramatically speed up the entire map-reduce process. And will be particularly attractive to Java programmers [...]. There are some interesting applications they do, ETL is one. Log processing is another. We're going to have a lot of those features, functions and pre-built applications in our big data accelerator."

 Not much else we can say right now, more on this (and Big Data in general) at Openworld!

Tuesday Jun 07, 2011

Big Data: Achieve the Impossible in Real-Time

Sure, we all want to make the impossible possible… in any scenario, in any business. Here we are talking about driving performance to levels previously considered impossible and doing so by using just data and advanced analytics.
An amazing example of this is the BMW Oracle America's Cup boat and its usage of sensor data and deep analytics (story here).

Consider these two quotes from the article:

"They were measuring an incredible number of parameters across the trimaran, collected 10 times per second, so there were vast amounts of [sensor] data available for analysis. An hour of sailing generates 90 million data points."

"[…] we could compare our performance from the first day of sailing to the very last day of sailing, with incremental improvements the whole way through. With data mining we could check data against the things we saw, and we could find things that weren't otherwise easily observable and findable."

Winning the Cup

BMW Oracle Racing © Photo Gilles Martin-Raget

The end result of all of this (and do read the entire article, it is truly amazing, with things like data projected in sunglasses!) is that the guys on the boat can make a sailboat go THREE times as fast as the wind that propels it.

To make this magic happen, a couple of things had to be done:

  1. Put the sensors in place and capture all the data
  2. Organize the data and analyze all of it in real-time
  3. Provide the decisions to the people who need it, exactly when they need it (like in the helmsman’s sunglasses!)
  4. Convince the best sailors in the world to trust and use the analysis to drive the boat

Since this blog is not about sailing but about data warehousing, big data and other (only slightly) less cool things, the intent here is to explain how you can deliver magic like this in your own company.

Move your company onto the next value curve

The above example gives you an actual environment where the combination of high volume, high velocity sensor data, deep analytics and real-time decisions are used to drive performance. This example is a real big data story.

Sure, a multi-billion dollar business will often collect more data, but the point of the above story is analyzing a previously unseen, massive influx of data – the team estimated 40x more data than in conventional environments. However, the extra interesting aspect is that decisions are automated. Rather than flooding the sunglasses with data, only relevant decisions and data are projected. No need for the helmsman to interpret the data; he simply needed to act on the decision points.

To project the idea of acting on decision points into an organization, your IT will have to start changing, as will your end users. To do so, you need to jump onto the bandwagon called big data. The following describes how to get on that bandwagon.

Today, your organization is doing the best it can by leveraging its current IT and DW platforms. That means – for most organizations – that you have squeezed all the relevant information out of the historical data assets you analyze. You are the dot on the lower value curve and you are on the plateau. Any extra dollar invested in the plateau is just about keeping the lights on, not about generating competitive advantage or business value. To jump to the next curve, you need to find some way to harness the challenges imposed by big data.

Value Curves Today

From an infrastructure perspective, you must design a big data platform. That big data platform is a fundamental part of your IT infrastructure if your company wants to compete over the next few years.

Value Curves Tomorrow

The main components in the big data platform provide:

  • Deep Analytics – a fully parallel, extensive and extensible toolbox full of advanced and novel statistical and data mining capabilities
  • High Agility – the ability to create temporary analytics environments in an end-user driven, yet secure and scalable environment to deliver new and novel insights to the operational business
  • Massive Scalability – the ability to scale analytics and sandboxes to previously unknown scales while leveraging previously untapped data potential
  • Low Latency – the ability to instantly act based on these advanced analytics in your operational, production environments

Read between the lines and you see that the big data platform is based on the three hottest topics in the industry: security, cloud computing and big data, all working in conjunction to deliver the next generation big data computing platform.

IT Drives Business Value

Over the next couple of years, companies which drive efficiency, agility and IT as a service via the cloud, which drive new initiatives and top line growth leveraging big data and analytics, and which keep all their data safe and secure, will be the leaders in their industry.

Oracle is building the next generation big data platforms on these three pillars: cloud, security and big data. Over the next couple of months – leading up to Oracle OpenWorld – we will cover details about Oracle’s analytical platform and in-memory computing for real-time big data (and general purpose speed!) on this blog.

A little bit of homework to prepare you for those topics is required. If you have not yet read the following, do give them a go, they are a good read:

These - older - blog posts will get you an understanding of in-database mapreduce techniques, how to integrate with Hadoop and a peek at some futuristic applications that I think would be generally cool and surely be coming down the pipeline in some form or fashion.

Sunday Sep 27, 2009

In-Memory Parallel Execution in Oracle Database 11gR2

As promised, the next entry in our 11gR2 explorations is In-Memory Parallel Execution. If you are going to Oracle OpenWorld next month make sure you check out the following session:

Tuesday, October 13 2009 5:30PM, Moscone South Room 308
Session S311420
Extreme Performance with Oracle Database 11g and In-Memory Parallel Execution.

In this session you will get more details and insight from the folks who actually built this functionality! A must see if this is of any interest, so book that ticket now and register!

Down to business, what is "In-Memory Parallel Execution"?

Let's begin by taking a quick trip down memory lane, back to Oracle Database 7 when Parallel Execution (PX) was first introduced. The goal of PX then and now is to reduce the time it takes to complete a complex SQL statement by using multiple processes to go after the necessary data instead of just one process. Up until now, these parallel server processes typically bypassed the buffer cache and read the necessary data directly from disk. The main reasoning for this was that the objects accessed by PX were large and would not fit into the buffer cache. Any attempt made to read these large objects into the cache would have resulted in trashing the cache content.

However, as hardware systems have evolved, the memory capacity on a typical database server has become extremely large. Take, for example, the 2 CPU socket Sun server being used in the new Sun Oracle Database Machine. It has an impressive 72GB of memory, giving a full Database Machine (8 database nodes) over ½ a TB of memory. Suddenly, using the buffer cache to hold large objects doesn't seem so impossible any more.

In-Memory Parallel Execution (In-Memory PX) takes advantage of these larger buffer caches but it also ensures we don't trash the cache.

In-Memory PX begins by determining if the working set (group of database blocks) necessary for a query fits into the aggregated buffer cache of the system. If the working set does not fit then the objects will be accessed via direct path IO just as they were before. If the working set fits into the aggregated buffer cache then the blocks will be distributed among the nodes and will be affinitized, or associated, with a particular node.

In previous releases, if the Parallel Execution of one statement read part of an object into the buffer cache, then subsequent SQL statements on other nodes in the cluster would access that data via Cache Fusion. This behavior could eventually result in a full copy of that table in every buffer cache in the cluster. In-Memory PX is notably different because Cache Fusion will not be used to copy the data from its original node to another node, even if a parallel SQL statement that requires this data is issued from another node. Instead, Oracle uses the parallel server processes on the node that the data resides on to access the data and returns only the result to the node where the statement was issued.

The decision to use the aggregated buffer cache is based on an advanced set of heuristics that include: the size of the object, the frequency at which the object changes and is accessed, and the size of the aggregated buffer cache. If the object meets these criteria, it will be fragmented, or broken up into pieces, and each fragment will be mapped to a specific node. If the object is hash partitioned, then each partition becomes a fragment; otherwise, the mapping is based on the FileNumber and ExtentNumber.

 

InMemoryPX_final.jpg

 

To leverage In-Memory PX you must set the initialization parameter PARALLEL_DEGREE_POLICY to AUTO (default MANUAL). Once this is set, the database controls which objects are eligible to be read into the buffer cache and which objects will reside there at any point in time. It is not possible to manually control the behavior for specific statements.

Obviously this post is more of a teaser; for in-depth discussions on this, go to Openworld and/or keep an eye out for a new white paper called Parallel Execution Fundamentals in Oracle Database 11gR2, coming soon to oracle.com. This paper covers not only In-Memory PX but also Auto-DOP and parallel statement queuing.

Stay tuned for more on 11gR2 coming soon...

About

The Data Warehouse Insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.
