Hi there,

You could use the JMX API for that.

A couple of convenient JMX methods are:

private MBeanInfo getMBeanInfo(String brokerName, String type, String pattern) throws Exception {
        // Expect exactly one MBean to match the pattern.
        Set<?> mBeans = getMBeans(brokerName, type, pattern);
        assertEquals(1, mBeans.size());

        return getMBeanServerConnection(brokerName).getMBeanInfo(getObjectName(brokerName, type, pattern));
    }

private Set<?> getMBeans(String brokerName, String type) throws Exception {
        return getMBeans(brokerName, type, "*", 0);
    }

private Set<?> getMBeans(String brokerName, String type, String pattern) throws Exception {
        return getMBeans(brokerName, type, pattern, 0);
    }

private Set<?> getMBeans(String brokerName, String type, String pattern, int timeout) throws Exception {
        // timeout is in milliseconds; 0 means query once without waiting.
        final long expiryTime = System.currentTimeMillis() + timeout;
        final ObjectName beanName = getObjectName(brokerName, type, pattern);

        Set<?> mbeans = null;
        do {
            if (timeout > 0) {
                Thread.sleep(100);
            }
            MBeanServerConnection mbsc = getMBeanServerConnection(brokerName);
            if (mbsc != null) {
                LOG.info("Query name: " + beanName);
                mbeans = mbsc.queryMBeans(beanName, null);
            }
        } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
        // May be null if no connection could be established before the timeout.
        return mbeans;
    }

private MBeanServerConnection getMBeanServerConnection(String brokerName) throws Exception {
        int jmxPort = getBrokerService(brokerName).getManagementContext().getConnectorPort();

        final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + jmxPort + "/jmxrmi");
        MBeanServerConnection mbsc = null;
        try {
            // Note: the connector is never closed here, which is acceptable in a short-lived test.
            JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
            mbsc = jmxc.getMBeanServerConnection();
        } catch (Exception ignored) {
            // Broker not reachable (yet); callers treat null as "no connection".
        }
        return mbsc;
    }

private Object getAttribute(String brokerName, String type, String pattern, String attrName) throws Exception {
        // Called only for its check that exactly one MBean matches; the returned info is not needed.
        getMBeanInfo(brokerName, type, pattern);
        return getMBeanServerConnection(brokerName).getAttribute(getObjectName(brokerName, type, pattern), attrName);
    }

private void queryMBean(String brokerName, String type, String pattern) throws Exception {
        MBeanServerConnection mbsc = getMBeanServerConnection(brokerName);
        ObjectName objName = getObjectName(brokerName, type, pattern);
        MBeanInfo info = mbsc.getMBeanInfo(objName);

        for (MBeanAttributeInfo attr : info.getAttributes()) {
            LOG.info("Found attribute : " + attr.getName());
        }
        for (MBeanOperationInfo op : info.getOperations()) {
            LOG.info("Found operation : " + op.getName());
        }

        // Read one attribute by name and log its value.
        LOG.info("ConsumerCount : " + mbsc.getAttribute(objName, "ConsumerCount"));
    }

private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
        // Example result: org.apache.activemq:BrokerName=<broker>,Type=Queue,Destination=<queue>
        return new ObjectName(
            "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type + "," + pattern
        );
    }

You probably only need the code that gets the MBeanServer connection and queries the attribute. You may have to adapt the methods
to your needs; I took them straight from a test case.

An example that queries the number of consumers:

consumers_A = getAttribute(brokerAName, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
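
For the producer's startup check, both values can be read the same way. Below is a minimal, self-contained sketch rather than the test-case code: it registers a hypothetical stand-in MBean (FakeQueueMBean, not an ActiveMQ class) on the in-process platform MBean server so that it runs without a broker. I am assuming that the pending message count is exposed as the "QueueSize" attribute of the queue MBean; please check the attribute list printed by queryMBean(...) against your broker version. The class name, the helper names and the queue name are made up for illustration.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public class QueueStatsSketch {

    /** Hypothetical stand-in for the broker's queue MBean; not an ActiveMQ type. */
    public interface FakeQueueMBean {
        long getConsumerCount();
        long getQueueSize();
    }

    /** Builds the object name in the same layout as getObjectName(...) above. */
    public static ObjectName queueName(String brokerName, String queue) throws Exception {
        return new ObjectName(
            "org.apache.activemq:BrokerName=" + brokerName + ",Type=Queue,Destination=" + queue);
    }

    /** Reads one numeric attribute from the MBean registered under the given name. */
    public static long readLong(MBeanServerConnection mbsc, ObjectName name, String attribute)
            throws Exception {
        return ((Number) mbsc.getAttribute(name, attribute)).longValue();
    }

    /** Registers a fake queue MBean with fixed values so the sketch runs without a broker. */
    public static void registerFake(MBeanServer server, ObjectName name,
                                    final long consumers, final long pending) throws Exception {
        FakeQueueMBean fake = new FakeQueueMBean() {
            public long getConsumerCount() { return consumers; }
            public long getQueueSize() { return pending; }
        };
        server.registerMBean(new StandardMBean(fake, FakeQueueMBean.class), name);
    }

    public static void main(String[] args) throws Exception {
        // In-process server used only for the demo; against a real broker you would
        // pass the MBeanServerConnection obtained through JMXConnectorFactory instead.
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = queueName("localhost", "TEST.QUEUE");
        registerFake(server, name, 3, 42);
        try {
            System.out.println("consumers = " + readLong(server, name, "ConsumerCount"));
            System.out.println("pending   = " + readLong(server, name, "QueueSize"));
        } finally {
            server.unregisterMBean(name);
        }
    }
}
```

Since MBeanServer extends MBeanServerConnection, readLong(...) works unchanged with the remote connection returned by getMBeanServerConnection(brokerName).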

Hope that helps

Andreas

On Apr 22, 2009, at 5:10 AM, BigPic wrote:


I have a setup with one fast producer and a variable number of slow
consumers. I would like the producer, on startup, to adjust itself to the
number of consumers and the number of currently pending messages.

I know that I can use Advisory Messages
(http://activemq.apache.org/advisory-message.html) to tell when consumers
start and stop listening, but short of becoming a fake consumer, I haven't
found a way for the producer to tell on startup what the current situation
is. It seems the "Command Agent" might help, but I can't find any
documentation for it.

Does anyone have any recommendations?

Thanks.


--
View this message in context: 
http://www.nabble.com/Programatically-determining-number-of-consumers-and-pending-messages-tp23168916p23168916.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

