The problem still occured in the latest 5.3-SNAPSHOT.

I make reference to sample of this URL
http://www.nabble.com/ActiveMQ-message-delivery-td18355245.html

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

import junit.framework.TestCase;

/** 
 * @author Marjan Sterjev 
 * 
 */
public class ProducerTester extends TestCase {
        
        public void testSendQueue() throws JMSException, InterruptedException {
                ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616"); 
                Connection connection = connectionFactory.createConnection(); 
                Session session = connection.createSession(false, 
                        Session.AUTO_ACKNOWLEDGE); 
                Destination destination = session.createQueue("test.queue"); 
                MessageProducer producer = session.createProducer(destination); 
                producer.setDeliveryMode(DeliveryMode.PERSISTENT); 
                producer.setTimeToLive(10000); 
                int msgCount = 20; 
                int sleep = 100; 
                String payload = "Test Message"; 
                for (int i = 0; i < msgCount; i++) { 
                        ObjectMessage message = session.createObjectMessage(); 
                        String p = String.format("%s:%d", payload, (i + 1));
                        message.setObject(p); 
                        producer.send(message); 
                        Thread.sleep(sleep); 
                } 
                producer.close(); 
                session.close(); 
                connection.close(); 
        }
}

I have already known a workaround which works a periodic message expiry task
as you refer.

However unusual case can occur as I mentioned.

