Just to confirm: when your subscribers are all offline, how many objects are returned in the List from getSubscriptions()?
Tim On Wed, Oct 18, 2017 at 10:18 AM, Hitesh < hitesh.hotchand...@contentsphere.com> wrote: > > > Hi all, > I have a very peculiar and weird usecase with ActiveMQ 5.13 > > I have two durable subscribers listening on a *topic*. When a particular > event occurs, I have to invalidate the messages and remove them from the > topic. To do this, I'm using JMX connection and browsing on the topic and > then from each subscriber I'm removing the message. If the subscriber is > Active, I'm closing the activeMQ connection, which is a > org.apache.activemq.pool.PooledConnectionFactory.createConnection(). > > In this scenario, I'm able to clear the pending message and also the space > on Kahadb reduces. Expected behaviour. > > But If I keep my subscribers inactive, the message count remains the same > and also the kahadb disk usage. > > Following is the code I use to cleanup the messages: > public void startCleanup() throws ExportStagingException { > try { > if (topicViewMBean == null) { > createTopicViewMBean(); > } > for (String jmsMessageID : > getMessageIDs(topicViewMBean.browse())) { > messagesCount++; > for (DurableSubscriptionViewMBean subscriber : > getSubscriptions(topicViewMBean.getSubscriptions(), connection)) { > subscriber.removeMessage(jmsMessageID); > } > } > if (topicViewMBean.browse().length > 0) { > startCleanup(); > } > if (messagesCount == 0) { > logger.info("No messages to delete on Topic: " + > topicName + > "_" + projectName); > } else { > logger.info("Successfully deleted " + messagesCount + " > messages from " + topicName + "_" + projectName); > messagesCount = 0; > } > } catch (Exception e) { > throw new ExportStagingException(e); > } > } > > Following function fetches the message from the topic and maintains a list > of their JMSMessageID > > private Set<String> getMessageIDs(CompositeData[] messages) { > Set<String> jmsMessageIDs = new HashSet<>(); > for (CompositeData message : messages) { > jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID)); > } > return jmsMessageIDs; > } > > And this one to fetch the subscription list > > private List<DurableSubscriptionViewMBean> > getSubscriptions(ObjectName[] > subscriptionNames, MBeanServerConnection conn) { > if (subscriptions == null) { > subscriptions = new ArrayList<>(); > for (ObjectName subscriptionName : subscriptionNames) { > //Creates Subscriber Object and caches it. > subscriptions.add( > MBeanServerInvocationHandler.newProxyInstance( > conn, > subscriptionName, > DurableSubscriptionViewMBean.class, > true > ) > ); > } > } > return subscriptions; > } > > > And last but not the least, The create connection and topic > > private MBeanServerConnection createConnection() { > try { > if (connection == null) { > logger.info("Connecting to ActiveMQ JMX Portal"); > String jmxURL = "service:jmx:rmi:///jndi/rmi://" + > hostName > + ":" + jmxPort + "/jmxrmi"; > connection = JMXConnectorFactory.connect(new > JMXServiceURL(jmxURL)).getMBeanServerConnection(); > logger.info("Connected to ActiveMQ JMX Portal"); > } > } catch (IOException e) { > logger.error("[" + masterProducer + "] Exception while > createConnection ActiveMQ JMX Portal" + e.getMessage()); > } > return connection; > } > > private TopicViewMBean createTopicViewMBean() throws > ExportStagingException { > try { > if (topicViewMBean == null) { > if (connection == null) { > createConnection(); > } > String brokerObjectName = > "org.apache.activemq:type=Broker,brokerName=" + brokerName; > BrokerViewMBean broker; > broker = > MBeanServerInvocationHandler.newProxyInstance(connection, new > ObjectName(brokerObjectName), > BrokerViewMBean.class, true); > //The following for-loop fetches info about the topics > available on the Broker. > boolean topicExists = false; > for (ObjectName topic : broker.getTopics()) { > if > (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" + > projectName)) { > topicExists = true; > topicViewMBean = > MBeanServerInvocationHandler.newProxyInstance(connection, topic, > TopicViewMBean.class, true); > break; > } > } > if (!topicExists) { > logger.info("Topic " + topicName + " does not > exists"); > } > } > } catch (MalformedObjectNameException e) { > logger.error("MalformedObjectNameException while > createTopicViewMBean: " + e.getMessage()); > throw new ExportStagingException(e); > } > return topicViewMBean; > } > > > I've gone through the code to-and fro trying to find the missing link, but > all in vain. > Can anyone help me out over here... > > Thanks :) > > > > > > -- > Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User- > f2341805.html >