Using Oracle Advanced Queues to monitor when a database table column is updated
Real-time Monitoring of Database Column Updates with Oracle Advanced Queues
Learn how to leverage Oracle Advanced Queues (AQ) and database triggers to achieve real-time notifications when a specific column in a database table is updated, enabling powerful event-driven architectures.
Monitoring changes to database tables is a common requirement in many applications. While simple polling can detect changes, it's inefficient and introduces latency. This article explores a robust, event-driven approach using Oracle Advanced Queues (AQ) in conjunction with database triggers to get immediate notifications when a particular column in a table is updated. This method is ideal for integrating with external systems, triggering business processes, or providing real-time data synchronization without constant database queries.
Understanding the Core Components
Before diving into the implementation, let's briefly review the key Oracle features we'll be using:
- Database Triggers: PL/SQL blocks associated with a table that execute automatically when a DML (Data Manipulation Language) event (INSERT, UPDATE, DELETE) occurs on that table.
- Oracle Advanced Queuing (AQ): A robust message queuing infrastructure built directly into the Oracle database. It allows applications to asynchronously send, receive, and process messages. AQ supports various messaging models, including point-to-point and publish-subscribe.
By combining these, a trigger will detect a column update and then enqueue a message into an AQ queue. An external application or another database process can then dequeue this message to react to the change.
Figure: Architecture for real-time column update monitoring with AQ.
Setting Up Oracle Advanced Queues
First, we need to set up the Advanced Queue infrastructure. This involves creating a queue table and a queue. We'll use a simple payload type for demonstration, but you could define a more complex object type to pass detailed information about the update (e.g., old value, new value, row ID, timestamp).
-- Grant necessary privileges to the user
GRANT EXECUTE ON DBMS_AQADM TO your_user;
GRANT EXECUTE ON DBMS_AQ TO your_user;
GRANT AQ_ADMINISTRATOR_ROLE TO your_user;
GRANT CREATE TYPE, CREATE TABLE, CREATE TRIGGER TO your_user;
-- Connect as your_user
CONNECT your_user/your_password;
-- 1. Create the message payload type (the queue table below depends on it)
CREATE OR REPLACE TYPE update_message_typ AS OBJECT (
table_name VARCHAR2(30),
column_name VARCHAR2(30),
row_id VARCHAR2(18), -- ROWID stored as text; ROWID/UROWID attributes are not permitted in object types
new_value VARCHAR2(4000),
old_value VARCHAR2(4000),
update_timestamp TIMESTAMP
);
/
-- 2. Create the queue table
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'update_event_qtab',
queue_payload_type => 'update_message_typ',
multiple_consumers => FALSE -- Single-consumer queue; TRUE requires subscribers and a consumer name on dequeue
);
END;
/
-- 3. Create the queue
BEGIN
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'column_update_q',
queue_table => 'update_event_qtab'
);
END;
/
-- 4. Start the queue
BEGIN
DBMS_AQADM.START_QUEUE (
queue_name => 'column_update_q'
);
END;
/
SQL script to set up the AQ queue table and queue.
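If the queue table is instead created with multiple_consumers => TRUE (publish-subscribe), every message needs at least one recipient; an enqueue into a queue with no subscribers fails with ORA-24033, and each dequeue call must name its consumer. The following is a minimal sketch of registering a subscriber under that assumption. The subscriber name ORDER_MONITOR is hypothetical and not part of the original setup.

```sql
-- Assumes column_update_q belongs to a queue table created with
-- multiple_consumers => TRUE. ORDER_MONITOR is a made-up subscriber name.
DECLARE
  l_subscriber SYS.AQ$_AGENT;
BEGIN
  -- An agent is identified by (name, address, protocol); only the name is needed here.
  l_subscriber := SYS.AQ$_AGENT('ORDER_MONITOR', NULL, NULL);

  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'column_update_q',
    subscriber => l_subscriber
  );
END;
/
```

A consumer of such a queue would then set l_dequeue_options.consumer_name := 'ORDER_MONITOR' before calling DBMS_AQ.DEQUEUE.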
Creating the Trigger to Enqueue Messages
Next, we'll create a database table and a trigger on that table. The trigger will fire after an update on a specific column and enqueue a message into our column_update_q queue. For this example, let's assume we're monitoring updates to the status column in an orders table.
-- Create a sample table
CREATE TABLE orders (
order_id NUMBER PRIMARY KEY,
customer_id NUMBER,
order_date DATE,
status VARCHAR2(20),
total_amount NUMBER(10,2)
);
-- Insert some initial data
INSERT INTO orders (order_id, customer_id, order_date, status, total_amount) VALUES (1, 101, SYSDATE, 'PENDING', 150.00);
INSERT INTO orders (order_id, customer_id, order_date, status, total_amount) VALUES (2, 102, SYSDATE, 'PENDING', 200.00);
COMMIT;
-- Create the trigger
CREATE OR REPLACE TRIGGER trg_orders_status_update
AFTER UPDATE OF status ON orders
FOR EACH ROW
DECLARE
l_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
l_message_handle RAW(16);
l_message update_message_typ;
BEGIN
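-- Enqueue only when the value really changed. The extra checks cover NULLs,
-- because a comparison such as NULL <> 'X' evaluates to NULL rather than TRUE.
IF :OLD.status <> :NEW.status
OR (:OLD.status IS NULL AND :NEW.status IS NOT NULL)
OR (:OLD.status IS NOT NULL AND :NEW.status IS NULL) THEN
l_message := update_message_typ(
table_name => 'ORDERS',
column_name => 'STATUS',
row_id => ROWIDTOCHAR(:OLD.ROWID),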
new_value => :NEW.status,
old_value => :OLD.status,
update_timestamp => SYSTIMESTAMP
);
DBMS_AQ.ENQUEUE (
queue_name => 'column_update_q',
enqueue_options => l_enqueue_options,
message_properties => l_message_properties,
payload => l_message,
msgid => l_message_handle
);
END IF;
EXCEPTION
WHEN OTHERS THEN
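-- Swallowing the error keeps the UPDATE from failing, but the notification is lost.
-- In a real application, log it to a table; DBMS_OUTPUT is visible only if the updating session enables it.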
DBMS_OUTPUT.PUT_LINE('Error enqueuing message: ' || SQLERRM);
END;
/
SQL script to create the orders table and a trigger to monitor status column updates.
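To exercise the queue independently of the trigger, a message can also be enqueued by hand. The block below is a sketch that assumes the queue and payload type created earlier; the payload values, the correlation string, and the one-hour expiration are made up for illustration.

```sql
-- Manual test message; all values here are illustrative only.
DECLARE
  l_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
  l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  l_message_handle     RAW(16);
  l_message            update_message_typ;
BEGIN
  -- Optional properties: tag the message and let it expire after one hour.
  l_message_properties.correlation := 'ORDER-1';
  l_message_properties.expiration  := 3600; -- seconds

  l_message := update_message_typ(
    table_name       => 'ORDERS',
    column_name      => 'STATUS',
    row_id           => NULL,
    new_value        => 'SHIPPED',
    old_value        => 'PENDING',
    update_timestamp => SYSTIMESTAMP
  );

  DBMS_AQ.ENQUEUE(
    queue_name         => 'column_update_q',
    enqueue_options    => l_enqueue_options,
    message_properties => l_message_properties,
    payload            => l_message,
    msgid              => l_message_handle
  );

  -- With the default enqueue options the message is part of the current
  -- transaction and becomes visible to consumers only at commit.
  COMMIT;
END;
/
```

The same commit rule applies to the trigger: the message it enqueues belongs to the transaction that ran the UPDATE, so a rollback of the update also discards the message.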
Consuming Messages from the Queue
Finally, an application or another PL/SQL procedure needs to dequeue messages from column_update_q to process the updates. This can be done synchronously (waiting for messages) or asynchronously (using AQ notification mechanisms such as OCI or PL/SQL callbacks, or a Java Message Service (JMS) listener). Here, we'll show a simple synchronous dequeue example.
-- PL/SQL Block to Dequeue Messages
SET SERVEROUTPUT ON;
DECLARE
  l_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  l_message_handle     RAW(16);
  l_message            update_message_typ;
  -- DBMS_AQ does not define a named timeout exception, so map ORA-25228 explicitly
  e_dequeue_timeout    EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_dequeue_timeout, -25228);
BEGIN
  -- Wait up to 60 seconds per call. DBMS_OUTPUT is displayed only when the block ends,
  -- so waiting with DBMS_AQ.FOREVER would block without ever printing anything.
  l_dequeue_options.wait := 60;
  l_dequeue_options.consumer_name := NULL; -- For single consumer, or specify a consumer for multi-consumer queues
  l_dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE; -- Start from the first message, then NEXT_MESSAGE
  LOOP
    BEGIN
      DBMS_AQ.DEQUEUE (
        queue_name         => 'column_update_q',
        dequeue_options    => l_dequeue_options,
        message_properties => l_message_properties,
        payload            => l_message,
        msgid              => l_message_handle
      );
      DBMS_OUTPUT.PUT_LINE('--- Message Dequeued ---');
      DBMS_OUTPUT.PUT_LINE('Table: ' || l_message.table_name);
      DBMS_OUTPUT.PUT_LINE('Column: ' || l_message.column_name);
      DBMS_OUTPUT.PUT_LINE('Row ID: ' || l_message.row_id);
      DBMS_OUTPUT.PUT_LINE('Old Value: ' || l_message.old_value);
      DBMS_OUTPUT.PUT_LINE('New Value: ' || l_message.new_value);
      DBMS_OUTPUT.PUT_LINE('Timestamp: ' || TO_CHAR(l_message.update_timestamp, 'YYYY-MM-DD HH24:MI:SS.FF'));
      -- Commit the transaction to remove the message from the queue
      COMMIT;
      l_dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE; -- Get the next message
    EXCEPTION
      WHEN e_dequeue_timeout THEN
        DBMS_OUTPUT.PUT_LINE('No message arrived within the wait time. Exiting.');
        EXIT;
      WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE('Error dequeuing message: ' || SQLERRM);
        ROLLBACK; -- Rollback to keep the message in the queue if processing fails
        EXIT; -- Or implement retry logic
    END;
  END LOOP;
END;
/
PL/SQL block to synchronously dequeue and process messages from the AQ queue.
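For the asynchronous route mentioned earlier, one option that stays inside the database is a PL/SQL notification callback registered with DBMS_AQ.REGISTER. The following is a sketch under several assumptions: the queue is single-consumer and owned by YOUR_USER, the instance is configured to run notification jobs, and the log table order_status_log and the procedure order_status_callback are hypothetical names introduced only for this example.

```sql
-- Hypothetical log table: callbacks run in a background session,
-- so DBMS_OUTPUT would not be visible anywhere.
CREATE TABLE order_status_log (
  logged_at TIMESTAMP,
  old_value VARCHAR2(4000),
  new_value VARCHAR2(4000)
);

-- The parameter list is the one AQ expects for PL/SQL notification callbacks.
CREATE OR REPLACE PROCEDURE order_status_callback (
  context  IN RAW,
  reginfo  IN SYS.AQ$_REG_INFO,
  descr    IN SYS.AQ$_DESCRIPTOR,
  payload  IN RAW,
  payloadl IN NUMBER
) AS
  l_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  l_message_handle     RAW(16);
  l_message            update_message_typ;
BEGIN
  -- Dequeue exactly the message this notification refers to.
  l_dequeue_options.msgid         := descr.msg_id;
  l_dequeue_options.consumer_name := descr.consumer_name;
  l_dequeue_options.wait          := DBMS_AQ.NO_WAIT;

  DBMS_AQ.DEQUEUE(
    queue_name         => descr.queue_name,
    dequeue_options    => l_dequeue_options,
    message_properties => l_message_properties,
    payload            => l_message,
    msgid              => l_message_handle
  );

  INSERT INTO order_status_log (logged_at, old_value, new_value)
  VALUES (SYSTIMESTAMP, l_message.old_value, l_message.new_value);

  COMMIT; -- removes the message from the queue and keeps the log row
END;
/

-- Register the callback for the queue (YOUR_USER is an assumed schema name).
BEGIN
  DBMS_AQ.REGISTER(
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        'YOUR_USER.COLUMN_UPDATE_Q',
        DBMS_AQ.NAMESPACE_AQ,
        'plsql://YOUR_USER.ORDER_STATUS_CALLBACK',
        HEXTORAW('FF')
      )
    ),
    1
  );
END;
/
```

With this in place no session has to sit in a dequeue loop; each committed status change should result in a row in order_status_log. The registration can be removed again with DBMS_AQ.UNREGISTER using the same registration list.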
Testing the Solution
With the setup complete, you can now test the entire flow. Run the consumer block in one SQL*Plus or SQL Developer session. Then, in another session, update the status column of the orders table and commit. The consumer dequeues the message as soon as the update is committed; SQL*Plus and SQL Developer display the DBMS_OUTPUT lines once the consumer block finishes.
1. Open two separate SQL*Plus or SQL Developer sessions connected as your_user.
2. In Session 1, run the consumer block shown above (saved, for example, as aq_consumer.sql). It will wait for messages.
3. In Session 2, execute an UPDATE statement on the orders table, changing the status column: UPDATE orders SET status = 'SHIPPED' WHERE order_id = 1;
4. Execute COMMIT; in Session 2. The trigger enqueues the message as part of the same transaction as the update, so the message becomes visible to consumers only once that transaction commits.
5. Observe Session 1. When the consumer block completes, the dequeued message details are printed to the console, reflecting the update you just made.
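If nothing shows up in the consumer session, it can help to look at the queue contents directly. Oracle creates a view named AQ$ followed by the queue table name for every queue table; the query below is a small sketch against that view for the update_event_qtab table used in this article.

```sql
-- Inspect messages currently held in the queue table.
-- MSG_STATE is typically READY for messages waiting to be dequeued.
SELECT t.queue,
       t.msg_state,
       t.enq_time,
       t.user_data.old_value AS old_value,
       t.user_data.new_value AS new_value
FROM   aq$update_event_qtab t
ORDER  BY t.enq_time;
```

An empty result after a committed update suggests that the enqueue in the trigger failed and the error was swallowed by its exception handler. Rows that stay in the READY state suggest that the consumer is not running or is reading a different queue.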
Cleanup (Optional)
To remove the created AQ objects and table, you can use the following commands.
-- Stop and drop the queue
BEGIN
DBMS_AQADM.STOP_QUEUE (queue_name => 'column_update_q');
DBMS_AQADM.DROP_QUEUE (queue_name => 'column_update_q');
END;
/
-- Drop the queue table
BEGIN
DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => 'update_event_qtab');
END;
/
-- Drop the trigger and table
DROP TRIGGER trg_orders_status_update;
DROP TABLE orders;
-- Drop the message type
DROP TYPE update_message_typ;
SQL script to clean up all created objects.