Pushing real-time data changes from an Oracle database into Coherence
By dxfelcey on Jun 17, 2009
A common requirement when using Coherence is for the data in it to remain synchronised with the database. If all changes to the database flow through Coherence, or the cached data can be periodically refreshed (using the refresh-ahead mechanism), then Coherence will be aware of any database changes. However, if another application changes the database outside of Coherence, or the database changes need to be relayed to Coherence in real time, then an alternative approach is required.
To push database changes to Coherence in real time when they are made outside of Coherence, a queue needs to be used. Fortunately, most databases support the queuing of transactional change information, either implicitly or explicitly. This example architecture uses explicit queuing: Triggers and Oracle’s Advanced Queuing (AQ) technology capture changes to an Oracle database and propagate them to a Coherence cache holding objects that represent the same information.
Below is a diagram showing how the changes flow from the Oracle database to the Coherence cache.
In the example the read/receive operation from the queue is performed as a transaction, so the message is only removed from the queue once the corresponding cache object has been updated. This ensures that if the Coherence JVM/node that the client runs on fails, the update message will not be lost. Although the example code focuses on cache updates, it could easily be modified to accommodate inserts and deletes as well.
To enable Advanced Queuing (AQ) to be used for propagating table changes, a number of database permissions first need to be granted. The SQL to do this is shown below:
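The receive-update-commit ordering described above can be sketched in plain Java. This is only an illustration under stated assumptions: `TransactedQueue` is a hypothetical in-memory stand-in for a transacted session on the AQ queue, the `Map` stands in for the Coherence cache, and the `id=value` message format is invented for brevity. None of these names come from the example code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class TransactedReceiveSketch {

    /** Hypothetical in-memory stand-in for a transacted session on the queue. */
    static class TransactedQueue {
        private final Deque<String> pending = new ArrayDeque<>();
        private String inFlight; // received but not yet committed

        void enqueue(String message) {
            pending.addLast(message);
        }

        /** Hands out the next message; it stays recoverable until commit(). */
        String receive() {
            if (inFlight != null) {
                throw new IllegalStateException("previous message not committed or rolled back");
            }
            inFlight = pending.pollFirst();
            return inFlight;
        }

        /** Only now is the received message gone for good. */
        void commit() {
            inFlight = null;
        }

        /** Puts the uncommitted message back at the head of the queue. */
        void rollback() {
            if (inFlight != null) {
                pending.addFirst(inFlight);
                inFlight = null;
            }
        }

        int depth() {
            return pending.size() + (inFlight == null ? 0 : 1);
        }
    }

    /**
     * Receives one "id=value" message, applies it to the cache, then commits.
     * Returns true if the cache was updated and the message consumed.
     */
    static boolean processOne(TransactedQueue queue, Map<String, String> cache,
                              boolean simulateFailure) {
        String message = queue.receive();
        if (message == null) {
            return false; // nothing to do
        }
        try {
            if (simulateFailure) {
                throw new IllegalStateException("simulated failure before cache update");
            }
            String[] parts = message.split("=", 2);
            cache.put(parts[0], parts[1]);
            queue.commit(); // commit only after the cache holds the change
            return true;
        } catch (RuntimeException e) {
            queue.rollback(); // message remains available for a retry
            return false;
        }
    }

    public static void main(String[] args) {
        TransactedQueue queue = new TransactedQueue();
        Map<String, String> cache = new HashMap<>();
        queue.enqueue("1=ORCL");

        boolean first = processOne(queue, cache, true);
        System.out.println("first attempt updated cache: " + first
                + ", queue depth: " + queue.depth());
        boolean second = processOne(queue, cache, false);
        System.out.println("second attempt updated cache: " + second
                + ", queue depth: " + queue.depth());
    }
}
```

The point of the ordering is that a failure between receive and commit leaves the message on the queue, so a restarted client sees it again.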
-- Execute as sys/welcome1
GRANT SELECT_CATALOG_ROLE TO scott;
GRANT EXECUTE ON DBMS_APPLY_ADM TO scott;
GRANT EXECUTE ON DBMS_AQ TO scott;
GRANT EXECUTE ON DBMS_AQADM TO scott;
GRANT EXECUTE ON DBMS_CAPTURE_ADM TO scott;
GRANT EXECUTE ON DBMS_FLASHBACK TO scott;
GRANT EXECUTE ON DBMS_STREAMS_ADM TO scott;
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY', 'scott', TRUE);
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY', 'scott', TRUE);
GRANT aq_administrator_role TO scott;
GRANT EXECUTE ON dbms_lock TO scott;
GRANT EXECUTE ON sys.dbms_aqin TO scott;
GRANT EXECUTE ON sys.dbms_aqjms TO scott;
Note: These GRANT statements need to be executed as the database administrator (sys) or another database user who has privileges to grant them.
Then the AQ queue needs to be set up, and the PL/SQL procedure and trigger created, to put the database table changes in the appropriate queue. Below is the PL/SQL used to do this:
-- Execute as scott/tiger
-- Create queue
EXECUTE dbms_aqadm.stop_queue(queue_name => 'trade_queue');
EXECUTE dbms_aqadm.drop_queue(queue_name => 'trade_queue');
EXECUTE DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'trade_queue_table');
EXECUTE dbms_aqadm.create_queue_table(queue_table => 'trade_queue_table',queue_payload_type => 'sys.aq$_jms_text_message', multiple_consumers => false);
EXECUTE dbms_aqadm.create_queue(queue_name => 'trade_queue', queue_table =>'trade_queue_table', queue_type => DBMS_AQADM.NORMAL_QUEUE, retention_time => 0, max_retries => 5, retry_delay => 60);
EXECUTE dbms_aqadm.start_queue(queue_name => 'trade_queue');
-- Create table
DROP TABLE Trade CASCADE CONSTRAINTS;
CREATE TABLE Trade
(
  Id NUMBER PRIMARY KEY,
  Symbol VARCHAR2(5),
  Created DATE,
  Quantity NUMBER,
  Amount NUMBER -- referenced by the trigger as :new.AMOUNT; NUMBER type assumed
);
-- Enable SERVEROUTPUT in SQL Command Line (SQL*Plus) to display output with
-- DBMS_OUTPUT.PUT_LINE, this enables SERVEROUTPUT for this SQL*Plus session only
SET SERVEROUTPUT ON
CREATE OR REPLACE PROCEDURE testmessage(text_message VARCHAR2) IS
  msg           SYS.AQ$_JMS_TEXT_MESSAGE;
  msg_hdr       SYS.AQ$_JMS_HEADER;
  msg_agent     SYS.AQ$_AGENT;
  msg_proparray SYS.AQ$_JMS_USERPROPARRAY;
  msg_property  SYS.AQ$_JMS_USERPROPERTY;
  queue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  msg_props     DBMS_AQ.MESSAGE_PROPERTIES_T;
  msg_id        RAW(16);
BEGIN
  msg_agent := SYS.AQ$_AGENT(' ', NULL, 0);
  msg_proparray := SYS.AQ$_JMS_USERPROPARRAY();
  msg_proparray.EXTEND(1);
  msg_property := SYS.AQ$_JMS_USERPROPERTY('JMS_OracleDeliveryMode', 100, '2', NULL, 27);
  msg_proparray(1) := msg_property;
  msg_hdr := SYS.AQ$_JMS_HEADER(msg_agent,NULL,'<USERNAME>',NULL,NULL,NULL,msg_proparray);
  msg := SYS.AQ$_JMS_TEXT_MESSAGE(msg_hdr,NULL,NULL,NULL);
  msg.text_vc := text_message;
  msg.text_len := LENGTH(msg.text_vc);
  DBMS_AQ.ENQUEUE(queue_name => 'trade_queue', enqueue_options => queue_options, message_properties => msg_props, payload => msg, msgid => msg_id);
END;
/
CREATE OR REPLACE TRIGGER TradeAQTrigger AFTER INSERT OR UPDATE ON Trade
FOR EACH ROW
DECLARE
  xml_complete VARCHAR2(1000);
BEGIN
  xml_complete := '<?xml version="1.0" encoding="UTF-8" ?>' ||
    '<TradeElement xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ' ||
    'xsi:schemaLocation="http://www.oracle.com/coherenceaq src/aq-object.xsd">' ||
    '<Id>' || :new.ID || '</Id>' ||
    '<Symbol>' || :new.SYMBOL || '</Symbol>' ||
    '<Quantity>' || :new.QUANTITY || '</Quantity>' ||
    '<Amount>' || :new.AMOUNT || '</Amount>' ||
    '<Created>' || :new.CREATED || '</Created>' ||
    '</TradeElement>';
  -- Enqueue the XML message using the procedure defined above
  testmessage(xml_complete);
END;
/
The Java code that runs in the background Coherence thread – started by the BackingMapListener – that reads messages from the queue looks like this:
/**
 * <p>AQ client interface</p>
 */
public class AQClient
{
  // ... (fields such as queueReciever and textMsg, and the queue connection setup, are not shown)

  /**
   * Send an acknowledgement message for a received message. This enables the
   * message to be kept by the sender if the client dies. An acknowledgement should only
   * be sent when the received message has been saved, e.g. in a cache
   *
   * @throws JMSException If an exception occurs sending an acknowledgement
   */
  public void aknowledgeMessage()
    throws JMSException
  {
    // ... (body not shown)
  }

  /**
   * Reads XML text message from the queue and transforms it into a Trade
   * object using JAXB classes.
   *
   * @return a Trade object
   * @throws JMSException If an exception occurs receiving the message
   * @throws JAXBException If an exception occurs during JAXB processing
   */
  public Trade receiveMessage()
    throws JMSException, JAXBException
  {
    // Wait for a message to show up in the queue
    textMsg = (TextMessage) queueReciever.receive();
    System.out.println("Message is: " + textMsg.getText());
    // Use JAXB to create object
    JAXBContext jc = JAXBContext.newInstance("com.oracle.demo.model");
    Unmarshaller u = jc.createUnmarshaller();
    JAXBElement<Trade> trade =
      (JAXBElement<Trade>) u.unmarshal(new StreamSource(new StringReader(textMsg.getText())));
    Trade t = trade.getValue();
    System.out.println("Trade created on: " + t.toString());
    return t;
  }
}
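The example relies on JAXB, which has not been bundled with the JDK since Java 11. As a rough, self-contained illustration of the same XML-to-object step, the sketch below parses a message shaped like the one the trigger builds using only the JDK's DOM parser. The `Trade` class here is a hypothetical minimal stand-in, not the JAXB-generated class from `com.oracle.demo.model`, and the sample values are made up. `Created` is kept as a string because its format depends on the database session's date settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.math.BigDecimal;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;

public class TradeMessageParser {

    /** Hypothetical minimal Trade; the article's real class is generated for JAXB. */
    static final class Trade {
        final long id;
        final String symbol;
        final long quantity;
        final BigDecimal amount;
        final String created; // left unparsed: format depends on session date settings

        Trade(long id, String symbol, long quantity, BigDecimal amount, String created) {
            this.id = id;
            this.symbol = symbol;
            this.quantity = quantity;
            this.amount = amount;
            this.created = created;
        }
    }

    /** Parses one TradeElement message into a Trade. */
    static Trade parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Element root = builder.parse(new InputSource(new StringReader(xml)))
                    .getDocumentElement();
            return new Trade(
                    Long.parseLong(text(root, "Id")),
                    text(root, "Symbol"),
                    Long.parseLong(text(root, "Quantity")),
                    new BigDecimal(text(root, "Amount")),
                    text(root, "Created"));
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("Could not parse trade message", e);
        }
    }

    /** Returns the trimmed text of the first child element with the given name. */
    private static String text(Element parent, String tag) {
        Node node = parent.getElementsByTagName(tag).item(0);
        if (node == null) {
            throw new IllegalArgumentException("Missing element: " + tag);
        }
        return node.getTextContent().trim();
    }

    public static void main(String[] args) {
        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>"
                + "<TradeElement xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
                + "xsi:schemaLocation=\"http://www.oracle.com/coherenceaq src/aq-object.xsd\">"
                + "<Id>1</Id><Symbol>ORCL</Symbol><Quantity>100</Quantity>"
                + "<Amount>2150.50</Amount><Created>17-JUN-09</Created>"
                + "</TradeElement>";
        Trade t = parse(xml);
        System.out.println(t.id + " " + t.symbol + " " + t.quantity + " " + t.amount);
    }
}
```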
A full code example can be found here, including all the necessary PL/SQL scripts. All you need to try it out is Coherence, Oracle XE and Coherence .NET – if you want to see the changes propagated to a .NET Coherence client.
If changes in the Coherence cache data also need to be written back to the database, then something like a status field or flag would need to be added to the cached objects and table, to ensure that a circular loop isn’t created. For instance, the following logic could enable bi-directional synchronization:
- A propagate flag is added to the cached object (Trade) and the database table (Trade) to signify whether changes should be propagated by the after update trigger from the database to Coherence.
- Data changes propagated from Coherence to the database have the propagate flag set to FALSE.
- A new before insert/update trigger is introduced to reset the propagate flag to TRUE when it is FALSE.
- When the after insert/update trigger fires, it only places the new value on the queue if the old value is not FALSE.
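One possible reading of the steps above can be modelled in a few lines of plain Java. This is a simulation, not the PL/SQL from setup-queue2.sql: `TradeRow`, `write` and the in-memory `table` and `queue` are all hypothetical names. The article does not say how the after trigger learns the value the flag arrived with, so the sketch simply passes it in as a parameter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PropagateFlagSketch {

    /** Hypothetical Trade row, reduced to one data column plus the flag. */
    static final class TradeRow {
        final long id;
        final String symbol;
        final boolean propagate;

        TradeRow(long id, String symbol, boolean propagate) {
            this.id = id;
            this.symbol = symbol;
            this.propagate = propagate;
        }
    }

    final Map<Long, TradeRow> table = new HashMap<>(); // stands in for the Trade table
    final List<Long> queue = new ArrayList<>();        // stands in for trade_queue

    /**
     * Models one insert/update together with both triggers.
     * incomingPropagate is FALSE for writes coming from Coherence and TRUE
     * for writes made by any other application.
     */
    void write(long id, String symbol, boolean incomingPropagate) {
        // Before trigger: the stored flag always ends up TRUE, so a later
        // change by another application is propagated again.
        table.put(id, new TradeRow(id, symbol, true));

        // After trigger: skip the queue when the write arrived with FALSE,
        // which is what prevents the circular loop.
        if (incomingPropagate) {
            queue.add(id);
        }
    }

    public static void main(String[] args) {
        PropagateFlagSketch db = new PropagateFlagSketch();
        db.write(1L, "ORCL", true);   // external application: queued
        db.write(1L, "ORCL", false);  // written back from Coherence: not queued
        db.write(1L, "IBM", true);    // external application again: queued
        System.out.println("messages queued: " + db.queue.size());
        System.out.println("stored flag: " + db.table.get(1L).propagate);
    }
}
```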
A modified example of the PL/SQL to add this functionality is in the setup-queue2.sql file, though you will need to add the additional column to the Trade table and attribute to the Trade object.
This approach is not the only mechanism for propagating changes from an Oracle Database to Coherence, but using a BackingMapListener to start the background queue client thread has an added benefit: if the node it is running on fails, Coherence will restart the queue client on another node as part of its recovery process. That is, when the queue configuration entry is failed over and the new primary is created, a new background queue client thread is started.