Hi Zixuan,

The proposal looks good.
Regarding compatibility, should we check the protocol version on the
client side?
An old broker doesn't support `start_message_id_inclusive`, which
means the client still needs to do the seek operation when it talks to
an old broker.
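
Roughly what I have in mind, as a sketch only. The class and method names
and the version numbers below are made up for illustration; they are not
existing client code, and the real minimum version would be whatever
protocol version ends up carrying the new field:

```java
// Hypothetical helper: decides whether the client must keep doing the
// seek itself because the broker is too old to honor
// start_message_id_inclusive.
final class InclusiveStartCompat {

    /**
     * @param brokerProtocolVersion   version reported by the broker on connect
     * @param minVersionWithInclusive first version that understands the new
     *                                field (an assumption, not a real constant)
     * @param startMessageIdInclusive whether the user asked for an inclusive start
     * @return true if the client-side seek is still required
     */
    static boolean needsClientSideSeek(int brokerProtocolVersion,
                                       int minVersionWithInclusive,
                                       boolean startMessageIdInclusive) {
        if (!startMessageIdInclusive) {
            // Nothing to emulate: the start position is exclusive anyway.
            return false;
        }
        // Old broker: it ignores the new field, so the client must seek.
        return brokerProtocolVersion < minVersionWithInclusive;
    }

    public static void main(String[] args) {
        // Illustrative version numbers only.
        System.out.println(needsClientSideSeek(18, 20, true));  // true
        System.out.println(needsClientSideSeek(20, 20, true));  // false
        System.out.println(needsClientSideSeek(18, 20, false)); // false
    }
}
```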

> Notice that the users still can read the message of the latest position by
> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.

I don't think this part should be a notice. We are not deliberately
changing the current behavior; we are just fixing the problem for users
who don't want to call `reader.hasMessageAvailable()` because they know
the topic has at least one message. With the notice added here, it looks
like we no longer suggest using `hasMessageAvailable()` after this
proposal.

Penghui

On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote:

> Hi Pulsar community,
>
> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
>
> Thanks,
> Zixuan
>
> -----
>
> Discussion thread:
> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
>
> ## Motivation
>
> Currently, the Pulsar-client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message of
> `startMessageId` position.
>
> Assume we have four messages with ids 1, 2, 3, 4 in the topic:
>
> - When we set `earliest` as `startMessageId` value, we can get the message
> of message id 1
> - When we set `latest` as `startMessageId` value, we can't get any message
>
> Sometimes we want to read message id 4 first. Currently there is only one
> way to do that in the client:
>
> ```
> Reader<byte[]> reader = pulsarClient.newReader()
>         .topic(topicName)
>         .subscriptionName(subscriptionName)
>         .startMessageId(MessageId.latest)
>         .startMessageIdInclusive()
>         .create();
>
> // Performs the seek to the latest message when
> // startMessageIdInclusive() is enabled.
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
>
> Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns the
> correct message (id 4), because `hasMessageAvailable()` includes a seek action
> when `startMessageIdInclusive()` is enabled.
>
> This approach is confusing. If we do this on the broker side, it will
> make things easier.
>
> ## Goal
>
> This PIP proposes support for reading the message of `startMessageId`
> position on the broker side:
>
> - Add to `Consumer`
> - Add to `Reader`
>
> ## Implementation
>
> ### Protocol
>
> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> determine whether to read the message at the `startMessageId` position:
>
> ```
> message CommandSubscribe {
>     // some fields
>
>     // If specified, the subscription will read the message from the start
>     // message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
>
> ### ManagedCursorImpl
>
> Add a check in
>
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>
>
> We only need to handle the case where `startMessageId` is `MessageId.latest`:
> if `start_message_id_inclusive` is `true`, we use the latest position from
> the ledger as the `readPosition` value; otherwise, if
> `start_message_id_inclusive` is `false`, we use the next position after the
> latest position as the `readPosition` value.
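
For reference, the rule above could be sketched like this. It is a
simplified illustration, not the actual `ManagedCursorImpl` code: the
`Position` class and the `initialReadPosition` method are made up, and the
next position is assumed to be the next entry in the same ledger.

```java
// Simplified sketch of choosing the initial read position when
// startMessageId is MessageId.latest.
final class InitialReadPosition {

    /** Minimal stand-in for a ledger position (ledger id, entry id). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Assumption: the next position is the next entry in the same ledger. */
        Position next() {
            return new Position(ledgerId, entryId + 1);
        }
    }

    /**
     * @param latest                  position of the last message in the topic
     * @param startMessageIdInclusive value of start_message_id_inclusive
     * @return the position the cursor should start reading from
     */
    static Position initialReadPosition(Position latest, boolean startMessageIdInclusive) {
        // Inclusive: start at the latest message itself.
        // Exclusive: start just after it, so only new messages are read.
        return startMessageIdInclusive ? latest : latest.next();
    }

    public static void main(String[] args) {
        Position latest = new Position(5, 3);
        System.out.println(initialReadPosition(latest, true).entryId);  // 3
        System.out.println(initialReadPosition(latest, false).entryId); // 4
    }
}
```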
>
> ### Client
>
> The `Consumer` and `Reader` support setting the
> `start_message_id_inclusive` value in the `CommandSubscribe` command.
>
> ### Compatibility
>
> This feature can be both backward and forward compatible, which means
> users can use any client to request any broker.
>
> Notice that the users still can read the message of the latest position by
> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.
>
