Yah, it's probably best to figure out how things are being sent right now.

If messages are sent async and without a producer window then you'll hit
the type of flow control that you did.

Messages are sent async by default if using non-persistent messages or
during a transaction. The only time any synchronous communication will be
done during a transaction is when you commit (or rollback).

So you can set ActiveMQConnection#setAlwaysSyncSend which will always send
sync for all types of sends (non-persistent and during a transaction). This
will enable the type of flow control on the broker side that blocks only
the one producer. The drawbacks are it has more network overhead and is
likely slower. How much slower is something you may wish to measure.
Depending on your application requirements, it might not be a big deal.

You can also set the producer window on the client. The producer window
establishes a client-side contract that basically says "I will only sned N
number of bytes to you until I hear back that the message has been
received" You can set this value (in number of bytes) on
ActiveMQConnection#setProducerWindowSize. This will also achieve blocking
only the one producer instead of the entire connection.

Lastly, note that persistent messages are always sent synchronous so by
default would block only the producer and not the entire connection.


On Wed, Jan 2, 2013 at 1:40 PM, Mohit Anchlia <mohitanch...@gmail.com>wrote:

> Thanks! Sounds like there might be another way we can configure the flow
> control that blocks only one producer? Do you have any recommendation on
> what changes I should make to the config?
>
> On Wed, Jan 2, 2013 at 12:33 PM, Christian Posta
> <christian.po...@gmail.com>wrote:
>
> > Enqueue count = number of messages sent to the queue
> > Dequeue count = number of messages ack'd by the consumer
> > Inflight count = number of messages sent to consumer's session (and are
> not
> > ack'd by consumer)
> > Dispatch count = messages sent to the consumer (dequeue + inflight)
> >
> > I asked about the async sends because the producer flow control you hit
> was
> > the kind that will block the entire connection (instead of just blocking
> > that one producer).
> >
> >
> >
> >
> >
> > On Wed, Jan 2, 2013 at 12:41 PM, Mohit Anchlia <mohitanch...@gmail.com
> > >wrote:
> >
> > > Thanks again! Is the dispatch queue size the one that relates to what's
> > > held in memory?
> > >
> > > We are using mule and the endpoint has no transactions configured. What
> > did
> > > you mean by forcing async?
> > >
> > > On Wed, Jan 2, 2013 at 11:06 AM, Christian Posta
> > > <christian.po...@gmail.com>wrote:
> > >
> > > > Correct. The memoryUsage is increased when messages are held in
> memory.
> > > > This happens when messages are paged in from the store. They are then
> > > > dispatched to consumers and held until the consumer acks.
> > > >
> > > > In your case looks like a particular destination (eventsEndpoint) is
> > > > filling up somehow. Can you look in JMX and see how many enqueued,
> > > > dispatched, dequeued, inflight etc? Also check to see how many
> > > > subscriptions to that destination, and see what those
> dispatch-counter
> > > and
> > > > dispatch queue size looks like.
> > > >
> > > > BTW any chance you're sending (producers) using a transaction, or
> > forcing
> > > > sends to be async?
> > > >
> > > >
> > > >
> > > > On Wed, Jan 2, 2013 at 11:51 AM, Mohit Anchlia <
> mohitanch...@gmail.com
> > > > >wrote:
> > > >
> > > > > So if I understand correctly I shouldn't be seeing this behaviour
> > > unless
> > > > > something is being held in memory. We ack immediately so I am kind
> of
> > > > > wondering what might be going on here. Is there a way to find out
> > what
> > > > > might be happening here?
> > > > >
> > > > > On Wed, Jan 2, 2013 at 10:03 AM, Christian Posta
> > > > > <christian.po...@gmail.com>wrote:
> > > > >
> > > > > > What info can you provide about how clients are consuming from
> the
> > > > > queues?
> > > > > > Any chance there are a bunch of messages "inflight" which haven't
> > > been
> > > > > > ack'd by the consumer?
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 2, 2013 at 10:20 AM, Mohit Anchlia <
> > > mohitanch...@gmail.com
> > > > > > >wrote:
> > > > > >
> > > > > > > I see these messages being logged. Queues are setup to use
> > > persistent
> > > > > > > store. I am wondering what's causing this messages to get
> > blocked.
> > > I
> > > > > see
> > > > > > > there is a memoryLimit but if I am using persistent=true then
> > > should
> > > > > this
> > > > > > > be the desired behaviour?
> > > > > > >
> > > > > > >
> > > > > > > 2013-01-02 05:06:13,388 | INFO |
> > > > > > > Usage(default:memory:queue://eventsEndpoint:memory)
> > > percentUsage=0%,
> > > > > > > usage=0, limit=20971520,
> > > > > > > percentUsageMinDelta=1%;Parent:Usage(default:memory)
> > > > percentUsage=106%,
> > > > > > > usage=22366029, limit=20971520, percentUsageMinDelta=1%: Usage
> > > > Manager
> > > > > > > Memory Limit reached. Producer
> > > > > > > (ID:pfdamq301.ie.net-54582-1351809386355-2:1:1:1) stopped to
> > > prevent
> > > > > > > flooding queue://eventsEndpoint. See
> > > > > > >
> > > > > > >
> > > > > > >    <broker xmlns="http://activemq.apache.org/schema/core";
> > > > > > > brokerName="static-broker1" persistent="true"
> > > > > > > dataDirectory="${activemq.data}">
> > > > > > >
> > > > > > >         <!-- Destination specific policies using destination
> > names
> > > or
> > > > > > > wildcards -->
> > > > > > >         <destinationPolicy>
> > > > > > >             <policyMap>
> > > > > > >                 <policyEntries>
> > > > > > >                     <policyEntry queue=">"
> > > producerFlowControl="true"
> > > > > > > memoryLimit="20mb">
> > > > > > >                         <deadLetterStrategy>
> > > > > > >                           <individualDeadLetterStrategy
> > > > > > queuePrefix="DLQ."
> > > > > > > useQueueForQueueMessages="true" />
> > > > > > >                         </deadLetterStrategy>
> > > > > > >                     </policyEntry>
> > > > > > >                     <policyEntry topic=">"
> > > producerFlowControl="true"
> > > > > > > memoryLimit="20mb">
> > > > > > >                     </policyEntry>
> > > > > > >                 </policyEntries>
> > > > > > >             </policyMap>
> > > > > > >         </destinationPolicy>
> > > > > > >
> > > > > > >         <!-- Use the following to configure how ActiveMQ is
> > exposed
> > > > in
> > > > > > JMX
> > > > > > > -->
> > > > > > >         <managementContext>
> > > > > > >             <managementContext createConnector="true"/>
> > > > > > >         </managementContext>
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > *Christian Posta*
> > > > > > http://www.christianposta.com/blog
> > > > > > twitter: @christianposta
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Christian Posta*
> > > > http://www.christianposta.com/blog
> > > > twitter: @christianposta
> > > >
> > >
> >
> >
> >
> > --
> > *Christian Posta*
> > http://www.christianposta.com/blog
> > twitter: @christianposta
> >
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Reply via email to