Connecting Hadoop with Oracle

Move data between Oracle Database and Apache Hadoop using high speed connectors.

Recent Posts

Using OHSH with Oracle Wallet

As we have seen in several earlier blog posts, OHSH is an easy-to-use CLI to move data between Apache Hadoop and Oracle Database. In this blog we will demonstrate how to use OHSH with Oracle Wallet.

The initial resources you will create when using OHSH are the SQL*Plus resource and the JDBC resource to connect to Oracle Database. For example:

ohsh> create sqlplus resource sql0 connectid="<database-connection-URL>"

This OHSH command will prompt you for a username and password. This can be automated by using Oracle Wallet, which is the secure way to connect to Oracle Database.

Step 1. Create an Oracle Wallet.

Refer to the Oracle Wallet documentation for details. The Oracle Wallet documentation also explains the updates needed to tnsnames.ora and sqlnet.ora. Below are examples.

tnsnames.ora entry (this might already exist):

ORCL =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = xyz.oracle.com)(PORT = <port number>))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = <service name>)
    )
  )

sqlnet.ora entry:

WALLET_LOCATION =
  (SOURCE =
    (METHOD = FILE)
    (METHOD_DATA =
      (DIRECTORY = <full path of wallet location>)
    )
  )
SQLNET.WALLET_OVERRIDE = TRUE
SSL_CLIENT_AUTHENTICATION = FALSE
SSL_VERSION = 0

Step 2. Configure OHSH to use the wallet.

On the client/edge node where you are running OHSH, edit $OHSH_HOME/bin/ohsh_config.sh to update the following environment variables:

TNS_ADMIN=<location of tnsnames.ora and sqlnet.ora>
WALLET_LOCATION=<location of Oracle Wallet files>

Step 3. Add the Oracle Wallet files, tnsnames.ora, and sqlnet.ora to the Hadoop cluster nodes.

Step 3.a. Copy the Oracle Wallet files, tnsnames.ora, and sqlnet.ora to the same location on every node of the Hadoop cluster and on the Hadoop client/edge node where you launch OHSH. On Oracle Big Data Appliance this task is easier, since BDA has an NFS location shared by all nodes; you can copy the files to this shared NFS location. On other clusters the location of the Oracle Wallet files could be, for example, /home/oracle/oracle_wallet, and the location of tnsnames.ora and sqlnet.ora could be /home/oracle/tns_admin. These directories should be the same on all the nodes and on the Hadoop client/edge node from where you launch OHSH.

Step 3.b. If the location of the Oracle Wallet files and *.ora files on the Hadoop cluster nodes (set in Step 3.a) is different from the location of these files on the client/edge node (set in Step 2) where you are running OHSH, start up the OHSH CLI and set hadooptnsadmin and hadoopwalletlocation to the directory paths in Step 3.a. For example:

$ ohsh
ohsh> set hadooptnsadmin /home/oracle/tns_admin
ohsh> set hadoopwalletlocation /home/oracle/oracle_wallet

Step 4. Start up the OHSH CLI. Use the identifier in tnsnames.ora when creating the SQL*Plus and JDBC resources. For example, if you have the ORCL entry shown in Step 1 in your tnsnames.ora, you can create the sqlplus resource as follows:

ohsh> create sqlplus resource sql0 connectid="orcl"
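Going back to Step 1, here is a minimal sketch of creating the wallet and storing a database credential with the mkstore utility from an Oracle client installation. The wallet directory, TNS alias, and user below are placeholders; check the Oracle Wallet documentation for the options appropriate to your release.

$ mkdir -p /home/oracle/oracle_wallet
$ mkstore -wrl /home/oracle/oracle_wallet -create
$ mkstore -wrl /home/oracle/oracle_wallet -createCredential orcl bdcusr
$ mkstore -wrl /home/oracle/oracle_wallet -listCredential

Once the credential is stored and sqlnet.ora points at the wallet, the "create sqlplus resource" command in Step 4 connects without prompting for a username and password.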


Which Data Movement Tool Should I Use?

When moving data between Hadoop/Hive/HDFS and Oracle Database, a common question is: what is the best tool to use? Broadly, the wide array of products can be categorized as below.

Data Movement

Oracle Shell for Hadoop Loaders (OHSH for short): This tool is a universal command-line UI for the three data movement products described below.

Load into Oracle Database
- Oracle Loader for Hadoop: Load from Hive/HDFS (and Kafka) to Oracle Database
- Oracle SQL Connector for HDFS: Load text data from Hive/HDFS to Oracle Database using SQL

Copy from Oracle Database
- Copy to Hadoop: Copy data from Oracle Database tables to HDFS and access it from Hive (and Spark through a Hive context)

In addition to the command-line OHSH, these products are available for use through SQL Developer.

Data Access (in place)

- Oracle Big Data SQL: Oracle SQL access to data in Hive/HDFS (and Kafka)
- Oracle Table Access for Apache Hadoop: Hive SQL access to data in Oracle Database tables

Oracle Data Integrator has KMs for data movement, some of which use the above products.

Load Speed and Performance

The data movement tools are architected to minimize CPU usage by the database system during the load process. So for large loads, in particular when the load jobs co-exist with a database running applications, the data movement tools Oracle Loader for Hadoop and Copy to Hadoop are ideal. They enable fast data movement with minimal impact on the database. Oracle Loader for Hadoop works very well for continuous or frequent loads into a database, as it can load data without impacting other applications. Oracle SQL Connector for HDFS is faster, but it uses more database CPU cycles. When used with Oracle Big Data Appliance and Oracle Exadata connected by InfiniBand, Oracle SQL Connector for HDFS can load into the database at 15 TB an hour.

UI/Skill Sets

SQL Developer can be used with all the data movement tools. ODI KMs are very popular among customers who are already using ODI for data integration: with the same skill set and a familiar product they can use ODI KMs to move data between Hadoop and Oracle Database. The data access tools enable data movement through SQL, and so are appealing to SQL programmers. Once the external table is created, data can be moved using a CTAS (Create Table As Select) statement that reads the external table and writes the data to a local table. However, the primary goal of these tools is data access.

Licensing

Cloud: Oracle Loader for Hadoop and Copy to Hadoop (and the UI Oracle Shell for Hadoop Loaders) are available with Big Data Cloud Service and Big Data Cloud licenses. No additional licenses are required.

On-prem: Oracle Loader for Hadoop and Oracle SQL Connector for HDFS (and the UI Oracle Shell for Hadoop Loaders) are licensed with Oracle Big Data Connectors, along with Oracle Table Access for Apache Hadoop. Copy to Hadoop (and the UI Oracle Shell for Hadoop Loaders) is licensed with Oracle Big Data SQL.
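To make the categorization above concrete, here is a sketch of how the three data movement products surface as load methods in OHSH. The resource and table names are placeholders borrowed from the OHSH posts below.

# Oracle Loader for Hadoop (directpath method)
ohsh> load oracle table jdbc0:movie_ratings from hive table hive0:movie_ratings using directpath

# Oracle SQL Connector for HDFS (exttab method)
ohsh> load oracle table jdbc0:movie_ratings from hive table hive0:movie_ratings using exttab

# Copy to Hadoop (directcopy method)
ohsh> create or replace hive table hive0:movie_ratings_oracle from oracle table jdbc0:movie_ratings using directcopy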


Oracle Big Data Cloud and Oracle Database

Connecting to Oracle Database from Big Data Cloud

Big Data Cloud release 17.4.6 comes with pre-installed and configured high speed connectors to load data to Oracle Database and to copy data from Oracle Database. These are the Oracle Loader for Hadoop and Copy to Hadoop products you might be familiar with from other deployments. The UI to use these connectors is the OHSH (Oracle Shell for Hadoop Loaders) UI. See here for a comprehensive set of blog posts on OHSH.

A brief summary of OHSH

OHSH (Oracle Shell for Hadoop Loaders), nicknamed "ohshell", is a CLI to run commands to load Oracle Database tables with data in Hadoop, and to copy Oracle Database tables to Hive. OHSH layers on the underlying connector products such as Oracle Loader for Hadoop and Copy to Hadoop. OHSH integrates the various tools needed to move and load data between Hadoop and Oracle Database: Beeline/Hive, Hadoop, bash, and SQL*Plus. A delegation operator "%" associated with a resource name identifies the resource that should execute a command. For example,

ohsh> %hive0 show tables;

sends "show tables" to the Beeline/Hive CLI. hive0, hadoop0 and bash0 are resources available when you start OHSH. A user can create additional resources, such as resources to send a query to the database, as we will see in this blog post.

In 17.4.6 you can use OHSH by making an ssh connection to a Big Data Cloud node and starting up OHSH.

Getting Started

On the Database

First, create a database user and tablespace in your Oracle Database Cloud Service.

$ sqlplus / as sysdba
sql> alter session set container=<container name>;   -- an example container name is PDB1
sql> create tablespace tbs_bdc datafile 'tbs_bdc.dat' size 1M autoextend on next 100K segment space management auto;
sql> create user bdcusr identified by <password of your choice> default tablespace tbs_bdc;
sql> grant create session, alter session, create table, create view to bdcusr;

On Big Data Cloud

1. ssh to a node in Big Data Cloud and log in:

$ sudo su oracle

2. Add /opt/oracle/dbconnector/ohsh/bin to your PATH. The ohsh executable is at this location.

3. There is a comprehensive set of example scripts in /opt/oracle/dbconnector/ohsh/examples (along with a README.txt) to use the connectors. In this blog post we select a few commands from these scripts.

4. Load data from this directory into HDFS, for use in this blog post:

$ cd /opt/oracle/dbconnector/ohsh/examples
$ ohsh
ohsh> %hadoop0 fs -rm -r ohshdata          # Use the hadoop0 resource to delete existing directories
ohsh> @create_hdfs_data_tables.ohsh        # Example script that loads data into HDFS

5. Create a Hive table over this data for use in the examples:

ohsh> %hive0 >>
drop table fivdti;
create table fivdti
  (f1 decimal(38,18),
   i2 bigint,
   v3 string,
   d4 date,
   t5 timestamp,
   v6 string,
   i7 bigint)
row format delimited fields terminated by ',' stored as textfile;

6. Create a file setresources_blog.ohsh in your local directory. This is similar to /opt/oracle/dbconnector/ohsh/examples/setresources.ohsh, but it creates fewer resources: it only creates the resources to connect to the database. You will need the database connection string. Add the following commands to setresources_blog.ohsh.

# Create a sqlplus command line resource that enables you to run SQL queries in the database
# from the OHSH prompt. You will need the database username/password you created in the
# 'Getting Started' section; you will be prompted for them.
create sqlplus resource sql0 connectid="bdctest.compute-uspm032.oraclecloud.internal:1521/pdb1.uspm032.oraclecloud.internal"

# Create a jdbc resource that enables jobs submitted via OHSH to make JDBC connections to the
# database. You will need the database username/password you created in the
# 'Getting Started' section; you will be prompted for them.
create oracle jdbc resource jdbc0 connectid="bdctest.compute-uspm032.oraclecloud.internal:1521/pdb1.uspm032.oraclecloud.internal"

Now you can execute it in an OHSH session:

ohsh> @setresources_blog.ohsh

Resources are valid during the entire OHSH session. If you start a new OHSH session, create the resources again.

Create tables in the database

We create tables with a schema that matches the data we are going to load. You can either connect to the database and create the tables, or create them from ohsh as shown below. If using ohsh, note the use of the sql0 resource we created earlier.

ohsh> %sql0 >>
CREATE TABLE ohsh_fivdti
( f1 NUMBER
 ,i2 INT
 ,v3 VARCHAR2(50)
 ,d4 DATE
 ,t5 TIMESTAMP
 ,v6 VARCHAR2(200)
 ,i7 INT
)
PARTITION BY HASH(i7)
PARTITIONS 4
NOLOGGING PARALLEL;

CREATE TABLE ohsh_fivdti_reverse
( i7 NUMBER(38,0)
 ,v6 VARCHAR2(200)
 ,t5 TIMESTAMP
 ,d4 DATE
 ,v3 VARCHAR2(50)
 ,i2 NUMBER(38,0)
 ,f1 NUMBER
)
PARTITION BY HASH(i7)
PARTITIONS 4
NOLOGGING PARALLEL;

CREATE TABLE ohsh_fivdti_part
( state VARCHAR2(2)
 ,f1 NUMBER
 ,i2 INT
 ,v3 VARCHAR2(50)
 ,d4 DATE
 ,t5 TIMESTAMP
 ,v6 VARCHAR2(200)
 ,i7 INT
)
PARTITION BY HASH(state)
PARTITIONS 6
NOLOGGING PARALLEL;

Now you are ready to start using OHSH and the connectors.

Load data from Oracle Big Data Cloud to a Database Cloud Service

The following ohsh commands are similar to the commands in the example load_directpath.ohsh.

$ ohsh

# Truncate the destination table before the new load if you need to
ohsh> %sql0 truncate table ohsh_fivdti;

# Set the number of reduce tasks used to execute the load on the Hadoop side. The higher this number,
# the faster the load, as long as the cluster has the compute resources to support it.
ohsh> set reducetasks 4;

# Load data with the directpath load option
ohsh> load oracle table jdbc0:OHSH_FIVDTI from \
        path hadoop0:/user/${HADOOP_USER}/ohshdata/fivdti using directpath;

OHSH will resolve the environment variable HADOOP_USER to the OS user, or to the user that has the Kerberos ticket (if the cluster is a secure cluster).

# Load data with the directpath load option, from a Hive table
ohsh> load oracle table jdbc0:OHSH_FIVDTI from \
        hive table hive0:fivdti using directpath;

Copy data from a Database Cloud Service to Big Data Cloud

The following ohsh commands are similar to the commands in the example createreplace_directcopy.ohsh.

$ ohsh

# Count the number of rows in the database table
ohsh> %sql0 select count(*) from ohsh_fivdti;

# Set the number of files that the database data is copied to. The higher this number, the greater the
# parallelism, as long as the database and the cluster have the compute resources to support it.
ohsh> set dpfilecount 2;

# Copy data with the directcopy option and create a Hive table on these files
ohsh> create or replace hive table hive0:cp2hadoop_fivdti from \
        oracle table jdbc0:ohsh_fivdti using directcopy;

# Count the number of rows in the Hive table
ohsh> %hive0 select count(*) from cp2hadoop_fivdti;
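The same load can also be run with the jdbc method, which uses conventional inserts instead of a direct path load; a minimal sketch, reusing the resources and tables above:

# Load data with the jdbc load option, from a Hive table
ohsh> load oracle table jdbc0:OHSH_FIVDTI from \
        hive table hive0:fivdti using jdbc;

# Verify the row count in the database table
ohsh> %sql0 select count(*) from ohsh_fivdti;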


If You Want To Access Kafka From Hive, Then Read This

The Oracle-developed Hive storage handler enables Hive to query Kafka streams. The storage handler ships with Oracle Big Data Connectors and Oracle Big Data SQL (which enables Oracle SQL queries on Kafka streams). Below are the steps to configure your cluster to use this Hive storage handler. In this example we will use the Oracle Event Hub Cloud Service, which is a managed Kafka service.

Note the following on the Oracle Event Hub you are going to access:
- Note the Kafka server IP and port.
- Enable network access from the Hive cluster nodes to the Kafka cluster ports.
- Note the name of the Kafka topic. You will see that the domain name has been prefixed to the topic name you had selected.

On your Hive cluster:
- Add oracle-kafka.jar and kafka-clients-0.10.2-kafka-2.2.0.jar to the Hive auxlib. oracle-kafka.jar is available with Big Data Connectors in $OLH_HOME/jlib/ and kafka-clients*.jar is available in your Kafka installation. Check the instructions specific to your Hadoop distribution for adding jars to the Hive auxlib.
- Restart Hiveserver2.

Query Kafka with Hive

Now you are ready to create a Hive table that can access a Kafka topic.

CREATE EXTERNAL TABLE kafka_table
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
  'oracle.kafka.table.key.type'='string',
  'oracle.kafka.table.value.type'='long',
  'oracle.kafka.bootstrap.servers'='129.150.65.214:6667',
  'oracle.kafka.table.topics'='<topic1>,<topic2>');

If the data is in Avro format it can be accessed by parsing the schema:

CREATE EXTERNAL TABLE traffic_lights
row format serde 'oracle.hadoop.kafka.hive.KafkaSerDe'
stored by 'oracle.hadoop.kafka.hive.KafkaStorageHandler'
tblproperties(
  'oracle.kafka.table.value.type'='avro',
  'oracle.kafka.bootstrap.servers'='129.150.65.214:6667',
  'oracle.kafka.table.topics'='uspm004-traffic-light',
  'oracle.kafka.table.value.schema'='{
  "type": "record",
  "name": "traffic_light",
  "fields": [
    {"name" : "id", "type": "string"},
    {"name" : "clientId", "type": "string"},
    {"name" : "source", "type" : "string"},
    {"name" : "payload", "type": {
      "type" : "record",
      "name" : "payload_record",
      "fields" : [
        {"name" : "CurrentFault", "type" : "float"},
        {"name" : "LightColor", "type" : "string"},
        {"name" : "LightLumen", "type" : "float"}
      ]}}
  ]
}' );

Now you can query the Kafka topic from Hive:

beeline> select id, payload.lightlumen from traffic_lights where payload.lightlumen < 1140;
+---------------------------------------+---------------------+--+
|                  id                   |     lightlumen      |
+---------------------------------------+---------------------+--+
| 4950b109-b730-416b-8c25-cb6071c01ae6  | 1134.6158447265625  |
| 028dd91b-374e-4650-9152-2b9940064009  | 1134.2039794921875  |
| ea6eb2cd-6156-46b1-ab79-6b12418fd11f  | 1134.2138671875     |
| 189983d4-3629-4d8f-a232-a18b199ee4da  | 1133.296875         |
| c0821c6d-53b0-4a11-927a-270cca4559fe  | 1133.7509765625     |
| 218bd840-b228-418b-823a-a84b3b828eb8  | 1131.4017333984375  |
| ...                                   | ...                 |
| 3d5871df-6dbc-49f0-9e25-b70a6093d3df  | 1130.330322265625   |
| ec8aaf18-1ae0-49ba-b77d-d1eba5e5734b  | 1131.327880859375   |
| 892d3781-b550-44e8-945c-e5fd840410f4  | 1134.4990234375     |
+---------------------------------------+---------------------+--+
1,438 rows selected (0.921 seconds)

beeline> select payload.lightlumen, payload.currentfault from traffic_kafka limit 10;
+---------------------+----------------------+--+
|     lightlumen      |     currentfault     |
+---------------------+----------------------+--+
| 1196.422119140625   | 0.3884314000606537   |
| 1162.2769775390625  | 0.38775667548179626  |
| 1144.1348876953125  | 0.3893236517906189   |
| 1192.4967041015625  | 0.3968856930732727   |
| 1180.6944580078125  | 0.3963151276111603   |
| 1151.1507568359375  | 0.38285326957702637  |
| 1167.318603515625   | 0.3874223232269287   |
| 1192.4923095703125  | 0.381509393453598    |
| 1154.9349365234375  | 0.39086756110191345  |
| 1181.163330078125   | 0.3923773169517517   |
+---------------------+----------------------+--+

select count(*) from traffic_kafka where timestamp > '2017-12-07 17:19:35';
+-------+--+
|  _c0  |
+-------+--+
| 3300  |
+-------+--+

select count(*) from traffic_kafka where timestamp between '2017-12-07 17:18:35' and '2017-12-07 17:19:35';
+--------+--+
|  _c0   |
+--------+--+
| 16400  |
+--------+--+

More details on this Hive storage handler are in the Oracle Big Data Connectors documentation, Appendix B. Big Data Connectors 4.10 documentation is here.
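Since the storage handler exposes the stream as an ordinary Hive table, you can also snapshot a window of the stream into a regular Hive table for repeated analysis. A minimal sketch: the snapshot table name is made up, and the timestamp range reuses the values from the queries above.

CREATE TABLE traffic_lights_snapshot STORED AS ORC AS
SELECT id, clientId, source, payload
FROM traffic_lights
WHERE timestamp BETWEEN '2017-12-07 17:18:35' AND '2017-12-07 17:19:35';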


See How Easily You Can Move Data Between Apache Hadoop and Oracle Database in the Cloud - Part 2

In Part 1 we saw how easily we can move data between a Big Data Cloud Service and a Database Cloud Service. In this post we look at how data copied from Oracle Database Cloud Service to Big Data Cloud Service can be used in machine learning applications that use Spark MLlib.

Using steps from this earlier blog post on using Spark with data copied by Copy to Hadoop, we see that a DataFrame can be created to point to the Hive external table created in Part 1.

scala> val movie_ratings_oracle_df = sqlContext.table("moviedemo.movie_ratings_oracle")

This data has a lot of NULLs in the ratings column. Let us remove them:

scala> val movie_ratings_oracle_df_notnull = movie_ratings_oracle_df.filter("rating is not null")

Let us transform the data frame movie_ratings_oracle_df_notnull into an RDD of ratings, by extracting the columns cust_id (0), movie_id (1), and rating (6) from movie_ratings_oracle_df_notnull.

scala> val ratings = movie_ratings_oracle_df_notnull.map{row => Rating(row.getDecimal(0).intValue(),row.getDecimal(1).intValue(),row.getDecimal(6).doubleValue())}

On screen:

ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[26] at map at <console>:32

As described in the Copy to Hadoop + Spark blog post, we can use this RDD in analysis after importing some Spark MLlib classes.

scala> import org.apache.spark.mllib.recommendation.ALS
scala> import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
scala> import org.apache.spark.mllib.recommendation.Rating
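From here, a minimal sketch of training a recommender on this RDD with MLlib ALS; the rank, iteration count, and regularization values are arbitrary placeholders, not tuned settings.

// Train a matrix factorization model on the ratings RDD built above
scala> val model = ALS.train(ratings, 10, 10, 0.01)

// Predict ratings for the (user, movie) pairs we already have and peek at a few
scala> val userProducts = ratings.map { r => (r.user, r.product) }
scala> val predictions = model.predict(userProducts)
scala> predictions.take(5).foreach(println)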


See How Easily You Can Move Data Between Apache Hadoop and Oracle Database in the Cloud - Part 1

The Oracle Big Data Cloud Service (BDCS) license includes tools to move data between Apache Hadoop and Oracle Database, such as Oracle Loader for Hadoop and Copy to Hadoop. Here is how you can use these tools with the easy-to-use CLI (OHSH). These tools are all installed and configured in BDCS. There are just two additional configuration steps you need to perform:

1. In the database cloud service, enable the access rule to open the listener port 1521.
2. Access Cloudera Manager to note the Hiveserver2 host and port, and edit /opt/oracle/ohsh-<ver>/bin/ohsh_config.sh to set these variables:

export OLH_HOME=/opt/oracle/oraloader-<version>
export OSCH_HOME=/opt/oracle/orahdfs-<version>
export CP2HADOOP_HOME=/opt/oracle/orahivedp-<version>
export HS2_HOST_PORT="<Hiveserver 2 host>:<Hiveserver 2 port>"

and, as necessary, TNS_ADMIN and WALLET_LOCATION.

Using the CLI

ssh to a BDCS node and start up the CLI:

$ ohsh

Set up resources to connect to the database and to Hive databases.

Database resources

You will be prompted for the database username/password when you run these commands.

ohsh> create sqlplus resource sql0 connectid="bdctest.compute-xyz00.oraclecloud.internal:1521/pdb1.xyz00.oraclecloud.internal"
ohsh> create jdbc resource jdbc0 connectid="bdctest.compute-xyz00.oraclecloud.internal:1521/pdb1.xyz00.oraclecloud.internal"

Hive resources

%hive0 is the resource for the default database in Hive. Additional resources to connect to Hive databases:

ohsh> create hive resource hive_moviedemo connectid="bdctest.compute-xyz00.oraclecloud.internal:1521/pdb1.xyz00.oraclecloud.internal/moviedemo"

Data movement examples

Load from Hadoop to Oracle Database:

ohsh> load oracle table jdbc0:movie_ratings from hive table hive_oracletest:movie_ratings_delimitedtext using directpath

Load from Oracle Database to Hadoop:

ohsh> create or replace hive table hive_moviedemo:movie_ratings_oracle from oracle table jdbc0:movie_ratings using directcopy

As easy as pie.
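As a quick sanity check after each load, you can compare row counts from the same OHSH session; a sketch reusing the sql0 and hive_moviedemo resources and the table names above:

ohsh> %sql0 select count(*) from movie_ratings;
ohsh> %hive_moviedemo select count(*) from movie_ratings_oracle;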


Links to Oracle Big Data Connector Technology Blogs

The following links discuss OHSH (Oracle Shell for Hadoop Loaders), which is a CLI used for loading Oracle tables from content living in HDFS or Hive tables, and for loading Hive tables from content living in Oracle tables.

How to Load Oracle and Hive Tables using OHSH (Part 1 - Introduction)
How to Load Oracle and Hive Tables using OHSH (Part 2 - Configuration and CLI Usage)
How to Load Oracle and Hive Tables with OHSH (Part 3 - Loading Oracle Tables)
How to Load Oracle and Hive Tables Using OHSH (Part 4 - Loading Hive Tables)
How to Load Oracle and Hive tables using OHSH (Part 5 - Using "loadermap" when loading Oracle tables)
How to Load Oracle and Hive tables using OHSH (Part 6 - Using the "etl" method for loading Oracle tables)

The following links discuss two Oracle Connectors for Hadoop: Oracle Loader for Hadoop (OLH) and Oracle SQL Connector for HDFS (OSCH). These are two of the underlying technologies that OHSH uses, and they can also be used directly by power users. They also provide better insight into what exactly is happening under the covers when using the parallelism of Oracle external tables or Hadoop MapReduce to load Oracle tables.

How to Load Oracle Tables From Hadoop Tutorial (Part 1 - Overview)
How to Load Oracle Tables From Hadoop Tutorial (Part 2 - Hello World)
How to Load Oracle Tables From Hadoop Tutorial (Part 3 - Direct Path)
How to Load Oracle Tables From Hadoop Tutorial (Part 4 - OSCH Hello World)
How to Load Oracle Tables From Hadoop Tutorial (Part 5 - Leveraging Parallelism in OSCH)
How to Load Oracle Tables From Hadoop Tutorial (Part 6 - The Data Pump Solution)


How to Load Oracle Tables From Hadoop Tutorial (Part 1 - Overview)

Introduction

This is the first of a series of blog posts that will discuss how to load data living in the Hadoop Ecosphere into Oracle tables. The goal is to give insights, discuss pros and cons, and best practices for achieving optimal load times and flexibility from an experienced developer's point of view.

Oracle and Hadoop are complementary technologies where the whole is greater than the sum of the parts. They both have parallel architectures which, if used intelligently, can move data at an impressive rate. Last year, we achieved a load rate of 12 TB (terabytes) per hour between Oracle Exadata and Hadoop running on Oracle's Big Data Appliance (BDA). The ability to distill big data in Hadoop and then to seamlessly move large result sets into the Oracle stack creates enormous added value in solving Big Data problems.

In supporting customers who need this functionality we've noticed that more frequently than not, we are talking to people who are either Hadoop experts or Oracle heavyweights, but not both. In our attempt to explain these two technologies we will offer breakout sections with some rudimentary background notes about Hadoop and Oracle that we think are important to understand, so you can use these tools effectively. Additional specialized topics will also go into loading issues specific to RAC and Exadata environments.

Why Use Oracle Big Data Connectors?

Hadoop developers might be asking themselves the following question: Oracle has been around for a long time managing huge sets of data in tables. These tables had to be loaded somehow. What's the added value of the Big Data Connectors? Can't we use the standard utilities Oracle has provided to load tables? The quick answer is yes. But if you are dealing with Big Data, you really don't want to.

Some Background about Conventional Loading Tools and Oracle

Oracle's off-the-shelf utility for loading data from external sources is called SQL*Loader. It does a great job loading files of various formats into an Oracle table. The following SQL*Loader control file illustrates what this utility does:

LOAD DATA
  INFILE file1.dat
  INFILE file2.dat
  INFILE file3.dat
  APPEND
INTO TABLE emp
( empno  POSITION(1:4)   INTEGER EXTERNAL,
  ename  POSITION(6:15)  CHAR,
  deptno POSITION(17:18) CHAR,
  mgr    POSITION(20:23) INTEGER EXTERNAL
)

SQL*Loader is being told to open three files and append an existing table "emp" with data from the files, whose column mapping, physical position, and representation are articulated between the parentheses. SQL*Loader is really powerful for processing files of various formats. But to use this tool with Hadoop you need to work around several problems. The first is that Hadoop content lives in Hadoop Distributed File System (HDFS) files, not standard OS file systems. SQL*Loader does not know how to access HDFS directly, so the "INFILE" verbiage is a non-starter.

You could work around this problem in two ways. One way is to copy the file from Hadoop onto a local disk on a system where SQL*Loader is installed. The problem with this solution is that Hadoop files are big, very often bigger than any storage you have on a single system. Remember that a single Hadoop file can potentially be huge (say 18 TB, larger than the digital content of the Library of Congress). That's a big storage requirement for a single system, especially for a transient requirement such as staging data. Also, you can assume that whatever storage requirements you have today for Big Data, they will certainly grow fast.
Secondly, in order to get the data from HDFS into an Oracle table you are doubling the amount of IO resources consumed. ("Read from HDFS, write into an Oracle table" becomes "Read from HDFS, write to a staged file, read from the staged file, write into an Oracle table.") When operating against Big Data, doubling the IO overhead is worth avoiding.

An alternative approach is to use FUSE technology (Mountable HDFS) that creates a mount point for HDFS. It is an elegant solution but it is substantially slower than Oracle Big Data Connectors (by a factor of 5) and consumes about three times the CPU. And in both cases you would be forced to run SQL*Loader on the machine where Oracle lives, not because of some functional limitation of SQL*Loader (you can run it anywhere) but because of the practicalities of working with HDFS, which is inherently distributed. Running SQL*Loader on a non-Oracle system means you are moving huge blocks of distributed data living on any number of Hadoop DataNodes through the network to a single system, which will be tasked to pass the entire payload over the network again to Oracle. This model doesn't scale.

Exploiting Parallelism in Oracle and Hadoop to Load Data

The best solution for loading data from Hadoop to Oracle is to use and align the mechanisms for doing parallel work in both environments.

Parallelism in Oracle Loader for Hadoop (OLH)

For OLH this means running MapReduce programs in Hadoop to break up a load operation into tasks running on all available MapReduce nodes in a Hadoop cluster. These MapReduce tasks run concurrently, naturally dividing the workload into discrete payloads that use Oracle MapReduce code to connect to Oracle Database remotely and load data into a target table. It's a natural parallel model for Hadoop, since the loading logic is encapsulated and run as vanilla MapReduce jobs. And it's a natural model for Oracle, since the Oracle database system is being tasked to serve multiple clients (i.e. MapReduce tasks) loading data at once, using the standard client-server architecture that's been around for decades.

Parallelism in Oracle SQL Connector for Hadoop Distributed File System (OSCH)

OSCH is the alternative approach that marries two other parallel mechanisms: Oracle Parallel Query for Oracle external tables, and the Hadoop HDFS Client. To explain how these mechanisms align, let's first talk about external tables and Parallel Query.

External Tables

External tables are tables defined in Oracle which manage data not living in Oracle. For example, suppose you had an application that managed and frequently updated some structured text files in a system, but you needed to access that data to join it to some Oracle table. You would define an Oracle external table pointed at the same structured text files updated by the application, accompanied by verbiage that looks strikingly similar to the SQL*Loader verbiage discussed above. That's not a coincidence: Oracle external tables use the SQL*Loader driver, which executes SQL*Loader code under the covers.

Parallel Query

Parallel Query (PQ) is a "divide and conquer" strategy that decomposes a SQL statement into partitioned tasks that can execute in parallel and merge the results. PQ exploits the fact that SQL tables are symmetric and can be logically subdivided into horizontal partitions (i.e. sets of rows).
With PQ, if you want to execute:

SELECT last_name FROM emp WHERE salary > 30000

Oracle can decompose this query into smaller units of work which perform the identical query in parallel against mutually exclusive sets of rows in the "emp" table. For PQ to give you this advantage it needs to be enabled and properly configured (a detail we will talk about in a future post). For now you simply need to understand that PQ works by breaking down SQL statements into worker bees (i.e. PQ slaves) that divide the load and execute in parallel. In particular, PQ can be enabled for external tables, which allows SQL to access data outside of Oracle in parallel. The amount of parallelism an external table has is configurable and is dictated by the DOP (degree of parallelism). The DOP can be asserted various ways: as an attribute of a table, within a SQL statement using a table, or at the session level after the user connects to Oracle.

HDFS Client

Now let's talk about the Hadoop HDFS Client. This is a Java API living in Hadoop that acts as a client to HDFS file systems. It looks like your standard file system programmatic interface, with open, read, write, and close methods. But because it works against HDFS, which distributes individual blocks of a file across a Hadoop cluster, there is a lot of parallelism going on in the back end. Blocks are served up to HDFS clients by Hadoop DataNodes, daemons running on Hadoop nodes that serve up data blocks stored locally to the node. If you run a lot of HDFS Clients concurrently against different HDFS files, you are doing lots of concurrent IO and concurrent streaming of data from every Hadoop node that has a running DataNode. In other words, you are maximizing retrieval of data from the Hadoop cluster.

Putting It All Together

OSCH works by using all these mechanisms together. It defines a specialized external table which can invoke HDFS Client software to access data in HDFS. And when PQ is enabled for this type of external table, a SQL select statement gets decomposed into N PQ slaves (where N is the DOP). In other words, a SQL select statement can kick off N PQ slaves that are each accessing K Hadoop DataNodes in parallel. Access of HDFS blocks by PQ slaves maximizes disk IO, network bandwidth, and processing both in Oracle and in Hadoop.

With this model, you load an Oracle table (e.g. "MY_TABLE") by executing a single SQL insert statement, one that gets its data from a subordinate select statement that references the external table retrieving data from HDFS (e.g. "MY_EXTERNAL_TABLE").

INSERT INTO MY_TABLE SELECT * FROM MY_EXTERNAL_TABLE;

Actually I lied. It takes two statements.

COMMIT;

Just sayin'.
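As a footnote on asserting the DOP, here is a minimal SQL sketch using the example table names above; the DOP of 8 is an arbitrary placeholder, and in practice OSCH generates the external table definition for you.

-- assert the DOP as an attribute of the external table
ALTER TABLE MY_EXTERNAL_TABLE PARALLEL 8;

-- or assert it for the session before running the load
ALTER SESSION FORCE PARALLEL QUERY PARALLEL 8;

-- the load itself, followed by the commit
INSERT /*+ APPEND */ INTO MY_TABLE SELECT * FROM MY_EXTERNAL_TABLE;
COMMIT;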
Next Topic

In the following post we will look at OLH in depth, starting with JDBC. We will look at the execution model, and discuss the basics of configuring and tuning a MapReduce job used to load a table living in Oracle.

Author's Background

My background in this space involves both product development and performance. I was pulled into this project about 20 months ago from doing work in Oracle middleware (specifically Oracle Coherence). I am currently working with a talented team that developed OLH and OSCH from scratch. My contribution was to design and prototype OSCH to the point that it scaled, and then spin up on Oracle BDA/Exadata/Hadoop internals to do performance benchmarks and testing. Because I'm almost the newest member of the team, the experience of spinning up in this space is still fresh in my mind, so I have a healthy respect for what it's like to wrap one's brain around both technologies. Many readers will have much deeper knowledge in either the Oracle space or about Hadoop, so questions or clarifications are welcome.



How to Load Oracle and Hive tables using OHSH (Part 5 - Using "loadermap" when loading Oracle tables)

In this tutorial we deal with columns and tables: specifically, how to map Oracle columns to specific data fields in HDFS files or to Hive columns.

Previously in Part 3, we discussed the methods for loading Oracle tables in relatively simple cases. The simplifying assumptions included the following:

- When loading delimited text into Oracle tables, the physical order of the delimited text fields reflects the declared order of the columns in the Oracle table.
- When loading Hive tables into Oracle tables, the column names are the same for both the Hive and Oracle tables.
- Date and timestamp fields have uniform formats (i.e. in one load all date fields loaded have the same format and all timestamp fields have the same format). The date and timestamp formats can be specified using OHSH "set dateformat" for the "jdbc" and "directpath" load methods, and "set datemask" and "set timestampmask" for the "exttab" load method.

OHSH's "loadermap" construct deals with situations when the above assumptions are not true. The "loadermap" is only implemented and needed for loading Oracle tables. When creating and loading Hive tables from Oracle, data pump files are used. They contain metadata that is used by the Oracle DPSerde to create a Hive table with identical column names, so no special mapping constructs need to be specified.

"loadermap" semantics

The construct is optional and follows the <LoadSource> construct in an OHSH "load oracle table" statement.

('explain')? 'load' 'oracle' 'table' <ResourceName>':'<TableName> 'from' <LoadSource> <LoaderMap>? <Using>? <LoadConf>?

A "loadermap" is a list of column mappings, each of which is a triplet specifying the name of a column in the target table, an optional mapping to data living in the source, and an optional "format" field.

<LoaderMap> : 'loadermap' <ColumnMapping> (',' <ColumnMapping>)*
<ColumnMapping> : ('tcolname' ('=')?)? <STRORDQSTR>
                  (('field' ('=')? )? <STRING>)?
                  (('format' ('=')?)? <DQSTRING>)?

A "loadermap" specification is analogous to the select column clause in SQL and does four things:

- Specifies the target columns that are to be loaded in a table
- Maps each specified target column to a field position in a delimited text file (if loading from HDFS files)
- Maps each specified target column to a Hive field (if loading from a Hive table)
- Optionally specifies the format of the field being loaded (typically an Oracle DATE or TIMESTAMP)

The syntax looks complex, but the defaulting allows it to be abbreviated, simple, and intuitive.

Using a "loadermap" for a reverse mapping

In earlier examples we showed how to use OHSH to load tables from delimited text files in HDFS, where we assumed the fields in the delimited text were in the same order as the columns declared in the Oracle table. So when loading the MOVIE_RATINGS table below, "custid" would be the first field in each row of delimited text, "movieid" would be the second field, and so on.

ohsh> %sql describe movie_ratings;
Name            Null?    Type
--------------- -------- ------------
CUSTID                   NUMBER
MOVIEID                  NUMBER
GENREID                  NUMBER
TIME                     DATE
RECOMMENDED              NUMBER
ACTIVITY                 NUMBER
RATING                   NUMBER
SALES                    NUMBER

But what if the table was declared as below? Now the column order does not naturally match the field order in the text files used earlier. For example, CUSTID is field 0 in the text file, but it is the seventh column in the table below.

ohsh> %sql describe movie_ratings_shuffled;
Name            Null?    Type
--------------- -------- ------------
MOVIEID                  NUMBER
SALES                    NUMBER
GENREID                  NUMBER
TIME                     DATE
ACTIVITY                 NUMBER
RECOMMENDED              NUMBER
CUSTID                   NUMBER
RATING                   NUMBER

To remedy this using implicit "loadermaps" we simply need to declare the columns in the "loadermap" construct in the order they exist in the delimited text file. Informally we call this an implicit order "loadermap" because the order is implied by the order of the columns listed in the "loadermap". So in the example below "custid" maps to field 0, "movieid" to field 1, and so on. (Note that unless a column is delimited by double quotes it is treated as a case-insensitive name.)

ohsh> set datemask "YYYY-MM-DD:HH24:MI:SS"
ohsh> load oracle table olhp:movie_ratings_shuffled \
  from path \
  hadoop0:"/user/${USER}/moviedemo/movie_ratings/delimited_text" \
  loadermap \
  (custid,movieid,genreid,time,recommended,activity,rating,sales) \
  using exttab

The alternative is to use explicit order loader maps. They require you to identify the field position of each target column being loaded. Explicit ordering is zero based (i.e. fields are numbered starting with 0). With explicit ordering it doesn't matter in what order the columns are declared, as long as each has a non-negative integer associated with it that is unique and maps to the position of one field in the delimited text file. The restrictions are that you can only map one column to one field, and you can't mix explicit loader map syntax with implicit order syntax: you either provide a number position for all columns specified, or don't specify any positions explicitly. Explicit ordering allows you to pick and choose the delimited text fields of interest. You don't have to enumerate all the fields, just the ones that map to columns in your table.

ohsh> set dateformat "yyyy-MM-dd:HH:mm:ss"
ohsh> load oracle table olhp:movie_ratings_shuffled \
  from path \
  hadoop0:"/user/${USER}/moviedemo/movie_ratings/delimited_text" \
  loadermap \
  (genreid 2, custid 0, sales 7, movieid 1, rating 6, time 3, \
  recommended 4, activity 5) \
  using jdbc

"loadermaps" for loading Oracle tables from Hive tables are similar in structure, only rather than mapping target columns to numbers, they map to Hive column names. Note that the order of declaration of column names in the Hive and Oracle tables is not a problem. If the orders of declaration don't match, OHSH won't get confused.

For example, suppose we are loading the following Oracle table whose column names are different from the column names in Hive.

ohsh> %sql describe movie_ratings_difcols_witness;
Name            Null?    Type
--------------- -------- ------------
CUSTID_1                 NUMBER
MOVIEID_1                NUMBER
GENREID_1                NUMBER
TIME_1                   DATE
RECOMMENDED_1            NUMBER
ACTIVITY_1               NUMBER
RATING_1                 NUMBER
SALES_1                  NUMBER

These column names don't match the column names in the Hive table we want to load from.

ohsh> %hive0 describe movie_ratings;
OK
custid        int
movieid       int
genreid       int
time          string
recommended   int
activity      int
rating        int
sales         float

Since the column names are different in the two tables, loading the Oracle table requires the use of a "loadermap".

ohsh> set datemask "YYYY-MM-DD:HH24:MI:SS"
ohsh> load oracle table olhp:movie_ratings_difcols \
  from hive table moviedemo:movie_ratings \
  loadermap (custid_1 custid, movieid_1 movieid, genreid_1 genreid, \
  time_1 time, recommended_1 recommended, activity_1 activity, \
  rating_1 rating, sales_1 sales)

Note that in the previous examples we have been setting either the "datemask" or the "dateformat", depending upon whether we are loading using Oracle external tables (i.e. the "exttab" method) or loading from Hadoop (using "directpath" or "jdbc"). The "loadermap" verbiage allows a format to be associated with a particular column. Below, the "datemask" format is embedded as a third value in the time_1 specification. This allows different date and time formats to be assigned to different columns being loaded.

ohsh> load oracle table olhp:movie_ratings_difcols \
  from hive table moviedemo:movie_ratings \
  loadermap (custid_1 custid, movieid_1 movieid, genreid_1 genreid, \
  time_1 time "YYYY-MM-DD:HH24:MI:SS", \
  recommended_1 recommended, activity_1 activity, \
  rating_1 rating, sales_1 sales) \
  using exttab

Using a "loadermap" for projection

The following shows loadermaps serving to project columns. (Again, if you use loadermaps you need to specify by name all the columns that you want to load.) Let's say our target table looks like this:

ohsh> %sql describe movie_ratings_project;
Name            Null?    Type
--------------- -------- ------------
CUSTID                   NUMBER
MOVIEID                  NUMBER
GENREID                  NUMBER

We want to load three columns and project out the rest. With explicit ordering we enumerate the columns and their mappings explicitly.

ohsh> set dateformat "yyyy-MM-dd:HH:mm:ss"
ohsh> load oracle table olhp:movie_ratings_project \
  from hive table moviedemo:movie_ratings \
  loadermap (custid 0, movieid 1, genreid 2) \
  using jdbc

Since we want the first three fields, implicit mappings (to fields 0, 1, and 2) work here as well.

ohsh> set datemask "YYYY-MM-DD:HH24:MI:SS"
ohsh> load oracle table olhp:movie_ratings_project \
  from hive table moviedemo:movie_ratings \
  loadermap (custid, movieid, genreid) \
  using exttab
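The grammar above also allows an optional 'explain' prefix on a load statement. As a sketch (assuming it behaves like the other OHSH statements here and describes the load rather than running it), you can use it to preview how a loadermap will be applied before committing to the load:

ohsh> explain load oracle table olhp:movie_ratings_project \
  from hive table moviedemo:movie_ratings \
  loadermap (custid, movieid, genreid) \
  using exttab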



How to Load Oracle and Hive tables using OHSH (Part 6 - Using the "etl" method for loading Oracle tables)

In this post we will discuss the "etl" method for loading tables. It was briefly mentioned in earlier posts, but because it is a hybrid of both Hadoop MapReduce processing and Oracle external tables, it made sense to defer its discussion until the other, simpler methods of loading Oracle tables (i.e. "exttab", "directpath", and "jdbc") were explained.

The "etl" method runs an OLH job on the Hadoop cluster to produce data pump files living in HDFS. This is followed by an OSCH job which reads the data pump files using OSCH-generated external tables and loads them into the target table. The method name "etl" is shorthand for "extract, transform, and load". In this case OLH is doing the extraction and transforming, while OSCH is doing the loading.

Why is the method useful?

- It can be used to back out of a job that encounters even one data conversion problem due to dirty data, without wasting any resources on the system where Oracle is running.
- It can load data pump files more quickly and efficiently via "exttab" than directly loading delimited text. When loading HDFS delimited text files using "exttab", the heavy lifting is the translation of text fields to native Oracle data types, and the CPU cycles being impacted are on the Oracle system. With "etl", the translation to native Oracle data types is offloaded to the Hadoop cluster in a standard MapReduce job. When the "exttab" method reads the data pump files it is basically doing IO, moving opaque bytes to storage without a lot of CPU processing.
- The "exttab" phase can be deferred. This means that the time at which the Oracle database actually loads the data can be embedded in a cron job at a point of time when there is low usage. More importantly, because it will be dealing with data pump files, the chance of running into a problem during the exttab phase is very low.

Both Oracle and Hadoop have settings to abort loads if some number of rows were rejected because of dirty data that cannot be converted to specific data types. But because both Oracle and Hadoop have parallel architectures, the reject limit is scoped to each PQ slave or Hadoop task, and is not an absolute value across all slaves or tasks. Basically it is a fuzzy setting for both architectures, used to control runaway processing disasters caused by an intrusion of lots of dirty data.

In general, the sooner you detect such data the better. With the "etl" method you can assert that no Oracle load will occur if any dirty data is detected on the Hadoop side of the processing. If the Hadoop MapReduce job completes without rejecting any records, you are guaranteed that the Oracle load won't fail because of Oracle data type conversion problems. (If it does, it is our, i.e. Oracle's, bug.) What this gate-keeping avoids is having to churn through Oracle external table bad files to determine what records were rejected and why. It avoids identifying all the bad records from HDFS, extracting them, cleaning them up, and running a compensating load to complete the intended load. Clearly there are use cases where a few bad apples don't spoil the rest of the barrel, but if they do, then "etl" is a good way to go.

Loading using the "etl" method

Loading syntax is identical to the syntax of the other methods used for loading Oracle tables, including OHSH loadermap functionality.
ohsh>set dateformat "yyyy-MM-dd:HH:mm:ss" ohsh>load oracle table olhp:movie_ratings from path \    hadoop0:/user/$ {USER}/moviedemo/movie_ratings/delimited_text \   using etl  The output shows two discrete phases being executed.  The first is an map/reduce job (using OLH) that creates Oracle datapump files in HDFS.   The second phase is loading those HDFS files using Oracle External tables generated by OSCH.  The output is verbose, but it gives a clear idea of what is going on under the hood. This is the first phase using Hadoop map/reduce to generate data pump files in HDFS. ----------------------------------------------------------------------------------- Begin ETL execution at 2017-02-24:11:34:34 ----------------------------------------------------------------------------------- ################################################################################### Starting SmartLoader Job OSL_170224_113434 Local OS user: rhanckel Hadoop user:   rhanckel ################################################################################### Loading Oracle table "OLHP"."MOVIE_RATINGS"   From HDFS files located at /user/rhanckel/moviedemo/movie_ratings/delimited_text   Using Oracle Loader for Hadoop (OLH) Datapump file output     into HDFS for subsequent OSCH load ----------------------------------------------------------------------------------- Hadoop MapReduce Configuration Settings ----------------------------------------------------------------------------------- mapreduce.am.max-attempts=2 mapreduce.app-submission.cross-platform=false mapreduce.client.completion.pollinterval=5000 .. yarn.app.mapreduce.client.job.retry-interval=2000 yarn.app.mapreduce.client.max-retries=3 yarn.app.mapreduce.task.container.log.backups=0 ----------------------------------------------------------------------------------- OLH Configuration Settings ----------------------------------------------------------------------------------- oracle.hadoop.loader.connection.tns_admin=/user/rhanckel/work oracle.hadoop.loader.connection.url=jdbc:oracle:thin:@inst1 oracle.hadoop.loader.connection.wallet_location=/user/rhanckel/work oracle.hadoop.loader.defaultDateFormat=yyyy-MM-dd:HH:mm:ss oracle.hadoop.loader.input.fieldNames=F0,F1,F2,F3,F4,F5,F6,F7 oracle.hadoop.loader.input.fieldTerminator=\u002c oracle.hadoop.loader.loaderMap."ACTIVITY".field=F5 oracle.hadoop.loader.loaderMap."CUSTID".field=F0 oracle.hadoop.loader.loaderMap."GENREID".field=F2 oracle.hadoop.loader.loaderMap."MOVIEID".field=F1 oracle.hadoop.loader.loaderMap."RATING".field=F6 oracle.hadoop.loader.loaderMap."RECOMMENDED".field=F4 oracle.hadoop.loader.loaderMap."SALES".field=F7 oracle.hadoop.loader.loaderMap."TIME".field=F3 oracle.hadoop.loader.loaderMap.columnNames="CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" oracle.hadoop.loader.loaderMap.targetTable="OLHP"."MOVIE_RATINGS" oracle.hadoop.loader.logBadRecords=true oracle.hadoop.loader.rejectLimit=-1 ----------------------------------------------------------------------------------- Oracle Database Connectivity Settings ----------------------------------------------------------------------------------- TNS usage: OLH job using client TNS path on hadoop cluster. Wallet usage: OLH job using client wallet path on hadoop cluster. 
----------------------------------------------------------------------------------- Begin OLH execution at 2017-02-24:11:34:34 ----------------------------------------------------------------------------------- ----------------------------------------------------------------------------------- OLH MapReduce job is submitted to the Hadoop cluster to load an Oracle table using ETL_ONLINE ----------------------------------------------------------------------------------- [INFO] 2017-02-24 11:34:34,153 [oracle.hadoop.loader.OraLoader]  Oracle Loader for Hadoop Release 3.8.1 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. [INFO] 2017-02-24 11:34:34,153 [oracle.hadoop.loader.OraLoader]  Built-Against: hadoop-2.2.0 hive-0.13.0 avro-1.8.1 jackson-1.8.8 [INFO] 2017-02-24 11:34:34,453 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.enableSorting disabled, no sorting key provided [INFO] 2017-02-24 11:34:34,770 [oracle.hadoop.balancer.Balancer]  Creating balancer [INFO] 2017-02-24 11:34:34,836 [oracle.hadoop.balancer.Balancer]  Starting Balancer [INFO] 2017-02-24 11:34:36,773 [oracle.hadoop.loader.OraLoader]  Submitting OraLoader job OSL_170224_113434 [INFO] 2017-02-24 11:34:36,795 [hadoop.conf.Configuration.deprecation]  session.id is deprecated. Instead, use dfs.metrics.session-id [INFO] 2017-02-24 11:34:36,796 [hadoop.metrics.jvm.JvmMetrics]  Initializing JVM Metrics with processName=JobTracker, sessionId= [INFO] 2017-02-24 11:34:37,137 [mapreduce.lib.input.FileInputFormat]  Total input paths to process : 1 [INFO] 2017-02-24 11:34:37,169 [apache.hadoop.mapreduce.JobSubmitter]  number of splits:1 [INFO] 2017-02-24 11:34:37,323 [apache.hadoop.mapreduce.JobSubmitter]  Submitting tokens for job: job_local978287496_0001 .. [INFO] 2017-02-24 11:34:42,223 [apache.hadoop.mapred.LocalJobRunner]  1 / 1 copied. [INFO] 2017-02-24 11:34:42,223 [apache.hadoop.mapred.Task]  Task attempt_local978287496_0001_r_000004_0 is allowed to commit now [INFO] 2017-02-24 11:34:42,225 [apache.hadoop.mapred.LocalJobRunner]  reduce > reduce [INFO] 2017-02-24 11:34:42,226 [apache.hadoop.mapred.Task]  Task 'attempt_local978287496_0001_r_000004_0' done. [INFO] 2017-02-24 11:34:42,226 [apache.hadoop.mapred.LocalJobRunner]  Finishing task: attempt_local978287496_0001_r_000004_0 [INFO] 2017-02-24 11:34:42,230 [apache.hadoop.mapred.LocalJobRunner]  reduce task executor complete. 
[INFO] 2017-02-24 11:34:42,322 [oracle.hadoop.loader.OraLoader]  map 100% reduce 100% [INFO] 2017-02-24 11:34:42,322 [oracle.hadoop.loader.OraLoader]  Job complete: OSL_170224_113434 (job_local978287496_0001) [INFO] 2017-02-24 11:34:42,376 [oracle.hadoop.loader.OraLoader]  Counters: 30         File System Counters                 FILE: Number of bytes read=303621893                 FILE: Number of bytes written=298705381                 FILE: Number of read operations=0                 FILE: Number of large read operations=0                 FILE: Number of write operations=0         Map-Reduce Framework                 Map input records=39716                 Map output records=39716                 Map output bytes=2540885                 Map output materialized bytes=2620347                 Input split bytes=142                 Combine input records=0                 Combine output records=0                 Reduce input groups=5                 Reduce shuffle bytes=2620347                 Reduce input records=39716                 Reduce output records=39716                 Spilled Records=79432                 Shuffled Maps =5                 Failed Shuffles=0                 Merged Map outputs=5                 GC time elapsed (ms)=215                 Total committed heap usage (bytes)=2119696384         Shuffle Errors                 BAD_ID=0                 CONNECTION=0                 IO_ERROR=0                 WRONG_LENGTH=0                 WRONG_MAP=0                 WRONG_REDUCE=0         File Input Format Counters                 Bytes Read=1688514         File Output Format Counters                 Bytes Written=2619598 ----------------------------------------------------------------------------------- End OLH execution at 2017-02-24:11:34:42 ----------------------------------------------------------------------------------- This is the second phase, used to load the data pump files generated above. Loading Oracle table "OLHP"."MOVIE_RATINGS"   From HDFS data pump files located at  /user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434   Using Oracle SQL Connector for Hadoop (OSCH) external tables ----------------------------------------------------------------------------------- OSCH Configuration Settings ----------------------------------------------------------------------------------- oracle.hadoop.connection.url=jdbc:oracle:thin:@inst1 oracle.hadoop.connection.user="OLHP" oracle.hadoop.exttab.createBadFiles=true oracle.hadoop.exttab.createLogFiles=false oracle.hadoop.exttab.dataPaths=/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434/oraloader-0*.dat oracle.hadoop.exttab.defaultDirectory="OLHP_DEFAULT_DIR" oracle.hadoop.exttab.locationFileCount=4 oracle.hadoop.exttab.sourceType=datapump oracle.hadoop.exttab.tableName="OSL_170224_113434_EXT" ----------------------------------------------------------------------------------- Create OSCH external table(s) used for accessing Hadoop content ----------------------------------------------------------------------------------- Oracle SQL Connector for HDFS Release 3.7.1 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. The create table command succeeded. 
User: OLHP performed the following actions in schema: OLHP CREATE TABLE "OLHP"."OSL_170224_113434_EXT" (   "CUSTID"                         NUMBER,   "MOVIEID"                        NUMBER,   "GENREID"                        NUMBER,   "TIME"                           DATE,   "RECOMMENDED"                    NUMBER,   "ACTIVITY"                       NUMBER,   "RATING"                         NUMBER,   "SALES"                          NUMBER ) ORGANIZATION EXTERNAL (    TYPE ORACLE_LOADER    DEFAULT DIRECTORY "OLHP_DEFAULT_DIR"    ACCESS PARAMETERS    (      external variable data      NOLOGFILE      PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'    )    LOCATION    (      'osch-494CD2456A4A3007E0533A58F20AB90B-1-1',      'osch-494CD2456A4A3007E0533A58F20AB90B-1-2',      'osch-494CD2456A4A3007E0533A58F20AB90B-1-3',      'osch-494CD2456A4A3007E0533A58F20AB90B-1-4',      'osch-494CD2456A4A3007E0533A58F20AB90B-1-5'    ) ) PARALLEL REJECT LIMIT UNLIMITED; The following location files were created. osch-494CD2456A4A3007E0533A58F20AB90B-1-1 contains 1 URI, 266240 bytes       266240 file:/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434/oraloader-00000-dp-1.dat ... osch-494CD2456A4A3007E0533A58F20AB90B-1-5 contains 1 URI, 270336 bytes       270336 file:/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434/oraloader-00004-dp-1.dat ----------------------------------------------------------------------------------- Begin OSCH execution  at 2017-02-24:11:34:42 ----------------------------------------------------------------------------------- ----------------------------------------------------------------------------------- Begin single load (1/1) at 2017-02-24:11:34:43 ----------------------------------------------------------------------------------- ALTER SESSION FORCE PARALLEL DML PARALLEL 4 ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4 INSERT /*+ append pq_distribute("OLHP"."MOVIE_RATINGS", none) */ INTO "OLHP"."MOVIE_RATINGS"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES") SELECT "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" FROM "OSL_170224_113434_EXT" Transaction is committed. ----------------------------------------------------------------------------------- End single load at 2017-02-24:11:35:05. Elapsed load time = 0:00:22. ----------------------------------------------------------------------------------- ----------------------------------------------------------------------------------- End OSCH execution at 2017-02-24:11:35:05 ----------------------------------------------------------------------------------- ----------------------------------------------------------------------------------- Executing OSCH post-load cleanup ----------------------------------------------------------------------------------- Oracle SQL Connector for HDFS Release 3.7.1 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. The drop command was successful. User: OLHP dropped OSCH external table OLHP.OSL_170224_113434_EXT and all associated location files. Successfully dropped transient tables/views created for the load Deleting temporary datapump files used for loading. Deleting file:/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434/oraloader-00001-dp-1.dat ... 
Deleting file:/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170224_113434/oraloader-00002-dp-1.dat
###################################################################################
Ending SmartLoader Job OSL_170224_113434
Local OS user: rhanckel
Hadoop user:   rhanckel
###################################################################################
Load operation successful. Load time = 0:00:35
--------------------------------------------------------------------------------------------------------------------------------
End of ETL load operation at 2017-02-24:11:35:10
--------------------------------------------------------------------------------------------------------------------------------
ohsh>%sql select count(*) from movie_ratings;

  COUNT(*)
----------
     39716

Loading using the "etl" method with rejectlimit set to 0

Now let's run the same job, but with one dirty record in the input. The timestamp values in the file conform to the declared dateformat:

ohsh>set dateformat "yyyy-MM-dd:HH:mm:ss"

The first record originally had a timestamp compliant with that format:

1363545,27205,9,2012-07-01:00:04:03,1,5,,

We change it to the following, which should cause the row to be rejected:

1363545,27205,9,2012-07-01 00:04:03,1,5,,

We copy the modified file to another HDFS directory and load it again, this time specifying that the "exttab" phase should not execute if one or more bad records are found.  The Hadoop map/reduce phase of the "etl" job will see the bad record and abort the Oracle side of the load operation.  The "exttab" phase will not be run.

ohsh>set rejectlimit 0
ohsh>load oracle table olhp:movie_ratings from path  \
  hadoop0:/user/rhanckel/moviedemo/movie_ratings/delimited_text_bad \
  using etl

The first phase begins on the Hadoop side, and the "etl" job discovers one record is bad.

-----------------------------------------------------------------------------------
Begin ETL_ONLINE execution at 2017-02-24:12:04:07
-----------------------------------------------------------------------------------
###################################################################################
Starting SmartLoader Job OSL_170224_120407
Local OS user: rhanckel
Hadoop user:   rhanckel
###################################################################################
Loading Oracle table "OLHP"."MOVIE_RATINGS"
  From HDFS files located at
      /user/rhanckel/moviedemo/movie_ratings/delimited_text_bad
  Using Oracle Loader for Hadoop (OLH) Datapump file output into HDFS
  for subsequent OSCH load
-----------------------------------------------------------------------------------
Hadoop MapReduce Configuration Settings
-----------------------------------------------------------------------------------
mapreduce.am.max-attempts=2
mapreduce.app-submission.cross-platform=false
mapreduce.client.completion.pollinterval=5000
Lots of Hadoop settings...
yarn.app.mapreduce.client.job.retry-interval=2000 yarn.app.mapreduce.client.max-retries=3 yarn.app.mapreduce.task.container.log.backups=0 ----------------------------------------------------------------------------------- OLH Configuration Settings ----------------------------------------------------------------------------------- oracle.hadoop.loader.connection.tns_admin=/user/rhanckel/work oracle.hadoop.loader.connection.url=jdbc:oracle:thin:@inst1 oracle.hadoop.loader.connection.wallet_location=/user/rhanckel/work oracle.hadoop.loader.defaultDateFormat=yyyy-MM-dd:HH:mm:ss oracle.hadoop.loader.input.fieldNames=F0,F1,F2,F3,F4,F5,F6,F7 oracle.hadoop.loader.input.fieldTerminator=\u002c oracle.hadoop.loader.loaderMap."ACTIVITY".field=F5 oracle.hadoop.loader.loaderMap."CUSTID".field=F0 oracle.hadoop.loader.loaderMap."GENREID".field=F2 oracle.hadoop.loader.loaderMap."MOVIEID".field=F1 oracle.hadoop.loader.loaderMap."RATING".field=F6 oracle.hadoop.loader.loaderMap."RECOMMENDED".field=F4 oracle.hadoop.loader.loaderMap."SALES".field=F7 oracle.hadoop.loader.loaderMap."TIME".field=F3 oracle.hadoop.loader.loaderMap.columnNames="CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" oracle.hadoop.loader.loaderMap.targetTable="OLHP"."MOVIE_RATINGS" oracle.hadoop.loader.logBadRecords=true oracle.hadoop.loader.rejectLimit=0 ----------------------------------------------------------------------------------- Oracle Database Connectivity Settings ----------------------------------------------------------------------------------- TNS usage: OLH job using client TNS path on hadoop cluster. Wallet usage: OLH job using client wallet path on hadoop cluster. ----------------------------------------------------------------------------------- Begin OLH execution at 2017-02-24:12:04:07 ----------------------------------------------------------------------------------- ----------------------------------------------------------------------------------- OLH MapReduce job is submitted to the Hadoop cluster to load an Oracle table using ETL_ONLINE ----------------------------------------------------------------------------------- [INFO] 2017-02-24 12:04:07,360 [oracle.hadoop.loader.OraLoader]  Oracle Loader for Hadoop Release 3.8.1 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. [INFO] 2017-02-24 12:04:07,360 [oracle.hadoop.loader.OraLoader]  Built-Against: hadoop-2.2.0 hive-0.13.0 avro-1.8.1 jackson-1.8.8 [INFO] 2017-02-24 12:04:07,671 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.enableSorting disabled, no sorting key provided [INFO] 2017-02-24 12:04:08,119 [hadoop.conf.Configuration.deprecation]  mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps [INFO] 2017-02-24 12:04:08,120 [hadoop.conf.Configuration.deprecation]  mapred.max.split.size is deprecated. 
Instead, use mapreduce.input.fileinputformat.split.maxsize [INFO] 2017-02-24 12:04:08,157 [mapreduce.lib.input.FileInputFormat]  Total input paths to process : 1 [INFO] 2017-02-24 12:04:08,243 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1 [INFO] 2017-02-24 12:04:08,249 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1 [WARN] 2017-02-24 12:04:08,574 [oracle.hadoop.loader.OraLoaderMapper]  oracle.hadoop.loader.OraLoaderException: error parsing input data: Unparseable date: "2012-07-01 00:04:03" [WARN] 2017-02-24 12:04:08,579 [oracle.hadoop.loader.OraLoaderMapper]  skipping record [Offset 0 infile:///user/rhanckel/moviedemo/movie_ratings/delimited_text_bad/tkhp_moviedata_onebadrecord.csv] [INFO] 2017-02-24 12:04:12,632 [apache.hadoop.mapred.MapTask]  Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer [WARN] 2017-02-24 12:04:12,696 [oracle.hadoop.loader.OraLoaderMapper]  oracle.hadoop.loader.OraLoaderException: error parsing input data: Unparseable date: "2012-07-01 00:04:03" [WARN] 2017-02-24 12:04:12,697 [oracle.hadoop.loader.OraLoaderMapper]  skipping record [Offset 0 infile:/user/rhanckel/moviedemo/movie_ratings/delimited_text_bad/tkhp_moviedata_onebadrecord.csv] [INFO] 2017-02-24 12:04:15,565 [oracle.hadoop.loader.OraLoader]  Counters: 32         File System Counters                 FILE: Number of bytes read=47308629                 FILE: Number of bytes written=47170746                 FILE: Number of read operations=0                 FILE: Number of large read operations=0                 FILE: Number of write operations=0         Map-Reduce Framework                 Map input records=39716                 Map output records=39715                 Map output bytes=2540821                 Map output materialized bytes=2620281                 Input split bytes=159                 Combine input records=0                 Combine output records=0                 Reduce input groups=0                 Reduce shuffle bytes=524096                 Reduce input records=0                 Reduce output records=0                 Spilled Records=39715                 Shuffled Maps =1                 Failed Shuffles=0                 Merged Map outputs=1                 GC time elapsed (ms)=410                 Total committed heap usage (bytes)=424148992         Rows skipped by input error                 Parse Error=1                 Total rows skipped by input error=1         Shuffle Errors                 BAD_ID=0                 CONNECTION=0                 IO_ERROR=0                 WRONG_LENGTH=0                 WRONG_MAP=0                 WRONG_REDUCE=0         File Input Format Counters                 Bytes Read=1675410         File Output Format Counters                 Bytes Written=0 [WARN] 2017-02-24 12:04:15,580 [oracle.hadoop.balancer.Balancer]  Cannot save balancer state: Job did not complete or was not successful ----------------------------------------------------------------------------------- End OLH execution at 2017-02-24:12:04:15 ----------------------------------------------------------------------------------- The second phase of the "etl" method is aborted, avoiding the use of any Oracle resources trying to churn through a bad load. Error: oracle.hadoop.smartloader.api.SmartLoaderException: ETL load was aborted during the OLH phase and no rows were loaded into Oracle. Typically this is because the reject limit was exceeded. See the OLH job history for details. 
The load fails and aborts the "exttab" load into Oracle.  No Oracle system resources are wasted on a bad load.

Loading using the "etl deferred" method

The "etl" model can also be used to separate the time when the map/reduce phase produces data pump files from the time when "exttab" is used to actually load the data into Oracle.  Presumably the actual load into Oracle would be fired off by a user-defined "cron" job.  This method is called "etl deferred".  The command itself has the same pattern as other Oracle table loading commands.

ohsh>load oracle table olhp:movie_ratings from path \
    hadoop0:/user/${USER}/moviedemo/movie_ratings/delimited_text \
    using etl deferred scriptdir=/tmp

The command above runs an OLH job which produces data pump files in Hadoop, and then writes an OHSH script, local to the system where OHSH is running, that finishes the job of loading the table using "exttab".

Note that the "etl deferred" method is followed by a "scriptdir" clause.  This clause is specific to the "etl deferred" method: "scriptdir" is the directory where the generated script that loads the table using "exttab" is written.  In this example, the script will be generated in "/tmp" on the system where OHSH is run. The filename of the generated script will reflect the OHSH job name, e.g. /tmp/OSL_170226_160417_offline.ohsh

The script contains the following, which when executed will complete the load job.

ohsh>load oracle table olhp:MOVIE_RATINGS from path \
  hadoop0:"/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170226_160417" \
  fileformat datapump  \
  using exttab

The HDFS path is the directory location the OLH phase used to create and populate the data pump files containing the contents to be loaded into the MOVIE_RATINGS table.

But how does OHSH or the caller know the name of the generated file (e.g. OSL_170226_160417_offline.ohsh)? After the "etl deferred" command above completes, it sets an OHSH environmental variable called OHSH_LATEST_DEFERRED that holds the name of the generated OHSH script.  User logic can read this variable after executing the "etl deferred" command to schedule the rest of the load job.

Below is a full example where %bash0 operations are used to generate a wrapper OHSH file that can be picked up by a cron job.

ohsh>@/tmp/mdresources
ohsh>%sql truncate table movie_ratings;
ohsh>set outputlevel verbose
ohsh>set dateformat "yyyy-MM-dd:HH:mm:ss"
ohsh>load oracle table olhp:movie_ratings from path \
    hadoop0:"/user/${USER}/moviedemo/movie_ratings/delimited_text" \
    using etl deferred scriptdir=/tmp

At this point OHSH has run an OLH job and produced data pump files living in HDFS.  It also has written the generated script OSL_170226_160417_offline.ohsh, which contains the following one-line OHSH command to load the generated data pump files using "exttab".

load oracle table olhp:MOVIE_RATINGS from path hadoop0:"/user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170226_160417/etldeferred" fileformat datapump using exttab

Getting back to the parent script above, we can add additional bash commands to create a wrapper OHSH script (i.e. "OSL_170226_160417_offline.ohsh.wrapper.ohsh") that sets additional run-time context needed for the generated load command.
%bash0 echo "@/tmp/mdresources" > /tmp/${OHSH_LATEST_DEFERRED}.wrapper.ohsh
%bash0 echo "set dop 4" >> /tmp/${OHSH_LATEST_DEFERRED}.wrapper.ohsh
%bash0 echo "@/tmp/${OHSH_LATEST_DEFERRED}" >> /tmp/${OHSH_LATEST_DEFERRED}.wrapper.ohsh
%bash0 echo "%sql select count(*) from movie_ratings;" >> /tmp/${OHSH_LATEST_DEFERRED}.wrapper.ohsh

The wrapper code is an OHSH script that looks like this:

@/tmp/mdresources
set dop 4
@/tmp/OSL_170226_160417_offline.ohsh
%sql select count(*) from movie_ratings;

The wrapper script can then be invoked by a crontab job that performs nightly loads (see the sketch after the assumptions below).

Assumptions when using the "etl" method

This model works assuming the following:

That Oracle Wallet is used for authentication

That the needed resources and defaults have been properly declared within a wrapper OHSH script

That the user will clean up the data pump files living in HDFS after the load has been done
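To make the nightly invocation concrete, here is a minimal sketch of a crontab entry on the OHSH client node.  The schedule, the use of shell input redirection to feed the wrapper script to the ohsh CLI, the log file path, and the final HDFS cleanup command are all assumptions for illustration; only the job-history path is taken from the example above.

# Hypothetical crontab entry: run the generated wrapper script nightly at 1:00 AM,
# then remove the deferred data pump files in HDFS once the load has completed.
0 1 * * * ohsh < /tmp/OSL_170226_160417_offline.ohsh.wrapper.ohsh >> /tmp/nightly_ohsh_load.log 2>&1 && hadoop fs -rm -r -skipTrash /user/rhanckel/smartloader/jobhistory/oracle/OLHP/MOVIE_RATINGS/OSL_170226_160417/etldeferred

In practice the wrapper filename changes with every deferred job (it embeds the OHSH job name), so the crontab entry would more likely point at a fixed symlink or a small driver shell script maintained by the parent script that reads OHSH_LATEST_DEFERRED, rather than at a hard-coded job name.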



How to Load Oracle and Hive Tables using OHSH (Part 1 - Introduction)

This is the first of a series of blog posts that revisits an earlier set of tutorials about loading Oracle tables from Hadoop. The intention of this tutorial is to introduce a tool formally known as Oracle Shell for Hadoop Loaders (OHSH), nicknamed "ohshell".  This is a CLI that supports commands to load Oracle tables from data living in Hadoop, and to create, replace, or incrementally load data from Oracle tables into Hive.  OHSH is a Hadoop application which layers on three underlying Oracle products: Oracle Loader for Hadoop (OLH), Oracle SQL Connector for HDFS (OSCH), and the Copy To Hadoop feature of Big Data SQL (CP2HADOOP).  The last technology is more recent and was not discussed in the original tutorials; it will be explained in depth here.

Why OHSH?

OHSH was built for a few reasons.  One reason was to create a standardized declarative model that allows users to say what they want done rather than struggling with the complexities of setting the appropriate set of low-level properties when submitting a load job.  OHSH gets simple input from the user and automatically generates the correct properties that are needed to do a particular type of load.

Another reason was to simplify and unify the usage of the underlying technologies used to move data between Oracle and Hadoop.  This includes automating the mechanical steps that were otherwise left for users to do on their own. For instance, if you use OSCH directly, it will create an external table that maps to content living in HDFS, but you then need to compose and execute a correct Oracle SQL INSERT statement to load the Oracle target table from the external table created by the OSCH tool. When using external tables to export Oracle table content to Hadoop, the user has to create the CTAS external table export statement, manually copy the data pump files over to HDFS, and manually create a Hive external table that maps to the data pump files. OHSH does all of this work under the covers.  You don't have to create or manage the Oracle external tables serving OSCH or CP2HADOOP, since they are created and deleted on-the-fly by OHSH as artifacts of a single load operation.

Additionally, it was realized that the process of loading goes beyond the actual load operation.  It typically involves a lot of ad hoc interaction to prep for a load, sanity check the results, and do cleanup afterward.  This involves interacting with various resources beyond the underlying products OHSH manages: tables living in Oracle schemas or Hive databases, and files and directories living in HDFS or on the local file system. One could resort to writing custom bash or perl scripts to do such activity, but such scripts are invariably hard to read and maintain.

OHSH defines a minimalist domain specific language whose focus is on what is needed to move data between Oracle and Hadoop.  When loading an Oracle table from content living in Hadoop, the user typically needs to identify the name of the Oracle table, the source of the content living in Hadoop (either a path to an HDFS directory or the name of a Hive table), and the preferred method for loading. When loading or creating a Hive table from an Oracle table, the user typically needs to specify the Hive and Oracle databases, the names of the tables, and the preferred method of loading.
Actually, even specifying a method is optional, although at the time of this writing, choosing a method is highly recommended once one understands the pros and cons of the different methods offered.  The trade-offs become clear once you start using the tool and can quickly test various methods of loading interactively.

OHSH Dependencies

OHSH runs on Linux and can run on the system running Oracle, on a Hadoop cluster, or on an edge node that has client access to Oracle and to a Hadoop cluster.  Typical users of OHSH will be Oracle DBAs and developers who use Oracle SQL*Plus and SQL*Loader on a regular basis.  OHSH is installed on Oracle BDA, and is available for download along with Oracle Big Data Connectors, on OTN and other download sites.

General dependencies include the following:

JDBC connectivity to one or more Oracle databases
Hive client software
Hadoop client software
SQL*Plus client
Linux OS and Bash

Underlying Oracle loader dependencies include:

Oracle Big Data Connectors for Hadoop (OLH and OSCH) for loading Oracle tables from data in Hadoop, and/or
Copy To Hadoop (CP2HADOOP) for loading Hive tables from data in Oracle tables

OHSH, Oracle Big Data Connectors, and Copy To Hadoop are available on Big Data Cloud Service, installed and ready to use.  If Oracle Big Data Connectors are licensed with Oracle Big Data Appliance, OHSH and Oracle Big Data Connectors are installed and configured.  On other Hadoop clusters OHSH needs to be configured to point to the homes of OLH and OSCH and/or the home of CP2HADOOP.  OHSH is tightly coupled to these underlying technologies, so you need to download and install the appropriate versions of the aforementioned Oracle loader dependencies.

Note that SQL*Plus is installed on the BDA and on Oracle Database systems.  If you want to use OHSH on your own Hadoop cluster or on an edge node, you should download and install Oracle Instant Client, which is a free download.

Typical OHSH Users

Moving large data sets (potentially terabytes) from Oracle to Hadoop or vice versa is a resource-intensive operation which needs to be carefully managed, especially for systems that are in production. In general OHSH is designed for DBAs and developers who are defining and managing schemas and tables, managing Oracle directories, and who spend a lot of time using SQL*Plus, SQL*Loader, and/or Data Pump utilities. As a convenience, OHSH allows one to integrate SQL*Plus commands within the context of an OHSH script.

OHSH Integration with Hadoop, Hive, Bash, and SQL*Plus resources

OHSH is meant to integrate the usage of the various tools that are needed to move and load data between Hadoop and Oracle.  The internals are split between an upper layer (ohsh.jar) which provides a shell-like environment and CLI to harvest user input, and a loading engine (smartloader.jar) that does all the real work.

The CLI layer of OHSH is basically a REPL (read, evaluate, print) loop that performs two levels of parsing.  The top level captures a complete opaque command, determines what resource needs to execute the command (BeeLine/Hive, Hadoop, Bash, SQL*Plus, or OHSH itself), performs environmental variable substitution, and then routes the command to the appropriate resource for execution.  If the command is an OHSH command, it is parsed at a finer level, input is harvested, and it is then handed to an execution engine to either manage state or execute a load operation.
Non-OHSH commands use a delegation operator "%", associated with the name of a command line resource, to identify the resource that should execute a command. For example:

ohsh>%hive0 show tables;

sends a "show tables" command to the BeeLine/Hive CLI.

The implementation of a CLI that invokes other CLIs is somewhat unconventional, but here it makes sense.  When loading a table, one might need to create the table in Oracle before doing the load.  Rather than re-implementing the complexities of an Oracle CREATE TABLE command in OHSH, it makes sense to simply delegate the CREATE TABLE statement to SQL*Plus but keep the step in the workflow of the OHSH script orchestrating a load.

The OHSH CLI also supports shell amenities such as command history tracking and bang (!) operations, spooling, setting environmental variables, and up-arrow/down-arrow command line recall.

As mentioned earlier, load commands require minimal user input.  The rest of the input is provided to load commands using OHSH default settings.  Most of these settings need to be set once for your operational environment and should work across various load operations.  These include things like Oracle date masks, Oracle DOP, Hadoop reduce tasks, and so on.

A Brief Tour Starting with Loading an Oracle table

Let's look at an OHSH script that loads an Oracle table using a standard OHSH script pattern.  In this case we will be loading a table from HDFS delimited text files using OLH with OCI directpath load.

ohsh>set echo on
ohsh>@mdresources
ohsh>%sql truncate table movie_ratings;
ohsh>set reducetasks 18
ohsh>load oracle table omovies:movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using directpath
ohsh>%sql select count(*) from movie_ratings;

The "set echo on" tells OHSH to echo statements that are executed in subordinate scripts (e.g. "@mdresources.ohsh").

The @mdresources.ohsh executes a subordinate script which creates three resources named "sql", "omovies", and "hmovies" (a hedged sketch of what such a resource script might contain appears at the end of this walkthrough).  "sql" is a user-defined name given to a SQL*Plus resource that executes SQL*Plus commands in an Oracle user/schema called "MOVIEDEMO".  The "omovies" resource is a JDBC connection that connects to the same schema using JDBC.  The "omovies" JDBC resource is needed for doing load commands, while the "sql" resource is for executing statements in SQL*Plus.  "hmovies" is a Hive resource that connects to a Hive database called "moviedemo".

The "%sql truncate table movie_ratings;" command delegates a truncate table DDL statement to SQL*Plus for execution.

The "set reducetasks 18" says that the reduce phase in Hadoop map/reduce which does the loading is restricted to using 18 reducers.  This is a Hadoop-related tuning option.

The "load oracle table omovies:movie_ratings ... using directpath" command is compiled by the SmartLoader layer of OHSH and executed.  It will load all of the delimited text files living in the HDFS path "/user/${USER}/moviedemo/movie_ratings_delimitedtext" into a table called movie_ratings, living in the Oracle schema that the "omovies" JDBC resource declared above connects to.

The last statement delegates a command back to SQL*Plus to count the number of rows loaded into the table after it was truncated.
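For illustration only, here is a minimal sketch of what a resource script like "mdresources.ohsh" might contain.  The resource names match the walkthrough above, but the connect identifiers are assumptions: a TNS alias "inst1" with Oracle Wallet handling authentication (the same convention used in later posts of this series), and a local HiveServer2 URL for the "moviedemo" database.  Adjust these to your own environment.

create sqlplus resource sql connectid="inst1"
create oracle jdbc resource omovies connectid="inst1"
create hive resource hmovies connectionurl="jdbc:hive2:///moviedemo"

The resource names themselves ("sql", "omovies", "hmovies") are arbitrary user-defined names; what matters is that they are declared before any load command references them.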
Loading the Same Table with other OHSH Methods

If one wanted to execute a Hadoop map/reduce load using the jdbc load method rather than OCI directpath (for example, because the Oracle table is not partitioned, and OCI directpath requires the target table to be partitioned), one would simply change the load method above (i.e. "directpath") to "jdbc".  The jdbc load method is typically slower than OCI directpath.

ohsh>load oracle table omovies:movie_ratings from path \
     hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext \
     using jdbc

Everything else is the same.

If one wants to execute a load using an Oracle external table, a script would set the preferred DOP and then execute the same command, only using the "exttab" method.

ohsh>set dop 18
ohsh>load oracle table omovies:movie_ratings from path \
     hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext \
     using exttab

It is worth pointing out that Oracle table loads using Hadoop map/reduce and loads using Oracle external tables have very different execution strategies, but the commands to do both are declarative and almost identical.  This makes it easy to test all methods, understand what is going on under the covers, and decide which method suits your use case and environment.

Loading Oracle tables from Hive Tables

Loading data from Hive tables is similar to the commands above, only the source of the data is a named Hive table (in this case living in a Hive "moviedemo" database).  This is a load oracle table command from a Hive table using the "directpath" method.

ohsh>set dateformat "yyyy-MM-dd"
ohsh>load oracle table omovies:movie_ratings from hive table hmovies:movie_ratings using directpath

This is a similar command using OSCH.

ohsh>set datemask "YYYY-MM-DD"
ohsh>load oracle table omovies:movie_ratings from hive table hmovies:movie_ratings using exttab

Note that there are different date and time defaults used with "directpath" and "exttab".  (This will get explained in depth in later blog posts that do a deep dive into all these mechanisms, along with some recommendations as to best practices.)

Creating or Loading Hive tables from Oracle tables

OHSH supports creating, replacing, creating-or-replacing, and loading Hive tables from content in Oracle tables.

ohsh>create hive table hive0:movie_ratings_oracle \
          from oracle table olhp:movie_ratings \
          hiverootdir /user/${USER}/moviedemo/movie_ratings_datapump \
          using directcopy

This creates a Hive table called movie_ratings_oracle from the Oracle table movie_ratings using a method called "directcopy".  The "hiverootdir" is a root directory under which the data files of copied Oracle tables are stored and managed.

Next Steps

That completes our nickel tour.  The next tutorial will get into the nuts and bolts of configuring the tool and how to take advantage of its ease-of-use features.  This will be followed by more tutorials looking at the tool in depth and focusing on loading Oracle tables and Hive tables, and explaining special cases such as loading Hive partitions, using loader maps to arbitrarily map table columns to data, and how to ensure that loads into the Oracle database won't happen unless the data is guaranteed to be clean for loading.



How to Load Oracle and Hive Tables Using OHSH (Part 4 - Loading Hive Tables)

In Part 3 we described how to use OHSH to load Oracle tables with content living in either HDFS files or in Hive tables.  In this tutorial we focus on how to use OHSH to do the opposite: create, replace, and incrementally load Hive tables with content living in Oracle tables.  To use OHSH to do this you need to download and configure the Copy To Hadoop feature of Big Data SQL (CP2HADOOP) as described in Part 1 and Part 2 of this series of tutorials.

When launching OHSH, the banner should tell you that Copy to Hadoop is enabled.  (If the banner does not include "Copy to Hadoop", OHSH is not configured correctly for these types of operations.)

$ ohsh
Oracle Shell for Hadoop Loaders Release 1.2.0 - Production   (Build:20161214114823)
Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
Oracle Loader for Hadoop (3.8.0), Oracle SQL Connector for HDFS (3.7.0),
The Copy to Hadoop feature of Oracle Big Data SQL (3.1.0) enabled.

Copying data from Oracle to Hadoop can be done using three methods: "stage", "fuse", or "directcopy". The "stage" and "fuse" methods push data from Oracle into Hadoop, while "directcopy" pulls data from Oracle using a Hadoop map-only job.  The "stage" method is the most restricted: it typically only works when you are running OHSH on the Oracle system (or when the stage directory is on shared storage visible to both Oracle and the OHSH user, as discussed below).

All of the methods produce Oracle data pump files in an HDFS directory and create a Hive external table that points to the data pump files.  The HDFS directory where these files are placed must be readable from Hive.  The Hive external table uses an Oracle SerDe implementation called "oracle.hadoop.hive.datapump.DPSerDe".  This SerDe can open and read Oracle data pump files.

Note that in this discussion the term "external table" can refer either to Oracle external tables used to export data from Oracle as data pump files, or to Hive external tables that are used to read data pump files in Hive.  Conceptually, external tables in Oracle and Hive serve the same purpose: mapping a table definition to data controlled and managed by users, not by Hive or Oracle internal storage.

OHSH Copy To Hadoop Operations

OHSH supports four types of operations when loading content from Oracle to Hive:

create - creates an external table in Hive with content from an Oracle table
replace - replaces an external table in Hive with content from an Oracle table
create or replace - creates or replaces an external table in Hive with content from an Oracle table
load - incrementally loads an existing external Hive table with content from an Oracle table

When doing a "create" operation, OHSH assumes that the external table in Hive is being created from scratch.  A "create" will fail if the table already exists or if the HDFS directory where the table data would be stored exists.

When doing a "replace" operation, OHSH drops an existing external Hive table and the related storage directory in HDFS.  It then proceeds to do a standard "create" operation.  If it can't drop the Hive table or cannot find and delete the HDFS storage directory containing table data, the operation doesn't proceed and returns an error.

The "create or replace" operation is more forgiving: it drops an existing table if it exists, and deletes the related storage directory if it exists.  It then proceeds with a normal "create" operation.  This is what you want to call if a "create" or a "replace" fails, leaves things in an inconsistent state, and you don't want to deal with the hassle of cleaning up Hive state.
The "load" operation assumes a Hive external table exists and the related directory containing table data exists.  It simply adds data from an existing Oracle table to the data files already living under a Hive table. Why does OHSH support all these operations but only supports the load operation for loading Oracle tables?  Because normal tables in Oracle are not external and are an order of magnitude more sophisticated.  A "CREATE TABLE" command for an Oracle table has all sorts of powerful and fascinating options that are expressed in Oracle DDL.  Create and replacement of Oracle tables should be delegated to OHSH call outs to SQL*Plus. How "directcopy" works "directcopy" is the simple and preferred model both in operating complexity and security. It kicks off a Hadoop map-only job that connects to the Oracle data base, decomposes an Oracle table into table splits and reads splits from Oracle, writing the content into data pump files in HDFS into the directory that serves the Hive table.  The security is simpler because it eliminates the stage directory involved (as is the case with "stage" or "fuse" methods).  The OHSH user just needs to provide the credentials to connect to the Oracle schema that can read the table and needs to have HDFS privileges to write to the target HDFS directory. How "stage" and "fuse" methods work The "stage" and "fuse" methods work by producing data pump files using Oracle external tables in an Oracle CTAS (i.e. "CREATE TABLE AS SELECT") statement.   Typically you will use these methods only when the Hadoop cluster cannot connect to the Oracle database because of firewall and security issues, which prevents you from using "directcopy".  When the CTAS statement is executed, Oracle writes table data to files living in an Oracle directory object as data pump export files.  The name of the Oracle directory object is specified in OHSH default "locationdirectory", and this setting becomes external location directories used for exporting data out of Oracle.  From Oracle's perspective all this activity appears local to the system where Oracle is running.  Note that this directory serves an interim scratch area to land data pump files which will then be copied or moved to the user specified final destination someplace in the Hadoop cluster. The number of data pump files created is typically equal to the OHSH "dop" setting.  This enables writing of data pump files in parallel in the CTAS statement.  If the payload is very small then it is possible that fewer data pump files will be created. When using the "stage" method this scratch directory is readable and writable from the file system local to the Oracle database as a local OS directory.  When using "fuse" the scratch directory references an HDFS fuse mount to a directory living directly in HDFS.  This means that the Oracle external tables produce data pump files directly in HDFS.   Oracle is unaware of this.  It is naively writing to what appears to be a local directory but in-fact is a mount point into HDFS. Once the data pump files are written by Oracle to this scratch directory, OHSH will create a sub-folder living directly under a user specified Hive directory which will serve as the final destination.  OHSH will then move the files to the final destination in HDFS.  If the "stage" method is being used, the move operation involves copying the files into HDFS and deleting the interim copies in the scratch directory living on the Oracle system.  For the "fuse" method, the move is done by HDFS.  
It is a lightweight operation that simply changes the HDFS path to the existing data pump files.

The final step involves telling Hive to create a Hive external table that points to the HDFS directory containing the data pump files.  (This step is omitted if the operation is "load".)

Comparing "stage" and "fuse"

The "fuse" method is more efficient than the "stage" method because the data pump files are written once, directly into HDFS.  The "stage" method writes data pump files twice: once to an OS file path visible to Oracle, and then to a directory living in HDFS.

Oracle writes to the Oracle directory object specified in OHSH as the "locationdirectory".  (For those not familiar with Oracle, an Oracle directory object is a named object in Oracle that maps to some OS directory accessible to the Oracle system.)  This directory path needs to be readable and writable by both Oracle and the end user running OHSH.

For "stage", this directory is either a local disk on the Oracle host (e.g. "/tmp/ohshscratch") or an NFS mount (e.g. "/mnt/shared/tmp/ohshscratch"). If the directory is local to the Oracle host and you want to use the "stage" method, you will need to run OHSH on the Oracle host.  If the directory is an NFS mount, you can run "stage" anywhere as long as the directory path (e.g. "/mnt/shared/tmp/ohshscratch") is visible and has read and write access for Oracle and the OHSH user.

For "fuse", the Fuse HDFS mount point (e.g. "/mnt/hdfs/tmp/ohshscratch") needs to be readable and writable by the Oracle host, and the underlying HDFS path (e.g. "/tmp/ohshscratch") needs to be HDFS readable and writable by the OHSH user.

Obviously, both the "stage" and "fuse" methods impose administration and security complexity if you are not running OHSH on the Oracle system as user "oracle".  They involve configuring directories that are readable and writable by both Oracle and the OHSH user.  This requires some workable combination of user and group file privileges on local disk or shared storage.

Making data pump files readable by Hive

Regardless of which method is used to create data pump files in an HDFS directory, Hive needs to be able to read them when queries are issued against the Hive external table.

The data pump files and the directory they live in are owned by the OHSH user in HDFS.  By default the data pump files landed in Hadoop are given user-only (i.e. 700) file permissions.

If Hive impersonation is enabled, Hive can read the files because it is accessing the files under the identity of the OHSH user. If Hive impersonation is not enabled, OHSH will change permissions at the end of the load to ensure the files are readable by the user identity of the HiveServer2 process (typically "hive").

OHSH detects whether Hive impersonation is enabled.  If it is not, OHSH will take administration steps to allow the HiveServer2 process to read the files, either by adding an HDFS ACL to the directory or by relaxing the directory permissions to 750.  In the latter case, the primary group of the OHSH user in HDFS needs to include the user identity of the HiveServer2 process.

Examples of loading using "directcopy" and "stage" methods

The Oracle table we are copying to Hive looks like this:

ohsh>%sql describe movie_ratings;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 CUSTID                                             NUMBER
 MOVIEID                                            NUMBER
 GENREID                                            NUMBER
 TIME                                               DATE
 RECOMMENDED                                        NUMBER
 ACTIVITY                                           NUMBER
 RATING                                             NUMBER
 SALES                                              NUMBER

Composing a "create or replace" Hive Table command

The OHSH "create or replace hive table" command requires identifying the following:

The name of the Oracle table to be copied and the associated OHSH JDBC and SQL*Plus resources that access that table.

The name of the table to be created in Hive and the associated Hive resource that accesses the Hive database where the table is to be created.

A root directory in HDFS which is writable by the user running OHSH and serves as a repository for tables copied from Oracle to Hive.  This is the directory tree where the data pump files will live.  It can be specified by the OHSH default "hiverootdir", which can be overridden in the "create hive table" command.

For the "stage" or "fuse" methods, the OHSH default "locationdirectory" set to the name of the Oracle directory object that serves as a scratch directory where Oracle can write data pump files.  (If running the "stage" method this directory must be an OS directory visible to both Oracle and the OHSH user. If running the "fuse" method it must be an HDFS Fuse mount directory visible to Oracle.  In either case the directory must be readable and writable by both Oracle and the OHSH user.)

OHSH will want to exclusively manage sub-directories underneath "hiverootdir".  When copying a table from Oracle, OHSH will create a folder under the "hiverootdir" which reflects the Hive database and Hive table name whose data has been copied from Oracle to Hadoop.

Before running any of these methods, one needs to define OHSH resources.  For "sql" and "omovies" (i.e. the SQL*Plus and Oracle JDBC resources) we are using a TNS alias "inst1" to identify the Oracle database and relying on Oracle Wallet to do authentication transparently.

ohsh>create sqlplus resource sql connectid="inst1"
ohsh>create oracle jdbc resource omovies connectid="inst1"

We then define "hmovies" to connect to Hive and use the "moviedemo" database.

ohsh>create hive resource hmovies connectionurl="jdbc:hive2:///moviedemo"

In all of the methods below we will create or replace a Hive table called "movie_ratings_oracle" living in a Hive database called "moviedemo".  The actual data pump files backing this Hive table will live in this HDFS folder: /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle.  The "hiverootdir" will be set to /user/rhanckel/moviedemo/movie_ratings.  The methods will create and manage the subfolder underneath that identifies the Hive database and table (i.e. moviedemo.movie_ratings_oracle).

Creating a Hive table from an Oracle table using "directcopy"

For "directcopy", you need to define the number of data pump files you want to be created, using the "set dpfilecount" command.
ohsh>set outputlevel verbose
ohsh>set dpfilecount 4
ohsh>set hiverootdir /user/rhanckel/moviedemo/movie_ratings
ohsh>create or replace hive table hmovies:movie_ratings_oracle from oracle table omovies:movie_ratings using directcopy

The verbose output reflects a Hadoop map-only job which reads Oracle data and produces the data pump files in HDFS.  When the map-only job finishes, the Hive external table is created by pointing it to the HDFS storage directory containing the data pump files and telling Hive to use oracle.hadoop.hive.datapump.DPSerDe.

Creating or replacing Hive table moviedemo.movie_ratings_oracle
  From Oracle table "OLHP"."MOVIE_RATINGS"
  Using direct copy to move content to Hive for transport to HDFS path /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle
--------------------------------------------------------------------------------------------------------------------------------
Oracle Database Connectivity Settings
--------------------------------------------------------------------------------------------------------------------------------
TNS usage: CTOH (directcopy) job using client TNS path on hadoop cluster.
Wallet usage: CTOH (directcopy) job using client wallet path on hadoop cluster.
--------------------------------------------------------------------------------------------------------------------------------
Checking Hive metastore and preparing HDFS directory that will store exported data pump files
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
Dropping existing Hive table
--------------------------------------------------------------------------------------------------------------------------------
drop table moviedemo.movie_ratings_oracle;
scan complete in 3ms
Connecting to jdbc:hive2:///moviedemo;
Connected to: Apache Hive (version 1.1.0-cdh5.8.0)
Driver: Hive JDBC (version 1.1.0-cdh5.8.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
OK
No rows affected (2.758 seconds)
Beeline version 1.1.0-cdh5.8.0 by Apache Hive
Closing: 0: jdbc:hive2:///moviedemo;
Deleting existing storage directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle
Creating storage directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle with permission 700.
--------------------------------------------------------------------------------------------------------------------------------
CTOH map-only job will be submitted to the Hadoop cluster to load a Hive table using DIRECTCOPY
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
Hadoop MapReduce Configuration Settings
--------------------------------------------------------------------------------------------------------------------------------
mapreduce.am.max-attempts=2
mapreduce.app-submission.cross-platform=false
mapreduce.client.completion.pollinterval=5000
mapreduce.client.genericoptionsparser.used=true
mapreduce.client.output.filter=FAILED
mapreduce.client.progressmonitor.pollinterval=1000
mapreduce.client.submit.file.replication=10
Lots of Hadoop configuration settings...
yarn.app.mapreduce.client.job.retry-interval=2000 yarn.app.mapreduce.client.max-retries=3 yarn.app.mapreduce.shuffle.log.backups=0 yarn.app.mapreduce.shuffle.log.limit.kb=0 yarn.app.mapreduce.shuffle.log.separate=true yarn.app.mapreduce.task.container.log.backups=0 -------------------------------------------------------------------------------------------------------------------------------- CTOH Configuration Settings -------------------------------------------------------------------------------------------------------------------------------- oracle.hadoop.ctoh.connection.tnsAdmin=/user/rhanckel/oracle/work oracle.hadoop.ctoh.connection.username="OLHP" oracle.hadoop.ctoh.connection.walletLoc=/user/rhanckel/oracle/work oracle.hadoop.ctoh.datapump.basename=OSL_170707_215753 oracle.hadoop.ctoh.datapump.extension=.dmp oracle.hadoop.ctoh.datapump.output=/user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle oracle.hadoop.ctoh.home=/user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3 oracle.hadoop.ctoh.jdbc.url=jdbc:oracle:thin:@inst1 oracle.hadoop.ctoh.maxSplits=4 oracle.hadoop.ctoh.splitterType=BLOCK_SPLITTER oracle.hadoop.ctoh.table="OLHP"."MOVIE_RATINGS" -------------------------------------------------------------------------------------------------------------------------------- Begin CTOH map-only execution on Hadoop -------------------------------------------------------------------------------------------------------------------------------- [INFO] 2017-07-07 21:58:18,809 [oracle.hadoop.ctoh.CtohDriver]  ctoh-conf resource: jar:file:/net/slc03lfz/scratch/rhanckel/view_storage/rhanckel_hadoop15/work/kits_scratch/orahivedp-3.1.3/jlib/orahivedp.jar!/oracle/hadoop/ctoh/ctoh-conf.xml [INFO] 2017-07-07 21:58:19,463 [oracle.hadoop.ctoh.CtohDriver]  Schema: OLHP Table: MOVIE_RATINGS [INFO] 2017-07-07 21:58:19,947 [hadoop.conf.Configuration.deprecation]  session.id is deprecated. Instead, use dfs.metrics.session-id [INFO] 2017-07-07 21:58:19,948 [hadoop.metrics.jvm.JvmMetrics]  Initializing JVM Metrics with processName=JobTracker, sessionId= [INFO] 2017-07-07 21:58:20,797 [hadoop.ctoh.split.DBParallelSplitFactory]  Minimum number of chunks 3 < 4 [INFO] 2017-07-07 21:58:21,055 [hadoop.ctoh.split.DBParallelSplitFactory]  Number of Chunks: 6 [INFO] 2017-07-07 21:58:21,196 [apache.hadoop.mapreduce.JobSubmitter]  number of splits:4 [INFO] 2017-07-07 21:58:21,266 [apache.hadoop.mapreduce.JobSubmitter]  Submitting tokens for job: job_local1250038248_0001 ...  SELECT /*+ no_parallel */ "CUSTID", "MOVIEID", "GENREID", "TIME", "RECOMMENDED", "ACTIVITY", "RATING", "SALES" FROM "OLHP"."MOVIE_RATINGS" WHERE (ROWID BETWEEN ? and ?) [INFO] 2017-07-07 21:58:24,230 [oracle.hadoop.ctoh.OjdbcRecordReader]  Query for Split: SELECT /*+ no_parallel */ "CUSTID", "MOVIEID", "GENREID", "TIME", "RECOMMENDED", "ACTIVITY", "RATING", "SALES" FROM "OLHP"."MOVIE_RATINGS" WHERE (ROWID BETWEEN ? and ?) ... [INFO] 2017-07-07 21:58:24,230 [oracle.hadoop.ctoh.OjdbcRecordReader]  bindings Length=2 [INFO] 2017-07-07 21:58:29,108 [apache.hadoop.mapred.LocalJobRunner]  map task executor complete. 
[INFO] 2017-07-07 21:58:29,618 [apache.hadoop.mapreduce.Job]   map 100% reduce 0% [INFO] 2017-07-07 21:58:29,619 [apache.hadoop.mapreduce.Job]  Job job_local1250038248_0001 completed successfully [INFO] 2017-07-07 21:58:29,654 [apache.hadoop.mapreduce.Job]  Counters: 16     File System Counters         FILE: Number of bytes read=159695873         FILE: Number of bytes written=165760746         FILE: Number of read operations=0         FILE: Number of large read operations=0         FILE: Number of write operations=0     Map-Reduce Framework         Map input records=39716         Map output records=39716         Input split bytes=1196         Spilled Records=0         Failed Shuffles=0         Merged Map outputs=0         GC time elapsed (ms)=11         Total committed heap usage (bytes)=956301312     oracle.hadoop.ctoh.         rowCountCounter=39716     File Input Format Counters         Bytes Read=0     File Output Format Counters         Bytes Written=2598942 [INFO] 2017-07-07 21:58:29,690 [oracle.hadoop.ctoh.CtohDriver]  Pattern for transferring files OSL_170707_215753*.dmp [INFO] 2017-07-07 21:58:29,701 [oracle.hadoop.ctoh.CtohDriver]  Number of files to transfer: 4 [INFO] 2017-07-07 21:58:29,703 [oracle.hadoop.ctoh.CtohDriver]  Moved File OSL_170707_215753-m-00000.dmp [INFO] 2017-07-07 21:58:29,703 [oracle.hadoop.ctoh.CtohDriver]  Moved File OSL_170707_215753-m-00001.dmp [INFO] 2017-07-07 21:58:29,704 [oracle.hadoop.ctoh.CtohDriver]  Moved File OSL_170707_215753-m-00002.dmp [INFO] 2017-07-07 21:58:29,705 [oracle.hadoop.ctoh.CtohDriver]  Moved File OSL_170707_215753-m-00003.dmp -------------------------------------------------------------------------------------------------------------------------------- End CTOH map-only execution on Hadoop -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Creating the Hive table -------------------------------------------------------------------------------------------------------------------------------- add jars /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/orahivedp.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/oraloader.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/ojdbc7.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/orai18n.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/ora-hadoop-common.jar;CREATE EXTERNAL TABLE moviedemo.movie_ratings_oracle   ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe'   STORED AS   INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat'   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'   LOCATION '/user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle'; Shutting down embedded metastore database. 
scan complete in 3ms
Connecting to jdbc:hive2:///moviedemo;
Connected to: Apache Hive (version 1.1.0-cdh5.8.0)
Driver: Hive JDBC (version 1.1.0-cdh5.8.0)
Transaction isolation: TRANSACTION_REPEATABLE_READ
No rows affected (0.123 seconds)
OK
No rows affected (1.731 seconds)
Beeline version 1.1.0-cdh5.8.0 by Apache Hive
Closing: 0: jdbc:hive2:///moviedemo;
--------------------------------------------------------------------------------------------------------------------------------
Hive table creation successful
--------------------------------------------------------------------------------------------------------------------------------
All load processing has completed.
Load operation OSL_170707_215753 successful.
Load time = 0:00:49
--------------------------------------------------------------------------------------------------------------------------------
End of CREATE_OR_REPLACE Hive table execution at 2017-07-07:21:58:43
--------------------------------------------------------------------------------------------------------------------------------

Creating a Hive table from an Oracle table using "stage" or "fuse"

The processing steps for "stage" and "fuse" are almost identical.  The following describes the processing of the "stage" method.

ohsh>set outputlevel verbose
ohsh>set locationdirectory MOVIEDEMO_STAGE_DIR
ohsh>set hiverootdir /user/rhanckel/moviedemo/movie_ratings
ohsh>create or replace hive table hmovies:movie_ratings_oracle from oracle table omovies:movie_ratings using stage

The verbose output below describes the steps being processed:

The existing external table is dropped and the related HDFS storage directory is deleted
Oracle exports new data pump files from the Oracle table to an OS stage directory
The data pump files are then moved to a final HDFS directory underneath the "hiverootdir" directory which reflects the database-qualified name of the Hive table
A Hive external table is created to point to the data pump contents living in this directory
Artifacts of the operation that are transient (e.g.
the Oracle external table used to write data pump files) are deleted or dropped at the end of the operation Creating or replacing Hive table moviedemo.movie_ratings_oracle   From Oracle table "OLHP"."MOVIE_RATINGS"   Using a local stage directory to move content to Hive for transport to HDFS path /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle -------------------------------------------------------------------------------------------------------------------------------- Checking Hive metastore and preparing HDFS directory that will store exported data pump files -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Dropping existing Hive table -------------------------------------------------------------------------------------------------------------------------------- drop table moviedemo.movie_ratings_oracle; scan complete in 2ms Connecting to jdbc:hive2:///moviedemo; Connected to: Apache Hive (version 1.1.0-cdh5.8.0) Driver: Hive JDBC (version 1.1.0-cdh5.8.0) Transaction isolation: TRANSACTION_REPEATABLE_READ OK No rows affected (2.33 seconds) Beeline version 1.1.0-cdh5.8.0 by Apache Hive Closing: 0: jdbc:hive2:///moviedemo; Deleting existing storage directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle Creating storage directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle with permission 700. -------------------------------------------------------------------------------------------------------------------------------- Writing data pump files containing Oracle table content -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION FORCE PARALLEL DML PARALLEL 4 ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4 CREATE TABLE "OLHP"."OSL_170707_092253_EXT" ORGANIZATION EXTERNAL (TYPE ORACLE_DATAPUMP  DEFAULT DIRECTORY "MOVIEDEMO_DEFAULT_DIR" LOCATION ( "MOVIEDEMO_STAGE_DIR":'OSL_170707_092253-0.dmp', "MOVIEDEMO_STAGE_DIR":'OSL_170707_092253-1.dmp', "MOVIEDEMO_STAGE_DIR":'OSL_170707_092253-2.dmp', MOVIEDEMO_STAGE_DIR":'OSL_170707_092253-3.dmp')) PARALLEL 4 AS SELECT  * FROM "OLHP"."MOVIE_RATINGS" -------------------------------------------------------------------------------------------------------------------------------- Moving location file /user/rhanckel/oracle/work/cp2bda_stage/OSL_170707_092253-0.dmp from locally staged directory to HDFS directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle -------------------------------------------------------------------------------------------------------------------------------- ... 
-------------------------------------------------------------------------------------------------------------------------------- Moving location file /user/rhanckel/oracle/work/cp2bda_stage/OSL_170707_092253-3.dmp from locally staged directory to HDFS directory /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Creating the Hive table -------------------------------------------------------------------------------------------------------------------------------- add jars /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/orahivedp.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/oraloader.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/ojdbc7.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/orai18n.jar /user/rhanckel/oracle/work/kits_scratch/orahivedp-3.1.3/jlib/ora-hadoop-common.jar;CREATE EXTERNAL TABLE moviedemo.movie_ratings_oracle   ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe'   STORED AS   INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat'   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'   LOCATION '/user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle'; [WARN] 2017-07-07 09:23:16,064 [hadoop.hive.conf.HiveConf]  HiveConf of name hive.metastore.local does not exist Shutting down embedded metastore database. Connected to: Apache Hive (version 1.1.0-cdh5.8.0) Driver: Hive JDBC (version 1.1.0-cdh5.8.0) Transaction isolation: TRANSACTION_REPEATABLE_READ No rows affected (0.114 seconds) OK No rows affected (1.646 seconds) Beeline version 1.1.0-cdh5.8.0 by Apache Hive Closing: 0: jdbc:hive2:///moviedemo; -------------------------------------------------------------------------------------------------------------------------------- Hive table creation successful -------------------------------------------------------------------------------------------------------------------------------- All load processing has completed. Load operation OSL_170707_092253 successful. Load time = 0:00:35 -------------------------------------------------------------------------------------------------------------------------------- End of CREATE_OR_REPLACE Hive table execution at 2017-07-07:09:23:29 -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Drop Oracle external table used for exporting content. -------------------------------------------------------------------------------------------------------------------------------- DROP TABLE "OLHP"."OSL_170707_092253_EXT" PURGE ohsh>set outputlevel minimal ohsh>%moviedemo select count(*) from movie_ratings_oracle; +--------+--+ |  _c0   | +--------+--+ | 39716  | +--------+--+ Incrementally loading a Hive table from an Oracle table Once a Hive table has been created from an Oracle table, how do you add new rows that have been inserted into the source table?  First you need to figure out the appropriate where clause that identifies the new rows. 
Let's say the Oracle source table has new rows not yet copied to the Hive table, and these rows all have a "TIME" column value greater than or equal to October 1, 2012.  You can issue the OHSH load command, using any of the methods (i.e. "fuse", "stage", or "directcopy"), and inject the new rows into the data living under the target Hive table.

ohsh>load hive table hmovies:movie_ratings_oracle \
     from oracle table omovies:movie_ratings using stage \
     where "(time >= '01-OCT-12')"

The load operation goes through the usual steps of creating the data pump files and copying them to the HDFS storage directory supporting the Hive external table. This works for appending new rows, not for updating rows that already exist in the Hive external table.

Similar syntax works for "directcopy" and "fuse":

ohsh>load hive table hmovies:movie_ratings_oracle \
     from oracle table omovies:movie_ratings using directcopy \
     where "(time >= '01-OCT-12')"

Best practices for loading Hive tables using OHSH

When possible use "directcopy". It is the simplest and most direct method for moving data and avoids the configuration complexities of "stage" and "fuse".  The case where "stage" and "fuse" make sense is when the Hadoop cluster cannot connect to the Oracle database because of network firewall constraints.  In such cases the data needs to be pushed from Oracle to Hadoop.

When using "stage" or "fuse", set the OHSH default "dop" to the largest dop value you are allowed to use in Oracle, to maximize the parallelism of exporting data to data pump files.

Security models work best when they are simple to implement and understand.  To simplify security and configuration in production environments, one good option is to run OHSH on the Oracle host as the OS user running Oracle (e.g. "oracle") and propagate data to HDFS living under the same OS user identity. This keeps the data files under "oracle" control.  If HDFS ACLs are enabled, OHSH will allow read access to the "oracle" owned data pump files to the owner of the Hive server process (e.g. "hive").  Access to the data can then be controlled by Hive "GRANT SELECT" statements to a set of privileged users.  This side-steps the problems of dealing with clumsy group file privileges in HDFS.
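To make the access-control step concrete, here is a minimal sketch, assuming the "hmovies" and "hadoop0" resources used above, SQL standard-based authorization enabled in Hive, and a hypothetical grantee "analyst1" (the exact GRANT syntax depends on the authorization mode configured in your Hive installation):

ohsh>%hmovies grant select on moviedemo.movie_ratings_oracle to user analyst1;
ohsh>%hadoop0 fs -getfacl /user/rhanckel/moviedemo/movie_ratings/moviedemo.movie_ratings_oracle

The first command delegates a Hive GRANT through the "hmovies" resource; the second inspects the HDFS ACLs on the storage directory holding the data pump files.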



How to Load Oracle and Hive Tables with OHSH (Part 3 - Loading Oracle Tables)

In this tutorial we are going to talk in depth about loading Oracle tables with content from HDFS files or from Hive tables.  The focus will be on content that is stored as delimited text, but we will also talk about loading other forms of content (e.g. parquet).

The Oracle target table and sources of data

The Oracle table we will be loading lives in a schema called "MOVIEDEMO" and is called MOVIE_RATINGS, with various attributes relating customers to movies they saw and other related information.  This is a hash partitioned table where the hash is on the ACTIVITY column.

SQL> describe movie_ratings;
 Name                 Null?    Type
 -------------------- -------- ------
 CUSTID                        NUMBER
 MOVIEID                       NUMBER
 GENREID                       NUMBER
 TIME                          DATE
 RECOMMENDED                   NUMBER
 ACTIVITY                      NUMBER
 RATING                        NUMBER
 SALES                         NUMBER

We will be loading data from HDFS files containing delimited text and from the following Hive table whose storage is also delimited text.  The HDFS files live under the path /user/${USER}/moviedata.  The rows look like this:

1129727,500,3,2012-07-01:00:30:38,1,5,,
1363545,27205,9,2012-07-01:00:35:09,1,11,,3.99
1036191,1149812,17,2012-07-01:00:37:35,0,4,,
1152235,9346,6,2012-07-01:00:39:49,1,2,,

The table definition of the Hive table looks like this:

hive> describe movie_ratings;
OK
custid                  int
movieid                 int
genreid                 int
time                    string
recommended             int
activity                int
rating                  int
sales                   float
Time taken: 0.832 seconds, Fetched: 8 row(s)

The Hive delimited text looks a little different from the HDFS text above.  Hive uses the convention of "\N" for nulls, and the fields in this table are tab-delimited (rendered as whitespace below).

1402363 11547   6       2012-08-17:12:38:32     1       5       \N      \N
1102408 497     3       2012-08-17:12:40:35     1       11      \N      2.99
1172714 6978    7       2012-08-17:12:40:37     1       11      \N      1.99
1129253 4688    15      2012-08-17:12:41:52     0       11      \N      1.99

Delimited text assumptions

All the examples below showing loading of HDFS files of delimited text make the assumption that the declared order of columns in an Oracle table maps to the physical ordering of fields in a delimited text record in an HDFS file (i.e. the first field maps to the first Oracle column and so on).  If that is not the case, OHSH has a language construct called "loadermap" that solves this problem, but it will be discussed separately in Part 5 of this tutorial.

Loading from Hive tables is less restrictive regarding order.  As long as the column names in both the Oracle table and the Hive table are the same, the load should work regardless of the order of column name declarations in the table.
If the column names are not identical, "loadermap" can again be used to solve the problem.

Loading using OHSH's "jdbc" or "directpath" method

When loading Oracle tables using "jdbc" or "directpath", you are using a Hadoop map or map/reduce job to read data living in Hadoop and then load it into Oracle. Either a map-only or a map/reduce job is launched on the Hadoop cluster, and data is read by map tasks using standard Hadoop input splits. The data is converted to Oracle data types and injected into Oracle using Oracle SQL with JDBC or using the Oracle OCI interface.  A "jdbc" load will be a map-only job if the target table is not partitioned.  If it is partitioned (and typically large) the "directpath" method should be used.  This runs a full map/reduce job where the reduce stage uses Hadoop shuffle/sort to segregate data by partition.  This is followed by injection of data into Oracle partitions using OCI Direct Path.  This interface bypasses Oracle SQL and is a faster and more efficient way of injecting data into an Oracle table.

Using OHSH, we need to create a JDBC resource, which is used by both "jdbc" and "directpath" to create a connection to Oracle.  Below we are using a TNS alias called "inst1" and using a wallet to connect to the Oracle database transparently as Oracle user "MOVIEDEMO".

ohsh>create oracle jdbc resource omovies connectid="inst1"

For convenience, we will create a SQL*Plus resource to interrogate the table after the load.

ohsh>create sqlplus resource sql connectid="inst1"

The text file we are loading requires us to define the default date format which will be applied when Java needs to convert the string to an Oracle date.  We use the "dateformat" default to do this.

ohsh>set dateformat "yyyy-MM-dd HH:mm:ss"

For the purposes of understanding what is going on under the covers we will set output to verbose.

ohsh>set outputlevel verbose

Now we run the load job, specifying the target table and the path where the movie data lives.

ohsh>load oracle table omovies:movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using jdbc

Understanding loading Oracle tables using "jdbc" or "directpath" methods

To better understand what OHSH is doing during a load command you will want to set OHSH "outputlevel" to "verbose".  For "jdbc" and "directpath" the output will include the standard output of a Hadoop Map/Reduce job.

A banner describes the Oracle SmartLoader job id and the user's identity both locally and in Hadoop.

##################################################################################
Starting SmartLoader Job OSL_161202_123555
Local OS user: rhanckel
Hadoop user:   rhanckel
##################################################################################

This is followed by a brief statement of what the load job is going to do:

Loading Oracle table "MOVIEDEMO"."MOVIE_RATINGS"
  From HDFS files located at /user/rhanckel/moviedemo/movie_ratings_delimitedtext
  Using Oracle Loader for Hadoop (OLH) JDBC

These are the Hadoop map-reduce configuration settings for the job. In general they are largely invariant across jobs.
-------------------------------------------------------------------------------------------------------------------------------- Hadoop MapReduce Configuration Settings -------------------------------------------------------------------------------------------------------------------------------- mapreduce.am.max-attempts=2 mapreduce.app-submission.cross-platform=false mapreduce.client.completion.pollinterval=5000 mapreduce.client.genericoptionsparser.used=true mapreduce.client.output.filter=FAILED mapreduce.client.progressmonitor.pollinterval=1000 [ More Hadoop properties than you will ever want to know... ] yarn.app.mapreduce.client-am.ipc.max-retries=3 yarn.app.mapreduce.client-am.ipc.max-retries-on-timeouts=3 yarn.app.mapreduce.client.job.max-retries=0 yarn.app.mapreduce.client.job.retry-interval=2000 yarn.app.mapreduce.client.max-retries=3 yarn.app.mapreduce.shuffle.log.backups=0 yarn.app.mapreduce.shuffle.log.limit.kb=0 yarn.app.mapreduce.shuffle.log.separate=true yarn.app.mapreduce.task.container.log.backups=0 These settings are important and explain what properties are being set for the particular load.  They are very load specific.  The "jdbc" and "directpath" methods use OLH, hence OLH configuration settings come into play. ---------------------------------------------------------------------------------- OLH Configuration Settings ---------------------------------------------------------------------------------- oracle.hadoop.loader.connection.tns_admin=/ade/rhanckel_hadoop11/oracle/work oracle.hadoop.loader.connection.url=jdbc:oracle:thin:@inst1 oracle.hadoop.loader.connection.wallet_location=/ade/rhanckel_hadoop11/oracle/work oracle.hadoop.loader.defaultDateFormat=yyyy-MM-dd HH:mm:ss oracle.hadoop.loader.input.fieldNames=F0,F1,F2,F3,F4,F5,F6,F7 oracle.hadoop.loader.input.fieldTerminator=\u002c oracle.hadoop.loader.loaderMap."ACTIVITY".field=F5 oracle.hadoop.loader.loaderMap."CUSTID".field=F0 oracle.hadoop.loader.loaderMap."GENREID".field=F2 oracle.hadoop.loader.loaderMap."MOVIEID".field=F1 oracle.hadoop.loader.loaderMap."RATING".field=F6 oracle.hadoop.loader.loaderMap."RECOMMENDED".field=F4 oracle.hadoop.loader.loaderMap."SALES".field=F7 oracle.hadoop.loader.loaderMap."TIME".field=F3 oracle.hadoop.loader.loaderMap.columnNames= "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" oracle.hadoop.loader.loaderMap.targetTable="MOVIEDEMO"."MOVIE_RATINGS" oracle.hadoop.loader.logBadRecords=true oracle.hadoop.loader.rejectLimit=-1 The information below explains how OHSH thinks you are connected to Oracle and whether you are relying on Oracle Wallet and TNS settings on the Hadoop cluster. -------------------------------------------------------------------------------------------------------------------------------- Oracle Database Connectivity Settings -------------------------------------------------------------------------------------------------------------------------------- TNS usage: OLH job using client TNS path on hadoop cluster. Wallet usage: OLH job using client wallet path on hadoop cluster. Finally, the job executes.  This is mostly OLH/Hadoop generated output which shows the map/reduce job's progress in the Hadoop cluster. 
-------------------------------------------------------------------------------------------------------------------------------- Begin OLH execution at 2016-12-02:12:35:55 -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- OLH MapReduce job is submitted to the Hadoop cluster to load an Oracle table using JDBC -------------------------------------------------------------------------------------------------------------------------------- [INFO] 2016-12-02 12:35:55,425 [oracle.hadoop.loader.OraLoader]  Oracle Loader for Hadoop Release 3.8.0 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. [INFO] 2016-12-02 12:35:55,438 [oracle.hadoop.loader.OraLoader]  Built-Against: hadoop-2.2.0 hive-0.13.0 avro-1.8.1 jackson-1.8.8 [INFO] 2016-12-02 12:35:55,757 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.loadByPartition is disabled because table: MOVIE_RATINGS_NO_PART is not partitioned [INFO] 2016-12-02 12:35:55,757 [oracle.hadoop.loader.OraLoader]  oracle.hadoop.loader.enableSorting disabled, no sorting key provided [INFO] 2016-12-02 12:35:55,758 [oracle.hadoop.loader.OraLoader]  Reduce tasks set to 0 because of no partitioning or sorting. Loading will be done in the map phase. [INFO] 2016-12-02 12:35:55,763 [loader.lib.output.DBOutputFormat]  Setting map tasks speculative execution to false for : oracle.hadoop.loader.lib.output.JDBCOutputFormat [WARN] 2016-12-02 12:35:56,016 [oracle.hadoop.loader.OraLoader]  Sampler is disabled because the number of reduce tasks is less than two. Job will continue without sampled information. [INFO] 2016-12-02 12:35:56,016 [oracle.hadoop.loader.OraLoader]  Submitting OraLoader job OSL_161202_123555 [INFO] 2016-12-02 12:35:56,016 [hadoop.metrics.jvm.JvmMetrics]  Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized [INFO] 2016-12-02 12:35:56,558 [mapreduce.lib.input.FileInputFormat]  Total input paths to process : 1 [INFO] 2016-12-02 12:35:56,622 [apache.hadoop.mapreduce.JobSubmitter]  number of splits:1 [INFO] 2016-12-02 12:35:56,674 [apache.hadoop.mapreduce.JobSubmitter]  Submitting tokens for job: job_local1428083019_0005 [INFO] 2016-12-02 12:35:58,215 [apache.hadoop.mapred.LocalDistributedCacheManager]  Localized [INFO] 2016-12-02 12:35:58,542 [apache.hadoop.mapreduce.Job]  The url to track the job: http://localhost:8080/ [Lots of log output...] 
[INFO] 2016-12-02 12:35:58,542 [apache.hadoop.mapred.LocalJobRunner]  OutputCommitter set in config null [INFO] 2016-12-02 12:35:58,544 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1 [INFO] 2016-12-02 12:35:58,544 [apache.hadoop.mapred.LocalJobRunner]  OutputCommitter is oracle.hadoop.loader.lib.output.DBOutputCommitter [INFO] 2016-12-02 12:35:58,548 [apache.hadoop.mapred.LocalJobRunner]  Waiting for map tasks [INFO] 2016-12-02 12:35:58,548 [apache.hadoop.mapred.LocalJobRunner]  Starting task: attempt_local1428083019_0005_m_000000_0 [INFO] 2016-12-02 12:35:58,570 [mapreduce.lib.output.FileOutputCommitter]  File Output Committer Algorithm version is 1 [INFO] 2016-12-02 12:35:58,571 [apache.hadoop.mapred.Task]   Using ResourceCalculatorProcessTree : [ ] [INFO] 2016-12-02 12:35:58,571 [apache.hadoop.mapred.MapTask]  Processing split: file:/user/rhanckel/moviedemo/movie_ratings_delimitedtext/tkhp_moviedata.txt:0+1675410 [INFO] 2016-12-02 12:35:58,716 [loader.lib.output.DBOutputFormat]  conf prop: defaultExecuteBatch: 100 [INFO] 2016-12-02 12:35:58,717 [loader.lib.output.DBOutputFormat]  conf prop: loadByPartition: false [WARN] 2016-12-02 12:35:58,718 [hadoop.loader.utils.FSLogger]  Internal error: log stream already closed [INFO] 2016-12-02 12:35:58,727 [loader.lib.output.DBOutputFormat]  Insert statement: INSERT INTO "MOVIEDEMO"."MOVIE_RATINGS_NO_PART" ("CUSTID", "MOVIEID", "GENREID", "TIME", "RECOMMENDED", "ACTIVITY", "RATING", "SALES") VALUES (?, ?, ?, ?, ?, ?, ?, ?) [INFO] 2016-12-02 12:35:59,543 [oracle.hadoop.loader.OraLoader]  map 0% reduce 0% [INFO] 2016-12-02 12:36:02,491 [apache.hadoop.mapred.LocalJobRunner]  [INFO] 2016-12-02 12:36:02,514 [apache.hadoop.mapred.Task]  Task:attempt_local1428083019_0005_m_000000_0 is done. And is in the process of committing [INFO] 2016-12-02 12:36:02,522 [apache.hadoop.mapred.LocalJobRunner]  [INFO] 2016-12-02 12:36:02,522 [apache.hadoop.mapred.Task]  Task attempt_local1428083019_0005_m_000000_0 is allowed to commit now [INFO] 2016-12-02 12:36:02,526 [loader.lib.output.JDBCOutputFormat]  Committed work for task attempt attempt_local1428083019_0005_m_000000_0 [INFO] 2016-12-02 12:36:02,529 [mapreduce.lib.output.FileOutputCommitter]  Saved output of task 'attempt_local1428083019_0005_m_000000_0' to file:/user/rhanckel/smartloader/jobhistory/oracle/MOVIEDEMO/MOVIE_RATINGS_NO_PART/OSL_161202_123555/_temporary/0/task_local1428083019_0005_m_000000 [INFO] 2016-12-02 12:36:02,533 [apache.hadoop.mapred.LocalJobRunner]  map [INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.Task]  Task 'attempt_local1428083019_0005_m_000000_0' done. [INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.LocalJobRunner]  Finishing task: attempt_local1428083019_0005_m_000000_0 [INFO] 2016-12-02 12:36:02,534 [apache.hadoop.mapred.LocalJobRunner]  map task executor complete. 
[INFO] 2016-12-02 12:36:02,544 [oracle.hadoop.loader.OraLoader]  map 100% reduce 0%
[INFO] 2016-12-02 12:36:03,544 [oracle.hadoop.loader.OraLoader]  Job complete: OSL_161202_123555 (job_local1428083019_0005)
[INFO] 2016-12-02 12:36:03,545 [oracle.hadoop.loader.OraLoader]  Counters: 15
        File System Counters
                FILE: Number of bytes read=1507643076
                FILE: Number of bytes written=1506266096
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=39716
                Map output records=39716
                Input split bytes=113
                Spilled Records=0
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=23
                Total committed heap usage (bytes)=504889344
        File Input Format Counters
                Bytes Read=1675410
        File Output Format Counters
                Bytes Written=55072

The end of the job indicates whether it was successful or not and the load time.

Load operation OSL_161202_123555 successful.
Load time = 0:00:08
--------------------------------------------------------------------------------
End OLH execution at 2016-12-02:12:36:03
--------------------------------------------------------------------------------
################################################################################
Ending SmartLoader Job OSL_161202_123555
Local OS user: rhanckel
Hadoop user:   rhanckel
################################################################################

As you can see, the output shows you the progress of the work, capturing the standard Hadoop map/reduce output generated by Hadoop.  The general structure of verbose output when loading using "jdbc" or "directpath" looks the same, with minor differences in the properties being set.

Understanding loading Oracle tables using the "exttab" method

When loading tables using "exttab" the execution is the opposite of "jdbc" or "directpath". Rather than writing data into Oracle from Hadoop, Oracle reads data from Hadoop using an Oracle external table (generated by OSCH).  For the "exttab" method the data lives as delimited text files in HDFS.

The sequence of OHSH commands is similar to loads using "jdbc" and "directpath" with two exceptions: rather than setting a Java "dateformat", you need to set the "datemask" (this is the date format descriptor that is used by Oracle external tables), and for the method you need to specify "exttab" (i.e. Oracle external table).

ohsh>set datemask "YYYY-MM-DD HH24:MI:SS"
ohsh>set outputlevel verbose
ohsh>load oracle table omovies:movie_ratings from path \
  hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using exttab

Below the output reveals what is going on under the covers.  Properties are set for OSCH, and an Oracle external table is created on the fly that maps to data living in HDFS.  Default formats are set for NLS_DATE based on the "set datemask" statement in OHSH.  The external table that is created will read data living in /user/${USER}/moviedemo/movie_ratings_delimitedtext in HDFS.  The load consists of an Oracle INSERT statement being fed rows from a SELECT statement against the external table. After the load is committed, the external table and other load artifacts are deleted in Oracle.
############################################################################### Starting SmartLoader Job OSL_161211_205819 Local OS user: rhanckel Hadoop user:   rhanckel ############################################################################### Loading Oracle table "MOVIEDEMO"."MOVIE_RATINGS"   From Hive table moviedemo.movie_ratings   Using Oracle SQL Connector for Hadoop (OSCH) external tables -------------------------------------------------------------------------------------------------------------------------------- OSCH Configuration Settings -------------------------------------------------------------------------------------------------------------------------------- oracle.hadoop.connection.url=jdbc:oracle:thin:@inst1 oracle.hadoop.connection.user="MOVIEDEMP" oracle.hadoop.exttab.colMap."ACTIVITY".columnName=ACTIVITY oracle.hadoop.exttab.colMap."ACTIVITY".columnType=NUMBER oracle.hadoop.exttab.colMap."CUSTID".columnName=CUSTID oracle.hadoop.exttab.colMap."CUSTID".columnType=NUMBER oracle.hadoop.exttab.colMap."GENREID".columnName=GENREID oracle.hadoop.exttab.colMap."GENREID".columnType=NUMBER oracle.hadoop.exttab.colMap."MOVIEID".columnName=MOVIEID oracle.hadoop.exttab.colMap."MOVIEID".columnType=NUMBER oracle.hadoop.exttab.colMap."RATING".columnName=RATING oracle.hadoop.exttab.colMap."RATING".columnType=NUMBER oracle.hadoop.exttab.colMap."RECOMMENDED".columnName=RECOMMENDED oracle.hadoop.exttab.colMap."RECOMMENDED".columnType=NUMBER oracle.hadoop.exttab.colMap."SALES".columnName=SALES oracle.hadoop.exttab.colMap."SALES".columnType=NUMBER oracle.hadoop.exttab.colMap."TIME".columnName=TIME oracle.hadoop.exttab.colMap."TIME".columnType=DATE oracle.hadoop.exttab.colMap."TIME".dateMask=YYYY-MM-DD HH24:MI:SS oracle.hadoop.exttab.createBadFiles=true oracle.hadoop.exttab.createLogFiles=false oracle.hadoop.exttab.defaultDirectory="MOVIEDEMO_DEFAULT_DIR" oracle.hadoop.exttab.hive.databaseName=moviedemo oracle.hadoop.exttab.hive.tableName=movie_ratings oracle.hadoop.exttab.locationFileCount=4 oracle.hadoop.exttab.skipColPrefix=OSL_SKIPCOL_ oracle.hadoop.exttab.sourceType=hive oracle.hadoop.exttab.tableName="OSL_161211_205819_EXT" -------------------------------------------------------------------------------------------------------------------------------- Setting NLS_DATE_FORMAT for Oracle session using OHSH datemask -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS' -------------------------------------------------------------------------------------------------------------------------------- Setting NLS_TIMESTAMP_FORMAT for Oracle session using OHSH timestampmask -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION SET NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF' -------------------------------------------------------------------------------------------------------------------------------- Setting NLS_TIMESTAMP_TZ_FORMAT for Oracle session using OHSHtimestampmask -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION SET NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF' -------------------------------------------------------------------------------------------------------------------------------- Create OSCH external table(s) used for accessing Hadoop content 
-------------------------------------------------------------------------------------------------------------------------------- Oracle SQL Connector for HDFS Release 3.7.0 - Production  Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved. The create table command succeeded. User: MOVIEDEMO performed the following actions in schema: MOVIEDEMO CREATE TABLE "MOVIEDEMO"."OSL_161211_205819_EXT" ( "CUSTID"                         NUMBER, "MOVIEID"                        NUMBER, "GENREID"                        NUMBER, "TIME"                              DATE, "RECOMMENDED"           NUMBER, "ACTIVITY"                       NUMBER, "RATING"                         NUMBER, "SALES"                          NUMBER ) ORGANIZATION EXTERNAL (    TYPE ORACLE_LOADER    DEFAULT DIRECTORY "MOVIEDEMO_DEFAULT_DIR"    ACCESS PARAMETERS    (      RECORDS DELIMITED BY 0X'0A'      NOLOGFILE      CHARACTERSET AL32UTF8      PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'      FIELDS TERMINATED BY 0X'09'      MISSING FIELD VALUES ARE NULL      (        "CUSTID" CHAR NULLIF "CUSTID"=0X'5C4E',        "MOVIEID" CHAR NULLIF "MOVIEID"=0X'5C4E',        "GENREID" CHAR NULLIF "GENREID"=0X'5C4E',        "TIME" CHAR DATE_FORMAT DATE MASK 'YYYY-MM-DD HH24:MI:SS' NULLIF "TIME"=0X'5C4E',        "RECOMMENDED" CHAR NULLIF "RECOMMENDED"=0X'5C4E',        "ACTIVITY" CHAR NULLIF "ACTIVITY"=0X'5C4E',        "RATING" CHAR NULLIF "RATING"=0X'5C4E',        "SALES" CHAR NULLIF "SALES"=0X'5C4E'      )    )    LOCATION    (      'osch-20161211085828-9526-1'    ) ) PARALLEL REJECT LIMIT UNLIMITED; The following location files were created. osch-20161211085828-9526-1 contains 1 URI, 1813410 bytes      1813410 -------------------------------------------------------------------------------------------------------------------------------- Begin OSCH execution  at 2016-12-11:20:58:30 -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Begin single load (1/1) at 2016-12-11:20:58:30 -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION FORCE PARALLEL DML PARALLEL 4 ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4 INSERT /*+ append pq_distribute("MOVIEDEMO"."MOVIE_RATINGS", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES") SELECT "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES" FROM "OSL_161211_205819_EXT" Transaction is committed. -------------------------------------------------------------------------------------------------------------------------------- End single load at 2016-12-11:20:58:34. Elapsed load time = 0:00:04. -------------------------------------------------------------------------------------------------------------------------------- Load operation OSL_161211_205819 successful. 
Load time = 0:00:04
--------------------------------------------------------------------------------------------------------------------------------
End OSCH execution at 2016-12-11:20:58:34
--------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------
Executing OSCH post-load cleanup
--------------------------------------------------------------------------------------------------------------------------------
Oracle SQL Connector for HDFS Release 3.7.0 - Production

Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.

The drop command was successful.
User: MOVIEDEMO dropped OSCH external table MOVIEDEMO.OSL_161211_205819_EXT and all associated location files.
Successfully dropped transient tables/views created for the load
##################################################################################################
Ending SmartLoader Job OSL_161211_205819
Local OS user: rhanckel
Hadoop user:   rhanckel
##################################################################################################

Loading from Hive tables using "jdbc" and "directpath"

Loading Oracle tables from Hive tables is similar in syntax to loading from HDFS files.  The difference is that the source is the name of a Hive table living in either the "default" or some other Hive database.

If you are loading from a Hive table that is not in the "default" Hive database, then you have to create an OHSH resource that identifies the Hive database.  Here we create an OHSH resource called "hmovies" that maps to the Hive database "moviedemo".

ohsh>create hive resource hmovies connectionurl = "jdbc:hive2:///moviedemo;"

Again, since OLH is doing the load, the Java dateformat needs to be set accordingly.

ohsh>set dateformat "yyyy-MM-dd HH:mm:ss"

Finally you execute the load command with a from clause that specifies data is coming from a Hive table.

ohsh>load oracle table omovies:movie_ratings \
     from hive table hmovies:movie_ratings using directpath

A "jdbc" load is virtually identical.

ohsh>load oracle table omovies:movie_ratings from hive table hmovies:movie_ratings using jdbc

Loading from Hive tables using "exttab"

Loading Oracle tables from Hive tables using "exttab" looks like this:

ohsh>set datemask "YYYY-MM-DD HH24:MI:SS"
ohsh>load oracle table omovies:movie_ratings \
     from hive table hmovies:movie_ratings using exttab

Limitations when loading from Hive tables

All OHSH methods load from Hive tables by using Hive metadata to determine the physical format of the data and where it is stored. The "jdbc", "directpath", and "exttab" commands above assume that the column names in the Oracle table are the same as the column names in the Hive table, and that the Hive native data types of the Hive columns are convertible to Oracle data types.

OLH "jdbc" and "directpath" are also more functional than "exttab" in that loads are not restricted by the underlying storage of the Hive table.  The Hive table can be stored as delimited text, parquet, orc, or rcfiles.

The "exttab" method is generally restricted to loading Hive tables that are delimited text.  (It can read data pump files living in HDFS, but that is a special case explained in later parts of this tutorial.)  It reads the HDFS files living under a Hive table directly using OSCH, which streams delimited text into a vanilla Oracle external table.
In the examples above, the assumption is that the column name of the target table is the column name of the Hive table.  If this assumption does not hold, the OHSH "loadermap" needs to be used to provide an explicit column mapping between the Hive source and Oracle target tables.

Loading Hive partitions

If your Hive table is partitioned, OHSH can either load all partitions or those that satisfy a partition filter expression.

Note that partitions manifest themselves differently in Hive and Oracle.  In Hive, a partition serves as an organizational structure to segregate data files within an HDFS file system.  Data files belonging to a single Hive partition literally map to a unique HDFS directory dedicated to the partition.  This also means that the partition column value does not live as data with the values of other columns of the table.  It is a virtual column.

In Oracle, the partition value is a column whose value is stored with other column data in a table.

Hive partitions are independent of partitions in Oracle.  For example, a Hive table may be partitioned while the Oracle table may not (and vice versa). If you want the partitioning to be the same you have to create the Hive table and Oracle table with the same partition columns.  Practically speaking, only Oracle list partitions can be mapped to Hive partitions.

A Hive partition example

Here is a Hive table that is partitioned by the "state" column representing U.S. New England states (e.g. "CT" is Connecticut, "VT" is Vermont).

hive> describe movie_ratings_part;
OK
custid                  int
movieid                 int
genreid                 int
time                    timestamp
activity                int
rating                  int
sales                   float
state                   string

# Partition Information
# col_name              data_type               comment
state                   string

Loading all the partitions in a Hive table into an Oracle table has the same syntax as when loading non-partitioned tables. To load a subset of partitions, you need to specify a partition filter clause.  Again, except for the "dateformat"/"datemask" difference, the syntax is identical for all methods.

ohsh> set dateformat "yyyy-MM-dd HH:mm:ss"
ohsh> load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")" using directpath

ohsh> set dateformat "yyyy-MM-dd HH:mm:ss"
ohsh> load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")" using jdbc

ohsh>set datemask "YYYY-MM-DD HH24.MI.SS"
ohsh>load oracle table omovies:movie_ratings_part from hive table hmovies:movie_ratings_part partition filter = "(state=\"CT\" or state=\"VT\")" using exttab

The execution output looks like standard map/reduce output for "jdbc" and "directpath" with some extra verbiage highlighting that Hive partition filters are being evaluated.  The "exttab" output looks different since it creates one OSCH external table for each partition being loaded, and iterates through each external table, loading one partition at a time.
So if N partitions are being loaded, expect to see N Oracle INSERT statements selecting data from N external tables. -------------------------------------------------------------------------------------------------------------------------------- Begin OSCH execution  at 2017-06-11:00:33:47 -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Begin single load (1/2) at 2017-06-11:00:33:47 -------------------------------------------------------------------------------------------------------------------------------- ALTER SESSION FORCE PARALLEL DML PARALLEL 4 ALTER SESSION FORCE PARALLEL QUERY PARALLEL 4 INSERT /*+ append pq_distribute("OLHP"."MOVIE_RATINGS_PART", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS_PART"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE") SELECT "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE" FROM "OSL_170611_003337_EXT_1" Transaction is committed. Loaded Hive partition: STATE=CT -------------------------------------------------------------------------------------------------------------------------------- End single load at 2017-06-11:00:33:51. Elapsed load time = 0:00:04. -------------------------------------------------------------------------------------------------------------------------------- -------------------------------------------------------------------------------------------------------------------------------- Begin single load (2/2) at 2017-06-11:00:33:51 -------------------------------------------------------------------------------------------------------------------------------- INSERT /*+ append pq_distribute("MOVIEDEMO"."MOVIE_RATINGS_PART", none) */ INTO "MOVIEDEMO"."MOVIE_RATINGS_PART"("CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE") SELECT "CUSTID","MOVIEID","GENREID","TIME","RECOMMENDED","ACTIVITY","RATING","SALES","STATE" FROM "OSL_170611_003337_EXT_2" Transaction is committed. Loaded Hive partition: STATE=VT -------------------------------------------------------------------------------------------------------------------------------- End single load at 2017-06-11:00:33:54. Elapsed load time = 0:00:02. -------------------------------------------------------------------------------------------------------------------------------- Load operation OSL_170611_003337 successful. Load time = 0:00:07 -------------------------------------------------------------------------------------------------------------------------------- End OSCH execution at 2017-06-11:00:33:54 --------------------------------------------------------------------------------------------------------------------------------   Cross schema load So far all the examples assume that the Oracle table being loaded from data in HDFS or in Hive lives in the schema of the Oracle user connecting to the database with JDBC.  An Oracle user can load into a table that lives in another Oracle schema assuming the user has been granted the appropriate privileges. In OHSH this is syntactically expressed by providing a fully qualified table name that "dot qualifies" the table name with the schema that owns it. For example, suppose you are the Oracle user "MOVIEDEMO" and want to load HDFS data into "movie_ratings" living in the "SCOTT" schema.  The load command would look like this. 
ohsh>load oracle table omovies:scott.movie_ratings from path hadoop0:/user/${USER}/moviedemo/movie_ratings_delimitedtext using jdbc

Note that all OHSH identifiers for Oracle tables, schemas, directories, and column names can be double quoted to preserve case and to specify odd characters.  In general, case sensitivity and odd characters are never a good idea, especially when you want to create similar table and column names in both Oracle and in Hive.

Where to look if there is a load failure

When running loads, verbose console output is the first place to see where a load failed.  If the load failed because of some underlying failure in the OLH or OSCH job (e.g. bad data), you need to either look in the OLH job log files living in Hadoop when running "jdbc" or "directpath", or look in the Oracle directories supporting logging of Oracle external tables.

For OLH, the logs describing the load are located in HDFS under "/user/${USER}/smartloader/jobhistory/oracle" subdirectories.  The subdirectories are partitioned by Oracle schema, Oracle table name, and the specific OHSH job prefaced by "OSL_" and some unique job id. Below is the location of a "directpath" load of Oracle table MOVIEDEMO.MOVIE_RATINGS whose load id is OSL_170112_121015.  The oraloader-report.txt describes the load at a high level. The separate OCI log files are outputs of the Hadoop reduce tasks participating in a load.

hadoop fs -ls /user/rhanckel/smartloader/jobhistory/oracle/MOVIEDEMO/MOVIE_RATINGS/OSL_170112_121015/_olh

oraloader-00000-oci.log  oraloader-00001-oci.log  oraloader-00002-oci.log  oraloader-00003-oci.log  oraloader-00004-oci.log  oraloader-report.txt
oraloader-00000-oci.xml  oraloader-00001-oci.xml  oraloader-00002-oci.xml  oraloader-00003-oci.xml  oraloader-00004-oci.xml  tableMetadata.xml
oraloader-00000-r.xml    oraloader-00001-r.xml    oraloader-00002-r.xml    oraloader-00003-r.xml    oraloader-00004-r.xml

When running "exttab" loads you need to look at the standard places one looks for errors when using Oracle external tables.  This includes the default directory for the external table and the log directory of the external tables if specified.  The OHSH default "createlogfiles" needs to be set to "true" to produce log files.  Log files will be written to the external table's default directory, or to the log directory if specified.  (In OHSH, these are set as the OHSH default settings "defaultdirectory" and "logdirectory".)  If the problem involves bad data you will want to set the OHSH default "logbadrecords" to true.

Which method do I use for loading?

If you are loading from Hive tables whose storage is not delimited text you need to load using "jdbc" or "directpath".  In general the choice between "jdbc" and "directpath" depends upon whether the Oracle table is partitioned.  If the table is not partitioned, you have to run "jdbc".  If it is partitioned you can run "jdbc", but "directpath" is more efficient because it bypasses the Oracle SQL code path.

If you are loading delimited text (either in Hive tables or in HDFS files) the choice is between "exttab" and "jdbc" for non-partitioned Oracle tables, or "exttab" and "directpath" for partitioned Oracle tables.  (Again, you can run "jdbc" against partitioned Oracle tables, but you should opt for running "directpath" since it will do the same job faster and more efficiently.)

Loading delimited text using "exttab" is much faster in end-to-end load time but is much more expensive in terms of CPU cycles expended on the system running Oracle.
All conversion to Oracle data types is being done using Oracle CPU cycles. Loading delimited text using "jdbc" or "directpath" is slower in end-to-end time, but that's because the data conversion is done on the Hadoop side and is split across many map-reduce tasks.  There is less burden on the Oracle CPU because the heavy lifting of Oracle type conversion was done on the Hadoop side.

Note that the "load oracle table" command in OHSH does not require you to choose a method.  If a method is not specified, OHSH will choose a default method: "jdbc" if the Oracle table is not partitioned, or "directpath" otherwise.

Best practices for loading Oracle tables using OHSH

If you want to minimize complexities in loading data into Oracle from Hadoop you might want to consider doing the following.

o Set console output to verbose.
Verbose output explains what is going on under the covers, and outputs load settings for Hadoop, Hive, OLH, and OSCH.  Because output for "jdbc" and "directpath" includes the standard OLH output of a Hadoop map-reduce job, you get a very clear idea of the progress of the load.  Verbose output often clarifies the source of a failure without hunting down logs.

o Ensure that date and timestamp fields have uniform formats.
Most problems with delimited text parsing focus on date and timestamp fields because there are so many different formats.  Conversion of these types is much more complex than for any other scalar data type.  If you generate Hadoop data with a single standard format for all date and timestamp fields, life will be much easier.

o When loading Hive tables into Oracle tables, ensure that the column names and order of declaration are the same for the Hive and Oracle tables.
This is not a hard requirement, since OHSH loadermaps work around this problem, but using a loadermap does introduce more complexity in specifying the load operation.  So if you can give columns in both tables the same names and order, life becomes much simpler and you will get to the beach more often, especially when the table has a large number of columns.

o Isolate the setting of defaults to a separate callable OHSH script.
If there are some standard default settings that are invariant across all loads, write up a standard initialization OHSH script that gets executed at the outset of an OHSH session, either by calling the script indirectly or by using the initialization switch (e.g. "-i <initscript>") when invoking OHSH.
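To illustrate the last practice, here is a minimal sketch of pairing an initialization script with a per-load script.  The file names are illustrative, the "inst1" TNS alias is reused from the examples above, and it assumes Oracle Wallet is configured so that no credential prompts appear; the script files simply contain the same commands you would type at the ohsh> prompt.

bash3.2> # shared resources and defaults, reused by every load
bash3.2> cat > initresources.ohsh <<'EOF'
create sqlplus resource sql connectid="inst1"
create oracle jdbc resource omovies connectid="inst1"
create hive resource hmovies connectionurl="jdbc:hive2:///moviedemo"
set outputlevel verbose
set dateformat "yyyy-MM-dd HH:mm:ss"
EOF
bash3.2> # one specific load operation
bash3.2> cat > load_movie_ratings.ohsh <<'EOF'
load oracle table omovies:movie_ratings from hive table hmovies:movie_ratings using directpath
EOF
bash3.2> ohsh -i initresources.ohsh -f load_movie_ratings.ohsh

The same initresources.ohsh can then be reused with any number of per-load scripts via the -i and -f switches.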



How to Load Oracle and Hive Tables using OHSH (Part 2 - OHSH Configuration and CLI Usage)

In this "eat your vegetables before having dessert" tutorial we will talk about how to configure OHSH.  We will also discuss groups of OHSH commands that you will need to know about before you start using the tool to perform loading tasks. Configuration and SQL*Plus and Hadoop/Hive Dependencies Configuration of OHSH will differ depending upon where you want to run it.  The short story is that OHSH expects to run in a Linux environment with Bash shell available, and have SQL*Plus, Hadoop and Hive client software installed.   The latter two need to be configured to access the Hadoop cluster of interest. An Oracle database system serves as a natural home for this tool since loading is a DBA centric activity.  (It's where SQL*Plus is installed.  It's also where OSCH kit needs to be installed, if you are using Oracle Connectors for Hadoop).  If OSCH was previously installed, then the dependent Hadoop and Hive clients should already be on the system.  For production systems, it is most secure to run OHSH logged in as "oracle" or some other OS user on the Oracle system that can do DBA type activities. Alternatively, your can run OHSH on a Hadoop cluster.  Oracle's BDA and most Hadoop clusters comes automatically installed and configured for Hadoop and Hive.  If SQL*Plus is not installed you should install Oracle Instant Client.  You then just need to twiddle with Oracle TNS names or connection URLs to ensure you can connect to the Oracle database with JDBC and SQL*Plus. If you want to run it on an edge node you will need to install Hadoop and Hive clients and Oracle Instant client.  If your SQL*Plus client connects to your Oracle database and Hadoop and Hive can find their Hadoop cluster, you are good to go. The OHSH kit has three configuration READMEs for Oracle, Hadoop cluster, and edge node environments, that walk through the configuration details step by step. Once you have all these dependencies installed, check to see that Bash, Hadoop, Beeline or HiveCLI, and SQL*Plus are working and can access the appropriate resources (i.e. Hadoop and Hive command line work and can access the Hadoop cluster, and SQL*Plus can connect to your Oracle database system.) Oracle Connectors and/or Copy To Hadoop Dependencies You need to download and install the Oracle Connectors (OLH and OSCH) and/or the Copy To Hadoop (CP2HADOOP) feature from Big Data SQL. NOTE: OHSH is tightly coupled to particular versions of dependencies so check the OHSH README to know the precise version of each product it wants to use. These kits need to live on some directories on your system that you can point to.  OLH and CP2HADOOP are just a bunch of jars that OHSH needs to find. OSCH is a little more complicated because it has a tool component that OHSH calls to generate Oracle external tables to access Hadoop HDFS content, and it has run-time components that needs to live on the Oracle database system and serve as a gateway to HDFS content. If your system is not the database system, the OSCH kit also needs to be copied and configured on the database system.  Configuration of OSCH on an Oracle system requires installing a Hadoop client that can access the Hadoop cluster, creating some Oracle directories, and setting some Oracle privileges.  OSCH's installation guide explains all of this.  Once a Hadoop client is installed on the Oracle system, configuring OSCH on the system takes a few minutes. Configuring ohsh_config.sh Once all the dependencies are in place, you need to edit ohsh_config.sh in the bin directory of OHSH. 1.) 
1.) (REQUIRED) You need to define OLH_HOME, OSCH_HOME, and/or CP2HADOOP_HOME to the directories on your system where these kits were unzipped and installed.  To use Oracle Big Data Connectors, OLH_HOME and OSCH_HOME must both be set.

2.) (REQUIRED) You need to define how to connect to HiveServer2.  Specifically, you need to define HS2_HOST_PORT=<host>:<port>, where <host> is the host name on which HiveServer2 is running and <port> is the port number on which HiveServer2 is listening. You also need to define HIVE_SESS_VAR_LIST, which contains the session variables required to connect to HiveServer2.  To connect to HiveServer2 with Kerberos authentication, include "principal=<Server_Principal_of_HiveServer2>".  To connect to HiveServer2 running in HTTP mode, include "transportMode=http;httpPath=<http_endpoint>".

3.) (OPTIONAL) If Hadoop and Beeline/Hive CLI do not default to the appropriate configuration to find the right Hadoop cluster, you need to define HADOOP_CONF_DIR and HIVE_CONF_DIR.

4.) (OPTIONAL) If you needed to install Oracle Instant Client you will need to define SQLCLIENT_LIB.

5.) (OPTIONAL) If you want to use TNS aliases for connecting to Oracle you need to define TNS_ADMIN to point to the directory where "tnsnames.ora" lives.  Typically this lives under $ORACLE_HOME/network/admin, but you can create your own TNS file with your own aliases for connecting to particular Oracle databases.  The files just need to be readable by the OS user running OHSH.

6.) (OPTIONAL) If you have set up wallet security for connecting to Oracle, set WALLET_LOCATION to the directory containing the Oracle Wallet artifacts.  The wallet should live in a directory owned by the OS user running OHSH whose privileges are 700 (readable and writable only by your OS user).

Please note that the Oracle Wallet is what you should always use when running OHSH in a production environment.  If you are experimenting with the tool and kicking the tires in the OHSH examples, you can run without it.  When the Oracle Wallet is not configured, OHSH will prompt you for an Oracle user name and password when creating a JDBC or SQL*Plus resource.  However, if you have a few minutes to configure the Oracle Wallet it will be well worth your while, since you won't be consistently prodded for user credentials every time you invoke OHSH.

OHSH Invocation

You should put $OHSH_HOME/bin on your PATH. To launch OHSH interactively, type "ohsh".  The banner will be accompanied by a list of the underlying Oracle Connector and Copy To Hadoop technologies that have been installed and configured.

bash3.2> ohsh
Oracle Shell for Hadoop Loaders Release 1.2.0 - Production   (Build:20161119092118)
Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
Oracle Loader for Hadoop (3.8.0), Oracle SQL Connector for HDFS (3.7.0), The Copy to Hadoop feature of Oracle Big Data SQL (3.1.0) enabled.

OHSH can be invoked non-interactively with the -f switch.

bash3.2> ohsh -f mytableload.ohsh

It also supports an initialization script that executes before the interactive prompt appears.

bash3.2> ohsh -i initresources.ohsh
Oracle Shell for Hadoop Loaders Release 1.2.0 - Production   (Build:20161119092118)
Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved.
Oracle Loader for Hadoop (3.8.0), Oracle SQL Connector for HDFS (3.7.0), The Copy to Hadoop feature of Oracle Big Data SQL (3.1.0) enabled.

The -i and -f switches can be used together for non-interactive sessions.
bash3.2> ohsh -i initresources.ohsh -f mytableload.ohsh

This is useful for executing commands that initialize resources used for all loading activities (e.g. the JDBC connection to a specific Oracle database) before executing a specific load operation.

OHSH runs as a Hadoop application.  It is launched as a jar by the "hadoop" command and inherits Hadoop's default configuration.  When OHSH performs load operations, it clones the configuration settings and adds new properties and/or alters default properties to serve a particular job.

bash3.2> ohsh -c

This dumps the properties OHSH inherits from Hadoop and from settings in the "ohsh" launch script.  It is useful for inspecting the configuration that exists before OHSH modifies it for a particular job.

OHSH help outlines the groups of commands that you will be working with.

ohsh>help
Help <sub-commands>
help load     - load Oracle tables or load/create/replace Hive tables commands
help set      - set load defaults
help show     - show load defaults
help shell    - shell commands
help resource - create and drop resources

Resource Commands

Resources describe named entities that you will be interacting with to perform load operations. When you launch OHSH you get predefined named resources for bash, hadoop, and beeline/hive.

ohsh>show resources
hadoop0 : Hadoop Command Line Resource
  HADOOP_CONF_DIR = /home/rhanckel/hadoop_scratch/conf
hive0 : Hive Command Line Resource
  HIVE_CONF_DIR = /ade/rhanckel/hive_scratch/conf
  Connect to jdbc:hive2:///default;
  hadoop resource id = hadoop0
bash0 : Bash Command Line Resource
  path = /usr/local/bin/bash

These are command line resources and are invoked using the "%" operator. Check to see if the resources above (i.e. bash0, hadoop0, hive0) work.

ohsh>%bash0 uname
Linux

ohsh>%hadoop0 fs -ls /user
Found 2 items
drwxrwxr-x   - rhanckel g578       4096 2016-06-21 15:43 /user/oracle
drwxrwxrwx   - root     root       4096 2016-10-30 18:24 /user/rhanckel

ohsh>%hive0 show tables;
+-------------------------------+--+
|           tab_name            |
+-------------------------------+--+
| movie_ratings                 |
+-------------------------------+--+

Create SQL*Plus and Oracle JDBC Resources

Now let's define SQL*Plus and JDBC resources that are related to an Oracle database.  The Oracle user schema of interest is called MOVIEDEMO.  The connect id to the database is "movieserver:15210/movieservice".  (If you have TNS aliases enabled, your connect id can be an alias.)  If an Oracle Wallet has not been configured, these commands will prompt for an Oracle user name and password.

ohsh>create sqlplus resource sql connectid="movieserver:15210/movieservice"
ohsh>create oracle jdbc resource omovies connectid="movieserver:15210/movieservice"

Now we should be able to delegate commands to SQL*Plus connecting as "MOVIEDEMO".

NOTE: Don't forget the semicolon at the end of your SQL*Plus input. Otherwise SQL*Plus will quietly treat the input as a NOOP.

ohsh>%sql select table_name from tabs;

TABLE_NAME
--------------------------------------------------------------------------------
MOVIE_RATINGS

1 rows selected.

User defined resources need to be declared with a resource name that doesn't clash with other currently defined resources.  Additional Hive resources can be declared to map to other Hive databases managed by Hive (e.g. "hmovies" rather than "default").  Hive "databases" are roughly equivalent to Oracle schemas.  They define a namespace for user defined tables but live in the same Hadoop/Hive system.
ohsh> create hive resource hmovies connectionurl="jdbc:hive2:///moviedemo"

The "show resources" command should reflect these three additional resources.

ohsh> show resources
omovies : Oracle JDBC Resource
  user=MOVIEDEMO
  connectId=moviehost:15210/movieservice
sql : Oracle SQL*Plus Command Line Resource
  user=MOVIEDEMO
  connectId=moviehost:15210/movieservice
  sysdba=false
hadoop0 : Hadoop Command Line Resource
hmovies : Hive Command Line Resource
  HIVE_CONF_DIR = /user/rhanckel/hive_scratch/conf
  Connect to jdbc:hive2:///moviedemo;
  hadoop resource id = hadoop0
hive0 : Hive Command Line Resource
  HIVE_CONF_DIR = /user/rhanckel/hive_scratch/conf
  Connect to jdbc:hive2:///default;
  hadoop resource id = hadoop0
bash0 : Bash Command Line Resource
  path = /usr/local/bin/bash

"hive0" and any user created resources can be dropped.

ohsh> drop resource hmovies;

Only the "bash0" and "hadoop0" resources are not drop-able.

Command Line Resources and Multi-line Statements

In OHSH the delegation operator (i.e. "%") simply takes one opaque line of input and feeds it to bash, Beeline, Hadoop, or SQL*Plus for execution in a sub-process.

SQL*Plus, Beeline, and bash support multiple lines of input.  This is indicated by using the ">>" operator with the delegation operator, which allows one to push multiple commands to a resource.  The input is terminated by a line with a single ";".

ohsh> %sql >>
select table_name from tabs where table_name like 'M%';
select directory_name from all_directories where directory_name like 'OHSH%';
;

TABLE_NAME
--------------------------------------------------------------------------------
MOVIE_RATINGS

1 rows selected.

DIRECTORY_NAME
--------------------------------------------------------------------------------
OHSH_FUSE_MOUNT
OHSH_T_WORK

ohsh> %bash0 >>
uname
echo $HOME
;
Linux
/home/rhanckel

Shell Commands

OHSH supports ease of use shell features:

OHSH inherits the environmental variables of the parent process
OHSH supports setenv and printenv
Environmental variables are expanded using the formal bracketed ${<envvar_name>} syntax when it appears in a command.  Note that ${USER} will be expanded; $USER will not be expanded.
Up arrow and down arrow recall command line history
The bang operator recalls interactive commands
@<filename> input redirection (with ".ohsh" as the default file extension)
Spooling operations (by default to ohshspool.txt in the local directory)
       e.g. ohsh> spool on
       Turns spooling on to ohshspool.txt
       e.g. ohsh> spool off
       Turns spooling off
       e.g. set spool "mytest.txt"
       Directs spooling to a user defined file.

History is only recorded for interactive sessions. It is persisted in a shadow directory under ${HOME}/.ohsh.

User input is tokenized by OHSH as STRING and DQSTRING.  A STRING is a token that is not double quoted and has the typical characters one expects to see in Oracle and Hive identifiers, in normal looking file paths, or in URLs.  A DQSTRING is a double quoted string and accepts virtually anything as input.  DQSTRING always preserves case.  When a STRING is used as an Oracle identifier it will be normalized to upper case.

Sometimes user input clashes with defined tokens. For example:

ohsh> @resources
line 1:1 mismatched input 'resources' expecting {STRING, DQSTRING}
Error: ohsh syntax error encountered
Use "help" to check correct syntax.

The problem is that "resources" is a keyword in the OHSH grammar, so the parser gets confused when it is used as a filename.
(The full grammar is given at the bottom of this chapter.)  You can always work around the problem by double quoting the input.

ohsh> @"resources"

For this case adding the file extension also works, since the string is no longer a keyword.

ohsh> @resources.ohsh

Show and Set Defaults

OHSH relies heavily on default settings that augment user input when executing load commands.  "show defaults" lists all defaults and their settings.

ohsh> show defaults
createlogfiles = false
dateformat = "yyyy-MM-dd HH:mm:ss"
datemask = "YYYY-MM-DD HH24:MI:SS"
defaultdirectory = "OLHP_DEFAULT_DIR"
dop = 4
dpfilecount = 4
fieldterminator = "\u002c"
fusepath = ""
hadooptnsadmin = ""
hadoopwalletlocation = ""
hiverootdir = "/user/rhanckel/oracle_warehouse"
locationdirectory = "OLHP_STAGE_DIR"
logbadrecords = true
logdirectory = ""
multibyte = false
outputlevel = verbose
reducetasks = 5
rejectlimit = unlimited
rowdelimiter = "\u000a"
sessionconf = ""
skipcolprefix = "OSL_SKIPCOL_"
timestampmask = "YYYY-MM-DD HH24:MI:SS.FF"

Many of these defaults are specific to one or more load methods. The semantics of these defaults are explained by:

ohsh> help set

This elaborates all the defaults and explains what load methods they impact. For example, the default "dateformat" has this explanation.

'set' 'dateformat' <DQSTRING>
Sets the default value of a Java dateformat.
Load methods: directpath, etl (hadoop-phase), jdbc
Sets oracle.hadoop.loader.defaultDateFormat.

Practically speaking only a few of these properties might need to be set on a per load basis.  Usually these are the date and timestamp default formats, assuming the formats differ from load to load.

Load Statements

Load statements will be the subject of Parts 3, 4, 5, and 6 of the tutorial.  In Part 1 we walked through some simple cases.  The formal load syntax looks like this:

<LoadOracleTable> : ('explain')? 'load' 'oracle' 'table' <ResourceName>':'<TableName> 'from' <LoadSource> <LoaderMap>? <Using>? <LoadConf>?
<LoadSource> : (<HdfsSource> | <HiveSource>)
<HdfsSource> : 'path' <ResourceName>':'<HdfsDirectoryPath> (<FileFormat>)?
<HiveSource> : 'hive' 'table' <ResourceName>':'<TableName> ('partition' ('filter')? ('=')? <DQSTRING>)?
<FileFormat> : 'fileformat' ('datapump' | 'text' <TextDelimiter>)
<LoaderMap> : 'loadermap' <ColumnMapping> (',' <ColumnMapping>)*
<ColumnMapping> : ('tcolname' ('=')?)? <STRORDQSTR> (('field' ('=')? )? <STRING>)? (('format' ('=')?)? <DQSTRING>)?
<TextDelimiter> : ('fieldterminator' ('=')? <DQSTRING>)? ('rowdelimiter' ('=')? <DQSTRING>)? ('initialencloser' ('=')? <DQSTRING> ('trailingencloser' ('=')? <DQSTRING>)?)?
<Using> : 'using' ( 'jdbc' | 'directpath' | 'exttab' <OracleWhereClause>? | 'etl' <OracleWhereClause>? | 'etl' 'deferred' <OracleWhereClause>? ('scriptdir' <STRORDQSTR>)? )
<LoadHiveTable> : ('explain')? ('create' | 'create' 'or' 'replace' | 'load' | 'replace' ) 'hive' 'table' <ResourceName>':'<TableName> 'from' 'oracle' 'table' <ResourceName>':'<TableName> <HiveStorage>? <HiveUsing>? <OracleWhereClause>?
<HiveStorage> : 'hiverootdir' <HdfsDirectoryPath>
<HiveUsing> : 'using' ('directcopy' | 'fuse' | 'stage')

--------------- Common SubClauses --------------
<HdfsDirectoryPath> : <STRORDQSTR>
<OracleWhereClause> : <DQSTRING>
<ResourceName> : <STRING>
<TableName> : <STRORDQSTR>

--------------- Lexical Primitives ---------------
<BOOLEAN> : 'true' | 'false'
<STRORDQSTR> : (<STRING>|<DQSTRING>)
<DQSTRING> : "[.]*" e.g. "anything in double quotes!"
<STRING> : ('a'..'z'|'A'..'Z'|'0'..'9'|'.'|'/'|'\'|'_'|'+'|'-'|'~')+ e.g. Scott, 123A, /tmp/foo.txt
<INTEGER> : ['0'-'9']+ e.g. 1245

Next Steps

The next tutorial will focus on using OHSH to load Oracle tables from content living in HDFS files and Hive tables.
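Before moving on, here are two statements that conform to the grammar above, using the resources created earlier in this post (the HDFS path, table names, and delimiter are illustrative):

# Load an Oracle table from tab-delimited text files in HDFS
ohsh> load oracle table omovies:MOVIE_RATINGS \
      from path hadoop0:/user/oracle/moviework/ratings fileformat text fieldterminator="\u0009" \
      using directpath

# Create (or replace) a Hive table from an Oracle table with Copy to Hadoop
ohsh> create or replace hive table hmovies:movie_ratings \
      from oracle table omovies:MOVIE_RATINGS \
      hiverootdir /user/oracle/oracle_warehouse \
      using directcopy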

In this "eat your vegetables before having dessert" tutorial we will talk about how to configure OHSH.  We will also discuss groups of OHSH commands that you will need to know about before you start...

Move Data between Apache Hadoop and Oracle Database with SQL Developer

SQL Developer is much loved in the developer community.  A developer starting on the task of moving data between Apache Hadoop and Oracle Database can save lots of time by using a tool they know and like. Download SQL Developer 4.2 and try it out if you are moving data between Apache Hadoop and Oracle Database.

Remember that you always start with the target (destination) of the load.  To load data from Hadoop into Oracle Database:

Using Oracle Loader for Hadoop: Start by right-clicking the target table in an Oracle Database connection.
Using Oracle SQL Connector for HDFS: Start by right-clicking the 'Tables' tab in an Oracle Database connection.
Using Copy to Hadoop: Start by right-clicking the 'Tables' tab in a Hive connection.

Additional details on getting started are below.  They are also in the SQL Developer help menu.

Installing and Getting Started with SQL Developer and Connectors for Apache Hadoop

Install SQL Developer

Download SQL Developer 4.2.

Checkboxes in SQL Developer

The following options should be selected in SQL Developer to use the Apache Hadoop connectors:

Tools -> Features -> Database. Check the box for Oracle SQL Developer – Hadoop Connectors.
In the connections panel on the left, right-click on the Oracle Database connections you plan to use, and then select Manage Features -> Hadoop Integration.

Configure Cloudera Hive JDBC drivers in SQL Developer

The connectors for Apache Hadoop require Cloudera JDBC drivers; Cloudera_HiveJDBC4_*.zip is recommended.  Download it from cloudera.com, unzip it to a local directory, and add the JAR files to SQL Developer under Tools -> Preferences -> Database -> Third Party JDBC Drivers.

Environment Variables

Oracle Loader for Hadoop, Oracle SQL Connector for HDFS, and the Copy to Hadoop 'directcopy' option require an SSH connection to a Hadoop client/edge node/Hadoop cluster node.  The Copy to Hadoop 'stage' option requires an SSH connection to the database node.  See the section Create SSH Hosts in SQL Developer for steps to create an SSH connection.

The home directory of the SSH connection (for example, /home/<user>) should contain the following files with environment variables.  See Appendix A for information on creating these files and adding the required environment variables.

Copy to Hadoop: .sqldev_cp2hadoop_env
Oracle Loader for Hadoop: .sqldev_olh_env
Oracle SQL Connector for HDFS: .sqldev_osch_env

Setting up Connections

Create SSH hosts in SQL Developer

To use the SQL Developer wizards for Oracle Loader for Hadoop, Oracle SQL Connector for HDFS, and the Copy to Hadoop 'directcopy' option, you will need to create SSH host connections to a Hadoop client/edge node/Hadoop cluster node.  (To run the Copy to Hadoop 'stage' option you will need to create an SSH connection to the Oracle Database node.)

View -> SSH to see the SSH Hosts pane
SSH Hosts -> New SSH Host -> Enter hostname and user name
Click OK

Note that the home directory of the SSH connections should contain the files with environment variables as explained in the previous section.  See Appendix A for more details.

To run the Copy to Hadoop 'stage' option you will need to create an SSH connection to the Oracle Database node as user 'oracle'.  It is important that the user is 'oracle'.

Oracle Database Connections

Set up connections to Oracle Database schemas.

Apache Hive Connections

Open the new connection dialog box and click on the 'Hive' tab.
If the 'Hive' tab does not display next to the 'Oracle' tab, it is likely that the Hive JDBC drivers did not install correctly.

Username: On the BDA the default is 'oracle'
Host name: Hostname where hiveserver2 is running
Port: Usually 10000
Database: Hive schema you want to connect to

Getting Started

You are now ready to get started using the SQL Developer wizards for the Apache Hadoop connectors.  A rule of thumb is that the wizards start from the destination object.

Oracle Loader for Hadoop: Right-click on a target table in Oracle Database.  If this option is grayed out, you may need to select "Manage Features -> Hadoop Integration" on the database connection you are using.  See "Checkboxes in SQL Developer" for more information.

Oracle SQL Connector for HDFS: Right-click on the 'Tables' tab in the Oracle Database connection.  If this option is grayed out, you may need to select "Manage Features -> Hadoop Integration" on the database connection you are using.  See "Checkboxes in SQL Developer" for more information.

Copy to Hadoop: Right-click on the 'Tables' tab in the Apache Hive connection.

Appendix A

Copy to Hadoop environment file

The file .sqldev_cp2hadoop_env has to be created in the home directory (for example, /home/<user>) of the SSH Host.  The contents of the file are:

#!/bin/bash
export CP2HADOOP_HOME=<parent directory of the directory containing Copy to Hadoop jars.  On the BDA this is /opt/oracle/bigdatasql/bdcell-12.1>
export HADOOP_CLASSPATH=${CP2HADOOP_HOME}/jlib/*
export PATH=${HADOOP_HOME}/bin:${PATH}
# If using Oracle Wallet
export WALLET_LOCATION=<location of the wallet files, if using Oracle Wallet.  For example, /home/${USER}/wallet>
export TNS_ADMIN=<location of the wallet files, if using Oracle Wallet. TNS_ADMIN should point to the same location. For example, /home/${USER}/wallet>
export CLUSTER_WALLET_LOCATION=${WALLET_LOCATION}
export CLUSTER_TNS_ADMIN=${TNS_ADMIN}

Oracle Loader for Hadoop environment file

The file .sqldev_olh_env has to be created in the home directory (/home/<user>) of the SSH Host.  The contents of the file are:

#!/bin/bash
export HIVE_HOME=<Example: /opt/cloudera/parcels/CDH/lib/hive>
export HIVE_CONF_DIR=<Example: /etc/hive/conf>
export OLH_HOME=<Example on Oracle Big Data Appliance: /opt/oracle/olh>
export HADOOP_CLASSPATH=${OLH_HOME}/jlib/*:${HIVE_CONF_DIR}:${HIVE_HOME}/lib/*
export OLH_LIB_JARS=${HIVE_HOME}/lib/hive-exec.jar,${HIVE_HOME}/lib/hive-metastore.jar,${HIVE_HOME}/lib/libfb303-0.9.2.jar
# If using Oracle Wallet
export WALLET_LOCATION=<location of the wallet files, if using Oracle Wallet.  For example, /home/${USER}/wallet>
export TNS_ADMIN=<location of the wallet files, if using Oracle Wallet. TNS_ADMIN should point to the same location. For example, /home/${USER}/wallet>
export CLUSTER_WALLET_LOCATION=${WALLET_LOCATION}
export CLUSTER_TNS_ADMIN=${TNS_ADMIN}

Oracle SQL Connector for HDFS environment file

The file .sqldev_osch_env has to be created in the home directory (/home/<user>) of the SSH Host.  The contents of the file are:

#!/bin/bash
export HIVE_HOME=<Example: /opt/cloudera/parcels/CDH/lib/hive>
export HIVE_CONF_DIR=<Example: /etc/hive/conf>
export OSCH_HOME=<Example on Oracle Big Data Appliance: /opt/oracle/osch>
export HADOOP_CLASSPATH=${OSCH_HOME}/jlib/*:${HIVE_CONF_DIR}:${HIVE_HOME}/lib/*
# If using Oracle Wallet
export WALLET_LOCATION=<location of the wallet files, if using Oracle Wallet.  For example, /home/${USER}/wallet>
export TNS_ADMIN=<location of the wallet files, if using Oracle Wallet. TNS_ADMIN should point to the same location. For example, /home/${USER}/wallet>
export CLUSTER_WALLET_LOCATION=${WALLET_LOCATION}
export CLUSTER_TNS_ADMIN=${TNS_ADMIN}

Note: Oracle Loader for Hadoop and Oracle SQL Connector for HDFS are licensed with Oracle Big Data Connectors.  Oracle Copy to Hadoop is licensed with Oracle Big Data SQL.
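To make Appendix A concrete, here is what a filled-in .sqldev_cp2hadoop_env might look like on a BDA-style node.  This is only a sketch: every path is illustrative, and HADOOP_HOME is assumed to already be set by the Hadoop installation on that node.

#!/bin/bash
# illustrative values only; adjust to your environment
export CP2HADOOP_HOME=/opt/oracle/bigdatasql/bdcell-12.1
export HADOOP_CLASSPATH=${CP2HADOOP_HOME}/jlib/*
# HADOOP_HOME is assumed to be set already on the Hadoop node
export PATH=${HADOOP_HOME}/bin:${PATH}
# wallet settings, only if Oracle Wallet is configured
export WALLET_LOCATION=/home/oracle/wallet
export TNS_ADMIN=/home/oracle/wallet
export CLUSTER_WALLET_LOCATION=${WALLET_LOCATION}
export CLUSTER_TNS_ADMIN=${TNS_ADMIN}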

SQL Developer is much loved in the developer community.   A developer starting on the task of moving data between Apache Hadoop and Oracle Database can save lots of time by using a tool they know and...

Using Copy to Hadoop to copy Oracle Database data into Parquet format in HDFS

It is very easy to copy Oracle Database tables to Parquet format in HDFS.

When Using Copy to Hadoop with OHSH

If using Copy to Hadoop with OHSH, with one additional step you can convert the Oracle Data Pump files into Parquet.

ohsh> %hive_moviedemo create table movie_sessions_tab_parquet stored as parquet as select * from movie_sessions_tab;

hive_moviedemo is a Hive resource (we created that in the blog post on using Copy to Hadoop with OHSH).  Within OHSH you are using Hive to convert the Data Pump files to Parquet.

When Using Copy to Hadoop with SQL Developer

You can select Parquet as the destination format when using SQL Developer.

When Running Copy to Hadoop as a Hadoop Job (for power users)

The syntax of the Hadoop job for the directcopy option is the following.  Refer to Appendix B in the Oracle Big Data SQL User's Guide for more details.

# hadoop jar ${CP2HADOOP_HOME}/jlib/orahivedp.jar oracle.hadoop.ctoh.CtohDriver \
  -D oracle.hadoop.ctoh.connection.tnsEntry=<my-oracle-tns-entry> \
  -D oracle.hadoop.ctoh.connection.walletLoc=<local-oracle-wallet-dir> \
  -D oracle.hadoop.ctoh.connection.tnsAdmin=<local-oracle-wallet-dir> \
  -D oracle.hadoop.ctoh.connection.clusterWalletLoc=<oracle-wallet-dir-on-hadoop-cluster> \
  -D oracle.hadoop.ctoh.connection.clusterTnsAdmin=<oracle-wallet-dir-on-hadoop-cluster> \
  -D mapreduce.output.fileoutputformat.outputdir=<mytab-hdfs-output-dir> \
  -D oracle.hadoop.ctoh.splitterType="BLOCK_SPLITTER" \
  -D oracle.hadoop.ctoh.table=<dbSchema.dbTable> \
  -D oracle.hadoop.ctoh.maxSplits=10

You would then create a Hive table over the Oracle Data Pump files in <mytab-hdfs-output-dir>.  This makes the data accessible in Hive.

hive> create external table <mytab-hive>
      row format serde 'oracle.hadoop.hive.datapump.DPSerDe'
      stored as
        inputformat 'oracle.hadoop.hive.datapump.DPInputFormat'
        outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
      location <mytab-hdfs-output-dir>;

Now you can convert the data to Parquet in Hive.

hive> create table <mytab-parquet> stored as parquet as select * from <mytab-hive>;
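To make the power-user path concrete, here is the same sequence with illustrative names filled in.  The HDFS directory and table names below are hypothetical; substitute the output directory used by your Copy to Hadoop job.

hive> create external table moviedemo.movie_sessions_dp
      row format serde 'oracle.hadoop.hive.datapump.DPSerDe'
      stored as
        inputformat 'oracle.hadoop.hive.datapump.DPInputFormat'
        outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
      location '/user/oracle/movie_sessions_dp';

hive> create table moviedemo.movie_sessions_parquet stored as parquet
      as select * from moviedemo.movie_sessions_dp;

-- quick sanity check that the Parquet copy has the expected row count
hive> select count(*) from moviedemo.movie_sessions_parquet;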

It is very easy to copy Oracle Database tables to Parquet format in HDFS. When Using Copy to Hadoop with OHSH  If using Copy to Hadoop with OHSH, with one additional step you can convert the Oracle...

Connecting Hadoop with Oracle

Oracle Loader for Hadoop and Performance Tuning

After following the tips below your Oracle Loader for Hadoop jobs should fly!

Summary

Check your network bandwidth
Use OCIOutputFormat if possible (OCIOutputFormat requires the target table to be partitioned; if the target table is not partitioned, see whether you can make it a partitioned table)
Check parallelism (this is the number of reduce tasks when loading into a partitioned table)
Use SDP (in particular when using InfiniBand)
Confirm that the balancer is enabled; a well-balanced load job is the fastest

Now we will look at each topic in more detail.

Network Bandwidth

The first thing to check is the network bandwidth you have between your Hadoop cluster and Oracle Database. You are more likely to be bound by network bandwidth than by CPU. So if your connection is 1 GigE, see whether you can make it 10 GigE or more. InfiniBand would be great, if possible.

OutputFormat

The OutputFormat determines how the data is written to the database.  You have the choice of using JDBCOutputFormat or OCIOutputFormat for your job.

JDBCOutputFormat uses INSERT statements to write to the database.  It can work with partitioned and non-partitioned target tables.  Inserted rows are committed in batches (see the section 'Load Batch Size' below for more details).  In the event of failure in the middle of a load job, only that batch will be rolled back.  When a job fails, identifying the rows that did not load and restarting the load has to be done manually.

OCIOutputFormat uses direct path load to write to the database.  It requires the target table to be partitioned.  Inserted rows are committed after the entire job completes, so in the event of failure the entire job is rolled back.

OCIOutputFormat is faster than JDBCOutputFormat.  If the target table is partitioned, we always recommend OCIOutputFormat.  If the target table is not partitioned, try to partition the table.  Use JDBCOutputFormat only when you cannot use OCIOutputFormat.

Parallelism

The most important factor in performance tuning of Oracle Loader for Hadoop is parallelism. The most significant performance boost comes when the load is parallel. Below are some tips to ensure and verify that your Oracle Loader for Hadoop job is running in parallel.

Parallelism when using JDBCOutputFormat

The degree of parallelism is the same as the number of map tasks (when the target table is not partitioned) and the same as the number of reduce tasks (when the target table is partitioned).

When the target table is not partitioned: Determining the number of map tasks happens automatically as part of the job.  No user action is needed.  The maximum number of map tasks is the minimum of (number of splits, maximum capacity of the cluster). If it appears that there is no parallelism, it might be because only one map task is created. One possible reason for that is that the data is non-splittable. For example, gzip compressed data is non-splittable, so the entire file will be loaded by one map task. Try to split the data into multiple files that are individually gzip compressed to increase parallelism. Use Cloudera Manager (if using CDH) or Ambari (if using HDP) to check the number of map tasks. The number of map tasks will match the number of database connections.  This is the degree of parallelism during the load.

When the target table is partitioned: Refer to the section Parallelism when using OCIOutputFormat below.

Parallelism when using OCIOutputFormat

The degree of parallelism is the same as the number of reduce tasks.
Use the property mapreduce.job.reduces to select the number of reduce tasks. Again, use Cloudera Manager or Ambari to confirm the number of reduce tasks of the job. The number of reduce tasks will match the number of database connections. If the target database is Exadata, you can start with 64 reduce tasks, then increase the number of reduce tasks and measure load performance to determine an optimal number to use.

OCIOutputFormat requires the target table to be partitioned. Multiple reduce tasks can write to a single partition, so even if the table has only one partition, multiple reduce tasks will parallelize the load.

Load Batch Size

When using JDBCOutputFormat, data is loaded in batches. The default size is 100. You can increase this value by setting the property oracle.hadoop.loader.defaultExecuteBatch. However, we do not recommend increasing this value by a lot, as it will use more memory during the load and you might not see real load speed gains. In practice the default value of 100 works very well. This property does not apply to OCIOutputFormat.

Network Connection

Using the SDP transport protocol can improve performance by about 5-10%. In particular, if your network connection between Hadoop and the database is InfiniBand you will want to use SDP.  SDP is a network transport protocol that uses RDMA technology, which allows network interfaces to move data packets directly into RAM without involving the CPU. Use of the SDP protocol only applies to OCIOutputFormat.

If you are using InfiniBand, you are probably using Oracle Big Data Appliance and Oracle Exadata. Configure the Exadata listener with an SDP port (refer to the Oracle Big Data Appliance Software User's Guide for details), and use an additional Oracle connection descriptor for SDP (in addition to the standard database connection string):

<property>
  <name>oracle.hadoop.loader.connection.oci_url</name>
  <value>(DESCRIPTION=(ADDRESS=(PROTOCOL=SDP)(HOST=192.168.40.200)(PORT=1523))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dbm)))</value>
</property>

Balancing Your Load

This only applies to OCIOutputFormat. Data is often skewed. For example, when loading into a partitioned target table, some partitions might have more data than others. Without balancing, reduce tasks loading into the larger partitions will do more work than others, while other reduce tasks will finish early. Oracle Loader for Hadoop has a nice feature to balance the load so that all reduce tasks share the work equally. This makes the load faster, as the load is only as fast as the longest running reducer.

The balancer feature is enabled by default whenever the target table is partitioned and the load method is OCIOutputFormat. You can enable or disable the balancer by using the property oracle.hadoop.loader.sampler.enableSampling.

<property>
  <name>oracle.hadoop.loader.sampler.enableSampling</name>
  <value>true</value>
</property>

You can verify that the data is being sampled and used for balancing the load by checking the console output when you run a load job. The first half page of output will contain information on whether balancing is enabled, such as:

…… INFO balancer.Balancer: Creating balancer
…… INFO balancer.Balancer: Starting Balancer
----- a few lines later ----
…… INFO balancer.Balancer: Balancer completed

Another indication that the balancer was enabled is the creation of the _balancer directory.
$ hadoop fs -ls <value of property mapreduce.output.fileoutputformat.outputdir> Found 3 items -rw-r--r-- 1 oracle oracle 0 2017-02-17 16:55 <HDFS dir>/_SUCCESS drwxr-xr-x - oracle oracle 0 2017-02-17 16:55 <HDFS dir>/_balancer drwxr-xr-x - oracle oracle 0 2017-02-17 16:55 <HDFS dir>/_olh
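Pulling the knobs discussed above into one place, a job configuration snippet might look like the sketch below.  The values are illustrative; the SDP descriptor applies only to OCIOutputFormat, and the batch size only to JDBCOutputFormat.

<!-- number of reduce tasks = degree of parallelism for partitioned targets -->
<property>
  <name>mapreduce.job.reduces</name>
  <value>64</value>
</property>
<!-- balancer (sampling) on; this is the default for partitioned targets with OCIOutputFormat -->
<property>
  <name>oracle.hadoop.loader.sampler.enableSampling</name>
  <value>true</value>
</property>
<!-- JDBCOutputFormat only: commit batch size -->
<property>
  <name>oracle.hadoop.loader.defaultExecuteBatch</name>
  <value>100</value>
</property>
<!-- OCIOutputFormat over InfiniBand only: SDP connect descriptor -->
<property>
  <name>oracle.hadoop.loader.connection.oci_url</name>
  <value>(DESCRIPTION=(ADDRESS=(PROTOCOL=SDP)(HOST=192.168.40.200)(PORT=1523))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dbm)))</value>
</property>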

After following the tips below your Oracle Loader for Hadoop jobs should fly!   Summary   Check your network bandwidth Use OCIOutputFormat if possible (OCIOutputFormat requires the target table to be...

Loading Parquet Files Using Oracle Loader for Hadoop

Oracle Loader for Hadoop (OLH) can load Parquet data.  It does so by reading the Parquet data via a Hive table. So if you have Parquet data that you want to load into Oracle Database using Oracle Loader for Hadoop, the first step is to create a Hive table over the Parquet data.  For example:

create external table oracletest.my_hivetab_on_parquet (
  f1 decimal(38,18), i2 int, v3 varchar(50), d4 timestamp, t5 timestamp, v6 varchar(200), i7 int)
  stored as parquet
  location '/user/hive/warehouse/oracletest.db/parquet_data';

It would be nice if the syntax was simpler than listing out all the Hive column names and data types, wouldn't it!  But unfortunately there is no way around that for now.

Once you have the Parquet-backed table you can assemble the configuration properties just as you would when using Oracle Loader for Hadoop to load from a Hive table.  Note that the Hive table name in the configuration properties below (the value for the property oracle.hadoop.loader.input.hive.tableName) is the Hive table we just created, my_hivetab_on_parquet.

olh_parquet.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<!-- Input settings -->
<property>
  <name>mapreduce.inputformat.class</name>
  <value>oracle.hadoop.loader.lib.input.HiveToAvroInputFormat</value>
</property>
<property>
  <name>oracle.hadoop.loader.input.hive.databaseName</name>
  <value>oracletest</value>
</property>
<property>
  <name>oracle.hadoop.loader.input.hive.tableName</name>
  <value>my_hivetab_on_parquet</value>
</property>

<!-- Output settings -->
<property>
  <name>mapreduce.job.outputformat.class</name>
  <value>oracle.hadoop.loader.lib.output.OCIOutputFormat</value>
</property>
<property>
  <name>mapreduce.output.fileoutputformat.outputdir</name>
  <value>temp_out_session_p</value>
</property>

<!-- Table information -->
<property>
  <name>oracle.hadoop.loader.loaderMap.targetTable</name>
  <value>MY_ORA_TAB</value>
</property>
<property>
  <name>oracle.hadoop.loader.input.fieldNames</name>
  <value>F1,I2,V3,D4,T5,V6,I7</value>
</property>
<property>
  <name>oracle.hadoop.loader.defaultDateFormat</name>
  <value>yyyy-MM-dd HH:mm:ss</value>
</property>

<!-- Connection information -->
<..... the usual connection information.....>

Now you can submit the Oracle Loader for Hadoop job to load the data in Parquet format:

#!/bin/bash
export OLH_HOME=/opt/oracle/olh
export HIVE_CONF_DIR=/etc/hive/conf
export HIVE_HOME=/opt/cloudera/parcels/CDH/lib/hive
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$OLH_HOME/jlib/*:$HIVE_HOME/lib/*:$HIVE_CONF_DIR

hadoop jar ${OLH_HOME}/jlib/oraloader.jar \
  oracle.hadoop.loader.OraLoader \
  -conf /home/oracle/olh_parquet.xml \
  -libjars /opt/cloudera/parcels/CDH/lib/hive/lib/hive-exec.jar,/opt/cloudera/parcels/CDH/lib/hive/lib/hive-metastore.jar,/opt/cloudera/parcels/CDH/lib/hive/lib/libfb303-0.9.2.jar \
  -D mapred.reduce.tasks=4

Oracle Loader for Hadoop (OLH) can load Parquet data.  It does so by reading Parquet data via a Hive table. So if you have Parquet data that you want to load intoOracle Database using Oracle Loader for...

Copy to Hadoop with OShell (OHSH)

Here are some Copy to Hadoop examples to use with Oracle Big Data Lite VM.  Copy to Hadoop is a feature of Oracle Big Data SQL. Starting with Big Data SQL 3.1 (available with Oracle Big Data Lite VM 4.7), there are two options when using Copy to Hadoop.

directcopy: A Hadoop job copies data from the database table directly to HDFS
stage: Data is first written as Oracle Data Pump files on the database file system, and then copied over to HDFS

The default is directcopy.  The stage option is useful when there is no direct connection between Oracle Database and the Hadoop cluster.

# Startup OHSH
prompt> ohsh

Set up some resources

# Create a hive resource for hive database MOVIEDEMO
ohsh> create hive resource hive_moviedemo \
      connectionurl="jdbc:hive2://bigdatalite.localdomain:10000/moviedemo"

# Create a SQL resource for Oracle Database.  You will be prompted for username and password (moviedemo/welcome1).
ohsh> create sqlplus resource sql \
      connectid="bigdatalite.localdomain:1521/orcl"

# Create a JDBC resource to connect to Oracle Database.  You will be prompted for username and password (moviedemo/welcome1).
ohsh> create oracle jdbc resource moviedemo \
      connectid="bigdatalite.localdomain:1521/orcl"

Now you are ready to run some Copy to Hadoop examples

# Copy the table movie_sessions_tab in Oracle Database schema MOVIEDEMO to Hive schema MOVIEDEMO
ohsh> create hive table hive_moviedemo:movie_sessions_tab from oracle table \
      moviedemo:movie_sessions_tab;

# Verify the results of the copy by comparing the number of rows
ohsh> %sql select count(*) from movie_sessions_tab;
ohsh> %hive_moviedemo select count(*) from movie_sessions_tab;

# Append additional rows to the table
ohsh> load hive table hive_moviedemo:movie_sessions_tab from oracle table \
      moviedemo:movie_sessions_tab where "(cust_id = 1446522)";

# Verify the results again - you will have four more rows
ohsh> %hive_moviedemo select count(*) from movie_sessions_tab;

Using the stage option

You will need database directories for staging the Oracle Data Pump files and for log files.  You can create them outside of OHSH, or from OHSH itself for convenience.  For example:

# Create directories for staging data pump files and log files
ohsh> %bash0 mkdir /home/oracle/src/samples/c2hadoop/dmpfiles
ohsh> %bash0 mkdir /home/oracle/src/samples/c2hadoop/logs

# Create database directories for these directories

# Temporary database directory for staging data pump files
ohsh> %sql create or replace directory C2H_DP_DIR as \
      '/home/oracle/src/samples/c2hadoop/dmpfiles';

# Database directory for log files
ohsh> %sql create or replace directory C2H_LOG_DIR as \
      '/home/oracle/src/samples/c2hadoop/logs';

# set OHSH resources for database directories
ohsh> set locationdirectory C2H_DP_DIR
ohsh> set defaultdirectory C2H_LOG_DIR

# You can now again copy the table movie_sessions_tab in Oracle Database schema MOVIEDEMO to Hive schema
# MOVIEDEMO, this time copying to the Hive table movie_sessions_tab_stage.
# Note the clause using stage at the end.
ohsh> create hive table hive_moviedemo:movie_sessions_tab_stage from oracle table \
      moviedemo:movie_sessions_tab using stage;
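As with the directcopy example, you can verify the staged copy by comparing row counts.  A small sketch following the same pattern as above:

# Verify the staged copy by comparing the number of rows
ohsh> %sql select count(*) from movie_sessions_tab;
ohsh> %hive_moviedemo select count(*) from movie_sessions_tab_stage;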

Here are some Copy to Hadoop examples to use with Oracle Big Data Lite VM.  Copy to Hadoop is a feature of Oracle Big Data SQL. Starting with Big Data SQL 3.1 (available with Oracle Big Data Lite VM...

Using Spark with Data Copied using Copy to Hadoop

Copy to Hadoop copies data from an Oracle Database table to HDFS, as Oracle Data Pump files.  These files can be accessed by Hive tables using a SerDe that is part of Copy to Hadoop, so Hive queries can be run against this data. What if you would like to include this data in a Spark ML (machine learning) application? A Spark data frame can access Oracle Data Pump files via Hive. Below is an example.

Configure the Spark Installation to use Hive

To read Oracle Data Pump files into a Spark Data Frame, the Spark installation must be configured with a Hive installation. One way to achieve this is to modify conf/spark-env.sh and append HIVE_CONF_DIR to the environment variable SPARK_DIST_CLASSPATH. If your Spark installation is not built with Hive, then Hive related jars should also be appended to SPARK_DIST_CLASSPATH. A simple way to do this is to append HIVE_HOME/lib/* to SPARK_DIST_CLASSPATH. This should be done on all nodes in the cluster. More detail about manipulating SPARK_DIST_CLASSPATH can be found in https://spark.apache.org/docs/1.5.1/hadoop-provided.html

Happily, in the Big Data Lite VM Spark is configured to work with Hive.

Copy data from the moviedemo schema in Oracle Database using Copy to Hadoop

Let us now copy data from the moviedemo schema and create a Hive external table. We will do this from the OHSH (Oracle Shell for Hadoop Loaders) CLI. As described in this blog post, create some OHSH resources. Then copy the table:

ohsh> create hive table hive_moviedemo:movie_oracledb_tab \
      from oracle table moviedemo:movie_local_tab using stage

(Note on using this example in Oracle Big Data Lite: If the table moviedemo:movie_local_tab in Oracle Database is empty, load it first using OLH.

ohsh> set dateformat "yyyy-MM-dd:HH:mm:ss"
ohsh> load oracle table moviedemo:MOVIE_LOCAL_TAB from \
      hive table hive_moviedemo:movieapp_log_stage_1 database=moviedemo using directpath

If you need to create the table MOVIE_LOCAL_TAB, the SQL is available in /home/oracle/movie/moviework/olh/movie_tab.sql. Now you can copy this table back using Copy to Hadoop.)

Access the Hive external table over Oracle Data Pump files in Spark

Now let us access the data in the Hive external table movie_oracledb_tab from Spark.

Start up spark-shell with Copy to Hadoop jars

spark-shell has to be launched by specifying the Copy to Hadoop jars in the --jars option.

prompt> spark-shell --jars orahivedp.jar,ojdbc7.jar,oraloader.jar,orai18n.jar,ora-hadoop-common.jar

In Oracle Big Data Lite these files are located in /u01/orahivedp/jlib.
On the Big Data Appliance with Big Data SQL installed, these jars will be located in /opt/oracle/bigdatasql/bdcell-<ver>/jlib

Verify the type of SQLContext in spark-shell:

scala> sqlContext

Your output will be something like:

res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@66ad7167

If the default sqlContext is not HiveContext, you have to create it:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

You can now create a Data Frame movie_oracledb_df that points to the Hive external table movie_oracledb_tab:

scala> val movie_oracledb_df = sqlContext.table("moviedemo.movie_oracledb_tab")

Now you can access data via the data frame:

scala> movie_oracledb_df.count
res3: Long = 39716

scala> movie_oracledb_df.head

The result will be something like:

res4: org.apache.spark.sql.Row = [1217572.000000000000000000,4951.000000000000000000,6.000000000000000000,2012-08-04 09:53:46.0,0E-18,1.000000000000000000,2.000000000000000000,null]

scala> movie_oracledb_df.groupBy("genreid").count().show()
+--------------------+-----+
|             genreid|count|
+--------------------+-----+
|45.00000000000000...|  232|
|46.00000000000000...|  167|
|51.00000000000000...|    1|
|53.00000000000000...|   27|
|-1.00000000000000...|   20|
|1.000000000000000000|   66|
|2.000000000000000000|   80|
|3.000000000000000000|  995|
|6.000000000000000000|  910|
|7.000000000000000000|  459|
|8.000000000000000000|  319|
|9.000000000000000000|  367|
|10.00000000000000...|   50|
|11.00000000000000...|  269|
|12.00000000000000...|  288|
|14.00000000000000...|  189|
|15.00000000000000...|  306|
|16.00000000000000...|  147|
|17.00000000000000...|  341|
|18.00000000000000...|  186|
+--------------------+-----+
only showing top 20 rows

Access Oracle Data Pump files in Spark

If a Hive external table had not been created over the Oracle Data Pump files created by Copy to Hadoop, you can create the Hive external table from within Spark.

scala> sqlContext.sql("CREATE EXTERNAL TABLE movie_oracledb_tab ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe' STORED AS INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/oracle/oracle_warehouse/moviedemo'")

Change the location of the Oracle Data Pump files if they are in a different location.

Using Spark MLlib to build a recommendation system

Here we will demonstrate how to integrate the Data Frame we created with Spark MLlib and run some fairly complex machine learning operations on it. This example is adapted from http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html

The Hive external table moviedemo.movie_oracledb_tab has a lot of NULL values in the rating column. Let us extract all rows with non-NULL ratings.

scala> val movie_oracledb_df_notnull = movie_oracledb_df.filter("rating is not null")

You will see something like this on screen:

movie_oracledb_df_notnull: org.apache.spark.sql.DataFrame = [custid: decimal(38,18), movieid: decimal(38,18), genreid: decimal(38,18), time: timestamp, recommended: decimal(38,18), rating: decimal(38,18), activity: decimal(38,18), sales: decimal(38,18)]

We need to import some classes from Spark MLlib to run the example.

scala> import org.apache.spark.mllib.recommendation.ALS
scala> import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
scala> import org.apache.spark.mllib.recommendation.Rating

The algorithm used in this example is the Alternating Least Squares (ALS) algorithm. See http://dl.acm.org/citation.cfm?id=1608614 for details of this algorithm.
To use ALS on our data, we need to transform the data frame movie_oracledb_df_notnull into an RDD of ratings. We do this by extracting the columns cust_id(0), movie_id(1), and rating(6) from movie_oracledb_df_notnull.

scala> val ratings = movie_oracledb_df_notnull.map { row =>
         Rating(row.getDecimal(0).intValue(), row.getDecimal(1).intValue(), row.getDecimal(6).doubleValue())
       }

On screen:

ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[26] at map at <console>:32

The ALS algorithm has many parameters. In this example we use the parameters rank and iterations. Rank designates the number of latent factors in the model and iterations specifies the number of iterations to run. For a full description of the parameters please see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS

scala> val rank = 10
scala> val numIterations = 10
scala> val model = ALS.train(ratings, rank, numIterations)

You will see on screen something like:

model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@3c1c984f

After we build the model we can evaluate it. For simplicity we will use the same data, but of course that would make no sense in the real world.

scala> val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }

To predict user ratings:

scala> val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }

Compute the mean squared error using the predictions and the actual ratings.

scala> val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)

scala> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
         val err = (r1 - r2)
         err * err
       }.mean()

scala> println("Mean Squared Error = " + MSE)

We can persist the model in HDFS to use later:

scala> model.save(sc, "hdfs:///user/oracle/collabFilter")
scala> val savedModel = MatrixFactorizationModel.load(sc, "hdfs:///user/oracle/collabFilter")
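As a small follow-on sketch, the trained model can also be used directly to produce recommendations. recommendProducts is part of MatrixFactorizationModel; the customer id below is just the one that appeared in the sample row earlier, and the number of recommendations is arbitrary.

// top-5 movie recommendations for one customer, using the model trained above
scala> val topFive = model.recommendProducts(1217572, 5)
scala> topFive.foreach { case Rating(user, product, rate) =>
     |   println(s"customer=$user movie=$product predicted=$rate")
     | }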

Copy to Hadoop copies data from an Oracle Database table to HDFS, as Oracle Data Pump files.  These files can be accessed by Hive tables using a SerDe that is part of Copy toHadoop. So Hive queries...

Using Oracle SQL Connector for HDFS with Oracle Wallet

It is easy to use Oracle SQL Connector for HDFS with a client-side Oracle Wallet.  Oracle Wallet is a secure container that stores authentication and signing credentials. It is a secure way to automate connections to the database and simplify large-scale deployments that rely on password credentials to connect to the database. Refer to the Oracle Wallet documentation here.

You can create the OSCH external table from either the database node, or from a Hadoop node, edge node, or Hadoop client. If you are using a Hadoop node, edge node, or Hadoop client, you will likely not have a database client installed. You will need to copy tnsnames.ora and sqlnet.ora from the database install location to a location on your client to use the Wallet (see below).

Below is an example of using Oracle Wallet with Oracle SQL Connector for HDFS on the Oracle Big Data Lite VM.

Step 1.  Create a wallet store.

prompt> mkstore -wrl <wallet_location> -create

For example,
prompt> mkstore -wrl /home/oracle/oracle_wallet -create

Step 2.  Create a credential to log into the database.

prompt> mkstore -wrl <wallet_location> -createCredential <db_connect_string> <user>

For example,
prompt> mkstore -wrl /home/oracle/oracle_wallet -createCredential orcl moviedemo

Step 3.  If you are creating the OSCH external table from a Hadoop node, edge node, or Hadoop client, copy the files $ORACLE_HOME/network/admin/tnsnames.ora and $ORACLE_HOME/network/admin/sqlnet.ora from ORACLE_HOME to a location on your client.

Step 4.  Make the entry in tnsnames.ora as documented in the Oracle Wallet documentation.  This might already exist.  For example:

ORCL =
  (DESCRIPTION =
    (ADDRESS = (PROTOCOL = TCP)(HOST = localhost)(PORT = 1521))
    (CONNECT_DATA =
      (SERVER = DEDICATED)
      (SERVICE_NAME = orcl)
    )
  )

Step 5.  Make the entry in sqlnet.ora as documented in the Oracle Wallet documentation.  For example:

WALLET_LOCATION =
  (SOURCE =
    (METHOD = FILE)
    (METHOD_DATA =
      (DIRECTORY = /home/oracle/oracle_wallet)
    )
  )
SQLNET.WALLET_OVERRIDE = TRUE

Now you are ready to use this wallet credential with OSCH.

Step 1.  Add the following properties to the OSCH configuration properties.

oracle.hadoop.connection.wallet_location
Example:
<property>
  <name>oracle.hadoop.connection.wallet_location</name>
  <value>/home/oracle/oracle_wallet</value>
</property>

oracle.hadoop.connection.tnsEntryName
Example:
<property>
  <name>oracle.hadoop.connection.tnsEntryName</name>
  <value>orcl</value>
</property>

oracle.hadoop.connection.tns_admin
Example:
<property>
  <name>oracle.hadoop.connection.tns_admin</name>
  <value>/u01/app/oracle/product/12.1.0.2/dbhome_1/network/admin</value>
</property>

Step 2.  Remove the following connection properties that might have been used to connect to the database by interactively typing the password: oracle.hadoop.connection.url and oracle.hadoop.connection.user.

Now you are ready to use OSCH and connect to the database using Oracle Wallet. Run your OSCH commands as before. OSCH will use the credentials stored in Oracle Wallet and will not prompt you for a password.
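Putting the pieces together, an OSCH -createTable invocation that relies only on the wallet for authentication might look like the sketch below.  The table, directory, and Hive names are illustrative, and the tns_admin value assumes you copied tnsnames.ora and sqlnet.ora into the wallet directory on your client; adjust the paths to your environment.

$ hadoop jar ${OSCH_HOME}/jlib/orahdfs.jar \
    oracle.hadoop.exttab.ExternalTable \
    -D oracle.hadoop.exttab.tableName=MOVIEDEMO.MOVIE_RATINGS_EXT \
    -D oracle.hadoop.exttab.sourceType=hive \
    -D oracle.hadoop.exttab.hive.databaseName=moviedemo \
    -D oracle.hadoop.exttab.hive.tableName=movie_ratings \
    -D oracle.hadoop.exttab.defaultDirectory=MOVIE_EXT_DIR \
    -D oracle.hadoop.connection.wallet_location=/home/oracle/oracle_wallet \
    -D oracle.hadoop.connection.tnsEntryName=orcl \
    -D oracle.hadoop.connection.tns_admin=/home/oracle/oracle_wallet \
    -createTable

Note that oracle.hadoop.connection.url and oracle.hadoop.connection.user are intentionally absent, per Step 2 above, so no password prompt appears.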

It is easy to use Oracle SQL Connector for HDFS with a client-side Oracle Wallet.  Oracle Walletis a secure container that stores authentication and signing credentials. This is a secure way to...

Oracle Shell for Hadoop Loaders (OHSH)

Oracle Shell for Hadoop Loaders (OHSH) is an intuitive command line tool to move data between Hadoop and Oracle Database. OHSH uses the concept of resources to identify data sources and destinations.

Hive resource: For executing Hive commands and specifying Hive as a source or destination
Hadoop resource: For executing HDFS commands to navigate HDFS and use HDFS as a source or destination
SQL resource: For executing SQL commands in a database schema
JDBC resource: To make JDBC connections to the database

For example, hive0 is a resource created when OHSH starts up, and enables access to the Hive database default.  hadoop0 is a resource (also created when OHSH starts up) that enables access to HDFS. Additional resources can be created to connect to other Hive databases and to Oracle Database schemas.

Here are some examples you can use in the Oracle Big Data Lite VM 4.5.

# Startup OHSH
prompt> ohsh

# Create a hive resource for hive database MOVIEDEMO
ohsh> create hive resource hive_moviedemo database="MOVIEDEMO"

# Create a SQL resource for Oracle Database user MOVIEDEMO
ohsh> create sqlplus resource sql user="MOVIEDEMO" \
      connectid="bigdatalite.localdomain:1521/orcl"

# Create a JDBC resource to connect to Oracle Database
ohsh> create oracle jdbc resource moviedemo user="MOVIEDEMO" \
      connectid="bigdatalite.localdomain:1521/orcl"

Now you can move data between the source and destination identified by these resources.  Here is an example that runs Oracle Loader for Hadoop.

# Load data from Hive table moviedemo.movieapp_log_stage into a table in the
# Oracle Database, moviedemo.movie_local_tab
ohsh> load oracle table moviedemo:MOVIE_LOCAL_TAB \
      from hive table hive_moviedemo:movieapp_log_stage database=moviedemo \
      using directpath

# From within the same UI, check that the rows have been loaded into the Oracle Database table
ohsh> %sql select count(*) from MOVIE_LOCAL_TAB;
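The same resources can also be used to load from files in HDFS rather than from a Hive table.  A sketch using the OHSH load syntax described in the configuration post; the HDFS path and field terminator below are illustrative:

# Load the same Oracle table from tab-delimited text files in HDFS,
# this time via Oracle SQL Connector for HDFS (exttab)
ohsh> load oracle table moviedemo:MOVIE_LOCAL_TAB \
      from path hadoop0:/user/oracle/moviework/data fileformat text fieldterminator="\u0009" \
      using exttab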

Oracle Shell for Hadoop Loaders (OHSH) is an intuitive command line tool to move data between Hadoop and Oracle Database. OHSH uses the concept of resources to identify data sources and destinations.   H...

A Hybrid Marriage: Connecting Exadata and Hadoop hosted on AWS

Guest post by Javier De La Torre Medina and Iwona Rajca

In this post we describe how to set up and use Oracle SQL Connector for Hadoop (OSCH) for data query between Cloudera's Distribution Including Apache Hadoop hosted on AWS and Oracle Exadata. OSCH leverages Oracle's external tables technology and enables queries on Hadoop and Hive from the database. AWS is a good starting point to build your own test Hadoop environment. Many enterprises who are experimenting with Hadoop in a cloud setup will likely have engineered systems like Exadata on premise – and look to combine the two to create a powerful blended architecture.

Part 1: Configure Exadata as a Hadoop client

The first thing to do is the networking setup between Exadata and Cloudera. Make sure that Exadata can access Hadoop on AWS. There are a few things to be aware of: check the firewall setup and confirm that the ports for the Hive Metastore, DataNodes, Zookeeper, and NameNode are open. To set up the networking you might need to make changes on the AWS cluster. The rest of the configuration is done solely on the Exadata side.

Oracle provides comprehensive documentation on how to set up OSCH for a remote Hadoop cluster – check the latest Connectors User's Guide here. The main tasks you need to perform are:

Download and install OSCH on Exadata.
Download and install selected Cloudera packages (Hadoop and Hive) on Exadata.
Configure the Hadoop client on Exadata.

The documentation contains detailed steps.

Part 1: Setup OSCH for a remote Hadoop cluster

Download and install OSCH. Download the OSCH connector from OTN and unpack it. The OSCH directory should be placed in a shared directory so that all nodes in Exadata can access it.

Download and install the selected Cloudera Hadoop client. To be able to run queries from Exadata on AWS-hosted CDH, you need to install and configure the Hadoop and Hive client packages on the client system. You can do an RPM install with Cloudera-provided packages. Alternatively you can download a Cloudera-provided tarball that matches the version of your CDH on AWS. For the tarball install unzip the following packages:

hadoop-<version>.tar.gz
hive-<version>.tar.gz

When these are installed, set up your HADOOP_CLASSPATH environment variable to point to:

the path to the JAR files for OSCH (your OSCH installation folder), e.g. path/orahdfs-3.3.0/jlib/*
the Hive JAR files and conf directory (in a default RPM install these are: /usr/lib/hive/lib/*, /etc/hive/conf)

Configure the Hadoop client. After installing the CDH packages, you must configure the Hadoop client for use with CDH on AWS. Cloudera Manager automatically generates the configuration files for the Hadoop client – they will include your cluster network configuration details. To get them, log in to Cloudera Manager as admin and, right next to your cluster name, click on the drop down and select the "View Client Configuration URLs" option. From there download the YARN and Hive packages. Once you get the zip contents unpacked, move them to the configuration directories of Hadoop and Hive – in a default RPM install these are /etc/hadoop/conf and /etc/hive/conf respectively.

Test the connection by accessing HDFS files and running Hive queries.

Part 2: Configure Oracle SQL Connector for Hadoop

Once you can run your Hive jobs, configuring OSCH is a matter of minutes. Start by creating a folder in a shared location that will host external table location files when an external table is created – e.g. path/ext_table_dir. You can use the ASM Cluster File System in Exadata to have a shared folder.
To access Hive, an Oracle user requires the basic create session, create table and create view privileges. For OSCH to make a Hive to Oracle mapping, a sysdba should specify the following:

- create a database directory to point to the orahdfs-version/bin directory where the hdfs_stream file resides, e.g.: create or replace directory osch_bin_path as 'path/orahdfs-3.1.0/bin'
- grant read, execute on directory osch_bin_path to <username>
- create a database directory where the Hive files will be published to, e.g.: create or replace directory ext_table_dir as 'path/ext_table_dir'
- grant read, write on directory ext_table_dir to <username>

Part 3: Create the external table

Creating an external table is simple – as long as you specify the correct Hive table and the destination table, Oracle will use Hive's metadata to describe the new table. To create a table, simply use the Linux CLI or put the configuration in a separate file and point to it.

A sample CREATE TABLE statement:

$ hadoop jar OSCH_HOME/jlib/orahdfs.jar \
  oracle.hadoop.exttab.ExternalTable \
  -D oracle.hadoop.exttab.tableName=<username>.oracle_table \                        <- pick a name for the external table
  -D oracle.hadoop.exttab.sourceType=hive \                                          <- specifies the external table type
  -D oracle.hadoop.exttab.hive.tableName=my_hive_table \                             <- point to your Hive table
  -D oracle.hadoop.exttab.hive.databaseName=default \                                <- point to your Hive database
  -D oracle.hadoop.exttab.defaultDirectory=ext_table_dir \                           <- point to your external directory
  -D oracle.hadoop.connection.url=jdbc:oracle:thin:@//myhost:1521/myservicename \    <- specify the connection to the Oracle database
  -D oracle.hadoop.connection.user=<username> \                                      <- specify the database user (unless using Oracle wallet)
  -createTable

This will create the table. Worth noting - the Oracle external table is not a "live" Hive table. After changes are made to a Hive table, you must use the ExternalTable tool to drop the existing external table and create a new one.
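For completeness, the drop-and-recreate cycle can also be scripted.  This is a sketch only: -drop is the ExternalTable command documented in the Connectors User's Guide for removing an external table created by OSCH, so check the guide for your OSCH version before relying on it.

# drop the external table created earlier (assumes the same tool and table name as above)
$ hadoop jar OSCH_HOME/jlib/orahdfs.jar \
  oracle.hadoop.exttab.ExternalTable \
  -D oracle.hadoop.exttab.tableName=<username>.oracle_table \
  -drop

# then re-run the -createTable command shown above to pick up the new Hive metadata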

Guest post by Javier De La Torre Medina and Iwona Rajca In this post we describe how to setup and use Oracle SQL Connector for Hadoop (OSCH) for data query between Cloudera’s DistributionIncluding...

Loading Apache Web Logs with Oracle Loader For Hadoop

We will discuss how to load Apache access logs in the Combined Log Format using Oracle Loader for Hadoop (OLH). Let's start with a brief introduction to Apache web logs as described in http://httpd.apache.org/docs/2.4/logs.html.

Apache Web Logs

Apache HTTP Servers provide a variety of different mechanisms for logging everything that happens on the HTTP server.  They generate server access logs and server error logs. The server access log contains records of all requests processed by the server. The server error log contains error and diagnostic information for errors encountered while processing requests.

A typical configuration for the Apache Combined Log Format might look like this:

LogFormat "%h %l %u %t \"%r\" %>s %b \"%{Referer}i\" \"%{User-agent}i\""

Example:
157.166.226.21 - - [25/Jan/2014:18:41:40 -0400] "GET / HTTP/1.1" 200 - "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/534.30 (KHTML, like Gecko) Chrome/12.0.742.124 Safari/534.30"

Each part of the log format string is described below:

%h is the IP address of the client (remote host) which made the request to the server
%l is the identity of the client, almost never used. The "hyphen" in the output indicates that the requested piece of information is not available.
%u is the userid as determined by HTTP authentication.
%t is the time that the request was received. The format is: [day/month/year:hour:minute:second zone], e.g. [25/Jan/2014:18:41:40 -0400]
\"%r\" is the request line from the client, given in double quotes, e.g. "GET / HTTP/1.1"
%>s is the status code that the server sends back to the client (e.g. 200, 404).
%b is the size of the object returned to the client.
\"%{Referer}i\" is the site that the client reports having been referred from.
\"%{User-agent}i\" is the identifying information that the client browser reports about itself, e.g. "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/534.30 (KHTML, like Gecko) Chrome/12.0.742.124 Safari/534.30"

Input Format:

We will use the regular expression input format class oracle.hadoop.loader.lib.input.RegexInputFormat to read Apache log files. This input format uses a regular expression to match a line of text.  In this example we assume the lines in the log file are separated by newline characters. The regular expression is specified by the oracle.hadoop.loader.input.regexPattern property.  The job configuration properties are shown below:

<!-- input format -->
<property>
  <name>mapreduce.inputformat.class</name>
  <value>oracle.hadoop.loader.lib.input.RegexInputFormat</value>
</property>

<!-- regular expression -->
<property>
  <name>oracle.hadoop.loader.input.regexPattern</name>
  <value>([^ ]*) ([^ ]*) ([^ ]*) (-|\[[^\]]*\]) ((?:[^ \"]*)|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\"[^\"]*\") ([^ \"]*|\"[^\"]*\"))?</value>
</property>

See the Complex Text Input Formats section for more information about this input format.

Output Format:

We will use the output format class oracle.hadoop.loader.lib.output.OCIOutputFormat to load the Apache web logs into a partitioned database table. Note that OCIOutputFormat requires the database table to be partitioned.

Sample Log Files

You can download sample data from //cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/f4a5b21d-66fa-4885-92bf-c4e81c06d916/File/1c81c152db7abac017f7a2d6cdee5a5b/apache_access_log.1 to /tmp/apache_access_log.1. Then copy the log file to HDFS:

$ hadoop fs -mkdir apache_log_dir
$ hadoop fs -put /tmp/apache_access_log.1 apache_log_dir

Steps:

Create the database table APACHE_COMBINED_LOG.
Next create the job configuration file apache_log_jobconf.xml. Then run an OLH job to load the Apache logs from HDFS into the database table. Lastly, query the database table APACHE_COMBINED_LOG to see the data, and look at the job reports generated by OLH.

Details

Please note: All the code listed in this document is for reference purposes and will not work "AS-IS" in your environment. You must customize the environment variables such as HADOOP_HOME, OLH_HOME and other job configuration properties to get this example to work in your environment. Sample data and configuration files referenced in this blog can be used as a starting point to run this example.

Step 1: Create the database table APACHE_COMBINED_LOG

> sqlplus SCOTT

SQL*Plus: Release 12.1.0.1.0 Production on Tue May 13 16:22:53 2014
Copyright (c) 1982, 2013, Oracle. All rights reserved.
Enter password: enter password for SCOTT

SQL> create table APACHE_COMBINED_LOG (
  CLIENT_IP_ADDRESS  VARCHAR2(15),
  CLIENT_ID          VARCHAR2(40),
  REMOTE_USER_ID     VARCHAR2(40),
  REQUEST_DATE       DATE,
  REQUEST_LINE       VARCHAR(3000),
  RETURN_STATUS_CODE NUMBER,
  RETURN_SIZE        VARCHAR2(20),
  REFERER            VARCHAR2(3000),
  USER_AGENT         VARCHAR2(3000)
) partition by hash(REQUEST_DATE) partitions 2;

Table created.
SQL>

Step 2: Create the job configuration file

Download the job configuration file //cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/f4a5b21d-66fa-4885-92bf-c4e81c06d916/File/8816bd297c381d35b700f39772e08cca/apache_log_jobconf.xml into your current directory.  Edit the configuration file to set values for HOST, PORT, SERVICE_NAME, database user, database password, and the job input and output directories. Review all the properties listed in this file.

Step 3: Run the OLH job

NOTE 1: Change the values for HADOOP_HOME and OLH_HOME.

#!/bin/bash
export HADOOP_HOME=/usr/lib/hadoop
export PATH=${HADOOP_HOME}/bin:${PATH}
export OLH_HOME=/myhome/olh/oraloader-3.0.0-h2
export HADOOP_CLASSPATH=${OLH_HOME}/jlib/*

#remove job output directory
hadoop fs -rmr myoutputdir

#Run OLH
hadoop oracle.hadoop.loader.OraLoader -conf ./apache_log_jobconf.xml

Sample job output:

Note: You may see a lot of deprecated property warnings if you are using a newer release of Hadoop such as CDH5.0.0. This is expected, and is because the examples use some older properties for compatibility across different Hadoop versions.

14/05/15 10:18:51 INFO loader.OraLoader: Oracle Loader for Hadoop Release 3.0.0 - Production
Copyright (c) 2011, 2014, Oracle and/or its affiliates. All rights reserved.
14/05/15 10:18:51 INFO loader.OraLoader: Built-Against: hadoop-2.3.0-cdh5.0.0 hive-0.12.0-cdh5.0.0 avro-1.7.3 jackson-1.8.8
...
14/05/15 10:18:54 INFO output.OCIOutputFormat: Adding OCI libraries to distributed cache
...
14/05/15 10:19:07 INFO loader.OraLoader: map 0% reduce 0%
14/05/15 10:19:41 INFO loader.OraLoader: map 100% reduce 0%
14/05/15 10:19:55 INFO loader.OraLoader: map 100% reduce 100%
14/05/15 10:19:56 INFO loader.OraLoader: Job complete: olh_regex_apache_combined_log (job_1399674267702_0150)
...
File Input Format Counters
  Bytes Read=1216
File Output Format Counters
  Bytes Written=2223
...

Step 4: Query the APACHE_COMBINED_LOG table

Verify that the log data has been loaded into the table.
SQL> connect scott

SQL> select count(*) from apache_combined_log;

  COUNT(*)
----------
         8

SQL> select client_ip_address, request_date, return_status_code from apache_combined_log;

CLIENT_IP_ADDRE REQUEST_D RETURN_STATUS_CODE
--------------- --------- ------------------
157.166.226.25  26-JUL-11                401
23.7.70.140     26-JUL-11                200
157.166.226.25  25-JUL-11                404
157.166.226.25  25-JUL-11                200
157.166.226.25  26-JUL-11                401
23.7.70.140     26-JUL-11                401
157.166.226.25  25-JUL-11                304
157.166.226.25  25-JUL-11                404

8 rows selected.

Step 5: Job report and debugging tips

Oracle Loader for Hadoop consolidates reporting information from individual tasks into a file named ${mapred.output.dir}/_olh/oraloader-report.txt. In this example ${mapred.output.dir} points to myoutputdir. Among other statistics, the report shows the number of errors, broken out by type and task, for each mapper and reducer.

In this example we set the oracle.hadoop.loader.logBadRecords configuration property to true, which directs OLH to log bad records into one or more ".bad" files in the myoutputdir/_olh/ directory. You can inspect the ".bad" files to find out which rows were rejected and why they were rejected (a quick way to pull these files out of HDFS is sketched after the sample file links at the end of this post).

A listing of the job output directory for this example looks like this:

$ hadoop fs -ls myoutputdir
Found 2 items
-rw-r--r--   3 yala hadoop      0 2014-05-15 10:19 myoutputdir/_SUCCESS
drwxr-xr-x   - yala hadoop      0 2014-05-15 10:19 myoutputdir/_olh

$ hadoop fs -ls myoutputdir/_olh
Found 5 items
-rw-r--r--   3 yala hadoop   1209 2014-05-15 10:19 myoutputdir/_olh/oraloader-00000-oci.log
-rw-r--r--   3 yala hadoop   1014 2014-05-15 10:19 myoutputdir/_olh/oraloader-00000-oci.xml
-rw-r--r--   3 yala hadoop    377 2014-05-15 10:19 myoutputdir/_olh/oraloader-00000-r.xml
-rw-r--r--   3 yala hadoop   6285 2014-05-15 10:19 myoutputdir/_olh/oraloader-report.txt
-rw-r--r--   1 yala hadoop  19607 2014-05-15 10:18 myoutputdir/_olh/tableMetadata.xml

Software used in this example:

Oracle Loader for Hadoop 3.0.0: oraloader-3.0.0-h2 (from http://www.oracle.com/technetwork/database/database-technologies/bdc/big-data-connectors/downloads/big-data-downloads-1451048.html)
Hadoop: hadoop-2.3.0-cdh5.0.0 from Cloudera
Oracle Database: Oracle Release 12.1.0.1.0 Production with the Partitioning option

Sample files:

apache_log_jobconf.xml: //cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/f4a5b21d-66fa-4885-92bf-c4e81c06d916/File/8816bd297c381d35b700f39772e08cca/apache_log_jobconf.xml
Sample data: //cdn.app.compendium.com/uploads/user/e7c690e8-6ff9-102a-ac6d-e4aebca50425/f4a5b21d-66fa-4885-92bf-c4e81c06d916/File/1c81c152db7abac017f7a2d6cdee5a5b/apache_access_log.1
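One quick follow-up to the debugging tips in Step 5: no ".bad" files appear in the listing above because this particular load had no rejected records, but if a load does reject rows you can pull the bad-record files straight out of HDFS for inspection. This is a sketch of the general approach rather than output from this example; adjust the output directory name to match your own job configuration:

$ hadoop fs -ls myoutputdir/_olh
$ hadoop fs -cat 'myoutputdir/_olh/*.bad'

The oraloader-report.txt file in the same directory can be viewed the same way with hadoop fs -cat.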


How to Load Oracle Tables From Hadoop Tutorial (Part 6 - The Data Pump Solution)

Using OLH and OSCH Together

So far we've discussed OLH and OSCH as alternative strategies for loading Oracle tables with data living in Hadoop. OLH uses the map-reduce framework to do the heavy lifting of converting text data into Oracle native data types, and splitting loads so they can get dedicated to Oracle table partitions. It then uses OCI Direct Path to push data from Hadoop into Oracle. OSCH uses Oracle parallel query processing to pull data from Hadoop into Oracle using Oracle external tables, where the native data type conversion and partitioning of data occurs all on the Oracle side.

The key difference between OLH and OSCH is efficiency versus speed. OLH is a map reduce batch job that dramatically reduces the CPU resources needed by Oracle to complete a load, but the load only starts at the end of the map reduce job. OSCH starts loading immediately, but uses more Oracle resources. In this lesson we will talk about using them together for a hybrid solution using Data Pump files. Speaking for myself, if I were administering the loading of data from Hadoop clusters into Oracle databases on a regular basis, I would go with this solution.

The Data Pump Model

About ten years ago Oracle revamped its model for importing and exporting data, taking advantage of parallel technology and calling it Oracle Data Pump. It has since become a well established utility for moving data from one Oracle database to another. It offers two standalone utilities, "expdp" and "impdp", to export and import Oracle tables and other schema objects to and from files, and is integrated into Oracle's external table infrastructure. The files exported and imported are in "Data Pump" format, which means they are already converted into native Oracle data types and have embedded metadata that describes the format of an Oracle table. Pulling data into Oracle via external tables reading Data Pump files is more efficient than reading text files because the type conversion of table columns is, by definition, already done.

The Data Pump model for loading data from Hadoop into Oracle is to use OLH to run a map reduce job and produce Data Pump files in HDFS, and then use OSCH to later access these files. In other words, OLH stages data in HDFS for OSCH to pick up and load at a later time. In the first lesson of this tutorial I argued that staging data adds an additional write and read of content and is costly. That's true, and it would be crippling if you were to stage data to a standard file system, but when you stage it using HDFS (which is massively distributed and parallel) such costs are minimized. There are several reasons why this is an attractive solution.

Administrative Flexibility

When OLH runs, either using JDBC or OCI Direct Path, the load will be using both the Oracle database and a Hadoop cluster at roughly the same time. Very often these systems in production environments have their own schedules, and finding a time slot to dedicate two systems to do loading can be difficult. With the Data Pump model, the OLH job can run when it suits the Hadoop administrators. The Oracle DBA simply needs to know where the Data Pump load files are living in HDFS, and can load them at a later time that suits his own schedule.

Ease of Use

When an external table references text files you need to concern yourself with converting the text into native Oracle types, either using external table access parameter verbiage in an external table definition, or by using CAST operators while loading the data into an Oracle table.
It's not rocket science, but it does require some work to get it right. Data Pump external tables bypass these issues, so the external table DDL is streamlined and simple. To create a Data Pump external table you just point OSCH's External Table tool to a set of Data Pump files generated by OLH and living in HDFS. The tool will extract table schema information from the files to create an external table. It will also sanity check that all Data Pump files referenced have the same table schema.

Load Integrity

We haven't talked much about what happens with OLH or OSCH when it runs into bad data formats that can't be converted properly to Oracle native data types. OSCH, when used against HDFS text files, relies on standard Oracle external table functionality to state how the external table should react if it gets bad data. Options can include simply skipping the bad row, recording the bad row and associated bad data in a dedicated log, or aborting the load immediately or after some threshold of bad records is exceeded. The point is that with the Data Pump model you will catch your bad record errors when running OLH, before you load the database. Any Data Pump files that OLH produces will be bullet proof, so you can be confident that when you load them with OSCH, the load will not fail because of dirty data. You can think of OLH generating Data Pump files as the final transform of an ETL process.

Efficiency and Performance

When OSCH is loading Data Pump files it uses substantially less database resources than when it is loading HDFS text files. That's because the work and cost of converting text to native Oracle data types was offloaded to OLH. Typically for big loads, an OSCH load of HDFS Data Pump files runs anywhere from ten to thirty percent faster than the same payload of HDFS text files. OSCH against text files is fast. OSCH against Data Pump files is faster and uses less database resources when you are dealing with partitioned tables.

An Example

Generating Data Pump Files

Generating Data Pump files in HDFS is accomplished by running a vanilla OLH job. The structure of the OLH command is very similar to the structure we described for OCI Direct Path. You specify the number of reduce tasks (assuming the table is partitioned), the input directory in HDFS, the job name, the HDFS output directory, and so on. The only difference between this and OCI Direct Path is the output format buried in the dpOutput.xml file below:

$HADOOP_HOME/bin/hadoop jar $OLH_HOME/jlib/oraloader.jar oracle.hadoop.loader.OraLoader
    -D oracle.hadoop.loader.jobName=OLHP_fivdti_dtext_dp_749
    -D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml
    -D mapred.reduce.tasks=10
    -D mapred.input.dir=/user/olh_performance/fivdti/56000000_90
    -D mapred.output.dir=/user/oracle/olh_test/results/fivdti/749
    -conf /tmp/oracle_connection.xml
    -conf /tmp/dtextInput.xml
    -conf /tmp/dpOutput.xml

The key difference between this and the OLH Direct Path command discussed in Lesson 3 is the new file we are using, "dpOutput.xml", which specifies that the output format is Data Pump:

<configuration>
   <property>
       <name>mapreduce.outputformat.class</name>
       <value>oracle.hadoop.loader.lib.output.DataPumpOutputFormat</value>
   </property>
</configuration>

Rather than loading an Oracle database, the output will be stored as Data Pump files in the "mapred.output.dir" specified in the command (e.g. /user/oracle/olh_test/results/749).
They will have this type of pattern in the filename: "oraloader-00059-dp-85.dat".

One obvious question is: since this OLH command is not loading an Oracle table, why do you need the "oracle_connection.xml" file which allows OLH to connect to the database? The reason is that OLH still needs to make a connection to get metadata about the target table (e.g. "helloworld") that ultimately will be loaded via OSCH, so it needs to extract this information from Oracle. This is not resource intensive. OLH simply connects to Oracle, gets the metadata and a quick cup of coffee, and then does the rest of the work off-line. While connectivity is not required for this type of OLH job (there is a workaround where you specify the precise metadata information OLH needs), it is easier and more convenient.

Note: since this series of tutorials was started, the loader map mechanism has been deprecated (but is still supported) in favor of new OLH properties.

Generating an External Table that Maps to Data Pump Files

This is where the ease of use kicks in. You use OSCH's External Table tool to create the external table for you. Since metadata is embedded in the Data Pump files you want to access, you don't have to fiddle with table column definitions as you do when loading HDFS text files. The tool will look at your Data Pump files in HDFS, extract schema information, sanity check that all the Data Pump files share the same definition of the schema, and create an external table for you, mapping it to access the Data Pump files.

"Metadata? We don't need no stinkin' metadata!"

The structure of the command is similar to the publish command in OSCH. You need to provide the standard connection information, the name of the Oracle user, and the name of the external table. The key differences from publish are that you specify the source type as "datapump", specify the Oracle directory name you created where the location files live, point to the same results directory that was specified in the OLH job you ran above, and ask the tool to create the external table.

/home/oracle/osch/hadoop-2.0.0/bin/hadoop jar /home/oracle/osch/orahdfs-2.3.0/jlib/orahdfs.jar oracle.hadoop.exttab.ExternalTable
    -D oracle.hadoop.connection.url=jdbc:oracle:thin:@localhost/dbm
    -D oracle.hadoop.connection.user=oschuser
    -D oracle.hadoop.exttab.defaultDirectory=EXTTAB_DIR
    -D oracle.hadoop.exttab.tableName=helloworld_exttab
    -D oracle.hadoop.exttab.sourceType=datapump
    -D oracle.hadoop.exttab.dataPaths=/user/oracle/olh_test/results/749
    -createTable

Loading the Data

At this point your Data Pump files are bound to your external table, which can be used for access or for loading the Oracle target table.

INSERT INTO helloworld SELECT * FROM helloworld_exttab;

Optimizations

When loading text files, we enumerated a lot of rules for figuring out the optimal number of location files and data files that we wanted, driven off of the DOP we expected to use when running OSCH. These rules don't apply for loading Data Pump files, simply because the number of Data Pump files created by OLH is not strictly controllable. For this model there are three rules you want to follow:

Rule 1: For optimal performance use this model against partitioned tables.

The model will work for non-partitioned tables, but not as efficiently or as fast. In the non-partitioned case, the OLH job becomes a map-only job and creates a Data Pump file for every map task (i.e. input split).
Since input splits are relatively small (e.g. 250MB), this tends to create a lot of Data Pump files, and correspondingly a lot of location files when you publish using OSCH. With a non-partitioned table you still get the ease-of-use and load integrity benefits mentioned above, but not the load efficiency. In this case OSCH against vanilla text files will probably be a better choice.

Rule 2: If the table is partitioned, use the sampler when generating Data Pump files with the OLH job.

This will help balance out the size of the generated Data Pump files.

Rule 3: Adjust your session to the desired DOP (or use the "parallel" hint), and use the standard "append" and "pq_distribute" hints.

ALTER SESSION FORCE PARALLEL DML PARALLEL 8;
ALTER SESSION FORCE PARALLEL QUERY PARALLEL 8;

INSERT /*+ append pq_distribute(helloworld, none) */ INTO helloworld
SELECT * FROM helloworld_exttab;
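One last practical check that ties the two halves of this model together: before running the OSCH External Table tool against the staged files, it can be reassuring to confirm that the OLH job actually left Data Pump files in the HDFS output directory. A plain listing of the results directory from the example above is enough:

$ hadoop fs -ls /user/oracle/olh_test/results/749

You should see files matching the "oraloader-*-dp-*.dat" naming pattern described earlier; those are the files the -createTable step will bind to the external table.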


How to Load Oracle Tables From Hadoop Tutorial (Part 5 - Leveraging Parallelism in OSCH)

Using OSCH: Beyond Hello World

In the previous post we discussed a "Hello World" example for OSCH, focusing on the mechanics of getting a toy end-to-end example working. In this post we are going to talk about how to make it work for big data loads. We will explain how to optimize an OSCH external table for load, paying particular attention to Oracle's DOP (degree of parallelism), the number of external table location files we use, and the number of HDFS files that make up the payload. We will provide some rules that serve as best practices when using OSCH. The assumption is that you have read the previous post, have some end-to-end OSCH external tables working, and now want to ramp up the size of the loads.

Using OSCH External Tables for Access and Loading

OSCH external tables are no different from any other Oracle external tables. They can be used to access HDFS content using Oracle SQL:

SELECT * FROM my_hdfs_external_table;

or to load a table in Oracle using the same SQL access:

INSERT INTO my_oracle_table SELECT * FROM my_hdfs_external_table;

To speed up the load time, you will want to control the degree of parallelism (i.e. DOP) and add two SQL hints.

ALTER SESSION FORCE PARALLEL DML PARALLEL 8;
ALTER SESSION FORCE PARALLEL QUERY PARALLEL 8;

INSERT /*+ append pq_distribute(my_oracle_table, none) */ INTO my_oracle_table
SELECT * FROM my_hdfs_external_table;

There are various ways of hinting at what level of DOP you want to use. The ALTER SESSION statements above force the issue, assuming you (the user of the session) are allowed to assert the DOP (more on that in the next section). Alternatively you could embed additional parallel hints directly into the INSERT and SELECT clauses respectively:

/*+ parallel(my_oracle_table,8) */
/*+ parallel(my_hdfs_external_table,8) */

Note that the "append" hint lets you load a target table by reserving space above a given "high watermark" in storage and uses Direct Path load. In other words, it doesn't try to fill blocks that are already allocated and partially filled; it uses unallocated blocks. It is an optimized way of loading a table without incurring the typical resource overhead associated with run-of-the-mill inserts. The "pq_distribute" hint in this context unifies the INSERT and SELECT operators to make data flow during a load more efficient. Finally, your target Oracle table should be defined with the "NOLOGGING" and "PARALLEL" attributes. The combination of "NOLOGGING" and the "append" hint disables REDO logging and its overhead. The "PARALLEL" clause tells Oracle to try to use parallel execution when operating on the target table.

Determine Your DOP

It might feel natural to build your datasets in Hadoop and then afterwards figure out how to tune the OSCH external table definition, but you should start backwards. You should focus on the Oracle database, specifically the DOP you want to use when loading (or accessing) HDFS content using external tables. The DOP in Oracle controls how many PQ slaves are launched in parallel when executing an external table. Typically the DOP is something you want Oracle to control transparently, but for loading content from Hadoop with OSCH, it's something that you will want to control. Oracle computes the maximum DOP that can be used by an Oracle user. The maximum value that can be assigned is an integer value typically equal to the number of CPUs on your Oracle instances, times the number of cores per CPU, times the number of Oracle instances.
For example, suppose you have a RAC environment with 2 Oracle instances, and suppose that each system has 2 CPUs with 32 cores. The maximum DOP would be 128 (i.e. 2*2*32). In point of fact, if you are running on a production system, the maximum DOP you are allowed to use will be restricted by the Oracle DBA. This is because using a system maximum DOP can subsume all system resources on Oracle and starve anything else that is executing. Obviously on a production system where resources need to be shared 24x7, this can't be allowed to happen.

The use cases for being able to run OSCH with a maximum DOP are when you have exclusive access to all the resources on an Oracle system. This can be in situations when you are first seeding tables in a new Oracle database, or when normal activity in the production database can be safely taken off-line for a few hours to free up resources for a big incremental load. Using OSCH on high end machines (specifically Oracle Exadata and Oracle BDA cabled with Infiniband), this mode of operation can load up to 15TB per hour.

The bottom line is that you should first figure out what DOP you will be allowed to run with by talking to the DBAs who manage the production system. You then use that number to derive the number of location files, and (optionally) the number of HDFS data files that you want to generate, assuming that is flexible.

Rule 1: Find out the maximum DOP you will be allowed to use with OSCH on the target Oracle system.

Determining the Number of Location Files

Let's assume that the DBA told you that your maximum DOP was 8. You want the number of location files in your external table to be big enough to utilize all 8 PQ slaves, and you want them to represent equally balanced workloads. Remember, location files in OSCH are metadata lists of HDFS files and are created using OSCH's External Table tool. They also represent the workload size given to an individual Oracle PQ slave (i.e. a PQ slave is given one location file to process at a time, and only it will process the contents of that location file).

Rule 2: The size of the workload of a single location file (and the PQ slave that processes it) is the sum of the content size of the HDFS files it lists.

For example, if a location file lists 5 HDFS files which are each 100GB in size, the workload size for that location file is 500GB. The number of location files that you generate is something you control by providing a number as input to OSCH's External Table tool.

Rule 3: The number of location files chosen should be a small multiple of the DOP.

Each location file represents one workload for one PQ slave. So the goal is to keep all slaves busy and try to give them equivalent workloads. Obviously if you run with a DOP of 8 but have 5 location files, only five PQ slaves will have something to do and the other three will have nothing to do and will quietly exit. If you run with 9 location files, then the PQ slaves will pick up the first 8 location files and, assuming they have equal workloads, will finish up about the same time. But the first PQ slave to finish its job will then be rescheduled to process the ninth location file, potentially doubling the end-to-end processing time. So for this DOP, using 8, 16, or 32 location files would be a good idea.
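To make the rule concrete: the number of location files is simply the locationFileCount value you pass to OSCH's External Table tool. Here is a minimal sketch for the DOP-of-8 scenario above; the connection settings, table name, and HDFS path are reused from the "Hello World" example in the previous post, so treat them as placeholders for your own environment:

/home/oracle/osch/hadoop-2.0.0/bin/hadoop jar /home/oracle/osch/orahdfs-2.2.0/jlib/orahdfs.jar oracle.hadoop.exttab.ExternalTable
  -D oracle.hadoop.connection.url=jdbc:oracle:thin:@localhost/dbm
  -D oracle.hadoop.connection.user=oschuser
  -D oracle.hadoop.exttab.tableName=helloworld_exttab
  -D oracle.hadoop.exttab.dataPaths=/user/oracle/osch/exttab
  -D oracle.hadoop.exttab.locationFileCount=16
  -publish

With a DOP of 8, a locationFileCount of 16 gives each PQ slave two similarly sized workloads to process, which keeps all eight slaves busy for the duration of the load.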
Determining the Number of HDFS Files

Let's start with the next rule and then explain it:

Rule 4: The number of HDFS files should be a multiple of the number of location files, and the files should be roughly the same size.

In our running example, the DOP is 8. This means that the number of location files should be a small multiple of 8. Remember that each location file represents a list of unique HDFS files to load, and that the sum of the files listed in each location file is a workload for one Oracle PQ slave. The OSCH External Table tool will look in an HDFS directory for a set of HDFS files to load. It will generate N location files (where N is the value you gave to the tool). It will then try to divvy up the HDFS files and do its best to make sure the workload across location files is as balanced as possible. (The tool uses a greedy algorithm that grabs the biggest HDFS file and delegates it to a particular location file. It then looks for the next biggest file and puts it in some other location file, and so on.) The tool's ability to balance is reduced if HDFS file sizes are grossly out of balance or if there are too few of them.

For example, suppose my DOP is 8 and the number of location files is 8. Suppose I have only 8 HDFS files, where one file is 900GB and the others are 100GB. When the tool tries to balance the load it will be forced to put the singleton 900GB file into one location file, and put each of the 100GB files in the 7 remaining location files. The load balance skew is 9 to 1. One PQ slave will be working overtime, while the slacker PQ slaves are off enjoying happy hour. If, however, the total payload (1600GB) were broken up into smaller HDFS files, the OSCH External Table tool would have an easier time generating a list where the workload for each location file is relatively the same. Applying Rule 4 above to our DOP of 8, we could divide the workload into 160 files that were approximately 10GB in size. For this scenario the OSCH External Table tool would populate each location file with 20 HDFS file references, and all location files would have similar workloads (approximately 200GB per location file).

As a rule, when the OSCH External Table tool has to deal with more and smaller files it will be able to create more balanced loads. How small should HDFS files get? Not so small that the HDFS open and close file overhead starts having a substantial impact. For our performance test system (Exadata/BDA with Infiniband), I compared three OSCH loads of 1 TiB. One load had 128 HDFS files living in 64 location files, where each HDFS file was about 8GB. I then did the same load with 12800 files, where each HDFS file was about 80MB in size. The end-to-end load time was virtually the same. However, when I got ridiculously small (i.e. 128000 files at about 8MB per file), it started to make an impact and slow down the load time.

What happens if you break rules 3 or 4 above? Nothing draconian; everything will still function. You just won't be taking full advantage of the generous DOP that was allocated to you by your friendly DBA. The key point of the rules articulated above is this: if you know that HDFS content is ultimately going to be loaded into Oracle using OSCH, it makes sense to chop it up into the right number of files of roughly the same size, derived from the DOP that you expect to use for loading.
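If you want to check whether the files you are about to publish are roughly equal in size before handing them to the External Table tool, a plain HDFS listing of per-file sizes is enough; the directory below is just the example data path used elsewhere in this tutorial, so substitute your own:

$ hadoop fs -du /user/oracle/osch/exttab

The sizes are reported in bytes. If a handful of files dwarf the rest, consider splitting them up before publishing, for the reasons described above.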
They can be used together in a way that provides for more efficient OSCH loads and allows one to be more flexible about scheduling on a Hadoop cluster and an Oracle Database to perform load operations. The next lesson will talk about Oracle Data Pump files generated by OLH, and loaded using OSCH. It will also outline the pros and cons of using various load methods.    


How to Load Oracle Tables From Hadoop Tutorial (Part 4 - OSCH Hello World)

Oracle OSCH: A "World Hello" Example

In this post we will walk through Alice in Wonderland's looking glass and do a "Hello World" example for Oracle SQL Connector for HDFS (i.e. OSCH). The title, "World Hello", is a play on words meant to drive home the relationship between the two loading models: OLH and OSCH. They both can be used to load an Oracle table, but while OLH is run on Hadoop and uses Hadoop's Map Reduce engine to write data into Oracle tables, OSCH uses the SQL engine running on Oracle to read data living in HDFS files. OLH pushes data to Oracle on demand from Hadoop. OSCH pulls data from HDFS on demand from Oracle.

Below we will first review the OSCH execution model. We will then discuss configuration. OSCH has a few more moving parts to worry about than OLH, which invariably will create hiccups, but if you follow my instructions, in the following order, these should be minimized:

Perform one-time configuration steps
Create an Oracle external table that works against local test data
Load the same local test data into an HDFS directory
Recreate the external table to reference a Hadoop cluster
Use the OSCH External Table publishing tool to point the external table to the test data location in HDFS

The OSCH Execution Model

OSCH was explained in the first lesson of the tutorial, but since we are revisiting it in depth, let's review how it works. OSCH is simply the plumbing that lets Oracle external tables access HDFS content. Oracle external tables are a well established mechanism for reading content that is not populated or managed by Oracle. For conventional Oracle external tables, the content lives as files visible to the OS where the Oracle system is running. These would be either local files or shared network files (e.g. NFS). When you create an Oracle external table you point it to a set of files that constitute data that can be rendered as SQL tables. Oracle external table definitions call these "location" files.

Before OSCH was invented, external tables introduced an option called a preprocessor directive. Originally it was an option that allowed a user to preprocess a single location file before the content was streamed into Oracle. For instance, if your contents were zip files, the preprocessor option could specify that "unzip -p" is to be called with each location file, which would unzip the files before passing the unzipped content to Oracle. The output of an executable specified in the preprocessor directive is always stdout (hence the "-p" option for the unzip call). A preprocessor executable is a black box to Oracle. All Oracle knows is that when it launches it and feeds it a location file path as an argument, the executable will feed it a stream of bits that represents the data of an external table.

OSCH repurposed the preprocessor directive to provide access to HDFS. Instead of calling something like "unzip", it calls an OSCH tool that streams HDFS file content from Hadoop. The files it reads from HDFS are specified as OSCH metadata living in the external table "location" files locally. (These metadata files are created using OSCH's publishing tool.) In other words, for OSCH, location files do not contain HDFS content; they contain references to HDFS files living in a Hadoop cluster. The OSCH-supplied preprocessor expects to find OSCH metadata in this file. All this is encapsulated within the Oracle external table definition. The preprocessor logic gets invoked every time one issues a SELECT statement in SQL against the external table.
At run time, the OSCH preprocessor is invoked, which opens a "location" file with metadata. It parses the metadata and then generates a list of files in HDFS it will open, one at a time, and read, piping the content into Oracle. (The metadata also includes optional CODEC directives, so if the HDFS content needs to be decompressed before being fed to Oracle, the OSCH preprocessor can handle it.) By the way, if you just got nervous about the performance implications of the "one at a time" phrase above, don't be. This model is massively scalable.

One Time Configuration Steps

Understand the Requirements for Installing and Configuring OSCH

The things you will need for installing and configuring OSCH include:

Access to the system where Oracle is running and to the OS account where Oracle is running (typically the Unix account "oracle")
Access to SQL*Plus and permission to connect as DBA
Ability to create an Oracle user (e.g. "oschuser") with enough permission to create an external table and directory objects
The OSCH kit
The Hadoop client kit for the Hadoop cluster you want to access
The Hadoop client configuration for HDFS access
Permission to read, write, and delete files in HDFS as OS user "oracle" (i.e. "oracle" is a Hadoop user)

The formal documentation to install OSCH is here. Below I try to outline a process that has worked best for me.

Install the Bits

Log on to the system where Oracle is running as "oracle". Carve out an independent directory structure (e.g. /home/oracle/osch) outside of the directory structure of ORACLE_HOME. Install the OSCH kit (called "orahdfs-2.2.0") and the Hadoop client kit ("hadoop-2.0.0"). I typically make these peers. Both kits need to be unzipped. Hadoop client kits typically require some building to create a few native libraries, typically related to CODECs. You will also unzip the Hadoop configuration files ("hadoop-conf"). Finally you want to create a default directory for location files that will be referenced by external tables. This is the "exttab" directory below. This directory needs read and write privileges set for "oracle". At this point you should have a directory structure that looks something like this:

/home/oracle/osch/orahdfs-2.2.0
/home/oracle/osch/hadoop-2.0.0
/home/oracle/osch/hadoop-conf
/home/oracle/osch/exttab

Configure HDFS

Follow the standard Hadoop client instructions that allow you to access the Hadoop cluster via HDFS from the Oracle system, logged in as "oracle". Typically this is to call Hadoop pointing to the hadoop-conf files you copied over. With Hadoop you will want to be able to create, read, and write files under the HDFS /user/oracle directory. For the moment, carve out an area where we will put test data to read from HDFS using OSCH:

hadoop --config /home/oracle/osch/hadoop-conf fs -mkdir /user/oracle/osch/exttab

Configure OSCH

In the OSCH kit you will need to configure the preprocessor that is used to access the Hadoop cluster and read HDFS files. It is in the OSCH kit under the bin directory and is called hdfs_stream. This is a bash script which invokes an OSCH executable under the covers. You need to edit the script and provide a definition for OSCH_HOME. You will also need to modify and export modified PATH and JAVA_LIBRARY_PATH definitions to pick up the Hadoop client binaries. For example:

OSCH_HOME=/home/oracle/osch/orahdfs-2.2.0
export PATH=/home/oracle/osch/hadoop-2.0.0/bin:/usr/bin:/bin
export JAVA_LIBRARY_PATH=/home/oracle/osch/hadoop-2.0.0/lib/native

Optionally, hdfs_stream allows you to specify where external table log files go.
By default they go into the log directory living in the OSCH installation (e.g. /home/oracle/osch/orahdfs-2.2.0/log). When you've completed this step, interactively invoke hdfs_stream with a single bogus argument, "foo", again on the Oracle system logged in as "oracle":

./hdfs_stream foo
OSCH: Error reading or parsing location file foo

This might seem lame, but it is a good sanity check that ensures Oracle can execute the script while processing an external table. If you get a Java stack trace rather than the above error message, the paths you defined in hdfs_stream are probably broken and need to be fixed.

Configure Oracle for OSCH

In this step you need to first connect to Oracle as SYSDBA and create an Oracle DIRECTORY object that points to the file location where hdfs_stream exists. You create one of these to be shared by any Oracle users running OSCH to connect to a particular Hadoop cluster.

SQLPLUS> CREATE DIRECTORY osch_bin_path AS '/home/oracle/osch/orahdfs-2.2.0/bin';

Assuming you've created a vanilla Oracle user (e.g. "oschuser") which will own the external table, you want to grant execute privileges on the osch_bin_path directory:

SQLPLUS> GRANT EXECUTE ON DIRECTORY osch_bin_path TO oschuser;

Now reconnect to Oracle as "oschuser" and create an additional directory to point to the directory where location files live:

SQLPLUS> CREATE DIRECTORY exttab_default_directory AS '/home/oracle/osch/exttab';

At this point you have configured OSCH to run against a Hadoop cluster. Now you move on to creating external tables that map to content living in HDFS.

Create an Oracle External Table that Works Against Local Test Data

You want to create an external table definition that mirrors the table you want to load (e.g. reflecting the same column names and data types). Even the simplest local external table definitions take some time to get right, and 99% of the external table verbiage needed to get it working against HDFS is identical to getting it to work against local files, so it makes sense to get a vanilla local external table working before trying it against HDFS. What you want to do is take a small representative set of the sample data that you want to access in HDFS and localize it as a single file, local to the Oracle system and to the "oracle" user. Call it testdata.txt and put it in the /home/oracle/osch/exttab directory, which is our directory for location files. I would recommend starting with a simple text CSV file.

To make things easier we will use the OSCH External Table tool to create an external table definition that you can use as a template to tweak to conform to your data. This tool can be run from any system that can connect to the Oracle database, but in this case we are going to stay put and run it locally where Oracle is running, as the OS "oracle" user.
The tool requires two environment settings to run, specifically JAVA_HOME and CLASSPATH, where CLASSPATH needs to reference the tool's jar files:

export JAVA_HOME=/usr/lib/jdk
export CLASSPATH=/home/oracle/osch/orahdfs-2.2.0/jlib/*

For our running example it would look like this:

/home/oracle/osch/hadoop-2.0.0/bin/hadoop jar /home/oracle/osch/orahdfs-2.2.0/jlib/orahdfs.jar oracle.hadoop.exttab.ExternalTable
  -D oracle.hadoop.connection.url=jdbc:oracle:thin:@localhost/dbm
  -D oracle.hadoop.connection.user=oschuser
  -D oracle.hadoop.exttab.tableName=helloworld_exttab
  -D oracle.hadoop.exttab.dataPaths=/user/oracle/osch/exttab
  -D oracle.hadoop.exttab.defaultDirectory=exttab_default_directory
  -D oracle.hadoop.exttab.locationFileCount=1
  -D oracle.hadoop.exttab.columnCount=7
  -createTable --noexecute

Let's decompose this command.

The following invokes the OSCH External Table tool by pointing to the OSCH jar file ("orahdfs.jar"):

/home/oracle/osch/hadoop-2.0.0/bin/hadoop jar /home/oracle/osch/orahdfs-2.2.0/jlib/orahdfs.jar oracle.hadoop.exttab.ExternalTable

These two lines connect to the Oracle database service ("dbm") as Oracle user "oschuser":

  -D oracle.hadoop.connection.url=jdbc:oracle:thin:@localhost/dbm
  -D oracle.hadoop.connection.user=oschuser

This identifies the name of the external table we want to create:

  -D oracle.hadoop.exttab.tableName=helloworld_exttab

This tells the tool the directory in HDFS where the data lives:

  -D oracle.hadoop.exttab.dataPaths=/user/oracle/osch/exttab

This indicates where the location files will live (using the name of the Oracle directory created above that maps to "/home/oracle/osch/exttab"):

  -D oracle.hadoop.exttab.defaultDirectory=exttab_default_directory

This indicates how many location files we generate. For now, since we are only loading one HDFS file, we need only one location file to reference it, so we feed it a value of 1:

  -D oracle.hadoop.exttab.locationFileCount=1

This indicates how many columns are in the table:

  -D oracle.hadoop.exttab.columnCount=7

Finally we tell the tool to just pretend to create an external table. This will generate an external table definition and output it to the console:

  -createTable --noexecute

The generated console output should look something like this:

Oracle SQL Connector for HDFS Release 2.2.0 - Production
Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.

The create table command was not executed. The following table would be created.
CREATE TABLE "OSCHUSER"."HELLOWORLD_EXTTAB"
(
  "C1"                             VARCHAR2(4000),
  "C2"                             VARCHAR2(4000),
  "C3"                             VARCHAR2(4000),
  "C4"                             VARCHAR2(4000),
  "C5"                             VARCHAR2(4000),
  "C6"                             VARCHAR2(4000),
  "C7"                             VARCHAR2(4000)
)
ORGANIZATION EXTERNAL
(
   TYPE ORACLE_LOADER
   DEFAULT DIRECTORY "EXTTAB_DEFAULT_DIRECTORY"
   ACCESS PARAMETERS
   (
     RECORDS DELIMITED BY 0X'0A'
     CHARACTERSET AL32UTF8
     STRING SIZES ARE IN CHARACTERS
     PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'
     FIELDS TERMINATED BY 0X'2C'
     MISSING FIELD VALUES ARE NULL
     (
       "C1" CHAR(4000),
       "C2" CHAR(4000),
       "C3" CHAR(4000),
       "C4" CHAR(4000),
       "C5" CHAR(4000),
       "C6" CHAR(4000),
       "C7" CHAR(4000)
     )
   )
   LOCATION
   (
     'osch-20130904094340-966-1'
   )
) PARALLEL REJECT LIMIT UNLIMITED;

Cut and paste the console output into an editor (or cut and paste the text above), temporarily remove the PREPROCESSOR directive, and rename the location file (i.e. "osch-20130904094340-966-1") to "testdata.txt" (the name of your data file). You then want to twiddle with the external table verbiage and change the dummy column names (e.g. C1), data types (e.g. VARCHAR2), and field definitions (e.g. CHAR) to reflect the table you want to load. (The details for creating Oracle external tables are explained here.) Note that the rest of the verbiage (e.g. RECORDS DELIMITED BY) is used to support standard CSV text files, so if the data in your test file is correctly formed as CSV input, then this stuff should be left as is.

When you think your external table definition is correct, create the table in Oracle and try accessing the data from SQL:

SQLPLUS> SELECT * FROM helloworld_exttab;

After you've got a SQL SELECT statement working, it's time to load the same data file into HDFS and recreate the external table for remote access.

Load an HDFS Directory with the Local Test Data File

Using the Hadoop client on your Oracle system, upload the test file you got working into the HDFS data directory you created earlier:

hadoop fs -put /home/oracle/osch/exttab/testdata.txt /user/oracle/osch/exttab

Recreate the External Table Using the PREPROCESSOR Directive

Now drop the local external table, and recreate it using the identical syntax that worked above, but putting back the preprocessor directive:

PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'

This will redirect processing to HDFS files living in your Hadoop cluster. Don't try doing a SELECT statement yet. The last step is to recreate the location files so they point to content living in HDFS.

Using the OSCH Publishing Tool to Point to Test Data Living in HDFS

By adding the preprocessor directive, you now have an external table that is bound to data living in a Hadoop cluster. You now want to point the external table to data living somewhere in HDFS. For our case that is the data living in the HDFS directory we created and populated above: "/user/oracle/osch/exttab". First delete the local data file, testdata.txt, living under /home/oracle/osch/exttab. That way we know that if the external table works, it's not fooling us by simply accessing local data.
Then rerun the External Table tool with the "publish" command:

/home/oracle/osch/hadoop-2.0.0/bin/hadoop jar /home/oracle/osch/orahdfs-2.2.0/jlib/orahdfs.jar oracle.hadoop.exttab.ExternalTable
  -D oracle.hadoop.connection.url=jdbc:oracle:thin:@localhost/dbm
  -D oracle.hadoop.connection.user=oschuser
  -D oracle.hadoop.exttab.tableName=helloworld_exttab
  -D oracle.hadoop.exttab.dataPaths=/user/oracle/osch/exttab
  -D oracle.hadoop.exttab.locationFileCount=1
  -publish

This time the tool executes the "publish" command, which connects to the Oracle database, prompts for "oschuser"'s password, reads the files living in HDFS under "/user/oracle/osch/exttab", and creates one location file that references our singleton data file "testdata.txt" that we moved into HDFS. If you look at your local directory, "/home/oracle/osch/exttab", you will see that it has been populated with a machine generated file (e.g. "osch-20130821102309-6509-1") which contains XML verbiage referring to testdata.txt in HDFS.

Test an Oracle External Table that Works Against HDFS Data

Now connect to Oracle as "oschuser" and issue the same SQL query you did when the data was local. You should get identical results to what you saw earlier (though the order of the rows might be different).

SQLPLUS> SELECT * FROM helloworld_exttab;

At this point you have SQL access to content living in HDFS. To use it to load an Oracle table (e.g. "helloworld") you need to use either an INSERT statement:

SQLPLUS> INSERT INTO helloworld SELECT * FROM helloworld_exttab;

or a CREATE TABLE statement:

SQLPLUS> CREATE TABLE helloworld AS SELECT * FROM helloworld_exttab;

What Did We Just Do?

Aside from doing one-time initialization steps, what we did was create an external table and test it locally to see if it would work with a particular data file format. Then we recreated the external table definition, adding the preprocessor directive to point to HDFS files living in a Hadoop cluster. We then used the OSCH External Table tool to point the external table to a directory in HDFS with data files having the same format. The bindings here are simple to understand:

The preprocessor directive references hdfs_stream, which binds external tables to a particular Hadoop cluster
The External Table publishing tool binds an external table to a set of data files living in that cluster

If you want to access multiple Hadoop clusters, you simply need to create a copy of "hdfs_stream", giving it a new name (e.g. "hdfs_stream_2"), configure it to work against the new cluster, and use the preprocessor directive to call "hdfs_stream_2" for external tables accessing content living in the new cluster. If you want two external tables to point to two different data sources of the same format, then create a new external table with the same attributes, and use the OSCH External Table tool to point it to another directory in HDFS.

One question that frequently comes up has to do with using OSCH for SQL access. Specifically, since external tables map HDFS data, are they useful for doing general purpose Oracle SQL queries against HDFS data, not just for loading an Oracle table? If the data set is very large and you intend to run multiple SQL queries, then you want to load it into an Oracle table and run your queries against that. The reason has to do with the "black box" design of external tables. The storage is not controlled by Oracle, so there are no indices and no internal structures that Oracle would need to make access by SQL efficient.
SELECT statements against any external table are a full table scan, something the Oracle SQL optimizer tries to avoid because it is resource expensive.

One last point: always use external table definitions to facilitate the conversion of text to Oracle native data types (e.g. NUMBER, INTEGER, TIMESTAMP, DATE). Do not rely on CAST and other functions (e.g. to_date) in SQL. The data type conversion code in external tables is much more efficient. (A short sketch of what this looks like follows at the end of this post.)

Next Steps

This post was to get a toy example working with a single data file. The next post will focus on how to tune OSCH for large data sets living in HDFS and exploit the Oracle parallel query infrastructure for high performance loads. We will also discuss the pros and cons of using OSCH versus OLH.
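To make that last point concrete, here is a minimal sketch of pushing a conversion into the external table definition instead of the SQL. It assumes, purely for illustration, that the fourth field of testdata.txt is a date formatted as YYYY-MM-DD; the column choice and mask are hypothetical, not part of the generated template above. Instead of converting in SQL:

INSERT INTO helloworld
SELECT c1, c2, c3, to_date(c4, 'YYYY-MM-DD'), c5, c6, c7 FROM helloworld_exttab;

declare the column as DATE in the external table column list and describe the text format in the ORACLE_LOADER field list:

  "C4"  DATE,
  ...
  ACCESS PARAMETERS
  (
    ...
    FIELDS TERMINATED BY 0X'2C'
    MISSING FIELD VALUES ARE NULL
    (
      ...
      "C4" CHAR(30) DATE_FORMAT DATE MASK "YYYY-MM-DD",
      ...
    )
  )

With the conversion done by the external table driver as rows stream in, the INSERT stays a plain SELECT *.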


Parallelism while Querying and Loading Data using Oracle SQL Connector for HDFS

For fast performance Oracle SQL Connector for HDFS (OSCH) can use parallel threads in the database to read data files on HDFS. This post discusses the settings in OSCH that are relevant for using parallel threads, and some heuristics to set them.

First, a quick background on 'location files.' OSCH generates an external table that is used to query or load data from HDFS. This external table has a LOCATION clause with a list of location files (see the example of a generated external table at the end of this post). OSCH populates the location files with URIs of the data files on HDFS. When there are multiple location files and multiple data files, OSCH uses load balancing to distribute the files (by size) among the location files. Data files are not split up, but the URIs are distributed by data file size as evenly as possible. See the documentation on location files in the Oracle Big Data Connectors User's Guide Release 2 (2.2) for more details.

Number of Location Files

Degree of Parallelism (DOP) is the number of parallel threads used by the database to read the data. When reading data from external tables, the DOP specified in the database has to be supported by the relevant number of location files in the external table. If there is only one location file, there will be no parallelism while reading the data. The default value for the number of location files is 4. Higher numbers to support a higher DOP can be specified using the locationFileCount property. Keep in mind the resources available in the database while determining the number of parallel threads to use.

Specifying locationFileCount while Using OSCH

As a -D parameter:

-D oracle.hadoop.exttab.locationFileCount=256

In a configuration file:

<property>
  <name>oracle.hadoop.exttab.locationFileCount</name>
  <value>256</value>
</property>

A value of 256 will result in an external table with 256 location files, which in turn means that 256 parallel threads can be used to access the data.

Number of Data Files

If the number of data files accessed is less than the number of location files, the number of parallel threads will be constrained by the number of populated location files. If possible, split up the data files to support higher numbers of parallel threads. If there are more data files than location files, OSCH will load balance to evenly distribute data files by size across location files. A good heuristic: the number of data files should be a multiple of the number of location files.

Reading from a Hive Table

If OSCH is accessing data from a Hive table with a single underlying data file, a bucketed Hive table can be created to split the input files. If there are more buckets than location files, OSCH will load balance. A good heuristic: the number of buckets should be a multiple of the number of location files.

Example on the Oracle Big Data Appliance

Data file size: 1 TB. If the data is in a Hive table, a bucketed Hive table can be created with 2048 buckets (each file will have 2 blocks on the BDA, which has a block size of 256MB). A locationFileCount value of 256 will result in each thread reading about 8 files.

Note on Enabling Parallel Query in the Database

Parallel query has to be enabled on the database for data files to be accessed in parallel from HDFS.
From the Oracle Big Data Connectors User's Guide Release 2 (2.2):

Enable parallel query with this SQL command:

ALTER SESSION ENABLE PARALLEL QUERY;

Before loading the data into an Oracle database from the external files created by Oracle SQL Connector for HDFS, enable parallel DDL:

ALTER SESSION ENABLE PARALLEL DDL;

Before inserting data into an existing database table, enable parallel DML with this SQL command:

ALTER SESSION ENABLE PARALLEL DML;

Hints such as APPEND and PQ_DISTRIBUTE also improve performance while inserting data (a short sketch using these statements follows the example table below).

Example external table generated by OSCH (see the LOCATION clause and list of location files toward the end):

CREATE TABLE "HDFSUSER"."TEXT_ORA_EXT_TAB"
(
  "CUST_ID"      VARCHAR2(4000),
  "MOVIE_ID"     VARCHAR2(4000),
  "GENRE_ID"     VARCHAR2(4000),
  "TIME_ID"      VARCHAR2(4000),
  "RECOMMENDED"  VARCHAR2(4000),
  "ACTIVITY_ID"  VARCHAR2(4000),
  "RATING"       VARCHAR2(4000),
  "SALES"        VARCHAR2(4000)
)
ORGANIZATION EXTERNAL
(
  TYPE ORACLE_LOADER
  DEFAULT DIRECTORY "ORACLETEST_DIR"
  ACCESS PARAMETERS
  (
    RECORDS DELIMITED BY 0X'0A'
    CHARACTERSET AL32UTF8
    STRING SIZES ARE IN CHARACTERS
    PREPROCESSOR "OSCH_BIN_PATH":'hdfs_stream'
    FIELDS TERMINATED BY 0X'09'
    MISSING FIELD VALUES ARE NULL
    (
      "CUST_ID"      CHAR(4000),
      "MOVIE_ID"     CHAR(4000),
      "GENRE_ID"     CHAR(4000),
      "TIME_ID"      CHAR(4000),
      "RECOMMENDED"  CHAR(4000),
      "ACTIVITY_ID"  CHAR(4000),
      "RATING"       CHAR(4000),
      "SALES"        CHAR(4000)
    )
  )
  LOCATION
  (
    'osch-20130725105856-1332-1',
    'osch-20130725105856-1332-2',
    'osch-20130725105856-1332-3',
    'osch-20130725105856-1332-4',
    ........
    ........
  )
)
REJECT LIMIT UNLIMITED PARALLEL
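Putting the pieces together, a minimal sketch of a parallel load through the generated external table might look like the following. "movie_fact" is a hypothetical target table name chosen only for illustration; the session statements and hints are the ones quoted above:

ALTER SESSION ENABLE PARALLEL QUERY;
ALTER SESSION ENABLE PARALLEL DML;

INSERT /*+ APPEND PQ_DISTRIBUTE(movie_fact, NONE) */ INTO movie_fact
SELECT * FROM "HDFSUSER"."TEXT_ORA_EXT_TAB";

Because the external table was created with the PARALLEL attribute and has many location files, the database can read the HDFS files with many parallel threads while the APPEND hint keeps the insert on the direct path.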


How to Load Oracle Tables From Hadoop Tutorial (Part 3 - Direct Path)

Oracle Loader for Hadoop: OCI Direct Path

In the previous tutorial post we discussed the basic mechanics and structure of an OLH job using JDBC. In this post we move on to the more mainstream method used for OLH, specifically OCI Direct Path. The focus here is on loading Oracle tables with really big data, and we will discuss how to do this efficiently and provide some basic rules for optimizing load performance. We will discuss the mechanics of submitting an OLH job, and then take a dive into why this OLH load method is what you want to use for most situations.

The Structure of an OLH Command Using OCI Direct Path

The structure of an OLH command using OCI Direct Path is very similar to the structure we described for submitting a JDBC load:

$HADOOP_HOME/bin/hadoop jar $OLH_HOME/jlib/oraloader.jar oracle.hadoop.loader.OraLoader
 -D oracle.hadoop.loader.jobName=OLHP_fivdti_dtext_oci_10_723
 -D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml
 -D mapred.reduce.tasks=10
 -D mapred.input.dir=/user/olh_performance/fivdti/56000000_90
 -D mapred.output.dir=/user/oracle/olh_test/results/fivdti/723
  -conf /tmp/oracle_connection.xml
  -conf /tmp/dtextInput.xml
  -conf /tmp/dlOutput.xml

Aside from cosmetic changes (e.g. the job name), the key differences between this and the JDBC command discussed in Lesson 2 are a non-zero value for the "mapred.reduce.tasks" property and a different conf file for specifying the type of output. The new file we are using, "dlOutput.xml", specifies that the output format is OCI Direct Path (and not JDBC):

<configuration>
  <property>
    <name>mapreduce.outputformat.class</name>
    <value>oracle.hadoop.loader.lib.output.OCIOutputFormat</value>
  </property>
</configuration>

So switching from JDBC to OCI Direct Path is trivial. A little less trivial is why OCI Direct Path is preferred, and what rules you should know to make this type of loading perform well and maximize efficiency.

Rule 1: When using OCI Direct Path the target table must be partitioned.

This might sound like a constraint, but practically speaking it isn't.

Exploiting Oracle Table Partitioning

A full understanding of Oracle table partitioning goes beyond the scope of this tutorial, and you would be advised to read related documentation that gets into this subject in depth. But for the sake of readers who live mostly in the world of Hadoop and have a limited understanding of Oracle, let's briefly outline the basics of what Oracle table partitioning is and why it is essential to understand.

Rule 2: If you are loading really big data into an Oracle table, your Oracle table will want to be partitioned.

The reason is pretty simple. Table partitions are Oracle's method of breaking up a table into workloads that can be optimized transparently by SQL. In the same way MapReduce jobs scale out by breaking up a workload into data blocks and scheduling tasks to work in parallel against data blocks, Oracle SQL does the same with partitions. This is not only true for querying; it is also true for doing big loads. Let's look at the "fivdti" table we have been using.
A flat table would be declared like this:

CREATE TABLE fivdti
  (f1 NUMBER,
   i2 INT,
   v3 VARCHAR2(50),
   d4 DATE,
   t5 TIMESTAMP,
   v6 VARCHAR2(200),
   i7 INT);

A partitioned table declaration, using a hash partitioning scheme, would look like this:

CREATE TABLE fivdti
  (f1 NUMBER,
   i2 INT,
   v3 VARCHAR2(50),
   d4 DATE,
   t5 TIMESTAMP,
   v6 VARCHAR2(200),
   i7 INT)
PARTITION BY HASH(i7) PARTITIONS 10 PARALLEL;

With the simple addition of the partition clause at the bottom of the CREATE TABLE statement, you've empowered Oracle to exploit big optimizations for processing. The clause tells Oracle that the table should be divided into 10 partitions, and the partition for a row is determined by performing a hash operation on the value of the i7 column. If you were to compare load rates using OLH, SQL*Loader, or SQL for the flat table and the table that is partitioned, you would typically see a dramatic difference that favors partitioning. The same holds true for SQL. When querying partitioned tables, SQL can do all sorts of tricks under the covers to use parallel query technology that subdivides a job and maximizes parallel CPU and IO.

Oracle table partitioning comes in various flavors such as hash, list, and range, and these can also be composites of one another. OLH supports all partitioning methods except reference partitioning and virtual column-based partitioning.

Advantages of OCI Direct Path

OCI Direct Path is a well-established method of loading data into Oracle using OCI (Oracle's C based client interface) or SQL*Loader. It is a code path dedicated to bulk loading, and its key advantage is that it bypasses Oracle SQL, which makes it very efficient. Virtually all relational database systems, including Oracle, are built on two layers of software: one for managing data at the row level (i.e. SQL), and another for managing data at the block level (i.e. storage). Loading through SQL (i.e. Oracle's front door) is expensive. It's okay when one is inserting a singleton row or a small array of rows, but it uses a lot of code path before the rows are passed onto storage and are copied into data blocks that ultimately get written to disk.

OCI Direct Path load is a short cut with an API whose code path, both on the client and in the Oracle database, is streamlined for loading. It does the work of preparing rows for storage in data blocks using client resources. (For our case the client is OLH running in Hadoop.) It then sends blocks of rows to Oracle's storage layer in a form close to what will be written to disk, on a code path that minimizes contention: rows don't need to pass through Oracle's buffer cache layer. It also maximizes parallelism for multi-block IO. OCI Direct Path can also take advantage of presorted data, which helps if it needs to build indexes for a table.

Running an OLH Job with OCI Direct Path

This pretty much looks the same as running a job with JDBC, except that the reduce phase always executes (since the target table is partitioned), and it is much faster. For both JDBC and OCI Direct Path the actual loading of the Oracle table occurs when the reduce phase is 67% complete. For large loads approximating or exceeding a terabyte you will see a big difference in the time spent in this phase. OCI Direct Path is much faster than JDBC.

Are You Balanced?

Rule 3: After running an OLH load, check the Oraloader report to see if it is balanced.
After the run of a successful OLH job, the output directory (specified by the "mapred.output.dir" property) contains an elegant report called "oraloader-report.txt" that details the work done in the reduce phase. It identifies the reducer tasks that ran and the associated statistics of their workload: bytes loaded, records loaded, and duration of the tasks (in seconds). If the load is not balanced, the values for bytes and duration will vary significantly between reduce tasks, and you will want to make adjustments.

Optimizing OLH and OCI Direct Path

Now we will discuss basic steps to optimize OLH using OCI Direct Path:

· Choosing a good number of reducer tasks
· Enabling the OLH Sampler
· Finding the sweet spot for Hadoop MapReduce payloads
· If possible, loading using the SDP transport protocol

Choosing a Number for Reducer Tasks

Rule 4: When using OCI Direct Path you want to choose the number of reducer tasks to be close to a multiple of the number of reducer slots allocated on your Hadoop cluster.

Reducer slots in Hadoop are the number of reduce processes that can run in a Hadoop cluster at once, performing the reduce phase of an OLH job. The Hadoop Map/Reduce Administration UI displays this as Reduce Task Capacity. Typically you choose some multiple of the number of reducer slots available. For example, if the reduce task capacity of the Hadoop cluster is 50, then a mapred.reduce.tasks value of 50 or 100 should work well.

The purpose of this rule is to try to get reducers running and loading at the same time, and to make sure all available slots are being used. Not doing this can be costly. For example, suppose there are 50 reducer slots but you set the number of reducer tasks to 51. If reduce loads are balanced, then the 50 reducer slots will start and finish at roughly the same time, but you will have to wait for the singleton 51st task to run, which will double the time the reduce phase spends loading the data.

Rule 4 only works fully to your advantage when the data sets are balanced (i.e. you are using the Sampler) and your OLH job is not competing with other ongoing MapReduce jobs that can steal reduce slots you were expecting to use. Note that Apache actually recommends a value close to a multiple of the number of reducer slots, for dealing with situations where reducers are not balanced. This takes us to the next rule.

Rule 5: Always use the OLH Sampler.

The OLH Sampler

The OLH Sampler is an optional feature of OLH that does a great job of balancing the workloads of reducer tasks when partitions are not balanced. (Note that the Sampler works with all OLH load methods, not just OCI Direct Path.) You can control the Sampler manually by setting the following property to "true" or "false" (for recent versions of OLH the Sampler is turned on by default):

-D oracle.hadoop.loader.sampler.enableSampling=true

For example, suppose I had a customer table which was list partitioned by the fifty states of the United States. Most likely the partition representing California will be much larger than the one for New Hampshire. Without the OLH Sampler enabled, a single reducer task has the burden of publishing a whole partition. This means that one reducer will have to publish the California records while another is tasked with publishing the records from New Hampshire. This causes skew, where some tasks have bigger workloads than others.
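To make the scenario concrete, a table partitioned this way might be declared roughly as follows. This is a hypothetical sketch; the table and column names are not part of the tutorial's running example:

CREATE TABLE customers
  (cust_id NUMBER,
   name    VARCHAR2(100),
   state   VARCHAR2(2))
PARTITION BY LIST (state)
  (PARTITION p_ca    VALUES ('CA'),
   PARTITION p_nh    VALUES ('NH'),
   PARTITION p_other VALUES (DEFAULT));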
The OLH Sampler addresses this pathology by breaking up large partitions into smaller, roughly equal-sized units that can be dispatched evenly across the reducer tasks. The overhead of the OLH Sampler is very small for big data payloads: a Hadoop MapReduce job typically takes minutes or hours, while the Sampler typically takes a few seconds. (OLH console output tells you at the outset whether the Sampler is running and how much time it cost.) It runs at the beginning of the MapReduce job and samples the dataset to determine differences between partition sizes; it then creates a partitioning strategy that balances the reduce load evenly.

Another pathology that the Sampler addresses is when you have more available reducer slots than partitions in your table. For instance, suppose your table has 10 partitions but your Hadoop cluster has 50 reducer slots free. You would want to set the number of reduce tasks to take advantage of all these reducer slots to speed up the load:

-D mapred.reduce.tasks=50

But without the Sampler enabled this tuning knob would not have the desired effect. When the Sampler is not enabled, each partition is restricted to a single reducer task, which means that only 10 reducers will do real work and the other 40 reduce slots will have nothing to do.

Based on our experience the Sampler should be used virtually all the time. The only situation to be wary of is when the Hadoop input splits are clustered by the reduce key (e.g. the input data living in HDFS files is sorted by the value of the partition column). Under these circumstances loads might still be unbalanced. The work-around for clustered data is to force the Sampler to spend more time looking at the distribution of data by looking at more splits (by default it looks at at least five). This is done by using the following property and setting <N> to a higher number:

-D oracle.hadoop.loader.sampler.minSplits=<N>

Again, the higher number will impose more Sampler overhead at the beginning of the job, but this should be rewarded with more efficient use of Hadoop resources.

Finding the Sweet Spot for Hadoop MapReduce Payloads

Rule 6: Experiment with different sized payloads.

Hadoop is great technology that does a good job of making sure that MapReduce payloads scale. That being said, the resources of a Hadoop cluster are still finite, and there is a breaking point where load sizes are simply too big. Hadoop's scaling typically breaks down in the reduce shuffle/sort stage, where there is a tremendous amount of disk and network IO going on within the cluster to move sorted data to the designated systems where reducer tasks will do the actual loading. A telling sign is when you see your Hadoop job start to suffer from failed and restarted task attempts in the reduce phase. The other obvious sign is that when you double your payload, the time to process the load grows by substantially more than a factor of 2.

It's a good idea to spend some time experimenting with different load sizes to see what your Hadoop configuration can handle. Obviously, if you break down a single big job into a series of smaller jobs, you will pay a higher overhead cost for starting up and tearing down multiple MapReduce jobs. That being said, breaking down a 90 minute OLH payload into three smaller 30 minute payloads is a perfectly reasonable strategy, since the startup/teardown overhead for running each OLH job is still very small compared to the total time running.
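As an illustration of that strategy, the single command shown earlier could be run as several smaller jobs over subsets of the input. This is only a sketch: the part1/part2/part3 input subdirectories and the per-part output directories are hypothetical subdivisions, not part of the running example.

# Sketch: split one large OLH load into three smaller runs over hypothetical input subsets.
# Each run must get its own (non-existent) output directory, or Hadoop will fail the job.
for part in part1 part2 part3
do
  $HADOOP_HOME/bin/hadoop jar $OLH_HOME/jlib/oraloader.jar oracle.hadoop.loader.OraLoader \
   -D oracle.hadoop.loader.jobName=OLHP_fivdti_dtext_oci_10_723_${part} \
   -D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml \
   -D mapred.reduce.tasks=10 \
   -D mapred.input.dir=/user/olh_performance/fivdti/56000000_90/${part} \
   -D mapred.output.dir=/user/oracle/olh_test/results/fivdti/723_${part} \
   -conf /tmp/oracle_connection.xml \
   -conf /tmp/dtextInput.xml \
   -conf /tmp/dlOutput.xml
done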
Use the SDP Protocol on Exadata and the BDA

Rule 7: If you are using Oracle Exadata and the Oracle BDA with InfiniBand, use the SDP protocol.

SDP is a network transport protocol supported for loading tables living in an Oracle Exadata machine with HDFS data living on the Oracle BDA (Big Data Appliance). Exadata and the BDA move data using InfiniBand, which has very high throughput and low latency. Because InfiniBand offers so much bandwidth, conventional TCP sockets can become the bottleneck. SDP is an alternative networking protocol that uses RDMA technology, which allows network interfaces to move data packets directly into RAM without involving the CPU. In other words, it avoids the network bottleneck that is seen when using TCP.

In performance test runs we've found that using SDP improves the load stage of an OLH OCI Direct Path job by ten to twenty percent. If you are running OLH OCI Direct Path jobs over InfiniBand, you will want to take advantage of SDP. The way this is done is to configure the Exadata listeners with an SDP port, and assert an additional Oracle connection descriptor dedicated to SDP when running OLH:

<property>
  <name>oracle.hadoop.loader.connection.oci_url</name>
  <value>(DESCRIPTION=(ADDRESS=(PROTOCOL=SDP)(HOST=192.168.40.200)(PORT=1523))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dbm)))</value>
</property>

This esoteric property isolates SDP usage to the connections that OLH reduce tasks create to execute OCI Direct Path loading. All other network activity uses standard TCP connections.
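In practice this property can simply sit alongside the regular connection settings in the connection configuration file passed to the job. A minimal sketch, assuming the oracle_connection.xml pattern used elsewhere in this tutorial and the example SDP descriptor above; your host names, ports, service names, and credentials will differ:

<configuration>
  <property>
    <name>oracle.hadoop.loader.connection.url</name>
    <value>jdbc:oracle:thin:@myoraclehost:1511/dbm</value>
  </property>
  <property>
    <name>oracle.hadoop.loader.connection.oci_url</name>
    <value>(DESCRIPTION=(ADDRESS=(PROTOCOL=SDP)(HOST=192.168.40.200)(PORT=1523))(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=dbm)))</value>
  </property>
  <property>
    <name>oracle.hadoop.loader.connection.user</name>
    <value>olhp</value>
  </property>
  <property>
    <name>oracle.hadoop.loader.connection.password</name>
    <value>welcome1</value>
  </property>
</configuration>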


How to Load Oracle Tables From Hadoop Tutorial (Part 2 - Hello World)

A "Hello World" Example using OLH

This post will discuss the basic mechanics for loading an Oracle table using Oracle Loader for Hadoop (OLH). For this "Hello World" discussion, we will use JDBC to drive the example, loading delimited text living in HDFS files into a simple un-partitioned table called "FIVDTI" living in an Oracle schema. It will illustrate the bare bones structure of an OLH job, and touch upon the minimal configuration properties you need to set to get something working end to end. This tutorial assumes that you know how to run basic MapReduce jobs and know how to connect to Oracle with SQL*Plus to create and drop tables using Oracle SQL.

Restating what was explained in the introduction to this tutorial, OLH uses a MapReduce job to read data living in HDFS and to load it into a target table living in Oracle. If the data you are loading is in a typical form (e.g. delimited text or CSV files), you should be able to load a table interactively with a single command.

Requirements for Running an OLH Job

Since OLH runs MapReduce jobs, the OLH command will need to run either on some system in a Hadoop cluster, or on a system that has client access to the cluster. That system will also need JDBC access to an Oracle database that has a schema with the table that you want to load. Finally, the system will need access to an OLH installation and its jar files (i.e. OLH_HOME in our running example below).

If you want to use a Hadoop client, setting it up on a development node is not a lot of hard work. You simply need to download and install the Hadoop software that is running on the cluster, using the configuration files specific to the cluster. You then want to sanity test everything to make sure the plumbing works. If you can kick off the Hadoop WordCount MapReduce job and can read/write/delete HDFS files interactively using Hadoop, your Hadoop plumbing should be ready to accept an OLH job. (See http://hadoop.apache.org/docs/r1.0.4/mapred_tutorial.html for details on sanity checking MapReduce.)

At this point it would be good to carve out a subdirectory in HDFS that you own, which will hold the result directories of the OLH jobs you want to run. (In our working example in this post the result log directory will live under "/user/oracle/olh_test/results/fivdti".) You use hadoop to do this:

hadoop fs -mkdir /user/oracle/olh_test/results/fivdti

You want to make sure that you are either the owner of this HDFS directory or at least have read and write access to it.

Connecting to an Oracle database requires you to connect using an Oracle connection URL with the name of the Oracle user and the Oracle password. In this example, we are assuming the Oracle schema and the Oracle user are the same. (See http://radiofreetooting.blogspot.com/2007/02/user-schema.html for an explanation of the difference between the two concepts.) Before kicking off an OLH job it is worth taking the time to ensure the Oracle connection credentials are correct. Remember that OLH will load an Oracle table across many Hadoop map or reduce tasks, each of which will try to make a JDBC connection to the database, so you want to eliminate trial-and-error type authentication hiccups before passing the credentials on to OLH. (See http://www.dba-oracle.com/t_oracle_jdbc_connection_testing.htm for details.)

While configuring and testing the OLH framework you will also want to do some minimal administration of Hadoop and Oracle. Specifically, you will want to browse the Hadoop URL that contains JobTracker information, where you can monitor a load job.
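Before moving on, a quick way to validate the Oracle credentials mentioned above, prior to handing them to OLH, is a one-off SQL*Plus connection test. A minimal sketch, using the example host, service, and schema that appear in the connection file later in this post:

sqlplus olhp/welcome1@//myoraclehost:1511/dbm

SQL> SELECT COUNT(*) FROM fivdti;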
You will also need to connect to Oracle via SQL*Plus to do administration with your schema and to manually inspect the target table using SQL. For purposes of OLH, installing a formal Oracle client on your development system, though convenient, is overkill. You simply need to log onto a system where SQL*Plus is available (e.g. the system where Oracle is running) and where you can connect to the Oracle database.

The Structure of an OLH Command

Let's start by looking at an OLH command that you would invoke to kick off an OLH MapReduce job. Again, this will use JDBC to connect to the Oracle database and load a table with delimited text living in files in HDFS.

$HADOOP_HOME/bin/hadoop jar $OLH_HOME/jlib/oraloader.jar oracle.hadoop.loader.OraLoader \
 -D oracle.hadoop.loader.jobName=OLHP_fivdti_dtext_jdbc_0_722 \
 -D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml \
 -D mapred.reduce.tasks=0 \
 -D mapred.input.dir=/user/olh_performance/fivdti/56000000_90 \
 -D mapred.output.dir=/user/oracle/olh_test/results/fivdti/722 \
 -conf /tmp/oracle_connection.xml \
 -conf /tmp/dtextInput.xml \
 -conf /tmp/jdbcOutput.xml

The command starts with an invocation of hadoop (living in the Hadoop client) passing the "jar" command:

$HADOOP_HOME/bin/hadoop jar

This is followed by a reference to OLH's jar file ("oraloader.jar") and the OLH loader class (fully qualified by its package) that defines an OLH load job:

$OLH_HOME/jlib/oraloader.jar oracle.hadoop.loader.OraLoader

The following two properties are consumed by OLH. (Note that the space between a -D and the property name is not a typo. It is Hadoop's convention for setting properties that are directed to Hadoop rather than to the JVM running underneath it.)

The first gives the job a meaningful name when it executes in Hadoop. When creating a name, I find it useful to capture the name of the table being loaded, the type of input, the load method, the number of reducers used, and a unique job number for the OLH run (i.e. 722).

-D oracle.hadoop.loader.jobName=OLHP_fivdti_dtext_jdbc_0_722

MapReduce jobs are long running batch jobs. While debugging and tuning OLH load operations you will want to spend a lot of time looking at Hadoop's JobTracker URL to find your job by name and see how it behaves.

The second property points to the loader map. The loader map is an XML file that indicates what Oracle table will be loaded, and which fields in a delimited text file map to columns in the target table.

-D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml

The next three properties are consumed by Hadoop. The first designates the number of reduce tasks to run. The reduce stage in OLH is used for improving load performance by sorting records by table partitions and optionally sorting by table columns. If the table being loaded is not partitioned and there is no value in sorting by columns, then you will want to set this value to zero. This means that the MapReduce job only runs the map stage, which simply loads the Oracle table. Note that for JDBC, a zero value can be used even if the table is partitioned. But if a table is partitioned you probably will want to use the OCI Direct Path method, which does require a non-zero value. (Using OCI Direct Path to load partitioned tables will be discussed in depth in the next post.)

-D mapred.reduce.tasks=0

The second property designates the HDFS directory that holds the input files of delimited text that are to serve as the payload for the target table.
-D mapred.input.dir=/user/olh_performance/fivdti/56000000_90

Finally, you specify an HDFS directory where OLH can write logging output specific to the job. In this case we carved out a directory under /user/oracle/olh_test/results, and assigned a unique integer to the job (i.e. 722). An important note: the subdirectory "722" is created by Hadoop once the job is submitted. If it exists before the job is submitted, Hadoop will fail the job.

-D mapred.output.dir=/user/oracle/olh_test/results/fivdti/722/

The final arguments are files that complete the configuration. For purposes of modularity they are split across three configuration files that can be reused for different types of jobs.

The first file has the credentials needed to connect to the Oracle database:

-conf /tmp/oracle_connection.xml

The second file designates the form of input ("delimited text"):

-conf /tmp/dtextInput.xml

The last file designates the output (i.e. online loading of an Oracle table via JDBC):

-conf /tmp/jdbcOutput.xml

Again, all of the properties embedded in these various configuration files could be bundled into one large configuration file or called out explicitly using in-line "-D" properties. However, assuming you don't have a penchant for obscurity or verbosity, it's probably prudent to break down the various configuration settings into configuration files that are organized to be reusable. Tuning Hadoop to run big MapReduce jobs can take some time to get optimal, and once you get there it's a good idea to isolate the settings so they can be easily reused for different jobs. It's also easier to test different combinations on the fly (e.g. trying JDBC and then trying OCI Direct Path) against the same table. You just need to swap jdbcOutput.xml for the analogous specification for OCI Direct Path.

Loader Maps

Loader maps are used to map fields read as input to columns living in the Oracle table that will be loaded. They also identify the table being loaded (e.g. "FIVDTI") and the schema ("OLHP") where the table resides.

First let's look at the Oracle table we are loading:

SQL> describe fivdti
 Name    Null?    Type
 ------- -------- ----------------
 F1               NUMBER
 I2               NUMBER(38)
 V3               VARCHAR2(50)
 D4               DATE
 T5               TIMESTAMP(6)
 V6               VARCHAR2(200)
 I7               NUMBER(38)

Again, the loader map file is identified by the following OLH property:

-D oracle.hadoop.loader.loaderMapFile=file:/tmp/loaderMap_fivdti.xml

The file contains this specification:

<LOADER_MAP>
   <SCHEMA>OLHP</SCHEMA>
   <TABLE>FIVDTI</TABLE>
   <COLUMN field="F0">F1</COLUMN>
   <COLUMN field="F1">I2</COLUMN>
   <COLUMN field="F2">V3</COLUMN>
   <COLUMN field="F3" format="yyyy-MM-dd HH:mm:ss">D4</COLUMN>
   <COLUMN field="F4">T5</COLUMN>
   <COLUMN field="F5">V6</COLUMN>
   <COLUMN field="F6">I7</COLUMN>
</LOADER_MAP>

We need to map fields in the delimited text to the column names specified in the table. Field names in delimited text files default to a simple naming convention, "F0, F1, F2…", reflecting the physical order in which they appear in a line of CSV text. These field names are then paired with column names. At the top of the specification are the schema name and table name.

What is critical about loader maps is ensuring that the text fields being loaded can be legally converted to the Oracle data types of the mapped columns, considering issues such as precision and scale that are asserted on the Oracle columns. Typically for standard scalar types this is straightforward.
However, DATE columns are fussier and require the user to describe an explicit format. (Strictly speaking, loader maps are not required when column names match field names and the DATE data type is not used, but in this case we are using default field names that are different from the formal column names of the Oracle table.)

Configuration Specified in Files

Let's look at the settings living in the configuration files specified above.

OLH Connection Properties

The oracle_connection.xml file in the example above contains the credentials needed to find the Oracle database and to connect to a schema. This information gets passed to the OLH MapReduce jobs. The connection URL follows the "jdbc:oracle:thin" pattern. You need to know the host where the Oracle database is running (e.g. "myoraclehost"), the listening port (e.g. 1511), and the database service name (e.g. "dbm"). You also need to identify the Oracle user (e.g. "olhp") and password (e.g. "welcome1"). (In a production environment you will want to use Oracle Wallet to avoid storing passwords in clear text, but we will save that issue for a later post.)

<configuration>
  <property>
    <name>oracle.hadoop.loader.connection.url</name>
    <value>jdbc:oracle:thin:@myoraclehost:1511/dbm</value>
  </property>
  <property>
    <name>oracle.hadoop.loader.connection.user</name>
    <value>olhp</value>
  </property>
  <property>
    <name>oracle.hadoop.loader.connection.password</name>
    <value>welcome1</value>
  </property>
</configuration>

OLH Input Format

The dtextInput.xml configuration file in the example above describes the physical characteristics of the rows of data that OLH will read. OLH provides off-the-shelf, built-in input format implementations that cover many common formats you would expect to see living in Hadoop data files. These include delimited text and CSV, text with regular expressions, Hive formats, and Oracle NoSQL Database. In general, it's much easier to generate output in a form that is compliant with the existing built-ins, although rolling your own input format class is a supported option. In our running example we are using the default settings for delimited text, which is CSV. Additional properties for delimited text allow you to specify alternative field terminators and to require explicit field enclosers (useful when you want to use CSV as an input format but there are commas embedded in field values).

<configuration>
  <property>
    <name>mapreduce.inputformat.class</name>
    <value>oracle.hadoop.loader.lib.input.DelimitedTextInputFormat</value>
  </property>
</configuration>

OLH Output Format

The jdbcOutput.xml file is used to specify how we want to load Oracle. Output formats can be divided into two types: those that load the data directly into an Oracle database table, and those that store the data in a new set of files in HDFS that can be pulled into Oracle later. For our running example, we are going to use JDBC.

<configuration>
  <property>
    <name>mapreduce.outputformat.class</name>
    <value>oracle.hadoop.loader.lib.output.JDBCOutputFormat</value>
  </property>
</configuration>

Running the OLH Job

When you kick off the OLH job it will give you console output that looks something like this:

Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.
13/05/30 09:18:00 INFO loader.OraLoader: Oracle Loader for Hadoop Release 2.1.0 - Production
Copyright (c) 2011, 2013, Oracle and/or its affiliates. All rights reserved.
13/05/30 09:18:00 INFO loader.OraLoader: Built-Against: hadoop-2.0.0-mr1-cdh4.1.2 hive-0.9.0-cdh4.1.2 avro-1.6.3 jackson-1.8.8
13/05/30 09:18:02 INFO loader.OraLoader: oracle.hadoop.loader.loadByPartition is disabled because mapred.reduce.tasks=0
13/05/30 09:18:02 INFO loader.OraLoader: oracle.hadoop.loader.enableSorting disabled: cannot sort by key when number of reducers is zero
13/05/30 09:18:02 INFO output.DBOutputFormat: Setting map tasks speculative execution to false for : oracle.hadoop.loader.lib.output.JDBCOutputFormat
13/05/30 09:18:02 WARN loader.OraLoader: Sampler error: the number of reduce tasks must be greater than one; the configured value is 0 . Job will continue without sampled information.
13/05/30 09:18:02 INFO loader.OraLoader: Sampling time=0D:0h:0m:0s:14ms (14 ms)
13/05/30 09:18:02 INFO loader.OraLoader: Submitting OraLoader job OraLoader .
13/05/30 09:18:02 INFO input.FileInputFormat: Total input paths to process : 90
13/05/30 09:18:04 INFO loader.OraLoader: map 0% reduce 0%
13/05/30 09:18:19 INFO loader.OraLoader: map 1% reduce 0%
13/05/30 09:18:20 INFO loader.OraLoader: map 2% reduce 0%
13/05/30 09:18:22 INFO loader.OraLoader: map 3% reduce 0%
….
13/05/30 09:21:13 INFO loader.OraLoader: map 95% reduce 0%
13/05/30 09:21:16 INFO loader.OraLoader: map 96% reduce 0%
13/05/30 09:21:18 INFO loader.OraLoader: map 97% reduce 0%
13/05/30 09:21:20 INFO loader.OraLoader: map 98% reduce 0%
13/05/30 09:21:23 INFO loader.OraLoader: map 99% reduce 0%
13/05/30 09:21:31 INFO loader.OraLoader: map 100% reduce 0%
13/05/30 09:21:33 INFO loader.OraLoader: Job complete: OraLoader (job_201305201106_0524)

Note that because we are loading a simple non-partitioned table, this is a map-only job where loading is done by the mappers and there is no reduce phase. The warning message at the outset of the job is about an OLH feature called the Sampler, which is used when tables are partitioned to balance the reducers doing a load. Since the target table is not partitioned, the warning about the Sampler being disabled is not interesting.

Where to Look if Something Went Wrong

I've run OLH jobs interactively daily for more than a year. When something goes wrong, the Hadoop console output will make it obvious, and typically gives a pretty good idea of what problem you are having. I also rely on Hadoop's JobTracker and TaskTracker UIs, which allow you to drill down to a failed task and look at the output it produces: typically Java stack dumps and log messages that detail the problems it was having.

The results directory in HDFS that was specified in the "mapred.output.dir" setting of the OLH command contains lots of information about a job run. In the directory there will be a top level report called "oraloader-report.txt". It offers a clean breakdown of time spent by the tasks running in Hadoop that were used to load the target table. It is probably the quickest way of looking at the workloads and determining if they are unbalanced.

Hadoop and Connection Resources

About the only additional issue you need to be concerned about for this kind of job is the number of concurrent connections the Oracle database accepts. This becomes a problem when the number of Hadoop tasks that are loading a table concurrently exceeds the number of connections that Oracle will accept. If that happens the loading tasks will fail with an ORA-00020 error. You want to check the number of map and reduce slots that are configured for Hadoop.
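On the Oracle side, a quick way to see how many concurrent connections the database is configured to accept is to query the relevant initialization parameters. A minimal sketch; the exact limits to compare against depend on how your DBA has sized the instance:

SQL> SELECT name, value FROM v$parameter WHERE name IN ('processes', 'sessions');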
For map-only jobs, if the number of map slots is less than the number of connections Oracle accepts, there won't be a problem. The same holds true for full MapReduce jobs if the number of reduce slots is less than the maximum number of Oracle connections accepted.

If this is not true, you need to artificially restrict the concurrency of the load tasks. For map-only jobs (like the one illustrated above) this means you will need to restrict the number of map slots in the cluster available to the OLH job to something less than the number of connections Oracle allows. For full MapReduce OLH jobs (which are more typical) loading occurs in the reduce phase, and this can be easily controlled in the OLH command by tweaking the "mapred.reduce.tasks" property mentioned above and setting it to an appropriate number:

-D mapred.reduce.tasks=20

Summary

To summarize, a bare bones OLH configuration typically needs the following information:

· How to connect to Oracle
· How many reduce tasks to run
· The form of input (e.g. CSV) and output (e.g. JDBC)
· An HDFS directory containing input files
· An HDFS directory where OLH can write information about job results
· A loader map that tells which fields correspond to which columns in a table

That's it. Easy-peasy lemon squeezy. All OLH commands pretty much look like the one used above, with minor substitutions. Creating configurations for other tables typically requires only creating a new loader map specification. All the other configuration files can be reused.

The next post will discuss using OLH with OCI Direct Path, which is what you want to use when loading big tables that are sorted by key or are partitioned. We will spend a lot of time discussing performance issues such as load balancing, using the OLH Sampler, and using the SDP transport protocol.


Accessing a Hive Table from Oracle Database

Hive is a popular tool to query data in HDFS. Oracle SQL Connector for HDFS can read data directly from a Hive table in version 2.0. Also, the command-line tool of Oracle SQL Connector for HDFS has been simplified greatly. The connector only needs the Hive table and Hive database name, specified as Hadoop configuration properties, to access data from a Hive table.

oracle.hadoop.exttab.sourceType=hive
oracle.hadoop.exttab.hive.tableName=<hive table name>
oracle.hadoop.exttab.hive.databaseName=<hive database name>

A command-line tool creates an external table in Oracle Database. This external table is queried to access data in the Hive table. Since an external table can be queried like any other table in the database, Oracle SQL functionality is available to query the Hive table. The name of the external table and the database connection information are also specified as Hadoop configuration properties.

hadoop jar $OSCH_HOME/jlib/orahdfs.jar \
oracle.hadoop.exttab.ExternalTable \
-D oracle.hadoop.exttab.sourceType=hive \
-D oracle.hadoop.exttab.hive.tableName=<hive table name> \
-D oracle.hadoop.exttab.hive.databaseName=<hive database name> \
-D oracle.hadoop.connection.url=<database connection url> \
-D oracle.hadoop.exttab.tableName=<external table name> \
….
-createTable

While creating the external table, the Hive table data types are automatically mapped to Oracle data types in the external table. Below are some sample mappings:

Hive data type              Oracle data type
Tinyint, Smallint, Int      NUMBER
Float, Double               NUMBER (default precision)
Decimal                     NUMBER
Boolean                     VARCHAR2(5)
String                      VARCHAR2(4000)
Timestamp                   TIMESTAMP
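Once the external table exists, it behaves like any other table from the point of view of Oracle SQL. A minimal sketch using hypothetical names (MOVIE_FACT_EXT for the external table over the Hive data, CUSTOMERS for an ordinary database table, CUST_ID and STATE for columns):

-- Count the rows exposed through the external table
SELECT COUNT(*) FROM movie_fact_ext;

-- Join Hive-sourced data with a table stored in the database
SELECT c.state, COUNT(*)
  FROM movie_fact_ext m, customers c
 WHERE m.cust_id = c.cust_id
 GROUP BY c.state;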


Welcome!

Welcome to the Oracle Big Data Connectors blog, which will focus on posts related to integrating data on a Hadoop cluster with Oracle Database. In particular the blog will focus on best practices, usage notes, and performance tips for using Oracle Loader for Hadoop and Oracle Direct Connector for HDFS, which are part of Oracle Big Data Connectors. Oracle Big Data Connectors 1.0 also includes Oracle R Connector for Hadoop and Oracle Data Integrator Application Adapters for Hadoop.

Oracle Loader for Hadoop: Oracle Loader for Hadoop loads data from Hadoop to Oracle Database. It runs as a MapReduce job on Hadoop to partition, sort, and convert the data into an Oracle-ready format, offloading to Hadoop the processing that is typically done using database CPUs. The data is then loaded to the database by the Oracle Loader for Hadoop job (online load) or written out as Oracle Data Pump files for load and access later (offline load) with Oracle Direct Connector for HDFS.

Oracle Direct Connector for HDFS: Oracle Direct Connector for HDFS is a connector for high speed access of data on HDFS from Oracle Database. With this connector, Oracle SQL can be used to directly query data on HDFS. The data can be Oracle Data Pump files generated by Oracle Loader for Hadoop or delimited text files. The connector can also be used to load data into the database using SQL.

