+1

Thanks,
Hang

On Mon, Apr 11, 2022 at 6:30 PM Zixuan Liu <node...@gmail.com> wrote:
>
> This way is right.
>
> On Mon, Apr 11, 2022 at 9:12 AM PengHui Li <peng...@apache.org> wrote:
> >
> > regarding the compatibility, should we check the protocol version at the
> > client side?
> > The old version doesn't support `start_message_id_inclusive`, which means
> > the client side still needs to do the seek operation while requesting an
> > old version broker.
> >
> > Oh sorry, my fault. If the broker side does not support
> > `start_message_id_inclusive`, users need to call
> > `reader.hasMessageAvailable()` first.
> >
> > +1 for the proposal.
> >
> > Penghui
> >
> > On Mon, Apr 11, 2022 at 9:10 AM PengHui Li <peng...@apache.org> wrote:
> >
> > > Hi zixuan,
> > >
> > > The proposal looks good,
> > > regarding the compatibility, should we check the protocol version at the
> > > client side?
> > > The old version doesn't support `start_message_id_inclusive`, which means
> > > the client side still needs to do the seek operation while requesting an
> > > old version broker.
> > >
> > > > Notice that the users still can read the message of the latest position
> > > > by calling `reader.hasMessageAvailable()` before `reader.readNext()`,
> > > > but this call can be ignored when using the new client and the new
> > > > broker.
> > >
> > > For this part, I don't think it should be a notice. We do not deliberately
> > > modify current behavior, just fix the problem if users don't want to call
> > > reader.hasMessageAvailable() because they know the topic has at least one
> > > message. If we add a notice here, it looks like after this proposal we no
> > > longer suggest using hasMessageAvailable().
> > >
> > > Penghui
> > >
> > > On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote:
> > >
> > >> Hi Pulsar community,
> > >>
> > >> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
> > >>
> > >> Thanks,
> > >> Zixuan
> > >>
> > >> -----
> > >>
> > >> Discussion thread:
> > >> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
> > >>
> > >> ## Motivation
> > >>
> > >> Currently, the Pulsar client supports setting the `startMessageId` for
> > >> Consumer and Reader, and also supports reading the message at the
> > >> `startMessageId` position.
> > >>
> > >> Assume we have four messages with IDs 1, 2, 3, 4 in the topic:
> > >>
> > >> - When we set `earliest` as the `startMessageId` value, we can get the
> > >>   message with ID 1
> > >> - When we set `latest` as the `startMessageId` value, we can't get any
> > >>   message
> > >>
> > >> Sometimes we want to read message ID 4 first, and there is only one way
> > >> to do so in the client:
> > >>
> > >> ```
> > >> Reader<byte[]> reader = pulsarClient.newReader()
> > >>         .topic(topicName)
> > >>         .subscriptionName(subscriptionName)
> > >>         .startMessageId(MessageId.latest)
> > >>         .startMessageIdInclusive()
> > >>         .create();
> > >>
> > >> reader.hasMessageAvailable();
> > >> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> > >> ```
> > >>
> > >> Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
> > >> the correct message (ID 4), because that call includes a seek action when
> > >> `startMessageIdInclusive()` is enabled.
> > >>
> > >> This approach is confusing. If we do this on the broker side, it will
> > >> make things easier.
> > >>
> > >> ## Goal
> > >>
> > >> This PIP proposes support for reading the message at the `startMessageId`
> > >> position on the broker side:
> > >>
> > >> - Add to `Consumer`
> > >> - Add to `Reader`
> > >>
> > >> ## Implementation
> > >>
> > >> ### Protocol
> > >>
> > >> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> > >> determine whether to read the message at the `startMessageId` position:
> > >>
> > >> ```
> > >> message CommandSubscribe {
> > >>     // some fields
> > >>
> > >>     // If specified, the subscription will read the message from the start
> > >>     // message id position.
> > >>     optional bool start_message_id_inclusive = 20 [default = false];
> > >> }
> > >> ```
> > >>
> > >> ### ManagedCursorImpl
> > >>
> > >> Add a check in
> > >> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
> > >>
> > >> We only need to care about the case where the `startMessageId` is
> > >> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we get the
> > >> latest position from the ledger as the `readPosition` value; otherwise, if
> > >> `start_message_id_inclusive` is `false`, we get the next position after
> > >> the latest position as the `readPosition` value.
> > >>
> > >> ### Client
> > >>
> > >> The `Consumer` and `Reader` support setting the
> > >> `start_message_id_inclusive` value in the `CommandSubscribe` command.
> > >>
> > >> ### Compatibility
> > >>
> > >> This feature is both backward and forward compatible, which means users
> > >> can use any client to request any broker.
> > >>
> > >> Notice that users can still read the message at the latest position by
> > >> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but
> > >> this call can be ignored when using the new client and the new broker.
> > >>
> > >
> >
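
The read-position rule described under "ManagedCursorImpl" in the quoted proposal can be illustrated with a small sketch. This is not the actual `ManagedCursorImpl` code: the class `ReadPositionSketch`, the method `initialReadPosition`, and the use of a plain `long` as a position are all assumptions made for illustration, and the sketch covers only the case where `startMessageId` is `MessageId.latest`.

```java
// Hypothetical sketch of the proposed rule; names and types are illustrative
// assumptions, not Pulsar or BookKeeper APIs.
public class ReadPositionSketch {

    /**
     * Chooses the first position to read when startMessageId is MessageId.latest.
     * Positions are modelled as simple entry indexes (an assumption).
     */
    public static long initialReadPosition(long latestPosition,
                                           boolean startMessageIdInclusive) {
        if (startMessageIdInclusive) {
            // Inclusive: start at the latest message itself.
            return latestPosition;
        }
        // Exclusive (the default): start at the position after the latest message.
        return latestPosition + 1;
    }

    public static void main(String[] args) {
        long latest = 4; // the topic holds message IDs 1..4, as in the proposal
        System.out.println(initialReadPosition(latest, true));  // prints 4
        System.out.println(initialReadPosition(latest, false)); // prints 5
    }
}
```

With the inclusive flag set, the first read would return message ID 4 without a prior `reader.hasMessageAvailable()` call; with the flag unset, the reader waits for the next message published after ID 4.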