> The problem is that if I use multiple subscriptions, then I am not
> guaranteed that each message is delivered to only one consumer.
> It is a weird use case, but it is something that is possible according
> to the JMS specs. I am mapping a JMS Queue to a Subscription, and
> everything works well, but you can have multiple Consumers, with
> different Selectors, that compete on the same Queue (Subscription).
> You can have two Consumers with overlapping Selectors, and we have to
> guarantee that each message is processed by only one of the two
> consumers. This is why I need consumer-level filtering and not only
> per-subscription filters. It is an edge case, and this is why I don't
> want to build a brand new dispatcher to cover this feature.
> Currently, without server-side filtering, my implementation uses
> client-side negative acks for a Consumer to reject the message and
> leave it to the other consumers attached to the subscription.

Oh, I see. I understand the difference now. It looks like the proposal
is a simpler way to cope with the requirement.

> How do we handle the case that all the consumers `RESCHEDULE` the
> message?

Some consumers may not be able to start normally for a period of time,
or the user-specified matching rules may not cover all messages. How is
this case handled currently? I think we need to ensure that the
dispatch thread will not run into an infinite loop.

Thanks,
Penghui

On Fri, May 6, 2022 at 5:45 PM Haiting Jiang <jianghait...@apache.org>
wrote:

> It looks like we can add something like `EntryDispatcher` before the
> EntryFilter. Mixing entry filtering and consumer selection seems a
> little confusing.
>
> The `EntryDispatcher` could work as a consumer selector in
> `PersistentDispatcherMultipleConsumers`. It accepts an entry and a
> consumer list, and returns the consumer this entry should be
> dispatched to. The implementation could be provided by the user, like
> the EntryFilter.
>
> Thanks,
> Haiting
>
> On 2022/05/05 12:41:10 Enrico Olivelli wrote:
> > Hello,
> > I am trying to use PIP-105, and I found out that we are missing a
> > few little things to cover my use case.
> > In my case I have two consumers that compete on the same SHARED
> > subscription with a "message filter".
> > The filter is passed as Consumer metadata.
> >
> > When you have two Consumers connected on the Subscription, the
> > dispatcher prepares to send each message to one consumer at a time.
> >
> > The Message goes through the EntryFilter, which decides whether the
> > Entry matches the requirements of the Consumer:
> > - if the message matches the consumer, it returns ACCEPT
> > - if the message does not match the consumer, it has to be
> >   rescheduled (RESCHEDULE)
> >
> > With this small extension to PIP-105 we can cover this simple
> > scenario without the need to introduce a new Dispatcher policy.
> >
> > I sent out a patch with the implementation and a test case that
> > shows my use case:
> > https://github.com/apache/pulsar/pull/15391
> >
> > Introducing RESCHEDULE needs some level of discussion here.
> >
> > With PIP-105 we are anticipating in the broker a decision that the
> > Consumer would otherwise take only after the Message has already
> > been dispatched to the application:
> > A) ignore the message: acknowledge immediately, without processing
> >    (REJECT)
> > B) postpone the message (or let it be processed by another
> >    consumer): negatively acknowledge immediately, without processing
> >    (RESCHEDULE)
> >
> > The initial implementation of PIP-105 covers case A, and with my
> > proposal I want to add the opportunity to implement case B.
> >
> > The only point not covered by my proposal is that the NACK on the
> > client happens only after a delay, and this has some side effects.
> > In fact, that "delay" on the client allows the dispatcher to read
> > more entries, because it thinks that the message has been dispatched
> > successfully and it is allowed to move forward.
> > I would prefer to start a separate discussion for this "problem";
> > it is partly related to how we deal with messages to be replayed and
> > is not strictly related to my PR.
> >
> >
> > Cheers
> > Enrico
> >
>
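---

For readers following the thread, here is a minimal, self-contained sketch of the dispatch decision being discussed. This is **not** Pulsar's actual `EntryFilter` API: `FilterResult`, `ConsumerModel`, and `selectConsumer` are hypothetical names that only model the proposal (each consumer's filter answers ACCEPT or RESCHEDULE; the first ACCEPT wins; if every consumer reschedules, the entry is deferred to replay instead of looping, addressing Penghui's infinite-loop concern). Selectors are modeled as simple key prefixes purely for illustration.

```java
import java.util.List;
import java.util.Optional;

public class RescheduleSketch {

    // Mirrors the PIP-105 outcomes discussed above, extended with RESCHEDULE.
    enum FilterResult { ACCEPT, REJECT, RESCHEDULE }

    // Hypothetical consumer with a selector; here a selector is just a
    // message-key prefix, standing in for a JMS-style selector expression.
    record ConsumerModel(String name, String selectorPrefix) {
        FilterResult filter(String messageKey) {
            return messageKey.startsWith(selectorPrefix)
                    ? FilterResult.ACCEPT
                    : FilterResult.RESCHEDULE; // leave it to another consumer
        }
    }

    // Dispatch decision: the first consumer whose filter ACCEPTs gets the
    // message. If all consumers answer RESCHEDULE, return empty so the
    // dispatcher can park the entry for replay rather than spin forever.
    static Optional<ConsumerModel> selectConsumer(List<ConsumerModel> consumers,
                                                  String messageKey) {
        for (ConsumerModel c : consumers) {
            if (c.filter(messageKey) == FilterResult.ACCEPT) {
                return Optional.of(c);
            }
        }
        return Optional.empty(); // all RESCHEDULE: defer, do not redispatch now
    }
}
```

With two consumers whose prefixes overlap nothing, a key matching neither selector yields an empty result, which is the "all consumers RESCHEDULE" case the dispatcher must handle explicitly.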