So while I was trying to get familiar with Hadoop M/R, I realized it should not be too difficult to combine the two. The solution presented below uses the standard Hadoop M/R API to process data stored in a Coherence data grid as if it were stored on an HDFS cluster. With that, the Hadoop WordCount example could look something like this:
JobConf conf = new JobConf(WordCount.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(CoherenceInputFormat.class);
conf.setOutputFormat(CoherenceOutputFormat.class);
RunningJob job = JobClient.runJob(conf);
The only change we made to the original Hadoop example was to use the CoherenceInputFormat and CoherenceOutputFormat classes and point the input and output paths to Coherence caches instead of HDFS. Everything else is identical to the Hadoop M/R API, including the user-defined Map and Reduce classes. To run this M/R job there is no need to set up and run a Hadoop cluster; all you need is a Coherence cluster.
At first, Entry Processors and Aggregators looked like the natural choice for implementing the mapper and reducer. However, both require a single member/thread to process the results, limiting the applicability of the solution to those cases where the mapper/aggregator result sets can fit into a single member's heap (see Christian Felde's excellent blog on Coherence M/R). To work around this limitation, the solution uses Distributed Invocation Services with member affinity to implement the Mapper and Reducer functionality. From there, it was just a matter of implementing a couple of invocation services and a few Hadoop interfaces on top of Coherence caches.
In Hadoop, JobClient is the primary interface through which a user job interacts with the cluster. JobClient provides facilities to submit jobs, track their progress, etc. Normally the user creates the application, describes the various facets of the job via JobConf, and then uses JobClient to submit the job and monitor its progress.
The solution provides its own implementation of the Hadoop JobClient class. It expects the input data for the “Map” phase to already be stored in a Coherence InputCache. JobClient delegates the work to the RunningJob class, which orchestrates the execution of the Mapper and Reducer services. Both the Mapper and the Reducer communicate progress and status information through the JobInfo Coherence cache.
The MapperService is implemented as a Coherence asynchronous InvocationService, running on all storage-enabled members of the Coherence cluster. To process the data in situ, each member running the MapperService uses the CoherenceRecordReader to process only the InputCache entries stored on that member. A MemberPartitionsListener is used to keep track of the PartitionSet owned by each cluster member, and a PartitionFilter retrieves only the relevant local entries.
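The idea of filtering a cache down to a member's own partitions can be sketched as follows. This is a minimal, self-contained illustration; the class and method names here are hypothetical stand-ins, not the shipped CoherenceRecordReader/PartitionFilter classes:

```java
import java.util.*;
import java.util.stream.*;

// Illustrative sketch of in-situ filtering: each member owns a set of
// partitions, and the mapper running on that member only reads entries
// whose key falls into one of those owned partitions.
public class LocalPartitionFilter {
    static final int PARTITION_COUNT = 257;  // Coherence's default partition count

    // Map a cache key to a partition, as a stand-in for Coherence's partitioning.
    static int partitionOf(String key) {
        return Math.abs(key.hashCode() % PARTITION_COUNT);
    }

    // Keep only the entries stored in partitions owned by this member.
    static Map<String, String> localEntries(Map<String, String> cache,
                                            Set<Integer> ownedPartitions) {
        return cache.entrySet().stream()
                .filter(e -> ownedPartitions.contains(partitionOf(e.getKey())))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

Because every member applies this filter to its own PartitionSet, the full cache is covered exactly once with no entry ever crossing the network.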
The MapperService is multi-threaded; each thread executes a MapperTask to process a subset of the entries stored on that member. The MapperTask invokes the user-defined “Map” class for each InputCache entry, passing it a Coherence-based implementation of the Hadoop OutputCollector. The OutputCollector buffers the entries emitted by the Mapper class in memory, and flushes them to the MapperOutputCache when the buffer reaches a predefined size.
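The buffering behaviour can be sketched like this. The class name and the in-memory backing list are hypothetical; the real OutputCollector flushes to the MapperOutputCache rather than a list:

```java
import java.util.*;

// Illustrative sketch of a size-bounded output buffer: emitted pairs are
// held in memory and flushed in one batch once the buffer fills up.
public class BufferedCollector {
    private final int flushSize;
    private final List<Map.Entry<String, Integer>> buffer = new ArrayList<>();
    final List<Map.Entry<String, Integer>> flushed = new ArrayList<>();  // stands in for the cache

    BufferedCollector(int flushSize) {
        this.flushSize = flushSize;
    }

    // Buffer an emitted (key, value) pair; flush when the threshold is reached.
    void collect(String key, int value) {
        buffer.add(new AbstractMap.SimpleEntry<>(key, value));
        if (buffer.size() >= flushSize) {
            flush();
        }
    }

    // Push the buffered pairs to the backing store and clear the buffer.
    void flush() {
        flushed.addAll(buffer);
        buffer.clear();
    }
}
```

Batching the writes this way amortizes the cost of cache puts instead of writing one entry per emitted pair.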
The MapperOutputCache is configured with Partition Affinity, so that all entries with the same emitted key are stored in the same partition. With partition affinity in place, there is no need to reshuffle the data before running the Reducer.
The OutputCollector also creates an entry for each unique emitted key in the MapperUniqueKeysCache. As with the MapperOutputCache, the MapperUniqueKeysCache has Partition Affinity defined with the same association key, so that a master entry and all its associated MapperOutputCache entries are stored on the same Coherence member. If a user-defined “Combine” class is configured for the job, the OutputCollector applies the “Combine” class locally before flushing the buffer to the MapperOutputCache, thus reducing the number of entries that will be processed by the reducer.
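For WordCount, the local combine step amounts to pre-summing the counts per word before the flush. A minimal sketch (the class name is hypothetical, not one of the shipped classes):

```java
import java.util.*;

// Illustrative sketch of the local "Combine" step: before a buffer is
// flushed, values for the same key are pre-aggregated, so each flush
// writes one entry per distinct key instead of one per emitted pair.
public class LocalCombiner {
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> buffer) {
        Map<String, Integer> combined = new HashMap<>();
        for (Map.Entry<String, Integer> e : buffer) {
            // Sum the values emitted for the same key.
            combined.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        return combined;
    }
}
```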
The ReducerService is implemented as a Coherence asynchronous InvocationService, running on all storage-enabled members of the cluster. Each instance of the service only reduces the intermediate cache entries that are stored locally on the member running that service instance.
The ReducerService iterates through the list of unique keys emitted by the Mapper to the MapperUniqueKeysCache. For each unique key, it uses a KeyAssociatedFilter to find all MapperOutputCache entries having that key. The result set is then passed to the user-defined “Reduce” class, together with a Coherence-based implementation of the Hadoop OutputCollector class. The “Reduce” class then uses the OutputCollector to (transparently) emit its result to the output cache.
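The reduce loop described above can be sketched as follows. The names are hypothetical stand-ins (plain maps instead of Coherence caches), with a sum as the user-defined reduce function, as in WordCount:

```java
import java.util.*;

// Illustrative sketch of the reduce loop: for each unique key, gather all
// locally stored intermediate values for that key and apply the reduce
// function to them.
public class ReduceSketch {
    static Map<String, Integer> reduce(Set<String> uniqueKeys,
                                       Map<String, List<Integer>> intermediate) {
        Map<String, Integer> result = new HashMap<>();
        for (String key : uniqueKeys) {
            int sum = 0;
            for (int v : intermediate.getOrDefault(key, Collections.emptyList())) {
                sum += v;
            }
            result.put(key, sum);
        }
        return result;
    }
}
```

In the actual solution the per-key lookup is the KeyAssociatedFilter query, which partition affinity guarantees is a purely local operation.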
The demo solution ships with two samples: WordCount and StandardDeviation.
The WordCount sample is the most popular MapReduce example. The sample first populates the cache with about 100,000 lines of text. The mapper then tokenizes each line and emits a key/value pair with the word as the key and a value of 1. The reducer then aggregates all the 1s emitted for each word.
To run the sample, execute the run-wordcount.cmd script. The following output is produced:
2013-11-13 23:27:10.868/16.038 Oracle Coherence GE 126.96.36.199 <Info> (thread=Invocation:InvocationService, member=5): CoherenceMapReduce:JobId=1660803 JobClient:ReducerObserver - Member 1 Completed : Machine=MBENOLIE-IL Process=8148
**** Job Summary:
**** Job Id: 1660803
**** Job Name: wordcount
**** Mapper ****
------------Input Cache Size: 100000
------------Duration: 4036 ms
**** Reducer ****
------------Intermediate Cache Size: 162164
------------Unique Entries Size: 11108
------------Duration: 3964 ms
Standard Deviation Sample
This sample uses MapReduce to compute the standard deviation of a sample, based on the formula in fig-4.
For each value X in the sample, the Mapper emits a triple consisting of the value itself X, X², and the number 1.
The Reducer/Combiner adds up the emitted values and produces the result entries: N, sum(X²), and (sum(X))².
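From those three aggregates, the standard deviation falls out directly. A minimal sketch (the helper name is hypothetical; the shipped sample may organize this differently), using the population form σ = sqrt(sum(X²)/N − (sum(X))²/N²):

```java
// Illustrative helper showing how the standard deviation is computed from
// the reduced aggregates N, sum(X^2) and (sum(X))^2.
public class StdDev {
    static double fromAggregates(long n, double sumOfSquares, double squareOfSum) {
        // sigma = sqrt( E[X^2] - E[X]^2 )
        return Math.sqrt(sumOfSquares / n - squareOfSum / ((double) n * n));
    }
}
```

For example, for the values {2, 4, 4, 4, 5, 5, 7, 9}: N = 8, sum(X²) = 232, (sum(X))² = 1600, giving σ = sqrt(29 − 25) = 2.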
To run the sample, run the run-stddev.cmd script. The following output is produced:
2013-11-13 23:44:55.818/6.953 Oracle Coherence GE 188.8.131.52 <Info> (thread=Invocation:InvocationService, member=6): CoherenceMapReduce:JobId=114742004 JobClient:ReducerObserver - Member 1 Completed : Machine=MBENOLIE-IL Process=8148
A very primitive console is also provided. Execute the console-mapreduce.cmd script to launch the console. You can use it to track job progress and display job results (i.e., the list of words that were counted).
Leveraging existing powerful Coherence functionality, the solution combines the Real-Time/Fast-Data nature of Coherence In-Memory Data Grids with the popular Hadoop MapReduce API, giving developers the freedom to choose the most appropriate data store while preserving their existing M/R assets.