On Fri, 25 Mar 2022 at 17:36, Zixuan Liu <node...@gmail.com> wrote:

> Hi Pulsar community,
>
> I created a PIP to support reading the message at the `startMessageId`
> position on the broker side.
>
> The proposal can be found at: https://github.com/apache/pulsar/issues/14883
>
> ---------
> ## Motivation
>
> Currently, the Pulsar client supports setting the `startMessageId` for
> Consumer and Reader, and also supports reading the message at the
> `startMessageId` position.
>
> Assume we have two messages with ids 1 and 2:
>
> - When we set `earliest` as the `startMessageId` value, we can get the
>   message with id 1
> - When we set `latest` as the `startMessageId` value, we cannot get any
>   message
>
> Sometimes we want to read the message id 2 for the first time,

Do you mean that you want to start reading from the latest message?
I suggest adding more messages to the example to make it clearer
(like 1, 2, 3, 4, 5).

> we have only one approach in the client:
>
> ```
> Reader<byte[]> reader = pulsarClient.newReader()
>         .topic(topicName)
>         .subscriptionName(subscriptionName)
>         .startMessageId(MessageId.latest)
>         .startMessageIdInclusive()
>         .create();
>
> reader.hasMessageAvailable();
> Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
> ```
>
> Calling `reader.hasMessageAvailable()` before `reader.readNext()` seeks
> to message id 2; this approach seems confusing.

Agreed.

> If we do this on the broker side, it will make things easier.
>
> ## Goal
>
> This PIP proposes supporting reading the message at the `startMessageId`
> position on the broker side.
>
> ## Implementation
>
> ### Protocol
>
> Add a `start_message_id_inclusive` field to `CommandSubscribe` to
> determine whether to read the message at the `startMessageId` position:
>
> ```
> message CommandSubscribe {
>     // some fields
>
>     // If specified, the subscription will read the message from the
>     // start message id position.
>     optional bool start_message_id_inclusive = 20 [default = false];
> }
> ```
>
> ### ManagedCursorImpl
>
> Add a check in
> `org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.
>
> We only need to handle the case where `startMessageId` is
> `MessageId.latest`: if `start_message_id_inclusive` is `true`, we take
> the latest position from the ledger as the `readPosition` value;
> otherwise, if `start_message_id_inclusive` is `false`, we take the next
> position after the latest position as the `readPosition` value.

I am not sure whether this will change the semantics of existing programs.
Should we add a new configuration option? Otherwise, programs written the
way you describe won't work anymore.

Enrico

> ---------
>
> Thanks,
> Zixuan
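
To make the read-position rule from the ManagedCursorImpl section concrete, here is a minimal sketch. It is a toy model, not Pulsar's actual `ManagedCursorImpl` code: positions are plain `long` offsets, and the class name `ReadPositionSketch` and the method `initialReadPosition` are hypothetical names used only for illustration. It assumes the topic already holds at least one message.

```java
public class ReadPositionSketch {

    /**
     * Hypothetical helper: picks the initial read position when the
     * start message id is "latest". Positions are modelled as plain
     * long offsets rather than Pulsar's real position type.
     */
    static long initialReadPosition(long latestPosition, boolean startMessageIdInclusive) {
        // Inclusive: start at the latest existing message.
        // Exclusive: start just after it, so only new messages are read.
        return startMessageIdInclusive ? latestPosition : latestPosition + 1;
    }

    public static void main(String[] args) {
        long latest = 5; // messages 1..5 already exist on the topic

        System.out.println(initialReadPosition(latest, true));  // prints 5
        System.out.println(initialReadPosition(latest, false)); // prints 6
    }
}
```

With messages 1 to 5 on the topic, the inclusive case would deliver message 5 first, while the exclusive case delivers nothing until message 6 is published.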