As I understand it, the answer is that the cursor doesn't belong to your
consumer; it's shared by all consumers of the queue, so it can't take your
consumer's selector into account.  Keep in mind that on a queue, each
message goes to whichever consumer takes it next, so with one cursor per
consumer you'd have to synchronize between the cursors to make sure two
consumers can't take the same message.  The simpler solution (which is
incomplete, as you've found) is to have a single cursor and synchronize
removals from it, and as I understand it that is what has been implemented.
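
To make the stall concrete, here is a toy simulation of a single shared
cursor.  It is not ActiveMQ code: the class name, the page size of 3, and
the message layout are all made up for illustration, and the real broker
also involves dispatch and prefetch, which this ignores.  It only shows
why a selector that matches messages deep in the queue sees nothing until
something else consumes the head of the cursor.

```python
from collections import deque


class SharedCursorQueue:
    """Toy model: one message store and one bounded cursor shared by all consumers."""

    def __init__(self, messages, page_size):
        self.store = deque(messages)  # messages still in the message store
        self.cursor = []              # messages paged in and offered to consumers
        self.page_size = page_size    # hypothetical cursor capacity

    def _page_in(self):
        # Refill the cursor from the store only while it has free slots.
        while self.store and len(self.cursor) < self.page_size:
            self.cursor.append(self.store.popleft())

    def receive(self, selector=lambda msg: True):
        """Return the first message in the cursor that matches, or None."""
        self._page_in()
        for i, msg in enumerate(self.cursor):
            if selector(msg):
                return self.cursor.pop(i)
        # Nothing in the cursor matches; the store is never searched.
        return None


def is_b(msg):
    return msg["type"] == "B"


# Five "A" messages at the head of the queue, one "B" message behind them.
messages = [{"id": i, "type": "A"} for i in range(5)] + [{"id": 5, "type": "B"}]
queue = SharedCursorQueue(messages, page_size=3)

# The cursor fills with three "A" messages, so the selective consumer stalls.
stalled = queue.receive(is_b)

# A consumer with no selector drains the head, which lets the cursor advance.
drained = [queue.receive() for _ in range(3)]
found = queue.receive(is_b)
```

In this model the selective consumer only gets its message after the
unselective consumer has removed enough non-matching messages for the "B"
message to be paged in.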

I think an enhancement is clearly needed here, whether it's a cursor per
consumer, a second cursor that iterates over the messages not yet in the
primary cursor and offers them only to consumers that have reached the end
of the primary cursor, or something else.  I recommend you submit an
enhancement request in JIRA to make sure this stays on people's radar and
eventually gets implemented.  (First, though, search JIRA to make sure
there isn't already a request for this enhancement; if there is, add detail
in the comments if necessary and vote for it.)

Tim
On Apr 21, 2015 9:08 PM, "Jon Gorrono" <jpgorr...@ucdavis.edu> wrote:

> Thanks for the response..
>
> Yeah, I could figure out a way to drain the queue, but it seemed to me like
> something the server should be handling for the general case: I've
> subscribed with a selector, so why is the cursor for this connection filled
> with anything that is not selected? I realize that may/must be an
> architectural limitation, e.g., the cursors, as currently defined, are
> 'closer' to the queue than the selection algorithms... I could accomplish
> this with a browsing connection and delayed/out-of-band storage of
> ack-worthy messages... but, as I said, this seems central to the function
> of a JMS server.
>
> On Thu, Apr 16, 2015 at 8:43 PM, Tim Bain <tb...@alumni.duke.edu> wrote:
>
> > I think the behavior you're seeing has more to do with message cursors
> > (http://activemq.apache.org/message-cursors.html) than with missing acks.
> > Basically, once the cursor is full of messages that don't match your
> > selectors, no new messages will come out of the message store into the
> > cursor until one of the messages already in the cursor is consumed, and
> > you don't have any consumers that are going to do that.
> >
> > You could look for a way to increase the cursor size (I'm not sure if
> > that's possible; that web page doesn't describe how to do it and I've
> > never tried), you could set message expiration dates on your messages so
> > that the ones early in the queue eventually clear out (but then you have
> > to set that each time you restart your test, and you have to set them to
> > different values so that they roll off over time rather than all at
> > once), or you could just add another consumer that slowly consumes all
> > messages - no selector - to help clear the cursor (which is probably the
> > easiest option of the three)...  Just put a small sleep in that
> > consumer's message handler and you should eventually work through the
> > problem.  Or maybe someone else has an even better solution that I
> > haven't thought of.
> >
> > Tim
> >
> > On Thu, Apr 16, 2015 at 4:39 PM, Jon Gorrono <jpgorr...@ucdavis.edu>
> > wrote:
> >
> > > I am trying to do some testing and limit the output from a queue that
> > > has about 4500 messages in it. Each message contains between 300 and
> > > 500 attached documents that I process individually. But I am trying to
> > > test only specific message types and not see the others, so I restart
> > > the client each cycle with different selector header values. I only
> > > have the one client (stomp) running at any time (besides the admin web
> > > app)... all connections are with ack: client-individual
> > >
> > > The problem I see is that the selector works fine if the 'top' of the
> > > queue contains messages that match the selector, but I see nothing if
> > > the selector matches messages deep in the queue. The client just waits
> > > forever... it seems to me that the server is waiting for acks, but
> > > since the selector hides the initial messages from the client, there's
> > > nothing for it to ack.
> > >
> > > There are no producers active... I restore the kahadb each time I need
> > > to replenish the queue.
> > >
> > > I have DEBUG log4j settings for activemq packages, but the logs only
> > > show checkpoints and expiry scans.
> > >
> > > I would expect the selector to cause the server to browse down the
> > > queue for messages it can dequeue, but that does not appear to be
> > > happening.
> > >
> > >
> > > broker config:
> > >     <broker xmlns="http://activemq.apache.org/schema/core"
> > >             brokerName="localhost" dataDirectory="${activemq.data}">
> > >
> > >
> > >         <destinationPolicy>
> > >             <policyMap>
> > >               <policyEntries>
> > >                 <policyEntry topic=">" producerFlowControl="true">
> > >
> > >                   <pendingMessageLimitStrategy>
> > >                     <constantPendingMessageLimitStrategy limit="1000"/>
> > >                   </pendingMessageLimitStrategy>
> > >                 </policyEntry>
> > >                 <policyEntry queue=">" producerFlowControl="true"
> > >                              memoryLimit="1mb">
> > >
> > >                 </policyEntry>
> > >               </policyEntries>
> > >             </policyMap>
> > >         </destinationPolicy>
> > >
> > >
> > >
> > >         <managementContext>
> > >             <managementContext createConnector="false"/>
> > >         </managementContext>
> > >
> > >
> > >         <persistenceAdapter>
> > >             <kahaDB directory="${activemq.data}/kahadb"/>
> > >         </persistenceAdapter>
> > >
> > >
> > >
> > >         <systemUsage>
> > >             <systemUsage>
> > >                 <memoryUsage>
> > >                     <memoryUsage limit="64 mb"/>
> > >                 </memoryUsage>
> > >                 <storeUsage>
> > >                     <storeUsage limit="100 gb"/>
> > >                 </storeUsage>
> > >                 <tempUsage>
> > >                     <tempUsage limit="50 gb"/>
> > >                 </tempUsage>
> > >             </systemUsage>
> > >         </systemUsage>
> > >
> > >
> > >         <transportConnectors>
> > >             <transportConnector name="openwire"
> > >                 uri="tcp://host:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> > >             <transportConnector name="amqp"
> > >                 uri="amqp://host:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
> > >             <transportConnector name="stomp" uri="stomp://host:61613"/>
> > >         </transportConnectors>
> > >
> > >
> > >         <shutdownHooks>
> > >             <bean xmlns="http://www.springframework.org/schema/beans"
> > >                   class="org.apache.activemq.hooks.SpringContextHook" />
> > >         </shutdownHooks>
> > >
> > >     </broker>
> > >
> > > --
> > > Jon Gorrono
> > > PGP Key: 0x5434509D - http{
> > > pgp.mit.edu:11371/pks/lookup?search=0x5434509D&op=index}
> > > http{middleware.ucdavis.edu}
> > >
> >
>
>
>
> --
> Jon Gorrono
> PGP Key: 0x5434509D - http{
> pgp.mit.edu:11371/pks/lookup?search=0x5434509D&op=index}
> http{middleware.ucdavis.edu}
>
