An Oracle blog about Oracle Coherence

Hadoop MapReduce and Coherence - A Perfect Match

Guest Author

Hadoop MapReduce (M/R) is the most popular programming model for processing large data sets with a parallel, distributed algorithm on an HDFS cluster. Coherence is the market-leading In-Memory Data Grid. Hadoop works fine for large processing operations, i.e. those requiring many TB of data that can be processed in a batch-like way. There are, however, use cases where the processing requirements are closer to real time and the data volumes are smaller, and for these Coherence is a better choice than HDFS for storing the data.

So while I was trying to get familiar with Hadoop M/R, I realized it should not be too difficult to combine the two. The solution presented below uses the standard Hadoop M/R API to process data stored in a Coherence data grid as if it were stored on an HDFS cluster. With that, the Hadoop WordCount example could look something like:

import com.oracle.coherence.mapreduce.JobClient;

...

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("coherence-wordcount");

// Types of the key/value pairs emitted by the job
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

// User-defined Map and Reduce classes, unchanged from the Hadoop example
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

// Read from and write to Coherence caches instead of HDFS
conf.setInputFormat(CoherenceInputFormat.class);
conf.setOutputFormat(CoherenceOutputFormat.class);
CoherenceInputFormat.setInputPaths(conf, new Path("coherence://wordcount-Input"));
CoherenceOutputFormat.setOutputPath(conf, new Path("coherence://wordcount-Result"));

RunningJob job = JobClient.runJob(conf);

The only change we made to the original Hadoop example was to use the CoherenceInputFormat and CoherenceOutputFormat classes and to point the input and output paths to Coherence caches instead of HDFS. Everything else is identical to the Hadoop M/R API, including the user-defined Map and Reduce classes. To run this M/R job, there is no need to set up and run a Hadoop cluster; all you need is a Coherence cluster.

Under the Hood

At first, Entry Processors and Aggregators looked like the natural choice for implementing the mapper and reducer. However, both require a single member/thread to process the results, which limits the applicability of the solution to those cases where the mapper/aggregator result sets can fit into a single member's heap (see Christian Felde's excellent blog on Coherence M/R). To work around this limitation, the solution uses Distributed Invocation Services with member affinity to implement the Mapper and Reducer functionality. From there, it was just a matter of implementing a couple of invocation services and a few Hadoop interfaces on top of Coherence caches.

Fig-1

JobClient

In Hadoop, JobClient is the primary interface for the user-job
to interact with the cluster. JobClient provides facilities to submit jobs, track their
progress, etc. Normally the user creates the
application, describes various facets of the job via JobConf and then
uses the JobClient to
submit the job and monitor its progress.

The solution provides its own implementation of the Hadoop JobClient class. It expects the input data for the “Map” phase to be already stored in a Coherence InputCache. JobClient delegates the work to the RunningJob class, which orchestrates the execution of the Mapper and Reducer services. Both the Mapper and the Reducer communicate progress and status information through the JobInfo Coherence cache.

MapperService

The MapperService is implemented as a Coherence Async InvocationService, running on all storage-enabled members of the Coherence cluster. To implement in-situ processing of data, each member running the MapperService uses the CoherenceRecordReader to process only the InputCache entries stored on that member. A MemberPartitionsListener keeps track of the PartitionSet per cluster member, and a PartitionFilter retrieves only the relevant local entries.

<distributed-scheme>
  <scheme-name>dist-default</scheme-name>
  <partition-listener>
    <class-name>com.oracle.coherence.mapreduce.listeners.MemberPartitionsListener</class-name>
  </partition-listener>
  ...
</distributed-scheme>

Fig-2

The MapperService is multi-threaded, and each thread executes a MapperTask to process a subset of the entries stored on that member. The MapperTask invokes the user-defined “Map” class for each InputCache entry, passing it a Coherence-based implementation of a Hadoop OutputCollector. The OutputCollector buffers the entries emitted by the Mapper class in memory and flushes them to the MapperOutputCache when the buffer reaches a predefined size.

The MapperOutputCache is configured with Partition Affinity so that all entries with the same emitted key are stored in the same partition. With partition affinity in place, there is no need to reshuffle the data before running the Reducer.
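To make the affinity idea concrete, here is a minimal sketch in plain Java. It does not use the Coherence API; the class AffinitySketch, the nested EmittedKey class and the partition count of 257 are hypothetical names and values chosen only for illustration. Each emitted entry needs its own unique cache key, but the partition is derived only from the emitted key it is associated with, so entries for the same word always land together.

```java
import java.util.Objects;

/** Illustration only: plain Java, not the Coherence API. All names are hypothetical. */
class AffinitySketch {

    /** Assumed partition count for this sketch. */
    static final int PARTITION_COUNT = 257;

    /** Cache key for one emitted entry: the emitted key plus a sequence number. */
    static final class EmittedKey {
        final String emittedKey; // e.g. the word emitted by the mapper
        final long sequence;     // distinguishes entries that share an emitted key

        EmittedKey(String emittedKey, long sequence) {
            this.emittedKey = Objects.requireNonNull(emittedKey);
            this.sequence = sequence;
        }

        /** The partition is chosen from this value only, not from the full key. */
        String associatedKey() {
            return emittedKey;
        }
    }

    /** Maps a cache key to a partition using only its associated key. */
    static int partitionFor(EmittedKey key) {
        return Math.floorMod(key.associatedKey().hashCode(), PARTITION_COUNT);
    }

    public static void main(String[] args) {
        EmittedKey first = new EmittedKey("coherence", 1L);
        EmittedKey second = new EmittedKey("coherence", 2L);
        // Both entries share an associated key, so they map to the same partition.
        System.out.println(partitionFor(first) == partitionFor(second));
    }
}
```

Because the sequence number never takes part in the partition calculation, a reducer running on the member that owns a partition can find every value for a given word locally.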

The OutputCollector also creates an entry for each unique emitted key in the MapperUniqueKeysCache. As with the MapperOutputCache, the MapperUniqueKeysCache has Partition Affinity defined with the same association key, so that a master entry and all its associated MapperOutputCache entries are stored in the same Coherence partition.

When a user-defined “Combine” class is configured for the job, the OutputCollector applies the “Combine” class locally before flushing the buffer to the MapperOutputCache, thus reducing the number of entries that the reducer has to process.
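The buffer-then-combine behaviour can be sketched in a few lines of plain Java. This is not the solution's actual OutputCollector: the class BufferingCollectorSketch, its flushThreshold parameter and the in-memory list that stands in for the intermediate cache are all assumptions made for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Illustration only: a plain-Java stand-in, not the Coherence or Hadoop API. */
class BufferingCollectorSketch {
    private final int flushThreshold;               // hypothetical "predefined size"
    private final BinaryOperator<Integer> combiner; // plays the role of the "Combine" class
    private final List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
    // Stands in for the intermediate cache: one map per flushed batch.
    private final List<Map<String, Integer>> flushedBatches = new ArrayList<>();

    BufferingCollectorSketch(int flushThreshold, BinaryOperator<Integer> combiner) {
        this.flushThreshold = flushThreshold;
        this.combiner = combiner;
    }

    /** Buffers one emitted pair and flushes once the buffer is full. */
    void collect(String key, int value) {
        buffer.add(new SimpleEntry<>(key, value));
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    /** Combines buffered values per key, then writes the smaller batch out. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : buffer) {
            combined.merge(entry.getKey(), entry.getValue(), combiner);
        }
        flushedBatches.add(combined);
        buffer.clear();
    }

    List<Map<String, Integer>> flushedBatches() {
        return flushedBatches;
    }

    public static void main(String[] args) {
        BufferingCollectorSketch collector = new BufferingCollectorSketch(4, Integer::sum);
        for (String word : "to be or to".split(" ")) {
            collector.collect(word, 1);
        }
        // Four emitted pairs were combined into three entries before the flush.
        System.out.println(collector.flushedBatches());
    }
}
```

In this sketch the combiner is the same summing function a WordCount reducer would use, which is why the same class can typically serve as both combiner and reducer.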

ReducerService

The ReducerService is implemented as a Coherence Async InvocationService, running on all storage-enabled members of the cluster. Each instance of the service reduces only those intermediate cache entries that are stored locally on the member running that instance.

The ReducerService iterates through the list of unique keys emitted by the Mapper to the MapperUniqueKeysCache. For each unique key, it uses a KeyAssociatedFilter to find all MapperOutputCache entries having the same unique key. The result set is then passed to the user-defined “Reduce” class, together with a Coherence-based implementation of a Hadoop OutputCollector class. The “Reduce” class then uses the OutputCollector to (transparently) emit its result to the Coherence ResultCache.
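The shape of that per-key loop can be sketched in plain Java as follows. The class LocalReduceSketch and its members are hypothetical: a Set stands in for the unique-keys cache, a List for the intermediate cache, and a simple equality check for the filter lookup. The real service works against Coherence caches instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

/** Illustration only: in-memory collections stand in for the Coherence caches. */
class LocalReduceSketch {

    /** One intermediate entry: the emitted key and one emitted value. */
    static final class Emitted {
        final String key;
        final int value;

        Emitted(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /** For each unique key, gathers its values and hands them to the reducer. */
    static Map<String, Integer> reduceLocally(
            Set<String> uniqueKeys,
            List<Emitted> intermediate,
            BiFunction<String, List<Integer>, Integer> reducer) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String key : uniqueKeys) {
            List<Integer> values = new ArrayList<>();
            for (Emitted entry : intermediate) {
                if (entry.key.equals(key)) { // stands in for the filter lookup
                    values.add(entry.value);
                }
            }
            result.put(key, reducer.apply(key, values));
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> uniqueKeys = new LinkedHashSet<>(Arrays.asList("be", "to"));
        List<Emitted> intermediate = Arrays.asList(
                new Emitted("to", 1), new Emitted("be", 1), new Emitted("to", 1));
        Map<String, Integer> result = reduceLocally(uniqueKeys, intermediate,
                (key, values) -> values.stream().mapToInt(Integer::intValue).sum());
        System.out.println(result);
    }
}
```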


Fig-3

Installing the demo

The demo solution ships with two samples, WordCount and StandardDeviation. To run the samples:

  1. Download the solution and unzip it.
  2. The solution includes all the dependency jars (Hadoop, Coherence, etc.) in the lib directory.
  3. Edit setEnv.cmd and modify the relevant environment variables.
  4. Execute the cache-server.cmd script to start one or more Coherence cache servers.

WordCount Sample 

WordCount is the most popular MapReduce example. The sample first populates the cache with about 100,000 lines of text. The mapper then tokenizes the lines and emits a key-value pair for each word, with the word as the key and a value of 1. The reducer then adds up all the 1s emitted for the same word.
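For readers new to the pattern, the map and reduce logic described above boils down to something like the following plain-Java sketch. It deliberately avoids the Hadoop and Coherence types so that it runs on its own; the class WordCountSketch and its run method are hypothetical and are not part of the demo.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

/** Illustration only: the WordCount logic without any Hadoop or Coherence types. */
class WordCountSketch {

    /** Map step: tokenize one line and emit (word, 1) for every token. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(line);
        while (tokens.hasMoreTokens()) {
            emitted.add(new SimpleEntry<>(tokens.nextToken(), 1));
        }
        return emitted;
    }

    /** Reduce step: add up all the 1s emitted for a single word. */
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    /** Runs map, groups the emitted pairs by word, then runs reduce per word. */
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            counts.put(group.getKey(), reduce(group.getValue()));
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(run(Arrays.asList("to be or not to be")));
    }
}
```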

To run the sample, execute the run-wordcount.cmd script:

2013-11-13 23:27:10.868/16.038 Oracle Coherence GE 3.7.1.1 <Info> (thread=Invocation:InvocationService, member=5): CoherenceMapReduce:JobId=1660803 JobClient:ReducerObserver - Member 1 Completed :  Machine=MBENOLIE-IL Process=8148

Job Completed

**** Job Summary:

**** Job Id: 1660803

**** Job Name: wordcount

**** Mapper ****

------------Input Cache Size: 100000

------------Duration: 4036 ms

**** Reducer ****

------------Intermediate Cache Size: 162164

------------Unique Entries Size: 11108

------------Duration: 3964 ms

Standard Deviation Sample

This sample uses MapReduce to compute the standard deviation of a sample, based on the formula in Fig-4.

Fig-4

For each value X in the sample, the Mapper emits a triple consisting of the value itself (X), its square (X²) and the number 1.

The Reducer/Combiner adds up the emitted values and produces the result entries: N, sum(X²) and (sum(X))².
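The arithmetic can be sketched in plain Java as shown below. Since Fig-4 is not reproduced here, the sketch assumes the population form of the formula, sqrt(sum(X²)/N - (sum(X)/N)²); the demo may divide by N-1 instead. The class StdDevSketch and its Totals helper are hypothetical, and the sketch keeps sum(X) as a running total and squares the mean only in the final step.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: standard deviation from running totals, in plain Java. */
class StdDevSketch {

    /** Running totals: N, sum(X) and sum(X^2). */
    static final class Totals {
        final long count;
        final double sum;
        final double sumOfSquares;

        Totals(long count, double sum, double sumOfSquares) {
            this.count = count;
            this.sum = sum;
            this.sumOfSquares = sumOfSquares;
        }

        Totals plus(Totals other) {
            return new Totals(count + other.count, sum + other.sum,
                    sumOfSquares + other.sumOfSquares);
        }
    }

    /** Map step: one value X becomes the triple (1, X, X^2). */
    static Totals map(double x) {
        return new Totals(1, x, x * x);
    }

    /** Reduce/Combine step: add the triples up component by component. */
    static Totals reduce(Iterable<Totals> triples) {
        Totals total = new Totals(0, 0.0, 0.0);
        for (Totals triple : triples) {
            total = total.plus(triple);
        }
        return total;
    }

    /** Assumed population formula: sqrt(sum(X^2)/N - (sum(X)/N)^2). */
    static double standardDeviation(Totals totals) {
        double mean = totals.sum / totals.count;
        return Math.sqrt(totals.sumOfSquares / totals.count - mean * mean);
    }

    public static void main(String[] args) {
        double[] sample = {2, 4, 4, 4, 5, 5, 7, 9};
        List<Totals> triples = new ArrayList<>();
        for (double x : sample) {
            triples.add(map(x));
        }
        // prints 2.0
        System.out.println(standardDeviation(reduce(triples)));
    }
}
```

Because the three totals are simple sums, the same reduce step can run as a combiner on each member and again as the final reducer without changing the result.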

To run the sample, execute the run-stddev.cmd script. The following output is produced:


2013-11-13 23:44:55.818/6.953 Oracle Coherence GE 3.7.1.1 (thread=Invocation:InvocationService, member=6): CoherenceMapReduce:JobId=114742004 JobClient:ReducerObserver - Member 1 Completed : Machine=MBENOLIE-IL Process=8148

Final Calculation : entries in outputcache=3

Count = 20000.0

Sum = 1327351.1950964248

Sum of Sqrt = 8.83890996020529E7

Job Completed

Standard deviation= 3.8474319265580275

**** Job Summary:

**** Job Id: 114742004

**** Job Name: StandardDeviation

**** Mapper ****

------------Input Cache Size: 20000

------------Duration: 313 ms

**** Reducer ****

------------Intermediate Cache Size: 60

------------Unique Entries Size: 3

------------Duration: 51 ms

Tracking the M/R jobs 

A very primitive console is provided. Execute the console-mapreduce.cmd script to launch the console. You can use it to track job progress and display the job results (i.e. the list of words that were counted).

Summary

By leveraging existing, powerful Coherence functionality, the solution combines the real-time/fast-data nature of Coherence In-Memory Data Grids with the popular Hadoop MapReduce API, giving developers the freedom to choose the most appropriate data store while preserving their existing M/R assets.
