Il Ven 25 Mar 2022, 17:36 Zixuan Liu <node...@gmail.com> ha scritto:

> Hi Pulsar community,
>
> I create a PIP for support read the message of `startMessageId` position on
> the broker side.
>
> The proposal can be found: https://github.com/apache/pulsar/issues/14883
>
> ---------
> ## Motivation
>
> Currently, the Pulsar-client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message of
> `startMessageId` position.
>
> Assume, we have two message id 1 and 2:
>
> - When we set `earliest` as `strartMessageId` value, we can the message of
> message id 1
> - When we set `latest` as as `strartMessageId` value, we cannot get any
> message
>
> Sometimes we want to read the message id 2 for the first time,

do you mean that you want to start reading from the latest message?

I suggest to add more messages to the example, in order to make it clearer
(like 1,2,3,4,5)



we have only
> one approach in client:
>
> ```
>  Reader<byte[]> reader = pulsarClient.newReader()
>                 .topic(topicName)
>                 .subscriptionName(subscriptionName)
>                 .startMessageId(MessageId.latest)
>                 .startMessageIdInclusive()
>                 .create();
>
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
>
> Call `reader.hasMessageAvailable()` before `reader.readNext()`, which can
> seek to the message id 2, this approach seems confusing.


Agreed


If we do this on
> the broker side, which will make things easier.
>
> ## Goal
>
> This PIP proposes support read the message of `startMessageId` position on
> the broker side.
>
> ## Implementation
>
> ### Protocol
>
> Add a `start_message_id_inclusive` field to `CommandSubscribe` for
> determine whether to read the message of `startMessageId` position:
>
> ```
> message CommandSubscribe {
>     // some fields
>
>     // If specified, the subscription will read the message from the start
> message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
>
> ### ManagedCursorImpl
>
> Add a check in
>
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>
>
> We only need to care that the `startMessageId` is `MessageId.latest` and
> the`start_message_id_inclusive` is `true`, we get latest position from
> ledger as `readPosition` value, otherwise if
> the`start_message_id_inclusive` is `false`,  get next position of the
> latest position as `readPosition` value.
>

I am not sure if this will change the semantics of existing programs.

Shall we have to add a new configuration option?

Otherwise programs written the way you are describing won't work anymore

Enrico




> ---------
>
> Thanks,
> Zixuan
>

Reply via email to