oracle aq java jms使用(数据类型为XMLTYPE)

发布时间:2023年12月17日

记录一次冷门技术oracle aq的使用

版本

oracle 11g

创建用户

-- Create the demo schema that owns the queue and consumes from it.
-- The password is quoted: Oracle rejects an unquoted password that starts
-- with a digit (ORA-00988).
-- NOTE(review): "123456" is a placeholder; use a strong password outside a
-- throw-away test database.
create user testaq identified by "123456";

-- Basic session and object-creation roles.
grant connect, resource to testaq;

-- Roles and package privileges needed for Advanced Queuing.
-- Each grant is listed once; the original script repeated several of them.
grant aq_administrator_role to testaq;
grant aq_user_role to testaq;
grant execute on dbms_aq to testaq;
grant execute on dbms_aqadm to testaq;
grant execute on dbms_aqin to testaq;

-- AQ system privileges; the third argument (admin_option) is false, so
-- testaq cannot pass these privileges on to other users.
begin
  dbms_aqadm.grant_system_privilege('enqueue_any', 'testaq', false);
  dbms_aqadm.grant_system_privilege('dequeue_any', 'testaq', false);
end;
/

-- NOTE(review): "execute any procedure" and "with admin option" are very
-- broad; kept so the resulting privilege set matches the original script,
-- but confirm they are really required before using this elsewhere.
grant execute any procedure to testaq;
grant create procedure to testaq with admin option;

创建队列表

-- Create the queue table that backs the XML queue.
-- The payload type is SYS.XMLTYPE and the table is single-consumer.
declare
  c_queue_table constant varchar2(61) := 'testaq.xml_queue_table';
begin
  dbms_aqadm.create_queue_table(
    multiple_consumers => false,
    queue_payload_type => 'SYS.XMLTYPE',
    queue_table        => c_queue_table
  );
end;

创建队列及启动队列

-- Create the queue inside testaq.xml_queue_table, then start it so that
-- it accepts enqueue and dequeue requests.
declare
  c_queue_name  constant varchar2(61) := 'testaq.xml_queue';
  c_queue_table constant varchar2(61) := 'testaq.xml_queue_table';
begin
  dbms_aqadm.create_queue(
    queue_table => c_queue_table,
    queue_name  => c_queue_name
  );

  dbms_aqadm.start_queue(queue_name => c_queue_name);
end;

停止及删除队列

-- Tear-down, in dependency order: stop the queue, drop the queue, and only
-- then drop the queue table that contained it.
declare
  c_queue_name  constant varchar2(61) := 'testaq.xml_queue';
  c_queue_table constant varchar2(61) := 'testaq.xml_queue_table';
begin
  dbms_aqadm.stop_queue(queue_name => c_queue_name);
  dbms_aqadm.drop_queue(queue_name => c_queue_name);
  dbms_aqadm.drop_queue_table(queue_table => c_queue_table);
end;

发送消息

-- Enqueue one XMLTYPE test message.
-- The target is testaq.xml_queue, the queue created and started earlier in
-- this article; the original snippet pointed at testaq.test_queue, which is
-- never created.
declare
  r_enqueue_options    dbms_aq.enqueue_options_t;     -- left at defaults
  r_message_properties dbms_aq.message_properties_t;  -- left at defaults
  v_message_handle     raw(16);                       -- receives the message id
  o_payload            sys.xmltype;
begin
  o_payload := sys.xmltype('<ROOT><ROWSET><ROW><APPLYNO>test</APPLYNO></ROW></ROWSET></ROOT>');

  dbms_aq.enqueue(
    queue_name         => 'testaq.xml_queue',
    enqueue_options    => r_enqueue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );

  -- With default enqueue options the message is part of the current
  -- transaction, so it only becomes visible to consumers after this commit.
  commit;
end;

Java接收消息

# Connection and queue settings, bound to OracleAqJmsConfig via the
# "oracle-aq" prefix.
oracle-aq:
  # NOTE(review): in this thin-driver URL form the last segment is the
  # database SID; confirm the instance is really named "testaq".
  jdbcUrl: jdbc:oracle:thin:@localhost:1521:testaq
  # Database account used to open the JMS queue connection.
  username: testaq
  # NOTE(review): plain-text demo password; externalise it for real use.
  password: 123456
  # Schema that owns the queue (first argument to session.getQueue).
  queueNameUser: testaq
  # Queue name without the schema prefix (second argument to session.getQueue).
  queueName: xml_queue
/**
 * Holds the settings found under the {@code oracle-aq} configuration prefix.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Component
@ConfigurationProperties(prefix = "oracle-aq")
@Data
public class OracleAqJmsConfig {
    /** JDBC URL handed to the AQ JMS connection factory. */
    private String jdbcUrl;
    /** Database user for the queue connection. */
    private String username;
    /** Password for {@link #username}. */
    private String password;
    /** Schema that owns the queue. */
    private String queueNameUser;
    /** Queue name, without the owning schema. */
    private String queueName;
}
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.xdb.XMLType;
import oracle.xdb.XMLTypeFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.jms.*;
import javax.xml.bind.JAXBException;
import java.util.Properties;

@Service
@Slf4j
public class TestOracleAq {

    @Autowired
    private OracleAqJmsConfig config;

    @PostConstruct
    public void messageListener() throws JMSException {
        QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.getJdbcUrl(), new Properties());
        QueueConnection conn = queueConnectionFactory.createQueueConnection(config.getUsername(), config.getPassword());
        AQjmsSession session = (AQjmsSession)conn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
        conn.start();
        Queue queue = (AQjmsDestination)session.getQueue(config.getQueueNameUser(), config.getQueueName());
        XMLTypeFactory factory = new XMLTypeFactory();
        MessageConsumer consumer = session.createConsumer(queue, null, factory, null, false);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
                try {
                    Object adtPayload = adtMessage.getAdtPayload();
                    XMLType xmlType = (XMLType)adtPayload;
                    saveXml(xmlType.getStringVal());
                    log.info("接收到oracle aq数据:{}", xmlType.getStringVal());
                } catch (Exception e) {
                    log.error("", e);
                }
            }
        });
    }

    public void saveXml(String xml) throws JAXBException {
        // todo ...
    }

}

依赖

在oracle安装目录中查找这些依赖
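(原文此处为截图,图片缺失。所需 jar 包:ojdbc6.jar、jmscommon.jar、orai18n.jar、jta.jar、aqapi_g.jar、xdb.jar,与下方依赖配置中的 systemPath 一一对应。)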

<!-- oracle aq -->
<!--
  All artifacts below are resolved from local jar files under
  ${project.basedir}/libs through system scope.
  NOTE(review): the declared versions differ (11.1.0.7.0, 1.0, 21.9.0.0);
  confirm the jars actually come from compatible Oracle releases.
-->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/ojdbc6.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jmscommon</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jmscommon.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>orai18n</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/orai18n.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jta</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jta.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>aqapi_g</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/aqapi_g.jar</systemPath>
</dependency>
<dependency>
    <groupId>oracle.xdb</groupId>
    <artifactId>xdb</artifactId>
    <version>21.9.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/xdb.jar</systemPath>
</dependency>
<!-- oracle aq -->
文章来源:https://blog.csdn.net/weixin_39806100/article/details/134995814
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。