Hi Pulsar community,

I create a PIP for support read the message of `startMessageId` position on
the broker side.

The proposal can be found: https://github.com/apache/pulsar/issues/14883

---------
## Motivation

Currently, the Pulsar-client supports setting the `startMessageId` for
Consumer and Reader, and also supports reading the message of
`startMessageId` position.

Assume, we have two message id 1 and 2:

- When we set `earliest` as `strartMessageId` value, we can the message of
message id 1
- When we set `latest` as as `strartMessageId` value, we cannot get any
message

Sometimes we want to read the message id 2 for the first time, we have only
one approach in client:

```
 Reader<byte[]> reader = pulsarClient.newReader()
                .topic(topicName)
                .subscriptionName(subscriptionName)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create();

reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
```

Call `reader.hasMessageAvailable()` before `reader.readNext()`, which can
seek to the message id 2, this approach seems confusing.  If we do this on
the broker side, which will make things easier.

## Goal

This PIP proposes support read the message of `startMessageId` position on
the broker side.

## Implementation

### Protocol

Add a `start_message_id_inclusive` field to `CommandSubscribe` for
determine whether to read the message of `startMessageId` position:

```
message CommandSubscribe {
    // some fields

    // If specified, the subscription will read the message from the start
message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
```

### ManagedCursorImpl

Add a check in
`org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition`.


We only need to care that the `startMessageId` is `MessageId.latest` and
the`start_message_id_inclusive` is `true`, we get latest position from
ledger as `readPosition` value, otherwise if
the`start_message_id_inclusive` is `false`,  get next position of the
latest position as `readPosition` value.

---------

Thanks,
Zixuan

Reply via email to