
Move data between Oracle Database and Apache Hadoop using high speed connectors.

How to Load Oracle and Hive Tables with OHSH (Part 3 - Loading Oracle Tables)

In this tutorial we are going to talk in depth about loading Oracle tables with content from HDFS files or from Hive tables.  The focus will be on content stored as delimited text, but we will also touch on loading other forms of content (e.g. Parquet).

The Oracle target table and sources of data

The Oracle table we will be loading lives in a schema called "MOVIEDEMO" and is called MOVIE_RATINGS.  Its columns relate customers to the movies they watched, along with other related information.  It is a hash-partitioned table, hashed on the ACTIVITY column.

SQL> describe movie_ratings;
Name                                      Null?    Type
----------------------------------------- -------- ----------------------------
CUSTID                                             NUMBER
MOVIEID                                            NUMBER
GENREID                                            NUMBER
TIME                                               DATE
RECOMMENDED                                        NUMBER
ACTIVITY                                           NUMBER
RATING                                             NUMBER
SALES                                              NUMBER
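
For reference, DDL along the following lines would produce such a table.  This is a minimal sketch: the hash partitioning on ACTIVITY follows the description above, but the partition count is an assumption.

CREATE TABLE movie_ratings
( custid       NUMBER,
  movieid      NUMBER,
  genreid      NUMBER,
  time         DATE,
  recommended  NUMBER,
  activity     NUMBER,
  rating       NUMBER,
  sales        NUMBER
)
PARTITION BY HASH (activity) PARTITIONS 8;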

We will be loading data from HDFS files containing delimited text and from the following Hive table whose storage is also delimited text. 

The HDFS files live under the path /user/${USER}/moviedata.  The rows look like this:

1129727,500,3,2012-07-01:00:30:38,1,5,,
1363545,27205,9,2012-07-01:00:35:09,1,11,,3.99
1036191,1149812,17,2012-07-01:00:37:35,0,4,,
1152235,9346,6,2012-07-01:00:39:49,1,2,,

The table definition of the Hive table looks like this:

hive> describe movie_ratings;
OK
custid                  int
movieid                 int
genreid                 int
time                    string
recommended             int
activity                int
rating                  int
sales                   float
Time taken: 0.832 seconds, Fetched: 8 row(s)

The Hive delimited text looks a little different from the HDFS text above.  By default, Hive uses the convention of "\N" for nulls, and in this table the fields are separated by tab characters rather than commas.

1402363 11547   6       2012-08-17:12:38:32     1       5       \N      \N
1102408 497     3       2012-08-17:12:40:35     1       11      \N      2.99
1172714 6978    7       2012-08-17:12:40:37     1       11      \N      1.99
1129253 4688    15      2012-08-17:12:41:52     0       11      \N      1.99
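
For reference, a Hive table matching the describe output above could be created with HiveQL along these lines.  This is a sketch: the tab field delimiter is inferred from the sample rows and from the external table definition shown later in this tutorial, and the storage format is an assumption.

CREATE TABLE movie_ratings
( custid INT, movieid INT, genreid INT, time STRING,
  recommended INT, activity INT, rating INT, sales FLOAT )
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;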

Delimited text assumptions

All the examples below that load HDFS files of delimited text assume that the declared order of columns in the Oracle table maps to the physical order of fields in a delimited text record in an HDFS file (i.e. the first field maps to the first Oracle column and so on).  If that is not the case, OHSH has a language construct called "loadermap" that solves this problem, but it will be discussed separately in Part 5 of this tutorial.

Loading from Hive tables is less restrictive regarding order.  As long as the column names in the Oracle table and the Hive table are the same, the load should work regardless of the order in which the columns are declared in either table.  If the column names are not identical, "loadermap" can again be used to solve the problem.

Loading using OHSH's "jdbc" or "directpath" method

When loading Oracle tables using "jdbc" or "directpath", you are using a Hadoop map or map/reduce job to read data living in Hadoop and then load it into Oracle.

Either a map-only or a map/reduce job is launched on the Hadoop cluster and data is read by map tasks using standard Hadoop input splits.

The data is converted to Oracle data types and injected into Oracle either using Oracle SQL over JDBC or using the Oracle OCI interface.  A "jdbc" load will be a map-only job if the target table is not partitioned.  If it is partitioned (and typically large), the "directpath" method should be used.  This runs a full map/reduce job whose reduce stage uses the Hadoop shuffle/sort to segregate data by partition, followed by injection of the data into Oracle partitions using OCI Direct Path.  This interface bypasses Oracle SQL and is a faster and more efficient way of injecting data into an Oracle table.

In OHSH we need to create a JDBC resource, which is used by both "jdbc" and "directpath" to create a connection to Oracle.  Below we use a TNS alias called "inst1" and a wallet to connect transparently to the Oracle database as Oracle user "MOVIEDEMO".

ohsh>create oracle jdbc resource omovies connectid="inst1"

For convenience, we will create a SQL*Plus resource to interrogate the table after the load.

ohsh>create sqlplus resource sql connectid="inst1"

The text files we are loading require us to define the default date format, which is applied when Java needs to convert a string to an Oracle date.  We use the OHSH "dateformat" setting (a Java date format pattern) to do this.

ohsh>set dateformat "yyyy-MM-dd HH:mm:ss"

For the purposes of understanding what is going on under the covers we will set output to verbose.

ohsh>set outputlevel verbose

Now we run the load job specifying the target table and the path where the movie data lives.

ohsh>load oracle table omovies:movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using jdbc
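
Once the load completes, the SQL*Plus resource created earlier can be used to interrogate the target table.  For example (a sketch, assuming OHSH's %resource delegation syntax; if the table was empty before the load, the count should match the "Map output records" counter reported by the job):

ohsh>%sql select count(*) from movie_ratings;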

Understanding loading Oracle tables using "jdbc" or "directpath" methods

To better understand what OHSH is doing during a load command you will want to set OHSH "outputlevel" to "verbose".  For "jdbc" and "directpath" the output will include the standard output of a Hadoop Map/Reduce job.

A banner describes the Oracle SmartLoader job id and the user's identity both locally and in Hadoop.

##################################################################################
Starting SmartLoader Job OSL_161202_123555
Local OS user: rhanckel
Hadoop user:   rhanckel
##################################################################################

This is followed by a brief statement of what the load job is going to do:

Loading Oracle table "MOVIEDEMO"."MOVIE_RATINGS"
  From HDFS files located at /user/rhanckel/moviedemo/movie_ratings_delimitedtext
  Using Oracle Loader for Hadoop (OLH) JDBC

These are the Hadoop map-reduce configuration settings for the job. In general they are largely invariant across jobs.

--------------------------------------------------------------------------------------------------------------------------------
Hadoop MapReduce Configuration Settings
--------------------------------------------------------------------------------------------------------------------------------

mapreduce.am.max-attempts=2
mapreduce.app-submission.cross-platform=false
mapreduce.client.completion.pollinterval=5000
mapreduce.client.genericoptionsparser.used=true
mapreduce.client.output.filter=FAILED
mapreduce.client.progressmonitor.pollinterval=1000

[ More Hadoop properties than you will ever want to know... ]

yarn.app.mapreduce.client-am.ipc.max-retries=3
yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts=3
yarn.app.mapreduce.client.job.max-retries=0
yarn.app.mapreduce.client.job.retry-interval=2000
yarn.app.mapreduce.client.max-retries=3
yarn.app.mapreduce.shuffle.log.backups=0
yarn.app.mapreduce.shuffle.log.limit.kb=0
yarn.app.mapreduce.shuffle.log.separate=true
yarn.app.mapreduce.task.container.log.backups=0

The settings below are important: they show which properties are being set for this particular load, and they are very load-specific.  The "jdbc" and "directpath" methods use OLH, hence OLH configuration settings come into play.

----------------------------------------------------------------------------------
OLH Configuration Settings
----------------------------------------------------------------------------------

oracle.hadoop.loader.connection.tns_admin=/ade/rhanckel_hadoop11/oracle/work
oracle.hadoop.loader.connection.url=jdbc:oracle:thin:@inst1
oracle.hadoop.loader.connection.wallet_location=/ade/rhanckel_hadoop11/oracle/work
oracle.hadoop.loader.defaultDateFormat=yyyy-MM-dd HH:mm:ss
oracle.hadoop.loader.input.fieldNames=F0,F1,F2,F3,F4,F5,F6,F7
oracle.hadoop.loader.input.fieldTerminator=\u002c
oracle.hadoop.loader.loaderMap."ACTIVITY".field=F5
oracle.hadoop.loader.loaderMap."CUSTID".field=F0
oracle.hadoop.loader.loaderMap."GENREID".field=F2
oracle.hadoop.loader.loaderMap."MOVIEID".field=F1
oracle.hadoop.loader.loaderMap."RATING".field=F6
oracle.hadoop.loader.loaderMap."RECOMMENDED".field=F4
oracle.hadoop.loader.loaderMap."SALES".field=F7
oracle.hadoop.loader.loaderMap."TIME".field=F3
oracle.hadoop.loader.loaderMap.columnNames=
"CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES"
oracle.hadoop.loader.loaderMap.targetTable="MOVIEDEMO"."MOVIE_RATINGS"
oracle.hadoop.loader.logBadRecords=true
oracle.hadoop.loader.rejectLimit=-1

The information below explains how OHSH thinks you are connected to Oracle and whether you are relying on Oracle Wallet and TNS settings on the Hadoop cluster.

--------------------------------------------------------------------------------------------------------------------------------
Oracle Database Connectivity Settings
--------------------------------------------------------------------------------------------------------------------------------

TNS usage: OLH job using client TNS path on hadoop cluster.
Wallet usage: OLH job using client wallet path on hadoop cluster.

Finally, the job executes.  This is mostly OLH/Hadoop generated output which shows the map/reduce job's progress in the Hadoop cluster.

--------------------------------------------------------------------------------------------------------------------------------
Begin OLH execution at 2016-12-02:12:35:55
--------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------
OLH MapReduce job is submitted to the Hadoop cluster to load an Oracle table using JDBC
--------------------------------------------------------------------------------------------------------------------------------

[INFO] 2016-12-02 12:35:55,425 [oracle.hadoop.loader.OraLoader]  Oracle Loader for Hadoop Release 3.8.0 - Production

 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.

[INFO] 2016-12-02 12:35:55,438 [oracle.hadoop.loader.OraLoader]  Built-Against: hadoop-2.2.0 hive-0.13.0 avro-1.8.1 jackson-1.8.8
[INFO] 2016-12-02 12:35:55,757 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.loadByPartition is disabled because table: MOVIE_RATINGS_NO_PART is not partitioned
[INFO] 2016-12-02 12:35:55,757 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.enableSorting disabled, no sorting key provided
[INFO] 2016-12-02 12:35:55,758 [oracle.hadoop.loader.OraLoader]  Reduce tasks set to 0 because of no partitioning or sorting. Loading will be done in the map phase.
[INFO] 2016-12-02 12:35:55,763 [loader.lib.output.DBOutputFormat]  Setting map tasks speculative execution to false for : oracle.hadoop.loader.lib.output.JDBCOutputFormat
[WARN] 2016-12-02 12:35:56,016 [oracle.hadoop.loader.OraLoader]  Sampler is disabled because the number of reduce tasks is less than two. Job will continue without sampled information.
[INFO] 2016-12-02 12:35:56,016 [oracle.hadoop.loader.OraLoader]  Submitting OraLoader job OSL_161202_123555
[INFO] 2016-12-02 12:35:56,016 [hadoop.metrics.jvm.JvmMetrics]  Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
[INFO] 2016-12-02 12:35:56,558 [mapreduce.lib.input.FileInputFormat]  Total input paths to process : 1
[INFO] 2016-12-02 12:35:56,622 [apache.hadoop.mapreduce.JobSubmitter]  number of splits:1
[INFO] 2016-12-02 12:35:56,674 [apache.hadoop.mapreduce.JobSubmitter]  Submitting tokens for job: job_local1428083019_0005
[INFO] 2016-12-02 12:35:58,215 [apache.hadoop.mapred.LocalDistributedCacheManager]  Localized
[INFO] 2016-12-02 12:35:58,542 [apache.hadoop.mapreduce.Job]  The url to track
the job:
http://localhost:8080/

[Lots of log output...]

[INFO] 2016-12-02 12:35:58,542 [apache.hadoop.mapred.LocalJobRunner]  OutputCommitter set in config null
[INFO] 2016-12-02 12:35:58,544 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1
[INFO] 2016-12-02 12:35:58,544 [apache.hadoop.mapred.LocalJobRunner]  OutputCommitter is oracle.hadoop.loader.lib.output.DBOutputCommitter
[INFO] 2016-12-02 12:35:58,548 [apache.hadoop.mapred.LocalJobRunner]  Waiting for map tasks
[INFO] 2016-12-02 12:35:58,548 [apache.hadoop.mapred.LocalJobRunner]  Starting task: attempt_local1428083019_0005_m_000000_0
[INFO] 2016-12-02 12:35:58,570 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1
[INFO] 2016-12-02 12:35:58,571 [apache.hadoop.mapred.Task]   Using ResourceCalculatorProcessTree : [ ]
[INFO] 2016-12-02 12:35:58,571 [apache.hadoop.mapred.MapTask]  Processing split:
file:/user/rhanckel/moviedemo/movie_ratings_delimitedtext/tkhp_moviedata.txt:0+1675410
[INFO] 2016-12-02 12:35:58,716 [loader.lib.output.DBOutputFormat]  conf prop: defaultExecuteBatch: 100
[INFO] 2016-12-02 12:35:58,717 [loader.lib.output.DBOutputFormat]  conf prop: loadByPartition: false
[WARN] 2016-12-02 12:35:58,718 [hadoop.loader.utils.FSLogger]  Internal error: log stream already closed
[INFO] 2016-12-02 12:35:58,727 [loader.lib.output.DBOutputFormat]  Insert statement: INSERT INTO "MOVIEDEMO"."MOVIE_RATINGS_NO_PART" ("CUSTID", "MOVIEID", "GENREID", "TIME", "RECOMMENDED", "ACTIVITY", "RATING", "SALES") VALUES (?, ?, ?, ?, ?, ?, ?, ?)
[INFO] 2016-12-02 12:35:59,543 [oracle.hadoop.loader.OraLoader]  map 0% reduce 0%
[INFO] 2016-12-02 12:36:02,491 [apache.hadoop.mapred.LocalJobRunner] 
[INFO] 2016-12-02 12:36:02,514 [apache.hadoop.mapred.Task]  Task:attempt_local1428083019_0005_m_000000_0 is done. And is in the process of committing
[INFO] 2016-12-02 12:36:02,522 [apache.hadoop.mapred.LocalJobRunner] 
[INFO] 2016-12-02 12:36:02,522 [apache.hadoop.mapred.Task]  Task attempt_local1428083019_0005_m_000000_0 is allowed to commit now
[INFO] 2016-12-02 12:36:02,526 [loader.lib.output.JDBCOutputFormat]  Committed work for task attempt attempt_local1428083019_0005_m_000000_0
[INFO] 2016-12-02 12:36:02,529 [mapreduce.lib.output.FileOutputCommitter]  Saved output of task 'attempt_local1428083019_0005_m_000000_0' to
file:/user/rhanckel/smartloader/jobhistory/oracle/MOVIEDEMO/MOVIE_RATINGS_NO_PART/OSL_161202_123555/_temporary/0/task_local1428083019_0005_m_000000
[INFO] 2016-12-02 12:36:02,533 [apache.hadoop.mapred.LocalJobRunner]  map
[INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.Task]  Task 'attempt_local1428083019_0005_m_000000_0' done.
[INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.LocalJobRunner]  Finishing task: attempt_local1428083019_0005_m_000000_0
[INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.LocalJobRunner]  map task executor complete.
[INFO] 2016-12-02 12:36:02,544 [oracle.hadoop.loader.OraLoader]  map 100% reduce 0%
[INFO] 2016-12-02 12:36:03,544 [oracle.hadoop.loader.OraLoader]  Job complete: OSL_161202_123555 (job_local1428083019_0005)
[INFO] 2016-12-02 12:36:03,545 [oracle.hadoop.loader.OraLoader]  Counters: 15
        File System Counters
                FILE: Number of bytes read=1507643076
                FILE: Number of bytes written=1506266096
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=39716
                Map output records=39716
                Input split bytes=113
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=23
                Total committed heap usage (bytes)=504889344
        File Input Format Counters
                Bytes Read=1675410
        File Output Format Counters
                Bytes Written=55072

The end of the output indicates whether the job was successful and reports the load time.

Load operation OSL_161202_123555 successful. Load time = 0:00:08

--------------------------------------------------------------------------------
End OLH execution at 2016-12-02:12:36:03
--------------------------------------------------------------------------------

################################################################################
Ending SmartLoader Job OSL_161202_123555
Local OS user: rhanckel
Hadoop user:   rhanckel
################################################################################

As you can see, the output shows the progress of the work, capturing the standard map/reduce output generated by Hadoop.  The general structure of verbose output when loading using "jdbc" or "directpath" is the same, with minor differences in the properties being set.

Understanding loading Oracle tables using "exttab" method

When loading tables using "exttab", execution works in the opposite direction from "jdbc" or "directpath".  Rather than Hadoop writing data into Oracle, Oracle reads data from Hadoop using an Oracle external table (generated by OSCH).  For the "exttab" method the data must live as delimited text files in HDFS.

The sequence of OHSH commands is similar to loads using "jdbc" and "directpath" with two exceptions: rather than setting a Java "dateformat", you need to set the "datemask" (the date format descriptor used by Oracle external tables), and for the method you need to specify "exttab" (i.e. Oracle external tables).

ohsh>set datemask "YYYY-MM-DD HH24:MI:SS"
ohsh>set outputlevel verbose
ohsh>load oracle table omovies:movie_ratings
from path \
  hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using exttab

The output below reveals what is going on under the covers.  Properties are set for OSCH, and an Oracle external table that maps to the data living in HDFS is created on the fly.  Default NLS date formats are set for the session based on the "set datemask" statement in OHSH.  The external table reads the data living under /user/${USER}/moviedemo/movie_ratings_delimitedtext in HDFS.

The load consists of an Oracle INSERT statement being fed rows from a SELECT statement against the external table.

After the load is committed the external table and other load artifacts are deleted in Oracle.

###############################################################################
Starting SmartLoader Job OSL_161211_205819
Local OS user: rhanckel
Hadoop user:   rhanckel
###############################################################################

Loading Oracle table "MOVIEDEMO"."MOVIE_RATINGS"
  From Hive table moviedemo.movie_ratings
  Using Oracle SQL Connector for Hadoop (OSCH) external tables

--------------------------------------------------------------------------------------------------------------------------------
OSCH Configuration Settings
--------------------------------------------------------------------------------------------------------------------------------

oracle.hadoop.connection.url=jdbc:oracle:thin:@inst1
oracle.hadoop.connection.user="MOVIEDEMO"
oracle.hadoop.exttab.colMap."ACTIVITY".columnName=ACTIVITY
oracle.hadoop.exttab.colMap."ACTIVITY".columnType=NUMBER
oracle.hadoop.exttab.colMap."CUSTID".columnName=CUSTID
oracle.hadoop.exttab.colMap."CUSTID".columnType=NUMBER
oracle.hadoop.exttab.colMap."GENREID".columnName=GENREID
oracle.hadoop.exttab.colMap."GENREID".columnType=NUMBER
oracle.hadoop.exttab.colMap."MOVIEID".columnName=MOVIEID
oracle.hadoop.exttab.colMap."MOVIEID".columnType=NUMBER
oracle.hadoop.exttab.colMap."RATING".columnName=RATING
oracle.hadoop.exttab.colMap."RATING".columnType=NUMBER
oracle.hadoop.exttab.colMap."RECOMMENDED".columnName=RECOMMENDED
oracle.hadoop.exttab.colMap."RECOMMENDED".columnType=NUMBER
oracle.hadoop.exttab.colMap."SALES".columnName=SALES
oracle.hadoop.exttab.colMap."SALES".columnType=NUMBER
oracle.hadoop.exttab.colMap."TIME".columnName=TIME
oracle.hadoop.exttab.colMap."TIME".columnType=DATE
oracle.hadoop.exttab.colMap."TIME".dateMask=YYYY-MM-DD HH24:MI:SS
oracle.hadoop.exttab.createBadFiles=true
oracle.hadoop.exttab.createLogFiles=false
oracle.hadoop.exttab.defaultDirectory="MOVIEDEMO_DEFAULT_DIR"
oracle.hadoop.exttab.hive.databaseName=moviedemo
oracle.hadoop.exttab.hive.tableName=movie_ratings
oracle.hadoop.exttab.locationFileCount=4
oracle.hadoop.exttab.skipColPrefix=OSL_SKIPCOL_
oracle.hadoop.exttab.sourceType=hive
oracle.hadoop.exttab.tableName="OSL_161211_205819_EXT"

--------------------------------------------------------------------------------------------------------------------------------
Setting NLS_DATE_FORMAT for Oracle session using OHSH datemask
--------------------------------------------------------------------------------------------------------------------------------

ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'

--------------------------------------------------------------------------------------------------------------------------------
Setting NLS_TIMESTAMP_FORMAT for Oracle session using OHSH timestampmask
--------------------------------------------------------------------------------------------------------------------------------

ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'

--------------------------------------------------------------------------------------------------------------------------------
Setting NLS_TIMESTAMP_TZ_FORMAT for Oracle session using OHSH timestampmask
--------------------------------------------------------------------------------------------------------------------------------

ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF'

--------------------------------------------------------------------------------------------------------------------------------
Create OSCH external table(s) used for accessing Hadoop content
--------------------------------------------------------------------------------------------------------------------------------

Oracle SQL Connector for HDFS Release 3.7.0 - Production

 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.

The create table command succeeded.

User: MOVIEDEMO performed the following actions in schema: MOVIEDEMO

CREATE TABLE "MOVIEDEMO"."OSL_161211_205819_EXT"
(
"CUSTID"                         NUMBER,
"MOVIEID"                        NUMBER,
"GENREID"                        NUMBER,
"TIME"                              DATE,
"RECOMMENDED"           NUMBER,
"ACTIVITY"                       NUMBER,
"RATING"                         NUMBER,
"SALES"                          NUMBER
)
ORGANIZATION EXTERNAL
(
   TYPE ORACLE_LOADER
   DEFAULT DIRECTORY "MOVIEDEMO_DEFAULT_DIR"
   ACCESS PARAMETERS
   (
     RECORDS DELIMITED BY 0X'0A'
     NOLOGFILE
     CHARACTERSET AL32UTF8
     PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'
     FIELDS TERMINATED BY 0X'09'
     MISSING FIELD VALUES ARE NULL
     (
       "CUSTID" CHAR NULLIF "CUSTID"=0X'5C4E',
       "MOVIEID" CHAR NULLIF "MOVIEID"=0X'5C4E',
       "GENREID" CHAR NULLIF "GENREID"=0X'5C4E',
       "TIME" CHAR DATE_FORMAT DATE MASK 'YYYY-MM-DD HH24:MI:SS' NULLIF "TIME"=0X'5C4E',
       "RECOMMENDED" CHAR NULLIF "RECOMMENDED"=0X'5C4E',
       "ACTIVITY" CHAR NULLIF "ACTIVITY"=0X'5C4E',
       "RATING" CHAR NULLIF "RATING"=0X'5C4E',
       "SALES" CHAR NULLIF "SALES"=0X'5C4E'
     )
   )
   LOCATION
   (
     'osch-20161211085828-9526-1'
   )
) PARALLEL REJECT LIMIT UNLIMITED;

The following location files were created.

osch-20161211085828-9526-1 contains 1 URI, 1813410 bytes

     1813410

--------------------------------------------------------------------------------------------------------------------------------
Begin OSCH execution  at 2016-12-11:20:58:30
--------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------
Begin single load (1/1) at 2016-12-11:20:58:30
--------------------------------------------------------------------------------------------------------------------------------

ALTER SESSION FORCE PARALLEL DML PARALLEL 4
ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4
INSERT /*+ append pq_distribute("MOVIEDEMO"."MOVIE_RATINGS", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES") SELECT "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" FROM "OSL_161211_205819_EXT"

Transaction is committed.

--------------------------------------------------------------------------------------------------------------------------------
End single load at 2016-12-11:20:58:34. Elapsed load time = 0:00:04.
--------------------------------------------------------------------------------------------------------------------------------

Load operation OSL_161211_205819 successful. Load time = 0:00:04

--------------------------------------------------------------------------------------------------------------------------------
End OSCH execution at 2016-12-11:20:58:34
--------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------
Executing OSCH post-load cleanup
--------------------------------------------------------------------------------------------------------------------------------

Oracle SQL Connector for HDFS Release 3.7.0 - Production

 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.

The drop command was successful.

User: MOVIEDEMO dropped OSCH external table MOVIEDEMO.OSL_161211_205819_EXT and all associated location files.

Successfully dropped transient tables/views created for the load

##################################################################################################
Ending SmartLoader Job OSL_161211_205819
Local OS user: rhanckel
Hadoop user:   rhanckel
##################################################################################################

Loading Hive tables using "jdbc" and "directpath"

Loading Oracle tables from Hive tables uses syntax similar to loading from HDFS files.  The difference is that the source is the name of a Hive table living in either the "default" database or some other Hive database.

If you are loading from a Hive table that is not in the "default" Hive database, you have to create an OHSH Hive resource that identifies that database.  Here we create an OHSH resource called "hmovies" that maps to the Hive database "moviedemo".

ohsh>create hive resource hmovies connectionurl = "jdbc:hive2:///moviedemo;"
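
You can sanity-check the resource by delegating a Hive command through it, for example (a sketch, assuming OHSH's %resource delegation syntax):

ohsh>%hmovies show tables;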

Again, since OLH is doing the load the Java dateformat needs to be set accordingly.

ohsh>set dateformat "yyyy-MM-dd HH:mm:ss" 

Finally you execute the load command with a from clause that specifies data is coming from a Hive table.

ohsh>load oracle table omovies:movie_ratings \
     from hive table hmovies:movie_ratings using directpath 

A "jdbc" load is virtually identical.

ohsh>load oracle table omovies:movie_ratings from hive table hmovies:movie_ratings using jdbc

Loading Hive tables using "exttab"

Loading Oracle tables from Hive tables using "exttab" looks like this:

ohsh>set datemask "YYYY-MM-DD HH24:MI:SS"
ohsh>load oracle table omovies:movie_ratings \
     from hive table hmovies:movie_ratings using exttab

Limitations when loading Hive Tables

All OHSH methods load Hive tables by using Hive metadata to determine the physical format of the data and where it is stored.

The "jdbc", "directpath", and "exttab" commands above assume that the columns names in the Oracle table are the same column names in the Hive table, and that the Hive native data types of the Hive columns are convertible to Oracle data types. OLH "jdbc" and "directpath" are also more functional than "exttab" in that loads are not restricted by the underlying storage of the Hive table.  The Hive table can be delimited text, parquet, orc, and rcfiles.

The "exttab" method is generally restricted to loading Hive tables that are delimited text.  (It can read data pump files living in HDFS, but that is a special case explained in later parts of this tutorial.)  It reads the HDFS files living under a Hive table directly using OSCH which streams delimited text into a vanilla Oracle external table.

In the examples above, the assumption is that the column names of the target table match the column names of the Hive table.  If this assumption does not hold, the OHSH "loadermap" construct needs to be used to provide an explicit column mapping between the Hive source and the Oracle target table.

Loading Hive partitions

If your Hive table is partitioned OHSH can either load all partitions or those that satisfy a partition filter expression.

Note that partitions manifest themselves differently in Hive and Oracle.  

In Hive, a partition serves as an organizational structure to segregate data files within an HDFS file system.  Data files belonging to a single Hive partition literally map to a unique HDFS directory dedicated to the partition.  This also means that the partition column value does not live as data with the values of other columns of the table.  It is a virtual column.

In Oracle, the partition value is a column whose value is stored with other column data in a table.  

Hive partitions are independent of partitions in Oracle.  For example, a Hive table may be partitioned while the Oracle table may not be (and vice versa).  If you want the partitioning to be the same, you have to create the Hive table and the Oracle table with the same partition columns.  Practically speaking, only Oracle list partitions can be mapped to Hive partitions.
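
To illustrate, an Oracle table whose partitioning mirrors a Hive table partitioned on a "state" column might be declared with list partitions along these lines.  This is a sketch: the column types, partition names, and the set of states are assumptions.

CREATE TABLE movie_ratings_part
( custid NUMBER, movieid NUMBER, genreid NUMBER, time DATE,
  recommended NUMBER, activity NUMBER, rating NUMBER, sales NUMBER,
  state VARCHAR2(2) )
PARTITION BY LIST (state)
( PARTITION p_ct VALUES ('CT'),
  PARTITION p_vt VALUES ('VT'),
  PARTITION p_other VALUES (DEFAULT) );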

A Hive partition example

Here is a Hive table that is partitioned on the "state" column, representing U.S. New England states (e.g. "CT" is Connecticut, "VT" is Vermont).

hive> describe movie_ratings_part;
OK
custid                  int
movieid                 int
genreid                 int
time                    timestamp
activity                int
rating                  int
sales                   float
state                   string

# Partition Information
# col_name              data_type               comment

state                   string
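
HiveQL along these lines would produce such a partitioned table.  This is a sketch: the field delimiter and storage format are assumptions.  Note that the partition column "state" is declared only in the PARTITIONED BY clause.

CREATE TABLE movie_ratings_part
( custid INT, movieid INT, genreid INT, time TIMESTAMP,
  activity INT, rating INT, sales FLOAT )
PARTITIONED BY (state STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;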

Loading all the partitions of a Hive table into an Oracle table has the same syntax as loading non-partitioned tables.  To load a subset of partitions, you need to specify a partition filter clause.  Again, except for the "dateformat"/"datemask" difference, the syntax is identical for all methods.

ohsh> set dateformat "yyyy-MM-dd HH:mm:ss"
ohsh> load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")"  using directpath

ohsh> set dateformat "yyyy-MM-dd HH:mm:ss"
ohsh> load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")" using jdbc

ohsh>set datemask "YYYY-MM-DD HH24:MI:SS"
ohsh>load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")" using exttab

The execution output looks like standard map/reduce output for "jdbc" and "directpath" with some extra verbiage highlighting that Hive partition filters are being evaluated.  The "exttab" output looks different since it creates one OSCH external table for each partition being loaded, and iterates through each external table, loading one partition at a time.  So if N partitions are being loaded, expect to see N Oracle INSERT statements selecting data from N external tables.

--------------------------------------------------------------------------------------------------------------------------------
Begin OSCH execution  at 2017-06-11:00:33:47
--------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------
Begin single load (1/2) at 2017-06-11:00:33:47
--------------------------------------------------------------------------------------------------------------------------------


ALTER SESSION FORCE PARALLEL DML PARALLEL 4
ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4
INSERT /*+ append pq_distribute("OLHP"."MOVIE_RATINGS_PART", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS_PART"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE") SELECT
"CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE" FROM "OSL_170611_003337_EXT_1"

Transaction is committed.
Loaded Hive partition: STATE=CT


--------------------------------------------------------------------------------------------------------------------------------
End single load at 2017-06-11:00:33:51. Elapsed load time = 0:00:04.
--------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------
Begin single load (2/2) at 2017-06-11:00:33:51
--------------------------------------------------------------------------------------------------------------------------------


INSERT /*+ append pq_distribute("MOVIEDEMO"."MOVIE_RATINGS_PART", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS_PART"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE") SELECT
"CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE" FROM "OSL_170611_003337_EXT_2"

Transaction is committed.
Loaded Hive partition: STATE=VT


--------------------------------------------------------------------------------------------------------------------------------
End single load at 2017-06-11:00:33:54. Elapsed load time = 0:00:02.
--------------------------------------------------------------------------------------------------------------------------------

Load operation OSL_170611_003337 successful. Load time = 0:00:07

--------------------------------------------------------------------------------------------------------------------------------
End OSCH execution at 2017-06-11:00:33:54
--------------------------------------------------------------------------------------------------------------------------------

 

Cross schema load

So far all the examples have assumed that the Oracle table being loaded from data in HDFS or in Hive lives in the schema of the Oracle user connecting to the database with JDBC.  An Oracle user can load into a table that lives in another Oracle schema, assuming the user has been granted the appropriate privileges.

In OHSH this is syntactically expressed by providing a fully qualified table name that "dot qualifies" the table name with the schema that owns it.

For example, suppose you are the Oracle user "MOVIEDEMO" and want to load HDFS data into "movie_ratings" living in the "SCOTT" schema.  The load command would look like this.

ohsh>load oracle table omovies:scott.movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using jdbc
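
For a cross-schema load like this to succeed, SCOTT (or a DBA) must first have granted MOVIEDEMO at least insert access on the table.  A hedged sketch of the minimal grant (depending on the load method, additional privileges may be required):

SQL> grant insert on scott.movie_ratings to moviedemo;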

Note that all OHSH identifiers for Oracle tables, schemas, directories, and column names can be double-quoted to preserve case and to allow unusual characters.  In general, case-sensitive names and unusual characters are never a good idea, especially when you want to create similar table and column names in both Oracle and Hive.
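
For example, if the target table had been created with a case-sensitive name, the table identifier in the load command would need to be double-quoted.  This is a sketch with a hypothetical mixed-case table name:

ohsh>load oracle table omovies:scott."Movie_Ratings" from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using jdbc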

Where to look if there is a load failure

When running loads, verbose console output is the first place to look to see where a load failed.  If the load failed because of some underlying failure in the OLH or OSCH job (e.g. bad data), you need to look either in the OLH job log files living in Hadoop (when running "jdbc" or "directpath") or in the Oracle directories supporting logging of Oracle external tables (when running "exttab").

For OLH, the logs describing the load are located in HDFS under "/user/${USER}/smartloader/jobhistory/oracle" subdirectories.  The subdirectories are organized by Oracle schema, Oracle table name, and the specific OHSH job, whose id is prefaced by "OSL_".

Below is the location of a "directpath" load of Oracle table MOVIEDEMO.MOVIE_RATINGS whose load id is OSL_170112_121015.  The oraloader-report.txt file describes the load at a high level.  The separate OCI log files are outputs of the Hadoop reduce tasks participating in the load.

hadoop fs -ls /user/rhanckel/smartloader/jobhistory/oracle/MOVIEDEMO/MOVIE_RATINGS/OSL_170112_121015/_olh
oraloader-00000-oci.log  oraloader-00001-oci.log  oraloader-00002-oci.log  oraloader-00003-oci.log  oraloader-00004-oci.log  oraloader-report.txt
oraloader-00000-oci.xml  oraloader-00001-oci.xml  oraloader-00002-oci.xml  oraloader-00003-oci.xml  oraloader-00004-oci.xml  tableMetadata.xml
oraloader-00000-r.xml    oraloader-00001-r.xml    oraloader-00002-r.xml    oraloader-00003-r.xml    oraloader-00004-r.xml
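
To see the high-level summary of such a load, you can print the report file directly from HDFS, for example:

hadoop fs -cat /user/rhanckel/smartloader/jobhistory/oracle/MOVIEDEMO/MOVIE_RATINGS/OSL_170112_121015/_olh/oraloader-report.txt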

When running "exttab" loads you need to look at the standard places one looks for errors when using Oracle external tables.  This includes the default directory for the external table and the log directory of the external tables if specified.  

The OHSH default "createlogfiles" needs to be set to "true" to produce log files.  Log files will be written to the external table's default directory, or to the log directory if one is specified.  (In OHSH, these are set via the OHSH default settings "defaultdirectory" and "logdirectory".)  If the problem involves bad data, you will also want to set the OHSH default "logbadrecords" to "true".
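
A sketch of what these settings might look like in an OHSH session follows.  The "MOVIEDEMO_DEFAULT_DIR" directory appears in the OSCH output earlier in this tutorial; "MOVIEDEMO_LOG_DIR" is an assumption, and both must exist as Oracle directory objects.

ohsh>set defaultdirectory "MOVIEDEMO_DEFAULT_DIR"
ohsh>set logdirectory "MOVIEDEMO_LOG_DIR"
ohsh>set createlogfiles true
ohsh>set logbadrecords true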

Which method do I use for loading?

If you are loading from Hive tables whose storage is not delimited text you need to load using "jdbc" or "directpath".  

In general the choice between "jdbc" and "directpath" depends upon whether the Oracle table is partitioned.  If the table is not partitioned, you have to run "jdbc".  If it is partitioned you can run "jdbc", but "directpath" is more efficient because it bypasses the Oracle SQL code path.

If you are loading delimited text (either in Hive tables or in HDFS files) the choice is between "exttab" and "jdbc" for non-partitioned Oracle tables, or "exttab" and "directpath" for partitioned Oracle tables.  (Again you can run "jdbc" against partitioned Oracle tables, but you should opt for running "directpath" since it will do the same job faster and more efficiently).

Loading delimited text using "exttab" is much faster in end-to-end load time, but it is much more expensive in terms of CPU cycles expended on the system running Oracle, since all conversion to Oracle data types is done using Oracle CPU cycles.

Loading delimited text using "jdbc" or "directpath" is slower in end-to-end time, but that is because the data conversion is done on the Hadoop side, split across many map-reduce tasks.  Because the heavy lifting of Oracle type conversion happens in Hadoop, there is much less burden on the Oracle CPU.

Note that the "load oracle table" command in OHSH does not require you to choose a method.  If a method is not specified, OHSH will choose a default: "jdbc" if the Oracle table is not partitioned, or "directpath" otherwise.
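
For example, the following load omits the method entirely and lets OHSH pick "jdbc" or "directpath" based on whether MOVIE_RATINGS is partitioned:

ohsh>load oracle table omovies:movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext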

Best practices for loading Oracle tables using OHSH

If you want to minimize complexities in loading data into Oracle from Hadoop you might want to consider doing the following.

o Set console output to verbose.

Verbose output explains what is going on under the covers, and outputs load settings for Hadoop, Hive, OLH, and OSCH.  Because output for "jdbc" and "directpath" includes the standard OLH output of a Hadoop map-reduce job, you get a very clear idea of the progress of the load.  Verbose output often clarifies the source of a failure without hunting down logs.

o Ensure that dates and timestamp fields have uniform formats

Most problems with parsing delimited text involve date and timestamp fields, because there are so many different formats.  Conversion of these types is much more complex than for any other scalar data type.  If you generate Hadoop data with a single standard format for all date and timestamp fields, life will be much easier.

o When loading Hive tables into Oracle tables ensure that the column names and order of declaration are the same for Hive tables and Oracle tables

This is not a hard requirement, since OHSH loadermaps work around this problem, but it does introduce more complexity in specifying the load operation.  So if you can give columns in both tables the same names and order, life becomes much simpler and you will get to the beach more often, especially when the table has a large number of columns.

o Isolate the setting of defaults to a separate callable OHSH script

If there are some standard default settings that are invariant across all loads, write up a standard initialization OHSH script that gets executed at the outset of an OHSH session either by calling the script indirectly or by using the initialization switch (e.g. "-i <initscript>") when invoking OHSH.
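
A sketch of such an initialization script follows; the file name is hypothetical, and the contents simply collect the resource definitions and defaults used earlier in this tutorial.

create oracle jdbc resource omovies connectid="inst1"
create sqlplus resource sql connectid="inst1"
create hive resource hmovies connectionurl = "jdbc:hive2:///moviedemo;"
set outputlevel verbose
set dateformat "yyyy-MM-dd HH:mm:ss"

The script can then be executed at the start of each session by invoking OHSH with the initialization switch, for example:

ohsh -i movie_defaults.ohsh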

 
