Hi Clark, I solved my problem using transactions. Here's how I did it, in case it helps someone who runs into a similar problem (you should probably create the ActiveMQConnectionFactory, Connection and Session elsewhere and just pass in a reference, but I include them here to make things clearer):

import java.util.List;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.Body;
import org.apache.camel.Handler;

import com.thoughtworks.xstream.XStream;

// Enclosing class named as in my original post further down; MyObject is my POJO.
public class MessageProducer {

    @Handler
    @SuppressWarnings("unchecked")
    public void produce(@Body List listOfMyObj) throws JMSException {
        if (listOfMyObj == null || listOfMyObj.isEmpty()) {
            return; // nothing to send, and get(0) below would fail
        }
        List<MyObject> listOfMyObjects = (List<MyObject>) listOfMyObj;
        XStream x = new XStream();

        // No-arg factory uses the default broker URL.
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        Connection connection = cf.createConnection();
        try {
            connection.start();
            // Transacted session: sent messages are not delivered until commit().
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            // The whole list goes to the queue named after the first object's account.
            String id = listOfMyObjects.get(0).getAccountName();
            Queue queue = session.createQueue(id);
            javax.jms.MessageProducer producer = session.createProducer(queue);
            for (MyObject eo : listOfMyObjects) {
                Message message = session.createTextMessage(x.toXML(eo));
                producer.send(message);
            }
            // Commit once, after the last message, so the batch goes out together.
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers;
            // a transaction that was never committed is rolled back.
            connection.close();
        }
    }
}

regards


cobrien wrote:
>
> Hi,
> It appears like you are sending messages in async mode (see link:
> http://activemq.apache.org/async-sends.html). Order cannot be guaranteed,
> even for a single producer, using async mode. So the answer to your
> question is yes, the sync/async mode can change ordering, particularly when
> sent over slow or unreliable networks.
>
> You also may want to look into using transactions to batch message sends.
> The link below has an example Camel configuration (under connection
> pooling).
>
> http://camel.apache.org/activemq.html
>
>
> Clark
>
> www.ttmsolutions.com
> ActiveMQ reference guide at
> http://bit.ly/AMQRefGuide
>
>
> sonicBasher wrote:
>>
>> Hi Clark,
>>
>> I'm using Client-Server topology, meaning I just access ActiveMQ via tcp.
>> I do it like this:
>>
>> final String brokerURL = "tcp://localhost:61616";
>> ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
>> context.addComponent("jms",
>>     JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
>>
>> where context is obviously my DefaultCamelContext object.
>>
>> As for the config files, I haven't changed anything. My activemq.xml
>> looks like this:
>>
>> <!--
>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>     contributor license agreements. See the NOTICE file distributed with
>>     this work for additional information regarding copyright ownership.
>>     The ASF licenses this file to You under the Apache License, Version 2.0
>>     (the "License"); you may not use this file except in compliance with
>>     the License. You may obtain a copy of the License at
>>
>>     http://www.apache.org/licenses/LICENSE-2.0
>>
>>     Unless required by applicable law or agreed to in writing, software
>>     distributed under the License is distributed on an "AS IS" BASIS,
>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>     See the License for the specific language governing permissions and
>>     limitations under the License.
>> -->
>> <beans
>>   xmlns="http://www.springframework.org/schema/beans"
>>   xmlns:amq="http://activemq.apache.org/schema/core"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>>     http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>>     http://activemq.apache.org/schema/core
>>     http://activemq.apache.org/schema/core/activemq-core.xsd">
>>
>>     <!-- Allows us to use system properties as variables in this
>>          configuration file -->
>>     <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>>         <property name="locations">
>>             <value>file:${activemq.base}/conf/credentials.properties</value>
>>         </property>
>>     </bean>
>>
>>     <!--
>>         The <broker> element is used to configure the ActiveMQ broker.
>>     -->
>>     <broker xmlns="http://activemq.apache.org/schema/core"
>>             brokerName="localhost" dataDirectory="${activemq.base}/data"
>>             destroyApplicationContextOnStop="true">
>>
>>         <!--
>>             For better performances use VM cursor and small memory limit.
>>             For more information, see:
>>
>>             http://activemq.apache.org/message-cursors.html
>>
>>             Also, if your producer is "hanging", it's probably due to
>>             producer flow control.
>>             For more information, see:
>>             http://activemq.apache.org/producer-flow-control.html
>>         -->
>>
>>         <destinationPolicy>
>>             <policyMap>
>>                 <policyEntries>
>>                     <policyEntry topic=">" producerFlowControl="true"
>>                                  memoryLimit="1mb">
>>                         <pendingSubscriberPolicy>
>>                             <vmCursor />
>>                         </pendingSubscriberPolicy>
>>                     </policyEntry>
>>                     <policyEntry queue=">" producerFlowControl="true"
>>                                  memoryLimit="1mb">
>>                         <!-- Use VM cursor for better latency
>>                              For more information, see:
>>
>>                              http://activemq.apache.org/message-cursors.html
>>
>>                         <pendingQueuePolicy>
>>                             <vmQueueCursor/>
>>                         </pendingQueuePolicy>
>>                         -->
>>                     </policyEntry>
>>                 </policyEntries>
>>             </policyMap>
>>         </destinationPolicy>
>>
>>         <!--
>>             The managementContext is used to configure how ActiveMQ is
>>             exposed in JMX. By default, ActiveMQ uses the MBean server
>>             that is started by the JVM. For more information, see:
>>
>>             http://activemq.apache.org/jmx.html
>>         -->
>>         <managementContext>
>>             <managementContext createConnector="false"/>
>>         </managementContext>
>>
>>         <!--
>>             Configure message persistence for the broker. The default
>>             persistence mechanism is the KahaDB store (identified by the
>>             kahaDB tag). For more information, see:
>>
>>             http://activemq.apache.org/persistence.html
>>         -->
>>         <persistenceAdapter>
>>             <kahaDB directory="${activemq.base}/data/kahadb"/>
>>         </persistenceAdapter>
>>
>>         <!--
>>             The systemUsage controls the maximum amount of space the
>>             broker will use before slowing down producers. For more
>>             information, see:
>>
>>             http://activemq.apache.org/producer-flow-control.html
>>
>>         <systemUsage>
>>             <systemUsage>
>>                 <memoryUsage>
>>                     <memoryUsage limit="20 mb"/>
>>                 </memoryUsage>
>>                 <storeUsage>
>>                     <storeUsage limit="1 gb" name="foo"/>
>>                 </storeUsage>
>>                 <tempUsage>
>>                     <tempUsage limit="100 mb"/>
>>                 </tempUsage>
>>             </systemUsage>
>>         </systemUsage>
>>         -->
>>
>>         <!--
>>             The transport connectors expose ActiveMQ over a given
>>             protocol to clients and other brokers.
>>             For more information, see:
>>
>>             http://activemq.apache.org/configuring-transports.html
>>         -->
>>         <transportConnectors>
>>             <transportConnector name="openwire"
>>                                 uri="tcp://0.0.0.0:61616"/>
>>         </transportConnectors>
>>
>>     </broker>
>>
>>     <!--
>>         Uncomment to enable Camel
>>         Take a look at activemq-camel.xml for more details
>>
>>     <import resource="camel.xml"/>
>>     -->
>>
>>     <!--
>>         Enable web consoles, REST and Ajax APIs and demos
>>         Take a look at activemq-jetty.xml for more details
>>     -->
>>     <import resource="jetty.xml"/>
>>
>> </beans>
>>
>>
>> I tried to enable camel in activemq.xml by uncommenting the <import
>> resource="camel.xml"/> line, but it doesn't seem to help either - still
>> getting mixed up messages.
>> My camel.xml looks like that:
>>
>> <!--
>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>     contributor license agreements. See the NOTICE file distributed with
>>     this work for additional information regarding copyright ownership.
>>     The ASF licenses this file to You under the Apache License, Version 2.0
>>     (the "License"); you may not use this file except in compliance with
>>     the License. You may obtain a copy of the License at
>>
>>     http://www.apache.org/licenses/LICENSE-2.0
>>
>>     Unless required by applicable law or agreed to in writing, software
>>     distributed under the License is distributed on an "AS IS" BASIS,
>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>     See the License for the specific language governing permissions and
>>     limitations under the License.
>> -->
>> <!--
>>
>>     Lets deploy some Enterprise Integration Patterns inside the ActiveMQ
>>     Message Broker
>>     For more information, see:
>>
>>     http://camel.apache.org
>>
>>     Include this file in your configuration to enable Camel
>>
>>     e.g.
>>     <import resource="camel.xml"/>
>> -->
>> <beans
>>   xmlns="http://www.springframework.org/schema/beans"
>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>   xsi:schemaLocation="http://camel.apache.org/schema/spring
>>     http://camel.apache.org/schema/spring/camel-spring.xsd
>>     http://www.springframework.org/schema/beans
>>     http://www.springframework.org/schema/beans/spring-beans.xsd">
>>
>>     <camelContext id="camel"
>>                   xmlns="http://camel.apache.org/schema/spring">
>>
>>         <!-- You can use a <packages> element for each root package to
>>              search for Java routes -->
>>         <packageScan>
>>             <package>org.foo.bar</package>
>>         </packageScan>
>>
>>         <!-- You can use Spring XML syntax to define the routes here
>>              using the <route> element -->
>>         <route>
>>             <from uri="activemq:example.A"/>
>>             <to uri="activemq:example.B"/>
>>         </route>
>>     </camelContext>
>>
>>     <!--
>>         Lets configure some Camel endpoints
>>
>>         http://camel.apache.org/components.html
>>     -->
>>
>>     <!-- configure the camel activemq component to use the current broker -->
>>     <bean id="activemq"
>>           class="org.apache.activemq.camel.component.ActiveMQComponent" >
>>         <property name="userName" value="${activemq.username}"/>
>>         <property name="password" value="${activemq.password}"/>
>>     </bean>
>> </beans>
>>
>>
>> I'm not sure if any other config files might have impact on what is going
>> on in my case. If yes, please do let me know.
>>
>> regards
>>
>>
>> cobrien wrote:
>>>
>>> Hi,
>>> Can you describe a little more your Queue topology and configuration-
>>> perhaps even share your config file(s). Not all queue topologies are
>>> order preserving!
>>> -clark
>>>
>>> Clark
>>>
>>> www.ttmsolutions.com
>>> ActiveMQ reference guide at
>>> http://bit.ly/AMQRefGuide
>>>
>>>
>>> sonicBasher wrote:
>>>>
>>>> At the end of my camel route I have a following class which I use to
>>>> put messages in ActiveMQ:
>>>>
>>>> public class MessageProducer {
>>>>
>>>>     private ProducerTemplate template = null;
>>>>
>>>>     public MessageProducer(ProducerTemplate template) {
>>>>         this.template = template;
>>>>     }
>>>>
>>>>     public void produce(@Body List listOfEttexObj) {
>>>>         System.out.println("Entered produce() method");
>>>>         XStream x = new XStream();
>>>>         List<EttexObject> listOfEttexObjects =
>>>>             (List<EttexObject>) listOfEttexObj;
>>>>         for (int q = 0; q < listOfEttexObjects.size(); q++) {
>>>>             EttexObject eo = listOfEttexObjects.get(q);
>>>>             template.sendBody("jms:" + eo.getAccountName() + "_2",
>>>>                 x.toXML(eo));
>>>>             System.out.println("Sent type: " + eo.getMessage().getType());
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> *XStream is just a library for converting objects to XML, has nothing
>>>> to do with anything
>>>> **EttexObject is my POJO class
>>>>
>>>> Now, I noticed that whenever I send objects (from the list passed as an
>>>> argument) to the queues, which I implicitly create using
>>>> ProducerTemplate, most of the time the order of delivered messages is
>>>> correct, but there's always one queue, which gets the mixed up
>>>> messages. I can't really figure out why. I tried to use the requestBody
>>>> method, instead of sendBody, to no avail. I also double checked the
>>>> lists that come in - the order of objects inside them is fine. Honestly
>>>> I'm kind of stuck here. Will be grateful for any tips.
>>>>
>>>> regards
>>>>
>>>> p.s. Can synchronous/asynchronous ActiveMQ mode have anything to do
>>>> with my problem?

--
View this message in context: http://old.nabble.com/ActiveMQ-mixes-up-order-of-messages-while-receiving-them-from-Camel-tp29008581p29044634.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
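
P.S. For anyone who wants to see the idea behind the transactional fix without setting up a broker, here is a minimal in-memory sketch. It is only an illustration: TransactedBatchSketch and its methods are hypothetical names, not JMS or ActiveMQ classes. It assumes the one property the fix relies on, namely that messages sent in a transacted session become visible to consumers only at commit, and then all together in the order they were sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * In-memory stand-in for a transacted session (hypothetical, not a JMS class).
 * Sends are buffered and reach the "queue" only when commit() is called.
 */
class TransactedBatchSketch {

    private final List<String> queue = new ArrayList<>();   // what a consumer could see
    private final List<String> pending = new ArrayList<>(); // sent but not yet committed

    /** Buffers a message; it is not visible to consumers yet. */
    void send(String message) {
        pending.add(message);
    }

    /** Publishes all buffered messages at once, in the order they were sent. */
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    /** Discards everything sent since the last commit. */
    void rollback() {
        pending.clear();
    }

    /** Returns a snapshot of the messages visible to consumers. */
    List<String> visible() {
        return new ArrayList<>(queue);
    }

    /** Sends a whole batch and commits once, after the last message. */
    static List<String> sendBatch(List<String> batch) {
        TransactedBatchSketch session = new TransactedBatchSketch();
        for (String message : batch) {
            session.send(message);
        }
        session.commit();
        return session.visible();
    }

    public static void main(String[] args) {
        // prints [first, second, third]
        System.out.println(sendBatch(Arrays.asList("first", "second", "third")));
    }
}
```

The point of committing once after the loop, rather than per message, is that a consumer never sees a partially sent list, and a failure halfway through leaves nothing behind on the queue.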