Offline Processing in PHP with Advanced Queuing
By cj on May 16, 2013
Offloading slow batch tasks to an external process is a common method of improving website responsiveness. One great way to initiate such background tasks in PHP is to use Oracle Streams Advanced Queuing in a producer-consumer message passing fashion. Oracle AQ is highly configurable. Messages can be queued by multiple producers, and different consumers can filter messages. From PHP, the PL/SQL interface to AQ is used. There are also Java, C and HTTPS interfaces, allowing wide architectural freedom. Oracle Advanced Queuing is included in all editions of the database.
The following example simulates an application user registration system where the PHP application queues each new user's street address. An external system monitoring the queue can then fetch and process that address. In real life the external system might initiate a snail-mail welcome letter, or do further, slower automated validation on the address.
The following SQL*Plus script qcreate.sql creates a new Oracle user demoqueue with permission to create and use queues. A payload type for the address is created and a queue is set up for this payload.
-- qcreate.sql

connect / as sysdba

drop user demoqueue cascade;
create user demoqueue identified by welcome;
grant connect, resource to demoqueue;
grant aq_administrator_role, aq_user_role to demoqueue;
grant execute on dbms_aq to demoqueue;
grant create type to demoqueue;

connect demoqueue/welcome@localhost/orcl

-- The data we want to queue
create or replace type user_address_type as object (
  name    varchar2(10),
  address varchar2(50)
);
/

-- Create and start the queue
begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'demoqueue.addr_queue_tab',
    queue_payload_type => 'demoqueue.user_address_type');
end;
/

begin
  dbms_aqadm.create_queue(
    queue_name  => 'demoqueue.addr_queue',
    queue_table => 'demoqueue.addr_queue_tab');
end;
/

begin
  dbms_aqadm.start_queue(
    queue_name => 'demoqueue.addr_queue',
    enqueue    => true);
end;
/
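Before moving on, it can be useful to confirm that the queue exists and is started. The following is a minimal sketch, not part of the original scripts: the file name qcheck.php is made up for illustration, and it assumes the USER_QUEUES data dictionary view is visible to the demoqueue user with its usual NAME, QUEUE_TABLE, ENQUEUE_ENABLED and DEQUEUE_ENABLED columns.

```php
<?php
// qcheck.php (hypothetical file name, not one of the original scripts)

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");
if (!$c) {
    $e = oci_error();   // connection errors are fetched without a handle
    exit("Connection failed: " . $e['message'] . "\n");
}

// USER_QUEUES lists the queues owned by the connected user.
// The enabled flags are blank padded, so they are trimmed here.
$s = oci_parse($c,
    "select name,
            queue_table,
            trim(enqueue_enabled) as enq,
            trim(dequeue_enabled) as deq
       from user_queues
      where name = 'ADDR_QUEUE'");
oci_execute($s);

$row = oci_fetch_assoc($s);
if ($row === false) {
    echo "Queue ADDR_QUEUE was not found\n";
} else {
    echo "Queue      : " . $row['NAME'] . "\n";
    echo "Queue table: " . $row['QUEUE_TABLE'] . "\n";
    echo "Enqueue    : " . $row['ENQ'] . "\n";
    echo "Dequeue    : " . $row['DEQ'] . "\n";
}
?>
```

If the enqueue or dequeue flag is not YES, rerunning the dbms_aqadm.start_queue call from qcreate.sql is the first thing to try.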
The script qhelper.sql creates two useful helper procedures to enqueue and dequeue messages:
-- qhelper.sql
-- Helpful address enqueue/dequeue procedures

connect demoqueue/welcome@localhost/orcl

-- Put an address in the queue
create or replace procedure my_enq(name_p in varchar2, address_p in varchar2) as
  user_address       user_address_type;
  enqueue_options    dbms_aq.enqueue_options_t;
  message_properties dbms_aq.message_properties_t;
  enq_id             raw(16);
begin
  user_address := user_address_type(name_p, address_p);
  dbms_aq.enqueue(queue_name         => 'demoqueue.addr_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => user_address,
                  msgid              => enq_id);
  commit;
end;
/
show errors

-- Get an address from the queue
create or replace procedure my_deq(name_p out varchar2, address_p out varchar2) as
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  user_address       user_address_type;
  enq_id             raw(16);
begin
  dbms_aq.dequeue(queue_name         => 'demoqueue.addr_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => user_address,
                  msgid              => enq_id);
  name_p    := user_address.name;
  address_p := user_address.address;
  commit;
end;
/
show errors
The script newuser.php is the part of the PHP application that handles site registration for a new user. It queues a message containing their address and continues executing:
<?php

// newuser.php

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");

// The new user details
$username = 'Fred';
$address  = '500 Oracle Parkway';

// Enqueue the address for later offline handling
$s = oci_parse($c, "begin my_enq(:username, :address); end;");
oci_bind_by_name($s, ":username", $username, 10);
oci_bind_by_name($s, ":address", $address, 50);
$r = oci_execute($s);

// Continue executing
echo "Welcome $username\n";

?>
It executes an anonymous PL/SQL block that calls my_enq() to create and enqueue the address message. The immediate script output is simply the echoed welcome message:
Welcome Fred
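As written, newuser.php does not check whether the connection or the enqueue succeeded. The following is a minimal sketch of one way to add that check, using a made-up file name newuser_checked.php. Logging the failure and still welcoming the user is an assumption about what the application wants, not something the original script prescribes.

```php
<?php
// newuser_checked.php (hypothetical variant of newuser.php)

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");
if (!$c) {
    $e = oci_error();   // connection errors are fetched without a handle
    exit("Connection failed: " . $e['message'] . "\n");
}

// The new user details
$username = 'Fred';
$address  = '500 Oracle Parkway';

$s = oci_parse($c, "begin my_enq(:username, :address); end;");
oci_bind_by_name($s, ":username", $username, 10);
oci_bind_by_name($s, ":address", $address, 50);

// The @ suppresses PHP's own warning; the result is checked explicitly
$r = @oci_execute($s);
if (!$r) {
    $e = oci_error($s);   // execution errors are fetched from the statement
    // Assumption: a failed enqueue should be logged, not shown to the user
    error_log("Enqueue failed: " . $e['message']);
}

// Continue executing
echo "Welcome $username\n";
?>
```

Note that the name field of user_address_type is only varchar2(10), so a longer user name would make my_enq() fail. That is the kind of error this check would surface.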
Once this PHP script is executed, any application can dequeue the new message at its leisure. For example, the following SQL*Plus commands call the helper my_deq() dequeue procedure and display the user details:
-- getuser.sql

connect demoqueue/welcome@localhost/orcl

set serveroutput on

declare
  name    varchar2(10);
  address varchar2(50);
begin
  my_deq(name, address);
  dbms_output.put_line('Name : ' || name);
  dbms_output.put_line('Address : ' || address);
end;
/
The output is:
Name : Fred
Address : 500 Oracle Parkway
If you instead want to check the queue from PHP, use getuser.php:
<?php

// getuser.php

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");

// dequeue the message
$sql = "begin my_deq(:username, :address); end;";
$s = oci_parse($c, $sql);
oci_bind_by_name($s, ":username", $username, 10);
oci_bind_by_name($s, ":address", $address, 50);
$r = oci_execute($s);

echo "Name : $username\n";
echo "Address : $address\n";

?>
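An external system that monitors the queue would normally dequeue in a loop, not just once. The following is a rough sketch of such a consumer. The file name worker.php and the function process_address() are made up for illustration, and the function stands in for whatever slow work the real system would do.

```php
<?php
// worker.php (hypothetical long-running consumer)

// Hypothetical placeholder for the slow offline work
function process_address($name, $address)
{
    echo "Processing $name at $address\n";
}

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");
if (!$c) {
    $e = oci_error();
    exit("Connection failed: " . $e['message'] . "\n");
}

// Parse and bind once; the same statement is re-executed for each message
$s = oci_parse($c, "begin my_deq(:username, :address); end;");
oci_bind_by_name($s, ":username", $username, 10);
oci_bind_by_name($s, ":address", $address, 50);

while (true) {
    // Each execution waits inside my_deq() until a message is available
    if (!oci_execute($s)) {
        $e = oci_error($s);
        error_log("Dequeue failed: " . $e['message']);
        break;
    }
    process_address($username, $address);
}
?>
```

Because my_deq() commits as soon as the message is dequeued, a message is gone from the queue even if process_address() later fails. A production consumer might prefer to commit only after the work has succeeded.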
If the dequeue operation is called when there is nothing in the queue, it blocks waiting for a message until the dequeue wait time expires. The wait time is configurable. For example, setting a zero wait time with dequeue_options.wait := 0; before calling dbms_aq.dequeue stops the call from waiting at all.
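The following sketch shows one way a zero wait time could be used from PHP. Instead of changing my_deq(), it runs an anonymous PL/SQL block that sets dequeue_options.wait to 0 and traps ORA-25228, the error dbms_aq.dequeue raises when the wait ends without a message. The file name getuser_nowait.php, the :found bind and the no_messages exception name are made up for illustration.

```php
<?php
// getuser_nowait.php (hypothetical variant of getuser.php)

$c = oci_connect("demoqueue", "welcome", "localhost/orcl");
if (!$c) {
    $e = oci_error();
    exit("Connection failed: " . $e['message'] . "\n");
}

// Anonymous block that dequeues with a zero wait time.
// :found is set to 1 when a message was dequeued and 0 otherwise.
$sql = <<<'SQL'
declare
  dequeue_options    dbms_aq.dequeue_options_t;
  message_properties dbms_aq.message_properties_t;
  user_address       user_address_type;
  msg_id             raw(16);
  no_messages        exception;
  pragma exception_init(no_messages, -25228);
begin
  dequeue_options.wait := 0;
  dbms_aq.dequeue(queue_name         => 'demoqueue.addr_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => user_address,
                  msgid              => msg_id);
  :username := user_address.name;
  :address  := user_address.address;
  :found    := 1;
  commit;
exception
  when no_messages then
    :found := 0;
end;
SQL;

$s = oci_parse($c, $sql);
oci_bind_by_name($s, ":username", $username, 10);
oci_bind_by_name($s, ":address", $address, 50);
oci_bind_by_name($s, ":found", $found, 10);
oci_execute($s);

// :found comes back as a string, so a loose comparison is used
if ($found == 1) {
    echo "Name : $username\n";
    echo "Address : $address\n";
} else {
    echo "No message is waiting in the queue\n";
}
?>
```

Trapping the error inside the PL/SQL block keeps an empty queue from looking like a failure to PHP. Other errors still propagate to oci_execute() as usual.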
The PL/SQL API has much more functionality than shown in this overview. For example, you can enqueue an array of messages, or listen to more than one queue. Queuing is highly configurable and scalable, providing a great way to distribute workload for web or mobile applications. More information about AQ is in the Oracle Streams Advanced Queuing User's Guide.
Bootnote: The basis for this blog post comes from the Underground PHP and Oracle Manual
This post was updated to show setting a zero wait time.