I just missed the point that the reset cursor operations do not work
for the consumer. IIUC, the seek operation does not work as well. Then
I think the option is not user-friendly as the PIP says:

>  It needs to be enabled with a complete understanding of this configuration.

If users want, they can also record the latest position for each
consumer at the application side and filter the messages by the public
`MessageId#compareTo` API. If hiding these details in SDK still
requires users to know these details, I think it would not be better
than doing that explicitly in the application.

Thanks,
Yunze

On Wed, Mar 22, 2023 at 10:29 AM 丛搏 <congbobo...@gmail.com> wrote:
>
> Hi, Michael:
>
> Michael Marshall <mmarsh...@apache.org> 于2023年3月21日周二 23:17写道:
>
> >
> > One more point. Instead of keeping track of the latest message seen by
> > the application, the logic in my solution would actually just check
> > the last message in the `incomingMessages` queue (as in the most
> > recently added), and use that as the read position in the subscribe
> > command. If we made this change, we would have to change this code [0]
> > to not drop the `incomingMessages` queue.
>
> case 1:
> What we define the message that the application has seen?
> I think it is the[0], when the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?
> What I think we should lock the receive logic in [1]
> ```
> synchronized (this) {
>     message = incomingMessages.take();
>     messageProcessed(message);
> }
> ```
> why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> same logic? it's a bad code.
>
> case 2:
> If we sub with `startMessageId`, we also should lock any enqueue
> logic, like [2] and
> check to consumer's current state
> ```
> synchronized (this) {
>     if (consumer.isConnected) {
>         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
>             // After we have enqueued the messages on
> `incomingMessages` queue, we cannot touch the message
>             // instance anymore, since for pooled messages, this
> instance was possibly already been released
>             // and recycled.
>             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
>             getMemoryLimitController().ifPresent(limiter ->
> limiter.forceReserveMemory(messageSize));
>             updateAutoScaleReceiverQueueHint();
>         }
>     }
> }
> ```
> case 3:
> when we subcommand sends to broker with `startMessageId = 1`, then the
> broker push message
> has not yet entered `incommingQueue`, the application invokes
> redeliver. in this way, we don't
> filter messages are correct, right?
>
> These are some cases that I simply thought of, and there must be
> others that I haven't thought
> of. Are you sure we can handle these problems correctly?
>
> > The problem of "the consumer doesn't know" seems like something that
> > is reasonably within the protocol's responsibilities. In this case, an
> > event happens on the broker, and the broker can tell the consumer.
>
> I don't think a simple change protocol can solve these problems,
> We can't promise that every consumer can receive the broker reset
> cursor request.
> When the consumer reconnects, the broker can't send the reset cursor request 
> to
> the client consumers, right? In this case, the consumer is still unaware, 
> right?
>
>
> [0] 
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> [1] 
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> [2] 
> https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> >
> > Thanks,
> > Michael
> >
> > [0] 
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> >
> > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall <mmarsh...@apache.org> 
> > wrote:
> > >
> > > > if we add the new field in CommandSubscribe, we should ensure
> > > > the synchronization between consumer reconnection and user
> > > > calling receive and redeliverUnack method. it will affect the 
> > > > performance
> > > > of receive. expose synchronization to hot paths it not a good idea.
> > >
> > > I don't think this is a valid objection. I am pretty sure we already
> > > synchronize in the relevant places in the consumer to solve the exact
> > > race condition you're concerned about: [0] [1].
> > >
> > > My proposed operation is to keep track of the latest message id that
> > > the application has seen, and then tell the broker that id when
> > > sending the Subscribe command. We already do similar logic here [2]
> > > [3], but instead of getting the first message id the consumer hasn't
> > > seen, we'll get the latest message id seen.
> > >
> > > Regarding performance, the PIP doesn't touch on how it will filter out
> > > messages. What is the planned approach? In my understanding, the
> > > client will keep track of the latest message id that the application
> > > has seen and then will need to compare that message id against every
> > > new mess. As such, it seems like telling the broker where to start
> > > instead of naively checking a filter on every message would be
> > > cheaper.
> > >
> > > > As described in Compatibility in PIP. Client consumer doesn't know
> > > > Pulsar Admin reset cursor.
> > >
> > > The problem of "the consumer doesn't know" seems like something that
> > > is reasonably within the protocol's responsibilities. In this case, an
> > > event happens on the broker, and the broker can tell the consumer.
> > >
> > > > * <p>Consumers should close when the server resets the cursor,
> > > > * when the cursor reset success, and then restart. Otherwise,
> > > > * the consumer will not receive the history messages.
> > >
> > > This is introducing a confusing edge case that requires reading a
> > > Javadoc in order to understand. That seems risky to me, and I do not
> > > think we should add such an edge case. A new protocol message would
> > > easily handle it and make it transparent to the application.
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
> > > [1] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
> > > [2] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > > [3] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960
> > >
> > > On Tue, Mar 21, 2023 at 8:58 AM Yubiao Feng
> > > <yubiao.f...@streamnative.io.invalid> wrote:
> > > >
> > > > +1
> > > >
> > > > Hi, Bo :
> > > >
> > > > Thanks for your explanation. That makes sense to me.
> > > >
> > > > Thanks,
> > > > Yubiao Feng
> > > >
> > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > >
> > > > > Hi, pulsar community:
> > > > >
> > > > > I started a PIP about `Client consumer filter received messages`.
> > > > >
> > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >

Reply via email to