
Oracle Big Data Spatial and Graph - technical tips, best practices, and news from the product team

Integrating Apache Spark with Oracle Big Data Spatial and Graph Property Graph

Alan Wu
Architect

This is a joint work with Eduardo Pacheco and Gabriela Montiel (primary contributors).

Apache Spark is an exciting open source big-data framework that lets data scientists perform in-memory computations and process large amounts of data efficiently. It comes with a set of powerful libraries for processing data, including Spark SQL, MLlib, Spark Streaming, and DataFrames, and it can read data from different sources such as HDFS and Apache HBase. All of this makes Apache Spark an excellent complementary technology to the Oracle Big Data Spatial and Graph (BDSG) Property Graph feature.

This blog post will guide you through the process of loading an Oracle Big Data Spatial and Graph property graph into Spark so that you can query it with Spark SQL. We assume, in this case, that the graph is stored in Apache HBase.

Throughout this blog we will use Scala 2.10.4 and Spark 1.6 (included in CDH 5.7). One advantage of using Scala is that you can simply copy and paste the provided snippets of code into Spark's shell (small modifications are required based on your setup) and start playing with your data in Spark. We assume that the reader has some basic knowledge of Spark and Scala.

Let's start with a property graph named socialNet stored in Apache HBase using Oracle Big Data Spatial and Graph. This graph describes the relationships among a set of individuals; see Figure 1. As the figure shows, each vertex is identified by an ID and has a single property (name), and each edge has an ID, a label, and incoming and outgoing vertices.

BDSG Property Graph for Apache HBase stores the vertex and edge information of a graph in two main HBase tables: <graph_name>VT. and <graph_name>GE., respectively. Thus socialNetVT. and socialNetGE. are the HBase table names containing the vertex and edge information of our example graph.
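Before going any further you can optionally verify that both tables exist using the HBase admin API. The following is a minimal sketch (not part of the original walkthrough); it reuses the same ZooKeeper settings shown later in this post, which you would replace with your own cluster values.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin

// Basic HBase configuration; adjust the ZooKeeper settings for your cluster.
val checkConf = HBaseConfiguration.create()
checkConf.set("hbase.zookeeper.quorum", "node041,node042,node043")
checkConf.set("hbase.zookeeper.property.clientPort", "2181")

// The vertex and edge tables that BDSG created for the socialNet graph.
val admin = new HBaseAdmin(checkConf)
println("socialNetVT. exists: " + admin.tableExists("socialNetVT."))
println("socialNetGE. exists: " + admin.tableExists("socialNetGE."))
admin.close()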

To execute SQL queries over property graph data using Spark, we need to create a Spark application that loads the graph data into RDDs. We will create an RDD for each table (socialNetVT. and socialNetGE.). To create our Spark application, we first need to import the following libraries:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import scala.collection.mutable.HashMap
import collection.JavaConversions._
import scala.io.Source
import org.apache.spark.sql.DataFrame

Additionally, we need the following libraries to read rows out of HBase tables:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, CellUtil, Cell}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

The first thing a Spark program must do is create a SparkContext object, which is required to create RDDs.

The following snippet of code shows how to create a SparkContext in Scala, specifying the application name and the master:

val sc = new SparkContext(
  new SparkConf()
    .setAppName("Example")
    .setMaster("spark://localhost:7077"))

val sqlCtx = new SQLContext(sc)

Since our purpose is to run SQL queries on our graph data, we also need to create a SQLContext (the sqlCtx value above). Notice that if we are using Spark's shell we can skip this step, since the shell creates these contexts for us.

Using the SparkContext that we just created, we can easily get an RDD out of a Hadoop data source by invoking the newAPIHadoopRDD method. The first step is to create Hadoop Configuration objects to access the vertices and edges stored in Apache HBase. Note that the data source must meet Hadoop's InputFormat specification, which HBase's TableInputFormat does, so we can read the data from our socialNetVT. and socialNetGE. tables using this method. A Configuration object specifies the parameters used to connect to Apache HBase, such as the ZooKeeper quorum, ZooKeeper client port, and table name.

@transient val hBaseConfNodes = HBaseConfiguration.create()
hBaseConfNodes.set(TableInputFormat.INPUT_TABLE, "socialNetVT.")
hBaseConfNodes.set("hbase.zookeeper.quorum", "node041,node042,node043")
hBaseConfNodes.set("hbase.zookeeper.property.clientPort", "2181")

@transient val hBaseConfEdges = HBaseConfiguration.create()
hBaseConfEdges.set(TableInputFormat.INPUT_TABLE, "socialNetGE.")
hBaseConfEdges.set("hbase.zookeeper.quorum", "node041,node042,node043")
hBaseConfEdges.set("hbase.zookeeper.property.clientPort", "2181")

We pass these Configuration objects to newAPIHadoopRDD, along with the class of the InputFormat and the classes of its keys and values. We will create one RDD for the vertices table (bytesResultVertices) and one for the edges table (bytesResultEdges). Both of them are of type RDD[(ImmutableBytesWritable, Result)], where each Result object contains the data of one vertex or one edge.

val bytesResultVertices = sc.newAPIHadoopRDD(
  hBaseConfNodes,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

val bytesResultEdges = sc.newAPIHadoopRDD(
  hBaseConfEdges,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

Let us define a transformation res2Vertex to get the vertex information out of each Result object in bytesResultVertices. This simple transformation takes an HBase Result object, gets the corresponding vertex ID and name, and creates a Vertex instance out of this data. Similarly, the transformation res2Edge does the same to produce an Edge instance.

case class Vertex(Id : Long, name : String)

case class Edge(Id : Long, source : Long, target : Long, label : String)

def res2Vertex(res : Result) : Vertex = {
  // The vertex ID is read from the row key.
  val id = Bytes.toLong(res.getRow(), 8)
  val family = CellUtil.cloneFamily(res.rawCells()(0))
  val map = res.getFamilyMap(family)
  val bname = Bytes.toBytes("kname")
  val bdefault = Bytes.toBytes("null")
  val name = Bytes.toString(map.getOrElse(bname, bdefault)).trim
  Vertex(id, name)
}

def res2Edge(res : Result) : Edge = {
  // The edge ID is read from the row key.
  val id = Bytes.toLong(res.getRow(), 8)
  val family = CellUtil.cloneFamily(res.rawCells()(0))
  val map = res.getFamilyMap(family)
  val bsource = Bytes.toBytes("ao")   // outgoing (source) vertex column
  val btarget = Bytes.toBytes("ai")   // incoming (target) vertex column
  val blabel = Bytes.toBytes("al")    // edge label column
  val bdefault = Bytes.toBytes("null")
  val ksource = Bytes.toLong(map.getOrElse(bsource, bdefault))
  val ktarget = Bytes.toLong(map.getOrElse(btarget, bdefault))
  val klabel = Bytes.toString(map.getOrElse(blabel, bdefault)).trim
  Edge(id, ksource, ktarget, klabel)
}

Now we can apply these transformations to each Result object in bytesResultVertices and bytesResultEdges to obtain an RDD[Vertex] and an RDD[Edge], as shown in the following snippet of code.

val verticesRDD = bytesResultVertices.values.map(result => res2Vertex(result))

val edgesRDD = bytesResultEdges.values.map(result => res2Edge(result))

Simple, right?
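As a quick, optional sanity check (not part of the original flow), you can count the records in each RDD and peek at a few vertices before moving on:

// Count the vertices and edges we just loaded from HBase.
println("Vertices loaded: " + verticesRDD.count())
println("Edges loaded: " + edgesRDD.count())

// Look at a few decoded vertices to confirm that IDs and names look right.
verticesRDD.take(3).foreach(println)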

At this point we are almost ready to run SQL queries on our graph. To do so, we need to create a DataFrame object from each of our RDDs using the SQLContext we defined at the beginning and its createDataFrame method. The case classes Vertex and Edge play an important role here, since Spark uses them to figure out the DataFrames' column names. After creating the DataFrames, we register them as temporary tables.

val verticesDF = sqlCtx.createDataFrame(verticesRDD)
verticesDF.registerTempTable("VERTICES_TABLE")

val edgesDF = sqlCtx.createDataFrame(edgesRDD)
edgesDF.registerTempTable("EDGES_TABLE")
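To see how the case class fields became column names, you can optionally print the schema of each DataFrame:

// Column names are derived from the Vertex and Edge case class fields.
verticesDF.printSchema()   // Id, name
edgesDF.printSchema()      // Id, source, target, label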

Now we are set to run queries. For instance, let us find all of Hugo's friends (Hugo is the vertex with ID 1 in our example graph).

sqlCtx.sql("select name from

(select target from EDGES_TABLE WHERE
source = 1) REACHABLE

left join VERTICES_TABLE on
VERTICES_TABLE.id = REACHABLE.target ").show
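As a further, illustrative example (not from the original walkthrough), the same temporary tables can answer aggregate questions, such as how many outgoing connections each person has:

// Count outgoing edges per person by joining edges to the vertex names.
sqlCtx.sql("select VERTICES_TABLE.name, count(*) as friends from EDGES_TABLE join VERTICES_TABLE on VERTICES_TABLE.id = EDGES_TABLE.source group by VERTICES_TABLE.name").show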

So that's it, folks!

In addition to reading graph data directly from Apache HBase and performing operations on it in Apache Spark, BDSG also has a very useful feature that allows one to use the in-memory analyst to analyze graph data in Apache Spark. See Section 6.14 in [1] for details.

Acknowledgement: thanks to Jay Banerjee for his input on this blog post.

[1] http://docs.oracle.com/bigdata/bda46/BDSPA/using-in-memory-analyst.htm#BDSPA362
