Coherence is best known as a data grid, providing distributed caching with the ability to move processing to the data in the grid. Less well known is that Coherence can also function as a compute grid, distributing work across multiple servers in a cluster. In this entry, co-written with my colleague Utkarsh Nadkarni, we look at using Coherence as a compute grid through the Work Manager API and compare it with manipulating data directly in the grid using entry processors.
The Coherence documentation identifies several methods for distributing work across the cluster (see Processing Data in a Cache). They boil down to three distinct execution models:
- The entry processor is good for operating on individual cache entries; it is not so good for working on groups of cache entries.
- The invocation service is good for performing checks on a node, but is limited in its parallelism.
- The work manager is good for operating on groups of related entries in the cache, or for performing non-cache-related work in parallel, and offers a high degree of parallelism.
As you can see, the primary choice for distributed computing comes down to the Work Manager and the Entry Processor. The table below compares the two approaches.
| Aspect | Entry Processors | Work Managers |
| --- | --- | --- |
| Degree of parallelization | A function of the number of Coherence nodes. Entry processors run concurrently across all nodes in a cluster, but within each node only one instance of the entry processor executes at a time. | A function of the number of Work Manager threads. Work runs concurrently across all threads in all Work Manager instances. |
| Transactionality | Transactional. If an entry processor running on one node does not complete (say, because that node crashes), the targeted entries are processed by an entry processor on another node. | Not transactional. The specification does not say what should happen if a remote server crashes during execution; the current implementation returns WORK_COMPLETED with a WorkCompletedException as the result. If a Work does not run to completion, it is the client's responsibility to resubmit it to the Work Manager. |
| How is the cache accessed or mutated? | Operations against the cache contents are executed by (and thus within the localized context of) the cache. | Accesses and changes to the cache are made directly through the cache API. |
| Where is the processing performed? | In the same JVM where the entries to be processed reside. | On the Work Manager server, which may not be the same JVM where the entries to be processed reside. |
| Network traffic | A function of the size of the entry processor. Typically an entry processor is much smaller than the data transferred across nodes in the Work Manager approach, making the entry processor approach more network-efficient and hence more scalable. One entry processor is transmitted to each cache node. | A function of the size of the data being processed, which has to move between the cache nodes and the Work Manager server. |
| Distribution of "tasks" | Tasks move to the location where the entries being processed are managed. This may result in a random distribution of tasks, although the distribution tends to become equitable as the number of entries increases. | Tasks are distributed equally across the threads in the Work Manager instances. |
| Implementation of the EntryProcessor or Work class | Create a class that extends AbstractProcessor and implement the process method (see the sketch after this table). | Create a serializable class that implements commonj.work.Work and implement the run method (see the sketch after this table). |
| Implementation of the "task" | In the process method, update the cache item based on the key passed in. | In the run method, obtain a reference to the named cache, do the work on the relevant cache items, and write any updates back through the cache API. |
| Completion notification | When the NamedCache.invoke method returns, all the entry processors have completed executing. | Work executes asynchronously on the Work Manager threads in the cluster. Status can be obtained by registering a commonj.work.WorkListener when calling WorkManager.schedule; this provides callbacks when the Work is accepted, started, completed or rejected. Alternatively, the WorkManager.waitForAll and WorkManager.waitForAny methods provide blocking waits for all or one result respectively. |
| Returned results | java.lang.Object when invoked on a single cache item, holding the result returned by the entry processor; java.util.Map when invoked on a collection of keys, mapping each key to the result of invoking the entry processor against it. | commonj.work.WorkItem. There are three possible outcomes: the Work completes successfully, the Work completes with a WorkCompletedException, or the Work is rejected. |
| Error handling | Failure of a node results in all the work assigned to that node being executed on the new primary. This may cause some work to execute twice, but Coherence ensures the cache is updated only once per item. | Failure of a node results in the loss of the scheduled tasks assigned to that node. Completed tasks are returned to the client as they complete. |
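To make the comparison concrete, here is a minimal sketch of the two programming models. The cache name "counters" and the class names are hypothetical, and the task simply increments a per-key counter; the classes in the downloadable project differ in their detail:

```java
import java.io.Serializable;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

import commonj.work.Work;

// Entry processor: the process method runs in the JVM that owns the entry,
// and Coherence guarantees the update is applied only once per item.
public class CounterProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        Integer count = (Integer) entry.getValue();
        int next = (count == null) ? 1 : count + 1;
        entry.setValue(next);
        return next;
    }
}

// Work: the run method executes on a Work Manager thread, which may be in a
// different JVM from the one that owns the entry, so the cache is reached
// through the ordinary cache API.
public class CounterWork implements Work, Serializable {
    private final Object key;

    public CounterWork(Object key) { this.key = key; }

    public void run() {
        NamedCache cache = CacheFactory.getCache("counters"); // hypothetical cache name
        Integer count = (Integer) cache.get(key);
        cache.put(key, (count == null) ? 1 : count + 1);
    }

    public boolean isDaemon() { return false; } // not a long-running daemon work
    public void release()     { }               // nothing to clean up
}
```

The entry processor is dispatched with NamedCache.invoke(key, new CounterProcessor()), or invokeAll for a set of keys; the Work is submitted with WorkManager.schedule, which returns a commonj.work.WorkItem for tracking its progress.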
Entry processors have excellent error handling within Coherence; Work Managers, less so. To provide resiliency on node failure, I implemented a "RetryWorkManager" class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.
A JDeveloper project with the RetryWorkManager is available for download here. It includes sample code to run a simple task across multiple work manager threads.
To create a new RetryWorkManager that will retry failed work twice, you would use this:

```java
WorkManager manager = new RetryWorkManager("WorkManagerName", 2); // change for number of retries; if no retry count is provided the default is 0
```

You can control the number of retries at the individual work level, as shown below. Currently the RetryWorkManager defaults to having 0 threads; this can be changed via an alternative constructor (see the downloadable project) or when scheduling the work, as the final overload shows:

```java
WorkItem workItem = schedule(work);                     // use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, workListener);       // use number of retries set at WorkManager creation
WorkItem workItem = schedule(work, 4);                  // change number of retries
WorkItem workItem = schedule(work, workListener, 4);    // change number of retries
WorkItem workItem = schedule(work, workListener, 3, 4); // change number of threads (3) and retries (4)
```

Note that none of this sample code is supported by Oracle in any way; it is provided purely as a sample of what can be done with Coherence.
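The workListener parameter in the schedule calls above is a commonj.work.WorkListener. For completeness, here is a minimal sketch of one, assuming you just want to log status transitions; the class name and log output are illustrative only:

```java
import commonj.work.WorkEvent;
import commonj.work.WorkListener;

// Minimal WorkListener sketch: the four callbacks are defined by the
// CommonJ specification and fire as the Work moves through its lifecycle.
public class LoggingWorkListener implements WorkListener {
    public void workAccepted(WorkEvent we)  { System.out.println("accepted:  " + we.getWorkItem()); }
    public void workRejected(WorkEvent we)  { System.out.println("rejected:  " + we.getWorkItem()); }
    public void workStarted(WorkEvent we)   { System.out.println("started:   " + we.getWorkItem()); }
    public void workCompleted(WorkEvent we) { System.out.println("completed: " + we.getWorkItem()); }
}
```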
The RetryWorkManager delegates most operations to a Coherence WorkManager instance. It registers its own listener, the RetryWorkManagerListener, to intercept status updates. On receiving a WORK_COMPLETED callback, the listener checks the result to see whether the completion is due to an error. If an error occurred and there are retries left, the work is resubmitted. The WorkItem returned by scheduling an event is wrapped in a RetryWorkItem; this RetryWorkItem is updated with a new Coherence WorkItem each time the task is retried. If the client registers its own WorkListener, the RetryWorkManagerListener delegates non-retriable events to it. Finally, the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.
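The retry logic itself is compact. The sketch below shows the shape of the intercepting listener just described; it is a simplification (the real code also rewraps the returned WorkItem in the RetryWorkItem), and the class and field names are illustrative rather than taken from the downloadable project:

```java
import commonj.work.Work;
import commonj.work.WorkCompletedException;
import commonj.work.WorkEvent;
import commonj.work.WorkException;
import commonj.work.WorkListener;
import commonj.work.WorkManager;

// Sketch of an intercepting listener: a failed completion is retried while
// retries remain; all other events are passed through to the client listener.
public class RetryListener implements WorkListener {
    private final WorkManager delegate;         // the underlying Coherence WorkManager
    private final Work work;                    // the work to resubmit on failure
    private final WorkListener clientListener;  // the client's listener, may be null
    private int retriesLeft;

    public RetryListener(WorkManager delegate, Work work,
                         WorkListener clientListener, int retries) {
        this.delegate = delegate;
        this.work = work;
        this.clientListener = clientListener;
        this.retriesLeft = retries;
    }

    public void workCompleted(WorkEvent we) {
        if (we.getException() instanceof WorkCompletedException && retriesLeft > 0) {
            retriesLeft--;
            try {
                delegate.schedule(work, this); // resubmit; the client never sees the failure
                return;
            } catch (WorkException e) {
                // fall through and surface the original failure to the client
            }
        }
        if (clientListener != null) clientListener.workCompleted(we);
    }

    public void workAccepted(WorkEvent we) { if (clientListener != null) clientListener.workAccepted(we); }
    public void workRejected(WorkEvent we) { if (clientListener != null) clientListener.workRejected(we); }
    public void workStarted(WorkEvent we)  { if (clientListener != null) clientListener.workStarted(we); }
}
```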
The downloadable project contains sample code for running the work manager and an entry processor.
The demo implements a 3-tier architecture: a client that submits the tasks, Work Manager servers that execute the work, and Coherence cache servers that hold the data.
The task stores the number of times it has been executed in the cache, so multiple runs will see the counter incrementing. The choice between EntryProcessor and WorkManager is controlled by changing the value of USE_ENTRY_PROCESSOR between false and true in the RunWorkManagerClient.cmd script.
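A hypothetical client-side equivalent of that toggle, reusing the CounterProcessor and CounterWork sketches from earlier (the demo's actual client code differs), might look like this:

```java
import java.util.Collections;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import commonj.work.WorkItem;
import commonj.work.WorkManager;

public class RunTaskSketch {
    // Runs the counter task once, either in the grid via an entry processor
    // or on a work manager thread, depending on the flag.
    public static void runOnce(WorkManager workManager, Object key,
                               boolean useEntryProcessor) throws Exception {
        NamedCache cache = CacheFactory.getCache("counters"); // hypothetical cache name
        if (useEntryProcessor) {
            // Executes where the entry lives; complete when invoke returns.
            cache.invoke(key, new CounterProcessor());
        } else {
            // Executes asynchronously; block until the work item finishes.
            WorkItem item = workManager.schedule(new CounterWork(key));
            workManager.waitForAll(Collections.singletonList(item), WorkManager.INDEFINITE);
        }
    }
}
```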
The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.
If you need to perform operations on cache entries and don’t need to have cross-checks between the entries then the best solution is to use an entry processor. The entry processor is fault tolerant and updates to the cached entity will be performed once only.
If you need to perform generic work that may need to touch multiple related cache entries then the work manager may be a better solution. The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.