Oracle9i Streams Release 2 (9.2) Part Number A96571-02 |
|
|
View PDF |
This chapter provides instructions for managing Streams queues, propagations, and messaging environments.
This chapter contains these topics:
Each task described in this section should be completed by a Streams administrator that has been granted the appropriate privileges, unless specified otherwise.
A Streams queue stages events whose payloads are of SYS.AnyData
type. Therefore, a Streams queue can stage an event with payload of nearly any type, if the payload is wrapped in a SYS.AnyData
wrapper. Each Streams capture process and apply process is associated with one Streams queue, and each Streams propagation is associated with one Streams source queue and one Streams destination queue.
This section provides instructions for completing the following tasks related to Streams queues:
You use the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package to create a Streams queue. This procedure enables you to specify the following for the Streams queue it creates:
ENQUEUE
and DEQUEUE
privileges on the queueThis procedure creates a queue that is both a secure queue and a transactional queue and starts the newly created queue.
For example, to create a Streams queue named strm01_queue
with a queue table named strm01_queue_table
and grant the hr
user the privileges necessary to enqueue events into and dequeue events from the queue, run the following procedure:
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'strm01_queue_table', queue_name => 'strm01_queue', queue_user => 'hr'); END; /
You can also use procedures in the DBMS_AQADM
package to create a SYS.AnyData
queue.
See Also:
|
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. If you use the SET_UP_QUEUE
procedure in the DBMS_STREAMS_ADM
package to create the secure queue, then the queue owner and the user specified by the queue_user
parameter are configured as secure users of the queue automatically. If you want to enable other users to perform operations on the queue, then you can configure these users in one of the following ways:
SET_UP_QUEUE
and specify a queue_user
. Queue creation is skipped if the queue already exists, but a new queue user is configured if one is specified.The following example illustrates associating a user with an agent manually. Suppose you want to enable the oe
user to perform queue operations on the strm01_queue
created in "Creating a Streams Queue". The following steps configure the oe
user as a secure queue user of strm01_queue
:
EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'strm01_queue_agent');
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('strm01_queue_agent', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.strm01_queue', subscriber => subscriber, rule => NULL, transformation => NULL); END; /
BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'strm01_queue_agent', db_username => 'oe'); END; /
EXECUTE
privilege on the DBMS_AQ
package, if the user is not already granted this privilege.
GRANT EXECUTE ON DBMS_AQ TO oe;
When these steps are complete, the oe
user is a secure user of the strm01_queue
queue and can perform operations on the queue. You still must grant the user specific privileges to perform queue operations, such as enqueue and dequeue privileges.
See Also:
|
You may want to disable a user from performing queue operations on a secure queue for the following reasons:
ALTER_APPLY
procedure in the DBMS_APPLY_ADM
package to change the apply_user
for an apply process, and you do not want the old apply_user
to be able to perform operations on the apply process queue.To disable a secure queue user, you can revoke ENQUEUE
and DEQUEUE
privilege on the queue from the user, or you can run the DISABLE_DB_ACCESS
procedure in the DBMS_AQADM
package. For example, suppose you want to disable the oe
user from performing queue operations on the strm01_queue
created in "Creating a Streams Queue".
Attention: If an agent is used for multiple secure queues, then running |
oe
user from performing queue operations on the secure queue strm01_queue
:
BEGIN DBMS_AQADM.DISABLE_DB_ACCESS( agent_name => 'strm01_queue_agent', db_username => 'oe'); END; /
BEGIN DBMS_AQADM.DROP_AQ_AGENT( agent_name => 'strm01_queue_agent'); END; /
BEGIN DBMS_AQADM.REVOKE_QUEUE_PRIVILEGE ( privilege => 'ALL', queue_name => 'strmadmin.strm01_queue', grantee => 'oe'); END; /
See Also:
|
To drop an existing Streams queue, perform the same actions that you would to drop a typed queue. A Streams queue may be dropped in the following ways:
STOP_QUEUE
, DROP_QUEUE
, and DROP_QUEUE_TABLE
procedures in the DBMS_AQADM
package, or by using only the DROP_QUEUE_TABLE
procedure on the queue table containing the Streams queue with the force
parameter set to true
CASCADE
optionWhen you drop a Streams queue, all of the error transactions that were moved to the exception queue from the Streams queue are deleted automatically.
See Also:
Oracle9i Supplied PL/SQL Packages and Types Reference for more information about dropping queues |
A propagation propagates events from a Streams source queue to a Streams destination queue. This section provides instructions for completing the following tasks:
In addition, you can use the features of Oracle Advanced Queuing (AQ) to manage Streams propagations.
See Also:
Oracle9i Application Developer's Guide - Advanced Queuing for more information about managing propagations with the features of AQ |
You can use any of the following procedures to create a propagation:
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES
DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES
DBMS_STREAMS_ADM.ADD_GLOBAL_PROPAGATION_RULES
DBMS_PROPAGATION_ADM.CREATE_PROPAGATION
Each of the procedures in the DBMS_STREAMS_ADM
package creates a propagation with the specified name if it does not already exist, creates a rule set for the propagation if the propagation does not have a rule set, and may add table, schema, or global rules to the rule set. The CREATE_PROPAGATION
procedure creates a propagation, but does not create a rule set or rules for the propagation. All propagations are started automatically upon creation.
The following tasks must be completed before you create a propagation:
The following is an example that runs the ADD_TABLE_RULES
procedure in the DBMS_STREAMS_ADM
package to create a propagation:
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES( table_name => 'hr.departments', streams_name => 'strm01_propagation', source_queue_name => 'strmadmin.strm01_queue', destination_queue_name => 'strmadmin.strm02_queue@dbs2.net', include_dml => true, include_ddl => true, include_tagged_lcr => false, source_database => 'dbs1.net' ); END; /
Running this procedure performs the following actions:
strm01_propagation
. The propagation is created only if it does not already exist.strm01_queue
in the current database to strm02_queue
in the dbs2.net
databasedbs2.net
database link to propagate the LCRs, because the destination_queue_name
parameter contains @dbs2.net
SYS.STREAMS$_EVALUATION_CONTEXT
. The rule set name is specified by the system.hr.departments
table, and the other rule specifies that the propagation propagates DDL LCRs that contain changes to the hr.departments
table. The rule names are specified by the system.NULL
tag, because the include_tagged_lcr
parameter is set to false
. This behavior is accomplished through the system-created rules for the propagation.dbs1.net
, which may or may not be the current databaseThe following is an example that runs the CREATE_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to create a propagation:
BEGIN DBMS_PROPAGATION_ADM.CREATE_PROPAGATION( propagation_name => 'strm02_propagation', source_queue => 'strmadmin.strm03_queue', destination_queue => 'strmadmin.strm04_queue', destination_dblink => 'dbs2.net', rule_set_name => 'strmadmin.strm01_rule_set'); END; /
Running this procedure performs the following actions:
strm02_propagation
. A propagation with the same name must not exist.strm03_queue
in the current database to strm04_queue
in the dbs2.net
database. Depending on the rules in the rule set, the propagated events may be captured events or user-enqueued events, or both.dbs2.net
database link to propagate the eventsstrm01_rule_set
By default, propagation jobs are enabled upon creation. If you disable a propagation job and want to enable it, then use the ENABLE_PROPAGATION_SCHEDULE
procedure in the DBMS_AQADM
package.
For example, to enable a propagation job that propagates events from the strmadmin.strm01_queue
source queue using the dbs2.net
database link, run the following procedure:
BEGIN DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE( queue_name => 'strmadmin.strm01_queue', destination => 'dbs2.net'); END; /
Note: Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the |
See Also:
|
You can schedule a propagation job using the SCHEDULE_PROPAGATION
procedure in the DBMS_AQADM
package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.
For example, the following procedure schedules a propagation job that propagates events from the strmadmin.strm01_queue
source queue using the dbs2.net
database link:
BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION( queue_name => 'strmadmin.strm01_queue', destination => 'dbs2.net'); END; /
Note: Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the |
See Also:
|
You can alter the schedule of an existing propagation job using the ALTER_PROPAGATION_SCHEDULE
procedure in the DBMS_AQADM
package.
For example, suppose you want to alter the schedule of a propagation job that propagates events from the strmadmin.strm01_queue
source queue using the dbs2.net
database link. The following procedure sets the propagation job to propagate events every 15 minutes (900 seconds), with each propagation lasting 300 seconds, and a 25 second wait before new events in a completely propagated queue are propagated.
BEGIN DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE( queue_name => 'strmadmin.strm01_queue', destination => 'dbs2.net', duration => 300, next_time => 'SYSDATE + 900/86400', latency => 25); END; /
Note: Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the |
See Also:
|
You can unschedule a propagation job using the UNSCHEDULE_PROPAGATION
procedure in the DBMS_AQADM
package. If there is a problem with a propagation job, then unscheduling and scheduling the propagation job may correct the problem.
For example, the following procedure unschedules a propagation job that propagates events from the strmadmin.strm01_queue
source queue using the dbs2.net
database link:
BEGIN DBMS_AQADM.UNSCHEDULE_PROPAGATION( queue_name => 'strmadmin.strm01_queue', destination => 'dbs2.net'); END; /
Note: Completing this task affects all propagations that propagate events from the source queue to all destination queues that use the |
See Also:
|
You specify the rule set that you want to associate with a propagation using the rule_set_name
parameter in the ALTER_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package. For example, the following procedure sets the rule set for a propagation named strm01_propagation
to strm02_rule_set
.
BEGIN DBMS_PROPAGATION_ADM.ALTER_PROPAGATION( propagation_name => 'strm01_propagation', rule_set_name => 'strmadmin.strm02_rule_set'); END; /
You add rules to the rule set of a propagation, you can run one of the following procedures:
DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES
DBMS_STREAMS_ADM.ADD_SCHEMA_PROPAGATION_RULES
DBMS_STREAMS_ADM.ADD_GLOBAL_PROPAGATION_RULES
The following is an example that runs the ADD_TABLE_RULES
procedure in the DBMS_STREAMS_ADM
package to add rules to the rule set of a propagation named strm01_propagation
:
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_PROPAGATION_RULES( table_name => 'hr.locations', streams_name => 'strm01_propagation', source_queue_name => 'strmadmin.strm01_queue', destination_queue_name => 'strmadmin.strm02_queue@dbs2.net', include_dml => true, include_ddl => true, source_database => 'dbs1.net' ); END; /
Running this procedure performs the following actions:
strm01_propagation
. The propagation is created only if it does not already exist.strm01_queue
in the current database to strm02_queue
in the dbs2.net
databasedbs2.net
database link to propagate the LCRs, because the destination_queue_name
parameter contains @dbs2.net
hr.locations
table, and the other rule specifies that the propagation propagates DDL LCRs that contain changes to the hr.locations
table. The rule names are specified by the system.dbs1.net
, which may or may not be the current database
You specify that you want to remove a rule from the rule set for an existing propagation by running the REMOVE_RULE
procedure in the DBMS_STREAMS_ADM
package. For example, the following procedure removes a rule named DEPARTMENTS3
from the rule set of a propagation named strm01_propagation
.
BEGIN DBMS_STREAMS_ADM.REMOVE_RULE( rule_name => 'DEPARTMENTS3', streams_type => 'propagation', streams_name => 'strm01_propagation', drop_unused_rule => true); END; /
In this example, the drop_unused_rule
parameter in the REMOVE_RULE
procedure is set to true
, which is the default setting. Therefore, if the rule being removed is not in any other rule set, then it will be dropped from the database. If the drop_unused_rule
parameter is set to false
, then the rule is removed from the rule set, but it is not dropped from the database.
In addition, if you want to remove all of the rules in the rule set for the propagation, then specify NULL
for the rule_name
parameter when you run the REMOVE_RULE
procedure.
Note: If you drop all of the rules in the rule set for a propagation, then the propagation propagations no events in the source queue to the destination queue. |
You specify that you want to remove the rule set from a propagation by setting the rule_set_name
parameter to NULL
in the ALTER_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package. For example, the following procedure removes the rule set from a propagation named strm01_propagation
.
BEGIN DBMS_PROPAGATION_ADM.ALTER_PROPAGATION( propagation_name => 'strm01_propagation', rule_set_name => NULL); END; /
Note: If you remove a rule set for a propagation, then the propagation propagates all events in the source queue to the destination queue. |
To stop a propagation job, use the DISABLE_PROPAGATION_SCHEDULE
procedure in the DBMS_AQADM
package.
For example, to stop a propagation job that propagates events from the strmadmin.strm01_queue
source queue using the dbs2.net
database link, run the following procedure:
BEGIN DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE( queue_name => 'strmadmin.strm01_queue', destination => 'dbs2.net'); END; /
See Also:
Oracle9i Application Developer's Guide - Advanced Queuing for more information about using the |
You run the DROP_PROPAGATION
procedure in the DBMS_PROPAGATION_ADM
package to drop an existing propagation. For example, the following procedure drops a propagation named strm01_propagation
:
BEGIN DBMS_PROPAGATION_ADM.DROP_PROPAGATION( propagation_name => 'strm01_propagation'); END; /
Note: When you drop a propagation, the propagation job used by the propagation is dropped automatically, if no other propagations are using the propagation job. |
Streams enables messaging with queues of type SYS.AnyData
. These queues stage user messages whose payloads are of SYS.AnyData
type, and a SYS.AnyData
payload can be a wrapper for payloads of different datatypes.
This section provides instructions for completing the following tasks:
Note: The examples in this section assume that you have configured a Streams administrator at each database. |
See Also:
|
You can wrap almost any type of payload in a SYS.AnyData
payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, a SYS.AnyData
queue.
The following steps illustrate how to wrap payloads of various types in a SYS.AnyData
payload.
dbs1.net
database.EXECUTE
privilege on the DBMS_AQ
package to the oe
user so that this user can run the ENQUEUE
and DEQUEUE
procedures in that package:
GRANT EXECUTE ON DBMS_AQ TO oe;
CONNECT strmadmin/strmadminpw@dbs1.net
SYS.AnyData
queue if one does not already exist.
BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_q_table_any', queue_name => 'oe_q_any', queue_user => 'oe'); END; /
The oe
user is configured automatically as a secure queue user of the oe_q_any
queue and is given ENQUEUE
and DEQUEUE
privileges on the queue.
EXEC DBMS_AQADM.CREATE_AQ_AGENT(agent_name => 'local_agent');
oe_q_any
queue. This subscriber will perform explicit dequeues of events.
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber); END; /
oe
user with the local_agent
agent:
BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'local_agent', db_username => 'oe'); END; /
oe
user.
CONNECT oe/oe@dbs1.net
SYS.AnyData
type and enqueues a message containing the payload into an existing SYS.AnyData
queue.
CREATE OR REPLACE PROCEDURE oe.enq_proc (payload SYS.AnyData) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_q_any', enqueue_options => enqopt, message_properties => mprop, payload => payload, msgid => enq_msgid); END; /
Convert
data_type
function. The following commands enqueue messages of various types.
VARCHAR2
type:
EXEC oe.enq_proc(SYS.AnyData.ConvertVarchar2('Chemicals - SW')); COMMIT;
NUMBER
type:
EXEC oe.enq_proc(SYS.AnyData.ConvertNumber('16')); COMMIT;
User-defined type:
BEGIN oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ( '1646 Brazil Blvd','361168','Chennai','Tam', 'IN'))); END; / COMMIT;
See Also:
"Viewing the Contents of User-Enqueued Events in a Queue" for information about viewing the contents of these enqueued messages |
The following steps illustrate how to dequeue a payload wrapped in a SYS.AnyData
payload. This example assumes that you have completed the steps in "Example of Wrapping a Payload in a SYS.AnyData Payload and Enqueuing It".
To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$
queue_table_name
, where queue_table_name
is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any
queue, run the following query:
CONNECT strmadmin/strmadminpw@dbs1.net SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;
oe
user:
CONNECT oe/oe@dbs1.net
oe.cust_address_typ
and prints the contents of the messages.
CREATE OR REPLACE PROCEDURE oe.get_cust_address ( consumer IN VARCHAR2) AS address OE.CUST_ADDRESS_TYP; deq_address SYS.AnyData; msgid RAW(16); deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; new_addresses BOOLEAN := TRUE; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); num_var pls_integer; BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_addresses) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_q_any', dequeue_options => deqopt, message_properties => mprop, payload => deq_address, msgid => msgid);
deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('****'); IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); num_var := deq_address.GetObject(address); DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** '); DBMS_OUTPUT.PUT_LINE(address.street_address); DBMS_OUTPUT.PUT_LINE(address.postal_code); DBMS_OUTPUT.PUT_LINE(address.city); DBMS_OUTPUT.PUT_LINE(address.state_province); DBMS_OUTPUT.PUT_LINE(address.country_id); ELSE DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || deq_address.GetTypeName()); END IF; COMMIT; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_addresses := FALSE; DBMS_OUTPUT.PUT_LINE('No more messages'); END; END LOOP; END; /
SET SERVEROUTPUT ON SIZE 100000 EXEC oe.get_cust_address('LOCAL_AGENT');
SYS.AnyData
queues can interoperate with typed queues in a Streams environment. A typed queue is a queue that can stage messages of a particular type only. To propagate a message from a SYS.AnyData
queue to a typed queue, the message must be transformed to match the type of the typed queue. The following sections provide examples of propagating non-LCR user messages and LCRs between a SYS.AnyData
queue and a typed queue.
Note: The examples in this section assume that you have completed the examples in "Wrapping User Message Payloads in a SYS.AnyData Wrapper". |
See Also:
"Message Propagation and SYS.AnyData Queues" for more information about propagation between |
The following steps set up propagation from a SYS.AnyData
queue named oe_q_any
to a typed queue of type oe.cust_address_typ
named oe_q_address
. The source queue oe_q_any
is at the dbs1.net
database, and the destination queue oe_q_address
is at the dbs2.net
database. Both queues are owned by strmadmin
.
dbs1.net
.strmadmin
, if it was not already granted.
GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
strmadmin
EXECUTE
privilege on oe.cust_address_typ
at dbs1.net
and dbs2.net
.
CONNECT oe/oe@dbs1.net GRANT EXECUTE ON oe.cust_address_typ TO strmadmin; CONNECT oe/oe@dbs2.net GRANT EXECUTE ON oe.cust_address_typ TO strmadmin;
dbs2.net
, if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs2.net BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'strmadmin.oe_q_table_address', queue_payload_type => 'oe.cust_address_typ', multiple_consumers => true); DBMS_AQADM.CREATE_QUEUE( queue_name => 'strmadmin.oe_q_address', queue_table => 'strmadmin.oe_q_table_address'); DBMS_AQADM.START_QUEUE( queue_name => 'strmadmin.oe_q_address'); END; /
dbs1.net
and dbs2.net
if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs1.net CREATE DATABASE LINK dbs2.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'DBS2.NET';
any_to_cust_address_typ
in the strmadmin
schema at dbs1.net
that takes a SYS.AnyData
payload containing a oe.cust_address_typ
object and returns the oe.cust_address_typ
object.
CREATE OR REPLACE FUNCTION strmadmin.any_to_cust_address_typ( in_any IN SYS.AnyData) RETURN OE.CUST_ADDRESS_TYP AS address OE.CUST_ADDRESS_TYP; num_var NUMBER; type_name VARCHAR2(100); BEGIN -- Get the type of object type_name := in_any.GetTypeName(); -- Check if the object type is OE.CUST_ADDRESS_TYP IF (type_name = 'OE.CUST_ADDRESS_TYP') THEN -- Put the address in the message into the address variable num_var := in_any.GetObject(address); RETURN address; ELSE raise_application_error(-20101, 'Conversion failed - ' || type_name); END IF; END; /
dbs1.net
using the DBMS_TRANSFORM
package.
BEGIN DBMS_TRANSFORM.CREATE_TRANSFORMATION( schema => 'strmadmin', name => 'anytoaddress', from_schema => 'SYS', from_type => 'ANYDATA', to_schema => 'oe', to_type => 'cust_address_typ', transformation => 'strmadmin.any_to_cust_address_typ(source.user_data)'); END; /
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT ('ADDRESS_AGENT_REMOTE', 'STRMADMIN.OE_Q_ADDRESS@DBS2.NET', 0); DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber, rule => 'TAB.USER_DATA.GetTypeName()=''OE.CUST_ADDRESS_TYP''', transformation => 'strmadmin.anytoaddress'); END; /
SYS.AnyData
queue at dbs1.net
and the typed queue at dbs2.net
.
BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION( queue_name => 'strmadmin.oe_q_any', destination => 'dbs2.net'); END; /
oe.cust_address_typ
type wrapped in a SYS.AnyData
wrapper:
CONNECT oe/oe@dbs1.net BEGIN oe.enq_proc(SYS.AnyData.ConvertObject(oe.cust_address_typ( '1668 Chong Tao','111181','Beijing',NULL, 'CN'))); END; / COMMIT;
dbs2.net
to view the propagated message:
CONNECT strmadmin/strmadminpw@dbs2.net SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ADDRESS;
See Also:
Oracle9i Application Developer's Guide - Advanced Queuing for more information about transformations during propagation |
To propagate LCRs from a SYS.AnyData
queue to a typed queue, you complete the same steps as you do for non-LCR events, but Oracle supplies the transformation functions. You can use the following functions in the DBMS_STREAMS
package to transform LCRs in SYS.AnyData
queues to messages in typed queues:
CONVERT_ANYDATA_TO_LCR_ROW
function transforms SYS.AnyData
payload containing a row LCR into SYS.LCR$_ROW_RECORD
payload.CONVERT_ANYDATA_TO_LCR_DDL
function transforms SYS.AnyData
payload containing a DDL LCR into SYS.LCR$_DDL_RECORD
payload.You can propagate user-enqueued LCRs to an appropriate typed queue, but propagation of captured LCRs to a typed queue is not supported.
The following example sets up propagation of row LCRs from a SYS.AnyData
queue named oe_q_any
to a typed queue of type SYS.LCR$_ROW_RECORD
named oe_q_lcr
. The source queue oe_q_any
is at the dbs1.net
database, and the destination queue oe_q_lcr
is at the dbs3.net
database.
dbs1.net
.strmadmin
, if it was not already granted.
GRANT EXECUTE ON DBMS_TRANSFORM TO strmadmin;
CONNECT strmadmin/strmadminpw@dbs3.net BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE( queue_table => 'strmadmin.oe_q_table_lcr', queue_payload_type => 'SYS.LCR$_ROW_RECORD', multiple_consumers => true); DBMS_AQADM.CREATE_QUEUE( queue_name => 'strmadmin.oe_q_lcr', queue_table => 'strmadmin.oe_q_table_lcr'); DBMS_AQADM.START_QUEUE( queue_name => 'strmadmin.oe_q_lcr'); END; /
dbs1.net
and dbs3.net
if one does not already exist.
CONNECT strmadmin/strmadminpw@dbs1.net CREATE DATABASE LINK dbs3.net CONNECT TO strmadmin IDENTIFIED BY strmadminpw USING 'DBS3.NET';
dbs1.net
using the DBMS_TRANSFORM
package.
BEGIN DBMS_TRANSFORM.CREATE_TRANSFORMATION( schema => 'strmadmin', name => 'anytolcr', from_schema => 'SYS', from_type => 'ANYDATA', to_schema => 'SYS', to_type => 'LCR$_ROW_RECORD', transformation => 'SYS.DBMS_STREAMS.CONVERT_ANYDATA_TO_LCR_ROW(source.user_data)'); END; /
CONVERT_ANYDATA_TO_LCR_ROW
function for the transformation parameter.
DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT ( 'ROW_LCR_AGENT_REMOTE', 'STRMADMIN.OE_Q_LCR@DBS3.NET', 0); DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_q_any', subscriber => subscriber, rule => 'TAB.USER_DATA.GetTypeName()=''SYS.LCR$_ROW_RECORD''', transformation => 'strmadmin.anytolcr'); END; /
SYS.AnyData
queue at dbs1.net
and the LCR queue at dbs3.net
.
BEGIN DBMS_AQADM.SCHEDULE_PROPAGATION( queue_name => 'strmadmin.oe_q_any', destination => 'dbs3.net'); END; /
strmadmin.oe_q_any
queue:
CONNECT oe/oe@dbs1.net CREATE OR REPLACE PROCEDURE oe.enq_row_lcr_proc( source_dbname VARCHAR2, cmd_type VARCHAR2, obj_owner VARCHAR2, obj_name VARCHAR2, old_vals SYS.LCR$_ROW_LIST, new_vals SYS.LCR$_ROW_LIST) AS eopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); row_lcr SYS.LCR$_ROW_RECORD; BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('LOCAL_AGENT', NULL, NULL); -- Construct the LCR based on information passed to procedure row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT( source_database_name => source_dbname, command_type => cmd_type, object_owner => obj_owner, object_name => obj_name, old_values => old_vals, new_values => new_vals); -- Enqueue the created row LCR DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_q_any', enqueue_options => eopt, message_properties => mprop, payload => SYS.AnyData.ConvertObject(row_lcr), msgid => enq_msgid); END enq_row_lcr_proc; /
oe.inventories
table and enqueue the row LCR into the strmadmin.oe_q_any
queue.
DECLARE newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN newunit1 := SYS.LCR$_ROW_UNIT( 'PRODUCT_ID', SYS.AnyData.ConvertNumber(3503), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit2 := SYS.LCR$_ROW_UNIT( 'WAREHOUSE_ID', SYS.AnyData.ConvertNumber(1), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT( 'QUANTITY_ON_HAND', SYS.AnyData.ConvertNumber(157), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3); oe.enq_row_lcr_proc( source_dbname => 'DBS1.NET', cmd_type => 'INSERT', obj_owner => 'OE', obj_name => 'INVENTORIES', old_vals => NULL, new_vals => newvals); END; / COMMIT;
dbs3.net
to view the propagated message:
CONNECT strmadmin/strmadminpw@dbs3.net SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_LCR;
See Also:
The |