Sample Peer-to-Peer Oracle AQJMS program to send & receive Text Messages between two Clients.
Ensure you enabled JMS option while installing 11g Database.
JMSProvider is the JMS Demon Services running in Oracle 11g Database.
1. JMS Destination is the Queue created with Oracle11g database using below scripts.
Execute below 3 scripts in your database.
rem ==========================================
rem Create a queue table R_RYSAMPLE_QT
rem ==========================================
DECLARE
BEGIN
dbms_output.put_line ('Creating Queue Table R_RYSAMPLE_QT...');
dbms_aqadm.CREATE_queue_table(
queue_table => 'R_RYSAMPLE_QT',
queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
sort_list=> 'ENQ_TIME,PRIORITY',
multiple_consumers => FALSE,
compatible => '8.1.0',
message_grouping=> DBMS_AQADM.NONE,
primary_instance=> '0',
secondary_instance=> '0',
comment => 'Creating Queue Table R_RYSAMPLE_QT');
dbms_output.put_line ('Created Queue Table R_RYSAMPLE_QT.');
exception
when others then
catch_error(SQLCODE, 'Create Queue Table ' || substr(SQLERRM, 1, 256));
END;
/
rem ==========================================
rem Create a queue R_RYSAMPLE_Q
rem ==========================================
DECLARE
BEGIN
dbms_output.put_line ('Creating Queue R_RYSAMPLE_Q...');
dbms_aqadm.CREATE_queue(
queue_name => 'R_RYSAMPLE_Q',
queue_table => 'R_RYSAMPLE_QT',
queue_type=> DBMS_AQADM.NORMAL_QUEUE,
max_retries=> '5',
retry_delay=> '0',
retention_time=> '0',
comment => 'Resource Registration Queue');
dbms_output.put_line ('Created Queue R_RYSAMPLE_Q.');
exception
when others then
catch_error(SQLCODE, 'Create Queue ' || substr(SQLERRM, 1, 256));
END;
/
rem ====================================
rem Start input queue R_RYSAMPLE_Q
rem ====================================
DECLARE
BEGIN
dbms_output.put_line('starting queue R_RYSAMPLE_Q...');
dbms_aqadm.start_queue(
queue_name => 'R_RYSAMPLE_Q');
dbms_output.put_line ('Started Queue R_RYSAMPLE_Q.');
exception
when others then
catch_error(SQLCODE, 'Start Queue ' || substr(SQLERRM, 1, 256));
END;
/
2. JMS Client programs shown below.
JMS Client 1 :
package jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import oracle.jms.*;
public class jmsdemo {
// public QueueConnection qc=null;
public jmsdemo() {
}
public void QSendFunction () {
QueueConnection qc=null;
try {
QueueConnectionFactory qcf = AQjmsFactory.getQueueConnectionFactory("databaseHostName", "SID", port, "thin");
qc = qcf.createQueueConnection("userid", "passwd");
//Message delivery does not begin until you start the connection you created by calling the start method
qc.start();
//session is not transacted
QueueSession m_queueSess = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination is Queue created in database
Queue m_queue=((AQjmsSession)m_queueSess).getQueue("rocket", "R_RYSAMPLE_Q");
QueueSender m_sender=m_queueSess.createSender(m_queue);
Message mesg = m_queueSess.createTextMessage("Test Message");
m_sender.send(mesg);
}
catch (JMSException e) {
System.out.println(e.getMessage());
}
finally {
try{
qc.stop();
qc.close();
}catch (JMSException e1) {
}
}
}
public static void main(String[] args) {
jmsdemo jmsdemo = new jmsdemo();
jmsdemo.QSendFunction();
}
}
JMS Client 2 :
package jms;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import oracle.jms.*;
public class jmsdemorecv {
// public QueueConnection qc=null;
public jmsdemorecv() {
}
public void QRecvFunction () {
QueueConnection qc=null;
try {
// QueueConnectionFactory is the preconfigured JMS Object set by administrator.
QueueConnectionFactory qcf = AQjmsFactory.getQueueConnectionFactory("databaseHostName", "SID", port, "thin");
// Does not created Destination, there is no look to JNDI that holds QF and Destination by JMS Client (this program)
// we are directly using JMS Client to make JMS Provider
// QueueConnection to JMS Provider ( database with JMS Services demon is the service provider )
// User can create one more connections to QF
qc = qcf.createQueueConnection("userid", "passwd");
// Start the Connection
//Message delivery does not begin until you start the connection you created by calling the start method
qc.start();
//A session is a single-threaded context for producing and consuming messages
//QueueSession m_queueSess = qc.createQueueSession(true, 0);
QueueSession m_queueSess = qc.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue m_queue=((AQjmsSession)m_queueSess).getQueue("rocket", "R_RYSAMPLE_Q");
//Once you have created a message consumer, it becomes active, and you can use it to receive messages
//a session and is used for receiving messages sent to a destination.
//A message consumer allows a JMS client to register interest in a destination with a JMS provider.
//The JMS provider manages the delivery of messages
QueueReceiver m_receiver=m_queueSess.createReceiver(m_queue);
// Synchronous messaging
//A receiver explicitly fetches the message from the destination by calling the receive method.
// The receive method can block until a message arrives or can time out if a message does not arrive within a specified time limit
//if you do not want your program to consume system resources unnecessarily, use a timed synchronous receive ex:- receive(1000) timeout 1000ms
Message mesg = m_receiver.receive(5000);
if(mesg != null)
System.out.println(((AQjmsTextMessage)mesg).getText());
}
catch (JMSException e) {
System.out.println(e.getMessage());
}
finally {
try{
qc.stop();
qc.close();
}catch (JMSException e1) {
}
}
}
public static void main(String[] args) {
jmsdemorecv jmsdemo = new jmsdemorecv();
jmsdemo.QRecvFunction();
}
}