Using Oracle Advanced Queues to monitor when a database table column is updated

Learn using oracle advanced queues to monitor when a database table column is updated with practical examples, diagrams, and best practices. Covers oracle-database, plsql, oracle12c development tec...

Real-time Monitoring of Database Column Updates with Oracle Advanced Queues

Real-time Monitoring of Database Column Updates with Oracle Advanced Queues

Learn how to leverage Oracle Advanced Queues (AQ) and database triggers to achieve real-time notifications when a specific column in a database table is updated, enabling powerful event-driven architectures.

Monitoring changes to database tables is a common requirement in many applications. While simple polling can detect changes, it's inefficient and introduces latency. This article explores a robust, event-driven approach using Oracle Advanced Queues (AQ) in conjunction with database triggers to get immediate notifications when a particular column in a table is updated. This method is ideal for integrating with external systems, triggering business processes, or providing real-time data synchronization without constant database queries.

Understanding the Core Components

Before diving into the implementation, let's briefly review the key Oracle features we'll be using:

  1. Database Triggers: PL/SQL blocks associated with a table that execute automatically when a DML (Data Manipulation Language) event (INSERT, UPDATE, DELETE) occurs on that table.
  2. Oracle Advanced Queuing (AQ): A robust message queuing infrastructure built directly into the Oracle database. It allows applications to asynchronously send, receive, and process messages. AQ supports various messaging models, including point-to-point and publish-subscribe.

By combining these, a trigger will detect a column update and then enqueue a message into an AQ queue. An external application or another database process can then dequeue this message to react to the change.

An architecture diagram illustrating how Oracle Advanced Queues monitor table column updates. The diagram shows a 'Client Application' performing an 'UPDATE' on a 'Database Table'. This action triggers a 'Database Trigger' which then 'Enqueues Message' into an 'Oracle Advanced Queue'. An 'AQ Consumer' then 'Dequeues Message' and performs 'Real-time Action'. Arrows show the flow from client to table, table to trigger, trigger to AQ, and AQ to consumer. Use blue boxes for components, green arrows for data flow.

Architecture for Real-time Column Update Monitoring with AQ

Setting Up Oracle Advanced Queues

First, we need to set up the Advanced Queue infrastructure. This involves creating a queue table and a queue. We'll use a simple payload type for demonstration, but you could define a more complex object type to pass detailed information about the update (e.g., old value, new value, row ID, timestamp).

-- Grant necessary privileges to the user
GRANT EXECUTE ON DBMS_AQADM TO your_user;
GRANT EXECUTE ON DBMS_AQ TO your_user;
GRANT AQ_ADMINISTRATOR_ROLE TO your_user;
GRANT CREATE ANY TYPE TO your_user;

-- Connect as your_user
CONNECT your_user/your_password;

-- 1. Create a message payload type (optional, but good practice)
CREATE OR REPLACE TYPE update_message_typ AS OBJECT (
    table_name VARCHAR2(30),
    column_name VARCHAR2(30),
    row_id UROWID,
    new_value VARCHAR2(4000),
    old_value VARCHAR2(4000),
    update_timestamp TIMESTAMP
);
/

-- 2. Create the queue table
BEGIN
    DBMS_AQADM.CREATE_QUEUE_TABLE (
        queue_table          => 'update_event_qtab',
        queue_payload_type   => 'update_message_typ',
        multiple_consumers   => TRUE -- Allow multiple consumers if needed
    );
END;
/

-- 3. Create the queue
BEGIN
    DBMS_AQADM.CREATE_QUEUE (
        queue_name           => 'column_update_q',
        queue_table          => 'update_event_qtab'
    );
END;
/

-- 4. Start the queue
BEGIN
    DBMS_AQADM.START_QUEUE (
        queue_name           => 'column_update_q'
    );
END;
/

SQL script to set up the AQ queue table and queue.

Creating the Trigger to Enqueue Messages

Next, we'll create a database table and a trigger on that table. The trigger will fire after an update on a specific column and enqueue a message into our column_update_q. For this example, let's assume we're monitoring updates to the status column in an orders table.

-- Create a sample table
CREATE TABLE orders (
    order_id NUMBER PRIMARY KEY,
    customer_id NUMBER,
    order_date DATE,
    status VARCHAR2(20),
    total_amount NUMBER(10,2)
);

-- Insert some initial data
INSERT INTO orders (order_id, customer_id, order_date, status, total_amount) VALUES (1, 101, SYSDATE, 'PENDING', 150.00);
INSERT INTO orders (order_id, customer_id, order_date, status, total_amount) VALUES (2, 102, SYSDATE, 'PENDING', 200.00);
COMMIT;

-- Create the trigger
CREATE OR REPLACE TRIGGER trg_orders_status_update
AFTER UPDATE OF status ON orders
FOR EACH ROW
DECLARE
    l_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_message_handle     RAW(16);
    l_message            update_message_typ;
