Thursday Mar 27, 2014

Not Just a Cache

Coherence as a Compute Grid

Coherence is best known as a data grid, providing distributed caching with an ability to move processing to the data in the grid.  Less well known is the fact that Coherence also has the ability to function as a compute grid, distributing work across multiple servers in a cluster.  In this entry, which was co-written with my colleague Utkarsh Nadkarni, we will look at using Coherence as a compute grid through the use of the Work Manager API and compare it to manipulating data directly in the grid using Entry Processors.

Coherence Distributed Computing Options

The Coherence documentation identifies several methods for distributing work across the cluster; see Processing Data in a Cache.  They can be summarized as:

  • Entry Processors
    • An InvocableMap interface, inherited by the NamedCache interface, provides support for executing an agent (EntryProcessor or EntryAggregator) on individual entries within the cache.
    • The entries may or may not exist.  Either way, the agent is executed once for each key provided or, if no key is provided, once for each object in the cache.
    • In Enterprise and Grid editions of Coherence the entry processors are executed on the primary cache nodes holding the cached entries.
    • Agents can return results.
    • One agent executes multiple times per cache node, once for each key targeted on the node.
  • Invocation Service
    • An InvocationService provides support for executing an agent on one or more nodes within the grid.
    • Execution may be targeted at specific nodes or at all nodes running the Invocation Service.
    • Agents can return results.
    • One agent executes once per node.
  • Work Managers
    • A WorkManager class provides a grid aware implementation of the commonJ WorkManager which can be used to run tasks across multiple threads on multiple nodes within the grid.
    • WorkManagers run on multiple nodes.
    • Each WorkManager may have multiple threads.
    • Tasks implement the Work interface and are assigned to specific WorkManager threads to execute.
    • Each task is executed once.

Three Models of Distributed Computation

The list of distributed computing options in the previous section shows that there are three distinct execution models:

  • Per Cache Entry Execution (Entry Processor)
    • Execute the agent on the entry corresponding to a cache key.
    • Entries processed on a single thread per node.
    • Parallelism across nodes.
  • Per Node Execution (Invocation Service)
    • Execute the same agent once per node.
    • Agent processed on a single thread per node.
    • Parallelism across nodes.
  • Per Task Execution (Work Manager)
    • Each task executed once.
    • Parallelism across nodes and across threads within a node.

The entry processor is good for operating on individual cache entries.  It is not so good for working on groups of cache entries.

The invocation service is good for performing checks on a node, but is limited in its parallelism.

The work manager is good for operating on groups of related entries in the cache or performing non-cache related work in parallel.  It has a high degree of parallelism.

As you can see, the primary choice for distributed computing comes down to the Work Manager and the Entry Processor.

Differences between using Entry Processors and Work Managers in Coherence

The comparison below covers each aspect in turn, giving the Entry Processor behavior first and the Work Manager behavior second.

Degree of parallelization
  • Entry Processors: Is a function of the number of Coherence nodes. EntryProcessors are run concurrently across all nodes in a cluster. However, within each node only one instance of the entry processor executes at a time.
  • Work Managers: Is a function of the number of Work Manager threads. The Work is run concurrently across all threads in all Work Manager instances.

Transactionality
  • Entry Processors: Transactional. If an EntryProcessor running on one node does not complete (say, due to that node crashing), the entries targeted will be executed by an EntryProcessor on another node.
  • Work Managers: Not transactional. The specification does not explicitly specify what the response should be if a remote server crashes during an execution. Current implementation uses WORK_COMPLETED with WorkCompletedException as a result. In case a Work does not run to completion, it is the responsibility of the client to resubmit the Work to the Work Manager.

How is the Cache accessed or mutated?
  • Entry Processors: Operations against the cache contents are executed by (and thus within the localized context of) a cache.
  • Work Managers: Accesses and changes to the cache are done directly through the cache API.

Where is the processing performed?
  • Entry Processors: In the same JVM where the entries-to-be-processed reside.
  • Work Managers: In the Work Manager server. This may not be the same JVM where the entries-to-be-processed reside.

Network Traffic
  • Entry Processors: Is a function of the size of the EntryProcessor. Typically, the size of an EntryProcessor is much smaller than the size of the data transferred across nodes in the case of a Work Manager approach. This makes the EntryProcessor approach more network-efficient and hence more scalable. One EntryProcessor is transmitted to each cache node.
  • Work Managers: Is a function of the
    • Number of Work Objects, of which multiple may be sent to each server.
    • Size of the data set transferred from the Backing Map to the Work Manager Server.

Distribution of “Tasks”
  • Entry Processors: Tasks are moved to the location at which the entries-to-be-processed are being managed. This may result in a random distribution of tasks. The distribution tends to get equitable as the number of entries increases.
  • Work Managers: Tasks are distributed equally across the threads in the Work Manager Instances.

Implementation of the EntryProcessor or Work class
  • Entry Processors: Create a class that extends AbstractProcessor. Implement the process method. Update the cache item based on the key passed in to the process method.
  • Work Managers: Create a class that is serializable and implements commonj.work.Work. Implement the run method.

Implementation of “Task”
  • Entry Processors: In the process method, update the cache item based on the key passed into the process method.
  • Work Managers: In the run method, do the following:
    • Get a reference to the named cache
    • Do the Work – Get a reference to the Cache Item; change the cache item; put the cache item back into the named cache.

Completion Notification
  • Entry Processors: When the NamedCache.invoke method completes then all the entry processors have completed executing.
  • Work Managers: When a task is submitted for execution it executes asynchronously on the work manager threads in the cluster.  Status may be obtained by registering a commonj.work.WorkListener class when calling the WorkManager.schedule method.  This will provide updates when the Work is accepted, started and completed or rejected.  Alternatively the WorkManager.waitForAll and WorkManager.waitForAny methods allow blocking waits for either all or one result respectively.

Returned Results
  • Entry Processors:
    • java.lang.Object – when executed on one cache item. This returns result of the invocation as returned from the EntryProcessor.
    • java.util.Map – when executed on a collection of keys. This returns a Map containing the results of invoking the EntryProcessor against each of the specified keys.
  • Work Managers: commonj.work.WorkItem - There are three possible outcomes
    • The Work is not yet complete. In this case, a null is returned by WorkItem.getResult.
    • The Work started but completed with an exception. This may have happened due to a Work Manager Instance terminating abruptly. This is indicated by an exception thrown by WorkItem.getResult.
    • The Work Manager instance indicated that the Work is complete and the Work ran to completion. In this case, WorkItem.getResult returns a non-null and no exception is thrown by WorkItem.getResult.

Error Handling
  • Entry Processors: Failure of a node results in all the work assigned to that node being executed on the new primary. This may result in some work being executed twice, but Coherence ensures that the cache is only updated once per item.
  • Work Managers: Failure of a node results in the loss of scheduled tasks assigned to that node. Completed tasks are sent back to the client as they complete.
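The difference in how the cache is mutated can be illustrated with a small model.  The sketch below deliberately uses only JDK classes, not the Coherence or commonJ APIs: a ConcurrentHashMap stands in for the named cache, merge stands in for an entry processor applying its logic in place, and a thread pool stands in for the Work Manager threads.  The class and method names (UpdateStyles, processInPlace, getModifyPut, run) are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK model of the two update styles; no Coherence classes are used. */
class UpdateStyles {

    /** Entry-processor style: the update logic is applied to the entry atomically, in place. */
    static int processInPlace(ConcurrentMap<String, Integer> cache, String key) {
        return cache.merge(key, 1, Integer::sum);
    }

    /** Work-manager style: read the item, change it, and put it back through the map API. */
    static void getModifyPut(ConcurrentMap<String, Integer> cache, String key) {
        Integer current = cache.get(key);                   // get a reference to the cache item
        int updated = (current == null) ? 1 : current + 1;  // change the cache item
        cache.put(key, updated);                            // put it back; another task may have written in between
    }

    /** Runs the chosen update `tasks` times on `threads` threads and returns the final counter value. */
    static int run(int tasks, int threads, boolean inPlace) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (inPlace) {
                    processInPlace(cache, "counter");
                } else {
                    getModifyPut(cache, "counter");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return cache.getOrDefault("counter", 0);
    }
}
```

In this model the in-place variant always ends with a counter equal to the number of tasks, while the get/modify/put variant can end lower when two tasks read the same value before either writes.  Whether a real Work task needs extra coordination depends on how it accesses the cache, which this sketch does not attempt to reproduce.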

Fault Handling Extension

Entry processors have excellent error handling within Coherence.  Work Managers less so.  In order to provide resiliency on node failure I implemented a “RetryWorkManager” class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.

A JDeveloper project with the RetryWorkManager is available for download here.  It includes sample code to run a simple task across multiple work manager threads.

To create a new RetryWorkManager that retries failed work twice, use this:

WorkManager workManager = new RetryWorkManager("WorkManagerName", 2);  // Change for number of retries; if no retry count is provided then the default is 0.

You can control the number of retries at the individual work level as shown below:

WorkItem workItem = schedule(work); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, workListener); // Use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, 4); // Change number of retries
WorkItem workItem = schedule(work, workListener, 4); // Change number of retries

Currently the RetryWorkManager defaults to having 0 threads.  To change this, use this constructor:

WorkItem workItem = schedule(work, workListener, 3, 4); // Change number of threads (3) and retries (4)

Note that none of this sample code is supported by Oracle in any way, and is provided purely as a sample of what can be done with Coherence.

How the RetryWorkManager Works

The RetryWorkManager delegates most operations to a Coherence WorkManager instance.  It creates a WorkManagerListener to intercept status updates.  On receiving a WORK_COMPLETED callback the listener checks the result to see if the completion is due to an error.  If an error occurred and there are retries left then the work is resubmitted.  The WorkItem returned by scheduling an event is wrapped in a RetryWorkItem.  This RetryWorkItem is updated with a new Coherence WorkItem when the task is retried.  If the client registers a WorkManagerListener then the RetryWorkManagerListener delegates non-retriable events to the client listener.  Finally the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.
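The resubmit-on-failure idea can be sketched without the downloadable project.  The code below is not the RetryWorkManager and does not use the commonJ or Coherence APIs; it is a simplified, blocking model built on a JDK ExecutorService, where a Future plays the role of the WorkItem and an ExecutionException plays the role of a completion that carries an error.  All names (RetrySketch, Outcome, scheduleWithRetry, flaky, runDemo) are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-JDK model of "resubmit failed work while retries remain". */
class RetrySketch {

    /** Final result of a piece of work, plus the number of attempts that were made. */
    static final class Outcome<T> {
        final T value;
        final int attempts;
        final Exception failure;

        Outcome(T value, int attempts, Exception failure) {
            this.value = value;
            this.attempts = attempts;
            this.failure = failure;
        }

        boolean succeeded() {
            return failure == null;
        }
    }

    /** Submits the work, and resubmits it after a failed completion until the retries are used up. */
    static <T> Outcome<T> scheduleWithRetry(ExecutorService pool, Callable<T> work, int retries) {
        Exception last = null;
        for (int attempt = 1; attempt <= retries + 1; attempt++) {
            Future<T> item = pool.submit(work);   // a fresh handle for every attempt
            try {
                return new Outcome<>(item.get(), attempt, null);
            } catch (ExecutionException e) {
                last = e;                         // completed with an error: loop to resubmit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return new Outcome<>(null, attempt, e);
            }
        }
        return new Outcome<>(null, retries + 1, last);
    }

    /** Hypothetical task that fails on its first `failures` runs and then returns "done". */
    static Callable<String> flaky(int failures) {
        AtomicInteger runs = new AtomicInteger();
        return () -> {
            if (runs.incrementAndGet() <= failures) {
                throw new IllegalStateException("simulated node failure");
            }
            return "done";
        };
    }

    /** Runs a flaky task on a small pool with the given retry budget. */
    static Outcome<String> runDemo(int failures, int retries) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return scheduleWithRetry(pool, flaky(failures), retries);
        } finally {
            pool.shutdown();
        }
    }
}
```

This model waits for each attempt before deciding whether to resubmit, whereas the RetryWorkManager described above reacts to listener callbacks and swaps the underlying work item inside a wrapper.  The retry budget is handled the same way in both: a retry count of 2 allows up to three attempts in total.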

Sample Code for EntryProcessor and RetryWorkManager

The downloadable project contains sample code for running the work manager and an entry processor.

The demo implements a 3-tier architecture

  1. Coherence Cache Servers
    • Can be started by running RunCacheServer.cmd
    • Runs a distributed cache used by the Task to be executed in the grid
  2. Coherence Work Manager Servers
    • Can be started by running RunWorkManagerServer.cmd
    • Takes no parameters
    • Runs two threads for executing tasks
  3. Coherence Work Manager Clients
    • Can be started by running RunWorkManagerClient.cmd
    • Takes three parameters currently
      • Work Manager name - should be "AntonyWork" - default is "AntonyWork"
      • Number of tasks to schedule - default is 10
      • Time to wait for tasks to complete in seconds - default is 60

The task stores the number of times it has been executed in the cache, so multiple runs will see the counter incrementing.  The choice between EntryProcessor and WorkManager is controlled by changing the value of USE_ENTRY_PROCESSOR between false and true in the RunWorkManagerClient.cmd script.

The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.

Summary

If you need to perform operations on cache entries and don’t need to have cross-checks between the entries then the best solution is to use an entry processor.  The entry processor is fault tolerant and updates to the cached entity will be performed once only.

If you need to perform generic work that may need to touch multiple related cache entries then the work manager may be a better solution.  The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.

The RetryWorkManager can be downloaded here.

Friday Mar 02, 2012

Using Coherence with JDeveloper

Configuring JDeveloper for use with Coherence

I have been doing some work with Coherence again, so I needed to write some Java code that calls the Coherence API and to edit some Coherence configuration files in JDeveloper.  The easiest way to do this is to register the Coherence jar file and the Coherence schemas with JDeveloper.  Once that is done, you can use JDeveloper's XML insight features to help you create the XML documents.

Register the Coherence Library

To register the Coherence jar file in JDeveloper go to “Tools->Manage Libraries…”, select “New…” and then use “Add Entry…” to add the following entries:

  • Class Path
    • <COHERENCE_HOME>\lib\coherence.jar
  • Doc Path
    • <COHERENCE_HOME>\doc\api

COHERENCE_HOME is the location where you unzipped the Coherence product.

This lets us use the Coherence API in our Java code by adding the library to our project.

Register the Schemas

To register the Coherence XML Schemas with JDeveloper go to “Tools->Preferences…”, select “XML Schemas” and choose “Add…”.

Browse to the <COHERENCE_HOME>\lib\coherence.jar file and add the following schemas:

  • coherence-cache-config.xsd
  • coherence-operational-config.xsd
  • coherence-pof-config.xsd
  • coherence-report-config.xsd
  • coherence-report-group-config.xsd
  • coherence-rest-config.xsd

Now when you create an XML file for use with Coherence you can choose “XML Document from XML Schema” and choose “Use Registered Schemas” to show you suitable schemas to use for your Coherence config.

About

Musings on Fusion Middleware and SOA.  Antony works with customers across the US and Canada in implementing SOA and other Fusion Middleware solutions. Antony is the co-author of the SOA Suite 11g Developers Cookbook, the SOA Suite 11g Developers Guide and the SOA Suite Developers Guide.
