Priority support needs to be enabled for a destination with a policy entry attribute, prioritizedMessages; see: http://activemq.apache.org/per-destination-policies.html The new (5.4) attributes are in red.
To see the programmatic setup, check out the source of one of the priority tests: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/MessagePriorityTest.java?view=markup Note, with the kahaDB store, there are three levels of priority supported (low, normal, high). With the JDBCStore, the full complement, 0-9, is supported. On 20 August 2010 10:16, sonicBasher <sonicbas...@gmail.com> wrote: > > Hi, > > According to activeMQ website, the latest, 5.4 version supports message > priority handling: > http://activemq.apache.org/new-features-in-54.html > > What's more, according to this website > https://issues.apache.org/activemq/browse/AMQ-2790 > handling priorities takes place by calling the > setMessagePrioritySupported(true) on an ActiveMQConnectionFactory object. > Besides, it's the default setting anyways. > > Now, I tested this using the latest 5.4 version, but the messages don't seem > to get ordered by the priority. > Here's how I do it (it's really straightforward): > > > -------------------------------------------------------- > PRODUCER CLASS: > > import java.util.LinkedList; > import java.util.List; > import javax.jms.Connection; > import javax.jms.JMSException; > import javax.jms.MapMessage; > import javax.jms.Queue; > import javax.jms.Session; > import org.apache.activemq.ActiveMQConnectionFactory; > import javax.jms.DeliveryMode; > > public final class Main { > > public static void main(String[] args) throws JMSException { > String brokerURL = "tcp://localhost:61616"; > String pendingQueueName = "pendingQ"; > String configID = "12350"; > int transactionID = 2; > final int repetitions = 10; > > ActiveMQConnectionFactory cf = new > ActiveMQConnectionFactory(brokerURL); > Connection connection = cf.createConnection(); > connection.start(); > Session session = connection.createSession(true, > Session.SESSION_TRANSACTED); > Queue queue = session.createQueue(pendingQueueName); > javax.jms.MessageProducer producer 
= session.createProducer(queue); > > for (int i = 0; i < repetitions; i++) { > MapMessage mapMessage = session.createMapMessage(); > mapMessage.setStringProperty("configID", configID); > mapMessage.setIntProperty("transactionID", transactionID); > List<String> ls2 = new LinkedList<String>(); > ls2.add("abc" + i); > mapMessage.setObject("customerBeanList", ls2); > producer.send(mapMessage, DeliveryMode.PERSISTENT, i, 0); > } > session.commit(); > connection.stop(); > connection.close(); > } > } > > > -------------------------------------------------------- > CONSUMER CLASS: > > import java.util.LinkedList; > import java.util.List; > > > public class Main { > > public static void main(String[] args) { > AccountMessageListener aml = null; > String queueName = "pendingQ"; > String brokerURL = "tcp://localhost:61616"; > String configID = "12350"; > int transactionID = 2; > try { > List<String> completeListOfReceivedStrings = new > LinkedList<String>(); > List<String> tempReceivedStrings = null; > aml = new AccountMessageListener(brokerURL, queueName); > while ((tempReceivedStrings = aml.receiveMessage(configID, > transactionID, 0)) != null) { > completeListOfReceivedStrings.addAll(tempReceivedStrings); > } > aml.closeConnection(); > String[] arrayOfStrings = null; > if (completeListOfReceivedStrings != null) { > arrayOfStrings = new > String[completeListOfReceivedStrings.size()]; > for (int i = 0; i < completeListOfReceivedStrings.size(); > i++) { > arrayOfStrings[i] = > completeListOfReceivedStrings.get(i); > } > } > for (int i = 0; i < arrayOfStrings.length; i++) { > System.out.println(arrayOfStrings[i]); > } > } catch (Exception e) { > e.printStackTrace(); > aml.closeConnection(); > } > } > } > > > -------------------------------------------------------- > CONSUMER HELPER CLASS: > > import java.util.List; > import javax.jms.Connection; > import javax.jms.Destination; > import javax.jms.JMSException; > import javax.jms.MapMessage; > import javax.jms.MessageConsumer; > 
import javax.jms.Session; > import org.apache.activemq.ActiveMQConnectionFactory; > > public class AccountMessageListener { > > private ActiveMQConnectionFactory connectionFactory = null; > private Connection connection = null; > private Session session = null; > private Destination destination = null; > private String brokerURL = null; > private String queueName = null; > private MessageConsumer confirmConsumer = null; > > /** > * Constructor. > * @param brokerURL broker url > * @param queueName queue name > */ > public AccountMessageListener(String brokerURL, String queueName) { > try { > this.brokerURL = brokerURL; > this.queueName = queueName; > createConnection(); > session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > destination = session.createQueue(queueName); > } catch (JMSException e) { > e.printStackTrace(); > } catch (Exception e) { > e.printStackTrace(); > } > } > > /** > * Connects to message queue. > */ > private void createConnection() { > try { > connectionFactory = new ActiveMQConnectionFactory(brokerURL); > connectionFactory.setMessagePrioritySupported(true); > connection = connectionFactory.createConnection(); > connection.start(); > } catch (JMSException e) { > closeConnection(); > e.printStackTrace(); > } catch (Exception e) { > e.printStackTrace(); > } > } > > /** > * Receives message from queue. 
> * @param configID config id > * @param transactionID transaction id > * @return message > */ > public List<String> receiveMessage(String configID, int transactionID) { > try { > String messageSelector = "configID = '" + configID + "' AND > transactionID = " + transactionID; > confirmConsumer = session.createConsumer(destination, > messageSelector); > MapMessage mapMessage = null; > List<String> ls = null; > mapMessage = (MapMessage) confirmConsumer.receive(250); > if (mapMessage != null) { > if (mapMessage.getObject("customerBeanList") != null && > mapMessage.getObject("customerBeanList") instanceof List) { > ls = (List<String>) > mapMessage.getObject("customerBeanList"); > } > } > confirmConsumer.close(); > return ls; > } catch (JMSException ex) { > ex.printStackTrace(); > closeConnection(); > return null; > } catch (Exception e) { > e.printStackTrace(); > closeConnection(); > return null; > } > } > > /** > * Closes connection. > */ > public void closeConnection() { > try { > session.close(); > connection.close(); > > } catch (Exception e) { > e.printStackTrace(); > } > } > > /** > * Queue name getter. > * @return queue name > */ > public String getQueueName() { > return queueName; > } > } > > ---------------------------------------------------------------- > > Obviously I use ActiveMQ 5.4 on localhost as a message broker. I'll be > grateful for any hints. > Regards > > > > -- > View this message in context: > http://old.nabble.com/Method-setMessagePrioritySupported%28true%29-called-on-ActiveMQConnectionFactory-doesn%27t-work-tp29490144p29490144.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > > -- http://blog.garytully.com Open Source Integration http://fusesource.com