Tuesday Mar 03, 2015

Why SQL is the natural language for data analysis

Analytics is a must-have component of every corporate data warehousing and big data project. It is the core driver for the business: the development of new products, better targeting of customers with promotions, hiring of new talent and retention of existing key talent. Yet the analysis of data, especially in “big data environments” where data is stored and processed outside of classical relational systems, continues to be a significant challenge for the majority of companies. According to Gartner, 72% of companies are planning to increase their expenditure on big data yet 55% state they don’t have the necessary skills to make use of it. 

The objective of this series of articles, which will appear over the coming weeks, is to explain why SQL is the natural language for any kind of data analysis, including big data, and the benefits that this brings for application developers, DBAs and business users. 

[Read More]

Tuesday Nov 11, 2014

Optimizing Table Scans with Zone Maps

Most of you will be familiar with partition pruning, where the Oracle Database will avoid the need to scan table and index partitions based on query predicates. This optimization is transparent to your application, but for it to work, the database has to find a way of mapping a query filter predicate to the partitioning key column (or columns). Partition pruning can only occur if the query has predicates that match the predetermined shape of a partitioned object. For example, a query on a SALES table partitioned by ORDER_DATE will need to include ORDER_DATE in a join or WHERE clause for it to be optimized by partition pruning.
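
For example, with SALES partitioned by ORDER_DATE, a query shaped like this sketch (table and column names are illustrative) only needs to scan the partitions that can contain January 2014 orders:

SELECT SUM(amount)
FROM   sales
WHERE  order_date >= DATE '2014-01-01'
AND    order_date <  DATE '2014-02-01';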

What if you could do better than this? What if you could prune partitions using a variety of column predicates and dimension hierarchies, irrespective of their appearance in the partitioning key? How about pruning at a much finer level of granularity than a partition? Perhaps we want to optimize queries that filter SALES by SHIP_DATE, STATE and COUNTY, as well as ORDER_DATE. The new Oracle zone map feature is designed to achieve this, and just like partitioning, zone maps are transparent to your queries; you don’t have to change your applications to make use of them.

Zone maps are available in Oracle Database 12c for Oracle Engineered Systems. Conceptually, they divide a table up into contiguous regions of blocks called zones (the default zone size being 1024 blocks). For each zone, the Oracle database records the minimum and maximum values for specified columns using a new database object called a zone map. Queries that filter on zone map columns have the potential to be optimized; it’s possible to prune zones that contain ranges of column values outside the match specified in the query predicate.

Consider a query that filters a sales table by (North American) state; in this case “CA”. A zone map on the STATE column will record the minimum and maximum values for this column for each zone in the table. This makes it possible to skip the zones that we can be certain won’t contain rows for “CA”.
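
A basic zone map on that column can be created directly; a minimal sketch (the zone map name is illustrative):

CREATE MATERIALIZED ZONEMAP sales_state_zmap
ON sales (state);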

Zone Map Representation

You are probably aware that Exadata storage cells and the Oracle Database In-Memory Column Store use similar storage index techniques, so what benefits do zone maps add? Besides the fact that you can control zone maps explicitly, the most significant difference between zone maps and storage indexes is that zone maps can be used to prune zones using column predicates from multiple (joined) tables. Consider a more realistic scenario, in which the SALES table doesn’t have a STATE column, but instead has a LOCATION_ID referencing a dimension table called LOCATIONS. This is our query for summing the sales figures in California:

SELECT SUM(amount)
FROM   sales     s,
       locations l
WHERE  s.location_id = l.location_id
AND    l.state = 'CA';

It would be great if we could avoid scanning zones in SALES that don’t contain rows associated with “CA”. Before we look at how we can do this, we’ll make the scenario even more realistic by assuming that LOCATIONS is a dimensional hierarchy of State and County, like this:

Zone Map Locations Table Example

 Each State is made up of multiple Counties, so “CA” will be associated with multiple LOCATION_ID values. If we want the “CA” rows in SALES, we’ll need to match the ones marked below in bold/red:

Zone Map Sales Table Example

If we want to optimize a scan for “CA” rows, we will have to address a few issues:

  • The SALES table does not have a STATE column, so no storage index structure on SALES data will allow us to directly prune disk regions based on “CA”.
  • Table rows associated with “CA” are likely to be physically scattered throughout the SALES table, so it’s unlikely that these rows will be confined to a relatively small number of zones or disk regions. We might not be able to make efficient use of an Exadata storage index on SALES.LOCATION_ID, if any (note that I am consciously ignoring the push down of BLOOM FILTERs to Exadata here, which still suffers from the physical scattering).
  • A SALES storage index based on min/max Location ID is likely to be less efficient than using zones based on min/max State values, simply because each State is made up of multiple Location IDs. This inefficiency is more pronounced if Location IDs for “CA” are not numerically close to one another - it will reduce the chances that the Location IDs we’re searching for will be found within the same min/max Location ID regions.

Of course, zone maps are designed to address these issues - with a little bit of help from another Oracle Database 12c feature called attribute clustering.  I introduced attribute clustering in an earlier post, but don’t worry if you haven’t read that yet; I’ll cover the basics here anyway. You’ve probably deduced that we can reduce the number of zones that contain “CA” rows if we cluster or sort the rows in SALES, keeping these rows close to one another, like this:

Table with Zone Map

Attribute clustering is the feature that’s used to cluster the rows together. Zone maps are used to record the min/max values for specified columns for each zone (and this can include column values derived from joins; LOCATIONS.STATE and LOCATIONS.COUNTY in our case).

The following DDL will create our SALES fact table (the column list is abbreviated here) together with a zone map based on the dimension table columns LOCATIONS.STATE and LOCATIONS.COUNTY. It will also enable attribute clustering, using the same columns to cluster the table’s rows:

CREATE TABLE sales_ac (
   sale_id     NUMBER(10),
   location_id NUMBER(10),
   amount      NUMBER(10,2))
CLUSTERING sales_ac
JOIN locations ON (sales_ac.location_id = locations.location_id) 
BY LINEAR ORDER (locations.state, locations.county)
WITH MATERIALIZED ZONEMAP;

The LINEAR ORDER clause specifies a linear clustering algorithm, which is ideal for this example. Another algorithm is available; it is specified with "INTERLEAVED" and is optimized for more complex combinations of query predicates and dimension tables. Note that the definition of attribute clustering by itself does not change any data stored on disk; instead, it provides a directive for direct path operations such as INSERT /*+ APPEND */ and MOVE, which will physically perform the clustering operation for us. If there are pre-existing rows in SALES, we can MOVE the table (or its partitions) to re-order them.

Joins between SALES and the dimension table are now candidates for optimization when the query includes predicates on the dimension hierarchy “state” and “state, county”. For example:

SELECT SUM(amount)
FROM   sales
JOIN   locations  ON (sales.location_id = locations.location_id) 
WHERE  locations.state  = 'NM';

SELECT SUM(amount)
FROM   sales
JOIN   locations  ON (sales.location_id = locations.location_id) 
WHERE  locations.state  = 'CA'
AND    locations.county = 'Kern';

By clustering the rows and recording appropriate min/max column values for our zones, we have addressed all of the issues I identified above. What’s more, we can still get benefit from Exadata storage indexes because zone maps and storage indexes complement one another, and they work together transparently.

Zone maps are explicitly created and controlled by the database administrator on a table-by-table basis. They are an inherent part of the physical database design and can be thought of as a coarse anti-index structure (unlike an index, a zone map tells you what zones not to access). Zone maps are very compact, and in some cases it is possible to use them where you would otherwise use an index. This is most relevant in data warehousing environments where scanning is often more appropriate than indexed row retrieval, and where indexes can use a considerable amount of storage space. Zone maps must be refreshed to be synchronized with the underlying table data, so you will need to give some consideration to how you want them to be kept up-to-date if you decide to use them as an alternative to indexes.
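
For example, a zone map that has become stale with respect to its table can be brought back into sync with a rebuild (the zone map name is illustrative):

ALTER MATERIALIZED ZONEMAP sales_state_zmap REBUILD;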

In summary, take a look at zone maps if you want to:

  • Optimize scanning queries, particularly when joining with one or more tables.
  • Reduce your dependency on indexes, particularly in data warehousing environments.
  • Improve performance in your data warehouse; particularly for star or snowflake schemas.

Here’s an example of using zone maps to optimize a table scan. To compare before and after, start by creating a table that has no zone map or attribute clustering:

CREATE TABLE sales_zm (sale_id NUMBER(10), customer_id NUMBER(10));

Insert 8 million rows with the following PL/SQL code. Why that many? With our example, we'll read one or two zones rather than the entire table, so I'm aiming to make the difference pretty obvious when you look at the block read statistics:

DECLARE
  i NUMBER(10);
BEGIN
  FOR i IN 1..80
  LOOP
    INSERT INTO sales_zm
    SELECT ROWNUM, MOD(ROWNUM, 1000)  -- sale_id, customer_id (generated values are illustrative)
    FROM   dual
    CONNECT BY LEVEL <= 100000;
  END LOOP;
  COMMIT;
END;
/
EXEC dbms_stats.gather_table_stats(ownname=>NULL, tabname=>'SALES_ZM');

Run the following query a few times to see what value “consistent gets” settles at:

SELECT SUM(sale_id)
FROM   sales_zm
WHERE  customer_id = 50;

On my machine, I read 7,545 blocks from the buffer cache, but since the value depends on some storage defaults, don’t be surprised if your value is different:

Before Zone Map

The following DDL will create a zone map, but since attribute clustering is a property of the table (like compression), any existing rows will not be re-ordered:
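
ALTER TABLE sales_zm
ADD CLUSTERING BY LINEAR ORDER (customer_id)  -- cluster on CUSTOMER_ID, matching the query filter
WITH MATERIALIZED ZONEMAP;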


The zone map will not be efficient until we cluster the rows together, so we’ll MOVE the table to achieve this. This will refresh the zone map too:
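
ALTER TABLE sales_zm MOVE;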


Run the same query a few times to see what value “consistent gets” settles at:

SELECT SUM(sale_id)
FROM   sales_zm
WHERE  customer_id = 50;

On my database, I read around 1,051 database blocks instead of 7,545: a considerable improvement:

After Zone Map

You'll find more examples covering zone maps and attribute clustering in the Oracle Learning Library and in the Oracle GitHub repository. Full details on zone maps and attribute clustering can be found in the Oracle documentation library; particularly the Oracle Database 12c Data Warehousing Guide.

There's an earlier post on attribute clustering if you haven't read it already.

If there's anything you need to ask, or if you can't find what you need regarding zone maps or attribute clustering, please let me know by leaving a comment below. Thanks!

Friday Sep 26, 2014

Oracle Big Data Lite 4.0 Virtual Machine Now Available

Big Data Lite 4.0 is now available for download from OTN.  There are lots of new capabilities in this latest version:
  • Oracle Database 12c, including new JSON support and Oracle Big Data SQL-enabled external tables.  Check out this hands-on lab to learn how to securely analyze all your data - across both Hadoop and Oracle Database 12c - using Big Data SQL.
  • New versions of SQL Developer and Data Modeler that support Hive access and automatic generation of Big Data SQL external tables
  • GoldenGate and the latest ODI versions are now included - with some great new hands-on labs.
  • Cloudera Manager is back - you can now optionally use CM to manage your Hadoop environment (requires 10GB memory devoted to the VM).  If you don't want to use CM, you can use the manual CDH configuration with the Big Data Lite services application
  • New versions of the entire stack... Big Data Connectors, NoSQL Database, CDH, JDeveloper and more.

Here's the inventory of all the features and versions:

  • Oracle Enterprise Linux 6.4
  • Oracle Database 12c Release 1 Enterprise Edition - including Oracle Big Data SQL-enabled external tables, Oracle Advanced Analytics, OLAP, Spatial and more
  • Cloudera Distribution including Apache Hadoop (CDH5.1.2)
  • Cloudera Manager (5.1.2)
  • Oracle Big Data Connectors 4.0
    • Oracle SQL Connector for HDFS 3.1.0
    • Oracle Loader for Hadoop 3.2.0
    • Oracle Data Integrator 12c
    • Oracle R Advanced Analytics for Hadoop 2.4.1
    • Oracle XQuery for Hadoop 4.0.1
  • Oracle NoSQL Database Enterprise Edition 12cR1 (3.0.14)
  • Oracle JDeveloper 12c (12.1.3)
  • Oracle SQL Developer and Data Modeler 4.0.3
  • Oracle Data Integrator 12cR1 (12.1.3)
  • Oracle GoldenGate 12c
  • Oracle R Distribution 3.1.1

Monday Sep 15, 2014

Oracle SQL Developer & Data Modeler Support for Oracle Big Data SQL

Oracle SQL Developer and Data Modeler (version 4.0.3) now support Hive and Oracle Big Data SQL.  The tools allow you to connect to Hive, use the SQL Worksheet to query, create and alter Hive tables, and automatically generate Big Data SQL-enabled Oracle external tables that dynamically access data sources defined in the Hive metastore.  

Let's take a look at what it takes to get started and then preview this new capability.

Setting up Connections to Hive

The first thing you need to do is set up a JDBC connection to Hive.  Follow these steps to set up the connection:

Download and Unzip JDBC Drivers

Cloudera provides high performance JDBC drivers that are required for connectivity:

  • Download the Hive Drivers from the Cloudera Downloads page to a local directory
  • Unzip the archive
    • unzip hive_jdbc_2.5.15.1040.zip
  • Three zip files are contained within the archive.  Unzip the JDBC4 archive to a target directory that is accessible to SQL Developer (e.g. /home/oracle/jdbc below): 
    • unzip Cloudera_HiveJDBC4_2.5.15.1040.zip -d /home/oracle/jdbc/
    • Note: you will get an error when attempting to open a Hive connection in SQL Developer if you use a different JDBC version. Ensure you use JDBC4 and not JDBC41.

Now that the JDBC drivers have been extracted, update SQL Developer to use the new drivers.

Update SQL Developer to use the Cloudera Hive JDBC Drivers

Update the preferences in SQL Developer to leverage the new drivers:

  • Start SQL Developer
  • Go to Tools -> Preferences
  • Navigate to Database -> Third Party JDBC Drivers
  • Add all of the jar files contained in the zip to the Third-party JDBC Driver Path.  It should look like the picture below:
    sql developer preferences

  • Restart SQL Developer

Create a Connection

Now that SQL Developer is configured to access Hive, let's create a connection to Hiveserver2.  Click the New Connection button in the SQL Developer toolbar.  You'll need to have an ID, password and the port where Hiveserver2 is running:

connect to hiveserver2

The example above is creating a connection called hive which connects to Hiveserver2 on localhost running on port 10000.  The Database field is optional; here we are specifying the default database.
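
(Under the covers, that dialog resolves to a Hive JDBC URL of the form jdbc:hive2://localhost:10000/default; adjust the host, port and database to match your own Hiveserver2.)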

Using the Hive Connection

The Hive connection is now treated like any other connection in SQL Developer.  The tables are organized into Hive databases; you can review the tables' data, properties, partitions, indexes, details and DDL:

sqldeveloper - view data in hive

And, you can use the SQL Worksheet to run custom queries, perform DDL operations - whatever is supported in Hive:


Here, we've altered the definition of a Hive table and then queried that table in the worksheet.
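
For instance, statements along these lines are fair game in the worksheet (table and column names are illustrative):

ALTER TABLE movieapp_log_json ADD COLUMNS (browser STRING);

SELECT custid, COUNT(*) AS clicks
FROM   movieapp_log_json
GROUP  BY custid;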

Create Big Data SQL-enabled Tables Using Oracle Data Modeler

Oracle Data Modeler automates the definition of Big Data SQL-enabled external tables.  Let's create a few tables using the metadata from the Hive Metastore.  Invoke the import wizard by selecting the File->Import->Data Modeler->Data Dictionary menu item.  You will see the same connections found in the SQL Developer connection navigator:

pick a connection

After selecting the hive connection and a database, select the tables to import:

pick tables to import

There could be any number of tables here - in our case we will select three tables to import.  After completing the import, the logical table definitions appear in our palette:

imported tables

You can update the logical table definitions - and in our case we will want to do so.  For example, the recommended column in Hive is defined as a string (i.e. there is no precision) - which the Data Modeler casts as a varchar2(4000).  We have domain knowledge and understand that this field is really much smaller - so we'll update it to the appropriate size:

update prop

Now that we're comfortable with the table definitions, let's generate the DDL and create the tables in Oracle Database 12c.  Use the Data Modeler DDL Preview to generate the DDL for those tables - and then apply the definitions in the Oracle Database SQL Worksheet:

preview ddl

Edit the Table Definitions

The SQL Developer table editor has been updated so that it now understands all of the properties that control Big Data SQL external table processing.  For example, edit table movieapp_log_json:

edit table props

You can update the source cluster for the data, how invalid records should be processed, how to map hive table columns to the corresponding Oracle table columns (if they don't match), and much more.
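
Under the covers, those properties end up as access parameters in the external table's DDL. A minimal sketch of what that looks like (the column list and parameter values are illustrative):

CREATE TABLE movieapp_log_json (
  custid   NUMBER,
  movieid  NUMBER,
  activity VARCHAR2(20))
ORGANIZATION EXTERNAL (
  TYPE ORACLE_HIVE
  DEFAULT DIRECTORY DEFAULT_DIR
  ACCESS PARAMETERS (
    com.oracle.bigdata.cluster=bigdatalite
    com.oracle.bigdata.tablename=default.movieapp_log_json
  )
)
REJECT LIMIT UNLIMITED;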

Query All Your Data

You now have full Oracle SQL access to data across the platform.  In our example, we can combine data from Hadoop with data in our Oracle Database.  The data in Hadoop can be in any format - Avro, json, XML, csv - if there is a SerDe that can parse the data - then Big Data SQL can access it!  Below, we're combining click data from the JSON-based movie application log with data in our Oracle Database tables to determine how the company's customers rate blockbuster movies:
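
The query behind that analysis might look something like this sketch (table and column names are illustrative):

SELECT m.title, AVG(l.rating)
FROM   movieapp_log_json l,   -- Big Data SQL external table over JSON in HDFS
       movies m               -- ordinary Oracle Database table
WHERE  l.movieid = m.movie_id
AND    m.blockbuster = 'Y'
GROUP  BY m.title;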

compare to blockbuster movies

Looks like they don't think too highly of them! Of course - the ratings data is fictitious ;)

Friday Sep 12, 2014

Announcement: Big Data SQL is Generally Available

Oracle Big Data SQL and Big Data Appliance 4.0 are generally available

Big Data Appliance 4.0 receives the following upgrades:


  • Big Data SQL: Join data in Hadoop with Oracle Database using Oracle SQL 
    • New external table types for handling data stored in Hadoop 
    • Smart Scan for Hadoop to provide fast query performance 
    • Requires additional license for Big Data SQL 
    • Requires Exadata Database Machine running Oracle Database 12c 
  • Automated recovery from server failure 
    • This includes migration of master roles to a different server and re-provisioning of a slave node on a server that has been replaced 
  • NoSQL DB multiple-zone configurations on BDA 
    • When adding nodes to a NoSQL DB BDA cluster, they can be added to an existing zone or to a new zone. 
  •  Update to using the 12c ODI Agent on BDA clusters 

For more information on Big Data SQL, check out:



[Read More]

Tuesday Jul 15, 2014

Oracle Big Data SQL: One Fast Query, All Your Data


Today we're pleased to announce Big Data SQL, Oracle's unique approach to providing unified query over data in Oracle Database, Hadoop, and select NoSQL datastores.  Big Data SQL has been in development for quite a while now, and will be generally available in a few months.  With today's announcement of the product, I wanted to take a chance to explain what we think is important and valuable about Big Data SQL.

SQL on Hadoop

As anyone paying attention to the Hadoop ecosystem knows, SQL-on-Hadoop has seen a proliferation of solutions in the last 18 months, and just as large a proliferation of press.  From good ol' Apache Hive to Cloudera Impala and SparkSQL, these days you can have SQL-on-Hadoop any way you like it.  It does, however, prompt the question: Why SQL?

There's an argument to be made for SQL simply being a form of skill reuse.  If people and tools already speak SQL, then give the people what they know.  In truth, that argument falls flat when one considers the sheer pace at which the Hadoop ecosystem evolves.  If there were a better language for querying Big Data, the community would have turned it up by now.

I think the reality is that the SQL language endures because it is uniquely suited to querying datasets.  Consider: SQL is a declarative language for operating on relations in data.  It's a domain-specific language where the domain is datasets.  In and of itself, that's powerful: having language elements like FROM, WHERE and GROUP BY makes reasoning about datasets simpler.  It's set theory set into a programming language.

It goes beyond just the language itself.  SQL is declarative, which means I only have to reason about the shape of the result I want, not the data access mechanisms to get there, the join algorithms to apply, how to serialize partial aggregations, and so on.  SQL lets us think about answers, which lets us get more done.

SQL on Hadoop, then, is somewhat obvious.  As data gets bigger, we would prefer to only have to reason about answers.

SQL On More Than Hadoop

For all the obvious goodness of SQL on Hadoop, there's a somewhat obvious drawback.  Specifically, data rarely lives in a single place.  Indeed, if Big Data is causing a proliferation of new ways to store and process data, then there are likely more places to store data than ever before.  If SQL on Hadoop is separate from SQL on a DBMS, I run the risk of constructing every IT architect's least favorite solution: the stovepipe.

If we want to avoid stovepipes, what we really need is the ability to run SQL queries that work seamlessly across multiple datastores.  Ideally, in a Big Data world, SQL should "play data where it lies," using the declarative power of the language to provide answers from all data.

This is why we think Oracle Big Data SQL is obvious too.

It's just a little more complicated than SQL on any one thing.  To pull it off, we have to do a few things:

  • Maintain the valuable characteristics of the system storing the data
  • Unify metadata to understand how to execute queries
  • Optimize execution to take advantage of the systems storing the data

For the case of a relational database, we might say that the valuable storage characteristics include things like: straight-through processing, change-data logging, fine-grained access controls, and a host of other things.

For Hadoop, I believe that the two most valuable storage characteristics are scalability and schema-on-read.  Cost-effective scalability is one of the first things that people look to HDFS for, so any solution that does SQL over a relational database and Hadoop has to understand how HDFS scales and distributes data.  Schema-on-read is at least equally important, if not more so.  As Daniel Abadi recently wrote, the flexibility of schema-on-read gives Hadoop tremendous power: dump data into HDFS, and access it without having to convert it to a specific format.  So, then, any solution that does SQL over a relational database and Hadoop is going to have to respect the schemas of the database, but be able to really apply schema-on-read principles to data stored in Hadoop.

Oracle Big Data SQL maintains all of these valuable characteristics, and it does it specifically through the approaches taken for unifying metadata and optimizing performance.

Big Data SQL queries data in a DBMS and Hadoop by unifying metadata and optimizing performance.

Unifying Metadata

To unify metadata for planning and executing SQL queries, we require a catalog of some sort.  What tables do I have?  What are their column names and types?  Are there special options defined on the tables?  Who can see which data in these tables?

Given the richness of the Oracle data dictionary, Oracle Big Data SQL unifies metadata using Oracle Database: specifically as external tables.  Tables in Hadoop or NoSQL databases are defined as external tables in Oracle.  This makes sense, given that the data is external to the DBMS.

Wait a minute, don't lots of vendors have external tables over HDFS, including Oracle?

Yes, but what Big Data SQL provides as an external table is uniquely designed to preserve the valuable characteristics of Hadoop.  The difficulty with most external tables is that they are designed to work on flat, fixed-definition files, not distributed data which is intended to be consumed through dynamically invoked readers.  That both causes poor parallelism and removes the value of schema-on-read.

The external tables Big Data SQL presents are different.  They leverage the Hive metastore or user definitions to determine both parallelism and read semantics.  That means that if a file in HDFS is 100 blocks, Oracle Database understands there are 100 units which can be read in parallel.  If the data was stored in a SequenceFile using a binary SerDe, or as Parquet data, or as Avro, that is how the data is read.  Big Data SQL uses the exact same InputFormat, RecordReader, and SerDes defined in the Hive metastore to read the data from HDFS.

Once that data is read, we need only to join it with internal data and provide SQL on Hadoop and a relational database.

Optimizing Performance

Being able to join data from Hadoop with Oracle Database is a feat in and of itself.  However, given the size of data in Hadoop, it ends up being a lot of data to shift around.  In order to optimize performance, we must take advantage of what each system can do.

In the days before data was officially Big, Oracle faced a similar challenge when optimizing Exadata, our then-new database appliance.  Since many databases are connected to shared storage, at some point database scan operations can become bound on the network between the storage and the database, or on the shared storage system itself.  The solution the group proposed was remarkably similar to much of the ethos that infuses MapReduce and Apache Spark: move the work to the data and minimize data movement.

The effect is striking: minimizing data movement by an order of magnitude often yields performance increases of an order of magnitude.

Big Data SQL takes a play from both the Exadata and Hadoop books to optimize performance: it moves work to the data and radically minimizes data movement.  It does this via something we call Smart Scan for Hadoop.

Moving the work to the data is straightforward.  Smart Scan for Hadoop introduces a new service into the Hadoop ecosystem, which is co-resident with HDFS DataNodes and YARN NodeManagers.  Queries from the new external tables are sent to these services to ensure that reads are direct path and data-local.  Reading close to the data speeds up I/O, but minimizing data movement requires that Smart Scan do some things that are, well, smart.

Smart Scan for Hadoop

Consider this: most queries don't select all columns, and most queries have some kind of predicate on them.  Moving unneeded columns and rows is, by definition, excess data movement that impedes performance.  Smart Scan for Hadoop gets rid of this excess movement, which in turn radically improves performance.

For example, suppose we were querying a 100 TB set of JSON data stored in HDFS, but only cared about a few fields -- email and status -- and only wanted results from the state of Texas.

Once data is read from a DataNode, Smart Scan for Hadoop goes beyond just reading.  It applies parsing functions to our JSON data, discards any documents which do not contain 'TX' for the state attribute.  Then, for those documents which do match, it projects out only the email and status attributes to merge with the rest of the data.  Rather than moving every field, for every document, we're able to cut down 100s of TB to 100s of GB.
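
In SQL terms, the query might be as simple as this sketch (the external table name is illustrative); Smart Scan applies both the filter and the projection out on the Hadoop side:

SELECT email, status
FROM   movielog_json
WHERE  state = 'TX';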

The approach we take to optimizing performance with Big Data SQL makes Big Data much slimmer.


So, there you have it: fast queries which join data in Oracle Database with data in Hadoop while preserving what makes each system a valuable part of overall information architectures.  Big Data SQL unifies metadata, such that data sources can be queried with the best possible parallelism and the correct read semantics.  Big Data SQL optimizes performance using approaches inspired by Exadata: filtering out irrelevant data before it can become a bottleneck.

It's SQL that plays data where it lies, letting you place data where you think it belongs.

[Read More]

Wednesday Mar 26, 2014

Oracle Big Data Lite Virtual Machine - Version 2.5 Now Available

Oracle Big Data Appliance Version 2.5 was released last week.  Some great new features in this release - including a continued security focus (on-disk encryption and automated configuration of Sentry for data authorization) and updates to Cloudera Distribution of Apache Hadoop and Cloudera Manager.

With each BDA release, we have a new release of Oracle Big Data Lite Virtual Machine.  Oracle Big Data Lite provides an integrated environment to help you get started with the Oracle Big Data platform. Many Oracle Big Data platform components have been installed and configured - allowing you to begin using the system right away. The following components are included on Oracle Big Data Lite Virtual Machine v 2.5:

  • Oracle Enterprise Linux 6.4
  • Oracle Database 12c Release 1 Enterprise Edition
  • Cloudera’s Distribution including Apache Hadoop (CDH4.6)
  • Cloudera Manager 4.8.2
  • Cloudera Enterprise Technology, including:
    • Cloudera RTQ (Impala 1.2.3)
    • Cloudera RTS (Search 1.2)
  • Oracle Big Data Connectors 2.5
    • Oracle SQL Connector for HDFS 2.3.0
    • Oracle Loader for Hadoop 2.3.1
    • Oracle Data Integrator 11g
    • Oracle R Advanced Analytics for Hadoop 2.3.1
    • Oracle XQuery for Hadoop 2.4.0
  • Oracle NoSQL Database Enterprise Edition 12cR1 (2.1.54)
  • Oracle JDeveloper 11g
  • Oracle SQL Developer 4.0
  • Oracle Data Integrator 12cR1
  • Oracle R Distribution 3.0.1

Go to the Oracle Big Data Lite Virtual Machine landing page on OTN to download the latest release.

Monday Mar 24, 2014

Demonstration: Auditing Data Access Across the Enterprise

Security has been an important theme across recent Big Data Appliance releases. Our most recent release includes encryption of data at rest and automatic configuration of Sentry for data authorization. This is in addition to the security features previously added to the BDA, including Kerberos-based authentication, network encryption and auditing.

Auditing data access across the enterprise - including databases, operating systems and Hadoop - is critically important and oftentimes required for SOX, PCI and other regulations. Let's take a look at a demonstration of how Oracle Audit Vault and Database Firewall delivers comprehensive audit collection, alerting and reporting of activity on an Oracle Big Data Appliance and Oracle Database 12c. 


In this scenario, we've set up auditing for both the BDA and Oracle Database 12c.


The Audit Vault Server is deployed to its own secure server and serves as mission control for auditing. It is used to administer audit policies, configure activities that are tracked on the secured targets and provide robust audit reporting and alerting. In many ways, Audit Vault is a specialized auditing data warehouse. It automates ETL from a variety of sources into an audit schema and then delivers both pre-built and ad hoc reporting capabilities.

For our demonstration, Audit Vault agents are deployed to the BDA and Oracle Database 12c monitored targets; these agents are responsible for managing collectors that gather activity data. This is a secure agent deployment; the Audit Vault Server has a trusted relationship with each agent. To set up the trusted relationship, the agent makes an activation request to the Audit Vault Server; this request is then activated (or "approved") by the AV Administrator. The monitored target then applies an AV Server generated Agent Activation Key to complete the activation.


On the BDA, these installation and configuration steps have all been automated for you. Using the BDA's Configuration Generation Utility, you simply specify that you would like to audit activity in Hadoop. Then, you identify the Audit Vault Server that will receive the audit data. Mammoth - the BDA's installation tool - uses this information to configure the audit processing. Specifically, it sets up audit trails across the following services:

  • HDFS: collects all file access activity
  • MapReduce:  identifies who ran what jobs on the cluster
  • Oozie:  audits who ran what as part of a workflow
  • Hive:  captures changes that were made to the Hive metadata

There is much more flexibility when monitoring the Oracle Database. You can create audit policies for SQL statements, schema objects, privileges and more. Check out the auditor's guide for more details. In our demonstration, we kept it simple: we are capturing all select statements on the sensitive HR.EMPLOYEES table, all statements made by the HR user and any unsuccessful attempts at selecting from any table in any schema.
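
For a flavor of what the first of those rules looks like in plain Oracle Database 12c unified auditing terms (Audit Vault manages this configuration for you; the policy name is illustrative):

CREATE AUDIT POLICY hr_emp_select_pol
  ACTIONS SELECT ON hr.employees;

AUDIT POLICY hr_emp_select_pol;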

Now that we are capturing activity across the BDA and Oracle Database 12c, we'll set up an alert to fire whenever there is suspicious activity attempted over sensitive HR data in Hadoop:


In the alert definition found above, a critical alert is defined as three unsuccessful attempts from a given IP address to access data in the HR directory. Alert definitions are extremely flexible - using any audited field as input into a conditional expression. And, they are automatically delivered to the Audit Vault Server's monitoring dashboard - as well as via email to appropriate security administrators.

Now that auditing is configured, we'll generate activity by two different users: oracle and DrEvil. We'll then see how the audit data is consolidated in the Audit Vault Server and how auditors can interrogate that data.

Capturing Activity

The demonstration is driven by a few scripts that generate different types of activity by both the oracle and DrEvil users. These activities include:

  • an oozie workflow that removes salary data from HDFS
  • numerous HDFS commands that upload files, change file access privileges, copy files and list the contents of directories and files
  • hive commands that query, create, alter and drop tables
  • Oracle Database commands that connect as different users, create and drop users, select from tables and delete records from a table

After running the scripts, we log into the Audit Vault Server as an auditor. Immediately, we see our alert has been triggered by the users' activity.


Drilling down on the alert reveals DrEvil's three failed attempts to access the sensitive data in HDFS:

alert details

Now that we see the alert triggered in the dashboard, let's see what other activity is taking place on the BDA and in the Oracle Database.

Ad Hoc Reporting

Audit Vault Server delivers rich reporting capabilities that enable you to better understand the activity that has taken place across the enterprise. In addition to the numerous reports that are delivered out of the box with Audit Vault, you can create your own custom reports that meet your own personal needs. Here, we are looking at a BDA monitoring report that focuses on Hadoop activities that occurred in the last 24 hours:

monitor events

As you can see, the report tells you all of the key elements required to understand: 1) when the activity took place, 2) the source service for the event, 3) what object was referenced, 4) whether or not the event was successful, 5) who executed the event, 6) the ip address (or host) that initiated the event, and 7) how the object was modified or accessed. Stoplight reporting is used to highlight critical activity - including DrEvil's failed attempts to open the sensitive salaries.txt file.

Notice, events may be related to one another. The Hive command "ALTER TABLE my_salarys RENAME TO my_salaries" will generate two events. The first event is sourced from the Metastore; the alter table command is captured and the metadata definition is updated. The Hive command also impacts HDFS; the table name is represented by an HDFS folder. Therefore, an HDFS event is logged that renames the "my_salarys" folder to "my_salaries".

Next, consider an Oozie workflow that performs a simple task: delete a file "salaries2.txt" in HDFS. This Oozie workflow generates the following events:


  1. First, an Oozie workflow event is generated indicating the start of the workflow.
  2. The workflow definition is read from the "workflow.xml" file found in HDFS.
  3. An Oozie working directory is created.
  4. The salaries2.txt file is deleted from HDFS.
  5. Oozie runs its clean-up process.

The Audit Vault reports are able to reveal all of the underlying activity that is executed by the Oozie workflow. Its flexible reporting allows you to sequence these independent events into a logical series of related activities.

The reporting focus so far has been on Hadoop - but one of the core strengths of Oracle Audit Vault is its ability to consolidate all audit data. We know that DrEvil had a few unsuccessful attempts to access sensitive salary data in HDFS. But, what other unsuccessful events have occurred recently across our data platform? We'll use Audit Vault's ad hoc reporting capabilities to answer that question. Report filters enable users to search audit data based on a range of conditions. Here, we'll keep it pretty simple; let's find all failed access attempts across both the BDA and the Oracle Database within the last two hours:


Again, DrEvil's activity stands out. As you can see, DrEvil is attempting to access sensitive salary data not only in HDFS - but also in the Oracle Database.


Security and integration with the rest of the Oracle ecosystem are two table stakes that are critical to Oracle Big Data Appliance releases. Oracle Audit Vault and Database Firewall's auditing of data across the BDA, databases and operating systems epitomizes this goal - providing a single repository and reporting environment for all your audit data.

Monday Jan 27, 2014

Announcing: Oracle Big Data Lite Virtual Machine

You've been hearing a lot about Oracle's big data platform. Today, we're pleased to announce Oracle Big Data Lite Virtual Machine - an environment to help you get started with the platform. And, we have a great OTN Virtual Developer Day event scheduled where you can start using our big data products as part of a series of workshops.

BigDataLite Picture

Oracle Big Data Lite Virtual Machine is an Oracle VM VirtualBox that contains many key components of Oracle's big data platform, including: Oracle Database 12c Enterprise Edition, Oracle Advanced Analytics, Oracle NoSQL Database, Cloudera Distribution including Apache Hadoop, Oracle Data Integrator 12c, Oracle Big Data Connectors, and more. It's been configured to run on a "developer class" computer; all Big Data Lite needs is a couple of cores and about 5GB memory to run (this means that your computer should have at least 8GB total memory). With Big Data Lite, you can develop your big data applications and then deploy them to the Oracle Big Data Appliance. Or, you can use Big Data Lite as a client to the BDA during application development.

How do you get started? Why not start by registering for the Virtual Developer Day scheduled for Tuesday, February 4, 2014 - 9am  to 1pm PT / 12pm to 4pm ET / 3pm to 7pm BRT:


There will be 45 minute sessions delivered by product experts (from both Oracle and Oracle Aces) - highlighted by Tom Kyte and Jonathan Lewis' keynote "Landscape of Oracle Database Technology Evolution". Some of the big data technical sessions include:

  • Oracle NoSQL Database Installation and Cluster Topology Deployment
  • Application Development & Schema Design with Oracle NoSQL Database
  • Processing Twitter Data with Hadoop
  • Use Data from a Hadoop Cluster with Oracle Database
  • Make the Right Offers to Customers Using Oracle Advanced Analytics
  • In-DB Map Reduce with SQL/Hadoop
  • Pattern Matching in SQL

Keep an eye on this space - we'll be publishing how-to's that leverage the new Oracle Big Data Lite VM. And, of course, we'd love to hear about the clever applications you build as well!

Friday Dec 20, 2013

BAE Systems Choose Big Data Appliance for Critical Projects

Here's another great story about how to use data warehousing and big data technologies to solve real-world problems using diverse sets of data and Oracle technology. BAE Systems is taking unstructured, semi-structured, operational and social media data and using it to solve complex problems such as financial crime, cyber security and digital transformation. The volumes of data that BAE deals with are very large and this creates its own set of challenges and problems in terms of optimising hardware and software to work efficiently and effectively together. Although BAE had their own in-house Hadoop experts they chose Oracle Big Data Appliance for their Hadoop cluster because it’s easier, cheaper, and faster to operate.

BAE is working with many telco customers to explore the new areas that are being opened up by the use of big data to manage browsing data and call record data. These data sources are being transformed to provide additional insight for the network operations teams, analysis of customer quality and to drive marketing campaigns.




Click on the image to watch the video, or click here: http://medianetwork.oracle.com/video/player/2940549413001

Friday Dec 13, 2013

New! Oracle DW-Big Data Monthly Roundup Magazine

Are you interested in learning how Oracle customers are taking advantage of Oracle's Data Warehousing and Big Data Platform?  Want to keep up on the latest product releases and how they might impact your organization?  Looking for best practices that describe how to most effectively apply Oracle DW-Big Data technology?

Check out Oracle's new monthly magazine - Oracle DW-Big Data Monthly Roundup.  Powered by Flipboard - you can now view all this content on your favorite device. 

Let us know what you think!

Wednesday Nov 06, 2013

Swiss Re increases data warehouse performance and deploys in record time

Great information on yet another data warehouse deployment on Exadata.

A little background on Swiss Re:

In 2002, Swiss Re established a data warehouse for its client markets and products to gather reinsurance information across all organizational units into an integrated structure. The data warehouse provided the basis for reporting at the group level with drill-down capability to individual contracts, while facilitating application integration and data exchange by using common data standards. Initially focusing on property and casualty reinsurance information only, it now includes life and health reinsurance, insurance, and nonlife insurance information.

Key highlights of the benefits that Swiss Re achieved by using Exadata:

  • Reduced the time to feed the data warehouse and generate data marts by 58%
  • Reduced average runtime by 24% for standard reports
  • Comfortably loaded two data warehouse refreshes per day with incremental feeds
  • Freed up technical experts by significantly minimizing time spent on tuning activities

Most importantly this was one of the fastest project deployments in Swiss Re's history. They went from installation to production in just four months! What is truly surprising is that it only took two weeks between power-on and testing the machine with full data volumes! Business teams at Swiss Re are now able to fully exploit up-to-date analytics across property, casualty, life, health insurance, and reinsurance lines to identify successful products.

These points are highlighted in the following quotes from Dr. Stephan Gutzwiller, Head of Data Warehouse Services at Swiss Re: 

"We were operating a complete Oracle stack, including servers, storage area network, operating systems, and databases that was well optimized and delivered very good performance over an extended period of time. When a hardware replacement was scheduled for 2012, Oracle Exadata was a natural choice—and the performance increase was impressive. It enabled us to deliver analytics to our internal customers faster, without hiring more IT staff"

“The high quality data that is readily available with Oracle Exadata gives us the insight and agility we need to cater to client needs. We also can continue re-engineering to keep up with the increasing demand without having to grow the organization. This combination creates excellent business value.”

Our full press release is available here: http://www.oracle.com/us/corporate/customers/customersearch/swiss-re-1-exadata-ss-2050409.html. If you want more information about how Exadata can increase the performance of your data warehouse visit our home page: http://www.oracle.com/us/products/database/exadata-database-machine/overview/index.html

Sunday Apr 07, 2013

Three Little Hive UDFs: Part 3


In the final installment in our series on Hive UDFs, we're going to tackle the least intuitive of the three types: the User Defined Aggregating Function.  While they're challenging to implement, UDAFs are necessary if we want functions for which the distinction between map-side and reduce-side operations is opaque to the user.  If a user is writing a query, most would prefer to focus on the data they're trying to compute, not which part of the plan is running a given function.

The UDAF also provides a valuable opportunity to consider some of the nuances of distributed programming and parallel database operations.  Since each task in a MapReduce job operates in a bit of a vacuum (e.g. Map task A does not know what data Map task B has), a UDAF has to explicitly account for more operational states than a simple UDF.  We'll return to the notion of a simple Moving Average function, but ask yourself: how do we compute a moving average if we don't have state or order around the data?  

As before, the code is available on github, but we'll excerpt the important parts here.
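
To ground things before we dig into the internals, here's how the finished UDAF might be invoked from Hive (the jar path, class name, table and columns are hypothetical):

ADD JAR /tmp/moving-average-udf.jar;
CREATE TEMPORARY FUNCTION moving_avg AS 'com.example.hive.udf.GenericUDAFMovingAverage';

-- 3-period moving average of price over each symbol's time series
SELECT symbol, moving_avg(period, price, 3)
FROM   stock_ticks
GROUP  BY symbol;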

Prefix Sum: Moving Average without State

In order to compute a moving average without state, we're going to need a specialized parallel algorithm.  For moving average, the "trick" is to use a prefix sum, effectively keeping a table of running totals for quick computation (and recomputation) of our moving average.  For example, given the values 5, 10, 15, 20, the prefix sums are 5, 15, 30, 50; the 2-period total ending at the fourth value is just 50 - 15 = 35, so its moving average is 17.5.  A full discussion of prefix sums for moving averages is beyond the length of a blog post, but John Jenq provides an excellent discussion of the technique as applied to CUDA implementations.

What we'll cover here is the necessary implementation of a pair of classes to store and operate on our prefix sum entry within the UDAF.

class PrefixSumMovingAverage {
    static class PrefixSumEntry implements Comparable {
        int period;
        double value;
        double prefixSum;
        double subsequenceTotal;
        double movingAverage;

        public int compareTo(Object other) {
            PrefixSumEntry o = (PrefixSumEntry)other;
            if (period < o.period)
                return -1;
            if (period > o.period)
                return 1;
            return 0;
        }
    }

Here we have the definition of our moving average class and the static inner class which serves as an entry in our table.  What's important here are some of the variables we define for each entry in the table: the time-index or period of the value (its order), the value itself, the prefix sum,  the subsequence total, and the moving average itself.  Every entry in our table requires not just the current value to compute the moving average, but also sum of entries in our moving average window.  It's the pair of these two values which allows prefix sum methods to work their magic.

    //class variables
    private int windowSize;
    private ArrayList<PrefixSumEntry> entries;

    public PrefixSumMovingAverage() {
        windowSize = 0;
    }

    public void reset() {
        windowSize = 0;
        entries = null;
    }

    public boolean isReady() {
        return (windowSize > 0);
    }

The above are simple initialization routines: a constructor, a method to reset the table, and a boolean method on whether or not the object has a prefix sum table on which to operate.  From here, there are 3 important methods to examine: add, merge, and serialize.  The first is intuitive: as we scan rows in Hive we want to add them to our prefix sum table.  The last two are important because of partial aggregation.  

We cannot say ahead of time where this UDAF will run, and partial aggregation may be required.  That is, it's entirely possible that some values may run through the UDAF during a map task, but then be passed to a reduce task to be combined with other values.  The serialize method will allow Hive to pass the partial results from the map side to the reduce side.  The merge method allows reducers to combine the results of partial aggregations from the map tasks.

    public void add(int period, double v) {
        // Add a new entry to the list and update the table
        PrefixSumEntry e = new PrefixSumEntry();
        e.period = period;
        e.value = v;
        entries.add(e);
        // do we need to ensure this is sorted?
        //if (needsSorting(entries))
        //    Collections.sort(entries);
        // update the table
        // prefixSums first
        double prefixSum = 0;
        for (int i = 0; i < entries.size(); i++) {
            PrefixSumEntry thisEntry = entries.get(i);
            prefixSum += thisEntry.value;
            thisEntry.prefixSum = prefixSum;
            entries.set(i, thisEntry);
        }

 The first part of the add task is simple: we add the element to the list and update our table's prefix sums.

        // now do the subsequence totals and moving averages
        for (int i = 0; i < entries.size(); i++) {
            double subsequenceTotal;
            double movingAverage;
            PrefixSumEntry thisEntry = entries.get(i);
            PrefixSumEntry backEntry = null;
            if (i >= windowSize)
                backEntry = entries.get(i - windowSize);
            if (backEntry != null)
                subsequenceTotal = thisEntry.prefixSum - backEntry.prefixSum;
            else
                subsequenceTotal = thisEntry.prefixSum;
            movingAverage = subsequenceTotal / (double)windowSize;
            thisEntry.subsequenceTotal = subsequenceTotal;
            thisEntry.movingAverage = movingAverage;
            entries.set(i, thisEntry);
        }
    }


In the second half of the add function, we compute our moving averages based on the prefix sums.  It's here you can see the hinge on which the algorithm swings: thisEntry.prefixSum - backEntry.prefixSum -- that offset between the current table entry and its nth predecessor makes the whole thing work.

    public ArrayList<DoubleWritable> serialize() {
        ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>();
        result.add(new DoubleWritable(windowSize));
        if (entries != null) {
            for (PrefixSumEntry i : entries) {
                result.add(new DoubleWritable(i.period));
                result.add(new DoubleWritable(i.value));
            }
        }
        return result;
    }

The serialize method needs to package the results of our algorithm to pass to another instance of the same algorithm, and it needs to do so in a type that Hadoop can serialize.  In the case of a method like sum, this would be relatively simple: we would only need to pass the sum up to this point.  However, because we cannot be certain whether this instance of our algorithm has seen all the values, or seen them in the correct order, we actually need to serialize the whole table.  To do this, we create a list of DoubleWritables, pack the window size at its head, and then each period and value.  This gives us a structure that's easy to unpack and merge with other lists with the same construction.

    public void merge(List<DoubleWritable> other) {
        if (other == null)
            return;
        // if this is an empty buffer, just copy in other
        // but deserialize the list
        if (windowSize == 0) {
            windowSize = (int)other.get(0).get();
            entries = new ArrayList<PrefixSumEntry>();
            // we're serialized as period, value, period, value
            for (int i = 1; i < other.size(); i += 2) {
                PrefixSumEntry e = new PrefixSumEntry();
                e.period = (int)other.get(i).get();
                e.value = other.get(i+1).get();
                entries.add(e);
            }
        }


Merging results is perhaps the most complicated thing we need to handle.  First, we check the case in which there was no partial result passed -- just return and continue.  Second, we check to see if this instance of PrefixSumMovingAverage already has a table.  If it doesn't, we can simply unpack the serialized result and treat it as our window.

        // if we already have a buffer, we need to add these entries
        else {
            // we're serialized as period, value, period, value
            for (int i = 1; i < other.size(); i += 2) {
                PrefixSumEntry e = new PrefixSumEntry();
                e.period = (int)other.get(i).get();
                e.value = other.get(i+1).get();
                entries.add(e);
            }
        }


The third case is the non-trivial one: if this instance has a table and receives a serialized table, we must merge them together.  Consider a Reduce task: as it receives outputs from multiple Map tasks, it needs to merge all of them together to form a larger table.  Thus, merge will be called many times to add these results and reassemble a larger time series.

        // sort and recompute
        Collections.sort(entries);
        // update the table
        // prefixSums first
        double prefixSum = 0;
        for (int i = 0; i < entries.size(); i++) {
            PrefixSumEntry thisEntry = entries.get(i);
            prefixSum += thisEntry.value;
            thisEntry.prefixSum = prefixSum;
            entries.set(i, thisEntry);
        }


This part should look familiar, it's just like the add method.  Now that we have new entries in our table, we need to sort by period and recompute the moving averages.  In fact, the rest of the merge method is exactly like the add method, so we might consider putting sorting and recomputing in a separate method.

Orchestrating Partial Aggregation

We've got a clever little algorithm for computing moving average in parallel, but Hive can't do anything with it unless we create a UDAF that understands how to use our algorithm.  At this point, we need to start writing some real UDAF code.  As before, we extend a generic class, in this case GenericUDAFEvaluator.

  public static class GenericUDAFMovingAverageEvaluator extends GenericUDAFEvaluator {
        // input inspectors for PARTIAL1 and COMPLETE
        private PrimitiveObjectInspector periodOI;
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector windowSizeOI;
        // input inspectors for PARTIAL2 and FINAL
        // list for MAs and one for residuals
        private StandardListObjectInspector loi;

 As in the case of a UDTF, we create ObjectInspectors to handle type checking.  However, notice that we have inspectors for different states: PARTIAL1, PARTIAL2, COMPLETE, and FINAL.  These correspond to the different states in which our UDAF may be executing.  Since our serialized prefix sum table isn't the same input type as the values our add method takes, we need different type checking for each.

        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);

            // initialize input inspectors
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                assert(parameters.length == 3);
                periodOI = (PrimitiveObjectInspector) parameters[0];
                inputOI = (PrimitiveObjectInspector) parameters[1];
                windowSizeOI = (PrimitiveObjectInspector) parameters[2];
            }

Here's the beginning of our overridden initialization function.  We check the parameters for two modes, PARTIAL1 and COMPLETE.  Here we assume that the arguments to our UDAF are the same as the user passes in a query: the period, the input, and the size of the window.  If the UDAF instance is consuming the results of our partial aggregation, we need a different ObjectInspector.  Specifically, this one:

                loi = (StandardListObjectInspector) parameters[0];


Similar to the UDTF, we also need type checking on the output types -- but for both partial and full aggregation. In the case of partial aggregation, we're returning lists of DoubleWritables:

            // init output object inspectors
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                // The output of a partial aggregation is a list of doubles representing the
                // moving average being constructed.
                // the first element in the list will be the window size
                return ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
            }


 But in the case of FINAL or COMPLETE, we're dealing with the types that will be returned to the Hive user, so we need to return a different output.  We're going to return a list of structs that contain the period, moving average, and residuals (since they're cheap to compute).

            } else {
                // The output of FINAL and COMPLETE is a full aggregation: a list of
                // structs holding the period, the moving average and the residual
                // (the field names below are illustrative).
                ArrayList<String> fname = new ArrayList<String>();
                fname.add("period");
                fname.add("moving_average");
                fname.add("residual");
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
                foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
                foi.add(PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
                return ObjectInspectorFactory.getStandardListObjectInspector(
                    ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi));
            }
        }


Next come the methods that control what happens when a Map or Reduce task is finished with its data.  In the case of partial aggregation, we need to serialize the data; in the case of full aggregation, we need to package the result for Hive users.

    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      // return an ArrayList where the first parameter is the window size
      MaAgg myagg = (MaAgg) agg;
      return myagg.prefixSum.serialize();
    }

    public Object terminate(AggregationBuffer agg) throws HiveException {
      // final return value goes here
      MaAgg myagg = (MaAgg) agg;
      if (myagg.prefixSum.tableSize() < 1)
        return null;

      ArrayList<DoubleWritable[]> result = new ArrayList<DoubleWritable[]>();
      for (int i = 0; i < myagg.prefixSum.tableSize(); i++) {
        double residual = myagg.prefixSum.getEntry(i).value
                        - myagg.prefixSum.getEntry(i).movingAverage;
        // collect the (period, moving average, residual) triple
        DoubleWritable[] entry = new DoubleWritable[3];
        entry[0] = new DoubleWritable(myagg.prefixSum.getEntry(i).period);
        entry[1] = new DoubleWritable(myagg.prefixSum.getEntry(i).movingAverage);
        entry[2] = new DoubleWritable(residual);
        result.add(entry);
      }
      return result;
    }


We also need to tell Hive how to merge the results of partial aggregation.  Fortunately, we already handled this in our PrefixSumMovingAverage class, so we can just call into it.

    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        // if we're merging two separate sets we're creating one table that's doubly long
        if (partial != null) {
            MaAgg myagg = (MaAgg) agg;
            List<DoubleWritable> partialMovingAverage = (List<DoubleWritable>) loi.getList(partial);
            // delegate to the merge logic we wrote in PrefixSumMovingAverage
            myagg.prefixSum.merge(partialMovingAverage);
        }
    }


Of course, merging and serializing aren't very useful unless the UDAF has logic for iterating over values.  The iterate method handles this and -- as one would expect -- relies entirely on the PrefixSumMovingAverage class we created.

    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      assert (parameters.length == 3);
      if (parameters[0] == null || parameters[1] == null || parameters[2] == null)
        return;
      MaAgg myagg = (MaAgg) agg;

      // Parse out the window size just once if we haven't done so before. We need a window
      // of at least 1, otherwise there's no window.
      if (!myagg.prefixSum.isReady()) {
        int windowSize = PrimitiveObjectInspectorUtils.getInt(parameters[2], windowSizeOI);
        if (windowSize < 1)
            throw new HiveException(getClass().getSimpleName() + " needs a window size >= 1");
        // hand the window size to the prefix sum table (method name assumed here)
        myagg.prefixSum.allocate(windowSize);
      }

      // Add the current data point and compute the average
      int p = PrimitiveObjectInspectorUtils.getInt(parameters[0], periodOI);
      double v = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI);
      myagg.prefixSum.add(p, v);
    }


Aggregation Buffers: Connecting Algorithms with Execution

One might notice that the code for our UDAF references an object of type AggregationBuffer quite a lot.  This is because AggregationBuffer is the interface that allows us to connect our custom PrefixSumMovingAverage class to Hive's execution framework.  It doesn't constitute a great deal of code, but it's the glue that binds our logic to Hive's execution framework.  We implement it as such:

    // Aggregation buffer definition and manipulation methods
    static class MaAgg implements AggregationBuffer {
        PrefixSumMovingAverage prefixSum;
    }

    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      MaAgg result = new MaAgg();
      return result;
    }
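One piece of glue not shown above: GenericUDAFEvaluator also requires a reset method so Hive can reuse a buffer across groups.  A minimal sketch, assuming PrefixSumMovingAverage has a no-argument constructor:

    public void reset(AggregationBuffer agg) throws HiveException {
      // discard accumulated state so the buffer can be reused for the next group
      ((MaAgg) agg).prefixSum = new PrefixSumMovingAverage();
    }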


Using the UDAF

The goal of a good UDAF is that, no matter how complicated it was for us to implement, it's simple for our users.  For all that code and parallel thinking, usage of the UDAF is very straightforward:

ADD JAR /mnt/shared/hive_udfs/dist/lib/moving_average_udf.jar;
CREATE TEMPORARY FUNCTION moving_avg AS 'com.oracle.hadoop.hive.ql.udf.generic.GenericUDAFMovingAverage';
#get the moving average for a single tail number
SELECT TailNum,moving_avg(timestring, delay, 4) FROM ts_example WHERE TailNum='N967CA' GROUP BY TailNum LIMIT 100;

Here we're applying the UDAF to get the moving average of arrival delay for a particular tail number.  It's a really simple query for all the work we did underneath.  We can go a bit further and leverage Hive's ability to handle complex types as columns; here's a query that creates a table of time series as arrays.

#create a set of moving averages for every plane starting with N
#Note: this UDAF blows up unpleasantly in heap; there will be data volumes for which you need to throw
#excessive amounts of memory at the problem
CREATE TABLE moving_averages AS
SELECT TailNum, moving_avg(timestring, delay, 4) as timeseries FROM ts_example
WHERE TailNum LIKE 'N%' GROUP BY TailNum;



We've covered all manner of UDFs: from simple class extensions that can be written very easily, to very complicated UDAFs that require us to think about distributed execution and the plan orchestration done by query engines.  With any luck, the discussion has given you the confidence to go out and implement your own UDFs -- or at least to appreciate the complexities of the ones in use every day.

[Read More]

Thursday Apr 04, 2013

Three Little Hive UDFs: Part 2


In our ongoing exploration of Hive UDFs, we've covered the basic row-wise UDF.  Today we'll move to the UDTF, which generates multiple rows for every row processed.  This UDF built its house from sticks: it's slightly more complicated than the basic UDF and gives us an opportunity to explore how Hive functions manage type checking.

 We'll step through some of the more interesting pieces, but as before the full source is available on github here.

Extending GenericUDTF

 Our UDTF is going to produce pairwise combinations of elements in a comma-separated string.  So, for a string column "Apples, Bananas, Carrots" we'll produce three rows:


  • Apples, Bananas
  • Apples, Carrots
  • Bananas, Carrots


As with the UDF, the first few lines are a simple class extension with a decorator so that Hive can describe what the function does.

@Description(name = "pairwise", value = "_FUNC_(doc) - emits pairwise combinations of an input array")
public class PairwiseUDTF extends GenericUDTF {

private PrimitiveObjectInspector stringOI = null;

 We also declare a PrimitiveObjectInspector, which we'll use to ensure that the input is a string.  Once this is done, we need to override methods for initialization, row processing, and cleanup.


  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    if (args.length != 1) {
      throw new UDFArgumentException("pairwise() takes exactly one argument");
    }
    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
        || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() !=
           PrimitiveObjectInspector.PrimitiveCategory.STRING) {
      throw new UDFArgumentException("pairwise() takes a string as a parameter");
    }
    stringOI = (PrimitiveObjectInspector) args[0];

This UDTF is going to return an array of structs, so the initialize method needs to return a StructObjectInspector object.  Note that the arguments come into initialize as an array of ObjectInspector objects.  This allows us to handle arguments in a "normal" fashion but with the benefit of methods to broadly inspect type.  We only allow a single argument -- the string column to be processed -- so we check the length of the array and validate that the sole element is both a primitive and a string.

The second half of the initialize method is more interesting: 

    List<String> fieldNames = new ArrayList<String>(2);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
    fieldNames.add("memberA"); fieldNames.add("memberB");  // illustrative field names
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);


Here we set up information about what the UDTF returns.  We need this in place before we start processing rows; otherwise Hive can't correctly build execution plans before submitting jobs to MapReduce.  The structs we're returning will hold two strings each, which means we need ObjectInspector objects for both the values and the names of the fields.  We create two lists: one of strings for the names, the other of ObjectInspector objects for the values.  We pack them manually and then use a factory to get the StructObjectInspector which defines the actual return value.

Now we're ready to actually do some processing, so we override the process method.

  public void process(Object[] record) throws HiveException {
    final String document = (String) stringOI.getPrimitiveJavaObject(record[0]);
    if (document == null) {
      return;
    }
    String[] members = document.split(",");
    Arrays.sort(members);  // sort so pairs come out in a canonical order
    for (int i = 0; i < members.length - 1; i++) {
      for (int j = i + 1; j < members.length; j++) {
        if (!members[i].equals(members[j])) {
          forward(new Object[] {members[i], members[j]});
        }
      }
    }
  }


This is simple pairwise expansion, so the logic isn't anything more than a nested for-loop.  There are, though, some interesting things to note.  First, to actually get a string object to operate on, we have to use an ObjectInspector and some typecasting.  This allows us to bail out early if the column value is null.  Once we have the string, splitting, sorting, and looping is textbook stuff.  

The last notable piece is that the process method does not return anything.  Instead, we call forward to emit our newly created structs.  For those used to database internals, this follows the producer-consumer model of most RDBMSs.  For those used to MapReduce semantics, it's the equivalent of calling write on the Context object.

  public void close() throws HiveException {
    // do nothing
  }


If there were any cleanup to do, we'd take care of it here.  But this is simple emission, so our override doesn't need to do anything.

Using the UDTF

Once we've built our UDTF, we can access it via Hive by adding the jar and assigning it to a temporary function.  However, mixing the results of a UDTF with other columns from the base table requires that we use a LATERAL VIEW.

#Add the Jar
add jar /mnt/shared/market_basket_example/pairwise.jar;
#Create a function
CREATE temporary function pairwise AS 'com.oracle.hive.udtf.PairwiseUDTF';
# view the pairwise expansion output
SELECT m1, m2, COUNT(*) FROM market_basket
LATERAL VIEW pairwise(basket) pwise AS m1, m2
GROUP BY m1, m2;

[Read More]

Three Little Hive UDFs: Part 1


In our ongoing series of posts explaining the ins and outs of Hive User Defined Functions, we're starting with the simplest case.  Of the three little UDFs, today's entry built a straw house: simple, easy to put together, but limited in applicability.  We'll walk through the important parts of the code, but you can grab the whole source from github here.

Extending UDF

The first few lines of interest are very straightforward:

@Description(name = "moving_avg", value = "_FUNC_(x, n) - Returns the moving mean of a set of numbers over a window of n observations")
@UDFType(deterministic = false, stateful = true)

public class UDFSimpleMovingAverage extends UDF

We're extending the UDF class with some decoration.  The decoration is important for usability and functionality.  The Description decorator allows us to give Hive some information to show users about how to use our UDF and what its method signature will be.  The UDFType decoration tells Hive what sort of behavior to expect from our function.

 A deterministic UDF will always return the same output given a particular input.  A square-root computing UDF will always return the same square root for 4, so we can say it is deterministic; a call to get the system time would not be.  The stateful annotation of the UDFType decoration is relatively new to Hive (e.g., CDH4 and above).  The stateful directive allows Hive to keep some static variables available across rows.  The simplest example of this is a "row-sequence," which maintains a static counter that increments with each row processed, as sketched below.
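For illustration, a minimal sketch of such a row-sequence UDF might look like this (the class and field names here are ours, not from a shipped library):

@UDFType(deterministic = false, stateful = true)
public class UDFRowSequence extends UDF {
  private final LongWritable result = new LongWritable(0);

  public LongWritable evaluate() {
    // because the UDF is stateful, this counter survives from row to row
    result.set(result.get() + 1);
    return result;
  }
}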

  Since square-root and row-counting aren't terribly interesting, we'll use the stateful annotation to build a simple moving average function.  We'll return to the notion of a moving average later when we build a UDAF, so as to compare the two approaches.

  private DoubleWritable result = new DoubleWritable();
  private static ArrayDeque<Double> window;
  int windowSize;

  public UDFSimpleMovingAverage() {
  }


 The above code is basic initialization.  We make a double in which to hold the result, but it needs to be of class DoubleWritable so that MapReduce can properly serialize the data.  We use a deque to hold our sliding window, and we need to keep track of the window's size.  Finally, we implement a constructor for the UDF class.

 public DoubleWritable evaluate(DoubleWritable v, IntWritable n) {
    double sum = 0.0;
    double moving_average;
    double residual;
    if (window == null)
        window = new ArrayDeque<Double>();


Here's the meat of the class: the evaluate method.  This method will be called on each row by the map tasks.  For any given row, we can't say whether or not our sliding window exists, so we initialize it if it's null.

    // slide the window
    if (window.size() == n.get())
        window.removeFirst();
    window.addLast(new Double(v.get()));

    // compute the average
    for (Iterator<Double> i = window.iterator(); i.hasNext();)
        sum += i.next().doubleValue();

Here we deal with the deque and compute the sum of the window's elements.  Deques are essentially double-ended queues, so they make excellent sliding windows.  If the window is full, we pop the oldest element and add the current value.

    moving_average = sum / window.size();
    result.set(moving_average);
    return result;

Computing the moving average without weighting is simply dividing the sum of our window by its size.  We then set that value in our Writable variable and return it.  The value is emitted as part of the map task executing the UDF.

Going Further

The stateful annotation made it simple for us to compute a moving average, since we could keep the deque static.  But how would we compute a moving average if there were no notion of state between Hadoop tasks?  At the end of the series we'll examine a UDAF that does this, but the algorithm ends up being much different.  In the meantime, I challenge the reader to think about what sort of approach is needed to compute the window.

[Read More]

The Data Warehouse Insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.

