Peng Hui

On Fri, May 6, 2022 at 12:54, PengHui Li
<peng...@apache.org> wrote:
>
> > The problem is that if I use multiple subscriptions then I am not
> guaranteed that each message is delivered only to one consumer.
>
> It is a weird use case, but it is something that is possible according
> to the JMS specs.
> I am mapping a JMS Queue with a Subscription, and everything works well,
> but you can have multiple Consumers, with different Selectors, that
> compete on the same Queue (Subscription).
>
> You can have two Consumers with overlapping selectors and we have to
> guarantee that each message is processed
> only by one of the two consumers.
> This is why I need consumer level filtering and not only
> per-subscription filters.
>
> It is an edge case and this is why I don't want to build a brand new
> dispatcher to cover this feature.
>
> Currently, without server-side filtering, my implementation uses
> client-side negative acks for a Consumer to reject the message and leave
> it to the other consumers attached to the subscription.
>
> Oh I see. I understand the difference now.
>
> Looks like the proposal is a simpler way to cope with the requirement.
>
> > How do we handle the case that all the consumers `RESCHEDULE` the message?
> Some consumers cannot start normally for a period of time, or the
> user-specified matching rules cannot cover all messages.
>
> How is this case handled currently? I think we need to ensure the dispatch
> thread will not run into an infinite loop.

The infinite loop does not happen in the current implementation in my
PR, for two reasons:
- at each run of the dispatcher we select a List<Entry> and we process
each entry only once; we do not immediately re-add the entry to the
list of entries to process
- when we RESCHEDULE we do the negative ack with a delay
(simulating the client-side negative acknowledgement delay); this
allows the dispatcher to select new entries at the next round
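
To make the two points above concrete, here is a minimal sketch. It is
not the code in the PR: `RescheduleSketch`, `SketchConsumer` and
`dispatchOnce` are hypothetical names, entries are plain strings, and
the redelivery delay is modelled only by returning the rescheduled
entries to the caller instead of putting them back into the batch that
is being processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of the behaviour described above.
// None of these types are Pulsar classes.
class RescheduleSketch {

    enum FilterResult { ACCEPT, RESCHEDULE }

    // A consumer with a selector, standing in for the per-consumer filter.
    static final class SketchConsumer {
        final String name;
        final Predicate<String> selector;
        final List<String> received = new ArrayList<>();

        SketchConsumer(String name, Predicate<String> selector) {
            this.name = name;
            this.selector = selector;
        }

        FilterResult filter(String entry) {
            return selector.test(entry) ? FilterResult.ACCEPT : FilterResult.RESCHEDULE;
        }
    }

    private final List<SketchConsumer> consumers;
    private int nextConsumer = 0; // round-robin position, kept across runs

    RescheduleSketch(List<SketchConsumer> consumers) {
        this.consumers = consumers;
    }

    // One dispatcher run: each entry in the batch is looked at exactly once.
    // Entries that the chosen consumer does not accept are returned so the
    // caller can redeliver them later (the "delay"); they are never re-added
    // to the batch being processed, so the loop always terminates.
    List<String> dispatchOnce(List<String> batch) {
        List<String> rescheduled = new ArrayList<>();
        for (String entry : batch) {
            SketchConsumer candidate = consumers.get(nextConsumer);
            nextConsumer = (nextConsumer + 1) % consumers.size();
            if (candidate.filter(entry) == FilterResult.ACCEPT) {
                candidate.received.add(entry);
            } else {
                rescheduled.add(entry);
            }
        }
        return rescheduled;
    }
}
```

In this sketch a rescheduled entry comes back in a later run and may
then be offered to a different consumer. An entry that no consumer
accepts keeps coming back, but every single run still finishes after
one pass over its batch.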

If no Consumer is able to process the message, we get to the same
situation we have in a Key_Shared subscription when there is no
consumer available for a given key:
the dispatcher cannot make progress, but it is not in a busy loop.

I hope that this clarifies things.

Enrico


>
> Thanks,
> Penghui
>
> On Fri, May 6, 2022 at 5:45 PM Haiting Jiang <jianghait...@apache.org>
> wrote:
>
> > It looks like we can add something like `EntryDispatcher` before the
> > EntryFilter.
> > Mixing entry filtering and consumer selection seems a little confusing.
> >
> > The `EntryDispatcher` could work as a consumer selector in
> > `PersistentDispatcherMultipleConsumers`.
> > It accepts an entry and a consumer list, and returns the consumer this
> > entry should be dispatched to.
> > The implementation could be provided by the user, like EntryDispatcher.
> >
> > Thanks,
> > Haiting
> >
> > On 2022/05/05 12:41:10 Enrico Olivelli wrote:
> > > Hello,
> > > I am trying to use PIP-105 and I found out that we are missing a few
> > > little things to cover my use case.
> > > In my case I have two consumers who compete on the same SHARED
> > > subscription with a "message filter".
> > > The filter is passed as Consumer metadata.
> > >
> > > When you have two Consumers connected on the Subscription the
> > > dispatcher prepares to send the message to one consumer at a time.
> > >
> > > The Message goes through the EntryFilter that decides if the Entry
> > > matches the requirements of the Consumer.
> > > - if the message matches the consumer then it returns ACCEPT
> > > - if the message does not match the consumer then it has to be
> > > rescheduled (RESCHEDULE)
> > >
> > > With this small extension to PIP-105 we can cover this simple scenario
> > > without the need to introduce a new Dispatcher policy
> > >
> > > I sent out a patch with the implementation and a test case that shows
> > > my use case:
> > > https://github.com/apache/pulsar/pull/15391
> > >
> > > Introducing RESCHEDULE needs some level of discussion here.
> > >
> > > With PIP-105 we are anticipating in the broker a decision that the
> > > Consumer would take when the Message is already dispatched to the
> > > application:
> > > A) ignore the message: acknowledge immediately, without processing.
> > > (REJECT)
> > > B) postpone the message (or let it be processed by another
> > > consumer): negatively acknowledge immediately, without processing.
> > > (RESCHEDULE)
> > >
> > > With the initial implementation of PIP-105 we are covering case A, and
> > > with my proposal I want to give the opportunity to implement case B.
> > >
> > > The only point that is not covered by my proposal is that the NACK on
> > > the client happens only after a delay on the client, and this has some
> > > side effects.
> > > In fact that "delay" from the client allows the dispatcher to read
> > > more entries because it thinks that the message has been dispatched
> > > successfully, and it is allowed to move forward.
> > > I would prefer to start a separate discussion for this "problem", that
> > > is in part related to how we deal with messages to be replayed and it
> > > is not strictly related to my PR.
> > >
> > >
> > > Cheers
> > > Enrico
> > >
> >
