Copy to Hadoop copies data from an Oracle Database table to HDFS as Oracle Data Pump files. These files can be accessed by Hive tables using a SerDe that is part of Copy to Hadoop, so Hive queries can be run against the data.
What if you would like to include this data in a Spark ML (machine learning) application? A Spark DataFrame can access Oracle Data Pump files via Hive. Below is an example.
Configure the Spark Installation to use Hive
To read Oracle Data Pump files into a Spark DataFrame, the Spark installation must be configured with a Hive installation. One way to achieve this is to modify conf/spark-env.sh and append HIVE_CONF_DIR to the environment variable SPARK_DIST_CLASSPATH. If your Spark installation is not built with Hive, then the Hive-related JARs should also be appended to SPARK_DIST_CLASSPATH; a simple way to do this is to append HIVE_HOME/lib/* as well. This should be done on all nodes in the cluster. More detail about manipulating SPARK_DIST_CLASSPATH can be found at https://spark.apache.org/docs/1.5.1/hadoop-provided.html
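For example, the relevant lines in conf/spark-env.sh might look like the following (a minimal sketch; HIVE_HOME and HIVE_CONF_DIR below are placeholders, so adjust the paths for your cluster):
# conf/spark-env.sh (sketch)
export HIVE_HOME=/usr/lib/hive
export HIVE_CONF_DIR=$HIVE_HOME/conf
# Add the Hive configuration directory, plus the Hive JARs if Spark was not built with Hive
export SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:$HIVE_CONF_DIR:$HIVE_HOME/lib/*"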
Happily, in the Big Data Lite VM Spark is
configured to work with Hive.
Copy data from the moviedemo schema in Oracle Database using Copy to Hadoop
Let us now copy data from the moviedemo schema and create a Hive external table. We will do this from the OHSH (Oracle Shell for Hadoop Loaders) CLI. As described in this blog post, first create the OHSH resources, then copy the table:
ohsh> create hive table hive_moviedemo:movie_oracledb_tab \
from oracle table moviedemo:movie_local_tab using stage
(Note on using this example in Oracle Big Data Lite: if the table moviedemo:movie_local_tab in Oracle Database is empty, load it first using OLH:
ohsh> set dateformat "yyyy-MM-dd:HH:mm:ss"
ohsh> load oracle table moviedemo:MOVIE_LOCAL_TAB from \
hive table hive_moviedemo:movieapp_log_stage_1 database=moviedemo using directpath
If you need to create the table MOVIE_LOCAL_TAB, the SQL is available in /home/oracle/movie/moviework/olh/movie_tab.sql. Now you can copy this table back to HDFS using Copy to Hadoop.)
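Once the copy completes, you can sanity-check the new Hive table from OHSH by delegating a query to the Hive resource (the %resource delegation shown here is a quick illustration; the exact syntax can vary slightly across OHSH versions):
ohsh> %hive_moviedemo select count(*) from movie_oracledb_tab;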
Access the Hive external table over Oracle Data Pump files in Spark
Now let us
access the data in the Hive external table movie_oracledb_tab from Spark.
Start up spark-shell with Copy to Hadoop jars
spark-shell has to be launched with the Copy to Hadoop jars specified in the --jars option (the jar names must be comma-separated, with no spaces):
prompt> spark-shell --jars orahivedp.jar,ojdbc7.jar,oraloader.jar,orai18n.jar,ora-hadoop-common.jar
In Oracle Big Data Lite these files are located in /u01/orahivedp/jlib. On the Big Data Appliance with Big Data SQL installed, these jars will be located in /opt/oracle/bigdatasql/bdcell-<ver>/jlib
Verify the type of SQLContext in spark-shell:
scala> sqlContext
Your output will be something like:
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@66ad7167
If the default sqlContext is not a HiveContext, you have to create one:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
You can now create a DataFrame movie_oracledb_df that points to the Hive external table movie_oracledb_tab:
scala> val movie_oracledb_df = sqlContext.table("moviedemo.movie_oracledb_tab")
Now you can access the data via the DataFrame:
scala> movie_oracledb_df.count
res3: Long = 39716
scala> movie_oracledb_df.head
The result
will be something like:
res4: org.apache.spark.sql.Row =
[1217572.000000000000000000,4951.000000000000000000,6.000000000000000000,2012-08-04
09:53:46.0,0E-18,1.000000000000000000,2.000000000000000000,null]
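You can also inspect the schema of the DataFrame, which is useful later when we refer to columns by position:
scala> movie_oracledb_df.printSchema()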
scala> movie_oracledb_df.groupBy("genreid").count().show()
+--------------------+-----+
| genreid|count|
+--------------------+-----+
|45.00000000000000...| 232|
|46.00000000000000...| 167|
|51.00000000000000...| 1|
|53.00000000000000...| 27|
|-1.00000000000000...| 20|
|1.000000000000000000| 66|
|2.000000000000000000| 80|
|3.000000000000000000| 995|
|6.000000000000000000| 910|
|7.000000000000000000| 459|
|8.000000000000000000| 319|
|9.000000000000000000| 367|
|10.00000000000000...| 50|
|11.00000000000000...| 269|
|12.00000000000000...| 288|
|14.00000000000000...| 189|
|15.00000000000000...| 306|
|16.00000000000000...| 147|
|17.00000000000000...| 341|
|18.00000000000000...| 186|
+--------------------+-----+
only showing top 20 rows
Access Oracle Data Pump files in Spark
If a Hive external table has not already been created over the Oracle Data Pump files produced by Copy to Hadoop, you can create it from within Spark:
scala> sqlContext.sql("CREATE EXTERNAL TABLE movie_oracledb_tab ROW FORMAT SERDE 'oracle.hadoop.hive.datapump.DPSerDe' STORED AS INPUTFORMAT 'oracle.hadoop.hive.datapump.DPInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '/user/oracle/oracle_warehouse/moviedemo'")
Change the LOCATION clause if the Oracle Data Pump files are in a different location.
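Once the table exists, you can point a DataFrame at it just as before (movie_df here is just an illustrative name, and this assumes the table was created in the default Hive database as above):
scala> val movie_df = sqlContext.table("movie_oracledb_tab")
scala> movie_df.count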
Using Spark MLlib to build a recommendation system
Here we will demonstrate how to integrate the DataFrame we created with Spark MLlib and run some fairly complex machine learning operations on it. This example is adapted from http://spark.apache.org/docs/latest/mllib-collaborative-filtering.html
The Hive external table moviedemo.movie_oracledb_tab has many NULL values in the rating column. Let us extract the rows with non-NULL ratings:
scala> val movie_oracledb_df_notnull = movie_oracledb_df.filter("rating is not null")
You will see something like this on screen:
movie_oracledb_df_notnull:
org.apache.spark.sql.DataFrame = [custid: decimal(38,18), movieid:
decimal(38,18), genreid: decimal(38,18), time: timestamp, recommended:
decimal(38,18), rating: decimal(38,18), activity: decimal(38,18), sales:
decimal(38,18)]
We need to import some classes from Spark MLlib to run the example.
scala> import org.apache.spark.mllib.recommendation.ALS
scala> import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
scala> import org.apache.spark.mllib.recommendation.Rating
The algorithm used in this example is Alternating Least Squares (ALS). See http://dl.acm.org/citation.cfm?id=1608614 for details of this algorithm.
To use ALS on our data, we need to transform the DataFrame movie_oracledb_df_notnull into an RDD of Ratings. We do this by extracting the columns custid (index 0), movieid (index 1), and rating (index 5) from movie_oracledb_df_notnull:
scala> val ratings = movie_oracledb_df_notnull.map { row =>
Rating(row.getDecimal(0).intValue(), row.getDecimal(1).intValue(), row.getDecimal(5).doubleValue()) }
On screen:
ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating]
= MapPartitionsRDD[26] at map at <console>:32
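Since ALS is an iterative algorithm, it can help to cache the ratings RDD so it is not recomputed on each iteration (an optional step):
scala> ratings.cache()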
The ALS algorithm has many
parameters. In this example we use the
parameters rank and iterations. Rank
designates the number of latent factors in the model and iterations specifies
the number of iterations to run. For a
full description of parameters please see http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.ALS
scala> val rank = 10
scala> val numIterations = 10
scala> val model =
ALS.train(ratings, rank, numIterations)
You will see on screen something like:
model:
org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@3c1c984f
After we build the model we can evaluate it. For simplicity we will use the same data we trained on, but of course in a real application you would evaluate the model on held-out data.
scala> val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) }
To predict user ratings:
scala> val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) => ((user, product), rate) }
Compute the mean squared error using the predictions and the actual ratings:
scala> val ratesAndPreds = ratings.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictions)
scala> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => val err = r1 - r2; err * err }.mean()
scala> println("Mean Squared Error = " + MSE)
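Taking the square root gives the root mean squared error (RMSE), which is on the same scale as the ratings themselves (a small optional step):
scala> val RMSE = math.sqrt(MSE)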
We can persist the model in HDFS to use later:
scala> model.save(sc, "hdfs:///user/oracle/collabFilter")
scala> val savedModel = MatrixFactorizationModel.load(sc, "hdfs:///user/oracle/collabFilter")
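The loaded model can then be used to generate recommendations, for example the top 5 movies for a given customer (the custid 1217572 below is just the sample value seen earlier in this data; any custid present in the training data will work):
scala> val recs = savedModel.recommendProducts(1217572, 5)
scala> recs.foreach(println)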