I have debugged into ActiveMQMessageConsumer.acknowledge void acknowledge(MessageDispatch md, byte ackType) throws JMSException { MessageAck ack = new MessageAck(md, ackType, 1); session.sendAck(ack); synchronized(deliveredMessages){ deliveredMessages.remove(md); } }
1. it seems remove message from deliveredMessages is synchronized 2. session.sendAck(ack); sendAck(ack,false); asyncSendPacket(ack); connection.asyncSendPacket(command); connection is thread safe, so message.acknowledge is thread-safe? On Wed, Mar 5, 2014 at 6:09 PM, Noel OConnor <noel.ocon...@gmail.com> wrote: > Be careful according to the JMS 1.02 spec JMS sessions aren't thread safe > so I don't recommend taking that approach. > I haven't used the individual acknowledge so you'll have to check that > yourself. > > > On Wed, Mar 5, 2014 at 8:46 PM, Li Li <fancye...@gmail.com> wrote: > >> If I have one session and a receiver thread. this thread receive ten >> messages to ten worker thread. >> each worker thread acknowledge it's message? >> receiver thread: >> while(true){ >> Message msg=receiver.receive(); >> put msg to a ConcurrentLinkedList; >> >> Worker thread: >> while(true){ >> Message msg=getMessgeFromLinkedList(); >> process msg; >> msg.acknowledge(); >> } >> >> >> On Wed, Mar 5, 2014 at 5:08 PM, Noel OConnor <noel.ocon...@gmail.com> >> wrote: >> > have you seen ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE >> > >> > see >> > >> http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQSession.html#INDIVIDUAL_ACKNOWLEDGE >> > >> > >> > On Wed, Mar 5, 2014 at 6:48 PM, Li Li <fancye...@gmail.com> wrote: >> > >> >> hi all, >> >> I want to process a batch of message using my own priority >> >> algorithm. But in JMS, I can't acknowledge a single message and can >> >> only acknowledge a session. So I decide to do it like this: >> >> 1. Create N(=10000) sessions >> >> 2. using a thread to manage session acknowledge like: >> >> for(int i=0;i<N;i++){ >> >> if session acknowledged{ >> >> receive A Message without waiting; >> >> put this message to my own priority queue; >> >> } >> >> } >> >> 3. using another thread to process my own queue; >> >> get a message from my own queue; >> >> process this message; >> >> get the session of this message; >> >> acknowledge this session; >> >> >> >> because a session will receive only a message a time and it >> >> will be blocked until this message is processed. I need create many >> >> sessions(maybe I can make session a pool) . I don't know whether >> >> activemq can deal with so much session. for a single consumer, I will >> >> create 10000 session. if I have ten consumers, then 100,000 session be >> >> created at the same time. >> >> >>