Hi there,
you could use the JMX API for doing that.
A couple of convenient JMX methods are:
private MBeanInfo getMBeanInfo(String brokerName, String type,
String pattern) throws Exception {
Set<?> mBeans = getMBeans(brokerName, type, pattern);
assertEquals(1, mBeans.size());
return
getMBeanServerConnection
(brokerName).getMBeanInfo(getObjectName(brokerName, type, pattern));
}
private Set<?> getMBeans(String brokerName, String type) throws
Exception {
return getMBeans(brokerName, type, "*", 0);
}
private Set<?> getMBeans(String brokerName, String type, String
pattern) throws Exception {
return getMBeans(brokerName, type, pattern, 0);
}
private Set<?> getMBeans(String brokerName, String type, String
pattern, int timeout) throws Exception {
final long expiryTime = System.currentTimeMillis() + timeout;
final ObjectName beanName = getObjectName(brokerName, type,
pattern);
Set<?> mbeans = null;
do {
if (timeout > 0) {
Thread.sleep(100);
}
MBeanServerConnection mbsc =
getMBeanServerConnection(brokerName);
if (mbsc != null) {
LOG.info("Query name: " + beanName);
mbeans = mbsc.queryMBeans(beanName, null);
}
} while ((mbeans == null || mbeans.isEmpty()) && expiryTime >
System.currentTimeMillis());
return mbeans;
}
private MBeanServerConnection getMBeanServerConnection(String
brokerName) throws MalformedURLException, Exception {
int jmxPort =
getBrokerService(brokerName).getManagementContext().getConnectorPort();
final JMXServiceURL url = new
JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + jmxPort + "/
jmxrmi");
MBeanServerConnection mbsc = null;
try {
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
mbsc = jmxc.getMBeanServerConnection();
} catch (Exception ignored) {
}
return mbsc;
}
private Object getAttribute(String brokerName, String type,
String pattern, String attrName) throws Exception {
MBeanInfo info = getMBeanInfo(brokerName, type, pattern);
Object obj =
getMBeanServerConnection
(brokerName).getAttribute(getObjectName(brokerName, type, pattern),
attrName);
return obj;
}
private void queryMBean(String brokerName, String type, String
pattern) throws Exception {
ObjectName objName = getObjectName(brokerName, type, pattern);
MBeanInfo info =
getMBeanServerConnection(brokerName).getMBeanInfo(objName);
for(MBeanAttributeInfo attr: info.getAttributes()) {
LOG.info("Found attribute : " + attr.getName());
}
for(MBeanOperationInfo op: info.getOperations()) {
LOG.info("Found operation : " + op.getName());
}
getMBeanServerConnection
(brokerName).getAttribute(getObjectName(brokerName, type, pattern),
"ConsumerCount");
}
private ObjectName getObjectName(String brokerName, String type,
String pattern) throws Exception {
ObjectName beanName = new ObjectName(
"org.apache.activemq:BrokerName=" + brokerName + ",Type=" +
type +"," + pattern
);
return beanName;
}
Probably you would just need code to get the MBeanServer connection
and query the attribute. Perhaps you have to tweak the methods
to fulfill your needs; I have just taken them from a test case.
An example to query the number of consumers:
consumers_A = getAttribute(brokerAName, "Queue", "Destination=" +
QUEUE_NAME, "ConsumerCount");
Hope that helps
Andreas
On Apr 22, 2009, at 5:10 AM, BigPic wrote:
I have a setup with one fast producer and a variable number of slow
consumers. I would like the producer, on startup, to adjust itself
to the
number of consumers and the number of currently pending messages.
I know that I can use the http://activemq.apache.org/advisory-message.html
Advisory Messages to tell when consumers start and stop listening,
but
short of becoming a fake consumer, I haven't found a way for the
producer to
tell on startup what the current situation is. It seems the
"Command Agent"
might help, but I can't find any documentation for it.
Does anyone have any recommendations?
Thanks.
--
View this message in context:
http://www.nabble.com/Programatically-determining-number-of-consumers-and-pending-messages-tp23168916p23168916.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.