Thanks for your vote! Closed by 3 (+1) binding vote and 0 (-1) vote.

Thanks,
Zixuan

Zixuan Liu <node...@gmail.com> 于2022年4月6日周三 16:54写道:

> Hi Pulsar community,
>
> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
>
> Thanks,
> Zixuan
>
> -----
>
> Discussion thread:
> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
>
> ## Motivation
>
> Currently, the Pulsar-client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message of
> `startMessageId` position.
>
> Assume, we have two message id 1,2,3,4 in the topic:
>
> - When we set `earliest` as `startMessageId` value, we can get the message
> of message id 1
> - When we set `latest` as `startMessageId` value, we can't get any message
>
>  Sometimes we want to read the message id 4 for the first time, we have
> only one approach in client:
>
> ```
>  Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
>
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
>
> Call `reader.hasMessageAvailable()` before `reader.readNext()` can get the
> correct message id 4, which include seek action when the
> `startMessageIdInclusive()` is enabled.
>
> This approach is confusing.   If we do this on the broker side, it will
> make things easier.
>
> ## Goal
>
> This PIP proposes support for reading the message of `startMessageId`
> position on the broker side:
>
> - Add to `Consumer`
> - Add to `Reader`
>
> ## Implementation
>
> ### Protocol
>
> Add a `start_message_id_inclusive` field to `CommandSubscribe` for
> determine whether to read the message of `startMessageId` position:
>
> ```
> message CommandSubscribe {
>     // some fields
>
>     // If specified, the subscription will read the message from the start
> message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
>
> ### ManagedCursorImpl
>
> Add a check in
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>
>
> We only need to care that the `startMessageId` is `MessageId.latest` and
> the`start_message_id_inclusive` is `true`, we get latest position from
> ledger as `readPosition` value, otherwise if
> the`start_message_id_inclusive` is `false`,  get next position of the
> latest position as `readPosition` value.
>
> ### Client
>
> The `Consumer` and `Reader` support setting the
> `start_message_id_inclusive` value to `CommandSubscribe` command.
>
> ### Compatibility
>
> This feature can have both backward and forward compatibility, this means
> the users can use any client to request any broker.
>
> Notice that the users still can read the message of the latest position by
> call `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be ignored when using the new client and the new broker.
>
>
>
>

Reply via email to