Hi all, I have a rather peculiar use case with ActiveMQ 5.13.
I have two durable subscribers listening on a *topic*. When a particular event occurs, I have to invalidate the messages and remove them from the topic. To do this, I connect over JMX, browse the topic, and then remove each message from every subscriber.

If a subscriber is active, I close its ActiveMQ connection, which comes from org.apache.activemq.pool.PooledConnectionFactory.createConnection(). In this scenario the pending messages are cleared and the KahaDB disk usage drops, which is the expected behaviour. But if the subscribers stay inactive, the message count remains the same and so does the KahaDB disk usage.

Following is the code I use to clean up the messages:

    public void startCleanup() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                createTopicViewMBean();
            }
            for (String jmsMessageID : getMessageIDs(topicViewMBean.browse())) {
                messagesCount++;
                for (DurableSubscriptionViewMBean subscriber
                        : getSubscriptions(topicViewMBean.getSubscriptions(), connection)) {
                    subscriber.removeMessage(jmsMessageID);
                }
            }
            // Re-run if messages are still present on the topic.
            if (topicViewMBean.browse().length > 0) {
                startCleanup();
            }
            if (messagesCount == 0) {
                logger.info("No messages to delete on Topic: " + topicName + "_" + projectName);
            } else {
                logger.info("Successfully deleted " + messagesCount + " messages from "
                        + topicName + "_" + projectName);
                messagesCount = 0;
            }
        } catch (Exception e) {
            throw new ExportStagingException(e);
        }
    }

The following function fetches the messages from the topic and collects their JMSMessageIDs:

    private Set<String> getMessageIDs(CompositeData[] messages) {
        Set<String> jmsMessageIDs = new HashSet<>();
        for (CompositeData message : messages) {
            jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID));
        }
        return jmsMessageIDs;
    }

And this one fetches the subscription list:

    private List<DurableSubscriptionViewMBean> getSubscriptions(ObjectName[] subscriptionNames,
                                                                MBeanServerConnection conn) {
        if (subscriptions == null) {
            subscriptions = new ArrayList<>();
            for (ObjectName subscriptionName : subscriptionNames) {
                // Creates the subscriber proxy object and caches it.
                subscriptions.add(
                        MBeanServerInvocationHandler.newProxyInstance(
                                conn,
                                subscriptionName,
                                DurableSubscriptionViewMBean.class,
                                true
                        )
                );
            }
        }
        return subscriptions;
    }

And last but not least, the code that creates the connection and the topic view:

    private MBeanServerConnection createConnection() {
        try {
            if (connection == null) {
                logger.info("Connecting to ActiveMQ JMX Portal");
                String jmxURL = "service:jmx:rmi:///jndi/rmi://" + hostName + ":" + jmxPort + "/jmxrmi";
                connection = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL))
                        .getMBeanServerConnection();
                logger.info("Connected to ActiveMQ JMX Portal");
            }
        } catch (IOException e) {
            logger.error("[" + masterProducer + "] Exception while createConnection ActiveMQ JMX Portal"
                    + e.getMessage());
        }
        return connection;
    }

    private TopicViewMBean createTopicViewMBean() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                if (connection == null) {
                    createConnection();
                }
                String brokerObjectName = "org.apache.activemq:type=Broker,brokerName=" + brokerName;
                BrokerViewMBean broker;
                broker = MBeanServerInvocationHandler.newProxyInstance(
                        connection, new ObjectName(brokerObjectName), BrokerViewMBean.class, true);
                // The following for-loop fetches info about the topics available on the broker.
                boolean topicExists = false;
                for (ObjectName topic : broker.getTopics()) {
                    if (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" + projectName)) {
                        topicExists = true;
                        topicViewMBean = MBeanServerInvocationHandler.newProxyInstance(
                                connection, topic, TopicViewMBean.class, true);
                        break;
                    }
                }
                if (!topicExists) {
                    logger.info("Topic " + topicName + " does not exist");
                }
            }
        } catch (MalformedObjectNameException e) {
            logger.error("MalformedObjectNameException while createTopicViewMBean: " + e.getMessage());
            throw new ExportStagingException(e);
        }
        return topicViewMBean;
    }

I've gone through the code back and forth trying to find the missing link, but all in vain. Can anyone help me out here?

Thanks :)

--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
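
For comparison, the browse-and-remove pass in startCleanup() could also be written as a bounded loop rather than a recursive call, so that it stops after a fixed number of passes if the removals have no visible effect. This is only a rough sketch under assumptions: TopicView, SubscriptionView, CleanupSketch and drain are hypothetical names, not ActiveMQ types, and the in-memory set in main stands in for the real JMX proxies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the topic proxy: returns the browsed JMSMessageIDs. */
interface TopicView {
    Set<String> browseMessageIds();
}

/** Hypothetical stand-in for one durable-subscription proxy. */
interface SubscriptionView {
    void removeMessage(String jmsMessageId);
}

final class CleanupSketch {

    /**
     * Browses the topic and asks every subscription to remove each browsed
     * message, repeating until the browse comes back empty or maxPasses is hit.
     * Returns how many message IDs were processed across all passes.
     */
    static int drain(TopicView topic, List<SubscriptionView> subscriptions, int maxPasses) {
        int processed = 0;
        for (int pass = 0; pass < maxPasses; pass++) {
            Set<String> ids = topic.browseMessageIds();
            if (ids.isEmpty()) {
                break; // nothing left on the topic
            }
            for (String id : ids) {
                processed++;
                for (SubscriptionView subscription : subscriptions) {
                    subscription.removeMessage(id);
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // In-memory fake: removing an ID really deletes it from the "topic".
        Set<String> store = new LinkedHashSet<>(Arrays.asList("ID:1", "ID:2", "ID:3"));
        TopicView topic = () -> new LinkedHashSet<>(store); // snapshot per browse
        List<SubscriptionView> subscriptions = new ArrayList<>();
        subscriptions.add(store::remove);

        int processed = drain(topic, subscriptions, 5);
        System.out.println("processed " + processed + ", left " + store.size());
    }
}
```

With the fake above, the first pass removes everything and the second browse is empty, so the loop exits early. If removeMessage had no effect (as seems to happen with my inactive subscribers), the loop would give up after maxPasses instead of calling itself again.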