
Move data between Oracle Database and Apache Hadoop using high speed connectors.

  • October 4, 2016

Using Spark with Data Copied using Copy to Hadoop

Copy to Hadoop copies data from an Oracle Database table to HDFS as Oracle Data Pump files. These files can be accessed by Hive tables using a SerDe that is part of Copy to Hadoop, so Hive queries can be run against this data.

What if you would like to include this data in a Spark ML (machine learning) application? A Spark data frame can access Oracle Data Pump files via Hive. Below is an example.

Configure the Spark Installation to use Hive

To read Oracle Data Pump files into a Spark data frame, the Spark installation must be configured with a Hive installation. One way to achieve this is to modify conf/spark-env.sh and append HIVE_CONF_DIR to the environment variable SPARK_DIST_CLASSPATH. If your Spark installation is not built with Hive, then the Hive-related jars should also be appended to SPARK_DIST_CLASSPATH; a simple way to do this is to append HIVE_HOME/lib/* to SPARK_DIST_CLASSPATH. This should be done on all nodes in the cluster. More detail about manipulating SPARK_DIST_CLASSPATH can be found in https://spark.apache.org/docs/1.5.1/hadoop-provided.html.
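For example, the relevant lines in conf/spark-env.sh might look roughly like the following. This is only a sketch; the HIVE_CONF_DIR and HIVE_HOME values are placeholders that depend on your installation:

# conf/spark-env.sh (illustrative paths)
export HIVE_CONF_DIR=/etc/hive/conf
export HIVE_HOME=/usr/lib/hive
export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$HIVE_CONF_DIR:$HIVE_HOME/lib/*"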

Happily, in the Big Data Lite VM, Spark is already configured to work with Hive.

Copy data from the moviedemo schema in Oracle Database using Copy to Hadoop

Let us now copy data from the moviedemo schema and create a Hive external table. We will do this from the OHSH (Oracle Shell for Hadoop Loaders) CLI.

As described in this blog post, create some OHSH resources. Then copy the table:

ohsh> create hive table hive_moviedemo:movie_oracledb_tab \
from oracle table moviedemo:movie_local_tab using stage

(Note on using this example in Oracle Big Data Lite: if the table moviedemo:movie_local_tab in Oracle Database is empty, load it first using OLH (Oracle Loader for Hadoop).

ohsh> set dateformat "yyyy-MM-dd:HH:mm:ss"

ohsh> load oracle table moviedemo:MOVIE_LOCAL_TAB from \
hive table hive_moviedemo:movieapp_log_stage_1 database=moviedemo using directpath

If you need to create the table MOVIE_LOCAL_TAB, the SQL is available in /home/oracle/movie/moviework/olh/movie_tab.sql. Now you can copy this table back using Copy to Hadoop.)

Access the Hive external table over Oracle Data Pump files in Spark

Now let us access the data in the Hive external table movie_oracledb_tab from Spark.

Start up spark-shell with Copy to Hadoop jars

spark-shell has to be launched by specifying the Copy to Hadoop jars in the --jars option (note that the list must be comma-separated, with no spaces):

prompt> spark-shell --jars orahivedp.jar,ojdbc7.jar,oraloader.jar,orai18n.jar,ora-hadoop-common.jar

In Oracle Big Data Lite these jars are located in /u01/orahivedp/jlib. On the Big Data Appliance with Big Data SQL installed, they are located in /opt/oracle/bigdatasql/bdcell-<ver>/jlib.

Verify the type of SQLContext in spark-shell:

scala> sqlContext

Your output will be something like:

res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@66ad7167

If the default sqlContext is not a HiveContext, you have to create one:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

You can now create a data frame movie_oracledb_df that points to the Hive external table movie_oracledb_tab:

scala> val movie_oracledb_df = sqlContext.table("moviedemo.movie_oracledb_tab")
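If you want to confirm the column names and types that the Copy to Hadoop SerDe exposes before going further, one option (not part of the original walk-through) is to print the schema:

scala> movie_oracledb_df.printSchema()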

Now you can access data via the data frame:

scala> movie_oracledb_df.count

res3: Long = 39716

scala> movie_oracledb_df.head

The result will be something like:
res4: org.apache.spark.sql.Row =
[1217572.000000000000000000,4951.000000000000000000,6.000000000000000000,2012-08-04
09:53:46.0,0E-18,1.000000000000000000,2.000000000000000000,null]

scala> movie_oracledb_df.groupBy("genreid").count().show()

+--------------------+-----+
|             genreid|count|
+--------------------+-----+
|45.00000000000000...|  232|
|46.00000000000000...|  167|
|51.00000000000000...|    1|
|53.00000000000000...|   27|
|-1.00000000000000...|   20|
|1.000000000000000000|   66|
|2.000000000000000000|   80|
|3.000000000000000000|  995|
|6.000000000000000000|  910|
|7.000000000000000000|  459|
|8.000000000000000000|  319|
|9.000000000000000000|  367|
|10.00000000000000...|   50|
|11.00000000000000...|  269|
|12.00000000000000...|  288|
|14.00000000000000...|  189|
|15.00000000000000...|  306|
|16.00000000000000...|  147|
|17.00000000000000...|  341|
|18.00000000000000...|  186|
+--------------------+-----+

only showing top 20 rows

Access Oracle Data Pump files in Spark

If a Hive external table has not already been created over the Oracle Data Pump files written by Copy to Hadoop, you can create the Hive external table from within Spark:

scala> sqlContext.sql("CREATE EXTERNAL TABLE movie_oracledb_tab ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe' STORED AS INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/oracle/oracle_warehouse/moviedemo'")

Change the LOCATION clause if your Oracle Data Pump files are in a different directory.
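To verify that the new table reads the Data Pump files correctly, you can run a simple query through the same SQL context (an illustrative check, not one of the original steps):

scala> sqlContext.sql("SELECT COUNT(*) FROM movie_oracledb_tab").show()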

Using Spark MLlib to build a recommendation system

Here we will demonstrate how to integrate the data frame we created with Spark MLlib and run some fairly complex machine learning operations on it. This example is adapted from http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html.

The Hive external table moviedemo.movie_oracledb_tab has many NULL values in the rating column. Let us extract all rows with non-NULL ratings.

scala> val movie_oracledb_df_notnull = movie_oracledb_df.filter("rating is not null")

You will see something like this on screen:

movie_oracledb_df_notnull:
org.apache.spark.sql.DataFrame = [custid: decimal(38,18), movieid:
decimal(38,18), genreid: decimal(38,18), time: timestamp, recommended:
decimal(38,18), rating: decimal(38,18), activity: decimal(38,18), sales:
decimal(38,18)]

We need to import some classes from Spark MLlib to run the example.

scala> import org.apache.spark.mllib.recommendation.ALS

scala> import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

scala> import org.apache.spark.mllib.recommendation.Rating

The algorithm used in this example is Alternating Least Squares (ALS). See http://dl.acm.org/citation.cfm?id=1608614 for details of this algorithm.
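As a quick sketch of what ALS is doing (a standard formulation of the algorithm, not anything specific to this dataset): each customer u gets a latent factor vector x_u and each movie i a vector y_i, and the factors are chosen to approximately minimize

    sum over observed ratings (u,i) of ( r_ui - x_u . y_i )^2  +  lambda * ( ||x_u||^2 + ||y_i||^2 )

by alternating between solving for the customer factors with the movie factors fixed and vice versa. The rank parameter used below is the length of these factor vectors.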

To use ALS on our data, we need to transform the data frame movie_oracledb_df_notnull into an RDD of Rating objects. We do this by extracting the columns custid (0), movieid (1), and rating (6) from movie_oracledb_df_notnull.

scala> val ratings = movie_oracledb_df_notnull.map{ row =>
Rating(row.getDecimal(0).intValue(),row.getDecimal(1).intValue(),row.getDecimal(6).doubleValue())}

On screen:

ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
= MapPartitionsRDD[26] at map at <console>:32
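Because ALS makes several passes over the ratings, it can also help to cache this RDD before training. This step is optional and not part of the original example, just a common Spark practice:

scala> ratings.cache()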

The ALS algorithm has many parameters. In this example we use the parameters rank and numIterations: rank designates the number of latent factors in the model, and numIterations specifies the number of iterations to run. For a full description of the parameters, please see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS.

scala> val rank = 10

scala> val numIterations = 10

scala> val model =
ALS.train(ratings, rank, numIterations)

You will see on screen something like:

model:
org.apache.spark.mllib.recommendation.MatrixFactorizationModel =
org.apache.spark.mllib.recommendation.MatrixFactorizationModel@3c1c984f

After we build the model we can evaluate it. For simplicity we will evaluate against the same data we trained on, though in the real world you would of course use a held-out test set.

scala> val usersProducts =
ratings.map{ case Rating(user,product,rate) => (user, product) }

To predict user ratings:

scala> val predictions =
model.predict(usersProducts).map{ case Rating(user,product,
rate)=>((user,product), rate) }

Compute the mean squared error using the predictions and the actual ratings.

scala> val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)

scala> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => val err = r1 - r2; err * err }.mean()

scala>println("Mean
Squared Error = " + MSE)

We can persist the model in HDFS to use later:

scala> model.save(sc, "hdfs:///user/oracle/collabFilter")

scala> val savedModel = MatrixFactorizationModel.load(sc, "hdfs:///user/oracle/collabFilter")
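The reloaded model can then be used just like the original one. For example (an illustrative call, using the customer id that appeared in the earlier head output), recommendProducts returns the top-N movie recommendations for a given customer:

scala> savedModel.recommendProducts(1217572, 5)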
