Hadoop MapReduce and Coherence - A Perfect Match

Hadoop MapReduce (M/R) is the most popular programming model for processing large data sets with a parallel, distributed algorithm on an HDFS cluster. Coherence is the market-leading In-Memory Data Grid. While Hadoop works well for large, batch-style processing operations over many terabytes of data, there are use cases where the processing requirements are more real-time and the data volumes are smaller; for those, Coherence is a better choice than HDFS for storing the data.

So while I was getting familiar with Hadoop M/R, I realized it should not be too difficult to combine the two. The solution presented below uses the standard Hadoop M/R API to process data stored in a Coherence data grid as if it were stored on an HDFS cluster. With it, the Hadoop WordCount example could look something like this:

import com.oracle.coherence.mapreduce.JobClient;

...

JobConf conf = new JobConf(WordCount.class);
conf.setJobName("coherence-wordcount");

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);

conf.setInputFormat(CoherenceInputFormat.class);
conf.setOutputFormat(CoherenceOutputFormat.class);

CoherenceInputFormat.setInputPaths(conf, new Path("coherence://wordcount-Input"));
CoherenceOutputFormat.setOutputPath(conf, new Path("coherence://wordcount-Result"));

RunningJob job = JobClient.runJob(conf);

The only change we made to the original Hadoop example was to use the CoherenceInputFormat and CoherenceOutputFormat classes and point the input and output paths to Coherence caches instead of HDFS. Everything else is identical to the Hadoop M/R API, including the user-defined Map and Reduce classes. To run this M/R job there is no need to set up and run a Hadoop cluster; all you need is a Coherence cluster.

Under the Hood

At first, Entry Processors and Aggregators looked like the natural choice for implementing the mapper and reducer. However, both require a single member/thread to process the results, limiting the applicability of the solution to cases where the mapper/aggregator result sets can fit into a single member's heap (see Christian Felde's excellent blog on Coherence M/R). To work around this limitation, the solution uses distributed invocation services with member affinity to implement the Mapper and Reducer functionality. From there, it was just a matter of implementing a couple of invocation services and a few Hadoop interfaces on top of Coherence caches.

Fig-1

JobClient

In Hadoop, JobClient is the primary interface for the user-job to interact with the cluster. JobClient provides facilities to submit jobs, track their progress, etc. Normally the user creates the application, describes various facets of the job via JobConf and then uses the JobClient to submit the job and monitor its progress.

The solution provides its own implementation of the Hadoop JobClient class. It expects the input data for the "Map" phase to already be stored in a Coherence InputCache. JobClient delegates the work to the RunningJob class, which orchestrates the execution of the Mapper and Reducer services. Both the Mapper and the Reducer communicate progress and status information through the JobInfo Coherence cache.
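Although the orchestration code itself isn't shown in this post, a minimal sketch of what RunningJob might do with a Coherence InvocationService could look like the following. MapperInvocable is a hypothetical Invocable wrapping the map phase; the real solution also records progress in the JobInfo cache.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationObserver;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;

InvocationService svc =
        (InvocationService) CacheFactory.getService("MapperService");

// Passing null as the member set executes the task on all members
// running the service. MapperInvocable is a hypothetical class.
svc.execute(new MapperInvocable(conf), null, new InvocationObserver() {
    public void memberCompleted(Member member, Object result) { /* update JobInfo cache */ }
    public void memberFailed(Member member, Throwable t)      { /* mark the job failed */ }
    public void memberLeft(Member member)                     { /* handle failover */ }
    public void invocationCompleted()                         { /* kick off the ReducerService */ }
});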

MapperService

The MapperService is implemented as a Coherence asynchronous InvocationService, running on all storage-enabled members of the Coherence cluster. To process data in-situ, each member running the MapperService uses the CoherenceRecordReader to process only the InputCache entries stored on that member. A MemberPartitionsListener keeps track of the PartitionSet per cluster member, and a partition-aware filter retrieves only the relevant local entries.

<distributed-scheme>
  <scheme-name>dist-default</scheme-name>
  <partition-listener>
    <class-name>com.oracle.coherence.mapreduce.listeners.MemberPartitionsListener</class-name>
  </partition-listener>
  ...
</distributed-scheme>

Fig-2
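For illustration, restricting a query to locally owned partitions can be done with Coherence's PartitionedFilter; here is a minimal sketch (the cache name is taken from the WordCount example, the rest is illustrative):

import java.util.Set;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.PartitionSet;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.PartitionedFilter;

// Fetch only the InputCache entries whose partitions are owned by this
// member, so each MapperService instance processes local data only.
NamedCache input = CacheFactory.getCache("wordcount-Input");
PartitionedService service = (PartitionedService) input.getCacheService();

// Partitions currently owned by the local member
PartitionSet localParts =
        service.getOwnedPartitions(CacheFactory.getCluster().getLocalMember());

// Restrict the query to those partitions
Set localEntries =
        input.entrySet(new PartitionedFilter(AlwaysFilter.INSTANCE, localParts));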

The MapperService is multi-threaded, and each thread executes a MapperTask to process a subset of the entries stored on that member. The MapperTask invokes the user-defined "Map" class for each InputCache entry, passing it a Coherence-based implementation of the Hadoop OutputCollector. The OutputCollector buffers the entries emitted by the Mapper class in memory and flushes them to the MapperOutputCache when the buffer reaches a predefined size.
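An illustrative sketch of such a buffering collector follows. This is not the actual implementation: key construction is simplified to a plain String here, whereas the real keys carry partition affinity as described next.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapred.OutputCollector;
import com.tangosol.net.NamedCache;

// Buffers emitted pairs in memory and bulk-flushes them to the
// MapperOutputCache with putAll() once the buffer fills up.
public class CoherenceOutputCollector<K, V> implements OutputCollector<K, V> {
    private final NamedCache outputCache;
    private final int flushSize;
    private final Map<String, V> buffer = new HashMap<String, V>();
    private long sequence;  // distinguishes repeated emits of the same key

    public CoherenceOutputCollector(NamedCache outputCache, int flushSize) {
        this.outputCache = outputCache;
        this.flushSize = flushSize;
    }

    public void collect(K key, V value) throws IOException {
        buffer.put(key + "#" + sequence++, value);
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    public void flush() {
        outputCache.putAll(buffer);  // one bulk network call instead of many puts
        buffer.clear();
    }
}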

The MapperOutputCache is configured with partition affinity so that all entries with the same emitted key are stored in the same partition. With partition affinity in place, there is no need to reshuffle the data before running the Reducer.

The OutputCollector also creates an entry for each unique emitted key in the MapperUniqueKeysCache. As with the MapperOutputCache, the MapperUniqueKeysCache has partition affinity defined with the same association key, so that a master entry and all its associated MapperOutputCache entries are stored in the same Coherence partition.
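A sketch of what such an association-aware composite key could look like, using Coherence's KeyAssociation interface (the class name and fields are hypothetical):

import java.io.Serializable;
import com.tangosol.net.cache.KeyAssociation;

// The emitted word plus a unique sequence number, associated to the word
// itself so that all values for one word (and the matching
// MapperUniqueKeysCache entry) land in the same partition.
public class MapperEntryKey implements KeyAssociation, Serializable {
    private final Object emittedKey;  // e.g. the word
    private final long sequence;      // unique per emitted pair

    public MapperEntryKey(Object emittedKey, long sequence) {
        this.emittedKey = emittedKey;
        this.sequence = sequence;
    }

    public Object getAssociatedKey() {
        return emittedKey;  // drives partition placement
    }

    public boolean equals(Object o) {
        if (!(o instanceof MapperEntryKey)) return false;
        MapperEntryKey that = (MapperEntryKey) o;
        return sequence == that.sequence && emittedKey.equals(that.emittedKey);
    }

    public int hashCode() {
        return emittedKey.hashCode() * 31 + (int) sequence;
    }
}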

When a user-defined "Combine" class is configured for the job, the OutputCollector applies it locally before flushing the buffer to the MapperOutputCache, reducing the number of entries the reducer has to process.

ReducerService

The ReducerService is implemented as a Coherence asynchronous InvocationService, running on all storage-enabled members of the cluster. Each instance of the service only reduces intermediate cache entries that are stored locally on the member running that instance.

The ReducerService iterates through the list of unique keys emitted by the Mapper to the MapperUniqueKeysCache. For each unique key, it uses a KeyAssociatedFilter to find all MapperOutputCache entries having that key. The result set is then passed to the user-defined "Reduce" class, together with a Coherence-based implementation of the Hadoop OutputCollector class. The "Reduce" class then uses the OutputCollector to (transparently) emit its result to the Coherence ResultCache.
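A sketch of that lookup, assuming the intermediate keys expose getAssociatedKey() as in the composite-key sketch above (cache and key names are illustrative):

import java.util.Set;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.extractor.KeyExtractor;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;

// Select the intermediate entries whose key is associated with one unique
// word. Wrapping the filter in a KeyAssociatedFilter tells Coherence to
// evaluate it only on the single partition the key maps to, so the lookup
// stays local to the member that owns that partition.
NamedCache mapperOutput = CacheFactory.getCache("MapperOutputCache");
Object uniqueKey = "someWord";  // taken from the MapperUniqueKeysCache in the real flow

Set entries = mapperOutput.entrySet(
        new KeyAssociatedFilter(
                new EqualsFilter(new KeyExtractor("getAssociatedKey"), uniqueKey),
                uniqueKey));

// The matching values are then handed to the user-defined Reduce class
// together with a Coherence-based OutputCollector.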


Fig-3

Installing the demo

The demo solution ships with two samples, WordCount and StandardDeviation. To run the samples:

  1. Download the solution and unzip it.
  2. Note that the solution includes all the dependency JARs (Hadoop, Coherence, etc.) in the lib directory.
  3. Edit the setEnv.cmd script and modify the relevant environment variables.
  4. Execute the cache-server.cmd script to start one or more Coherence cache servers.

WordCount Sample 

The WordCount sample is the canonical MapReduce example. It first populates the cache with about 100,000 lines of text. The mapper tokenizes each line and emits a key-value pair with the word as the key and a value of 1. The reducer then adds up the 1s emitted for each word.
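For reference, these are the stock Hadoop (old API) WordCount Map and Reduce inner classes that the job shown earlier plugs in unchanged. The LongWritable input key type follows the stock HDFS example; the key type produced by the CoherenceRecordReader may differ.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class WordCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);   // emit (word, 1)
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();  // add up the 1s for this word
            }
            output.collect(key, new IntWritable(sum));
        }
    }
}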

To run the sample, execute the run-wordcount.cmd script:

2013-11-13 23:27:10.868/16.038 Oracle Coherence GE 3.7.1.1 <Info> (thread=Invocation:InvocationService, member=5): CoherenceMapReduce:JobId=1660803 JobClient:ReducerObserver - Member 1 Completed :  Machine=MBENOLIE-IL Process=8148

Job Completed

**** Job Summary:

**** Job Id: 1660803

**** Job Name: wordcount

**** Mapper ****

------------Input Cache Size: 100000

------------Duration: 4036 ms

**** Reducer ****

------------Intermediate Cache Size: 162164

------------Unique Entries Size: 11108

------------Duration: 3964 ms

Standard Deviation Sample

This sample uses MapReduce to compute the standard deviation of a sample, based on the formula in Fig-4.

Fig-4: stddev = sqrt( sum(X²)/N − (sum(X)/N)² )

For each value X in the sample, the Mapper emits a triple consisting of the value itself (X), its square (X²), and the number 1.

The Reducer/Combiner adds up the emitted values and produces the result entries N, sum(X²) and (sum(X))².
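Plugging the numbers from the sample run below into the formula gives the reported result; a quick worked version in Java:

// Final step using the aggregates from the sample run below
double n     = 20000.0;             // Count
double sum   = 1327351.1950964248;  // Sum of X
double sumSq = 8.83890996020529E7;  // Sum of X²

double mean     = sum / n;
double variance = sumSq / n - mean * mean;  // E[X²] - (E[X])²
double stdDev   = Math.sqrt(variance);      // ≈ 3.8474319265580275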

To run the sample, execute the run-stddev.cmd script. The following output is produced:

2013-11-13 23:44:55.818/6.953 Oracle Coherence GE 3.7.1.1 (thread=Invocation:InvocationService, member=6): CoherenceMapReduce:JobId=114742004 JobClient:ReducerObserver - Member 1 Completed :  Machine=MBENOLIE-IL Process=8148

Final Calculation : entries in outputcache=3

Count = 20000.0

Sum = 1327351.1950964248

Sum of Sqrt = 8.83890996020529E7

Job Completed

Standard deviation= 3.8474319265580275

**** Job Summary:

**** Job Id: 114742004

**** Job Name: StandardDeviation

**** Mapper ****

------------Input Cache Size: 20000

------------Duration: 313 ms

**** Reducer ****

------------Intermediate Cache Size: 60

------------Unique Entries Size: 3

------------Duration: 51 ms

Tracking the M/R jobs 

A very primitive console is provided. Execute the console-mapreduce.cmd script to launch it. You can use the console to track job progress and display job results (i.e., the list of words that were counted).

Summary

Leveraging existing, powerful Coherence functionality, the solution combines the real-time/fast-data nature of Coherence In-Memory Data Grids with the popular Hadoop MapReduce API, giving developers the freedom to choose the most appropriate data store while preserving their existing M/R assets.
