Thursday Apr 09, 2015

ODI, Big Data SQL and Oracle NoSQL

Back in January Anuj posted an article here on using Oracle NoSQL via the Oracle Database Big Data SQL feature. In this post - part 2 of Anuj's, you could say - I will follow up with how the Oracle external table is configured and how it all hangs together, both with manual code and via ODI. For this I used the Big Data Lite VM and also the newly released Oracle Data Integrator Big Data option. The Big Data Lite VM 4.1 release uses version 3.2.5 of Oracle NoSQL - from this release I used the new declarative DDL for Oracle NoSQL to project the shape from NoSQL, with some help from Anuj.

My goal for the integration design is to show a logical design in ODI and how KMs are used to realize the implementation and leverage Oracle Big Data SQL. This integration design supports predicate pushdown, so I actually minimize the data moved between my NoSQL store on Hadoop and the Oracle database - think speed and scalability! My NoSQL store contains user movie recommendations. I want to join these with reference data in Oracle - customer, movie and genre information - and store the result in a summary table.

Here is the code to create and load the recommendation data in NoSQL - in a real-world scenario this would normally be computed by another piece of application logic (see the sketch after the commands);

  export KVHOME=/u01/nosql/kv-3.2.5
  cd /u01/nosql/scripts
  ./admin.sh

  connect store -name kvstore
  EXEC "CREATE TABLE recommendation( \
           custid INTEGER, \
           sno INTEGER, \
           genreid INTEGER, \
           movieid INTEGER, \
           PRIMARY KEY (SHARD(custid), sno, genreid, movieid))"
  PUT TABLE -name RECOMMENDATION -file /home/oracle/movie/moviework/bigdatasql/nosqldb/user_movie.json
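In that real-world scenario, the writing application would typically use the Oracle NoSQL Table API rather than the admin CLI. Purely as a hedged sketch - the store name and host/port match the Big Data Lite setup above, while the row values are made up - it might look like this;

  import oracle.kv.KVStore;
  import oracle.kv.KVStoreConfig;
  import oracle.kv.KVStoreFactory;
  import oracle.kv.table.Row;
  import oracle.kv.table.Table;
  import oracle.kv.table.TableAPI;

  public class RecommendationWriter {
      public static void main(String[] args) {
          // Connect to the store created above (kvstore on localhost:5000).
          KVStore store = KVStoreFactory.getStore(
                  new KVStoreConfig("kvstore", "localhost:5000"));
          TableAPI tableAPI = store.getTableAPI();
          Table table = tableAPI.getTable("recommendation");

          // One recommendation row; the values here are illustrative only.
          Row row = table.createRow();
          row.put("custid", 1255601);
          row.put("sno", 1);
          row.put("genreid", 10);
          row.put("movieid", 1234);
          tableAPI.put(row, null, null); // insert or overwrite the row

          store.close();
      }
  }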

The Manual Approach

This example uses the new data definition language in NoSQL. To make this accessible via Hive, users can create Hive external tables that use the NoSQL storage handler provided by Oracle. If this were manually coded in Hive, we could define the table as follows;

  CREATE EXTERNAL TABLE IF NOT EXISTS recommendation(
      custid INT,
      sno INT,
      genreId INT,
      movieId INT)
  STORED BY 'oracle.kv.hadoop.hive.table.TableStorageHandler'
  TBLPROPERTIES ( "oracle.kv.kvstore"="kvstore",
                  "oracle.kv.hosts"="localhost:5000",
                  "oracle.kv.hadoop.hosts"="localhost",
                  "oracle.kv.tableName"="recommendation");

At this point we have made NoSQL accessible to many components in the Hadoop stack - pretty much every component in the Hadoop ecosystem can leverage the HCatalog entries defined, be they Hive, Pig, Spark and so on. We are looking at Oracle Big Data SQL though, so let's see how that is achieved. We must define an Oracle external table that uses either a SerDe or a Hive table; below you can see how the table has been defined in Oracle;

  CREATE TABLE recommendation(
      custid NUMBER,
      sno NUMBER,
      genreid NUMBER,
      movieid NUMBER
  )
  ORGANIZATION EXTERNAL
  (
      TYPE ORACLE_HIVE
      DEFAULT DIRECTORY DEFAULT_DIR
      ACCESS PARAMETERS (
          com.oracle.bigdata.tablename=default.recommendation
      )
  );

Now we are ready to write SQL! Really!? Well, let's see - below is the type of query we can use to join the NoSQL data with our Oracle reference data;

  SELECT m.title, g.name, c.first_name
  FROM recommendation r, movie m, genre g, customer c
  WHERE r.movieid=m.movie_id AND r.genreid=g.genre_id AND r.custid=c.cust_id
    AND r.custid=1255601 AND r.sno=1
  ORDER BY r.sno, r.genreid;

Great, we can now access the data from Oracle - we benefit from the scalability of the solution and minimal data movement! Let's make it better: more maintainable, more flexible to future changes, and accessible to more people, by showing how it is done in ODI.

Oracle Data Integrator Approach

The data in NoSQL has a shape, and we can capture that shape in ODI just as it is defined in NoSQL. We can then design mappings that manipulate that shape and load into whatever target we like. The SQL we saw above is represented in a logical mapping as below;


Users get the same design experience as with other data items and benefit from the mapping designer. They can join, map and transform just as normal. The ODI designer lets you separate how you physically want this to happen from the logical semantics - this is all about giving you the flexibility to change and adapt to new integration technologies and patterns.

In the physical design we can assign Knowledge Modules that take on the responsibility of building the integration objects we manually coded above. These KMs are generic, so they support all shapes and sizes of data items. Below you can see how the LKM is assigned for accessing Hive from Oracle;

This KM takes the role of building the external table - you can take it, use it, customize it, and the logical design stays the same. Why is that important? Integration recipes CHANGE as we learn more and developers build newer and better mechanisms to integrate.

This KM takes care of creating the external table in Hive that accesses our NoSQL system. You could also have manually built the external table, imported it into ODI and used that as a source for the mapping, but I want to show how the raw items can be integrated - the more metadata we capture and use at design time, the greater the flexibility in the future. The LKM Oracle NoSQL to Hive uses regular KM APIs to build the access object; here is a snippet from the KM;

  create table <%=odiRef.getObjectName("L", odiRef.getTableName("COLL_SHORT_NAME"), "W")%>
   <%=odiRef.getColList("(", "[COL_NAME] [DEST_CRE_DT]", ", ", ")", "")%>
      STORED BY 'oracle.kv.hadoop.hive.table.TableStorageHandler'
      TBLPROPERTIES ( "oracle.kv.kvstore"="<%=odiRef.getInfo("SRC_SCHEMA")%>",
                      "oracle.kv.hosts"="<%=odiRef.getInfo("SRC_DSERV_NAME")%>",
                      "oracle.kv.hadoop.hosts"="localhost",
                      "oracle.kv.tableName"="<%=odiRef.getSrcTablesList("", "[TABLE_NAME]", ", ", "")%>");

You can see the templatized code versus the literals. This still needs some work, as you can see - can you spot some hard-wiring that needs to be fixed? ;-) This was using the 12.1.3.0.1 Big Data option of ODI, so integration with Hive is much improved, and it leverages the DataDirect driver, which is also a big improvement. In this post I created a new technology for Oracle NoSQL in ODI - you can do this too for anything you want. I will post this technology on java.net and more, so that as a community we can learn and share.

Summary 

Here we have seen how we can make seemingly complex integration tasks quite simple and leverage the best of data integration technologies today and importantly in the future!


Sunday Jul 13, 2014

New Big Data Features in ODI 12.1.3

Oracle Data Integrator (ODI) 12.1.3 extends its Hadoop support through a number of exciting new capabilities. The new features include:

  • Loading of RDBMS data from and to Hadoop using Sqoop
  • Support for Apache HBase databases
  • Support for Hive append functionality

With these new additions ODI provides full connectivity to load, transform, and unload data in a Big Data environment.

The diagram below shows all ODI Hadoop knowledge modules, with the KMs added in ODI 12.1.3 in red.

Sqoop support

Apache Sqoop is designed for efficiently transferring bulk data between Hadoop and relational databases such as Oracle, MySQL, Teradata, DB2, and others. Sqoop operates by creating multiple parallel map-reduce processes across a Hadoop cluster, each connecting to the external database and transferring a partition of the data from or to Hadoop storage. Data can be stored in Hadoop using HDFS, Hive, or HBase. ODI adds two knowledge modules: IKM SQL to Hive-HBase-File (SQOOP) and IKM File-Hive to SQL (SQOOP).
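Under the covers a Sqoop load is just an invocation of the sqoop tool. Purely to make the mechanics concrete, here is a minimal sketch using Sqoop's Java entry point - the connection details, credentials and table names are hypothetical, and in ODI the Sqoop IKMs generate the equivalent invocation for you from the mapping;

  import org.apache.sqoop.Sqoop;

  public class SqoopImportSketch {
      public static void main(String[] args) {
          // Hypothetical connection details; in ODI the Sqoop IKMs derive
          // these from the topology and the KM options of the mapping.
          String[] importArgs = {
              "import",
              "--connect", "jdbc:oracle:thin:@//dbhost:1521/orcl",
              "--username", "moviedemo",
              "--password", "welcome1",
              "--table", "CUSTOMER",     // source table in the database
              "--hive-import",           // land the data in a Hive table
              "--num-mappers", "4"       // parallel map processes doing the transfer
          };
          System.exit(Sqoop.runTool(importArgs)); // returns 0 on success
      }
  }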

Loading from and to Sqoop in ODI is straightforward: create a mapping with the database source and Hadoop target (or vice versa) and apply any necessary transformation expressions.

In the physical design of the map, make sure to set the LKM of the target to LKM SQL Multi-Connect.GLOBAL and choose a Sqoop IKM, such as IKM SQL to Hive-HBase-File (SQOOP). Change the MapReduce output directory IKM property MAPRED_OUTPUT_BASE_DIR to an appropriate HDFS directory. Review all other properties and tune as necessary. With these simple steps you should be able to perform a quick Sqoop load.

For more information please review the great ODI Sqoop article from Benjamin Perez-Goytia, or read the ODI 12.1.3 documentation about Sqoop.

HBase support

ODI adds support for HBase as a source and target. HBase metadata can be reverse-engineered using the RKM HBase knowledge module, and HBase can be used as source and target of a Hive transformation using LKM HBase to Hive and IKM Hive to HBase. Sqoop KMs also support HBase as a target for loads from a database. 
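If HBase is new to you, the rows these KMs read and write are simply keyed sets of column-family:qualifier cells. Here is a minimal sketch with the HBase Java client API of the era - the table, family and values are invented for illustration;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class HBasePutSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml
          HTable table = new HTable(conf, "customer");      // hypothetical table
          Put put = new Put(Bytes.toBytes("cust-1255601")); // row key
          // Write one cell: column family "info", qualifier "first_name".
          put.add(Bytes.toBytes("info"), Bytes.toBytes("first_name"),
                  Bytes.toBytes("Joe"));
          table.put(put);
          table.close();
      }
  }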

For more information please read the ODI 12.1.3 documentation about HBase.

Hive Append support

Prior to Hive 0.8 there was no direct way to append data to an existing table. Earlier Hive KMs emulated such logic by renaming the existing table and concatenating old and new data into a new table with the prior name. This emulated append operation caused major data movement, particularly when the target table was large.

Starting with version 0.8, Hive supports appending to a table. All ODI 12.1.3 Hive KMs have been updated to use the append capability by default, but provide backward compatibility with the old behavior through the KM property HIVE_COMPATIBLE=0.7.
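To make the difference concrete, here is a minimal sketch over the Hive JDBC driver - the endpoint and table names are hypothetical, and the pre-0.8 emulation is summarized in the comments rather than spelled out;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.Statement;

  public class HiveAppendSketch {
      public static void main(String[] args) throws Exception {
          Class.forName("org.apache.hive.jdbc.HiveDriver");
          // Hypothetical HiveServer2 endpoint and tables.
          Connection conn = DriverManager.getConnection(
                  "jdbc:hive2://localhost:10000/default", "oracle", "");
          Statement stmt = conn.createStatement();

          // Hive >= 0.8 (the KM default): a true append, existing data
          // in the target table is left untouched.
          stmt.execute("INSERT INTO TABLE movie_summary SELECT * FROM movie_staging");

          // With HIVE_COMPATIBLE=0.7 the KMs instead emulate the append:
          // rename the target, recreate it, and INSERT OVERWRITE the union
          // of old and new data - moving the entire table's contents.
          conn.close();
      }
  }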

Conclusion

ODI 12.1.3 provides an optimal and easy-to-use way to perform data integration in a Big Data environment. ODI utilizes the processing power of the data storage and processing environment rather than relying on a proprietary transformation engine. This core "ELT" philosophy has its perfect match in a Hadoop environment, where ODI can provide unique value through a native and easy-to-use data integration environment.

Tuesday Jul 09, 2013

ODI - Loading MongoDB (API as Target)

In this post I will show how to load documents into a MongoDB collection. The interface design looks just like any other ODI interface, but behind the scenes the KM configured in the physical design uses the MongoDB SDK (see the MongoDB SDK here) to insert the documents. The target datastore below represents a MongoDB document; the columns are the keys in the document. Each row is inserted as a document, each column is a key, and the column value is the value. The ENAME value below is shown as a complex JSON value.

The IKM I have used is a multi-connect IKM; the source is a SQL data source and the target uses a MongoDB Groovy command. The heart of the IKM, which inserts the documents into the collection, has a SQL select as the source command and the following Groovy code for the target command;

  import com.mongodb.*
  MongoClient mongoClient = new MongoClient("<%=odiRef.getOption("MONGO_SERVER")%>", <%=odiRef.getOption("MONGO_PORT")%>);
  DB db = mongoClient.getDB("<%=odiRef.getOption("MONGODB")%>");
  DBCollection coll = db.getCollection("<%=odiRef.getOption("MONGOCOLL")%>")
  BasicDBObject doc = new BasicDBObject();

  <%=odiRef.getColList(" ", "doc.put(\u0022[COL_NAME]\u0022, \u0022#[CX_COL_NAME]\u0022);", " \n ", "", "((INS and !TRG) and REW)")%>

  coll.insert(doc);

The odiRef.getColList method call above generates code for every target column; the code performs a doc.put invocation to add the key-value pairs into the document. For example, this is the code generated and executed based on the interface design discussed above;

  import com.mongodb.*
  MongoClient mongoClient = new MongoClient("DALLAN-SVR", 27017);
  DB db = mongoClient.getDB("test");
  DBCollection coll = db.getCollection("testCollection")
  BasicDBObject doc = new BasicDBObject();

  doc.put("EMPNO", "#EMPNO");
  doc.put("ENAME", "#ENAME");
  doc.put("JOB", "#JOB");
  doc.put("MGR", "#MGR");
  doc.put("HIREDATE", "#HIREDATE");
  doc.put("SAL", "#SAL");
  doc.put("COMM", "#COMM");
  doc.put("DEPTNO", "#DEPTNO");

  coll.insert(doc);

This is a simple illustration of how to load documents into MongoDB. We can go into the MongoDB command line and execute a command to see all objects in the collection and get the list of documents; below you can see a preview of executing db.testCollection.find()

  • { "_id" : ObjectId("51dc3ded6c4b9a5bd07d68a6"), "EMPNO" : "7369", "ENAME" : "{ NAME : SMITH, DESCR : 22 }", "JOB" : "CLERK", "MGR" : "7902", "HIREDATE" : "1980-12-17 00:00:00.0", "SAL" : "801", "COMM" : "", "DEPTNO" : "20" }
  • { "_id" : ObjectId("51dc3ded6c4b9a5bd07d68a7"), "EMPNO" : "7499", "ENAME" : "{ NAME : ALLEN, DESCR : 22 }", "JOB" : "SALESMAN", "MGR" : "7698", "HIREDATE" : "1981-02-20 00:00:00.0", "SAL" : "1601", "COMM" : "300", "DEPTNO" : "30" }

You can see the key:value pairs in our document. For those MongoDB gurus, you'll notice that in the 'complex' data illustration this is really a string and not a MongoDB complex object - that discussion is for another day.
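As a quick taste of that discussion: with the MongoDB Java driver a true nested value is built by putting a BasicDBObject rather than a string. A minimal sketch, reusing the field names from the example above;

  import com.mongodb.BasicDBObject;

  public class NestedEnameSketch {
      public static void main(String[] args) {
          BasicDBObject doc = new BasicDBObject();
          doc.put("EMPNO", "7369");
          // ENAME as a real nested document instead of a JSON-looking string:
          doc.put("ENAME", new BasicDBObject("NAME", "SMITH").append("DESCR", 22));
          System.out.println(doc); // { "EMPNO" : "7369" , "ENAME" : { "NAME" : "SMITH" , "DESCR" : 22 } }
          // coll.insert(doc) would then store ENAME as a queryable sub-document.
      }
  }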

This post is not just about MongoDB; it is also a useful illustration of how to integrate APIs as a target in a data flow.

Wednesday Jan 02, 2013

ODI - Hive and NoSQL, the code

This post includes the Java client demonstration code used in the Hive and NoSQL post illustrated here. The BasicBigData.java code is a NoSQL client which populates a key-value store that is queryable using the Hive external table from that post. It didn't take long to code, and a few peeks at the NoSQL javadoc got it going. You can take this Java code, compile it and run it (instructions for compiling are similar to the verification demo here - it is very easy).

The Java code uses the NoSQL major/minor path constructor to describe the key; below is a snippet to define the birthdate for Bob Smith;

  ArrayList<String> mjc1 = new ArrayList<String>();
  mjc1.add("Smith");
  mjc1.add("Bob");
  ...
  ArrayList<String> mnrb = new ArrayList<String>();
  mnrb.add("birthdate");
  ...
  store.put(Key.createKey(mjc1,mnrb),Value.createValue("05/02/1975".getBytes()));
  ...
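For completeness, the store handle used in the snippet is obtained from KVStoreFactory; a minimal sketch, assuming the store name and helper host:port from the Hive external table definition in the earlier post (kvstore on localhost:5000);

  import oracle.kv.KVStore;
  import oracle.kv.KVStoreConfig;
  import oracle.kv.KVStoreFactory;

  public class StoreConnectSketch {
      public static void main(String[] args) {
          // Open a handle to the key-value store, then put entries as above.
          KVStore store = KVStoreFactory.getStore(
                  new KVStoreConfig("kvstore", "localhost:5000"));
          // ... store.put(...) calls as in the snippet above ...
          store.close();
      }
  }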

In the referenced post, to actually aggregate the key values, we used the Hive collect_set aggregation function (see here for Hive aggregation functions). The collect_set aggregation function returns a set of objects with duplicates eliminated. To get the aggregation function behavior in ODI with the correct group by, we must tell ODI about the Hive aggregation function: we define a new language element for collect_set in the Topology tree, mark the element as a group function, and define the expression for Hive under the Implementation tab;

We are then able to define expressions which reference this aggregation function and get the exact syntax defined in the earlier post. Below we see the Hive expressions using collect_set;

From this design and the definition of the aggregation function in ODI, when it is executed you can see the generated Hive QL with the correct columns in the grouping function;

The target Hive datastore in the interface I defined has been loaded with the key values from the NoSQL key-value store - cool!

Those are a few of the missing pieces which let you query NoSQL through Hive external tables - hopefully some useful pointers.

Monday Dec 31, 2012

ODI - Hive and NoSQL

The Hive external table lets us do lots of cool stuff, including processing data from NoSQL. We have seen how custom SerDes are used; Hive storage handlers also provide some cool capabilities. Using the Hive storage handler defined here, an external table can be defined to project data from a NoSQL key-value store. The external table can then be used as a source in ODI - very simple.

The illustration on github has the following data stored in an Oracle NoSQL Database (the key is the lastname/firstname etc.):

  • /Smith/Bob/-/birthdate: 05/02/1975
  • /Smith/Bob/-/phonenumber: 1111-1111
  • /Smith/Bob/-/userid: 1
  • /Smith/Patricia/-/birthdate: 10/25/1967
  • /Smith/Patricia/-/phonenumber: 2222-2222
  • /Smith/Patricia/-/userid: 2
  • /Wong/Bill/-/birthdate: 03/10/1982
  • /Wong/Bill/-/phonenumber: 3333-3333
  • /Wong/Bill/-/userid: 3

Using the Hive external table and the custom storage handler for a key-value store, we define a mask to project the data through the external table.

  ADD JAR /home/oracle/kv/HiveKVStorageHandler.jar;
  CREATE EXTERNAL TABLE MY_KV_TABLE (lastname string, firstname string, birthdate string, phonenumber string, userid string)
        STORED BY 'org.vilcek.hive.kv.KVHiveStorageHandler'
        WITH SERDEPROPERTIES ("kv.major.keys.mapping" = "lastname,firstname", "kv.minor.keys.mapping" = "birthdate,phonenumber,userID")
        TBLPROPERTIES ("kv.host.port" = "localhost:5000", "kv.name" = "kvstore");

There are a few interesting properties here;

  • we specify the key-value store using TBLPROPERTIES, identifying the host/port and the store name (kvstore).
  • the SERDEPROPERTIES contain the mapping of the keys to column names; you will get a row for each value of birthdate, phonenumber, userID.

Fairly straightforward. We can then reverse-engineer this into ODI using the same mechanism as I described in previous posts here, setting the ODI_HIVE_SESSION_JARS and so forth. The data projected looks like this;

  hive> SELECT * FROM MY_KV_TABLE;
  OK
  Smith   Patricia   10/25/1967   NULL        NULL
  Smith   Patricia   NULL         2222-2222   NULL
  Smith   Patricia   NULL         NULL        2
  Smith   Bob        05/02/1975   NULL        NULL
  Smith   Bob        NULL         1111-1111   NULL
  Smith   Bob        NULL         NULL        1
  Wong    Bill       03/10/1982   NULL        NULL
  Wong    Bill       NULL         3333-3333   NULL
  Wong    Bill       NULL         NULL        3

In ODI, by defining the Hive collect_set function as an aggregation function, we can then aggregate the data and pivot it into one row per person;

  SELECT lastname, firstname, collect_set(birthdate)[0], collect_set(phonenumber)[0], collect_set(userid)[0]
        FROM MY_KV_TABLE
        GROUP BY lastname, firstname;
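Run against the rows projected above, this should collapse the nine sparse rows into three - collect_set skips the NULLs, so element [0] of each set is the real value - giving something like;

  Smith   Bob        05/02/1975   1111-1111   1
  Smith   Patricia   10/25/1967   2222-2222   2
  Wong    Bill       03/10/1982   3333-3333   3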

So another interesting illustration of external tables in Hive and what they can provide.

