Great.
If the list can be large, you may want to optimize by making the commit
independent of the size of the list.
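
A minimal sketch of that batching idea, not taken from the code quoted below: commit after every fixed number of sends, so the amount of uncommitted work no longer grows with the list. The class name BatchedCommit, the method sendInBatches, and the batchSize parameter are made up for illustration. The send and commit callbacks stand in for producer.send(...) and session.commit(), which throw the checked JMSException and would need wrapping in real code.

```java
import java.util.List;
import java.util.function.Consumer;

// Illustrative helper (hypothetical name): commits every batchSize sends.
final class BatchedCommit {

    /**
     * Sends each item in order and commits after every batchSize sends,
     * plus one final commit for any leftover items.
     *
     * @param send   stand-in for producer.send(...) on a transacted session
     * @param commit stand-in for session.commit()
     */
    static <T> void sendInBatches(List<T> items, int batchSize,
                                  Consumer<T> send, Runnable commit) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int pending = 0; // sends since the last commit
        for (T item : items) {
            send.accept(item);
            pending++;
            if (pending == batchSize) {
                commit.run();
                pending = 0;
            }
        }
        if (pending > 0) {
            commit.run(); // flush the last, partially filled batch
        }
    }
}
```

The trade-off is that the list is no longer delivered atomically: if a failure occurs midway, the batches already committed stay on the queue.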

Clark 

www.ttmsolutions.com 
ActiveMQ reference guide at 
http://bit.ly/AMQRefGuide


P.S. Thanks for sharing your code with the community.


sonicBasher wrote:
> 
> Hi Clark,
> 
> I solved my problem using transactions. Here's how I did it; maybe it'll
> help someone who stumbles upon a similar problem (you should probably create
> the ActiveMQConnectionFactory and Session elsewhere and just pass a
> reference, but I include them here to make things clearer):
> 
> import java.util.List;
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.Queue;
> import javax.jms.Session;
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.Body;
> import org.apache.camel.Handler;
> import com.thoughtworks.xstream.XStream;
> 
> @Handler
>     public void produce(@Body List listOfMyObj) throws JMSException {
>         List<MyObject> listOfMyObjects = (List<MyObject>) listOfMyObj;
>         if (listOfMyObjects.isEmpty()) {
>             return; // nothing to send; get(0) below would throw
>         }
>         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
>         Connection connection = cf.createConnection();
>         try {
>             connection.start();
>             XStream x = new XStream();
>             // Sends on a transacted session are delivered only on commit.
>             Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
>             // The queue is named after the first object's account.
>             String id = listOfMyObjects.get(0).getAccountName();
>             Queue queue = session.createQueue(id);
>             javax.jms.MessageProducer producer = session.createProducer(queue);
>             for (MyObject eo : listOfMyObjects) {
>                 Message message = session.createTextMessage(x.toXML(eo));
>                 producer.send(message);
>             }
>             session.commit(); // commit once, after the whole list is sent
>         } finally {
>             connection.close(); // also closes the session and producer
>         }
>     }
> 
> 
> 
> regards
> 
> 
> 
> 
> 
> cobrien wrote:
>> 
>> Hi,
>> It appears that you are sending messages in async mode (see
>> http://activemq.apache.org/async-sends.html). Order cannot be guaranteed,
>> even for a single producer, in async mode. So the answer to your
>> question is yes: the sync/async mode can change ordering, particularly
>> when messages are sent over slow or unreliable networks.
>> 
>> You may also want to look into using transactions to batch message
>> sends.
>> The link below has an example Camel configuration (under "connection
>> pooling"):
>> 
>> http://camel.apache.org/activemq.html
>> 
>> 
>> Clark 
>> 
>> www.ttmsolutions.com 
>> ActiveMQ reference guide at 
>> http://bit.ly/AMQRefGuide
>> 
>> 
>> sonicBasher wrote:
>>> 
>>> Hi Clark,
>>> 
>>> I'm using Client-Server topology, meaning I just access ActiveMQ via
>>> tcp. I do it like this:
>>> 
>>> final String brokerURL = "tcp://localhost:61616";
>>> ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
>>> context.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
>>> 
>>> where context is obviously my DefaultCamelContext object.
>>> 
>>> As for the config files, I haven't changed anything. My activemq.xml
>>> looks like this:
>>> 
>>> 
>>> 
>>> <!--
>>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>>     contributor license agreements.  See the NOTICE file distributed
>>> with
>>>     this work for additional information regarding copyright ownership.
>>>     The ASF licenses this file to You under the Apache License, Version
>>> 2.0
>>>     (the "License"); you may not use this file except in compliance with
>>>     the License.  You may obtain a copy of the License at
>>>    
>>>     http://www.apache.org/licenses/LICENSE-2.0
>>>    
>>>     Unless required by applicable law or agreed to in writing, software
>>>     distributed under the License is distributed on an "AS IS" BASIS,
>>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>>     See the License for the specific language governing permissions and
>>>     limitations under the License.
>>> -->
>>> <beans
>>>   xmlns="http://www.springframework.org/schema/beans"
>>>   xmlns:amq="http://activemq.apache.org/schema/core"
>>>   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>   xsi:schemaLocation="http://www.springframework.org/schema/beans
>>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>>>   http://activemq.apache.org/schema/core
>>> http://activemq.apache.org/schema/core/activemq-core.xsd">
>>> 
>>>     <!-- Allows us to use system properties as variables in this
>>> configuration file -->
>>>     <bean
>>> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>>>         <property name="locations">
>>>            
>>> <value>file:${activemq.base}/conf/credentials.properties</value>
>>>         </property>      
>>>     </bean>
>>> 
>>>     <!-- 
>>>         The <broker> element is used to configure the ActiveMQ broker. 
>>>     -->
>>>     <broker xmlns="http://activemq.apache.org/schema/core"
>>> brokerName="localhost" dataDirectory="${activemq.base}/data"
>>> destroyApplicationContextOnStop="true">
>>>  
>>>         <!--
>>>                     For better performances use VM cursor and small memory 
>>> limit.
>>>                     For more information, see:
>>>             
>>>             http://activemq.apache.org/message-cursors.html
>>>             
>>>             Also, if your producer is "hanging", it's probably due to
>>> producer flow control.
>>>             For more information, see:
>>>             http://activemq.apache.org/producer-flow-control.html
>>>         -->
>>>               
>>>         <destinationPolicy>
>>>             <policyMap>
>>>               <policyEntries>
>>>                 <policyEntry topic=">" producerFlowControl="true"
>>> memoryLimit="1mb">
>>>                   <pendingSubscriberPolicy>
>>>                     <vmCursor />
>>>                   </pendingSubscriberPolicy>
>>>                 </policyEntry>
>>>                 <policyEntry queue=">" producerFlowControl="true"
>>> memoryLimit="1mb">
>>>                   <!-- Use VM cursor for better latency
>>>                        For more information, see:
>>>                        
>>>                        http://activemq.apache.org/message-cursors.html
>>>                        
>>>                   <pendingQueuePolicy>
>>>                     <vmQueueCursor/>
>>>                   </pendingQueuePolicy>
>>>                   -->
>>>                 </policyEntry>
>>>               </policyEntries>
>>>             </policyMap>
>>>         </destinationPolicy> 
>>>  
>>>         
>>>         <!-- 
>>>             The managementContext is used to configure how ActiveMQ is
>>> exposed in 
>>>             JMX. By default, ActiveMQ uses the MBean server that is
>>> started by 
>>>             the JVM. For more information, see: 
>>>             
>>>             http://activemq.apache.org/jmx.html 
>>>         -->
>>>         <managementContext>
>>>             <managementContext createConnector="false"/>
>>>         </managementContext>
>>> 
>>>         <!-- 
>>>             Configure message persistence for the broker. The default
>>> persistence
>>>             mechanism is the KahaDB store (identified by the kahaDB
>>> tag). 
>>>             For more information, see: 
>>>             
>>>             http://activemq.apache.org/persistence.html 
>>>         -->
>>>         <persistenceAdapter>
>>>             <kahaDB directory="${activemq.base}/data/kahadb"/>
>>>         </persistenceAdapter>
>>>         
>>>         
>>>           <!--
>>>             The systemUsage controls the maximum amount of space the
>>> broker will 
>>>             use before slowing down producers. For more information,
>>> see:
>>>             
>>>             http://activemq.apache.org/producer-flow-control.html
>>>              
>>>         <systemUsage>
>>>             <systemUsage>
>>>                 <memoryUsage>
>>>                     <memoryUsage limit="20 mb"/>
>>>                 </memoryUsage>
>>>                 <storeUsage>
>>>                     <storeUsage limit="1 gb" name="foo"/>
>>>                 </storeUsage>
>>>                 <tempUsage>
>>>                     <tempUsage limit="100 mb"/>
>>>                 </tempUsage>
>>>             </systemUsage>
>>>         </systemUsage>
>>>             -->
>>>               
>>>         <!-- 
>>>             The transport connectors expose ActiveMQ over a given
>>> protocol to
>>>             clients and other brokers. For more information, see: 
>>>             
>>>             http://activemq.apache.org/configuring-transports.html 
>>>         -->
>>>         <transportConnectors>
>>>             <transportConnector name="openwire"
>>> uri="tcp://0.0.0.0:61616"/>
>>>         </transportConnectors>
>>> 
>>>     </broker>
>>> 
>>>     <!-- 
>>>         Uncomment to enable Camel
>>>         Take a look at activemq-camel.xml for more details
>>>          
>>>     <import resource="camel.xml"/>
>>>     -->
>>> 
>>>     <!-- 
>>>         Enable web consoles, REST and Ajax APIs and demos
>>>         Take a look at activemq-jetty.xml for more details 
>>>     -->
>>>     <import resource="jetty.xml"/>
>>>     
>>> </beans>
>>> 
>>> 
>>> 
>>> I tried to enable Camel in activemq.xml by uncommenting the <import
>>> resource="camel.xml"/> line, but it doesn't seem to help either; I'm still
>>> getting mixed-up messages.
>>> My camel.xml looks like this:
>>> 
>>> 
>>> 
>>> <!--
>>>     Licensed to the Apache Software Foundation (ASF) under one or more
>>>     contributor license agreements.  See the NOTICE file distributed
>>> with
>>>     this work for additional information regarding copyright ownership.
>>>     The ASF licenses this file to You under the Apache License, Version
>>> 2.0
>>>     (the "License"); you may not use this file except in compliance with
>>>     the License.  You may obtain a copy of the License at
>>>    
>>>     http://www.apache.org/licenses/LICENSE-2.0
>>>    
>>>     Unless required by applicable law or agreed to in writing, software
>>>     distributed under the License is distributed on an "AS IS" BASIS,
>>>     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
>>> implied.
>>>     See the License for the specific language governing permissions and
>>>     limitations under the License.
>>> -->
>>> <!-- 
>>> 
>>>     Lets deploy some Enterprise Integration Patterns inside the ActiveMQ
>>> Message Broker
>>>     For more information, see:
>>>     
>>>     http://camel.apache.org
>>>     
>>>     Include this file in your configuration to enable Camel
>>>     
>>>     e.g. <import resource="camel.xml"/>
>>> -->
>>> <beans
>>>    xmlns="http://www.springframework.org/schema/beans"
>>>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>    xsi:schemaLocation="http://camel.apache.org/schema/spring
>>> http://camel.apache.org/schema/spring/camel-spring.xsd
>>>    http://www.springframework.org/schema/beans
>>> http://www.springframework.org/schema/beans/spring-beans.xsd">
>>>   
>>>     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
>>> 
>>>         <!-- You can use a <packages> element for each root package to
>>> search for Java routes -->
>>>         <packageScan>
>>>            <package>org.foo.bar</package>
>>>         </packageScan>
>>> 
>>>         <!-- You can use Spring XML syntax to define the routes here
>>> using the <route> element -->
>>>         <route>
>>>             <from uri="activemq:example.A"/>
>>>             <to uri="activemq:example.B"/>
>>>         </route>
>>>     </camelContext>
>>> 
>>>     <!--
>>>        Lets configure some Camel endpoints
>>>     
>>>        http://camel.apache.org/components.html
>>>     -->
>>> 
>>>     <!-- configure the camel activemq component to use the current
>>> broker -->
>>>     <bean id="activemq"
>>> class="org.apache.activemq.camel.component.ActiveMQComponent" >
>>>        <property name="userName" value="${activemq.username}"/>
>>>        <property name="password" value="${activemq.password}"/>
>>>     </bean>
>>> </beans>
>>> 
>>> 
>>> 
>>> I'm not sure if any other config files might have an impact on what is
>>> going on in my case. If so, please do let me know.
>>> 
>>> regards
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> cobrien wrote:
>>>> 
>>>> Hi,
>>>> Can you describe your queue topology and configuration in a little more
>>>> detail, and perhaps even share your config file(s)? Not all queue
>>>> topologies are order preserving!
>>>> -clark
>>>> 
>>>> Clark 
>>>> 
>>>> www.ttmsolutions.com 
>>>> ActiveMQ reference guide at 
>>>> http://bit.ly/AMQRefGuide
>>>> 
>>>> 
>>>> sonicBasher wrote:
>>>>> 
>>>>> At the end of my Camel route I have the following class, which I use to
>>>>> put messages in ActiveMQ:
>>>>> 
>>>>> public class MessageProducer { 
>>>>> 
>>>>>     private ProducerTemplate template = null; 
>>>>> 
>>>>>     public MessageProducer(ProducerTemplate template) { 
>>>>>         this.template = template; 
>>>>>     } 
>>>>> 
>>>>>     public void produce(@Body List listOfEttexObj) { 
>>>>>         System.out.println("Entered produce() method"); 
>>>>>         XStream x = new XStream(); 
>>>>>         List<EttexObject> listOfEttexObjects = (List<EttexObject>) listOfEttexObj;
>>>>>         for (int q = 0; q < listOfEttexObjects.size(); q++) {
>>>>>             EttexObject eo = listOfEttexObjects.get(q);
>>>>>             template.sendBody("jms:" + eo.getAccountName() + "_2", x.toXML(eo));
>>>>>             System.out.println("Sent type: " + eo.getMessage().getType());
>>>>>         } 
>>>>>     } 
>>>>> } 
>>>>> 
>>>>> 
>>>>> *XStream is just a library for converting objects to XML, has nothing
>>>>> to do with anything
>>>>> **EttexObject is my POJO class 
>>>>> 
>>>>> Now, I noticed that whenever I send objects (from the list passed as
>>>>> an argument) to the queues, which I implicitly create using
>>>>> ProducerTemplate, most of the time the order of delivered messages is
>>>>> correct, but there's always one queue, which gets the mixed up
>>>>> messages. I can't really figure out why. I tried to use the
>>>>> requestBody method, instead of sendBody, to no avail. I also double
>>>>> checked the lists that come in - the order of objects inside them is
>>>>> fine. Honestly I'm kind of stuck here. Will be grateful for any tips. 
>>>>> 
>>>>> regards 
>>>>> 
>>>>> p.s. Can synchronous/asynchronous ActiveMQ mode have anything to do
>>>>> with my problem? 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 

-- 
View this message in context: 
http://old.nabble.com/ActiveMQ-mixes-up-order-of-messages-while-receiving-them-from-Camel-tp29008581p29047332.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
