Wednesday Jul 02, 2014

Coherence Adapter Configuration

SOA Suite 12c Coherence Adapter

The release of SOA Suite 12c sees the addition of a Coherence Adapter to the list of Technology Adapters that are licensed with the SOA Suite.  In this entry I provide an introduction to configuring the adapter and using the different operations it supports.

The Coherence Adapter provides access to Oracle's Coherence Data Grid.  The adapter provides access to the cache capabilities of the grid; it does not currently support the grid's many other features, such as entry processors – more on this at the end of the blog.

Previously, if you wanted to use Coherence from within SOA Suite you either used the built-in caching capability of OSB or resorted to writing Java code wrapped as a Spring component.  The new adapter significantly simplifies simple cache access operations.

Configuration

When creating a SOA domain the Coherence Adapter is shipped with a very basic configuration that you will probably want to enhance to support real requirements.  In this section I look at the configuration required to use the Coherence Adapter in the real world.

Activate Adapter

The Coherence Adapter is not targeted at the SOA server by default, so this targeting needs to be performed from within the WebLogic console before the adapter can be used.

Create a cache configuration file

The Coherence Adapter provides a default connection factory to connect to an out-of-box Coherence cache and also a cache called adapter-local.  This is helpful as an example, but it is good practice to have only a single type of object within a Coherence cache, so we will need more than one cache.  Without multiple caches it is hard to clean out all the objects of a particular type.  Having multiple caches also allows us to specify different properties for each cache.  The following is a sample cache configuration file used in the example.

<?xml version="1.0"?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>TestCache</cache-name>
      <scheme-name>transactional</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <transactional-scheme>
      <scheme-name>transactional</scheme-name>
      <service-name>DistributedCache</service-name>
      <autostart>true</autostart>
    </transactional-scheme>
  </caching-schemes>
</cache-config>

This defines a single cache called TestCache.  This is a distributed cache, meaning that the entries in the cache will be distributed across the grid.  This enables you to scale the storage capacity of the grid by adding more servers.  Additional caches can be added to this configuration file by adding additional <cache-mapping> elements.

The cache configuration file is referenced by the adapter connection factory and so needs to be on a file system accessible to all servers running the Coherence Adapter.  It is not referenced from the composite.

Create a Coherence Adapter Connection Factory

We find the correct cache configuration by using a Coherence Adapter connection factory.  The adapter ships with a few sample connection factories, but we will create a new one.  To create a new connection factory we do the following:

  1. On the Outbound Connection Pools tab of the Coherence Adapter deployment we select New to create a new connection factory.
  2. Choose the javax.resource.cci.ConnectionFactory group.
  3. Provide a JNDI name.  Although you can use any name, something along the lines of eis/Coherence/Test is good practice (eis tells us this is an adapter JNDI, Coherence tells us it is the Coherence Adapter, and Test identifies which adapter configuration we are using).
  4. If requested to create a Plan.xml then make sure that you save it in a location available to all servers.
  5. From the outbound connection pool tab select your new connection factory so that you can configure it from the properties tab.
    • Set the CacheConfigLocation to point to the cache configuration file created in the previous section.
    • Set the ClassLoaderMode to CUSTOM.
    • Set the ServiceName to the name of the service used by your cache in the cache configuration file created in the previous section.
    • Set the WLSExtendProxy to false unless your cache configuration file is using an extend proxy.
    • If you plan on using POJOs (Plain Old Java Objects) with the adapter rather than XML then you need to point the PojoJarFile at the location of a jar file containing your POJOs.
    • Make sure to press enter in each field after entering your data.  Remember to save your changes when done.

You may need to stop and restart the adapter to get it to recognize the new connection factory.

Operations

To demonstrate the different operations I created a WSDL with the following operations:

  • put – put an object into the cache with a given key value.
  • get – retrieve an object from the cache by key value.
  • remove – delete an object from the cache by key value.
  • list – retrieve all the objects in the cache.
  • listKeys – retrieve all the keys of the objects in the cache.
  • removeAll – remove all the objects from the cache.
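
For readers more familiar with the Coherence API, these operations map roughly onto the following NamedCache calls.  This is just a sketch of what the adapter does on your behalf; the cache name TestCache comes from the configuration file shown earlier.

import java.util.Set;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class AdapterOperationsSketch {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("TestCache");

        cache.put("Key1", "<XMLCacheEntry>...</XMLCacheEntry>"); // put
        Object value = cache.get("Key1");                        // get
        Set entries = cache.entrySet();                          // list
        Set keys = cache.keySet();                                // listKeys
        cache.remove("Key1");                                     // remove
        cache.clear();                                            // removeAll (the adapter uses a filter instead, see below)

        CacheFactory.shutdown();
    }
}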

I created a composite based on this WSDL that calls a different adapter reference for each operation.  Details on configuring the adapter within a composite are provided in the Configuring the Coherence Adapter section of the documentation.

I used a Mediator to map the input WSDL operations to the individual adapter references.

Schema

The input schema is described below.

This type of pattern is likely to be used in all XML types stored in a Coherence cache.  The XMLCacheKey element represents the cache key; in this schema it is a string, but it could be another primitive type.  The other fields in the cached object are represented by a single XMLCacheContent field, but in a real example you are likely to have multiple fields at this level.  Wrapper elements are provided for lists of elements (XMLCacheEntryList) and lists of cache keys (XMLCacheEntryKeyList).  XMLEmpty is used for operations that don't require an input.

Put Operation

The put operation takes an XMLCacheEntry as input and passes this straight through to the adapter.  The XMLCacheKey element in the entry is also assigned to the jca.coherence.key property.  This sets the key for the cached entry.  The adapter also supports automatically generating a key, which is useful if you don’t have a convenient field in the cached entity.  The cache key is always returned as the output of this operation.

Get Operation

The get operation takes an XMLCacheKey as input and assigns this to the jca.coherence.key property. This sets the key for the entry to be retrieved.

Remove Operation

The remove operation takes an XMLCacheKey as input and assigns this to the jca.coherence.key property. This sets the key for the entry to be deleted.

RemoveAll Operation

This is similar to the remove operation, but instead of using a key as input it uses a filter.  The filter could be overridden using the jca.coherence.filter property, but for this operation it was permanently set in the adapter wizard to be the following query:

key() != ""

This selects all objects whose key is not equal to the empty string.  All objects should have a key so this query should select all objects for deletion.
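
For comparison, a rough sketch of the same bulk removal performed directly against the cache from Java is shown below; the cache name is assumed, and the CohQL-style filter string is handed to the Coherence QueryHelper.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Filter;
import com.tangosol.util.QueryHelper;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.processor.ConditionalRemove;

public class RemoveAllSketch {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("TestCache");
        // build the same filter the adapter wizard generated
        Filter filter = QueryHelper.createFilter("key() != ''");
        // remove every entry matched by the filter
        cache.invokeAll(filter, new ConditionalRemove(new AlwaysFilter()));
        CacheFactory.shutdown();
    }
}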

Note that there appears to be a bug in the return value.  The return value is empty rather than containing the expected RemoveResponse element with a Count child element.  Note that the documentation states:

When using a filter for a Remove operation, the Coherence Adapter does not report the count of entries affected by the remove operation, regardless of whether the remove operation is successful.

When using a key to remove a specific entry, the Coherence Adapter does report the count, which is always 1 if a Coherence Remove operation is successful.

Although this could be interpreted as meaning an empty part is returned, an empty part is a violation of the WSDL contract.

List Operation

The list operation takes no input and returns the result list returned by the adapter.  The adapter also supports querying using a filter.  This filter is essentially the where clause of a Coherence Query Language statement.  When using XML types as cached entities, only the key() field can be tested, for example using a clause such as:

key() LIKE "Key%1"

This filter would match all entries whose key starts with “Key” and ends with “1”.
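
Outside the adapter, the equivalent query can be run directly against the cache from Java.  The sketch below is illustrative only; it assumes the TestCache cache from earlier and uses the Coherence QueryHelper to build the filter.

import java.util.Set;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.Filter;
import com.tangosol.util.QueryHelper;

public class ListWithFilterSketch {
    public static void main(String[] args) {
        NamedCache cache = CacheFactory.getCache("TestCache");
        Filter filter = QueryHelper.createFilter("key() LIKE 'Key%1'");
        Set entries = cache.entrySet(filter); // equivalent of the list operation with a filter
        Set keys = cache.keySet(filter);      // equivalent of the listKeys operation with a filter
        System.out.println(entries);
        System.out.println(keys);
        CacheFactory.shutdown();
    }
}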

ListKeys Operation

The listKeys operation is essentially the same as the list operation except that only the keys are returned rather than the whole object.

Testing

To test the composite I used the new 12c Test Suite wizard to create a number of test suites.  The test suites should be executed in the following order:

  1. CleanupTestSuite has a single test that removes all the entries from the cache used by this composite.
  2. InitTestSuite has 3 tests, each of which inserts a single record into the cache.  The returned key is validated against the expected value.
  3. MainTestSuite has 5 tests that list the elements and keys in the cache and retrieve individual inserted elements.  This tests that the items inserted in the previous test suite are actually in the cache.  It also tests the get, list and listKeys operations and makes sure they return the expected results.
  4. RemoveTestSuite has a single test that removes an element from the cache and tests that the count of removed elements is 1.
  5. ValidateRemoveTestSuite is similar to MainTestSuite but verifies that the element removed by the previous test suite has actually been removed.

Use Case

One example of using the Coherence Adapter is to create a shared memory region that allows SOA composites to share information.  An example of this is provided by Lucas Jellema in his blog entry First Steps with the Coherence Adapter to create cross instance state memory.

However, there is a problem with creating global variables that can be updated by multiple instances at the same time: the get and put operations provided by the Coherence Adapter follow a last-write-wins model.  This can be avoided in Coherence by using an entry processor to update the entry in the cache, but entry processors are not currently supported by the Coherence Adapter, so for now it is still necessary to use Java to invoke the entry processor.
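
As an illustration only, a minimal sketch of such an entry processor is shown below; the class name, cache name and key are assumptions made for the example rather than anything shipped with the adapter.

import java.io.Serializable;

import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

// Hypothetical entry processor that increments a shared counter atomically
// on the member that owns the entry, avoiding the last-write-wins problem.
public class IncrementProcessor extends AbstractProcessor implements Serializable {

    @Override
    public Object process(InvocableMap.Entry entry) {
        int current = entry.isPresent() ? (Integer) entry.getValue() : 0;
        entry.setValue(current + 1); // applied atomically by the owning member
        return current + 1;
    }
}

// Invoked from Java, for example from a Spring component within the composite:
//   NamedCache cache = CacheFactory.getCache("TestCache");
//   Integer newValue = (Integer) cache.invoke("GlobalCounter", new IncrementProcessor());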

Sample Code

The sample code I refer to above is available for download and consists of two JDeveloper projects, one with the cache config file and the other with the Coherence composite.

  • CoherenceConfig has the cache config file that must be referenced by the connection factory properties.
  • CoherenceSOA has a composite that supports the WSDL introduced at the start of this blog along with the test cases mentioned at the end of the blog.

The Coherence Adapter is a really exciting new addition to the SOA developer's toolkit; hopefully this article will help you make use of it.

Thursday Mar 27, 2014

Not Just a Cache

Coherence as a Compute Grid

Coherence is best known as a data grid, providing distributed caching with an ability to move processing to the data in the grid.  Less well known is the fact that Coherence also has the ability to function as a compute grid, distributing work across multiple servers in a cluster.  In this entry, which was co-written with my colleague Utkarsh Nadkarni, we will look at using Coherence as a compute grid through the use of the Work Manager API and compare it to manipulating data directly in the grid using Entry Processors.

Coherence Distributed Computing Options

The Coherence documentation identifies several methods for distributing work across the cluster (see Processing Data in a Cache).  They can be summarized as:

  • Entry Processors
    • The InvocableMap interface, which the NamedCache interface inherits, provides support for executing an agent (EntryProcessor or EntryAggregator) on individual entries within the cache.
    • The entries may or may not already exist; either way the agent is executed once for each key provided, or, if no keys are provided, once for each object in the cache.
    • In Enterprise and Grid editions of Coherence the entry processors are executed on the primary cache nodes holding the cached entries.
    • Agents can return results.
    • One agent executes multiple times per cache node, once for each key targeted on the node.
  • Invocation Service
    • An InvocationService provides support for executing an agent on one or more nodes within the grid.
    • Execution may be targeted at specific nodes or at all nodes running the Invocation Service.
    • Agents can return results.
    • One agent executes once per node (a minimal sketch of this option follows the list).
  • Work Managers
    • A WorkManager class provides a grid-aware implementation of the CommonJ WorkManager, which can be used to run tasks across multiple threads on multiple nodes within the grid.
    • WorkManagers run on multiple nodes.
    • Each WorkManager may have multiple threads.
    • Tasks implement the Work interface and are assigned to specific WorkManager threads to execute.
    • Each task is executed once.
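
Of these three options, the Entry Processor and the Work Manager are illustrated later in this post, so as a point of reference here is a minimal sketch of the Invocation Service; the service name and the agent are assumptions made for the example.

import java.io.Serializable;
import java.util.Map;

import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;

public class InvocationServiceSketch {

    // Hypothetical agent that reports the free heap memory of each node it runs on.
    public static class FreeMemoryAgent extends AbstractInvocable implements Serializable {
        @Override
        public void run() {
            setResult(Runtime.getRuntime().freeMemory());
        }
    }

    public static void main(String[] args) {
        // "InvocationService" is assumed to be defined as an invocation-scheme
        // in the cache configuration file.
        InvocationService service =
                (InvocationService) CacheFactory.getService("InvocationService");
        // A null member set means the agent runs once on every member of the service.
        Map results = service.query(new FreeMemoryAgent(), null);
        System.out.println("Free memory per member: " + results);
        CacheFactory.shutdown();
    }
}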

Three Models of Distributed Computation

The distributed computing options listed in the previous section give three distinct execution models:

  • Per Cache Entry Execution (Entry Processor)
    • Execute the agent on the entry corresponding to a cache key.
    • Entries processed on a single thread per node.
    • Parallelism across nodes.
  • Per Node Execution (Invocation Service)
    • Execute the same agent once per node.
    • Agent processed on a single thread per node.
    • Parallelism across nodes.
  • Per Task Execution (Work Manager)
    • Each task executed once.
    • Parallelism across nodes and across threads within a node.

The entry processor is good for operating on individual cache entries.  It is not so good for working on groups of cache entries.

The invocation service is good for performing checks on a node, but is limited in its parallelism.

The work manager is good for operating on groups of related entries in the cache or performing non-cache related work in parallel.  It has a high degree of parallelism.

As you can see the primary choice for distributed computing comes down to the Work Manager and the Entry Processor.

Differences between using Entry Processors and Work Managers in Coherence

The main differences are summarized below, aspect by aspect.

  • Degree of parallelization
    • Entry Processors: A function of the number of Coherence nodes. EntryProcessors are run concurrently across all nodes in a cluster; however, within each node only one instance of the entry processor executes at a time.
    • Work Managers: A function of the number of Work Manager threads. The Work is run concurrently across all threads in all Work Manager instances.
  • Transactionality
    • Entry Processors: Transactional. If an EntryProcessor running on one node does not complete (say, due to that node crashing), the entries targeted will be executed by an EntryProcessor on another node.
    • Work Managers: Not transactional. The specification does not explicitly state what the response should be if a remote server crashes during an execution; the current implementation uses WORK_COMPLETED with a WorkCompletedException as the result. If a Work does not run to completion, it is the responsibility of the client to resubmit the Work to the Work Manager.
  • How the cache is accessed or mutated
    • Entry Processors: Operations against the cache contents are executed by (and thus within the localized context of) a cache.
    • Work Managers: Accesses and changes to the cache are made directly through the cache API.
  • Where the processing is performed
    • Entry Processors: In the same JVM where the entries to be processed reside.
    • Work Managers: In the Work Manager server, which may not be the same JVM where the entries to be processed reside.
  • Network traffic
    • Entry Processors: A function of the size of the EntryProcessor. Typically the EntryProcessor is much smaller than the data transferred across nodes in the Work Manager approach, which makes the EntryProcessor approach more network-efficient and hence more scalable. One EntryProcessor is transmitted to each cache node.
    • Work Managers: A function of the number of Work objects, of which multiple may be sent to each server, and of the size of the data set transferred from the backing map to the Work Manager server.
  • Distribution of “tasks”
    • Entry Processors: Tasks are moved to the location at which the entries to be processed are being managed. This may result in a random distribution of tasks, although the distribution tends to become equitable as the number of entries increases.
    • Work Managers: Tasks are distributed equally across the threads in the Work Manager instances.
  • Implementation of the EntryProcessor or Work class
    • Entry Processors: Create a class that extends AbstractProcessor and implement the process method.
    • Work Managers: Create a class that is serializable and implements commonj.work.Work and implement the run method.
  • Implementation of the “task”
    • Entry Processors: In the process method, update the cache item based on the key passed into the process method.
    • Work Managers: In the run method, get a reference to the named cache, get a reference to the cache item, change the cache item, and put the cache item back into the named cache.
  • Completion notification
    • Entry Processors: When the NamedCache.invoke method completes, all the entry processors have completed executing.
    • Work Managers: When a task is submitted for execution it executes asynchronously on the work manager threads in the cluster.  Status may be obtained by registering a commonj.work.WorkListener class when calling the WorkManager.schedule method; this provides updates when the Work is accepted, started, and completed or rejected.  Alternatively, the WorkManager.waitForAll and WorkManager.waitForAny methods allow blocking waits for either all or one result respectively.
  • Returned results
    • Entry Processors: java.lang.Object when executed on one cache item – the result of the invocation as returned from the EntryProcessor; java.util.Map when executed on a collection of keys – a Map containing the results of invoking the EntryProcessor against each of the specified keys.
    • Work Managers: commonj.work.WorkItem, with three possible outcomes: the Work is not yet complete, in which case WorkItem.getResult returns null; the Work started but completed with an exception (for example because a Work Manager instance terminated abruptly), indicated by WorkItem.getResult throwing an exception; or the Work ran to completion, in which case WorkItem.getResult returns a non-null result and throws no exception.
  • Error handling
    • Entry Processors: Failure of a node results in all the work assigned to that node being executed on the new primary. This may result in some work being executed twice, but Coherence ensures that the cache is only updated once per item.
    • Work Managers: Failure of a node results in the loss of scheduled tasks assigned to that node. Completed tasks are sent back to the client as they complete.
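
Turning the two implementation rows above into code, a minimal sketch of a Work class that follows the get/change/put pattern might look like the following; the cache name and the counter logic are assumptions for illustration, not code from the sample project.

import java.io.Serializable;

import commonj.work.Work;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

// Hypothetical task that follows the pattern described above:
// get the cache item, change it, and put it back.
public class IncrementCounterWork implements Work, Serializable {

    private final String key;

    public IncrementCounterWork(String key) {
        this.key = key;
    }

    @Override
    public void run() {
        NamedCache cache = CacheFactory.getCache("TestCache"); // assumed cache name
        Integer count = (Integer) cache.get(key);
        cache.put(key, count == null ? 1 : count + 1);
    }

    @Override
    public boolean isDaemon() {
        return false; // short-lived task, run on a managed Work Manager thread
    }

    @Override
    public void release() {
        // nothing to clean up in this sketch
    }
}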

Fault Handling Extension

Entry processors have excellent error handling within Coherence.  Work Managers less so.  In order to provide resiliency on node failure I implemented a “RetryWorkManager” class that detects tasks that have failed to complete successfully and resubmits them to the grid for another attempt.

A JDeveloper project with the RetryWorkManager is available for download here.  It includes sample code to run a simple task across multiple work manager threads.

To create a new RetryWorkManager that will retry failed work twice you would use this:

WorkManager workManager = new RetryWorkManager("WorkManagerName", 2); // Change for number of retries; if no retry count is provided then the default is 0.

You can control the number of retries at the individual work level as shown below:

WorkItem workItem = workManager.schedule(work);                  // Use number of retries set at WorkManager creation
WorkItem workItem = workManager.schedule(work, workListener);    // Use number of retries set at WorkManager creation
WorkItem workItem = workManager.schedule(work, 4);               // Change number of retries
WorkItem workItem = workManager.schedule(work, workListener, 4); // Change number of retries

Currently the RetryWorkManager defaults to having 0 threads.  To change this, use the constructor that also takes a thread count:

WorkManager workManager = new RetryWorkManager("WorkManagerName", 3, 4); // Change number of threads (3) and retries (4)

Note that none of this sample code is supported by Oracle in any way, and is provided purely as a sample of what can be done with Coherence.

How the RetryWorkManager Works

The RetryWorkManager delegates most operations to a Coherence WorkManager instance.  It creates a WorkManagerListener to intercept status updates.  On receiving a WORK_COMPLETED callback the listener checks the result to see if the completion is due to an error.  If an error occurred and there are retries left then the work is resubmitted.  The WorkItem returned by scheduling an event is wrapped in a RetryWorkItem.  This RetryWorkItem is updated with a new Coherence WorkItem when the task is retried.  If the client registers a WorkManagerListener then the RetryWorkManagerListener delegates non-retriable events to the client listener.  Finally the waitForAll and waitForAny methods are modified to deal with work items being resubmitted in the event of failure.
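
To make that description concrete, the sketch below shows the shape of the retry decision as a self-contained listener.  The class and field names are simplified placeholders; this is not the code from the downloadable project.

import commonj.work.Work;
import commonj.work.WorkEvent;
import commonj.work.WorkException;
import commonj.work.WorkListener;
import commonj.work.WorkManager;

// Illustrative listener that resubmits failed work to the underlying Coherence
// WorkManager until the retry budget is exhausted.
public class RetryingListener implements WorkListener {

    private final WorkManager delegate;       // the underlying Coherence WorkManager
    private final Work work;                  // the original Work, kept for resubmission
    private final WorkListener clientListener; // may be null if the client did not register one
    private int retriesLeft;

    public RetryingListener(WorkManager delegate, Work work,
                            WorkListener clientListener, int retries) {
        this.delegate = delegate;
        this.work = work;
        this.clientListener = clientListener;
        this.retriesLeft = retries;
    }

    public void workAccepted(WorkEvent event) {
        if (clientListener != null) clientListener.workAccepted(event);
    }

    public void workRejected(WorkEvent event) {
        if (clientListener != null) clientListener.workRejected(event);
    }

    public void workStarted(WorkEvent event) {
        if (clientListener != null) clientListener.workStarted(event);
    }

    public void workCompleted(WorkEvent event) {
        if (event.getException() != null && retriesLeft > 0) {
            retriesLeft--;
            try {
                delegate.schedule(work, this); // resubmit for another attempt
            } catch (WorkException e) {
                // resubmission itself failed; give up and notify the client
                if (clientListener != null) clientListener.workCompleted(event);
            }
        } else if (clientListener != null) {
            clientListener.workCompleted(event); // success or retries exhausted
        }
    }
}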

Sample Code for EntryProcessor and RetryWorkManager

The downloadable project contains sample code for running the work manager and an entry processor.

The demo implements a 3-tier architecture:

  1. Coherence Cache Servers
    • Can be started by running RunCacheServer.cmd
    • Runs a distributed cache used by the Task to be executed in the grid
  2. Coherence Work Manager Servers
    • Can be started by running RunWorkManagerServer.cmd
    • Takes no parameters
    • Runs two threads for executing tasks
  3. Coherence Work Manager Clients
    • Can be started by running RunWorkManagerClient.cmd
    • Takes three parameters currently
      • Work Manager name - should be "AntonyWork" - default is "AntonyWork"
      • Number of tasks to schedule - default is 10
      • Time to wait for tasks to complete in seconds - default is 60

The task stores the number of times it has been executed in the cache, so multiple runs will see the counter incrementing.  The choice between EntryProcessor and WorkManager is controlled by changing the value of USE_ENTRY_PROCESSOR between false and true in the RunWorkManagerClient.cmd script.

The SetWorkManagerEnv.cmd script should be edited to point to the Coherence home directory and the Java home directory.

Summary

If you need to perform operations on cache entries and don’t need to have cross-checks between the entries then the best solution is to use an entry processor.  The entry processor is fault tolerant and updates to the cached entity will be performed once only.

If you need to perform generic work that may need to touch multiple related cache entries then the work manager may be a better solution.  The extensions I created in the RetryWorkManager provide a degree of resiliency to deal with node failure without impacting the client.

The RetryWorkManager can be downloaded here.

Friday Mar 02, 2012

Using Coherence with JDeveloper

Configuring JDeveloper for use with Coherence

Doing some work with Coherence again, I needed to create some Java code calling the Coherence API and edit some Coherence configuration files in JDeveloper.  The easiest way to do this is to register the Coherence jar file and the Coherence schemas with JDeveloper; once that is done you can use JDeveloper's XML insight features to help you create the XML documents.

Register the Coherence Library

To register the Coherence jar file in JDeveloper go to “Tools->Manage Libraries…”, select “New…” and then use “Add Entry…” to add the following entries:

  • Class Path
    • <COHERENCE_HOME>\lib\coherence.jar
  • Doc Path
    • <COHERENCE_HOME>\doc\api

COHERENCE_HOME is the location where you unzipped the Coherence product.

This lets us use the Coherence API in our Java code by adding the library to our project.
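
As a quick check that the library is registered correctly, a minimal class along the following lines should compile against it and, when run, start a local cluster member using the default configuration; the cache name is arbitrary.

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class HelloCoherence {
    public static void main(String[] args) {
        // join (or start) a cluster using the default configuration, then put and get a value
        NamedCache cache = CacheFactory.getCache("hello-cache");
        cache.put("greeting", "Hello Coherence");
        System.out.println(cache.get("greeting"));
        CacheFactory.shutdown();
    }
}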

Register the Schemas

To register the Coherence XML Schemas with JDeveloper go to “Tools->Preferences…”, select “XML Schemas” and choose “Add…”.

Browse to the <COHERENCE_HOME>\lib\coherence.jar file and add the following schemas:

  • coherence-cache-config.xsd
  • coherence-operational-config.xsd
  • coherence-pof-config.xsd
  • coherence-report-config.xsd
  • coherence-report-group-config.xsd
  • coherence-rest-config.xsd

Now when you create an XML file for use with Coherence you can choose “XML Document from XML Schema” and choose “Use Registered Schemas” to show you suitable schemas to use for your Coherence config.

