Thanks for the clarification I have left comments on the PR. PLAT.
Thanks, Penghui On Sat, May 7, 2022 at 9:56 PM Enrico Olivelli <eolive...@gmail.com> wrote: > Peng Hui > > Il giorno ven 6 mag 2022 alle ore 12:54 PengHui Li > <peng...@apache.org> ha scritto: > > > > > The problem is that If I use multiple subscriptions then I am not > > guaranteed that each message is delivered only to one consumer. > > > > It is a weird use case, but it is something that is possible according > > to the JMS specs. > > I am mapping a JMS Queue with a Subscription, and everything works well, > > but you can have multiple Consumers, with different Selectors, that > > compete on the same Queue (Subscription). > > > > You can have two Consumers with overlapping selectors and we have to > > guarantee that each message is processed > > only by one of the two consumers. > > This is why I need consumer level filtering and not only > > per-subscription filters. > > > > It is an edge case and this is why I don't want to build a brand new > > dispatcher to cover this feature. > > > > Currently, without server-side filtering, my implementation uses > > client side negative acks for a Consumer to reject the message and > > leave > > it to the other consumers attached to the subscription. > > > > Oh I see. I understand the difference now. > > > > Looks like the proposal is a more simple way to cope with the > requirement. > > > > > How do we handle the case that all the consumers `RESCHEDULE` the > message? > > Some consumers cannot start normally in a period of time or > user-specified > > matching > > rules cannot cover all messages. > > > > How to handle this case currently? I think we need to ensure the dispatch > > thread will not > > run into an infinite loop. > > The infinite loop is not happening for two reasons in the current > implementation in my PR: > - at each run of the dispatcher we select a List<Entry> and we process > each entry only once, we are not re-adding the entry to the list of > entries to process immediately > - when we do RESCHEDULE we do the negative ack with a delay > (simulating the client side negative acknowledgement delay), this > allows the dispatcher to select new entries at the next round > > If it happens that there is no Consumer that is able to process the > message we get to the same situation we have in Key Shared > subscription when there is no consumer available for a given key: > the dispatcher cannot make progress, but it is not in a busy loop. > > I hope that this clarifies > > Enrico > > > > > > Thanks, > > Penghui > > > > On Fri, May 6, 2022 at 5:45 PM Haiting Jiang <jianghait...@apache.org> > > wrote: > > > > > It looks like we can add something like `EntryDispatcher` before the > > > EntryFilter. > > > Mixing entry filtering and consumer selecting seems a little confusing. > > > > > > The `EntryDispatcher` could works as a consumer selector in > > > `PersistentDispatcherMultipleConsumers`. > > > It accepts an entry and a consumer list, returns the consumer this > entry > > > should dispatch to. > > > The implementation could be provided by user like EntryDispatcher. > > > > > > Thanks, > > > Haiting > > > > > > On 2022/05/05 12:41:10 Enrico Olivelli wrote: > > > > Hello, > > > > I am trying to use PIP-105 and I found out that we are missing a few > > > > little things to cover my user case. > > > > In my case I have two consumers who compete on the same SHARED > > > > subscription with a "message filter". > > > > The filter is passed as Consumer metadata. > > > > > > > > When you have two Consumers connected on the Subscription the > > > > dispatcher prepares to send the message to one consumer at a time. > > > > > > > > The Message goes through the EntryFilter that decides if the Entry > > > > matches the requirements of the Consumer. > > > > - if the message matches the consumer then it returns ACCEPT > > > > - if the message does not match the consumer then it has to be > > > > rescheduled (RESCHEDULE) > > > > > > > > With this small extension to PIP-105 we can cover this simple > scenario > > > > without the need to introduce a new Dispatcher policy > > > > > > > > I sent out a patch with the implementation and a test case that > shows my > > > usecase > > > > https://github.com/apache/pulsar/pull/15391 > > > > > > > > Introducing RESCHEDULE needs some level of discussion here. > > > > > > > > With PIP-105 we are anticipating in the broker a decision that the > > > > Consumer would take when the Message is already dispatched to the > > > > application: > > > > A) ignore the message: acknowledge immediately, without processing. > > > (REJECT) > > > > B) postpone the message (or let it be processed from another > > > > consumer): negatively acknowledge immediately, without processing. > > > > (RESCHEDULE) > > > > > > > > With the initial implementation of PIP-105 we are covering case A, > and > > > > with my proposal I want to give the opportunity to implement case B. > > > > > > > > The only point that is not covered by my proposal is that the NACK on > > > > the client happens only after a delay on the client, and this has > some > > > > side effects. > > > > In fact that "delay" from the client allows the dispatcher to read > > > > more entries because it thinks that the message has been dispatched > > > > successfully, and it is allowed to move forward. > > > > I would prefer to start a separate discussion for this "problem", > that > > > > is in part related to how we deal with messages to be replayed and it > > > > is not strictly related to my PR. > > > > > > > > > > > > Cheers > > > > Enrico > > > > > > > >