Test steps are followed :
start an ActiveMQ setting a vm queue cursor.
set a MessageConsumer to consume only those messages that had expired in
every 5 minutes.
send a message(set TimeToLive to 10 seconds) by junit test case
(ProducerTester class)
After between 10 seconds and 5 minutes(hope that message is expired while
MessageConsumer doesn't work yet), make server shutdown abruptly so that
messages which has been expired leaves on queue.
restart an ActiveMQ.
In this states, an ActiveMQ cannot be stated by ClassCastException.



Gary Tully wrote:
> 
> would it be possible to provide a junit test case that demonstrates this
> behavior? I wonder does the problem persist with the latest 5.3-SNAPSHOT?
> 
> Trunk has a resolution to
> https://issues.apache.org/activemq/browse/AMQ-1112that provides a
> periodic message expiry task. That should help but it may be
> that there still remains some assumptions about the type of message.
> A junit test case would help lots.
> 
> 
> 
> 2009/7/7 sic <sic_1...@naver.com>
> 
>>
>> While expired messages remain on queue, server cannot be re-started with
>> ClassCastException until I delete persistent file(e.g. file :
>> data\kr-store\data\hash-index-queue-data_queue#3a#2f#2fTEST.QUEUE,
>> QueueName
>> : TEST.QUEUE) of the queue which has expired messages
>>
>> Yet, this way has disadvantage that all of messages on specified queue
>> have
>> been purged regardless of expiration.
>>
>> Does anyone have other ways?
>>
>>
>> sic wrote:
>> >
>> > While testing ActiveMQ5.2.0, I have a question about expired message.
>> >
>> > Note that testing messages are Persistent and using a queue, set
>> message
>> > expiration time to 10 seconds. configurations are same except
>> destination
>> > policy(using a vm queue cursor) as below
>> > <destinationPolicy>
>> >       <policyMap>
>> >               <policyEntries>
>> >                       <policyEntry queue=">" memoryLimit="5mb">
>> >                               <dispatchPolicy>
>> >                                       <strictOrderDispatchPolicy />
>> >                               </dispatchPolicy>
>> >                               <deadLetterStrategy>
>> >                                       <individualDeadLetterStrategy
>> queuePrefix="DLQ."/>
>> >                               </deadLetterStrategy>
>> >                               <pendingQueuePolicy>
>> >                                       <vmQueueCursor />
>> >                               </pendingQueuePolicy>
>> >                       </policyEntry>
>> >               </policyEntries>
>> >       </policyMap>
>> > </destinationPolicy>
>> > Additionally, it turns out that the activemq broker actually does not
>> > proactively purge expired messages from queues. so we set
>> > a thread that periodically cleared my queues of expired messages by
>> help
>> > of this forum's advisor.
>> > We suppose that a server is stopped unexpectedly when expired message
>> > leaves on queue without disposed by the thread
>> > After recovering a server, we will expect that remained messages
>> including
>> > both normal and expired message are loaded normally
>> > However server stopped abnormally and some error occured while starting
>> > ActiveMQ. Errors are like that
>> >
>> > 2009-07-01 17:13:45,125 [main           ] INFO  BrokerService
>> > - For help or more information please see: http://activemq.apache.org/
>> > 2009-07-01 17:13:45,421 [main           ] INFO  KahaStore
>> > - Kaha Store using data directory
>> > D:\apache\apache-activemq-5.2.0\binary\bin\..\data\kr-store\data
>> > 2009-07-01 17:13:45,796 [main           ] ERROR BrokerService
>> > - Failed to start ActiveMQ JMS Message Broker. Reason:
>> > java.lang.ClassCastException:
>> > org.apache.activemq.command.ActiveMQObjectMessage
>> > java.lang.ClassCastException:
>> > org.apache.activemq.command.ActiveMQObjectMessage
>> >       at
>> > org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1114)
>> >       at
>> > org.apache.activemq.broker.region.Queue.messageExpired(Queue.java:1106)
>> >       at
>> >
>> org.apache.activemq.broker.region.Queue$5.recoverMessage(Queue.java:173)
>> >       at
>> >
>> org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessage(RecoveryListenerAdapter.java:45)
>> >       at
>> >
>> org.apache.activemq.store.amq.RecoveryListenerAdapter.recoverMessageReference(RecoveryListenerAdapter.java:56)
>> >       at
>> >
>> org.apache.activemq.store.kahadaptor.KahaReferenceStore.recoverReference(KahaReferenceStore.java:82)
>> >       at
>> >
>> org.apache.activemq.store.kahadaptor.KahaReferenceStore.recover(KahaReferenceStore.java:93)
>> >       at
>> >
>> org.apache.activemq.store.amq.AMQMessageStore.recover(AMQMessageStore.java:481)
>> >       at
>> org.apache.activemq.broker.region.Queue.initialize(Queue.java:167)
>> >       at
>> >
>> org.apache.activemq.broker.region.DestinationFactoryImpl.createDestination(DestinationFactoryImpl.java:83)
>> >       at
>> >
>> org.apache.activemq.broker.region.AbstractRegion.createDestination(AbstractRegion.java:434)
>> >       at
>> >
>> org.apache.activemq.broker.jmx.ManagedQueueRegion.createDestination(ManagedQueueRegion.java:56)
>> >       at
>> >
>> org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:120)
>> >       at
>> >
>> org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:261)
>> >       at
>> >
>> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
>> >       at
>> >
>> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
>> >       at
>> >
>> org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:147)
>> >       at
>> >
>> org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:142)
>> >       at
>> >
>> org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:149)
>> >       at
>> >
>> org.apache.activemq.broker.region.AbstractRegion.start(AbstractRegion.java:94)
>> >       at
>> >
>> org.apache.activemq.broker.region.RegionBroker.start(RegionBroker.java:176)
>> >       at
>> >
>> org.apache.activemq.broker.jmx.ManagedRegionBroker.start(ManagedRegionBroker.java:103)
>> >       at
>> >
>> org.apache.activemq.broker.TransactionBroker.start(TransactionBroker.java:112)
>> >       at
>> org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
>> >       at
>> org.apache.activemq.broker.BrokerFilter.start(BrokerFilter.java:154)
>> >       at
>> >
>> org.apache.activemq.broker.MutableBrokerFilter.start(MutableBrokerFilter.java:161)
>> >       at
>> org.apache.activemq.broker.BrokerService.start(BrokerService.java:468)
>> >       at
>> >
>> org.apache.activemq.xbean.XBeanBrokerService.afterPropertiesSet(XBeanBrokerService.java:52)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1368)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1334)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:473)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory$1.run(AbstractAutowireCapableBeanFactory.java:409)
>> >       at java.security.AccessController.doPrivileged(Native Method)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:380)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:264)
>> >       at
>> >
>> org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:221)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:261)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:185)
>> >       at
>> >
>> org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:164)
>> >       at
>> >
>> org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:429)
>> >       at
>> >
>> org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:729)
>> >       at
>> >
>> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:381)
>> >       at
>> >
>> org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
>> >       at
>> >
>> org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
>> >       at
>> >
>> org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:96)
>> >       at
>> >
>> org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:52)
>> >       at
>> >
>> org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
>> >       at
>> >
>> org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
>> >       at
>> >
>> org.apache.activemq.console.command.StartCommand.startBroker(StartCommand.java:115)
>> >       at
>> >
>> org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:74)
>> >       at
>> >
>> org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
>> >       at
>> >
>> org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:129)
>> >       at
>> >
>> org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
>> >       at
>> >
>> org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:79)
>> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >       at
>> >
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> >       at
>> >
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> >       at java.lang.reflect.Method.invoke(Method.java:585)
>> >       at org.apache.activemq.console.Main.runTaskClass(Main.java:225)
>> >       at org.apache.activemq.console.Main.main(Main.java:106)
>> > 2009-07-01 17:13:45,812 [main           ] INFO  BrokerService
>> > - ActiveMQ Message Broker (localhost, null) is shutting down
>> > 2009-07-01 17:13:45,828 [main           ] INFO  NetworkConnector
>> > - Network Connector default-nc Stopped
>> >
>> > It's in Windows XP.
>> >
>> > Regards
>> >
>>
>> --
>> View this message in context:
>> http://www.nabble.com/ClassCastException-having-relation-to-expired-messages-tp24287023p24369696.html
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 
> -- 
> http://blog.garytully.com
> 
> Open Source Integration
> http://fusesource.com
> 
> 

-- 
View this message in context: 
http://www.nabble.com/ClassCastException-having-relation-to-expired-messages-tp24287023p24407653.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to