java - Oracle AQ/JMS - Why is the queue cleared when the application shuts down?

Published: 2022-03-21 17:56:19

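I have an application that enqueues and dequeues messages from Oracle AQ using the JMS interface. While the application is running, items are enqueued and dequeued, and I can see the enqueued items in the queue table. However, when the application shuts down, the queue table is cleared and the application can no longer access the items that were previously enqueued. Any idea what could cause this behavior?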

The Oracle AQ queue was created with the following code:

BEGIN
dbms_aqadm.create_queue_table(
  queue_table => 'schema.my_queuetable',
  sort_list =>'priority,enq_time',
  comment => 'Queue table to hold my data',
  multiple_consumers => FALSE, -- This is necessary so that a message is only processed by a single consumer
  queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
  compatible => '10.0.0',
  storage_clause => 'TABLESPACE LGQUEUE_IRM01');
END;
/

BEGIN
dbms_aqadm.create_queue (
   queue_name              => 'schema.my_queue',
   queue_table             => 'schema.my_queuetable');
END;
/

BEGIN
dbms_aqadm.start_queue(queue_name=>'schema.my_queue');
END;
/
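
To see what the queue table holds before and after a shutdown, the view that Oracle creates alongside a queue table can be queried. This is a minimal sketch, assuming the default AQ$ view name for the table created above (`schema.aq$my_queuetable`) and that the querying user has SELECT access to it:

```sql
-- Count messages per state (for example READY or PROCESSED) for the queue defined above.
-- Assumes the default view AQ$MY_QUEUETABLE that Oracle creates with the queue table.
SELECT queue, msg_state, COUNT(*) AS message_count
FROM schema.aq$my_queuetable
GROUP BY queue, msg_state
ORDER BY queue, msg_state;
```

Running it once while the application is up and again after it stops shows how many messages remain in each state.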

I also have a Java class that connects to the queue, enqueues items, and processes dequeued items, like this:

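import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
import javax.sql.DataSource;

import oracle.jms.AQjmsFactory;
// Assumed: Log and LogFactory come from Apache Commons Logging; the original post does not show its imports.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MyOperationsQueueImpl implements MyOperationsQueue {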
  private static final Log LOGGER = LogFactory.getLog(MyOperationsQueueImpl.class);


  private final QueueConnection queueConnection;
  private final QueueSession producerQueueSession;
  private final QueueSession consumerQueueSession;
  private final String queueName;
  private final QueueSender queueSender;
  private final QueueReceiver queueReceiver;
  private MyOperationsQueue.MyOperationEventReceiver eventReceiver;

  public MyOperationsQueueImpl(DBUtils dbUtils, String queueName) throws MyOperationException {
    this.eventReceiver = null;
    this.queueName = queueName;
    try {
      DataSource ds = dbUtils.getDataSource();
      QueueConnectionFactory connectionFactory = AQjmsFactory.getQueueConnectionFactory(ds);
      this.queueConnection = connectionFactory.createQueueConnection();

      // We create separate producer and consumer sessions because that is what is recommended by the docs
      // See: https://docs.oracle.com/javaee/6/api/javax/jms/Session.html
      this.producerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      this.consumerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      this.queueSender = this.producerQueueSession.createSender(this.producerQueueSession.createQueue(this.queueName));
      this.queueReceiver = this.consumerQueueSession.createReceiver(this.consumerQueueSession.createQueue(this.queueName));
      this.queueConnection.start();
    } catch (JMSException | NamingException exception) {
      throw new MyOperationException("Failed to create MyOperationsQueue", exception);
    }
  }

  @Override
  protected void finalize() throws Throwable {
    this.queueReceiver.close();
    this.queueSender.close();
    this.consumerQueueSession.close();
    this.producerQueueSession.close();
    this.queueConnection.close();
    super.finalize();
  }

  @Override
  public void submitMyOperation(MyOperationParameters myParameters) throws MyOperationException {
    try {
      ObjectMessage message = this.producerQueueSession.createObjectMessage(myParameters);
      this.queueSender.send(message);
      synchronized (this) {
        if(this.eventReceiver != null) {
          this.eventReceiver.onOperationSubmitted(message.getJMSMessageID(), myParameters);
        }
      }
    } catch (JMSException exc) {
      throw new MyOperationException("Failed to submit my operation", exc);
    }
  }

  @Override
  public void setMyOperationEventReceiver(MyOperationEventReceiver operationReceiver) throws MyOperationException {
    LOGGER.debug("Setting my operation event receiver");
    synchronized (this) {
      if(this.eventReceiver != null) {
        throw new IllegalStateException("Cannot set an operation event receiver if it is already set");
      }
      this.eventReceiver = operationReceiver;
      try {
        this.queueReceiver.setMessageListener(message -> {
          LOGGER.debug("New message received from queue receiver");
          try {
            ObjectMessage objectMessage = (ObjectMessage) message;
            eventReceiver.onOperationReady(message.getJMSMessageID(), (MyOperationParameters) objectMessage.getObject());
          } catch (Exception exception) {
            try {
              eventReceiver.onOperationRetrievalFailed(message.getJMSMessageID(), exception);
            } catch (JMSException innerException) {
              LOGGER.error("Failed to get message ID for JMS Message: "+message, innerException);
            }
          }
        });
      } catch (JMSException exc) {
        throw new MyOperationException("Failed to set My message listener", exc);
      }
    }
  }

}