Hi zixuan, The proposal looks good, regarding the compatibility, should we check the protocol version at the client side? The old version version doesn't support `start_message_id_inclusive` which means the client side still needs to do the seek operation while requesting an old version broker.
> Notice that the users still can read the message of the latest position by call `reader.hasMessageAvailable()` before `reader.readNext()`, but this call can be ignored when using the new client and the new broker. For this part, I don't think it should be a notice, We do not deliberately modify current behavior, just fix the problem if users don't want to call reader.hasMessageAvailable() because they know topic has at least one message. Add notice here, it looks like after this proposal, we are not suggesting to use hasMessageAvailable() any more. Penghui On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote: > Hi Pulsar community, > > Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883 > > Thanks, > Zixuan > > ----- > > Discussion thread: > https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl > > ## Motivation > > Currently, the Pulsar-client supports setting the `startMessageId` for > Consumer and Reader, and also supports reading the message of > `startMessageId` position. > > Assume, we have two message id 1,2,3,4 in the topic: > > - When we set `earliest` as `startMessageId` value, we can get the message > of message id 1 > - When we set `latest` as `startMessageId` value, we can't get any message > > Sometimes we want to read the message id 4 for the first time, we have > only one approach in client: > > ``` > Reader<byte[]> reader = pulsarClient.newReader() > .topic(topicName) > .subscriptionName(subscriptionName) > .startMessageId(MessageId.latest) > .startMessageIdInclusive() > .create(); > > reader.hasMessageAvailable(); > Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); > ``` > > Call `reader.hasMessageAvailable()` before `reader.readNext()` can get the > correct message id 4, which include seek action when the > `startMessageIdInclusive()` is enabled. > > This approach is confusing. If we do this on the broker side, it will > make things easier. > > ## Goal > > This PIP proposes support for reading the message of `startMessageId` > position on the broker side: > > - Add to `Consumer` > - Add to `Reader` > > ## Implementation > > ### Protocol > > Add a `start_message_id_inclusive` field to `CommandSubscribe` for > determine whether to read the message of `startMessageId` position: > > ``` > message CommandSubscribe { > // some fields > > // If specified, the subscription will read the message from the start > message id position. > optional bool start_message_id_inclusive = 20 [default = false]; > } > ``` > > ### ManagedCursorImpl > > Add a check in > > `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`. > > > We only need to care that the `startMessageId` is `MessageId.latest` and > the`start_message_id_inclusive` is `true`, we get latest position from > ledger as `readPosition` value, otherwise if > the`start_message_id_inclusive` is `false`, get next position of the > latest position as `readPosition` value. > > ### Client > > The `Consumer` and `Reader` support setting the > `start_message_id_inclusive` value to `CommandSubscribe` command. > > ### Compatibility > > This feature can have both backward and forward compatibility, this means > the users can use any client to request any broker. > > Notice that the users still can read the message of the latest position by > call `reader.hasMessageAvailable()` before `reader.readNext()`, but this > call can be ignored when using the new client and the new broker. >