One more point. Instead of keeping track of the latest message seen by the application, the logic in my solution would just check the last message in the `incomingMessages` queue (that is, the most recently added one) and use that as the read position in the subscribe command. If we made this change, we would have to change this code [0] so that it no longer drops the `incomingMessages` queue.
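To make the idea concrete, here is a rough sketch of what I have in mind. It is not the actual `ConsumerImpl` code: the class `ResumePositionSketch`, its method names, and the generic id type `T` are all made up for illustration. The fallback to the last dequeued id when the queue is empty is my own assumption about how the empty-queue case could be handled. Synchronization is left out entirely.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Illustrative stand-in only; this is NOT Pulsar's ConsumerImpl.
// T is whatever type represents a message id.
final class ResumePositionSketch<T> {
    // Stand-in for the incomingMessages queue: oldest at the head, newest at the tail.
    private final Deque<T> incomingMessages = new ArrayDeque<>();
    // Assumption: remember the last id handed to the application so there is
    // still a position to report when the queue has been fully drained.
    private T lastDequeued;

    // Called when a message arrives from the broker.
    void onMessageFromBroker(T messageId) {
        incomingMessages.addLast(messageId);
    }

    // Called by the application; returns empty if nothing is buffered.
    Optional<T> receive() {
        T id = incomingMessages.pollFirst();
        if (id != null) {
            lastDequeued = id;
        }
        return Optional.ofNullable(id);
    }

    // On reconnect the queue is kept, and the most recently added id becomes
    // the read position to send in the subscribe command.
    Optional<T> readPositionForSubscribe() {
        T newest = incomingMessages.peekLast();
        return Optional.ofNullable(newest != null ? newest : lastDequeued);
    }
}
```

The point of the sketch is that nothing is compared per message on the receive path: the position is computed once, at reconnect time, from state the client already holds.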
Thanks,
Michael

[0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mmarsh...@apache.org> wrote:
>
> > if we add the new field in CommandSubscribe, we should ensure
> > the synchronization between consumer reconnection and user
> > calling receive and redeliverUnack method. it will affect the performance
> > of receive. expose synchronization to hot paths it not a good idea.
>
> I don't think this is a valid objection. I am pretty sure we already
> synchronize in the relevant places in the consumer to solve the exact
> race condition you're concerned about: [0] [1].
>
> My proposed operation is to keep track of the latest message id that
> the application has seen, and then tell the broker that id when
> sending the Subscribe command. We already do similar logic here [2]
> [3], but instead of getting the first message id the consumer hasn't
> seen, we'll get the latest message id seen.
>
> Regarding performance, the PIP doesn't touch on how it will filter out
> messages. What is the planned approach? In my understanding, the
> client will keep track of the latest message id that the application
> has seen and then will need to compare that message id against every
> new message. As such, it seems like telling the broker where to start
> instead of naively checking a filter on every message would be
> cheaper.
>
> > As described in Compatibility in PIP. Client consumer doesn't know
> > Pulsar Admin reset cursor.
>
> The problem of "the consumer doesn't know" seems like something that
> is reasonably within the protocol's responsibilities. In this case, an
> event happens on the broker, and the broker can tell the consumer.
>
> > * <p>Consumers should close when the server resets the cursor,
> > * when the cursor reset success, and then restart. Otherwise,
> > * the consumer will not receive the history messages.
>
> This is introducing a confusing edge case that requires reading a
> Javadoc in order to understand. That seems risky to me, and I do not
> think we should add such an edge case. A new protocol message would
> easily handle it and make it transparent to the application.
>
> Thanks,
> Michael
>
> [0]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> [1]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> [2]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> [3]
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
>
> On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> <yubiao.f...@streamnative.io.invalid> wrote:
> >
> > +1
> >
> > Hi, Bo :
> >
> > Thanks for your explanation. That makes sense to me.
> >
> > Thanks,
> > Yubiao Feng
> >
> > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > > Hi, pulsar community:
> > >
> > > I started a PIP about `Client consumer filter received messages`.
> > >
> > > PIP: https://github.com/apache/pulsar/issues/19864
> > >
> > > Thanks,
> > > Bo
> > >