By Antony Reynolds on Mar 27, 2014
Coherence as a Compute Grid
Coherence is best known as a data grid, providing distributed caching with the ability to move processing to the data in the grid. Less well known is that Coherence can also function as a compute grid, distributing work across multiple servers in a cluster. In this entry, co-written with my colleague Utkarsh Nadkarni, we look at using Coherence as a compute grid through the Work Manager API and compare it with manipulating data directly in the grid using Entry Processors.
Coherence Distributed Computing Options
The Coherence documentation identifies several methods for distributing work across the cluster (see "Processing Data in a Cache"). They can be summarized as follows:
- Entry Processors
  - An InvocableMap interface, inherited by the NamedCache interface, provides support for executing an agent (EntryProcessor or EntryAggregator) on individual entries within the cache.
  - The entries may or may not exist; either way, the agent is executed once for each key provided or, if no key is provided, once for each object in the cache.
  - In the Enterprise and Grid editions of Coherence, entry processors are executed on the primary cache nodes holding the cached entries.
  - Agents can return results.
  - One agent executes multiple times per cache node, once for each key targeted on the node.
- Invocation Service
  - An InvocationService provides support for executing an agent on one or more nodes within the grid.
  - Execution may be targeted at specific nodes or at all nodes running the Invocation Service.
  - Agents can return results.
  - One agent executes once per node.
- Work Managers
  - A WorkManager class provides a grid-aware implementation of the CommonJ WorkManager, which can be used to run tasks across multiple threads on multiple nodes within the grid.
  - WorkManagers run on multiple nodes.
  - Each WorkManager may have multiple threads.
  - Tasks implement the Work interface and are assigned to specific WorkManager threads to execute.
  - Each task is executed once.
Three Models of Distributed Computation
The previous section listing the distributed computing options in Coherence shows that there are three distinct execution models:
- Per Cache Entry Execution (Entry Processor)
  - Execute the agent on the entry corresponding to a cache key.
  - Entries are processed on a single thread per node.
  - Parallelism across nodes.
- Per Node Execution (Invocation Service)
  - Execute the same agent once per node.
  - The agent is processed on a single thread per node.
  - Parallelism across nodes.
- Per Task Execution (Work Manager)
  - Each task is executed once.
  - Parallelism across nodes and across threads within a node.
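The three models differ mainly in how many times the agent or task body runs and on how many threads. The following Java sketch makes that concrete without Coherence: it assumes each simulated node is a plain `java.util.concurrent.ExecutorService`, uses a hypothetical fixed key-to-node assignment, and counts executions under each model as described in the list above. The class and method names (`ExecutionModels`, `entryProcessorRuns`, `invocationServiceRuns`, `workManagerRuns`) are illustrative only, not Coherence API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of the three execution models; no Coherence APIs are used.
// Each simulated "node" is a plain ExecutorService.
class ExecutionModels {

    // Per cache entry: the agent runs once per key, on the single thread of the node owning that key.
    static int entryProcessorRuns(Map<String, List<String>> keysByNode) {
        AtomicInteger runs = new AtomicInteger();
        List<ExecutorService> nodes = new ArrayList<>();
        for (List<String> ownedKeys : keysByNode.values()) {
            ExecutorService node = Executors.newSingleThreadExecutor();
            nodes.add(node);
            for (String key : ownedKeys) {
                node.execute(() -> runs.incrementAndGet()); // agent applied to this key's entry
            }
        }
        shutdownAll(nodes);
        return runs.get();
    }

    // Per node: the same agent runs exactly once on each node.
    static int invocationServiceRuns(int nodeCount) {
        AtomicInteger runs = new AtomicInteger();
        List<ExecutorService> nodes = new ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            ExecutorService node = Executors.newSingleThreadExecutor();
            nodes.add(node);
            node.execute(() -> runs.incrementAndGet());
        }
        shutdownAll(nodes);
        return runs.get();
    }

    // Per task: each task runs once; tasks are spread over every thread of every node.
    static int workManagerRuns(int nodeCount, int threadsPerNode, int taskCount) {
        AtomicInteger runs = new AtomicInteger();
        List<ExecutorService> nodes = new ArrayList<>();
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(Executors.newFixedThreadPool(threadsPerNode));
        }
        for (int task = 0; task < taskCount; task++) {
            nodes.get(task % nodeCount).execute(() -> runs.incrementAndGet()); // round-robin over nodes
        }
        shutdownAll(nodes);
        return runs.get();
    }

    // Stops accepting work and waits for every simulated node to drain its queue.
    private static void shutdownAll(List<ExecutorService> nodes) {
        nodes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService node : nodes) {
                node.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> keysByNode = Map.of(
                "node-1", List.of("a", "b", "c"),
                "node-2", List.of("d", "e"));
        System.out.println("Entry processor runs:    " + entryProcessorRuns(keysByNode)); // 5: once per key
        System.out.println("Invocation service runs: " + invocationServiceRuns(2));        // 2: once per node
        System.out.println("Work manager runs:       " + workManagerRuns(2, 2, 10));       // 10: once per task
    }
}
```

The single-thread executors mirror the "single thread per node" bullets; the fixed-size pools in `workManagerRuns` are what give the Work Manager model its extra parallelism within a node.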
The entry processor is good for operating on individual cache entries. It is less suited to working on groups of cache entries.
The invocation service is good for performing checks on a node, but is limited in its parallelism.
The work manager is good for operating on groups of related entries in the cache or for performing non-cache-related work in parallel. It has a high degree of parallelism.
As you can see, the primary choice for distributed computing comes down to the Work Manager and the Entry Processor.
| Aspect | Entry Processors | Work Managers |
|---|---|---|
| Degree of parallelization | Is a function of the number of Coherence nodes. EntryProcessors run concurrently across all nodes in a cluster. However, within each node only one instance of the entry processor executes at a time. | Is a function of the number of Work Manager threads. Work is run concurrently across all threads in all Work Manager instances. |
| Transactionality | Transactional. If an EntryProcessor running on one node does not complete (say, due to that node crashing), the targeted entries will be processed by an EntryProcessor on another node. | Not transactional. The specification does not explicitly state what should happen if a remote server crashes during execution. The current implementation reports WORK_COMPLETED with a WorkCompletedException as the result. If a Work does not run to completion, it is the responsibility of the client to resubmit it to the Work Manager. |
| How is the cache accessed or mutated? | Operations against the cache contents are executed by (and thus within the localized context of) a cache. | Accesses and changes to the cache are made directly through the cache API. |
| Where is the processing performed? | In the same JVM where the entries to be processed reside. | In the Work Manager server. This may not be the same JVM where the entries to be processed reside. |
| Network traffic | Is a function of the size of the EntryProcessor. Typically, an EntryProcessor is much smaller than the data transferred across nodes in the Work Manager approach. This makes the EntryProcessor approach more network-efficient and hence more scalable. One EntryProcessor is transmitted to each cache node. | Is a function of the … |
| Distribution of "tasks" | Tasks are moved to the location at which the entries to be processed are managed. This may result in a random distribution of tasks; the distribution tends to become more even as the number of entries increases. | Tasks are distributed equally across the threads in the Work Manager instances. |
| Implementation of the EntryProcessor or Work class | Create a class that extends AbstractProcessor and implement the process method. Update the cache item based on the key passed in to the process method. | Create a class that is serializable and implements commonj.work.Work. Implement the run method. |
| Implementation of "task" | In the process method, update the cache item based on the key passed in to the process method. | In the run method, do the following: … |
| Completion notification | When the NamedCache.invoke method completes, all the entry processors have completed executing. | When a task is submitted for execution, it executes asynchronously on the work manager threads in the cluster. Status may be obtained by registering a commonj.work.WorkListener when calling the WorkManager.schedule method; this provides updates when the Work is accepted, started, completed or rejected. Alternatively, the WorkManager.waitForAll and WorkManager.waitForAny methods allow blocking waits for all results or for any one result, respectively. |
| Returned results | java.lang.Object when executed on one cache item: the result of the invocation as returned from the EntryProcessor. java.util.Map when executed on a collection of keys: a Map containing the results of invoking the EntryProcessor against each of the specified keys. | commonj.work.WorkItem. There are three possible outcomes … |
| Error handling | Failure of a node results in all the work assigned to that node being executed on the new primary. This may result in some work being executed twice, but Coherence ensures that the cache is only updated once per item. | Failure of a node results in the loss of scheduled tasks assigned to that node. Completed tasks are sent back to the client as they complete. |
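The "Distribution of tasks" row can be illustrated with a small simulation. This is a hypothetical sketch, not Coherence's partitioning scheme: it assumes a task's home node is chosen by hashing its integer key (`ownerOf` is an illustrative multiplicative hash that ignores partitions and backups entirely), while Work Manager tasks are dealt round-robin across all worker threads. Printing the counts for a small and a large number of tasks lets you compare key-driven placement, which depends on where the keys happen to hash, with round-robin placement, which never differs by more than one task per thread.

```java
import java.util.Arrays;

// Hypothetical comparison of task placement. The key-to-node rule below is a simplified
// stand-in for Coherence's partitioning (it ignores partitions and backups entirely).
class TaskDistribution {

    // Scatters consecutive integer keys with a multiplicative hash, then maps them to a node.
    static int ownerOf(int key, int nodeCount) {
        int h = key * 0x9E3779B9;
        h ^= (h >>> 16);
        return Math.floorMod(h, nodeCount);
    }

    // Entry processor style: each task runs on whichever node owns its key.
    static int[] byKeyOwnership(int taskCount, int nodeCount) {
        int[] perNode = new int[nodeCount];
        for (int key = 0; key < taskCount; key++) {
            perNode[ownerOf(key, nodeCount)]++;
        }
        return perNode;
    }

    // Work manager style: tasks are dealt round-robin across every worker thread.
    static int[] roundRobin(int taskCount, int nodeCount, int threadsPerNode) {
        int[] perThread = new int[nodeCount * threadsPerNode];
        for (int task = 0; task < taskCount; task++) {
            perThread[task % perThread.length]++;
        }
        return perThread;
    }

    public static void main(String[] args) {
        for (int tasks : new int[] {10, 10_000}) {
            System.out.println(tasks + " tasks by key ownership (per node): "
                    + Arrays.toString(byKeyOwnership(tasks, 3)));
            System.out.println(tasks + " tasks round-robin (per thread):    "
                    + Arrays.toString(roundRobin(tasks, 3, 2)));
        }
    }
}
```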
Fault Handling Extension
Entry processors have excellent error handling within Coherence; Work Managers less so. To provide resiliency on node failure, I implemented a "RetryWorkManager" class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.
A JDeveloper project with the RetryWorkManager is available for download here. It includes sample code to run a simple task across multiple work manager threads.
To create a new RetryWorkManager that will retry failed work twice, you would use this:

```java
RetryWorkManager workManager = new RetryWorkManager("WorkManagerName", 2); // Number of retries; if no retry count is provided, the default is 0
```

You can control the number of retries at the individual work level as shown below:

```java
WorkItem workItem;
workItem = workManager.schedule(work);                  // Use number of retries set at WorkManager creation
workItem = workManager.schedule(work, workListener);    // Use number of retries set at WorkManager creation
workItem = workManager.schedule(work, 4);               // Change number of retries
workItem = workManager.schedule(work, workListener, 4); // Change number of retries
```

Currently the RetryWorkManager defaults to having 0 threads. To change this, use this constructor:

```java
WorkItem workItem = workManager.schedule(work, workListener, 3, 4); // Change number of threads (3) and retries (4)
```

Note that none of this sample code is supported by Oracle in any way; it is provided purely as a sample of what can be done with Coherence.
How the RetryWorkManager Works
The RetryWorkManager delegates most operations to a Coherence WorkManager instance. It registers a WorkListener to intercept status updates. On receiving a WORK_COMPLETED callback, the listener checks the result to see whether the completion is due to an error. If an error occurred and there are retries left, the work is resubmitted. The WorkItem returned by scheduling work is wrapped in a RetryWorkItem, which is updated with a new Coherence WorkItem when the task is retried. If the client registers its own WorkListener, the RetryWorkManagerListener delegates non-retriable events to the client listener. Finally, the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.
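The retry logic described above can be sketched with the standard `java.util.concurrent` classes. This is not the downloadable RetryWorkManager and uses no Coherence or commonj types: it assumes a task "fails" when its `run` method throws (standing in for a WORK_COMPLETED event carrying a WorkCompletedException), and the names `RetryingExecutor`, `RetryItem`, `schedule` and `waitForAll` are hypothetical. As in the description, each scheduled task gets one wrapper handle that stays the same across resubmissions, and `waitForAll` only returns once every task has either succeeded or exhausted its retries.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the retry idea using java.util.concurrent.
// It is NOT the RetryWorkManager from the download and uses no Coherence or commonj classes.
class RetryingExecutor implements AutoCloseable {
    private final ExecutorService pool;
    private final int defaultRetries;

    RetryingExecutor(int threads, int defaultRetries) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.defaultRetries = defaultRetries;
    }

    // Plays the role of a RetryWorkItem: one stable handle that survives resubmissions.
    static final class RetryItem {
        final AtomicInteger attempts = new AtomicInteger();
        final CompletableFuture<Void> outcome = new CompletableFuture<>();
    }

    // Uses the retry count set when the executor was created.
    RetryItem schedule(Runnable task) {
        return schedule(task, defaultRetries);
    }

    // Overrides the retry count for this task only.
    RetryItem schedule(Runnable task, int retries) {
        RetryItem item = new RetryItem();
        submit(task, item, retries);
        return item;
    }

    private void submit(Runnable task, RetryItem item, int retriesLeft) {
        item.attempts.incrementAndGet();
        CompletableFuture.runAsync(task, pool).whenComplete((ignored, error) -> {
            if (error == null) {
                item.outcome.complete(null);               // completed successfully
            } else if (retriesLeft > 0) {
                submit(task, item, retriesLeft - 1);       // failed: resubmit under the same handle
            } else {
                item.outcome.completeExceptionally(error); // retries exhausted: surface the failure
            }
        });
    }

    // Blocks until every item has either succeeded or run out of retries.
    static void waitForAll(List<RetryItem> items) {
        CompletableFuture.allOf(items.stream()
                .map(item -> item.outcome.handle((result, error) -> null))
                .toArray(CompletableFuture[]::new)).join();
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky task: the first two calls throw, later calls succeed.
        Runnable flaky = () -> {
            if (calls.incrementAndGet() <= 2) {
                throw new IllegalStateException("simulated node failure");
            }
        };
        try (RetryingExecutor executor = new RetryingExecutor(2, 2)) {
            RetryItem item = executor.schedule(flaky);
            waitForAll(List.of(item));
            // prints "attempts=3, failed=false"
            System.out.println("attempts=" + item.attempts.get()
                    + ", failed=" + item.outcome.isCompletedExceptionally());
        }
    }
}
```

Keeping a single wrapper per task is the main design choice here: the client never needs to know that a retry happened, only whether the task eventually succeeded.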
Sample Code for EntryProcessor and RetryWorkManager
The downloadable project contains sample code for running the work manager and an entry processor.
The demo implements a three-tier architecture:
- Coherence Cache Servers
  - Can be started by running RunCacheServer.cmd
  - Run a distributed cache used by the task to be executed in the grid
- Coherence Work Manager Servers
  - Can be started by running RunWorkManagerServer.cmd
  - Take no parameters
  - Run two threads for executing tasks
- Coherence Work Manager Clients
  - Can be started by running RunWorkManagerClient.cmd
  - Currently take three parameters:
    - Work Manager name: should be "AntonyWork" (the default is "AntonyWork")
    - Number of tasks to schedule: the default is 10
    - Time to wait for tasks to complete, in seconds: the default is 60
The task stores in the cache the number of times it has been executed, so repeated runs will see the counter increment. The choice between EntryProcessor and WorkManager is controlled by setting USE_ENTRY_PROCESSOR to true or false in the RunWorkManagerClient.cmd script.
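The counter task highlights the difference in how each approach touches the cache. The sketch below is a hypothetical in-memory stand-in, not the demo project's code: a `ConcurrentHashMap` plays the cache, `processorIncrement` uses the atomic `ConcurrentHashMap.merge` to mimic an entry processor updating the entry in place, and `workIncrement` mimics a Work task that reads and writes through the cache API with no locking. The names `CounterDemo`, `processorIncrement`, `workIncrement` and `run` are illustrative only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a cache; no Coherence API is used.
class CounterDemo {
    static final String KEY = "executions";

    // Entry-processor style: the increment runs against the entry itself, atomically per key
    // (ConcurrentHashMap.merge is atomic).
    static void processorIncrement(ConcurrentHashMap<String, Integer> cache, String key) {
        cache.merge(key, 1, Integer::sum);
    }

    // Work style: read, compute and write back through the cache API in separate, unlocked steps.
    static void workIncrement(ConcurrentHashMap<String, Integer> cache, String key) {
        Integer count = cache.get(key);
        cache.put(key, count == null ? 1 : count + 1);
    }

    // Runs the increment `tasks` times on a thread pool and returns the final counter value.
    static int run(BiConsumer<ConcurrentHashMap<String, Integer>, String> increment, int threads, int tasks) {
        ConcurrentHashMap<String, Integer> cache = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> increment.accept(cache, KEY));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cache.getOrDefault(KEY, 0);
    }

    public static void main(String[] args) {
        int tasks = 100_000;
        System.out.println("processor style: " + run(CounterDemo::processorIncrement, 8, tasks)); // always 100000
        System.out.println("work style:      " + run(CounterDemo::workIncrement, 8, tasks));      // may be lower
    }
}
```

With concurrent threads the work-style counter can lose updates, because two tasks may read the same value before either writes back; the processor-style counter cannot.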
The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.
If you need to perform operations on cache entries and don't need cross-checks between the entries, then the best solution is to use an entry processor. The entry processor is fault tolerant, and updates to the cached entity will be performed only once.
If you need to perform generic work that may need to touch multiple related cache entries, then the work manager may be a better solution. The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.