Hi all,
I have a rather unusual use case with ActiveMQ 5.13.

I have two durable subscribers listening on a *topic*. When a particular
event occurs, I have to invalidate the pending messages and remove them from
the topic. To do this, I connect over JMX, browse the topic, and then remove
each message from every subscriber. If a subscriber is active, I close its
ActiveMQ connection, which comes from
org.apache.activemq.pool.PooledConnectionFactory.createConnection().

In this scenario, I'm able to clear the pending messages, and the KahaDB
disk usage drops as well. This is the expected behaviour.

But if I keep my subscribers inactive, the message count remains the same,
and so does the KahaDB disk usage.

The following is the code I use to clean up the messages:

    public void startCleanup() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                createTopicViewMBean();
            }
            for (String jmsMessageID : getMessageIDs(topicViewMBean.browse())) {
                messagesCount++;
                for (DurableSubscriptionViewMBean subscriber :
                        getSubscriptions(topicViewMBean.getSubscriptions(), connection)) {
                    subscriber.removeMessage(jmsMessageID);
                }
            }
            if (topicViewMBean.browse().length > 0) {
                startCleanup();
            }
            if (messagesCount == 0) {
                logger.info("No messages to delete on Topic: " + topicName + "_" + projectName);
            } else {
                logger.info("Successfully deleted " + messagesCount + " messages from "
                        + topicName + "_" + projectName);
                messagesCount = 0;
            }
        } catch (Exception e) {
            throw new ExportStagingException(e);
        }
    }
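
One thing worth noting about the control flow above: if the removals have no effect (which is what I seem to see with inactive subscribers), the recursive call never reaches an empty browse result. Below is a minimal, self-contained sketch of the same drain logic as a bounded loop. The `MessageStore` interface, `drain`, `inMemory` and `maxPasses` are hypothetical stand-ins for the JMX calls, not ActiveMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for topicViewMBean.browse() and subscriber.removeMessage(); not ActiveMQ API.
interface MessageStore {
    List<String> browse();
    void remove(String messageId);
}

class BoundedCleanup {
    // Removes every browsed message, pass by pass, and gives up after maxPasses
    // instead of recursing until the browse result is empty.
    // Returns how many messages actually disappeared from the store.
    static int drain(MessageStore store, int maxPasses) {
        int before = store.browse().size();
        for (int pass = 0; pass < maxPasses; pass++) {
            List<String> pending = new ArrayList<>(store.browse());
            if (pending.isEmpty()) {
                break;
            }
            for (String id : pending) {
                store.remove(id);
            }
        }
        return before - store.browse().size();
    }

    // In-memory store for illustration; removalsWork=false mimics removals that are silently ignored.
    static MessageStore inMemory(boolean removalsWork, String... ids) {
        List<String> messages = new ArrayList<>(Arrays.asList(ids));
        return new MessageStore() {
            @Override
            public List<String> browse() {
                return new ArrayList<>(messages);
            }

            @Override
            public void remove(String messageId) {
                if (removalsWork) {
                    messages.remove(messageId);
                }
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(drain(inMemory(true, "ID:1", "ID:2"), 3));
        System.out.println(drain(inMemory(false, "ID:1", "ID:2"), 3));
    }
}
```

With a cap like this, a cleanup that makes no progress returns a count of zero rather than overflowing the stack, which makes the inactive-subscriber case easier to spot in the logs.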

The following function fetches the messages from the topic and collects
their JMSMessageIDs:

    private Set<String> getMessageIDs(CompositeData[] messages) {
        Set<String> jmsMessageIDs = new HashSet<>();
        for (CompositeData message : messages) {
            jmsMessageIDs.add((String) message.get(CONST_JMS_MESSAGE_ID));
        }
        return jmsMessageIDs;
    }
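
To make the extraction step easier to try without a broker, here is a small sketch that builds `CompositeData` rows by hand and pulls the IDs out of them. It assumes that `CONST_JMS_MESSAGE_ID` is the string "JMSMessageID"; the `fakeMessage` helper and the class name are hypothetical and only exist for this illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;

class MessageIdExtraction {
    // Assumed value of CONST_JMS_MESSAGE_ID from the post.
    static final String JMS_MESSAGE_ID = "JMSMessageID";

    // Collects the message IDs, skipping rows that do not carry the key at all.
    static Set<String> messageIds(CompositeData[] messages) {
        Set<String> ids = new LinkedHashSet<>();
        for (CompositeData message : messages) {
            if (message.containsKey(JMS_MESSAGE_ID)) {
                ids.add((String) message.get(JMS_MESSAGE_ID));
            }
        }
        return ids;
    }

    // Hypothetical helper: builds a one-field CompositeData that stands in for a browsed message.
    static CompositeData fakeMessage(String id) {
        try {
            CompositeType type = new CompositeType(
                    "FakeMessage",
                    "Stand-in for a browsed message",
                    new String[] { JMS_MESSAGE_ID },
                    new String[] { "JMS message id" },
                    new OpenType<?>[] { SimpleType.STRING });
            return new CompositeDataSupport(
                    type, new String[] { JMS_MESSAGE_ID }, new Object[] { id });
        } catch (OpenDataException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompositeData[] browsed = { fakeMessage("ID:a"), fakeMessage("ID:b"), fakeMessage("ID:a") };
        System.out.println(messageIds(browsed));
    }
}
```

Because the result is a set, a message that shows up more than once in the browse result is only removed once per subscriber.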

And this one fetches the subscription list:

    private List<DurableSubscriptionViewMBean> getSubscriptions(
            ObjectName[] subscriptionNames, MBeanServerConnection conn) {
        if (subscriptions == null) {
            subscriptions = new ArrayList<>();
            for (ObjectName subscriptionName : subscriptionNames) {
                //Creates Subscriber Object and caches it.
                subscriptions.add(
                        MBeanServerInvocationHandler.newProxyInstance(
                                conn,
                                subscriptionName,
                                DurableSubscriptionViewMBean.class,
                                true
                        )
                );
            }
        }
        return subscriptions;
    }


And last but not least, the code that creates the JMX connection and the
topic MBean:

    private MBeanServerConnection createConnection() {
        try {
            if (connection == null) {
                logger.info("Connecting to ActiveMQ JMX Portal");
                String jmxURL = "service:jmx:rmi:///jndi/rmi://" + hostName + ":" + jmxPort + "/jmxrmi";
                connection = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL))
                        .getMBeanServerConnection();
                logger.info("Connected to ActiveMQ JMX Portal");
            }
        } catch (IOException e) {
            logger.error("[" + masterProducer + "] Exception while createConnection ActiveMQ JMX Portal"
                    + e.getMessage());
        }
        }
        return connection;
    }
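
For completeness, a small sketch of how the JMX service URL is put together, plus a variant that keeps the `JMXConnector` so it can be closed again (the code above discards it and only keeps the `MBeanServerConnection`). The class and method names are hypothetical, and the host "localhost" and port 1099 are assumed example values, not necessarily my real settings.

```java
import java.io.IOException;
import java.net.MalformedURLException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

class JmxUrlSketch {
    // Builds the same RMI/JNDI style URL as createConnection() in the post.
    static JMXServiceURL brokerJmxUrl(String host, int port) {
        try {
            return new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Opens a connector, asks for the MBean count, and closes the connector again.
    // Needs a reachable JMX endpoint, so it is not called from main() below.
    static int countMBeans(JMXServiceURL url) throws IOException {
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            return connector.getMBeanServerConnection().getMBeanCount();
        }
    }

    public static void main(String[] args) {
        // Assumed example values.
        System.out.println(brokerJmxUrl("localhost", 1099));
    }
}
```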

    private TopicViewMBean createTopicViewMBean() throws ExportStagingException {
        try {
            if (topicViewMBean == null) {
                if (connection == null) {
                    createConnection();
                }
                String brokerObjectName = "org.apache.activemq:type=Broker,brokerName=" + brokerName;
                BrokerViewMBean broker = MBeanServerInvocationHandler.newProxyInstance(
                        connection, new ObjectName(brokerObjectName), BrokerViewMBean.class, true);
                // The following for-loop fetches info about the topics available on the broker.
                boolean topicExists = false;
                for (ObjectName topic : broker.getTopics()) {
                    if (topic.getKeyProperty(CONST_DESTINATION_NAME).equals(topicName + "_" + projectName)) {
                        topicExists = true;
                        topicViewMBean = MBeanServerInvocationHandler.newProxyInstance(
                                connection, topic, TopicViewMBean.class, true);
                        break;
                    }
                }
                if (!topicExists) {
                    logger.info("Topic " + topicName + " does not exist");
                }
            }
        } catch (MalformedObjectNameException e) {
            logger.error("MalformedObjectNameException while createTopicViewMBean: " + e.getMessage());
            throw new ExportStagingException(e);
        }
        return topicViewMBean;
    }
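
The topic lookup itself only depends on `ObjectName` key properties, so it can be tried in isolation. The sketch below assumes that `CONST_DESTINATION_NAME` is "destinationName" and that the broker is called "localhost"; the class, the `name` helper and the topic names are made up for the example. It compares with the wanted name on the left-hand side, so an MBean without that key does not cause a NullPointerException.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

class TopicNameMatch {
    // Assumed value of CONST_DESTINATION_NAME from the post.
    static final String DESTINATION_NAME = "destinationName";

    // Returns the first ObjectName whose destinationName key equals wanted, or null if none does.
    static ObjectName findTopic(ObjectName[] topics, String wanted) {
        for (ObjectName topic : topics) {
            if (wanted.equals(topic.getKeyProperty(DESTINATION_NAME))) {
                return topic;
            }
        }
        return null;
    }

    // Hypothetical helper that turns the checked exception into an unchecked one.
    static ObjectName name(String value) {
        try {
            return new ObjectName(value);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        String prefix = "org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,";
        ObjectName[] topics = {
            name(prefix + "destinationName=orders_demo"),
            name(prefix + "destinationName=export_demo")
        };
        System.out.println(findTopic(topics, "export_demo"));
    }
}
```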


I've gone through the code back and forth trying to find the missing link,
but all in vain.
Can anyone help me out here?

Thanks :)





--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-User-f2341805.html
