> regarding the compatibility, should we check the protocol version at the
> client side?
> The old version doesn't support `start_message_id_inclusive`, which means
> the client side still needs to do the seek operation while requesting an
> old version broker.

Oh sorry, my fault. If the broker side does not support
`start_message_id_inclusive`, users need to call
`reader.hasMessageAvailable()` first.
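
To make that fallback concrete, here is a toy model in Java. It is not the Pulsar client API: `ToyReader` and its `brokerSupportsInclusive` flag are hypothetical names, and the seek inside `hasMessageAvailable()` only mimics the client-side behavior described in the PIP. It suggests why calling `hasMessageAvailable()` first should give the same result against an old or a new broker.

```java
import java.util.List;

// Toy model only: a hypothetical stand-in, not the Pulsar Reader API.
final class ToyReader {
    private final List<String> topic; // messages currently in the topic
    private int readIndex;            // index of the next message to deliver
    private boolean positioned;       // true once the cursor sits on the last message

    // brokerSupportsInclusive models whether the broker honors
    // start_message_id_inclusive for a reader created at `latest`.
    ToyReader(List<String> topic, boolean brokerSupportsInclusive) {
        this.topic = topic;
        this.positioned = brokerSupportsInclusive;
        // New broker: start on the last message. Old broker: start after it.
        this.readIndex = brokerSupportsInclusive
                ? Math.max(topic.size() - 1, 0)
                : topic.size();
    }

    // Mimics the client-side seek: the first call moves an unpositioned
    // cursor back onto the last message, later calls only report state.
    boolean hasMessageAvailable() {
        if (!positioned) {
            if (!topic.isEmpty()) {
                readIndex = topic.size() - 1;
            }
            positioned = true;
        }
        return readIndex < topic.size();
    }

    // Returns the next message, or null when nothing is available
    // (similar in spirit to a readNext call that times out).
    String readNext() {
        return readIndex < topic.size() ? topic.get(readIndex++) : null;
    }
}
```

In this model, a reader on an old broker returns nothing if `readNext()` is called directly, and returns the last message once `hasMessageAvailable()` has been called; a reader on a new broker returns the last message either way.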

+1 for the proposal.

Penghui

On Mon, Apr 11, 2022 at 9:10 AM PengHui Li <peng...@apache.org> wrote:

> Hi zixuan,
>
> The proposal looks good.
> Regarding compatibility, should we check the protocol version at the
> client side?
> The old version doesn't support `start_message_id_inclusive`, which means
> the client side still needs to do the seek operation while requesting an
> old version broker.
>
> > Notice that users can still read the message at the latest position by
> > calling `reader.hasMessageAvailable()` before `reader.readNext()`, but
> > this call can be omitted when using the new client and the new broker.
>
> For this part, I don't think it should be a notice. We are not deliberately
> modifying the current behavior, just fixing the problem for users who don't
> want to call `reader.hasMessageAvailable()` because they know the topic has
> at least one message. If we add a notice here, it looks like we no longer
> suggest using `hasMessageAvailable()` after this proposal.
>
> Penghui
>
> On Wed, Apr 6, 2022 at 4:54 PM Zixuan Liu <node...@gmail.com> wrote:
>
>> Hi Pulsar community,
>>
>> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
>>
>> Thanks,
>> Zixuan
>>
>> -----
>>
>> Discussion thread:
>> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
>>
>> ## Motivation
>>
>> Currently, the Pulsar-client supports setting the `startMessageId` for
>> Consumer and Reader, and also supports reading the message of
>> `startMessageId` position.
>>
>> Assume we have four messages with ids 1, 2, 3, 4 in the topic:
>>
>> - When we set `earliest` as `startMessageId` value, we can get the message
>> of message id 1
>> - When we set `latest` as `startMessageId` value, we can't get any message
>>
>> Sometimes we want to read message id 4 on the first read, and the client
>> currently offers only one approach:
>>
>> ```
>> Reader<byte[]> reader = pulsarClient.newReader()
>>         .topic(topicName)
>>         .subscriptionName(subscriptionName)
>>         .startMessageId(MessageId.latest)
>>         .startMessageIdInclusive()
>>         .create();
>>
>> // Triggers the client-side seek to the latest message before reading.
>> reader.hasMessageAvailable();
>> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
>> ```
>>
>> Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
>> the correct message id 4, because the call includes a seek action when
>> `startMessageIdInclusive()` is enabled.
>>
>> This approach is confusing. If we do this on the broker side, it will
>> make things easier.
>>
>> ## Goal
>>
>> This PIP proposes support for reading the message of `startMessageId`
>> position on the broker side:
>>
>> - Add to `Consumer`
>> - Add to `Reader`
>>
>> ## Implementation
>>
>> ### Protocol
>>
>> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
>> determine whether to read the message at the `startMessageId` position:
>>
>> ```
>> message CommandSubscribe {
>>     // some fields
>>
>>     // If specified, the subscription will read the message from the start
>>     // message id position.
>>     optional bool start_message_id_inclusive = 20 [default = false];
>> }
>> ```
>>
>> ### ManagedCursorImpl
>>
>> Add a check in
>> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>>
>> We only need to handle the case where `startMessageId` is
>> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we take the
>> latest position from the ledger as the `readPosition` value; otherwise, if
>> `start_message_id_inclusive` is `false`, we take the position after the
>> latest position as the `readPosition` value.
>>
>> ### Client
>>
>> The `Consumer` and `Reader` support setting the
>> `start_message_id_inclusive` value in the `CommandSubscribe` command.
>>
>> ### Compatibility
>>
>> This feature is both backward and forward compatible, which means users
>> can use any client to request any broker.
>>
>> Notice that users can still read the message at the latest position by
>> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
>> call can be omitted when using the new client and the new broker.
>>
>
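
For reference, the `readPosition` rule quoted in the ManagedCursorImpl section can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the actual `ManagedCursorImpl` code: `ReadPositionSketch` and `initialReadEntry` are hypothetical names, and positions are reduced to plain entry ids, whereas the real cursor works with ledger positions.

```java
// Hypothetical sketch: positions are modeled as plain long entry ids,
// not the position objects used inside the managed ledger.
final class ReadPositionSketch {
    private ReadPositionSketch() {
    }

    // lastEntryId: id of the last message currently stored in the ledger.
    // startMessageIdInclusive: value of start_message_id_inclusive when
    // startMessageId is MessageId.latest.
    static long initialReadEntry(long lastEntryId, boolean startMessageIdInclusive) {
        // Inclusive: start reading at the latest message itself.
        // Exclusive: start at the position right after it.
        return startMessageIdInclusive ? lastEntryId : lastEntryId + 1;
    }
}
```

With the message ids 1 to 4 from the PIP example, `initialReadEntry(4, true)` yields 4, so the first read returns message 4, while `initialReadEntry(4, false)` yields 5, so the reader waits for a new message.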
