Pushing real-time data changes from an Oracle database into Coherence

A common requirement when using Coherence is for the data in it to remain synchronised with the database. If all changes to the database flow through Coherence, or the cached data can be periodically refreshed (using the refresh-ahead mechanism), then Coherence will be aware of any database changes. However, if another application changes the database outside of Coherence, or the database changes need to be relayed to Coherence in real-time, then an alternative approach is required.

To push database changes to Coherence in real-time when they are being made outside of Coherence, a queue needs to be used. Fortunately most databases support the queuing of transactional change information, either implicitly or explicitly. In this example architecture the explicit queuing of changes to an Oracle database, using Triggers and Oracle’s Advanced Queuing (AQ) technology, will be used to propagate them to a Coherence cache holding objects representing the same information.

Overview

Below is a diagram showing how the changes flow from the Oracle database to the Coherence cache.

[Diagram: AQ 2008]

In the example the read/receive operation from the queue is performed as a transaction, so the message is only removed from the queue once the corresponding cache object has been updated. This ensures that if the Coherence JVM/node that the client runs on fails, the update message will not be lost. Although the example code focuses on cache updates, it could easily be modified to accommodate inserts and deletes as well.
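The receive-then-commit behaviour described above can be illustrated with a small, self-contained sketch. It does not use AQ or JMS at all: InMemoryTransactedQueue is a hypothetical stand-in for a transacted queue session, and the "id=symbol" message format is made up purely for the demonstration. The point is only the ordering: the message is handed out, the cache is updated, and only then is the removal committed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class TransactionalReceiveSketch {

    /** Hypothetical stand-in for a transacted queue session; not the AQ or JMS API. */
    static class InMemoryTransactedQueue {
        private final Deque<String> messages = new ArrayDeque<>();
        private String inFlight; // handed out to the consumer but not yet committed

        void enqueue(String message) {
            messages.addLast(message);
        }

        /** Hands out the head message; it is only gone for good after commit(). */
        String receive() {
            inFlight = messages.pollFirst();
            return inFlight;
        }

        /** Makes the removal of the in-flight message permanent. */
        void commit() {
            inFlight = null;
        }

        /** Returns the in-flight message to the head of the queue for redelivery. */
        void rollback() {
            if (inFlight != null) {
                messages.addFirst(inFlight);
                inFlight = null;
            }
        }

        int size() {
            return messages.size();
        }
    }

    /** Receives one message and applies it to the cache; commits only if that succeeds. */
    static boolean processOne(InMemoryTransactedQueue queue,
                              Map<String, String> cache,
                              boolean failBeforeUpdate) {
        String message = queue.receive();
        if (message == null) {
            return false; // nothing to do
        }
        try {
            if (failBeforeUpdate) {
                throw new IllegalStateException("simulated failure before the cache update");
            }
            String[] parts = message.split("=", 2); // made-up "id=symbol" format
            cache.put(parts[0], parts[1]);
            queue.commit();   // the message is removed only after the cache holds the change
            return true;
        } catch (RuntimeException e) {
            queue.rollback(); // the message stays queued and will be delivered again
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryTransactedQueue queue = new InMemoryTransactedQueue();
        Map<String, String> cache = new HashMap<>();
        queue.enqueue("1=ORCL");

        boolean first = processOne(queue, cache, true);
        System.out.println("first attempt applied: " + first + ", still queued: " + queue.size());

        boolean second = processOne(queue, cache, false);
        System.out.println("second attempt applied: " + second + ", cache: " + cache);
    }
}
```

With a real JMS provider the same ordering would normally come from a transacted or client-acknowledged session; the sketch only shows why the update is not lost when the consumer fails part-way through.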

To enable Advanced Queuing (AQ) to be used for propagating table changes a number of database permissions first need to be granted. The PL/SQL to do this is shown below:

-- Execute as sys/welcome1

GRANT SELECT_CATALOG_ROLE TO scott;
GRANT EXECUTE ON DBMS_APPLY_ADM TO scott;
GRANT EXECUTE ON DBMS_AQ TO scott;
GRANT EXECUTE ON DBMS_AQADM TO scott;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO scott;
GRANT EXECUTE ON DBMS_FLASHBACK TO scott;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO scott;
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'scott', TRUE);
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'scott', TRUE);
GRANT aq_administrator_role TO scott;
GRANT EXECUTE ON dbms_lock TO scott;
GRANT EXECUTE ON sys.dbms_aqin TO scott;
GRANT EXECUTE ON sys.dbms_aqjms TO scott;
EXIT;

Note: These GRANT statements need to be executed as the database administrator (sys) or another database user who has privileges to grant them.

Then the AQ queue needs to be set up, and the PL/SQL procedure and trigger created to put the database table changes in the appropriate queue. Below is the PL/SQL used to do this:

-- Execute as scott/tiger

-- Create queue
EXECUTE dbms_aqadm.stop_queue(queue_name => 'trade_queue');
EXECUTE dbms_aqadm.drop_queue(queue_name => 'trade_queue');
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'trade_queue_table');
EXECUTE dbms_aqadm.create_queue_table(queue_table => 'trade_queue_table',queue_payload_type => 'sys.aq$_jms_text_message', multiple_consumers => false);
EXECUTE dbms_aqadm.create_queue(queue_name => 'trade_queue', queue_table =>'trade_queue_table', queue_type => DBMS_AQADM.NORMAL_QUEUE, retention_time => 0, max_retries => 5, retry_delay => 60);
EXECUTE dbms_aqadm.start_queue(queue_name => 'trade_queue');

-- Create table

DROP TABLE Trade CASCADE CONSTRAINTS;
CREATE TABLE Trade
(
    Id     NUMBER PRIMARY KEY,
    Symbol VARCHAR2(5)       ,
    Created DATE             ,
    Quantity NUMBER          ,
    Amount   NUMBER(8,2)
);

-- Enable SERVEROUTPUT in SQL Command Line (SQL*Plus) to display output with
-- DBMS_OUTPUT.PUT_LINE, this enables SERVEROUTPUT for this SQL*Plus session only
SET SERVEROUTPUT ON
CREATE OR REPLACE
PROCEDURE testmessage(text_message VARCHAR2)
AS
  msg SYS.AQ$_JMS_TEXT_MESSAGE;
  msg_hdr SYS.AQ$_JMS_HEADER;
  msg_agent SYS.AQ$_AGENT;
  msg_proparray SYS.AQ$_JMS_USERPROPARRAY;
  msg_property SYS.AQ$_JMS_USERPROPERTY;
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  msg_props DBMS_AQ.MESSAGE_PROPERTIES_T;
  msg_id RAW(16);
  dummy VARCHAR2(4000);
BEGIN
  msg_agent := SYS.AQ$_AGENT(' ', NULL, 0);
  msg_proparray := SYS.AQ$_JMS_USERPROPARRAY()  ;
  msg_proparray.EXTEND(1);
  msg_property := SYS.AQ$_JMS_USERPROPERTY('JMS_OracleDeliveryMode', 100, '2', NULL, 27);
  msg_proparray(1) := msg_property;
  msg_hdr := SYS.AQ$_JMS_HEADER(msg_agent,NULL,'<USERNAME>',NULL,NULL,NULL,msg_proparray);
  msg := SYS.AQ$_JMS_TEXT_MESSAGE(msg_hdr,NULL,NULL,NULL);
  msg.text_vc  := text_message;
  msg.text_len := LENGTH(msg.text_vc);
  DBMS_AQ.ENQUEUE(queue_name => 'trade_queue' , enqueue_options => queue_options , message_properties => msg_props , payload => msg , msgid => msg_id);
END;
/
CREATE OR REPLACE TRIGGER TradeAQTrigger AFTER INSERT OR UPDATE ON Trade
FOR EACH ROW
DECLARE
    xml_complete VARCHAR2(1000);
BEGIN
    xml_complete := '<?xml version="1.0" encoding="UTF-8" ?>' ||
      '<TradeElement xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ' ||
      'xsi:schemaLocation="http://www.oracle.com/coherenceaq src/aq-object.xsd" ' ||
      'xmlns="http://www.oracle.com/coherenceaq">' ||
    '<Id>' || :new.ID || '</Id>' ||
    '<Symbol>' || :new.SYMBOL || '</Symbol>' ||
    '<Quantity>' || :new.QUANTITY || '</Quantity>' ||
    '<Amount>' || :new.AMOUNT || '</Amount>' ||
    '<Created>' || :new.CREATED || '</Created>' ||
    '</TradeElement>';   
    testmessage(xml_complete);
END;
/
SHOW ERRORS;

The Java code that runs in the background Coherence thread – started by the BackingMapListener – that reads messages from the queue looks like this:

.
.
.

/**
* <p>AQ client interface</p>
*/
public class AQClient
{
  .
  .
  .

  /**
   * Send an acknowledgement for the last received message. Until it is
   * acknowledged the message is kept by the sender, so it is not lost if the
   * client dies. An acknowledgement should only be sent once the received
   * message has been saved, e.g. in a cache.
   *
   * @throws JMSException If an exception occurs sending an acknowledgement
   */
  public void aknowledgeMessage()
    throws JMSException
  {
    textMsg.acknowledge();
  }

  /**
   * Reads XML text message from the queue and transforms it into a Trade
   * object using JAXB classes.
   *
   * @return a Trade object
   * @throws JMSException If an exception occurs receiving the message
   * @throws JAXBException If an exception occurs during JAXB processing
   */
  public Trade receiveMessage()
    throws JMSException, JAXBException
  {
    // Wait for a message to show up in the queue
    textMsg = (TextMessage) queueReciever.receive();
    System.out.println("Message is: " + textMsg.getText());

    // Use JAXB to create object
    JAXBContext jc = JAXBContext.newInstance("com.oracle.demo.model");
    Unmarshaller u = jc.createUnmarshaller();
    JAXBElement<Trade> trade =
      (JAXBElement<Trade>) u.unmarshal(new StreamSource(new StringReader(textMsg.getText())));
    Trade t = trade.getValue();
    System.out.println("Trade created on: " + t.toString());
    return t;
  }
}
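To make the shape of the queued message concrete, here is a minimal sketch that parses the XML built by the trigger using only the JDK's DOM parser instead of JAXB. It is not the code from the example project: TradeFields is a hypothetical holder standing in for the JAXB-generated Trade class, the sample values are invented, and the Created value is kept as text because its format depends on the session's NLS_DATE_FORMAT setting.

```java
import java.io.StringReader;
import java.math.BigDecimal;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class TradeXmlSketch {

    // Default namespace declared on <TradeElement> by the trigger
    static final String NS = "http://www.oracle.com/coherenceaq";

    /** Hypothetical value holder; stands in for the JAXB-generated Trade class. */
    static class TradeFields {
        final long id;
        final String symbol;
        final BigDecimal quantity;
        final BigDecimal amount;
        final String created; // kept as text: the format depends on NLS_DATE_FORMAT

        TradeFields(long id, String symbol, BigDecimal quantity,
                    BigDecimal amount, String created) {
            this.id = id;
            this.symbol = symbol;
            this.quantity = quantity;
            this.amount = amount;
            this.created = created;
        }
    }

    /** An invented message with the same structure the trigger produces. */
    static String sampleMessage() {
        return "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"
            + "<TradeElement xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
            + "xsi:schemaLocation=\"http://www.oracle.com/coherenceaq src/aq-object.xsd\" "
            + "xmlns=\"http://www.oracle.com/coherenceaq\">"
            + "<Id>1</Id><Symbol>ORCL</Symbol><Quantity>100</Quantity>"
            + "<Amount>10.5</Amount><Created>22-FEB-10</Created>"
            + "</TradeElement>";
    }

    /** Parses one trade message; any parsing problem is reported as IllegalArgumentException. */
    static TradeFields parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // elements live in the coherenceaq namespace
            Document doc = factory.newDocumentBuilder()
                                  .parse(new InputSource(new StringReader(xml)));
            return new TradeFields(
                Long.parseLong(text(doc, "Id")),
                text(doc, "Symbol"),
                new BigDecimal(text(doc, "Quantity")),
                new BigDecimal(text(doc, "Amount")),
                text(doc, "Created"));
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse trade message", e);
        }
    }

    /** Returns the trimmed text of the first element with the given local name. */
    private static String text(Document doc, String localName) {
        NodeList nodes = doc.getElementsByTagNameNS(NS, localName);
        if (nodes.getLength() == 0) {
            throw new IllegalArgumentException("Missing element: " + localName);
        }
        return nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        TradeFields t = parse(sampleMessage());
        System.out.println(t.id + " " + t.symbol + " " + t.quantity + " " + t.amount + " " + t.created);
    }
}
```

Because the trigger concatenates column values with implicit conversions, the exact text of the numeric and date elements depends on database session settings, so a real consumer may need stricter or more tolerant parsing than this sketch.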

A full code example can be found here, including all the necessary PL/SQL scripts. All you need to try it out is Coherence, Oracle XE and Coherence .NET – if you want to see the changes propagated to a .NET Coherence client.

If changes in the Coherence cache data also need to be written back to the database then something like a status field or flag would need to be added to the cached objects and table, to ensure that a circular loop isn’t created. For instance the following logic could enable bi-directional synchronization:

  1. A propagate flag is added to the cached object (Trade) and the database table (Trade) to signify whether changes should be propagated by the after update trigger from the database to Coherence.
  2. Data changes propagated from Coherence to the database have the propagate flag set to FALSE.
  3. Introduce a new before insert/update trigger to reset the propagate flag to TRUE when it is FALSE.
  4. When the after insert/update trigger fires, only place the new value on the queue if the old value of the propagate flag is not FALSE.

A modified example of the PL/SQL to add this functionality is in the setup-queue2.sql file, though you will need to add the additional column to the Trade table and attribute to the Trade object.
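The loop-prevention idea in the four steps above can be modelled in a few lines of plain Java. This is only an illustrative sketch and not the contents of setup-queue2.sql: the table and queue are in-memory collections, every name is hypothetical, and it assumes one particular reading of step 4, namely that the "old value" is the flag value the writer supplied before the before-trigger reset it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PropagateFlagSketch {

    /** Hypothetical in-memory model of a Trade row with its propagate flag. */
    static class Row {
        final String symbol;
        final boolean propagate;

        Row(String symbol, boolean propagate) {
            this.symbol = symbol;
            this.propagate = propagate;
        }
    }

    private final Map<Long, Row> table = new HashMap<>(); // stands in for the Trade table
    private final List<Long> queue = new ArrayList<>();   // stands in for trade_queue

    /** Models one INSERT/UPDATE statement together with both triggers. */
    private void write(long id, String symbol, boolean flagSuppliedByWriter) {
        // "Before" trigger: the stored flag is always reset to TRUE (step 3),
        // but the value the writer supplied is remembered for the next step.
        table.put(id, new Row(symbol, true));

        // "After" trigger: enqueue only if the writer did not supply FALSE (step 4, as assumed here).
        if (flagSuppliedByWriter) {
            queue.add(id);
        }
    }

    /** Another application changes the database directly; the change must reach Coherence. */
    void writeFromExternalApplication(long id, String symbol) {
        write(id, symbol, true);
    }

    /** Coherence writes a cache change back; it must not be echoed to Coherence (step 2). */
    void writeFromCoherence(long id, String symbol) {
        write(id, symbol, false);
    }

    List<Long> queuedIds() {
        return queue;
    }

    boolean storedFlag(long id) {
        return table.get(id).propagate;
    }

    public static void main(String[] args) {
        PropagateFlagSketch db = new PropagateFlagSketch();
        db.writeFromExternalApplication(1L, "ORCL");
        db.writeFromCoherence(2L, "IBM");
        System.out.println("queued ids: " + db.queuedIds());
        System.out.println("stored flag for row 2: " + db.storedFlag(2L));
    }
}
```

Resetting the stored flag to TRUE means a later change to the same row by another application is propagated as usual; only the write that originated in Coherence is suppressed.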

Summary

This approach is not the only mechanism for propagating changes from an Oracle Database to Coherence. However, using a BackingMapListener to start the background queue client thread has an added benefit: if the node it is running on fails, Coherence will restart the queue client on another node as part of its recovery process. That is, when the queue configuration entry is failed over and the new primary is created, a new background queue client thread is started.

Alternative approaches to the one above might be to use Oracle Streams or the Database Change Notification mechanism in Oracle 11g.

Comments:

We did an internal application using multi-subscriber AQs on 10.2.0.4 w/ JMSQ clients - but ran into performance bottlenecks on the dequeue ops. I have concerns about how this might scale based on those experiences. CPU consumption went WAY up at around 100/sec on a 12 CPU Sun box.

Posted by Malcolm on February 22, 2010 at 08:29 AM GMT #

Hi Malcolm, Thanks for your comments and feedback. You are right in that there are limitations in how much data you can push into a Coherence cluster using AQ. Just out of interest can you share any more details about the number of clients and message rate that you started to run into problems at?

Posted by David Felcey on February 22, 2010 at 08:57 AM GMT #

David, We had 3 subscribers using the JMSQ interface, enqueue rate at 100/sec - subscriber queues started backing up and CPU was climbing. We were using commit_time ordering on the queue with much worse results, changing to enq_time ordering produced the results above.

Posted by Malcolm on February 22, 2010 at 06:34 PM GMT #

Hi Malcolm, That's not many updates. I'm surprised the throughput is no higher, though I have not benchmarked AQ myself. After a cursory glance at other AQ benchmarks I found another benchmark here (http://www.trivadis.com/uploads/tx_cabagdownloadarea/Salvisberg_AQ_Design_for_Best_Performance.pdf). How does this compare with your benchmark?

Posted by David Felcey on February 23, 2010 at 02:20 AM GMT #
