Oracle® Streams Concepts and Administration 11g Release 1 (11.1) Part Number B28321-01 |
|
|
View PDF |
Oracle Streams enables messaging with queues, propagations, and enqueue and dequeue capabilities. The queues can be of ANYDATA queues or typed queues. ANYDATA
queues stage user messages whose payloads are of ANYDATA
type, and an ANYDATA
payload can be a wrapper for payloads of different data types. Typed queues can stage messages of a specific type.
The following topics describe configuring Oracle Streams messaging environments:
Each task described in this chapter should be completed by an Oracle Streams administrator that has been granted the appropriate privileges, unless specified otherwise. The examples in this chapter assume that you have configured an Oracle Streams administrator at each database.
See Also:
Oracle Database 2 Day + Data Replication and Integration Guide for examples that configure Oracle Streams messaging environments
Oracle Streams Advanced Queuing User's Guide for more information about Oracle Streams Advanced Queuing (AQ) and for information about using typed queues
Oracle Database PL/SQL Packages and Types Reference for more information about the ANYDATA
type
You can wrap almost any type of payload in an ANYDATA
payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, an ANYDATA
queue.
The following steps illustrate how to wrap payloads of various types in an ANYDATA
payload.
Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net
database.
Grant EXECUTE
privilege on the DBMS_AQ
package to the oe
user so that this user can run the ENQUEUE
and DEQUEUE
procedures in that package:
GRANT EXECUTE ON DBMS_AQ TO oe;
Connect as the Oracle Streams administrator, as in the following example:
CONNECT strmadmin/user-password@dbs1.net
Create an ANYDATA
queue if one does not already exist.
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_q_table_any', queue_name => 'oe_q_any', queue_user => 'oe'); END; /
The oe
user is configured automatically as a secure queue user of the oe_q_any
queue and is given ENQUEUE
and DEQUEUE
privileges on the queue. In addition, an Oracle Streams AQ agent named oe
is configured and is associated with the oe
user. However, a message cannot be enqueued unless a subscriber who can dequeue the message is configured.
Add a subscriber for oe_q_any
queue. This subscriber will perform explicit dequeues of messages.
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('OE', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber); END; /
Connect as the oe
user.
CONNECT oe/user-password@dbs1.net
Create a procedure that takes as an input parameter an object of ANYDATA
type and enqueues a message containing the payload into an existing ANYDATA
queue.
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_q_any', enqueue_options => enqopt, message_properties => mprop, payload => payload, msgid => enq_msgid); END; /
Run the procedure you created in Step 7 by specifying the appropriate Convert
data_type
function. The following commands enqueue messages of various types.
VARCHAR2
type:
EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW')); COMMIT;
NUMBER
type:
EXEC oe.enq_proc(ANYDATA.ConvertNumber('16')); COMMIT;
User-defined type:
BEGIN oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ( '1646 Brazil Blvd','361168','Chennai','Tam', 'IN'))); END; / COMMIT;
See Also:
"Viewing the Contents of Messages in a Persistent Queue" for information about viewing the contents of these enqueued messagesThe following steps illustrate how to dequeue a payload wrapped in an ANYDATA
payload. This example assumes that you have completed the steps in "Wrapping User Message Payloads in an ANYDATA Wrapper and Enqueuing Them".
To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$
queue_table_name
, where queue_table_name
is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any
queue, run the following query:
CONNECT strmadmin/user-password@dbs1.net
SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
Connect as the oe
user:
CONNECT oe/user-password@dbs1.net
Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ
and prints the contents of the messages.
CREATE OR REPLACE PROCEDURE oe.get_cust_address ( consumer IN VARCHAR2) AS address OE.CUST_ADDRESS_TYP; deq_address ANYDATA; msgid RAW(16); deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; new_addresses BOOLEAN := TRUE; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); num_var pls_integer; BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_addresses) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_q_any', dequeue_options => deqopt, message_properties => mprop, payload => deq_address, msgid => msgid); deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('****'); IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); num_var := deq_address.GetObject(address); DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** '); DBMS_OUTPUT.PUT_LINE(address.street_address); DBMS_OUTPUT.PUT_LINE(address.postal_code); DBMS_OUTPUT.PUT_LINE(address.city); DBMS_OUTPUT.PUT_LINE(address.state_province); DBMS_OUTPUT.PUT_LINE(address.country_id); ELSE DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); END IF; COMMIT; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_addresses := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages'); END; END LOOP; END; /
Run the procedure you created in Step 1 and specify the consumer of the messages you want to dequeue, as in the following example:
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.get_cust_address('OE');
This section contains instructions for configuring the following elements in a database:
An enqueue procedure that enqueues messages into an ANYDATA
queue at a database. In this example, the enqueue procedure uses a trigger to enqueue a message every time a row is inserted into the oe.orders
table.
A messaging client that can dequeue persistent LCRs and persistent user messages based on rules. In this example, the messaging client uses a rule so that it dequeues only messages that involve the oe.orders
table. The messaging client uses the DEQUEUE
procedure in the DBMS_STREAMS_MESSAGING
to dequeue one message at a time and display the order number for the order.
Message notification for the messaging client. In this example, a notification is sent to an e-mail address when a message is enqueued into the queue used by the messaging client. The message can be dequeued by the messaging client because the message satisfies the rule sets of the messaging client. You can also configure message notifications that alert applications when a message of interest is enqueued. These applications can dequeue and process the message in any customized way. Notification can also be sent to a specified HTTP post.
You can query the DBA_STREAMS_MESSAGE_CONSUMERS
data dictionary view for information about existing messaging clients and notifications.
Complete the following steps to configure a messaging client and message notification:
Connect as an administrative user who can grant privileges and execute subprograms in supplied packages.
Set the host name used to send the e-mail, the mail port, and the e-mail account that sends e-mail messages for e-mail notifications using the DBMS_AQELM
package. The following example sets the mail host name to smtp.fictional_company.com
, the mail port to 25
, and the e-mail account to Mary.Smith@fictional_company.com
:
BEGIN DBMS_AQELM.SET_MAILHOST('smtp.fictional_company.com') ; DBMS_AQELM.SET_MAILPORT(25) ; DBMS_AQELM.SET_SENDFROM('Mary.Smith@fictional_company.com'); END; /
If you are not sure about the correct mail host, mail port, and send from e-mail address, then ask the system administrator for the computer system that runs the database.
Grant the necessary privileges to the users who will create the messaging client, enqueue and dequeue messages, and specify message notifications. In this example, the oe
user performs all of these tasks.
GRANT EXECUTE ON DBMS_AQ TO oe; GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe; GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'oe', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'oe', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'oe', grant_option => FALSE); END; /
Connect as the oe
user:
CONNECT oe/user-password
Create an ANYDATA
queue using SET_UP_QUEUE
, as in the following example:
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe.notification_queue_table', queue_name => 'oe.notification_queue'); END; /
Create the types for the user messages, as in the following example:
CREATE TYPE oe.user_msg AS OBJECT( object_name VARCHAR2(30), object_owner VARCHAR2(30), message VARCHAR2(50)); /
Create a trigger that enqueues a message into the queue whenever an order is inserted into the oe.orders
table, as in the following example:
CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT ON oe.orders FOR EACH ROW DECLARE msg oe.user_msg; str VARCHAR2(2000); BEGIN str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID'; msg := oe.user_msg( object_name => 'ORDERS', object_owner => 'OE', message => str); DBMS_STREAMS_MESSAGING.ENQUEUE ( queue_name => 'oe.notification_queue', payload => ANYDATA.CONVERTOBJECT(msg)); END; /
Create the messaging client that will dequeue messages from the queue and the rule used by the messaging client to determine which messages to dequeue, as in the following example:
BEGIN DBMS_STREAMS_ADM.ADD_MESSAGE_RULE ( message_type => 'oe.user_msg', rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND ' || ' :msg.OBJECT_NAME = ''ORDERS'' ', streams_type => 'dequeue', streams_name => 'oe', queue_name => 'oe.notification_queue'); END; /
Set the message notification to send e-mail upon enqueue of messages that can be dequeued by the messaging client, as in the following example:
BEGIN DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION ( streams_name => 'oe', notification_action => 'Mary.Smith@fictional_company.com', notification_type => 'MAIL', include_notification => TRUE, queue_name => 'oe.notification_queue'); END; /
Create a PL/SQL procedure that dequeues messages using the messaging client, as in the following example:
CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS msg ANYDATA; user_msg oe.user_msg; num_var PLS_INTEGER; more_messages BOOLEAN := TRUE; navigation VARCHAR2(30); BEGIN navigation := 'FIRST MESSAGE'; WHILE (more_messages) LOOP BEGIN DBMS_STREAMS_MESSAGING.DEQUEUE( queue_name => 'oe.notification_queue', streams_name => consumer, payload => msg, navigation => navigation, wait => DBMS_STREAMS_MESSAGING.NO_WAIT); IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN num_var := msg.GETOBJECT(user_msg); DBMS_OUTPUT.PUT_LINE(user_msg.object_name); DBMS_OUTPUT.PUT_LINE(user_msg.object_owner); DBMS_OUTPUT.PUT_LINE(user_msg.message); END IF; navigation := 'NEXT MESSAGE'; COMMIT; EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN navigation := 'NEXT TRANSACTION'; WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN more_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages.'); WHEN OTHERS THEN RAISE; END; END LOOP; END; /
Insert rows into the oe.orders
table, as in the following example:
INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL); INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL); COMMIT; INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL); COMMIT;
Message notification sends a message to the e-mail address specified in Step 9 for each message that was enqueued. Each notification is an AQXmlNotification
, which includes of the following:
notification_options
, which includes the following:
destination
- The destination queue from which the message was dequeued
consumer_name
- The name of the messaging client that dequeued the message
message_set
- The set of message properties
The following example shows the AQXmlNotification
format sent in an e-mail notification:
<?xml version="1.0" encoding="UTF-8"?> <Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope"> <Body> <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access"> <notification_options> <destination>OE.NOTIFICATION_QUEUE</destination> <consumer_name>OE</consumer_name> </notification_options> <message_set> <message> <message_header> <message_id>CB510DDB19454731E034080020AE3E0A</message_id> <expiration>-1</expiration> <delay>0</delay> <priority>1</priority> <delivery_count>0</delivery_count> <sender_id> <agent_name>OE</agent_name> <protocol>0</protocol> </sender_id> <message_state>0</message_state> </message_header> </message> </message_set> </AQXmlNotification> </Body> </Envelope>
You can dequeue the messages enqueued in this example by running the oe.deq_notification
procedure:
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.deq_notification('OE');
Note:
TheDBMS_AQ
package can also configure notifications. The DBMS_AQ
package provides some notification features that are not available in DBMS_STREAMS_ADM
package, such as buffered message notifications and notification grouping by time.See Also:
Chapter 6, "How Rules Are Used in Oracle Streams" for more information about rule sets for Oracle Streams clients and for information about how messages satisfy rule sets
Oracle Database 2 Day + Data Replication and Integration Guide for an example that uses message notification to dequeue messages of interest automatically
Oracle Streams Advanced Queuing User's Guide and Oracle XML DB Developer's Guide for more information about message notifications and XML
Oracle Database PL/SQL Packages and Types Reference for information about the DBMS_AQ
package