> regarding the compatibility, should we check the protocol version at the client side? The old version version doesn't support `start_message_id_inclusive` which means the client side still needs to do the seek operation while requesting an old version broker.
Oh sorry, my fault. If the broker side does not support `start_message_id_inclusive`, users need to call `reader.hasMessageAvailable()` first. +1 for the proposal. Penghui On Mon, Apr 11, 2022 at 9:10 AM PengHui Li <peng...@apache.org> wrote: > Hi zixuan, > > The proposal looks good, > regarding the compatibility, should we check the protocol version at the > client side? > The old version version doesn't support `start_message_id_inclusive` which > means the > client side still needs to do the seek operation while requesting an old > version broker. > > > Notice that the users still can read the message of the latest position > by > call `reader.hasMessageAvailable()` before `reader.readNext()`, but this > call can be ignored when using the new client and the new broker. > > For this part, I don't think it should be a notice, We do not deliberately > modify current behavior, > just fix the problem if users don't want to call > reader.hasMessageAvailable() because they know > topic has at least one message. Add notice here, it looks like after this > proposal, we are not > suggesting to use hasMessageAvailable() any more. > > Penghui > > On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote: > >> Hi Pulsar community, >> >> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883 >> >> Thanks, >> Zixuan >> >> ----- >> >> Discussion thread: >> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl >> >> ## Motivation >> >> Currently, the Pulsar-client supports setting the `startMessageId` for >> Consumer and Reader, and also supports reading the message of >> `startMessageId` position. >> >> Assume, we have two message id 1,2,3,4 in the topic: >> >> - When we set `earliest` as `startMessageId` value, we can get the message >> of message id 1 >> - When we set `latest` as `startMessageId` value, we can't get any message >> >> Sometimes we want to read the message id 4 for the first time, we have >> only one approach in client: >> >> ``` >> Reader<byte[]> reader = pulsarClient.newReader() >> .topic(topicName) >> .subscriptionName(subscriptionName) >> .startMessageId(MessageId.latest) >> .startMessageIdInclusive() >> .create(); >> >> reader.hasMessageAvailable(); >> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS); >> ``` >> >> Call `reader.hasMessageAvailable()` before `reader.readNext()` can get the >> correct message id 4, which include seek action when the >> `startMessageIdInclusive()` is enabled. >> >> This approach is confusing. If we do this on the broker side, it will >> make things easier. >> >> ## Goal >> >> This PIP proposes support for reading the message of `startMessageId` >> position on the broker side: >> >> - Add to `Consumer` >> - Add to `Reader` >> >> ## Implementation >> >> ### Protocol >> >> Add a `start_message_id_inclusive` field to `CommandSubscribe` for >> determine whether to read the message of `startMessageId` position: >> >> ``` >> message CommandSubscribe { >> // some fields >> >> // If specified, the subscription will read the message from the start >> message id position. >> optional bool start_message_id_inclusive = 20 [default = false]; >> } >> ``` >> >> ### ManagedCursorImpl >> >> Add a check in >> >> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`. >> >> >> We only need to care that the `startMessageId` is `MessageId.latest` and >> the`start_message_id_inclusive` is `true`, we get latest position from >> ledger as `readPosition` value, otherwise if >> the`start_message_id_inclusive` is `false`, get next position of the >> latest position as `readPosition` value. >> >> ### Client >> >> The `Consumer` and `Reader` support setting the >> `start_message_id_inclusive` value to `CommandSubscribe` command. >> >> ### Compatibility >> >> This feature can have both backward and forward compatibility, this means >> the users can use any client to request any broker. >> >> Notice that the users still can read the message of the latest position by >> call `reader.hasMessageAvailable()` before `reader.readNext()`, but this >> call can be ignored when using the new client and the new broker. >> >