BEGIN
    -- Check if the status column was actually changed
    IF :OLD.status <> :NEW.status THEN
        l_message := update_message_typ(
            table_name        => 'ORDERS',
            column_name       => 'STATUS',
            row_id            => :OLD.ROWID,
            new_value         => :NEW.status,
            old_value         => :OLD.status,
            update_timestamp  => SYSTIMESTAMP
        );

        DBMS_AQ.ENQUEUE (
            queue_name         => 'column_update_q',
            enqueue_options    => l_enqueue_options,
            message_properties => l_message_properties,
            payload            => l_message,
            msgid              => l_message_handle
        );
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        -- Log the error appropriately in a real application
        DBMS_OUTPUT.PUT_LINE('Error enqueuing message: ' || SQLERRM);
END;
/

SQL script to create the orders table and a trigger to monitor status column updates.

Consuming Messages from the Queue

Finally, an application or another PL/SQL procedure needs to dequeue messages from column_update_q to process the updates. This can be done synchronously (waiting for messages) or asynchronously (using AQ notification mechanisms like OCI callbacks or Java Message Service - JMS). Here, we'll show a simple synchronous dequeue example.

-- PL/SQL Block to Dequeue Messages
SET SERVEROUTPUT ON;

DECLARE
    l_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
    l_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_message_handle     RAW(16);
    l_message            update_message_typ;
BEGIN
    l_dequeue_options.wait := DBMS_AQ.FOREVER; -- Wait indefinitely for messages
    l_dequeue_options.consumer_name := NULL; -- For single consumer, or specify a consumer for multi-consumer queues
    l_dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE; -- Start from the first message, then NEXT_MESSAGE

    LOOP
        BEGIN
            DBMS_AQ.DEQUEUE (
                queue_name         => 'column_update_q',
                dequeue_options    => l_dequeue_options,
                message_properties => l_message_properties,
                payload            => l_message,
                msgid              => l_message_handle
            );

            DBMS_OUTPUT.PUT_LINE('--- Message Dequeued ---');
            DBMS_OUTPUT.PUT_LINE('Table: ' || l_message.table_name);
            DBMS_OUTPUT.PUT_LINE('Column: ' || l_message.column_name);
            DBMS_OUTPUT.PUT_LINE('Row ID: ' || ROWIDTOCHAR(l_message.row_id));
            DBMS_OUTPUT.PUT_LINE('Old Value: ' || l_message.old_value);
            DBMS_OUTPUT.PUT_LINE('New Value: ' || l_message.new_value);
            DBMS_OUTPUT.PUT_LINE('Timestamp: ' || TO_CHAR(l_message.update_timestamp, 'YYYY-MM-DD HH24:MI:SS.FF'));

            -- Commit the transaction to remove the message from the queue
            COMMIT;

            l_dequeue_options.navigation := DBMS_AQ.NEXT_MESSAGE; -- Get the next message

        EXCEPTION
            WHEN DBMS_AQ.TIMEOUT THEN
                DBMS_OUTPUT.PUT_LINE('No more messages in the queue. Exiting.');
                EXIT;
            WHEN OTHERS THEN
                DBMS_OUTPUT.PUT_LINE('Error dequeuing message: ' || SQLERRM);
                ROLLBACK; -- Rollback to keep the message in the queue if processing fails
                EXIT; -- Or implement retry logic
        END;
    END LOOP;
END;
/

PL/SQL block to synchronously dequeue and process messages from the AQ queue.

Testing the Solution

With the setup complete, you can now test the entire flow. Run the consumer block in one SQL*Plus or SQL Developer session. Then, in another session, update the status column of the orders table. You should see the message immediately appear in the consumer session.

1. Step 1

Open two separate SQL*Plus or SQL Developer sessions connected as your_user.

2. Step 2

In Session 1, run the aq_consumer.sql script. It will wait for messages.

3. Step 3

In Session 2, execute an UPDATE statement on the orders table, changing the status column: UPDATE orders SET status = 'SHIPPED' WHERE order_id = 1;

4. Step 4

Observe Session 1. You should see the dequeued message details printed to the console, reflecting the update you just made.

5. Step 5

Execute COMMIT; in Session 2 to make the update permanent and allow the trigger to enqueue the message.

Cleanup (Optional)

To remove the created AQ objects and table, you can use the following commands.

-- Stop and drop the queue
BEGIN
    DBMS_AQADM.STOP_QUEUE (queue_name => 'column_update_q');
    DBMS_AQADM.DROP_QUEUE (queue_name => 'column_update_q');
END;
/

-- Drop the queue table
BEGIN
    DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => 'update_event_qtab');
END;
/

-- Drop the message type
DROP TYPE update_message_typ;
/

-- Drop the table and trigger
DROP TRIGGER trg_orders_status_update;
DROP TABLE orders;

SQL script to clean up all created objects.