This PIP seems want to enable the feature of "startMessageIdInclusive" on 
Subscribe command, right? 
I notice that you only mentions Reader in this PIP. What about normal consumer 
? 
It also has the config of  `startMessageIdInclusive` in ConsumerBuilder. 


Thanks,
Haiting

On 2022/03/25 16:35:50 Zixuan Liu wrote:
> Hi Pulsar community,
> 
> I create a PIP for support read the message of `startMessageId` position on
> the broker side.
> 
> The proposal can be found: https://github.com/apache/pulsar/issues/14883
> 
> ---------
> ## Motivation
> 
> Currently, the Pulsar-client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message of
> `startMessageId` position.
> 
> Assume, we have two message id 1 and 2:
> 
> - When we set `earliest` as `strartMessageId` value, we can the message of
> message id 1
> - When we set `latest` as as `strartMessageId` value, we cannot get any
> message
> 
> Sometimes we want to read the message id 2 for the first time, we have only
> one approach in client:
> 
> ```
>  Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
> 
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
> 
> Call `reader.hasMessageAvailable()` before `reader.readNext()`, which can
> seek to the message id 2, this approach seems confusing.  If we do this on
> the broker side, which will make things easier.
> 
> ## Goal
> 
> This PIP proposes support read the message of `startMessageId` position on
> the broker side.
> 
> ## Implementation
> 
> ### Protocol
> 
> Add a `start_message_id_inclusive` field to `CommandSubscribe` for
> determine whether to read the message of `startMessageId` position:
> 
> ```
> message CommandSubscribe {
>     // some fields
> 
>     // If specified, the subscription will read the message from the start
> message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
> 
> ### ManagedCursorImpl
> 
> Add a check in
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
> 
> 
> We only need to care that the `startMessageId` is `MessageId.latest` and
> the`start_message_id_inclusive` is `true`, we get latest position from
> ledger as `readPosition` value, otherwise if
> the`start_message_id_inclusive` is `false`,  get next position of the
> latest position as `readPosition` value.
> 
> ---------
> 
> Thanks,
> Zixuan
> 

Reply via email to