java - Oracle AQ/JMS - Why is the queue cleared when the application shuts down?
Published: 2022-03-21 17:56:19
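I have an application that enqueues and dequeues messages from Oracle AQ using the JMS interface. While the application is running, items are enqueued and dequeued, and I can see the enqueued items in the queue table. However, once the application shuts down, the queue table is cleared and the application can no longer access the previously enqueued items. Any idea what might cause this behavior?
The Oracle AQ is created with the following code: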
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'schema.my_queuetable',
    sort_list          => 'priority,enq_time',
    comment            => 'Queue table to hold my data',
    multiple_consumers => FALSE, -- This is necessary so that a message is only processed by a single consumer
    queue_payload_type => 'SYS.AQ$_JMS_OBJECT_MESSAGE',
    compatible         => '10.0.0',
    storage_clause     => 'TABLESPACE LGQUEUE_IRM01');
END;
/
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => 'schema.my_queue',
    queue_table => 'schema.my_queuetable');
END;
/
BEGIN
  dbms_aqadm.start_queue(queue_name => 'schema.my_queue');
END;
/
I also have a Java class that connects to the queue, enqueues items, and processes dequeued items, like this:
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
import javax.sql.DataSource;

import oracle.jms.AQjmsFactory;
// Assumption: Log/LogFactory come from Apache Commons Logging; the original post does not show its imports.
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MyOperationsQueueImpl implements MyOperationsQueue {
    private static final Log LOGGER = LogFactory.getLog(MyOperationsQueueImpl.class);

    private final QueueConnection queueConnection;
    private final QueueSession producerQueueSession;
    private final QueueSession consumerQueueSession;
    private final String queueName;
    private final QueueSender queueSender;
    private final QueueReceiver queueReceiver;
    private MyOperationsQueue.MyOperationEventReceiver eventReceiver;

    public MyOperationsQueueImpl(DBUtils dbUtils, String queueName) throws MyException {
        this.eventReceiver = null;
        this.queueName = queueName;
        try {
            DataSource ds = dbUtils.getDataSource();
            QueueConnectionFactory connectionFactory = AQjmsFactory.getQueueConnectionFactory(ds);
            this.queueConnection = connectionFactory.createQueueConnection();
            // We create separate producer and consumer sessions because that is what is recommended by the docs
            // See: https://docs.oracle.com/javaee/6/api/javax/jms/Session.html
            this.producerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.consumerQueueSession = this.queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            this.queueSender = this.producerQueueSession.createSender(this.producerQueueSession.createQueue(this.queueName));
            this.queueReceiver = this.consumerQueueSession.createReceiver(this.consumerQueueSession.createQueue(this.queueName));
            // Start delivery of incoming messages on this connection
            this.queueConnection.start();
        } catch (JMSException | NamingException exception) {
            throw new MyOperationException("Failed to create MyOperationsQueue", exception);
        }
    }

    // Releases the JMS resources when (and if) the object is finalized
    @Override
    protected void finalize() throws Throwable {
        this.queueReceiver.close();
        this.queueSender.close();
        this.consumerQueueSession.close();
        this.producerQueueSession.close();
        this.queueConnection.close();
        super.finalize();
    }

    @Override
    public void submitMyOperation(MyOperationParameters myParameters) throws MyOperationException {
        try {
            ObjectMessage message = this.producerQueueSession.createObjectMessage(myParameters);
            this.queueSender.send(message);
            // Notify the registered receiver, if any, that the operation was enqueued
            synchronized (this) {
                if (this.eventReceiver != null) {
                    this.eventReceiver.onOperationSubmitted(message.getJMSMessageID(), myParameters);
                }
            }
        } catch (JMSException exc) {
            throw new MyOperationException("Failed to submit my operation", exc);
        }
    }

    @Override
    public void setMyOperationEventReceiver(MyOperationEventReceiver operationReceiver) throws MyOperationException {
        LOGGER.debug("Setting my operation event receiver");
        synchronized (this) {
            if (this.eventReceiver != null) {
                throw new IllegalStateException("Cannot set an operation event receiver if it is already set");
            }
            this.eventReceiver = operationReceiver;
            try {
                // Deliver each dequeued message to the event receiver
                this.queueReceiver.setMessageListener(message -> {
                    LOGGER.debug("New message received from queue receiver");
                    try {
                        ObjectMessage objectMessage = (ObjectMessage) message;
                        eventReceiver.onOperationReady(message.getJMSMessageID(), (MyOperationParameters) objectMessage.getObject());
                    } catch (Exception exception) {
                        try {
                            eventReceiver.onOperationRetrievalFailed(message.getJMSMessageID(), exception);
                        } catch (JMSException innerException) {
                            LOGGER.error("Failed to get message ID for JMS Message: " + message, innerException);
                        }
                    }
                });
            } catch (JMSException exc) {
                throw new MyOperationException("Failed to set My message listener", exc);
            }
        }
    }
}
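One thing worth ruling out while investigating: the class above releases its JMS resources only in finalize(), and the JVM does not guarantee that finalizers run before it exits. Whether that is related to the queue being cleared is not established here. The following is only a minimal sketch of deterministic, explicit cleanup in the same close order (receiver, sender, sessions, connection). ResourceGroup, ShutdownSketch, and named() are hypothetical names, and the resources are plain AutoCloseable stand-ins rather than real JMS objects, so the sketch runs without an Oracle database.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical helper: owns several resources and closes them explicitly, newest first. */
final class ResourceGroup implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    /** Registers a resource and returns it so the caller can keep using it. */
    <T extends AutoCloseable> T register(T resource) {
        resources.push(resource); // push adds at the head, so the newest resource is closed first
        return resource;
    }

    @Override
    public void close() throws Exception {
        Exception first = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (Exception e) {
                // Keep closing the remaining resources; report the first failure at the end
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}

final class ShutdownSketch {
    /** Stand-in for a JMS object (assumption): records its name when closed. */
    static AutoCloseable named(String name, List<String> log) {
        return () -> log.add(name);
    }

    /** Opens the stand-ins in the order the constructor above creates them, then closes them. */
    static List<String> demo() throws Exception {
        List<String> closed = new ArrayList<>();
        try (ResourceGroup group = new ResourceGroup()) {
            group.register(named("connection", closed));
            group.register(named("producerSession", closed));
            group.register(named("consumerSession", closed));
            group.register(named("sender", closed));
            group.register(named("receiver", closed));
        } // close() runs here, deterministically, not at garbage-collection time
        return closed;
    }

    public static void main(String[] args) throws Exception {
        // prints [receiver, sender, consumerSession, producerSession, connection]
        System.out.println(demo());
    }
}
```

In the real class, the equivalent would be to implement AutoCloseable and have the application's shutdown path call close() explicitly; how that shutdown path is wired is not shown in the question, so that part remains an assumption.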