+1 

Thanks,
Haiting

On 2022/04/06 08:54:05 Zixuan Liu wrote:
> Hi Pulsar community,
> 
> Start voting for PIP-150: https://github.com/apache/pulsar/issues/14883
> 
> Thanks,
> Zixuan
> 
> -----
> 
> Discussion thread:
> https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
> 
> ## Motivation
> 
> Currently, the Pulsar client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message at the
> `startMessageId` position.
> 
> Assume we have four messages with ids 1, 2, 3, 4 in the topic:
> 
> - When we set `earliest` as `startMessageId` value, we can get the message
> of message id 1
> - When we set `latest` as `startMessageId` value, we can't get any message
> 
> Sometimes we want the first read to return message id 4. Currently the
> client offers only one approach:
> 
> ```
>  Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
> 
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
> 
> Calling `reader.hasMessageAvailable()` before `reader.readNext()` returns
> the correct message (id 4), because `hasMessageAvailable()` includes a seek
> action when `startMessageIdInclusive()` is enabled.
> 
> This approach is confusing. Handling it on the broker side would make
> things easier.
> 
> ## Goal
> 
> This PIP proposes support for reading the message at the `startMessageId`
> position on the broker side:
> 
> - Add to `Consumer`
> - Add to `Reader`
> 
> ## Implementation
> 
> ### Protocol
> 
> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> determine whether to read the message at the `startMessageId` position:
> 
> ```
> message CommandSubscribe {
>     // some fields
> 
>     // If specified, the subscription will read the message from the start
>     // message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
> 
> ### ManagedCursorImpl
> 
> Add a check in
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
> 
> 
> We only need to handle the case where `startMessageId` is
> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we use the
> latest position in the ledger as the `readPosition` value; otherwise, if
> `start_message_id_inclusive` is `false`, we use the position after the
> latest position as the `readPosition` value.
> 
> ### Client
> 
> The `Consumer` and `Reader` support setting the
> `start_message_id_inclusive` value in the `CommandSubscribe` command.
> 
> ### Compatibility
> 
> This feature is both backward and forward compatible, which means users
> can use any client with any broker.
> 
> Note that users can still read the message at the latest position by
> calling `reader.hasMessageAvailable()` before `reader.readNext()`, but this
> call can be omitted when using the new client and the new broker.
> 
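
For reference, the read-position rule described in the ManagedCursorImpl section of the proposal could be sketched roughly as below. This is only an illustration under stated assumptions, not the actual broker code: `ReadPositionSketch`, `Position`, and `initialReadPosition` are hypothetical names, and the "next position" is simplified to `entryId + 1` within the same ledger, ignoring ledger rollover.

```java
import java.util.Objects;

public class ReadPositionSketch {

    /** Hypothetical stand-in for a ledger position (ledger id + entry id). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Simplification: assumes the next entry lives in the same ledger. */
        Position next() {
            return new Position(ledgerId, entryId + 1);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Position)) {
                return false;
            }
            Position other = (Position) o;
            return ledgerId == other.ledgerId && entryId == other.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /**
     * Chooses the initial read position when startMessageId is "latest".
     * Inclusive: start at the latest position itself.
     * Exclusive: start at the position after the latest one.
     */
    static Position initialReadPosition(Position latest, boolean startMessageIdInclusive) {
        return startMessageIdInclusive ? latest : latest.next();
    }

    public static void main(String[] args) {
        Position latest = new Position(7, 3); // arbitrary example values
        System.out.println(initialReadPosition(latest, true));  // prints 7:3
        System.out.println(initialReadPosition(latest, false)); // prints 7:4
    }
}
```

With the flag set to `true`, the first read would return the message at the latest position (message id 4 in the example from the Motivation section); with `false`, the reader waits for the next message to be published.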
