An Oracle blog about Oracle Coherence

Getting Started with Messaging Functionality Provided by Oracle Coherence Topics API

Randy Stafford and Joe Fialli

Overview

The Topics API adds publish/subscribe and message queue processing functionality to Oracle Coherence 14.1.1. Topics and their content are managed in a distributed, fault-tolerant manner within the Oracle Coherence data grid. The underlying architecture enables scalable, parallel, high-throughput consumption of topic content.

Each topic can be configured independently. Configurable parameters include the Quality of Service (QoS) for topic content (in-memory, journaled, or persistent), the maximum storage size a topic may use within the data grid, and numerous others. Grid-side content filtering and transformation reduce the amount of data transmitted from the data grid to topic subscriber clients.
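To make the benefit of grid-side filtering and transformation concrete, here is a minimal sketch in plain Java. It is a toy model, not the Coherence API: the class GridSideFilterSketch, the method transmit, and the sample values are all hypothetical. It only illustrates that when the filter and the converter run where the values are stored, fewer and smaller values need to travel to the subscriber.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model of server-side filtering and transformation.
// NOT the Coherence API: every name here is hypothetical.
class GridSideFilterSketch
    {
    // Applies the filter and converter "where the data lives" and returns
    // only what would be transmitted to the subscriber.
    static <V, U> List<U> transmit(List<V> stored, Predicate<V> filter, Function<V, U> converter)
        {
        List<U> wire = new ArrayList<>();
        for (V value : stored)
            {
            if (filter.test(value))
                {
                wire.add(converter.apply(value));
                }
            }
        return wire;
        }

    public static void main(String[] args)
        {
        List<String> stored = Arrays.asList("order:1001:widget", "audit:login", "order:1002:gadget");

        // keep only order values and send just the order id
        List<String> wire = transmit(stored, v -> v.startsWith("order:"), v -> v.split(":")[1]);

        System.out.println(wire); // prints [1001, 1002]
        }
    }
```

Of the three stored values, only two short strings reach the subscriber in this model.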

Publish/Subscribe Messaging

Publish/subscribe messaging enables the building of data pipelines between loosely coupled producers and consumers. One or more publishers publish streams of values to a topic, and one or more subscribers consume the stream of values from the topic. The topic values are spread evenly across all Oracle Coherence data servers, enabling high-throughput processing in a distributed, fault-tolerant manner. Each direct subscriber to a topic receives all values published to the topic.

Figure 1: Broadcast messaging from one publisher to multiple direct topic subscribers

Message Queue-like Processing using a Topic Subscriber Group

One or more subscribers subscribe to a topic’s subscriber group. Each value delivered to the subscriber group is consumed by only one member of the group, which enables parallel processing of those values. Each subscriber group thus behaves, in effect, like a queue.

Figure 2: Parallel consumption of Topic subscriber group values by multiple Subscriber Group Members.
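The difference between the two delivery modes can be sketched with a small in-memory model in plain Java. This is not the Coherence API and says nothing about how Coherence distributes values internally; the class TopicSemanticsSketch and its methods are hypothetical. The model gives each direct subscriber its own queue, so every direct subscriber sees every value, and gives each subscriber group one shared queue, so each value is taken by only one member.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the two delivery modes described above.
// NOT the Coherence API: every name here is hypothetical.
class TopicSemanticsSketch
    {
    // one private queue per direct subscriber: each sees every value
    private final List<Queue<String>> directSubscribers = new ArrayList<>();

    // one shared queue per subscriber group: members compete for values
    private final Map<String, Queue<String>> groups = new HashMap<>();

    Queue<String> subscribeDirect()
        {
        Queue<String> queue = new ArrayDeque<>();
        directSubscribers.add(queue);
        return queue;
        }

    Queue<String> subscribeGroup(String name)
        {
        // every member of the same group gets the same underlying queue
        return groups.computeIfAbsent(name, k -> new ArrayDeque<>());
        }

    void publish(String value)
        {
        directSubscribers.forEach(queue -> queue.add(value));
        groups.values().forEach(queue -> queue.add(value));
        }

    public static void main(String[] args)
        {
        TopicSemanticsSketch topic   = new TopicSemanticsSketch();
        Queue<String>        directA = topic.subscribeDirect();
        Queue<String>        directB = topic.subscribeDirect();
        Queue<String>        member1 = topic.subscribeGroup("queue");
        Queue<String>        member2 = topic.subscribeGroup("queue");

        topic.publish("hello world");
        topic.publish("goodbye world");

        System.out.println("direct A: " + directA);        // [hello world, goodbye world]
        System.out.println("direct B: " + directB);        // [hello world, goodbye world]
        System.out.println("member 1: " + member1.poll()); // hello world
        System.out.println("member 2: " + member2.poll()); // goodbye world
        }
    }
```

Both direct subscribers hold both values, while the two group members split them between themselves.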

Quick Start

Below is a Hello World publish/subscribe example, followed by a modification that illustrates consuming from a subscriber group.

Topic Configuration

A minimal configuration file that defines the topic example-topic.

File: coherence-cache-config.xml
<?xml version="1.0"?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <topic-scheme-mapping>
    <topic-mapping>
      <topic-name>example-topic</topic-name>
      <scheme-name>topic-server</scheme-name>
    </topic-mapping>
  </topic-scheme-mapping>
  <caching-schemes>
    <!-- partitioned topic scheme for servers -->
    <paged-topic-scheme>
      <scheme-name>topic-server</scheme-name>
      <service-name>PartitionedTopic</service-name>
      <local-storage system-property="coherence.distributed.localstorage">true</local-storage>
      <autostart>true</autostart>
    </paged-topic-scheme>
  </caching-schemes>
</cache-config>

 

Topic Publisher Client 

File: HelloWorldPublisher.java

import com.tangosol.net.Session;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;

public class HelloWorldPublisher
    {
    public static void main(String[] args) throws Exception
        {
        // the session, topic and publisher are closed automatically, in reverse order
        try (Session            session   = Session.create();
             NamedTopic<String> topic     = session.getTopic("example-topic");
             Publisher<String>  publisher = topic.createPublisher())
            {
            // send() is asynchronous; join() waits for the publish to complete
            publisher.send("hello world").join();
            System.out.println("Published to topic " + topic.getName());
            }
        }
    }
 

Topic Subscriber Client

A client that subscribes directly to the topic.

File: HelloWorldSubscriber.java

import com.tangosol.net.Session;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.Subscriber.Element;

public class HelloWorldSubscriber
    {
    public static void main(String[] args) throws Exception
        {
        try (Session            session    = Session.create();
             NamedTopic<String> topic      = session.getTopic("example-topic");
             Subscriber<String> subscriber = topic.createSubscriber())
            {
            System.out.println("HelloWorldSubscriber for topic " + topic.getName() +
                " waiting to receive a value ...");
            // receive() is asynchronous; get() blocks until a value arrives
            Element<String> e = subscriber.receive().get();
            System.out.println("Received: " + e.getValue());
            }
        }
    }
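The subscriber above calls get() on the future returned by receive(), so it waits indefinitely if nothing is ever published. A client that must not wait forever can bound the wait with the standard get(timeout, unit) method of java.util.concurrent.CompletableFuture. The sketch below uses only the JDK and a plain CompletableFuture as a stand-in for a pending receive; the class BoundedReceiveSketch and the helper getOrFallback are hypothetical, and it assumes the future returned by receive() is a CompletableFuture, as the use of get() and join() in the listings suggests.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only sketch of a bounded wait on a future.
// The helper name is hypothetical; it is not part of Coherence.
class BoundedReceiveSketch
    {
    // Waits up to timeoutMillis for the future to complete and
    // returns the fallback if the timeout expires first.
    static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback)
        {
        try
            {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            }
        catch (TimeoutException e)
            {
            // the future is still pending; it has NOT been cancelled
            return fallback;
            }
        catch (InterruptedException e)
            {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
            }
        catch (ExecutionException e)
            {
            throw new IllegalStateException(e.getCause());
            }
        }

    public static void main(String[] args)
        {
        // stand-in for the future a pending receive would return
        CompletableFuture<String> pending = new CompletableFuture<>();

        System.out.println(getOrFallback(pending, 100, "<no value yet>")); // <no value yet>

        pending.complete("hello world");
        System.out.println(getOrFallback(pending, 100, "<no value yet>")); // hello world
        }
    }
```

Note that a timed-out future stays outstanding in this sketch, so the same future should be polled again rather than replaced with a new request.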
 

Steps to Run

  1. Download and install Oracle Coherence 14.1.1. Set the COHERENCE_HOME environment variable to the installation directory.
  2. Create a directory to hold the HelloWorld sample files, then, inside it:
    1. Create each source file listed above by copying and pasting its contents.
    2. mkdir classes
    3. cp coherence-cache-config.xml classes
    4. Compile all sources: javac -cp $COHERENCE_HOME/lib/coherence.jar -d classes *.java
  3. Start the Coherence cache server: java -cp classes:$COHERENCE_HOME/lib/coherence.jar com.tangosol.net.DefaultCacheServer
  4. Start the Hello World subscriber (before the publisher, because a direct subscriber receives only values published after it subscribes): java -cp classes:$COHERENCE_HOME/lib/coherence.jar -Dcoherence.distributed.localstorage=false -Dcoherence.log=subscriber.log HelloWorldSubscriber
  5. Start the Hello World publisher: java -cp classes:$COHERENCE_HOME/lib/coherence.jar -Dcoherence.distributed.localstorage=false -Dcoherence.log=publisher.log HelloWorldPublisher
  6. Observe the output from the publisher and subscriber clients.

Subscriber Group Member

Example of parallel consumption using a subscriber group member client.

File: HelloWorldSubscriberGroupMember.java

import com.tangosol.net.Session;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.net.topic.Subscriber.Element;
import com.tangosol.net.topic.Subscriber.Name;

public class HelloWorldSubscriberGroupMember
    {
    public static void main(String[] args)
        throws Exception
        {
        // Name.of("queue") makes this subscriber a member of the subscriber group "queue"
        try (Session            session    = Session.create();
             NamedTopic<String> topic      = session.getTopic("example-topic");
             Subscriber<String> subscriber = topic.createSubscriber(Name.of("queue")))
            {
            System.out.println("HelloWorldSubscriberGroupMember for topic " +
                               topic.getName() + " waiting to receive a value ...");
            Element<String> e = subscriber.receive().get();
            System.out.println("Received: " + e.getValue());
            }
        }
    }
 

Update to HelloWorldPublisher.java

Add the following line after the existing publisher.send call:

publisher.send("goodbye world").join();

Steps to Run

  1. Compile the sources: javac -cp $COHERENCE_HOME/lib/coherence.jar -d classes *.java
  2. Start the Coherence cache server: java -cp classes:$COHERENCE_HOME/lib/coherence.jar com.tangosol.net.DefaultCacheServer
  3. Start two instances of the Hello World subscriber group member in different terminal windows: java -cp classes:$COHERENCE_HOME/lib/coherence.jar -Dcoherence.distributed.localstorage=false -Dcoherence.log=subscriber.log HelloWorldSubscriberGroupMember
  4. Start the updated Hello World publisher: java -cp classes:$COHERENCE_HOME/lib/coherence.jar -Dcoherence.distributed.localstorage=false -Dcoherence.log=publisher.log HelloWorldPublisher
  5. Observe the output from the publisher and subscriber clients.
  6. You should observe that one subscriber group member received “hello world” and the other received “goodbye world”.
