Just to confirm: when your subscribers are all offline, how many objects
are returned in the List from getSubscriptions()?
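
In case it helps, here is a minimal sketch of how that count could be logged. It uses only JDK classes. The class name SubscriptionCountCheck, the helper methods, and the sample ObjectName strings are made up for illustration; in your code the array would come from topicViewMBean.getSubscriptions().

```java
import java.util.ArrayList;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

public class SubscriptionCountCheck {

    // Builds log lines: the array length first, then one line per ObjectName.
    public static List<String> describe(ObjectName[] subscriptionNames) {
        List<String> lines = new ArrayList<>();
        lines.add("subscriptions returned: " + subscriptionNames.length);
        for (ObjectName name : subscriptionNames) {
            lines.add("  " + name.getCanonicalName());
        }
        return lines;
    }

    // Hypothetical helper so the sample needs no checked-exception handling.
    public static ObjectName name(String text) {
        try {
            return new ObjectName(text);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for topicViewMBean.getSubscriptions(); these names are invented.
        ObjectName[] sample = {
            name("example:type=Subscription,name=subA"),
            name("example:type=Subscription,name=subB")
        };
        describe(sample).forEach(System.out::println);
    }
}
```

Running the same logging once with the subscribers connected and once with them offline would show whether the list changes between the two cases.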

Tim

On Wed, Oct 18, 2017 at 10:18 AM, Hitesh <
hitesh.hotchand...@contentsphere.com> wrote:

>
>
> Hi all,
> I have a peculiar use case with ActiveMQ 5.13.
>
> I have two durable subscribers listening on a *topic*. When a particular
> event occurs, I have to invalidate the messages and remove them from the
> topic. To do this, I connect over JMX, browse the topic, and then remove
> each message from every subscriber. If the subscriber is active, I close
> the ActiveMQ connection, which is obtained from
> org.apache.activemq.pool.PooledConnectionFactory.createConnection().
>
> In this scenario, I'm able to clear the pending messages, and the KahaDB
> disk usage drops. This is the expected behaviour.
>
> But if I keep my subscribers inactive, the message count stays the same,
> and so does the KahaDB disk usage.
>
> This is the code I use to clean up the messages:
>     public void startCleanup() throws ExportStagingException {
>         try {
>             if (topicViewMBean == null) {
>                 createTopicViewMBean();
>             }
>             for (String jmsMessageID : getMessageIDs(topicViewMBean.browse())) {
>                 messagesCount++;
>                 for (DurableSubscriptionViewMBean subscriber : getSubscriptions(topicViewMBean.getSubscriptions(), connection)) {
>                     subscriber.removeMessage(jmsMessageID);
>                 }
>             }
>             if (topicViewMBean.browse().length > 0) {
>                 startCleanup();
>             }
>             if (messagesCount == 0) {
>                 logger.info("No messages to delete on Topic: " + topicName + "_" + projectName);
>             } else {
>                 logger.info("Successfully deleted " + messagesCount + " messages from " + topicName + "_" + projectName);
>                 messagesCount = 0;
>             }
>         } catch (Exception e) {
>             throw new ExportStagingException(e);
>         }
>     }
>
> The following function collects the JMSMessageID of each message browsed
> from the topic:
>
>     private Set<String> getMessageIDs(CompositeData[] messages) {
>         Set<String> jmsMessageIDs = new HashSet<>();
>         for (CompositeData message : messages) {
>             jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID));
>         }
>         return jmsMessageIDs;
>     }
>
> And this one fetches the subscription list:
>
>     private List<DurableSubscriptionViewMBean> getSubscriptions(ObjectName[] subscriptionNames, MBeanServerConnection conn) {
>         if (subscriptions == null) {
>             subscriptions = new ArrayList<>();
>             for (ObjectName subscriptionName : subscriptionNames) {
>                 //Creates Subscriber Object and caches it.
>                 subscriptions.add(
>                         MBeanServerInvocationHandler.newProxyInstance(
>                                 conn,
>                                 subscriptionName,
>                                 DurableSubscriptionViewMBean.class,
>                                 true
>                         )
>                 );
>             }
>         }
>         return subscriptions;
>     }
>
>
> And last but not least, the code that creates the connection and the
> topic view:
>
>     private MBeanServerConnection createConnection() {
>         try {
>             if (connection == null) {
>                 logger.info("Connecting to ActiveMQ JMX Portal");
>                 String jmxURL = "service:jmx:rmi:///jndi/rmi://" + hostName + ":" + jmxPort + "/jmxrmi";
>                 connection = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL)).getMBeanServerConnection();
>                 logger.info("Connected to ActiveMQ JMX Portal");
>             }
>         } catch (IOException e) {
>             logger.error("[" + masterProducer + "] Exception while createConnection ActiveMQ JMX Portal" + e.getMessage());
>         }
>         return connection;
>     }
>
>     private TopicViewMBean createTopicViewMBean() throws ExportStagingException {
>         try {
>             if (topicViewMBean == null) {
>                 if (connection == null) {
>                     createConnection();
>                 }
>                 String brokerObjectName = "org.apache.activemq:type=Broker,brokerName=" + brokerName;
>                 BrokerViewMBean broker;
>                 broker = MBeanServerInvocationHandler.newProxyInstance(connection, new ObjectName(brokerObjectName),
>                         BrokerViewMBean.class, true);
>                 //The following for-loop fetches info about the topics available on the Broker.
>                 boolean topicExists = false;
>                 for (ObjectName topic : broker.getTopics()) {
>                     if (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" + projectName)) {
>                         topicExists = true;
>                         topicViewMBean = MBeanServerInvocationHandler.newProxyInstance(connection, topic, TopicViewMBean.class, true);
>                         break;
>                     }
>                 }
>                 if (!topicExists) {
>                     logger.info("Topic " + topicName + " does not exists");
>                 }
>             }
>         } catch (MalformedObjectNameException e) {
>             logger.error("MalformedObjectNameException while createTopicViewMBean: " + e.getMessage());
>             throw new ExportStagingException(e);
>         }
>         return topicViewMBean;
>     }
>
>
> I've gone through the code back and forth trying to find the missing link,
> but all in vain.
> Can anyone help me out here?
>
> Thanks :)
>
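
A note on the recursive call in startCleanup() quoted above: if removeMessage() has no effect while the subscribers are offline, browse() would keep returning messages and the method would keep calling itself. A bounded loop is one way to avoid that. This is only a sketch with stand-in types: BoundedCleanup, drain, and maxPasses are hypothetical names, the Supplier stands in for getMessageIDs(topicViewMBean.browse()), and the Consumer stands in for the per-subscriber removeMessage() calls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class BoundedCleanup {

    // Repeats browse-and-remove until a browse comes back empty or maxPasses
    // is reached. Returns the number of removal requests issued, which is not
    // the same as the number of confirmed deletions.
    public static int drain(Supplier<Set<String>> browse, Consumer<String> remove, int maxPasses) {
        int requested = 0;
        for (int pass = 0; pass < maxPasses; pass++) {
            Set<String> ids = browse.get();
            if (ids.isEmpty()) {
                break;
            }
            for (String id : ids) {
                remove.accept(id);
                requested++;
            }
        }
        return requested;
    }

    public static void main(String[] args) {
        // Case 1: removals take effect, so the second browse is empty.
        Set<String> store = new HashSet<>(Arrays.asList("ID:1", "ID:2"));
        int ok = drain(() -> new HashSet<>(store), store::remove, 5);
        System.out.println(ok + " requests, " + store.size() + " left");

        // Case 2: removals are ignored, so the loop stops at maxPasses
        // instead of recursing without end.
        Set<String> stuck = Collections.singleton("ID:3");
        int ignored = drain(() -> stuck, id -> { }, 5);
        System.out.println(ignored + " requests, " + stuck.size() + " left");
    }
}
```

Comparing the request count with what a final browse still returns makes it visible when removals are being ignored.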
