Low Latency Scheduling with Oracle Coherence

I have been too caught up with my transition to Oracle to spend much time writing my blog. Now that I'm part of Oracle, I have been spending some of my free time getting familiar with Oracle products, in particular Oracle Coherence.

Coherence provides distributed in-memory data storage, something like a shared memory space spread across multiple systems. Similar competing products include GigaSpaces, GemFire and Terracotta. Coherence is extremely versatile and can be used for many purposes, e.g. with application servers to scale web applications, in financial trading for low-latency transaction processing, and for high-performance, data-intensive computing.

Coherence provides a way to schedule small, independent tasks with low latency. Traditional HPC job schedulers such as Grid Engine schedule jobs at fixed intervals (usually every few seconds). That is fine for jobs that run for a few hours, but not for many small tasks that need to be processed quickly. One way Coherence addresses this is through the WorkManager interface.

JSR 237 - WorkManager API

Coherence provides a grid-enabled implementation of the CommonJ WorkManager API, which is defined by the JSR 237 specification. The WorkManager specification is meant for web application servers, to provide a simple, container-manageable programming model for concurrent execution of work. Work items execute on a thread pool managed by the container. The WorkManager API provides a higher level of abstraction for concurrent programming than java.lang.Thread, which is inappropriate for applications hosted in managed environments such as EJB and Servlet containers.

Existing application servers such as WebSphere and WebLogic provide the WorkManager API, and when it is used with Coherence you get a grid-enabled implementation. Established HPC systems such as Platform Symphony, which is commonly used in the financial services industry, also support this specification through their own APIs.

I could not find many details in the docs on how to actually use this API, so I experimented with it and came up with this very simple example.

How It Works

This example may look trivial, but it is meant to show the basic functionality of the API. It is written in Java and consists of three classes: a server, a client and the task to execute. The server class starts an instance of the WorkManager server, while the client generates and dispatches the tasks. Every WorkManager server you start is a compute server that can receive tasks for processing. The diagram below shows three WorkManager servers running on separate physical machines, with the client dispatching tasks to them. If I run more servers, I can process my tasks faster.

The first thing you need is Oracle Coherence. You can download the Grid Edition of Oracle Coherence from here; it is free to use for development and testing purposes. The download is a zip archive, and after unzipping it, all you need are the JAR files in the lib folder.

WorkManager Server

The WorkManager server is very simple and has only a few lines of code:
1. // WorkServer.java
2. import com.tangosol.coherence.commonj.WorkManager;
3.
4. public class WorkServer {
5.     public static void main(String[] args) throws Exception {
6.         WorkManager mgr = new WorkManager("MyManager", 10); // join "MyManager", process up to 10 work items at once
7.         Thread.sleep(Long.MAX_VALUE); // keep the JVM alive until Ctrl-C
8.     }
9. }

The only significant portion of the code is line 6, where an instance of the WorkManager service is created. The first constructor parameter specifies the name of the service, "MyManager". Other servers and clients use this name to decide which service network to join. The second parameter specifies the number of work items this server can process concurrently. Once the service is instantiated, the main thread sleeps indefinitely until the server is terminated with Ctrl-C.
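The second constructor argument caps the number of work items that run at once on a server. The sketch below illustrates that cap with a plain java.util.concurrent fixed thread pool standing in for a single WorkManager server. This is an analogy only, not how Coherence is implemented, and the class CapacityDemo and its peakConcurrency() helper are hypothetical names. The sketch runs more tasks than there are workers and records the highest number running simultaneously. That number never exceeds the pool size.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM analogy: a fixed pool of `workers` threads plays the
// role of one server created with new WorkManager(name, workers).
public class CapacityDemo {

    // Runs `tasks` short tasks on `workers` threads and returns the highest
    // number of tasks observed running at the same time.
    public static int peakConcurrency(int workers, int tasks) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the maximum seen
                try {
                    Thread.sleep(50); // simulate a little work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // 30 tasks competing for 10 workers: at most 10 run concurrently
        System.out.println("peak = " + peakConcurrency(10, 30));
    }
}
```

Extra tasks simply queue until a worker frees up. That is why adding more servers, each with its own workers, shortens the total run time.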

Client

The client generates the work and dispatches it to the servers. The WorkManager API specifies a commonj.work.Work interface, which we use to define our work. The following listing is a very simple Work example that prints a message after waiting for a random period of 1 to 5 seconds. The random wait shows that the work items are being processed concurrently.
// Task.java
import commonj.work.Work;
import java.io.Serializable;
import java.util.Random;

public class Task implements Work, Serializable {
    private final int rank;

    public Task(int rank) {
        this.rank = rank;
    }

    // false: this is short-lived work, so it does not need a daemon thread
    public boolean isDaemon() {
        return false;
    }

    // Called to ask running work to stop early; nothing to clean up here
    public void release() { }

    public void run() {
        try {
            // Wait a random period of 1 to 5 seconds before printing
            Thread.sleep((1 + new Random().nextInt(5)) * 1000L);
            System.out.println("I am rank: " + rank);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }
}

The commonj.work.Work interface extends java.lang.Runnable, so a Work is used much like a Runnable handed to a Java Thread. On top of the run() method inherited from Runnable, the interface adds the isDaemon() and release() methods. run() is where the main work is done. Note that our Task class also implements the Serializable interface. This is required so that the state of our work objects can be sent to the servers over the network. Next is the listing for WorkClient.java, which is responsible for instantiating the Task objects and dispatching them to the WorkManager servers.
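To see what Serializable buys us, the sketch below serializes a Task-like object to bytes, which is roughly what crosses the network, and reads it back on the "server" side. Only the object's state (its rank) travels this way, not its bytecode, which is why the class must also be on each server's classpath. To keep the sketch compilable without commonj.jar, SimpleWork is a hypothetical stand-in for commonj.work.Work. RankTask and roundTrip() are likewise illustrative names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for commonj.work.Work: a Runnable with isDaemon()/release().
interface SimpleWork extends Runnable {
    boolean isDaemon();
    void release();
}

public class SerializationDemo {

    // Like Task: the rank field is the state that must reach the server.
    static class RankTask implements SimpleWork, Serializable {
        private static final long serialVersionUID = 1L;
        final int rank;
        RankTask(int rank) { this.rank = rank; }
        public boolean isDaemon() { return false; }
        public void release() { }
        public void run() { System.out.println("I am rank: " + rank); }
    }

    // Serialize to bytes (what a client would send) and rebuild the object
    // from those bytes (what a server would do on receipt).
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        RankTask copy = roundTrip(new RankTask(7));
        copy.run(); // prints "I am rank: 7"
    }
}
```

RankTask is deliberately a static nested class. A non-static inner class would drag its enclosing instance into the serialized form, and that instance would then also have to be serializable.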
1. // WorkClient.java
2. import com.tangosol.coherence.commonj.WorkManager;
3. import commonj.work.Work;
4. import commonj.work.WorkItem;
5. import java.util.Arrays;
6.
7. public class WorkClient {
8.     public static void main(String[] args) throws Exception {
9.         WorkManager mgr = new WorkManager("MyManager", 0); // 0 workers: this JVM only dispatches
10.        Work[] tasks = new Work[10];
11.        WorkItem[] retval = new WorkItem[10];
12.
13.        for (int i = 0; i < 10; i++) {
14.            tasks[i] = new Task(i);
15.            retval[i] = mgr.schedule(tasks[i]); // returns immediately; work runs remotely
16.        }
17.        mgr.waitForAll(Arrays.asList(retval), 60000); // wait to complete with 60 sec timeout
18.    }
19. }

The client first instantiates the WorkManager service (line 9). As I do not want the client to do any work, I pass 0 as the second argument. Lines 13-16 show the for loop that creates 10 Task objects and dispatches them using schedule(), which returns a commonj.work.WorkItem. The WorkItem can be used to query the status of the work and retrieve the result of the completed work.
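As I understand the CommonJ API, WorkItem.getResult() hands back the completed Work object itself, so any result is read from the work's own fields. A rough single-JVM analogy uses the JDK's ExecutorService.submit(Runnable, T). It returns a Future whose get() yields the supplied object once the task has run. This is only an analogy to illustrate the idea, not Coherence's WorkItem. SquareTask and squares() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical analogy: java.util.concurrent.Future plays the role of WorkItem.
public class ResultDemo {

    // The "result" lives in the task object itself, as with a CommonJ Work.
    static class SquareTask implements Runnable {
        final int input;
        int output;
        SquareTask(int input) { this.input = input; }
        public void run() { output = input * input; }
    }

    // Schedules one task per input, then collects each task's output.
    public static List<Integer> squares(int n) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<SquareTask>> items = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                SquareTask task = new SquareTask(i);
                items.add(pool.submit(task, task)); // get() will return `task` itself
            }
            List<Integer> results = new ArrayList<>();
            for (Future<SquareTask> item : items) {
                // get() blocks until the task is done (isDone() is the non-blocking check)
                results.add(item.get().output);
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(5)); // [0, 1, 4, 9, 16]
    }
}
```

Reading output after get() is safe. A Future guarantees that everything the task did happens-before get() returns.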

Because work is scheduled asynchronously, the client would otherwise exit as soon as all the work had been scheduled, regardless of whether it had completed. To avoid this, I call waitForAll() on line 17, which blocks the thread until all the work is complete or the 60-second timeout expires.
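The timeout behaves like one overall deadline for the whole batch, not a separate allowance per item. The JDK-only sketch below shows that behaviour with a hypothetical waitForAll(items, timeoutMillis) helper over Futures, modelled on the call in WorkClient. It computes a single deadline and spends only the remaining time on each item. The helper returns a boolean so the caller can tell whether the deadline was hit. WaitDemo and runAndWait() are illustrative names, not part of Coherence or CommonJ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper modelled on the waitForAll(items, timeoutMillis) call in WorkClient.
public class WaitDemo {

    // Returns true if every future completes before one shared deadline,
    // false as soon as that deadline passes.
    public static boolean waitForAll(List<? extends Future<?>> items, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        for (Future<?> item : items) {
            long remaining = deadline - System.nanoTime();
            try {
                item.get(Math.max(0, remaining), TimeUnit.NANOSECONDS);
            } catch (TimeoutException e) {
                return false; // deadline reached before this item finished
            } catch (ExecutionException e) {
                // The task threw, but it is finished; keep waiting for the rest
            }
        }
        return true;
    }

    // Schedules `count` tasks that each sleep `sleepMillis`, then waits for them.
    public static boolean runAndWait(int count, long sleepMillis, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(count);
        try {
            List<Future<?>> items = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                items.add(pool.submit(() -> {
                    Thread.sleep(sleepMillis); // a Callable, so it may throw
                    return null;
                }));
            }
            return waitForAll(items, timeoutMillis);
        } finally {
            pool.shutdownNow(); // interrupt anything still sleeping
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAndWait(5, 100, 5_000)); // true
        System.out.println(runAndWait(5, 5_000, 200)); // false
    }
}
```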

Running the Example

To compile the code, place coherence.jar, coherence-work.jar and commonj.jar in the same folder. With the three Java files (WorkServer.java, WorkClient.java and Task.java) in the current folder, run the command:
$ javac -cp coherence-work.jar:coherence.jar:commonj.jar:. *.java

To run the server, you need coherence.jar in addition to coherence-work.jar:
$ java -cp coherence-work.jar:coherence.jar:. WorkServer

The server will output several lines of log messages, including a few like "Service MyManager joined the cluster with senior service member 1", which means that your service is now running. If you start multiple instances, even on different physical machines, the servers will join the same service network as long as they are on the same subnet and multicast is enabled.

NOTE: Make sure that the Task.class file is in the server's classpath.

Once you have started at least one server, you can run the client:
$ java -cp coherence-work.jar:coherence.jar:. WorkClient

The following shows the output printed on the terminals when the client is run (log messages not shown), for the two WorkServer instances that I started locally on my laptop:
WorkServer 1:
I am rank: 1
I am rank: 3
I am rank: 7
I am rank: 5
I am rank: 9

WorkServer 2:
I am rank: 6
I am rank: 4
I am rank: 0
I am rank: 2
I am rank: 8

Note that the work is scheduled round-robin across the servers. If I start 5 servers, each server will be scheduled with 2 tasks. Because Coherence pushes the work out immediately when it is scheduled, there is minimal delay and very little scheduling overhead.
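The split in the output above is what a simple round-robin assignment produces. The sketch below is an illustration only; Coherence's actual placement logic is not shown here and may differ. The hypothetical assign() method deals task i to server i % n. With two servers, one server receives the even ranks and the other the odd ranks, matching the output. With five servers, each receives two tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of round-robin placement; not Coherence's actual algorithm.
public class RoundRobinDemo {

    // Returns, for each server, the task ranks it would receive when
    // `tasks` tasks are dealt out one server at a time.
    public static List<List<Integer>> assign(int tasks, int servers) {
        List<List<Integer>> byServer = new ArrayList<>();
        for (int s = 0; s < servers; s++) {
            byServer.add(new ArrayList<>());
        }
        for (int rank = 0; rank < tasks; rank++) {
            byServer.get(rank % servers).add(rank); // next server in turn
        }
        return byServer;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 2)); // [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
        System.out.println(assign(10, 5)); // [[0, 5], [1, 6], [2, 7], [3, 8], [4, 9]]
    }
}
```

Round-robin balances the number of tasks, not the amount of work. Each Task sleeps for a random time, so one server can still finish noticeably later than another.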

For embarrassingly parallel processing, the WorkManager API can be a quick way to develop your Java HPC application. If you feel that Java has too much processing overhead, Coherence also has a C++ client library, and a .NET client for the Windows lovers.
Comments:

Java already has Executor Service for task executions and now imagine it is distributed! Imagine executor service is executing your tasks across the cluster. This is exactly what Hazelcast (http://www.hazelcast.com) does. For more info visit
http://www.hazelcast.com/documentation.jsp#ExecutorService

Hazelcast is open source (Apache license), transactional, distributed implementation of map, queue, multimap, topic, lock and executor service for Java.

Posted by Talip Ozturk on May 03, 2010 at 12:38 PM SGT #

About

Melvin Koh
