Tuesday Dec 03, 2013

Big Data - Real and Practical Use Cases: Expand the Data Warehouse

As a follow-on to the previous post (here) on use cases, follow the link below to a recording that explains how to go about expanding the data warehouse into a big data platform.

The idea behind it all is to cover a best practice on adding to the existing data warehouse and expanding the system (shown in the figure below) to deal with:

  • Real-time ingest of, and reporting on, large volumes of data
  • A cost-effective strategy to analyze all data across types and at large volumes
  • SQL access and analytics for all data
  • The best query performance to match the business requirements

Roles of the Components

To access the webcast:

  • Register on this page
  • Scroll down and find the session labeled "Best practices for expanding the data warehouse with new big data streams"
  • Have fun
If you want the slides, go to SlideShare.

Wednesday Nov 27, 2013

Big Data - Real and Practical Use Cases

The goal of this post is to explain in a few succinct patterns how organizations can start to work with big data and identify credible and doable big data projects. This goal is achieved by describing a set of general patterns that can be seen in the market today.
Big Data Usage Patterns
The following usage patterns are derived from actual customer projects across a large number of industries and cross boundaries between commercial enterprises and public sector. These patterns are also geographically applicable and technically feasible with today’s technologies.
This paper will address the following four usage patterns:
  • Data Factory – a pattern that enables an organization to integrate and transform – in a batch method – large diverse data sets before moving this data into an upstream system like an RDBMS or a NoSQL system. Data in the data factory is possibly transient and the focus is on data processing.
  • Data Warehouse Expansion with a Data Reservoir – a pattern that expands the data warehouse with a large scale Hadoop system to capture data at lower grain and higher diversity, which is then fed into upstream systems. Data in the data reservoir is persistent and the focus is on data processing, data storage and the reuse of data.
  • Information Discovery with a Data Reservoir – a pattern that creates a data reservoir for discovery data marts or discovery systems like Oracle Endeca to tap into a wide range of data elements. The goal is to simplify data acquisition into discovery tools and to initiate discovery on raw data.
  • Closed Loop Recommendation and Analytics system – a pattern that is often considered the holy grail of data systems. This pattern combines analytics on historical data with event processing or real time actions on current events, and closes the loop between the two to continuously improve real time actions based on current and historical event correlation.

Pattern 1: Data Factory

The core business reason to build a Data Factory as it is presented here is to implement a cost savings strategy by placing long-running batch jobs on a cheaper system. The project is often funded by not spending money on the more expensive system – for example by switching Mainframe MIPS off – and instead leveraging those cost savings to fund the Data Factory. The first figure shows a simplified implementation of the Data Factory.
As the image below shows, the data factory must be scalable, flexible and (more) cost effective for processing the data. The typical system used to build a data factory is Apache Hadoop or in the case of Oracle’s Big Data Appliance – Cloudera’s Distribution including Apache Hadoop (CDH).

data factory

Hadoop (and therefore Big Data Appliance and CDH) offers an extremely scalable environment to process large data volumes (or a large number of small data sets) and jobs. Most typical is the offload of large batch updates, matching and de-duplication jobs etc. Hadoop also offers a very flexible model, where data is interpreted on read, rather than on write. This idea enables a data factory to quickly accommodate all types of data, which can then be processed in programs written in Hive, Pig or MapReduce.
As shown above, the data factory is an integration platform, much like an ETL tool. Data sets land in the data factory, batch jobs process the data and the processed data moves into the upstream systems. These upstream systems include RDBMSs, which are then used for various information needs. In the case of a Data Warehouse, this is very close to pattern 2 described below, with the difference that in the data factory data is often transient and removed after the processing is done.
This transient nature of data is not a required feature, but it is often implemented to keep the Hadoop cluster relatively small. The aim is generally to just transform data in a more cost effective manner.
When the upstream system is a NoSQL system, data is often prepared in a specific key-value format to be served up to end applications like a website. NoSQL databases work really well for that purpose, but the batch processing is better left to the Hadoop cluster.
It is very common for data to flow in the reverse order or for data from RDBMS or NoSQL databases to flow into the data factory. In most cases this is reference data, like customer master data. In order to process new customer data, this master data is required in the Data Factory.
Because of its low risk profile – the logic of these batch processes is well known and understood – and funding from savings in other systems, the Data Factory is typically an IT department’s first attempt at a big data project. The downside of a Data Factory project is that business users see very little benefit, in that they do not get new insights out of big data.
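
To make the "interpreted on read, rather than on write" idea from this pattern a bit more concrete, here is a minimal sketch in plain Python. It is not Hive, Pig or MapReduce code; the sample records, the column names and the read_with_schema helper are all made up for illustration. The point is only that the raw lines are stored untouched and each job brings its own schema when it reads them.

```python
import csv
import io

# Raw records land untouched; no schema is enforced when they are written.
# The values below are invented sample data.
RAW_LINES = [
    "1001,2013-11-27,49.90",
    "1002,2013-11-27,15.00",
    "1001,2013-11-28,5.10",
]

def read_with_schema(lines, schema):
    """Apply a schema (column name -> converter) only when the data is read."""
    reader = csv.reader(io.StringIO("\n".join(lines)))
    for row in reader:
        yield {name: convert(value)
               for (name, convert), value in zip(schema.items(), row)}

# Two hypothetical jobs interpret the same raw lines differently.
sales_schema = {"customer_id": int, "day": str, "amount": float}
audit_schema = {"customer_id": str, "day": str, "amount": str}

# A small batch job: total amount per customer, using the sales schema.
total_by_customer = {}
for rec in read_with_schema(RAW_LINES, sales_schema):
    key = rec["customer_id"]
    total_by_customer[key] = total_by_customer.get(key, 0.0) + rec["amount"]
```

Adding a new interpretation of the same data is a matter of writing another schema, which is why a data factory can accept new data types quickly.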

Pattern 2: Data Warehouse Expansion

The common way to drive new insights out of big data is pattern two. Expanding the data warehouse with a data reservoir enables an organization to expand the raw data captured in a system that is able to add agility to the organization. The pattern is shown in the figure below.

DW Expansion

A Data Reservoir – like the Data Factory from Pattern 1 – is based on Hadoop and Oracle Big Data Appliance, but rather than holding transient data, processing it and handing it off, a Data Reservoir aims to store data at a lower grain than before and for a much longer period than before.
The Data Reservoir is initially used to capture data, aggregate new metrics and augment (not replace) the data warehouse with new and expansive KPIs or context information. A very typical addition is the sentiment of a customer towards a product or brand which is added to a customer table in the data warehouse.
The addition of new KPIs or new context information is a continuous process. That is, new analytics on raw and correlated data should find their way into the upstream Data Warehouse on a very, very regular basis.
As the Data Reservoir grows and becomes known because of the new KPIs or context, users should start to look at the Data Reservoir as an environment to “experiment” and “play” with data. With some rudimentary programming skills power users can start to combine various data elements in the Data Reservoir, using for example Hive. This enables the users to verify a hypothesis without the need to build a new data mart. Hadoop and the Data Reservoir now become an economically viable sandbox for power users driving innovation, agility and possibly revenue from hitherto unused data.
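
The sentiment example from this pattern can be sketched in a few lines of Python. This is only an illustration under assumed data: the raw mentions, the scoring of +1 and -1, the customer rows and both helper functions are hypothetical. In practice this kind of aggregation would run in the reservoir (for example in Hive) and the result would be loaded into the warehouse.

```python
from collections import defaultdict

# Hypothetical raw, low-grain events kept in the data reservoir.
raw_mentions = [
    {"customer_id": 1, "score": 1},    # positive mention
    {"customer_id": 1, "score": -1},   # negative mention
    {"customer_id": 1, "score": 1},
    {"customer_id": 2, "score": -1},
]

# Hypothetical customer table as it exists in the data warehouse.
customers = [
    {"customer_id": 1, "name": "Alice"},
    {"customer_id": 2, "name": "Bob"},
    {"customer_id": 3, "name": "Carol"},
]

def average_sentiment(mentions):
    """Aggregate raw mentions into one new metric per customer."""
    totals = defaultdict(lambda: [0, 0])   # customer_id -> [sum, count]
    for m in mentions:
        totals[m["customer_id"]][0] += m["score"]
        totals[m["customer_id"]][1] += 1
    return {cid: s / n for cid, (s, n) in totals.items()}

def augment(rows, sentiment):
    """Augment, not replace: keep every existing column, add one new one."""
    return [dict(row, sentiment=sentiment.get(row["customer_id"]))
            for row in rows]

augmented = augment(customers, average_sentiment(raw_mentions))
```

Customers without any mentions keep their row and simply get an empty sentiment value, which matches the idea that the reservoir adds context to the warehouse rather than replacing it.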

Pattern 3: Information Discovery

Agility for power users and expert programmers is one thing, but eventually the goal is to enable business users to discover new and exciting things in the data. Pattern 3 combines the data reservoir with a special information discovery system to provide a Graphical User Interface specifically for data discovery. This GUI emulates in many ways how an end user today searches for information on the internet.
To empower a set of business users to truly discover information, they first and foremost require a Discovery tool. A project should therefore always start with that asset.

Once the Discovery tool (like Oracle Endeca) is in place, it pays to start to leverage the Data Reservoir to feed the Discovery tool. As is shown above, the Data Reservoir is continuously fed with new data. The Discovery tool is a business user’s tool to create ad-hoc data marts. Having the Data Reservoir simplifies data acquisition for end users because they only need to look in one place for data.
In essence, the Data Reservoir now is used to drive two different systems; the Data Warehouse and the Information Discovery environment and in practice users will very quickly gravitate to the appropriate system. But no matter which system they use, they now have the ability to drive value from data into the organization.

Pattern 4: Closed Loop Recommendation and Analytics System

So far, most of what was discussed was analytics and batch based. But a lot of organizations want to come to some real time interaction model with their end customers (or in the world of the Internet of Things – with other machines and sensors).

Closed Loop System

Hadoop is very good at providing the Data Factory and the Data Reservoir, at providing a sandbox, at providing massive storage and processing capabilities, but it is less good at doing things in real time. Therefore, to build a closed loop recommendation system – which should react in real time – Hadoop is only one of the components.
Typically the bottom half of the last figure is akin to pattern 2 and is used to catch all data, analyze the correlations between recorded events (detected fraud for example) and generate a set of predictive models describing something like “if a, b and c during a transaction – mark as suspect and hand off to an agent”. This model would for example block a credit card transaction.
To make such a system work it is important to use the right technology at both levels. Real time technologies like Oracle NoSQL Database, Oracle Real Time Decisions and Oracle Event Processing work on the data stream in flight. Oracle Big Data Appliance, Oracle Exadata/Database and Oracle Advanced Analytics provide the infrastructure to create, refine and expose the models.
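
As a rough illustration of the closed loop described above, the sketch below separates a batch step that derives a model from recorded events from a real time step that only does a cheap lookup on the event in flight. Everything in it is an assumption made for the example: the three boolean features, the 50% threshold and the train and decide functions are invented, and none of this is the API of any of the products named above.

```python
from collections import defaultdict

def train(history, min_rate=0.5):
    """Batch step: find feature combinations with a high historical fraud rate."""
    stats = defaultdict(lambda: [0, 0])   # features -> [fraud count, total]
    for features, was_fraud in history:
        stats[features][0] += int(was_fraud)
        stats[features][1] += 1
    return {f for f, (fraud, total) in stats.items() if fraud / total >= min_rate}

def decide(model, features):
    """Real time step: a simple lookup against the current model."""
    return "suspect" if features in model else "approve"

# Hypothetical features: (foreign_country, high_amount, new_device).
history = [
    ((True, True, True), True),
    ((True, True, True), True),
    ((False, False, False), False),
    ((True, False, False), False),
]

model = train(history)
first = decide(model, (True, False, False))

# Close the loop: confirmed outcomes flow back and the model is rebuilt.
history.append(((True, False, False), True))
history.append(((True, False, False), True))
model = train(history)
second = decide(model, (True, False, False))
```

The same transaction pattern is approved before the new fraud cases are recorded and marked as suspect afterwards, which is the "continuously improve real time actions" part of the pattern.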


Today’s big data technologies offer a wide variety of capabilities. Leveraging these capabilities with the existing environment and skills already in place according to the four patterns described does enable an organization to benefit from big data today. It is a matter of identifying the applicable pattern for your organization and then to start on the implementation.

The technology is ready. Are you?

Thursday Nov 14, 2013

Data Scientist Boot camp (Skills and Training)

As almost everyone is interested in data science, take the boot camp to get ahead of the curve. Leverage this free Data Science Boot camp from Oracle Academy to learn some of the following things:

  • Introduction: Providing Data-Driven Answers to Business Questions
  • Lesson 1: Acquiring and Transforming Big Data
  • Lesson 2: Finding Value in Shopping Baskets
  • Lesson 3: Unsupervised Learning for Clustering
  • Lesson 4: Supervised Learning for Classification and Prediction
  • Lesson 5: Classical Statistics in a Big Data World
  • Lesson 6: Building and Exploring Graphs

You will also find the code samples that go with the training, so you can get off to a running start.

Tuesday Nov 12, 2013

Big Data Appliance X4-2 Release Announcement

Today we are announcing the release of the 3rd generation Big Data Appliance. Read the Press Release here.

Software Focus

The focus for this 3rd generation of Big Data Appliance is:

  • Comprehensive and Open - Big Data Appliance now includes all Cloudera Software, including Back-up and Disaster Recovery (BDR), Search, Impala, Navigator as well as the previously included components (like CDH, HBase and Cloudera Manager) and Oracle NoSQL Database (CE or EE).
  • Lower TCO than DIY Hadoop Systems
  • Simplified Operations while providing an open platform for the organization
  • Comprehensive security including the new Audit Vault and Database Firewall software, Apache Sentry and Kerberos configured out-of-the-box

Hardware Update

A good place to start is to quickly review the hardware differences (no price changes!). On a per node basis the following is a comparison between the old (X3-2) and new (X4-2) hardware:

Big Data Appliance X3-2                      Big Data Appliance X4-2
2 x 8-Core Intel® Xeon® E5-2660 (2.2 GHz)    2 x 8-Core Intel® Xeon® E5-2650 V2 (2.6 GHz)
12 x 3TB High Capacity SAS                   12 x 4TB High Capacity SAS

For all the details on the environmentals and other useful information, review the data sheet for Big Data Appliance X4-2. The larger disks give BDA X4-2 33% more capacity over the previous generation while adding faster CPUs. Memory for BDA is expandable to 512 GB per node and can be done on a per-node basis, for example for NameNodes or for HBase region servers, or for NoSQL Database nodes.
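
For those who want to check the 33% figure, the arithmetic is simple enough to write down. The sketch below only uses the disk counts and sizes quoted above and looks at raw disk capacity per node; it says nothing about usable HDFS capacity, which depends on replication and other settings.

```python
DISKS_PER_NODE = 12

def raw_tb_per_node(disk_tb):
    """Raw disk capacity of one node in TB."""
    return DISKS_PER_NODE * disk_tb

x3 = raw_tb_per_node(3)   # X3-2: 12 x 3TB
x4 = raw_tb_per_node(4)   # X4-2: 12 x 4TB
increase_pct = round((x4 - x3) / x3 * 100)
```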

Software Details

More details in terms of software and the current versions (note BDA follows a three monthly update cycle for Cloudera and other software):

Component           Big Data Appliance 2.2 Software Stack    Big Data Appliance 2.3 Software Stack
Oracle Linux        Oracle Linux 5.8 with UEK 1              Oracle Linux 6.4 with UEK 2
Cloudera CDH        CDH 4.3                                  CDH 4.4
Cloudera Manager    CM 4.6                                   CM 4.7

And like we said at the beginning it is important to understand that all other Cloudera components are now included in the price of Oracle Big Data Appliance. They are fully supported by Oracle and available for all BDA customers.

For more information:

Monday Nov 04, 2013

New Big Data Appliance Security Features

The Oracle Big Data Appliance (BDA) is an engineered system for big data processing.  It greatly simplifies the deployment of an optimized Hadoop Cluster – whether that cluster is used for batch or real-time processing.  The vast majority of BDA customers are integrating the appliance with their Oracle Databases and they have certain expectations – especially around security.  Oracle Database customers have benefited from a rich set of security features:  encryption, redaction, data masking, database firewall, label based access control – and much, much more.  They want similar capabilities with their Hadoop cluster.   

Unfortunately, Hadoop wasn’t developed with security in mind.  By default, a Hadoop cluster is insecure – the antithesis of an Oracle Database.  Some critical security features have been implemented – but even those capabilities are arduous to set up and configure.  Oracle believes that a key element of an optimized appliance is that its data should be secure.  Therefore, by default the BDA delivers the “AAA of security”: authentication, authorization and auditing.

Security Starts at Authentication

A successful security strategy is predicated on strong authentication – for both users and software services.  Consider the default configuration for a newly installed Oracle Database; it’s been a long time since you had a legitimate chance at accessing the database using the credentials “system/manager” or “scott/tiger”.  The default Oracle Database policy is to lock accounts thereby restricting access; administrators must consciously grant access to users.

Default Authentication in Hadoop

By default, a Hadoop cluster fails the authentication test. For example, it is easy for a malicious user to masquerade as any other user on the system.  Consider the following scenario that illustrates how a user can access any data on a Hadoop cluster by masquerading as a more privileged user.  In our scenario, the Hadoop cluster contains sensitive salary information in the file /user/hrdata/salaries.txt.  When logged in as the hr user, you can see the following files.  Notice, we’re using the Hadoop command line utilities for accessing the data:

$ hadoop fs -ls /user/hrdata

Found 1 items
-rw-r--r--   1 oracle supergroup         70 2013-10-31 10:38 /user/hrdata/salaries.txt

$ hadoop fs -cat /user/hrdata/salaries.txt
Tom Brady,11000000
Tom Hanks,5000000
Bob Smith,250000

User DrEvil has access to the cluster – and can see that there is an interesting folder called “hrdata”. 

$ hadoop fs -ls /user
Found 1 items
drwx------   - hr supergroup          0 2013-10-31 10:38 /user/hrdata

However, DrEvil cannot view the contents of the folder due to lack of access privileges:

$ hadoop fs -ls /user/hrdata
ls: Permission denied: user=drevil, access=READ_EXECUTE, inode="/user/hrdata":oracle:supergroup:drwx------

Accessing this data will not be a problem for DrEvil. He knows that the hr user owns the data by looking at the folder’s ACLs. To overcome this challenge, he will simply masquerade as the hr user. On his local machine, he adds the hr user, assigns that user a password, and then accesses the data on the Hadoop cluster:

$ sudo useradd hr
$ sudo passwd hr
$ su hr
$ hadoop fs -cat /user/hrdata/salaries.txt
Tom Brady,11000000
Tom Hanks,5000000
Bob Smith,250000

Hadoop has not authenticated the user; it trusts that the identity that has been presented is indeed the hr user. Therefore, sensitive data has been easily compromised. Clearly, the default security policy is inappropriate and dangerous to many organizations storing critical data in HDFS.
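
The weakness in the scenario above is that the identity is asserted by the client and never verified. A toy model in Python makes that explicit. This is not Hadoop code: the TrustingFileService class and the try_cat helper are invented for the illustration, and the only thing they share with the real scenario is the file path and the user names.

```python
class TrustingFileService:
    """Toy stand-in for a service that believes whatever user name it is told."""

    def __init__(self):
        # path -> (owner, contents); a single sample file.
        self.files = {"/user/hrdata/salaries.txt": ("hr", "Bob Smith,250000")}

    def cat(self, claimed_user, path):
        owner, data = self.files[path]
        # Authorization is checked, but against an unverified identity.
        if claimed_user != owner:
            raise PermissionError(f"Permission denied: user={claimed_user}")
        return data

service = TrustingFileService()

def try_cat(user):
    try:
        return service.cat(user, "/user/hrdata/salaries.txt")
    except PermissionError as err:
        return str(err)

honest = try_cat("drevil")     # DrEvil presenting his own name
masquerade = try_cat("hr")     # DrEvil simply claiming to be hr
```

The permission check itself works fine. It is the missing proof of identity that lets the second call through, and that is the gap strong authentication is meant to close.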

Big Data Appliance Provides Secure Authentication

The BDA provides secure authentication to the Hadoop cluster by default – preventing the type of masquerading described above. It accomplishes this through Kerberos integration.

Figure 1: Kerberos Integration

The Key Distribution Center (KDC) is a server that has two components: an authentication server and a ticket granting service. The authentication server validates the identity of the user and service. Once authenticated, a client must request a ticket from the ticket granting service – allowing it to access the BDA’s NameNode, JobTracker, etc.

At installation, you simply point the BDA to an external KDC or automatically install a highly available KDC on the BDA itself. Kerberos will then provide strong authentication for not just the end user – but also for important Hadoop services running on the appliance. You can now guarantee that users are who they claim to be – and rogue services (like fake data nodes) are not added to the system.

It is common for organizations to want to leverage existing LDAP servers for common user and group management. Kerberos integrates with LDAP servers – allowing the principals and encryption keys to be stored in the common repository. This simplifies the deployment and administration of the secure environment.

Authorize Access to Sensitive Data

Kerberos-based authentication ensures secure access to the system and the establishment of a trusted identity – a prerequisite for any authorization scheme. Once this identity is established, you need to authorize access to the data. HDFS will authorize access to files using ACLs with the authorization specification applied using classic Linux-style commands like chmod and chown (e.g. hadoop fs -chown oracle:oracle /user/hrdata changes the ownership of the /user/hrdata folder to oracle). Authorization is applied at the user or group level – utilizing group membership found in the Linux environment (i.e. /etc/group) or in the LDAP server.

For SQL-based data stores – like Hive and Impala – finer grained access control is required. Access to databases, tables, columns, etc. must be controlled. And, you want to leverage roles to facilitate administration.

Apache Sentry is a new project that delivers fine grained access control; both Cloudera and Oracle are the project’s founding members. Sentry satisfies the following three authorization requirements:

  • Secure Authorization:  the ability to control access to data and/or privileges on data for authenticated users.
  • Fine-Grained Authorization:  the ability to give users access to a subset of the data (e.g. column) in a database.
  • Role-Based Authorization:  the ability to create/apply template-based privileges based on functional roles.
With Sentry, “all”, “select” or “insert” privileges are granted to an object. The descendants of that object automatically inherit that privilege. A collection of privileges across many objects may be aggregated into a role – and users/groups are then assigned that role. This leads to simplified administration of security across the system.

Sentry Object Hierarchy

Figure 2: Object Hierarchy – granting a privilege on the database object will be inherited by its tables and views.
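
The inheritance and role ideas above can be sketched as a small toy model in Python. To be clear, this is not Sentry's API or policy file format. The role names, group names, object names and the is_allowed function are all hypothetical, and treating "all" as covering both "select" and "insert" is an assumption of the sketch.

```python
# Objects are paths in a hierarchy: ("sales",) is a database and
# ("sales", "orders") is a table inside it. All names are hypothetical.
ROLES = {
    "analyst": [(("sales",), "select")],           # select on the whole database
    "loader": [(("sales", "orders"), "insert")],   # insert on one table only
}
GROUP_ROLES = {"bi_team": ["analyst"], "etl": ["loader"]}

def is_allowed(group, obj, action):
    """True if any role of the group grants the action on obj or an ancestor."""
    for role in GROUP_ROLES.get(group, []):
        for granted_obj, privilege in ROLES[role]:
            # A grant on an object covers the object and its descendants.
            covers_object = obj[:len(granted_obj)] == granted_obj
            # Assumption of this sketch: "all" covers every action.
            covers_action = privilege in ("all", action)
            if covers_object and covers_action:
                return True
    return False
```

With these definitions, is_allowed("bi_team", ("sales", "orders"), "select") is true because the table inherits the grant made on its database, while the same group cannot insert into that table.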

Sentry is currently used by both Hive and Impala – but it is a framework that other data sources can leverage when offering fine-grained authorization. For example, one can expect Sentry to deliver authorization capabilities to Cloudera Search in the near future.

Audit Hadoop Cluster Activity

Auditing is a critical component to a secure system and is oftentimes required for SOX, PCI and other regulations. The BDA integrates with Oracle Audit Vault and Database Firewall – tracking different types of activity taking place on the cluster:

Figure 3: Monitored Hadoop services.

At the lowest level, every operation that accesses data in HDFS is captured. The HDFS audit log identifies the user who accessed the file, the time that file was accessed, the type of access (read, write, delete, list, etc.) and whether or not that file access was successful. The other auditing features include:

  • MapReduce:  correlates the MapReduce job that accessed the file
  • Oozie:  describes who ran what as part of a workflow
  • Hive:  captures changes that were made to the Hive metadata
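
To show what can be done with the audit fields listed above (user, time, type of access, file and success), here is a small Python sketch. The AuditRecord class is a made-up structure for this example and does not reflect the actual HDFS audit log format or the Audit Vault schema; the sample records and the threshold of two denied attempts are invented as well.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime

@dataclass(frozen=True)
class AuditRecord:
    user: str
    when: datetime
    operation: str   # read, write, delete, list, ...
    path: str
    allowed: bool

# Invented sample records.
records = [
    AuditRecord("hr", datetime(2013, 10, 31, 10, 38), "read",
                "/user/hrdata/salaries.txt", True),
    AuditRecord("drevil", datetime(2013, 10, 31, 10, 40), "list",
                "/user/hrdata", False),
    AuditRecord("drevil", datetime(2013, 10, 31, 10, 41), "read",
                "/user/hrdata/salaries.txt", False),
]

def denied_attempts(recs):
    """Count failed accesses per user."""
    return Counter(r.user for r in recs if not r.allowed)

def users_needing_alert(recs, threshold=2):
    """Users whose number of denied attempts reaches the threshold."""
    return sorted(u for u, n in denied_attempts(recs).items() if n >= threshold)
```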

The audit data is captured in the Audit Vault Server – which integrates audit activity from a variety of sources, adding databases (Oracle, DB2, SQL Server) and operating systems to activity from the BDA.

Audit Vault Server

Figure 4: Consolidated audit data across the enterprise. 

Once the data is in the Audit Vault server, you can leverage a rich set of prebuilt and custom reports to monitor all the activity in the enterprise. In addition, alerts may be defined that trigger on violations of audit policies.


Security cannot be considered an afterthought in big data deployments. Across most organizations, Hadoop is managing sensitive data that must be protected; it is not simply crunching publicly available information used for search applications. The BDA provides a strong security foundation – ensuring users are only allowed to view authorized data and that data access is audited in a consolidated framework.

Tuesday Oct 29, 2013

Start your journey into Big Data with the Oracle Academy today!

 Big Data has the power to change the way we work, live, and think. The datafication of everything will create unprecedented demand for data scientists, software developers and engineers who can derive value from unstructured data to transform the world.

The Oracle Academy Big Data Resource Guide is a collection of articles, videos, and other resources organized to help you gain a deeper understanding of the exciting field of Big Data. To start your journey visit the Oracle Academy website here: https://academy.oracle.com/oa-web-big-data.html. This landing pad will guide you through the whole area of big data using the following structure:

  1. What is “Big Data”
  2. Engineered Systems
  3. Integration
  4. Database and Data Analytics
  5. Advanced Information
  6. Supplemental Information

This is a great resource packed with must-see videos and must-read whitepapers and blog posts by industry leaders.



Tuesday Oct 15, 2013

Are you ready for Hadoop?

To find out, take the assessment here. Have fun!

Wednesday Oct 09, 2013

Big Data Openworld Sessions now available for Download/Viewing

For those who did go to Openworld, the session catalog now has the download links to the session materials online. You can now refresh your memory and share your experience with the rest of your organization. For those who did not go, here is your chance to look over some of the materials.

On the big data side, here are some of the highlights:

There are a great number of other sessions. Simply look for Solutions => Big Data and Business Analytics and you will find a wealth of interesting content around big data, Hadoop and analytics.

Sentry Meetup at Strata + Hadoop World 2013

Meetup Details and Exact Location Here

Join us for the inaugural Apache Sentry meetup at Oracle's offices in NYC, on the evening of the last day of Strata + Hadoop World 2013 in New York. 

(@ Oracle Offices, 120 Park Ave, 26th Floor -- Note: Bring your ID and check in with security in the lobby!)

We'll kick-off the meetup with the following presentation:

Getting Serious about Security with Sentry

Shreepadma Venugopalan - Lead Engineer for Sentry
Arvind Prabhakar - Engineering Manager for Sentry 
Jacco Draaijer - Development Manager for Oracle Big Data

Apache Hadoop offers strong support for authentication and coarse grained authorization - but this is not necessarily enough to meet the demands of enterprise applications and compliance requirements. Providing fine-grained access to data will enable organizations to store more sensitive information in Hadoop; only those users with the appropriate privileges will ever see that sensitive data.

Cloudera and Oracle are taking the lead on Sentry - a new open source authorization module that integrates with Hadoop-based SQL query engines. Key developers for the project will provide details on its implementation, including:

  • Motivations for the project
  • Key requirements that Sentry satisfies
  • Utilizing Sentry in your applications
  • Future plans

Thursday Jul 18, 2013

Practical HDFS Permissions


Documentation and most discussions are quick to point out that HDFS provides OS-level permissions on files and directories.  However, there is less readily-available information about what the effects of OS-level permissions are on accessing data in HDFS via higher-level abstractions such as Hive or Pig.  To provide a bit of clarity, I decided to run through the effects of permissions on different interactions with HDFS.

The Setup

In this scenario, we have three users: oracle, dan, and not_dan.  The oracle user has captured some data in an HDFS directory.  The directory has 750 permissions: read/write/execute for oracle, read/execute for dan (through group membership), and no access for not_dan.  One of the files in the directory has 700 permissions, meaning that only the oracle user can read it.  Each user will try to do the following tasks:

  • List the contents of the directory
  • Count the lines in a subset of files including the file with 700 permissions
  • Run a simple Hive query over the directory
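
Before looking at the results, it helps to spell out how owner, group and other permission bits are evaluated. The Python sketch below is a simplified model written for this post, not HDFS code: the can function is hypothetical, it ignores the HDFS superuser, and the group name shared_hdfs is taken from the error messages shown further down.

```python
def can(user, user_groups, owner, group, mode, want):
    """Check POSIX-style permission bits.

    mode is an octal int such as 0o750; want is a subset of "rwx".
    Exactly one of the owner, group or other triplets applies.
    """
    if user == owner:
        bits = (mode >> 6) & 7
    elif group in user_groups:
        bits = (mode >> 3) & 7
    else:
        bits = mode & 7
    needed = sum({"r": 4, "w": 2, "x": 1}[c] for c in want)
    return bits & needed == needed

# Group membership assumed for this sketch.
GROUPS = {"oracle": {"shared_hdfs"}, "dan": {"shared_hdfs"}, "not_dan": set()}

def can_list_directory(user):
    # Listing needs read and execute on the 750 directory.
    return can(user, GROUPS[user], "oracle", "shared_hdfs", 0o750, "rx")

def can_read_restricted_file(user):
    # Reading the 700 file needs access to the directory and read on the file.
    return (can(user, GROUPS[user], "oracle", "shared_hdfs", 0o750, "x")
            and can(user, GROUPS[user], "oracle", "shared_hdfs", 0o700, "r"))
```

Under this model oracle and dan can list the directory and not_dan cannot, while only oracle can read the restricted file.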

Listing Files

Each user issues the command

hadoop fs -ls /user/shared/moving_average|more

And what do they see:

[oracle@localhost ~]$ hadoop fs -ls /user/shared/moving_average|more

Found 564 items

Obviously, the oracle user can see all the files in its own directory.

[dan@localhost oracle]$ hadoop fs -ls /user/shared/moving_average|more
Found 564 items

Similarly, since dan has group read access, that user can also list all the files. The user without group read permissions, however, receives an error.

[not_dan@localhost oracle]$ hadoop fs -ls /user/shared/moving_average|more

ls: Permission denied: user=not_dan, access=READ_EXECUTE,


Counting Rows in the Shell

In this test, each user pipes a set of HDFS files into a unix command and counts rows.  Recall, one of the files has 700 permissions.

The oracle user, again, can see all the available data:

[oracle@localhost ~]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l

The user with partial permissions receives an error on the console, but can access the data they have permissions on.  Naturally, the user without permissions only receives the error.

[dan@localhost oracle]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l
cat: Permission denied: user=dan, access=READ, inode="/user/shared/moving_average/FlumeData.1374082184056":oracle:shared_hdfs:-rw-------
[not_dan@localhost oracle]$ hadoop fs -cat /user/shared/moving_average/FlumeData.137408218405*|wc -l
cat: Permission denied: user=not_dan, access=READ_EXECUTE, inode="/user/shared/moving_average":oracle:shared_hdfs:drwxr-x---

Permissions on Hive

In this final test, the oracle user defines an external Hive table over the shared directory.  Each user issues a simple COUNT(*) query against the directory.  Interestingly, the results are not the same as piping the datastream to the shell.

The oracle user's query runs correctly, while both dan and not_dan's queries fail:

As dan

Job Submission failed with exception 'java.io.FileNotFoundException(File /user/shared/moving_average/FlumeData.1374082184056 does not exist)'

As not_dan

Job Submission failed with exception 'org.apache.hadoop.security.AccessControlException
(Permission denied: user=not_dan, access=READ_EXECUTE,

So, what's going on here? In each case, the query fails, but for different reasons. In the case of not_dan, the query fails because the user has no permissions on the directory. However, the query issued by dan fails because of a FileNotFound exception. Because dan does not have read permissions on the file, Hive cannot find all the files necessary to build the underlying MapReduce job. Thus, the query fails before being submitted to the JobTracker.  The rule then, becomes simple: to issue a Hive query, a user must have read permissions on all files read by the query. If a user has permissions on one set of partition directories,  but not another, they can issue queries against the readable partitions, but not against the entire table.
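
The rule for partitions can be written down as a tiny check. The sketch below is a plain Python model of the rule as stated above, not a description of how Hive plans a query; the table layout, the file names and the set of readable files are hypothetical.

```python
# Hypothetical partitioned table: partition -> files backing it.
table = {
    "day=2013-07-16": ["/t/day=2013-07-16/part-0", "/t/day=2013-07-16/part-1"],
    "day=2013-07-17": ["/t/day=2013-07-17/part-0", "/t/day=2013-07-17/part-1"],
}

# Files the (hypothetical) user has read permission on.
readable_files = {
    "/t/day=2013-07-16/part-0",
    "/t/day=2013-07-16/part-1",
    "/t/day=2013-07-17/part-0",
}

def queryable_partitions(tbl, readable):
    """A partition is queryable only if every file in it is readable."""
    return {p for p, files in tbl.items() if all(f in readable for f in files)}

def can_query_whole_table(tbl, readable):
    return queryable_partitions(tbl, readable) == set(tbl)

partitions_ok = queryable_partitions(table, readable_files)
whole_table_ok = can_query_whole_table(table, readable_files)
```

One unreadable file is enough to rule out a query over the whole table, while queries restricted to the fully readable partition still work.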


In a nutshell, the OS-level permissions of HDFS behave just as we would expect in the shell. However, problems can arise when tools like Hive or Pig try to construct MapReduce jobs. As a best practice, permissions structures should be tested against the tools which will access the data. This ensures that users can read what they are allowed to, in the manner that they need to.


Wednesday Jul 17, 2013

Oracle: Big Data at Work

There is a lot of hype around big data, but here at Oracle we try to help customers implement big data solutions to solve real business problems. For those of you interested in understanding more about how you can put big data to work at your organization, consider joining these events:

San Jose | August 5 - 6
Marriott San Jose
301 S Market St, San Jose, California 95113
Event Registration Page

Chicago | August 7 - 8
The Westin Michigan Avenue
909 N Michigan Ave, Chicago, IL 60611

New York | August 12 - 13
Marriott Marquis Times Square
1535 Broadway, New York, NY 10036
Event Registration Page


Friday May 10, 2013

Streaming data to and from Hadoop and NoSQL Database

A quick update on some of the integration components needed to build things like M2M (Machine 2 Machine communication) and on integrating fast moving data (events) with Hadoop and NoSQL Database. With the Oracle Event Processing product you now have:

OEP Data Cartridge for Hadoop (the real doc is here)

OEP Data Cartridge for NoSQL Database (the real doc is here)

The fun with these products is that you can now model (in a UI!!) how to interact with these products. For example you can sink data into Hadoop without impacting the stream logic and stream performance and you can do a quick CQL (the OEP language) lookup to our NoSQL DB to resolve for example a customer profile or status lookup.

More to come, but very interesting and really something cool on making products work together out of the box.

Friday May 03, 2013

Videos: How to build out an end-to-end Big Data System

For those interested in understanding how to actually build a big data solution including things like NoSQL Database, Hadoop, MapReduce, Hive, Pig and Analytics (data mining, R), have a look at the big data videos Marty did:

  • Video 1: Using Big Data to Improve the Customer Experience
  • Video 2: Using Big Data to Deliver a Personalized Service
  • Video 3: Using Big Data and NoSQL to Manage On-line Profiles
  • Video 4: Oracle Big Data and Hadoop to Process Log Files
  • Video 5: Integrate All your data with Big Data Connectors
  • Video 6: Maximizing business impact with Big Data

Happy watching and learning.

Thursday Apr 11, 2013

Big Data Appliance - more flexibility, same rapid time to value


This week Oracle announced the availability (yes, you can buy and use these systems right away) of Big Data Appliance X3-2 Starter Rack and Big Data Appliance X3-2 In-Rack Expansion. You can read the press release here. For those who are interested in the operating specs, it is best to look at the data sheet on OTN.

So what does this mean? In effect this means that you can now start any big data project with an appliance. Whether you are looking to try your hand at your first project with Hadoop, or whether you are building your enterprise Hadoop solution with a large number of nodes, you can now get the benefits of Oracle Big Data Appliance. By leveraging Big Data Appliance for all your big data needs (be it Hadoop or Oracle NoSQL Database) you always get:

  • Reduced risk by having the best of Oracle and Cloudera engineering available in an easy to consume appliance
  • Faster time to value by not spending weeks or months building and tuning your own Hadoop system
  • No cost creep for the cluster as your system is set up and configured for a known cost

Assume you want to start your first implementation on Hadoop. You can now start with the BDA Starter Rack: 6 servers which you can fully deploy for HDFS and MapReduce capabilities (of course we also support, for example, HBase). All the services are pre-configured, so you have Highly Available NameNodes, automatic failover and a balanced approach to leveraging the 6 servers as Hadoop nodes. As your project grows (and you need more compute power and space to store data) you simply add nodes in chunks of 6 using the In-Rack Expansion, filling up the rack.

Once full you can either add another Starter Rack or add Full Racks to the system. As you do that, Mammoth - the install, configure and patch utility for BDA - ensures that your service nodes are in the appropriate place. For example, once you have 2 cabinets assigned to a single cluster, Mammoth will move the second NameNode to the second Rack for higher fault tolerance.

This new release of Big Data Appliance (the software parts of it) now also includes Cloudera CDH 4.2 and Cloudera Manager 4.5. On top of that, you can now create multiple clusters on a single BDA Full Rack using just Mammoth, which means you can now patch and update individual clusters on that Full Rack. As you add nodes to a cluster, Mammoth will allow you to choose where to add nodes, how to grow a set of clusters, etc.

Last but not least, there is more flexibility in how to acquire Big Data Appliance Full Rack, as it is now a part of Oracle's Infrastructure as a Service offering, allowing for a smooth capital outflow for your Big Data Appliance.

Sunday Apr 07, 2013

Three Little Hive UDFs: Part 3


In the final installment in our series on Hive UDFs, we're going to tackle the least intuitive of the three types: the User Defined Aggregating Function.  While they're challenging to implement, UDAFs are necessary if we want functions for which the distinction between map-side and reduce-side operations is opaque to the user.  Most users writing a query would prefer to focus on the data they're trying to compute, not on which part of the plan is running a given function.

The UDAF also provides a valuable opportunity to consider some of the nuances of distributed programming and parallel database operations.  Since each task in a MapReduce job operates in a bit of a vacuum (e.g. Map task A does not know what data Map task B has), a UDAF has to explicitly account for more operational states than a simple UDF.  We'll return to the notion of a simple Moving Average function, but ask yourself: how do we compute a moving average if we don't have state or order around the data?  

As before, the code is available on github, but we'll excerpt the important parts here.

 Prefix Sum: Moving Average without State

In order to compute a moving average without state, we're going to need a specialized parallel algorithm.  For moving average, the "trick" is to use a prefix sum, effectively keeping a table of running totals for quick computation (and recomputation) of our moving average.  A full discussion of prefix sums for moving averages is beyond the length of a blog post, but John Jenq provides an excellent discussion of the technique as applied to CUDA implementations.
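As a minimal, Hadoop-free sketch of the prefix sum idea: the class and method names below (PrefixSumSketch, prefixSums, movingAverages) are made up for illustration and are not part of the code on github. The sketch assumes the same convention as the add method shown later in this post, where entries that do not yet have a full window behind them still divide by the window size.

```java
import java.util.Arrays;

final class PrefixSumSketch {
    // Running totals: prefix[i] = values[0] + ... + values[i].
    static double[] prefixSums(double[] values) {
        double[] prefix = new double[values.length];
        double running = 0;
        for (int i = 0; i < values.length; i++) {
            running += values[i];
            prefix[i] = running;
        }
        return prefix;
    }

    // The window total at i is prefix[i] - prefix[i - windowSize]. Before a
    // full window exists, the total is simply prefix[i] (an assumption that
    // mirrors the post's add method).
    static double[] movingAverages(double[] values, int windowSize) {
        double[] prefix = prefixSums(values);
        double[] averages = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            double total = (i >= windowSize)
                    ? prefix[i] - prefix[i - windowSize]
                    : prefix[i];
            averages[i] = total / windowSize;
        }
        return averages;
    }

    public static void main(String[] args) {
        double[] delays = {2, 4, 6, 8, 10};
        // prints [1.0, 3.0, 5.0, 7.0, 9.0]
        System.out.println(Arrays.toString(movingAverages(delays, 2)));
    }
}
```

Once the table of running totals exists, each window total costs one subtraction regardless of the window size, which is what makes recomputation after a merge cheap.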

What we'll cover here is the necessary implementation of a pair of classes to store and operate on our prefix sum entry within the UDAF.

class PrefixSumMovingAverage {
    static class PrefixSumEntry implements Comparable {
        int period;
        double value;
        double prefixSum;
        double subsequenceTotal;
        double movingAverage;
        // order entries by their period
        public int compareTo(Object other) {
            PrefixSumEntry o = (PrefixSumEntry)other;
            if (period < o.period)
                return -1;
            if (period > o.period)
                return 1;
            return 0;
        }
    }


Here we have the definition of our moving average class and the static inner class which serves as an entry in our table.  What's important here are some of the variables we define for each entry in the table: the time-index or period of the value (its order), the value itself, the prefix sum, the subsequence total, and the moving average itself.  Every entry in our table requires not just the current value to compute the moving average, but also the sum of the entries in our moving average window.  It's the pair of these two values which allows prefix sum methods to work their magic.

    // class variables
    private int windowSize;
    private ArrayList<PrefixSumEntry> entries;
    public PrefixSumMovingAverage() {
        windowSize = 0;
    }
    public void reset() {
        windowSize = 0;
        entries = null;
    }
    public boolean isReady() {
        return (windowSize > 0);
    }

The above are simple initialization routines: a constructor, a method to reset the table, and a boolean method on whether or not the object has a prefix sum table on which to operate.  From here, there are 3 important methods to examine: add, merge, and serialize.  The first is intuitive: as we scan rows in Hive we want to add them to our prefix sum table.  The second and third are important because of partial aggregation.

We cannot say ahead of time where this UDAF will run, and partial aggregation may be required.  That is, it's entirely possible that some values may run through the UDAF during a map task, but then be passed to a reduce task to be combined with other values.  The serialize method will allow Hive to pass the partial results from the map side to the reduce side.  The merge method allows reducers to combine the results of partial aggregations from the map tasks.

  public void add(int period, double v) {
    // Add a new entry to the list and update table
    PrefixSumEntry e = new PrefixSumEntry();
    e.period = period;
    e.value = v;
    entries.add(e);
    // do we need to ensure this is sorted?
    //if (needsSorting(entries))
    Collections.sort(entries);
    // update the table
    // prefixSums first
    double prefixSum = 0;
    for (int i = 0; i < entries.size(); i++) {
        PrefixSumEntry thisEntry = entries.get(i);
        prefixSum += thisEntry.value;
        thisEntry.prefixSum = prefixSum;
        entries.set(i, thisEntry);
    }

 The first part of the add task is simple: we add the element to the list and update our table's prefix sums.

    // now do the subsequence totals and moving averages
    for (int i = 0; i < entries.size(); i++) {
        double subsequenceTotal;
        double movingAverage;
        PrefixSumEntry thisEntry = entries.get(i);
        PrefixSumEntry backEntry = null;
        if (i >= windowSize)
            backEntry = entries.get(i-windowSize);
        if (backEntry != null)
            subsequenceTotal = thisEntry.prefixSum - backEntry.prefixSum;
        else
            subsequenceTotal = thisEntry.prefixSum;
        movingAverage = subsequenceTotal/(double)windowSize;
        thisEntry.subsequenceTotal = subsequenceTotal;
        thisEntry.movingAverage = movingAverage;
        entries.set(i, thisEntry);
    }
  }


In the second half of the add function, we compute our moving averages based on the prefix sums.  It's here you can see the hinge on which the algorithm swings: thisEntry.prefixSum - backEntry.prefixSum -- that offset between the current table entry and its nth predecessor makes the whole thing work.

public ArrayList<DoubleWritable> serialize() {
    ArrayList<DoubleWritable> result = new ArrayList<DoubleWritable>();
    // window size goes first, then period/value pairs
    result.add(new DoubleWritable(windowSize));
    if (entries != null) {
        for (PrefixSumEntry i : entries) {
            result.add(new DoubleWritable(i.period));
            result.add(new DoubleWritable(i.value));
        }
    }
    return result;
}


The serialize method needs to package the results of our algorithm to pass to another instance of the same algorithm, and it needs to do so in a type that Hadoop can serialize.  In the case of a method like sum, this would be relatively simple: we would only need to pass the sum up to this point.  However, because we cannot be certain whether this instance of our algorithm has seen all the values, or seen them in the correct order, we actually need to serialize the whole table.  To do this, we create a list of DoubleWritables, pack the window size at its head, and then each period and value.  This gives us a structure that's easy to unpack and merge with other lists with the same construction.
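To make the flat layout concrete, here is a small sketch that packs and unpacks the same structure using plain Double values as a stand-in for DoubleWritable, so it runs without Hadoop on the classpath. The class FlatTableSketch and its helper methods are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class FlatTableSketch {
    // Layout: [windowSize, period0, value0, period1, value1, ...]
    static List<Double> pack(int windowSize, int[] periods, double[] values) {
        List<Double> out = new ArrayList<>();
        out.add((double) windowSize);
        for (int i = 0; i < periods.length; i++) {
            out.add((double) periods[i]);
            out.add(values[i]);
        }
        return out;
    }

    // The window size always sits at index 0.
    static int windowSize(List<Double> packed) {
        return packed.get(0).intValue();
    }

    // Every entry occupies two slots after the head element.
    static int entryCount(List<Double> packed) {
        return (packed.size() - 1) / 2;
    }

    // Entry n starts at index 1 + 2n: period first, then value.
    static int period(List<Double> packed, int n) {
        return packed.get(1 + 2 * n).intValue();
    }

    static double value(List<Double> packed, int n) {
        return packed.get(2 + 2 * n);
    }

    public static void main(String[] args) {
        List<Double> packed = pack(4, new int[]{1, 2}, new double[]{10.5, 20.0});
        // prints [4.0, 1.0, 10.5, 2.0, 20.0]
        System.out.println(packed);
    }
}
```

Only the raw periods and values travel between tasks. The derived columns (prefix sum, subsequence total, moving average) are cheap to rebuild on the receiving side, so there is no need to ship them.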

  public void merge(List<DoubleWritable> other) {
    if (other == null)
        return;
    // if this is an empty buffer, just copy in other
    // but deserialize the list
    if (windowSize == 0) {
        windowSize = (int)other.get(0).get();
        entries = new ArrayList<PrefixSumEntry>();
        // we're serialized as period, value, period, value
        for (int i = 1; i < other.size(); i+=2) {
            PrefixSumEntry e = new PrefixSumEntry();
            e.period = (int)other.get(i).get();
            e.value = other.get(i+1).get();
            entries.add(e);
        }
    }


Merging results is perhaps the most complicated thing we need to handle.  First, we check the case in which there was no partial result passed -- just return and continue.  Second, we check to see if this instance of PrefixSumMovingAverage already has a table.  If it doesn't, we can simply unpack the serialized result and treat it as our window.

    // if we already have a buffer, we need to add these entries
    else {
        // we're serialized as period, value, period, value
        for (int i = 1; i < other.size(); i+=2) {
            PrefixSumEntry e = new PrefixSumEntry();
            e.period = (int)other.get(i).get();
            e.value = other.get(i+1).get();
            entries.add(e);
        }
    }


The third case is the non-trivial one: if this instance has a table and receives a serialized table, we must merge them together.  Consider a Reduce task: as it receives outputs from multiple Map tasks, it needs to merge all of them together to form a larger table.  Thus, merge will be called many times to add these results and reassemble a larger time series.

    // sort and recompute
    Collections.sort(entries);
    // update the table
    // prefixSums first
    double prefixSum = 0;
    for (int i = 0; i < entries.size(); i++) {
        PrefixSumEntry thisEntry = entries.get(i);
        prefixSum += thisEntry.value;
        thisEntry.prefixSum = prefixSum;
        entries.set(i, thisEntry);
    }


This part should look familiar; it's just like the add method.  Now that we have new entries in our table, we need to sort by period and recompute the moving averages.  In fact, the rest of the merge method is exactly like the add method, so we might consider putting sorting and recomputing in a separate method.
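One way that refactoring could look is sketched below. This is not the code from the github repository: MergeSketch, Entry, sortAndRecompute and merge are hypothetical names, and the sketch uses plain Java collections rather than Hadoop types so that it runs on its own. It merges two partial tables that arrive out of order and rebuilds the prefix sums and moving averages in one shared helper.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class MergeSketch {
    static final class Entry {
        final int period;
        final double value;
        double prefixSum;
        double movingAverage;

        Entry(int period, double value) {
            this.period = period;
            this.value = value;
        }
    }

    // Shared by add and merge: restore period order, then rebuild the table.
    static void sortAndRecompute(List<Entry> entries, int windowSize) {
        entries.sort(Comparator.comparingInt((Entry e) -> e.period));
        double running = 0;
        for (int i = 0; i < entries.size(); i++) {
            Entry e = entries.get(i);
            running += e.value;
            e.prefixSum = running;
            // earlier entries already hold their prefix sums at this point
            double total = (i >= windowSize)
                    ? e.prefixSum - entries.get(i - windowSize).prefixSum
                    : e.prefixSum;
            e.movingAverage = total / windowSize;
        }
    }

    // Combine two partial tables into one longer table and recompute it.
    static List<Entry> merge(List<Entry> a, List<Entry> b, int windowSize) {
        List<Entry> merged = new ArrayList<>(a);
        merged.addAll(b);
        sortAndRecompute(merged, windowSize);
        return merged;
    }

    public static void main(String[] args) {
        List<Entry> mapA = new ArrayList<>();
        mapA.add(new Entry(1, 2));
        mapA.add(new Entry(4, 8));
        List<Entry> mapB = new ArrayList<>();
        mapB.add(new Entry(3, 6));
        mapB.add(new Entry(2, 4));
        mapB.add(new Entry(5, 10));
        for (Entry e : merge(mapA, mapB, 2)) {
            System.out.println(e.period + " " + e.movingAverage);
        }
    }
}
```

Because the helper sorts before recomputing, the result does not depend on which map task saw which rows, which is the property the reduce side relies on.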

Orchestrating Partial Aggregation

We've got a clever little algorithm for computing moving average in parallel, but Hive can't do anything with it unless we create a UDAF that understands how to use our algorithm.  At this point, we need to start writing some real UDAF code.  As before, we extend a generic class, in this case GenericUDAFEvaluator.

  public static class GenericUDAFMovingAverageEvaluator extends GenericUDAFEvaluator {
        // input inspectors for PARTIAL1 and COMPLETE
        private PrimitiveObjectInspector periodOI;
        private PrimitiveObjectInspector inputOI;
        private PrimitiveObjectInspector windowSizeOI;
        // input inspectors for PARTIAL2 and FINAL
        // list for MAs and one for residuals
        private StandardListObjectInspector loi;

 As in the case of a UDTF, we create ObjectInspectors to handle type checking.  However, notice that we have inspectors for different states: PARTIAL1, PARTIAL2, COMPLETE, and FINAL.  These correspond to the different states in which our UDAF may be executing.  Since our serialized prefix sum table isn't the same input type as the values our add method takes, we need different type checking for each.
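As a quick reference for the four modes, the sketch below maps each one to the evaluator method that consumes its input and the method that produces its output. The enum here is a local stand-in for GenericUDAFEvaluator.Mode so that the example has no Hive dependency, and the class and helper names (UdafModeSketch, inputMethod, outputMethod) are made up for illustration.

```java
final class UdafModeSketch {
    // Local stand-in for Hive's GenericUDAFEvaluator.Mode.
    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    // PARTIAL1 and COMPLETE read original rows through iterate();
    // PARTIAL2 and FINAL read partial aggregations through merge().
    static String inputMethod(Mode m) {
        return (m == Mode.PARTIAL1 || m == Mode.COMPLETE) ? "iterate" : "merge";
    }

    // PARTIAL1 and PARTIAL2 emit a partial result through terminatePartial();
    // FINAL and COMPLETE emit the final result through terminate().
    static String outputMethod(Mode m) {
        return (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) ? "terminatePartial" : "terminate";
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(m + ": " + inputMethod(m) + " -> " + outputMethod(m));
        }
    }
}
```

Read this way, the input inspectors depend on whether the mode calls iterate or merge, and the output inspector depends on whether it ends in terminatePartial or terminate.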

        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            // initialize input inspectors
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                assert(parameters.length == 3);
                periodOI = (PrimitiveObjectInspector) parameters[0];
                inputOI = (PrimitiveObjectInspector) parameters[1];
                windowSizeOI = (PrimitiveObjectInspector) parameters[2];
            }

 Here's the beginning of our overridden initialization function.  We check the parameters for two modes, PARTIAL1 and COMPLETE.  Here we assume that the arguments to our UDAF are the same as the user passes in a query: the period, the input, and the size of the window.  If the UDAF instance is consuming the results of our partial aggregation, we need a different ObjectInspector.  Specifically, this one:

                loi = (StandardListObjectInspector) parameters[0];


Similar to the UDTF, we also need type checking on the output types -- but for both partial and full aggregation. In the case of partial aggregation, we're returning lists of DoubleWritables:

            // init output object inspectors
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {
                // The output of a partial aggregation is a list of doubles representing the
                // moving average being constructed.
                // the first element in the list will be the window size
                return ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
            }


 But in the case of FINAL or COMPLETE, we're dealing with the types that will be returned to the Hive user, so we need to return a different output.  We're going to return a list of structs that contain the period, moving average, and residuals (since they're cheap to compute).

            else {
                // The output of FINAL and COMPLETE is a full aggregation, which is a
                // list of structs that hold the period, the moving average, and the
                // residual for each entry.
                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();
                // (one field inspector and one field name per struct member are added to foi and fname here)
                return ObjectInspectorFactory.getStandardListObjectInspector(
                    ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi));
            }
        }


Next come methods to control what happens when a Map or Reduce task is finished with its data.  In the case of partial aggregation, we need to serialize the data.  In the case of full aggregation, we need to package the result for Hive users.

    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
      // return an ArrayList where the first parameter is the window size
      MaAgg myagg = (MaAgg) agg;
      return myagg.prefixSum.serialize();
    }
    public Object terminate(AggregationBuffer agg) throws HiveException {
      // final return value goes here
      MaAgg myagg = (MaAgg) agg;
      if (myagg.prefixSum.tableSize() < 1)
        return null;
      ArrayList<DoubleWritable[]> result = new ArrayList<DoubleWritable[]>();
      for (int i = 0; i < myagg.prefixSum.tableSize(); i++) {
        // residual = observed value minus its moving average
        double residual = myagg.prefixSum.getEntry(i).value - myagg.prefixSum.getEntry(i).movingAverage;
        DoubleWritable[] entry = new DoubleWritable[3];
        entry[0] = new DoubleWritable(myagg.prefixSum.getEntry(i).period);
        entry[1] = new DoubleWritable(myagg.prefixSum.getEntry(i).movingAverage);
        entry[2] = new DoubleWritable(residual);
        result.add(entry);
      }
      return result;
    }


We also need to provide instruction on how Hive should merge the results of partial aggregation.  Fortunately, we already handled this in our PrefixSumMovingAverage class, so we can just call that.

    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        // if we're merging two separate sets we're creating one table that's doubly long
        if (partial != null) {
            MaAgg myagg = (MaAgg) agg;
            List<DoubleWritable> partialMovingAverage = (List<DoubleWritable>) loi.getList(partial);
            // hand the deserialized partial table to our prefix sum class
            myagg.prefixSum.merge(partialMovingAverage);
        }
    }


Of course, merging and serializing isn't very useful unless the UDAF has logic for iterating over values.  The iterate method handles this and -- as one would expect -- relies entirely on the PrefixSumMovingAverage class we created.

    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
      assert (parameters.length == 3);
      if (parameters[0] == null || parameters[1] == null || parameters[2] == null)
        return;
      MaAgg myagg = (MaAgg) agg;
      // Parse out the window size just once if we haven't done so before. We need a window of at least 1,
      // otherwise there's no window.
      if (!myagg.prefixSum.isReady()) {
        int windowSize = PrimitiveObjectInspectorUtils.getInt(parameters[2], windowSizeOI);
        if (windowSize < 1)
            throw new HiveException(getClass().getSimpleName() + " needs a window size >= 1");
        // (the call that hands windowSize to the prefix sum table is not part of this excerpt)
      }
      // Add the current data point and compute the average
      int p = PrimitiveObjectInspectorUtils.getInt(parameters[0], periodOI);
      double v = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI);
      myagg.prefixSum.add(p, v);
    }


Aggregation Buffers: Connecting Algorithms with Execution

One might notice that the code for our UDAF references an object of type AggregationBuffer quite a lot.  This is because the AggregationBuffer is the interface which allows us to connect our custom PrefixSumMovingAverage class to Hive's execution framework.  While it doesn't constitute a great deal of code, it's glue that binds our logic to Hive's execution framework.  We implement it as such:

    // Aggregation buffer definition and manipulation methods
    static class MaAgg implements AggregationBuffer {
        PrefixSumMovingAverage prefixSum;
    }
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      MaAgg result = new MaAgg();
      return result;
    }


Using the UDAF

The goal of a good UDAF is that, no matter how complicated it was for us to implement, it is simple for our users.  For all that code and parallel thinking, usage of the UDAF is very straightforward:

ADD JAR /mnt/shared/hive_udfs/dist/lib/moving_average_udf.jar;
CREATE TEMPORARY FUNCTION moving_avg AS 'com.oracle.hadoop.hive.ql.udf.generic.GenericUDAFMovingAverage';
-- get the moving average for a single tail number
SELECT TailNum, moving_avg(timestring, delay, 4) FROM ts_example WHERE TailNum='N967CA' GROUP BY TailNum LIMIT 100;

Here we're applying the UDAF to get the moving average of arrival delay for a particular aircraft (tail number).  It's a really simple query for all that work we did underneath.  We can do a bit more and leverage Hive's ability to handle complex types as columns.  Here's a query which creates a table of timeseries as arrays.

-- create a set of moving averages for every plane starting with N
-- Note: this UDAF blows up unpleasantly in heap; there will be data volumes for which you need to throw
-- excessive amounts of memory at the problem
CREATE TABLE moving_averages AS
SELECT TailNum, moving_avg(timestring, delay, 4) as timeseries FROM ts_example
WHERE TailNum LIKE 'N%' GROUP BY TailNum;



We've covered all manner of UDFs: from simple class extensions which can be written very easily, to very complicated UDAFs which require us to think about distributed execution and plan orchestration done by query engines.  With any luck, the discussion has provided you with the confidence to go out and implement your own UDFs -- or at least pay some attention to the complexities of the ones in use every day.


The data warehouse insider is written by the Oracle product management team and sheds light on all things data warehousing and big data.

