In this post I want to highlight some examples of the JMS feature support in Oracle Advanced Queuing (AQ), in particular some of the messaging features that were introduced alongside the JMS 2.0 API. You can read about these in more detail in these two articles by my colleague Nigel Deakin, who was the spec lead for JMS 2.0:
- https://www.oracle.com/technical-resources/articles/java/jms20.html
- https://www.oracle.com/technical-resources/articles/java/jms2messaging.html
Oracle AQ currently provides the JMS 1.1 API, but it also provides most of the 2.0 messaging features with the 1.1 version of the API. Let’s take a look at some examples.
Multiple consumers
Oracle AQ allows you to define a queue with either a single consumer, or with multiple consumers.
Note: Queue creation is part of the administrative API, which is provided in PL/SQL and Java. The operational API supports additional languages such as C, Python, and JavaScript.
Here’s an example of creating a queue with multiple consumers using the PL/SQL API. To create a single-consumer queue, all that is needed is to omit the “multiple_consumers” argument, or set it to false:
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'my_qt',
        queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
        multiple_consumers => true);
    dbms_aqadm.create_queue(
        queue_name  => 'my_q',
        queue_table => 'my_qt');
    dbms_aqadm.start_queue(
        queue_name => 'my_q');
end;
/
Here’s an example of creating a multiple-consumer queue using the Java API. Again, all that is needed to create a single-consumer queue is to omit the setMultiConsumer(true) call, or pass false instead:
package com.examples.queues;

import java.sql.SQLException;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import oracle.AQ.AQException;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.ucp.jdbc.PoolDataSource;
import oracle.ucp.jdbc.PoolDataSourceFactory;

public class CreateTopic {

    private static String username = "pdbadmin";
    private static String url = "jdbc:oracle:thin:@//localhost:1521/pdb1";

    public static void main(String[] args) throws AQException, SQLException, JMSException {

        // create a topic session
        // make sure that you set the environment variable DB_PASSWORD before running this
        PoolDataSource ds = PoolDataSourceFactory.getPoolDataSource();
        ds.setConnectionFactoryClassName("oracle.jdbc.pool.OracleDataSource");
        ds.setURL(url);
        ds.setUser(username);
        ds.setPassword(System.getenv("DB_PASSWORD"));

        TopicConnectionFactory tcf = AQjmsFactory.getTopicConnectionFactory(ds);
        TopicConnection conn = tcf.createTopicConnection();
        conn.start();
        TopicSession session = (AQjmsSession) conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        // create properties
        AQQueueTableProperty props = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
        props.setMultiConsumer(true);
        props.setPayloadType("SYS.AQ$_JMS_TEXT_MESSAGE");

        // create queue table, topic and start it
        AQQueueTable qtable = ((AQjmsSession) session).createQueueTable(username, "my_qt", props);
        Topic topic = ((AQjmsSession) session).createTopic(qtable, "my_q", new AQjmsDestinationProperty());
        ((AQjmsDestination) topic).start(session, true, true);
    }
}
Whichever API you use to create the multiple-consumer queue, you can then create consumers and subscriptions as needed to consume messages off the queue.
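As a rough sketch of the consuming side, here is how a named subscriber might drain its messages from Python. The function name drain_for_consumer, the handler callback, and the subscriber name are illustrative assumptions, not part of any AQ API. The queue argument is assumed to be a cx_Oracle queue object obtained with connection.queue("my_q", jmsType), as in the Python example later in this post, and the subscriber must already have been added to the queue.

```python
def drain_for_consumer(queue, consumer_name, handler, wait_seconds=0):
    """Dequeue every message currently available to one named subscriber.

    `queue` is assumed to be a cx_Oracle queue object, for example
    connection.queue("my_q", jmsType). Returns the number of messages handled.
    """
    # On a multiple-consumer queue, each dequeue has to name the subscriber.
    queue.deqOptions.consumername = consumer_name
    # Seconds to wait for a message; 0 is assumed to mean "do not wait".
    queue.deqOptions.wait = wait_seconds

    count = 0
    while True:
        props = queue.deqOne()  # None when no message is available
        if props is None:
            break
        handler(props.payload)
        count += 1
    return count
```

A call such as drain_for_consumer(queue, "SUB1", lambda msg: print(msg.TEXT_VC)) would then be followed by connection.commit(), since the sketch leaves transaction control to the caller.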
Delayed Delivery
AQ also supports delayed delivery. You can enqueue a message with delayed delivery using any of the available language APIs. Here are just a few examples. First, in PL/SQL, we set the “delay” property in the “message_properties”:
DECLARE
    enqueue_options    dbms_aq.enqueue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     RAW(16);
    message            SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
    message := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
    message.set_text('hello possums!');
    message_properties.recipient_list(0) := sys.aq$_agent('edna', null, null);
    message_properties.delay := 30; -- 30 sec
    DBMS_AQ.ENQUEUE(
        queue_name         => 'my_q',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => message,
        msgid              => message_handle);
    COMMIT;
END;
And here is a snippet of Java code where we set the delay using the “setDeliveryDelay” method on the QueueSender:
// publish message
Queue queue = ((AQjmsSession) session).getQueue(username, queueName);
AQjmsQueueSender sender = (AQjmsQueueSender) session.createSender(queue);
sender.setDeliveryDelay(30_000); // 30 sec
AQjmsTextMessage message = (AQjmsTextMessage) session.createTextMessage("hello from java");
sender.send(queue, message);
session.commit();
Here’s an example in Python, where we specify the delay in the “msgproperties”:
import cx_Oracle
from os import environ as env
# make sure that you set the environment variable DB_PASSWORD before running this
connection = cx_Oracle.connect(dsn='pdb1',user='pdbadmin',password=env.get('DB_PASSWORD'))
# get the JMS type
jmsType = connection.gettype("SYS.AQ$_JMS_TEXT_MESSAGE")
headerType = connection.gettype("SYS.AQ$_JMS_HEADER")
userPropType = connection.gettype("SYS.AQ$_JMS_USERPROPARRAY")
queue = connection.queue("my_q", jmsType)
text = jmsType.newobject()
text.HEADER = headerType.newobject()
text.TEXT_VC = "hello from python"
text.TEXT_LEN = len(text.TEXT_VC)
text.HEADER.TYPE = "MyHeader"
text.HEADER.PROPERTIES = userPropType.newobject()
queue.enqOne(connection.msgproperties(payload=text, delay=30))
connection.commit()
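Note the units in these examples: the PL/SQL and Python versions give the delay in seconds, while the Java setDeliveryDelay call takes milliseconds. If you want a message to become visible at a particular time rather than after a fixed interval, one option is to compute the delay from the target time. The helper below is a hypothetical sketch (delay_until is not part of any AQ API) and it assumes both timestamps are timezone-aware:

```python
import math
from datetime import datetime, timezone


def delay_until(deliver_at, now=None):
    """Return whole seconds from `now` until `deliver_at`, never negative.

    Both values are assumed to be timezone-aware datetimes. Partial seconds
    are rounded up so the message is never released early.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    remaining = (deliver_at - now).total_seconds()
    return max(0, math.ceil(remaining))
```

The result could then be passed as the delay, for example connection.msgproperties(payload=text, delay=delay_until(target)), where target is whatever delivery time your application chooses.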
Sending messages asynchronously
AQ does not currently support sending messages asynchronously, but it does allow you to receive asynchronously. This is done using the notification feature. Here’s an example in PL/SQL that shows how to configure AQ to send a notification when any message of interest is enqueued. The notification can be an email message or an HTTP request, or you can invoke a PL/SQL procedure, as shown in this example:
-- this is a stored proc that will receive message notifications
-- note that a procedure must have this signature to receive notifications
create or replace procedure receiver (
    context  in raw,
    reginfo  in sys.aq$_reg_info,
    descr    in sys.aq$_descriptor,
    payload  in varchar2,
    payloadl in number
) as
    dequeue_options    dbms_aq.dequeue_options_t;
    message_properties dbms_aq.message_properties_t;
    message_handle     raw(16);
    message            sys.aq$_jms_text_message;
    no_messages        exception;
    pragma exception_init(no_messages, -25228);
begin
    dequeue_options.msgid := descr.msg_id;
    dequeue_options.consumer_name := descr.consumer_name;
    dequeue_options.navigation := dbms_aq.first_message;
    loop
        dbms_aq.dequeue(
            queue_name         => 'my_q',
            dequeue_options    => dequeue_options,
            message_properties => message_properties,
            payload            => message,
            msgid              => message_handle
        );
        insert into some_table values (message.text_vc, sysdate);
        commit;
    end loop;
exception
    when no_messages then
        dbms_output.put_line('No more messages for processing');
        commit;
end;
/

-- add a subscriber
begin
    dbms_aqadm.add_subscriber(
        queue_name => 'my_q',
        subscriber => sys.aq$_agent('SUB1', 'my_q', null));
end;
/

-- add a notification
BEGIN
    DBMS_AQ.REGISTER(
        sys.aq$_reg_info_list(
            sys.aq$_reg_info(
                'my_q:sub1',
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://receiver',
                HEXTORAW('FF')
            )
        ), 1);
    COMMIT;
END;
/
This is a particularly useful feature because it allows us to effectively “scale to zero”: we do not need to have a procedure running all the time polling for messages. When a message arrives, the procedure will be notified, and it will consume and process any messages that are available at that time.
Delivery Count
Obtaining the delivery count for a message is interesting because it allows us to work out if there is some problem in the application – perhaps we are producing more messages than a consumer can handle, perhaps the receiver is unable to process the message for some reason. Redelivery can waste resources and hold up subsequent “good” messages from being processed – so this is an interesting statistic to monitor.
Getting the redelivery count is as simple as calling the “getAttempts()” method on a JMSMessage:
AQjmsQueueBrowser browser = (AQjmsQueueBrowser) session.createBrowser(queue);
while (browser.hasMoreElements()) {
    AQjmsTextMessage message = (AQjmsTextMessage) browser.nextElement();
    System.out.println("Message text: " + message.getText());
    System.out.println("Delivery count: " + message.getAttempts());
}
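From Python, a comparable figure is available as the attempts attribute of the message properties object that a cx_Oracle dequeue returns. As a sketch of how this could feed monitoring, the hypothetical helper below picks out messages whose attempt count has reached a threshold. The function name and the default threshold of 3 are illustrative assumptions.

```python
def find_redelivered(messages, threshold=3):
    """Return (payload, attempts) pairs for messages at or above the threshold.

    Each item in `messages` is assumed to look like the message properties
    object returned by a cx_Oracle dequeue, with `payload` and `attempts`
    attributes.
    """
    return [(m.payload, m.attempts) for m in messages if m.attempts >= threshold]
```

In practice the messages could be collected with a browse-mode dequeue (setting the queue's deqOptions.mode to cx_Oracle.DEQ_BROWSE) so that inspecting them does not remove them from the queue.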
Summary
Wow! So we have seen that most of the new messaging features that were introduced with the JMS 2.0 API are already supported by Oracle AQ! Of course, we are always adding new features and support for new APIs and frameworks – so stay tuned to hear about more exciting new capabilities soon!
