X

The Oracle NoSQL Database Blog covers all things Oracle NoSQL Database. On-Prem, Cloud and more.

  • December 2, 2014

Using Nosql Tables with Spark

This post goal is to explain how to use Nosql tables and how to put their content into a file on hdfs using the java API for Spark. In hdfs, the table content will be presented in a comma separated style (CSV).

Oracle (latest) Big Data Appliance "X4-2", offers Cloudera Enterprise Technology software including Cloudera CDH, and Oracle NoSql database including tables.

The Cloudera part offers several ways of integration with Spark (see Using Nosql and Spark) : Standalone or via Yarn (see Running Spark Applications)

The Nosql part allows the use of tables. Tables can be defined within the Nosql console by issuing the following command:

java -Xmx256m -Xms256m -jar $KVHOME/lib/kvstore.jar runadmin -host <host> -port <store port> -store <store name>

There are two parts for defining and creating a table. Define which includes table name, table fields, primary key and shared-key which is a "prefix" of the primary key, ends with the keyword "exit"

table create -name flightTestExtract

add-field -name param -type STRING

add-field -name flight -type STRING

add-field -name timeref -type LONG

add-field -name value -type INTEGER

primary-key -field timeref -field param -field flight 

shard-key -field timeref

exit

Plan which allows table creation and index definition and creation:

plan add-table -wait -name flightTestExtract

plan add-index -wait -name flightIndex -table  flightTestExtract -field flight -field param -field timeref

plan add-index -wait -name paramIndex -table  flightTestExtract -field param -field flight -field timeref

Inserting into the table can be done by the put command as:

put table -name flightTestExtract -json "{\"param\":\"11\",\"flight\":\"8\",\"timeref\":61000000000002,\"value\":1764248535}"

put table -name flightTestExtract -json "{\"param\":\"12\",\"flight\":\"8\",\"timeref\":61000000000002,\"value\":-1936513330}"

put table -name flightTestExtract -json "{\"param\":\"11\",\"flight\":\"6\",\"timeref\":61000000000013,\"value\":1600130521}"

put table -name flightTestExtract -json "{\"param\":\"11\",\"flight\":\"8\",\"timeref\":61000000000013,\"value\":478674806}"

The last patch of Nosql, 3.1.7, has some new java classes that could be used to get table data into hadoop. The class oracle.kv.hadoop.table.TableInputFormat can be used as a Spark JavaRDD:

JavaPairRDD<PrimaryKey, Row> jrdd = sc.newAPIHadoopRDD(hconf, TableInputFormat.class, PrimaryKey.class, Row.class);

The oracle.kv.table.PrimaryKey.class correspond to the fields of the primary key of the table, for example in json style:

{"timeref":61000000000013, "param":"11","flight":"8"}

The oracle.kv.table.Row.class correspond to the fields of table row, for example in json style:

{"param":"11","flight":"8","timeref":61000000000013,"value":478674806}

If we want to save the content of the table on hdfs in a csv style we have to:

  • apply a flatMap on the rows of the RDD 
    flatMap(func) each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). 
  • save the result on hdfs

The following inner class defines the map:

     static class FlatMapRow_Str implements FlatMapFunction<Row, String> {

        @Override

        public Iterable<String> call(Row s) {

            List<String> lstr = s.getFields();

            String tabedValues = "";

            for (String field : lstr)

                tabedValues += s.get(field) + ",";

            return Arrays.asList(tabedValues);

        }

    }

The code to do the job is: 

//Obtain the Row RDD       

JavaRDD<Row> rddvalues = jrdd.values();

//Obtain the csv style form of the RDD 

JavaRDD<String> csvStr = rddvalues .flatMap(new FlatMapRow_Str());

//Save the results on hdfs 

csvStr.saveAsTextFile(pathPrefix + "/" + tableName + "csvStr");

The last step is to test using Yarn:

spark-submit --master yarn --jars /u01/nosql/kv-ee/lib/kvclient.jar --class table.SparkNosqlTable2HadoopBlog /u01/nosql/kv-ee/examples/table/deploy/sparktables.jar <nosql store name> <nosql store url> <table name> <path prefix>

<nosql store url> is <store host>:<store port> 

You can get the java source code here

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.