+1, good idea! Thanks, Zixuan
Qiang Huang <qiang.huang1...@gmail.com> wrote on Sun, Jul 24, 2022 at 22:25:

> Hi Pulsar community,
>
> I have opened a PIP to discuss "Pulsar client: seek command add epoch".
>
> Proposal link:
> - Issue link: https://github.com/apache/pulsar/issues/16757
>
> --
> ## Motivation
> A `Reader` uses an exclusive subscription and a `nonDurable` cursor.
> After receiving messages, the `Reader` immediately acknowledges them
> cumulatively.
> On the client side, `flowPermits` is triggered in multiple scenarios,
> independently of the `Consumer`'s `seek`. It is therefore possible for
> `flowPermits` to be executed after `seek` on the client side, as in the
> following flow chart.
>
> [image: image.png]
>
> When `handleSeek` processing is delayed on the server side, the
> `MarkDelete position` is modified incorrectly.
> The expected result is that the `Reader` can re-consume messages from
> `mark delete:(1,1)` after `seek`, but this does not happen.
>
> In Pulsar, reading messages and seeking to a position are not
> synchronized: a seek request cannot stop an entry read that is already
> in progress, so the client may still receive messages that lie after the
> seek position.
>
> To synchronize the read and seek operations, this proposal adds an epoch
> to both the server-side and the client-side consumer. When the client
> reader or consumer invokes `seek`, it increments its epoch by 1 and sends
> a `seek` command carrying that epoch; the server-side consumer then
> updates its own epoch. Messages dispatched to the client carry the epoch
> that was current when the cursor read them. The client consumer filters
> out every message command whose epoch is smaller than its current epoch.
> In this way, once the client consumer has sent the `seek` command
> successfully, epoch filtering ensures that it no longer receives a stale
> message with a message ID greater than the position the user sought to.
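A minimal sketch of the client-side epoch filtering described in the motivation, written in Python as a toy model rather than actual Pulsar client code. The class `ClientConsumer` and its methods `seek` and `on_message` are hypothetical names chosen for illustration. Only the rule itself (increment the epoch on seek, drop messages that carry a smaller epoch) comes from the proposal.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

MessageId = Tuple[int, int]  # (ledger_id, entry_id), assumed shape for the sketch


@dataclass
class ClientConsumer:
    """Toy model of a client-side consumer; not the Pulsar client API."""

    epoch: int = 0
    received: List[MessageId] = field(default_factory=list)

    def seek(self) -> int:
        # Bump the local epoch first; the returned value is what the
        # seek command would carry to the broker.
        self.epoch += 1
        return self.epoch

    def on_message(self, message_id: MessageId, message_epoch: int) -> bool:
        # A smaller epoch means the broker read this entry before it
        # processed the latest seek, so the message is stale.
        if message_epoch < self.epoch:
            return False
        self.received.append(message_id)
        return True


consumer = ClientConsumer()
consumer.on_message((1, 5), 0)           # accepted: epochs match
seek_epoch = consumer.seek()             # local epoch becomes 1
consumer.on_message((1, 6), 0)           # dropped: read before the seek
consumer.on_message((1, 2), seek_epoch)  # accepted: read after the seek
```

In this model a message that was already in flight when `seek` was called is discarded on arrival, which is the behaviour the proposal wants from the real client.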
>
> ### Current implementation details
> #### CommandSeek Protocol
> ```proto
> // Reset an existing consumer to a particular message id
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id  = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
> }
> ```
> #### CommandMessage
> ```proto
> message CommandMessage {
>     required uint64 consumer_id = 1;
>     required MessageIdData message_id = 2;
>     optional uint32 redelivery_count = 3 [default = 0];
>     repeated int64 ack_set = 4;
>     optional uint64 epoch = 5 [default = 0];
> }
> ```
> `CommandMessage` already carries an epoch, added by
> [PIP-84](https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch).
> When the client receives a `CommandMessage`, it compares the command
> epoch with its local epoch to decide how to handle the command.
>
> ## Goal
> Add an epoch to the seek command.
>
> ## API Changes
> ### Protocol change: CommandSeek
> ```proto
> // Reset an existing consumer to a particular message id
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id  = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
>     optional uint64 consumer_epoch = 5;
> }
> ```
> The `CommandSeek` command gains an epoch field. When the client sends a
> seek command to the server successfully, the server sets the server-side
> consumer epoch to the epoch in the command. The new epoch must be greater
> than the epoch the server already holds. The client can then filter out
> messages that carry a smaller consumer epoch.
>
> ## Implementation
> - Stage 1: Check the current cursor status when handling flowPermits on
>   the server side.
> - Stage 2: Add the epoch to the seek command and have the server update
>   the consumer epoch. This guards against an entry read that is still in
>   progress when the seek request arrives.
>
> ## Rejected Alternatives
> None yet.
>
> ## Note
> 1. The epoch must be reset when the consumer reconnects.
>
> --
> BR,
> Qiang Huang
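To make the server-side half of the protocol change concrete, here is a minimal Python sketch of how a broker-side consumer might apply `consumer_epoch` from `CommandSeek` and tag dispatched entries. It is a toy model, not Pulsar broker code: `ServerConsumer`, `handle_seek` and `read_entries` are hypothetical names, and leaving the epoch untouched when the optional field is absent is an assumption of this sketch, not something the proposal states.

```python
from typing import List, Optional, Tuple

MessageId = Tuple[int, int]  # (ledger_id, entry_id), assumed shape for the sketch


class ServerConsumer:
    """Toy model of the broker-side consumer state; not Pulsar broker code."""

    def __init__(self) -> None:
        self.epoch = 0

    def handle_seek(self, consumer_epoch: Optional[int] = None) -> int:
        # consumer_epoch is optional in the proposed CommandSeek.
        # Assumption: a command without it leaves the epoch unchanged.
        # The epoch may only move forward, never backward.
        if consumer_epoch is not None and consumer_epoch > self.epoch:
            self.epoch = consumer_epoch
        return self.epoch

    def read_entries(self, ids: List[MessageId]) -> List[Tuple[MessageId, int]]:
        # Tag each entry with the epoch current at read time; this is the
        # value a CommandMessage would carry to the client.
        read_epoch = self.epoch
        return [(message_id, read_epoch) for message_id in ids]


server = ServerConsumer()
in_flight = server.read_entries([(1, 6)])  # read before the seek: epoch 0
server.handle_seek(1)                      # seek carries epoch 1
fresh = server.read_entries([(1, 2)])      # read after the seek: epoch 1
```

Entries in `in_flight` still carry epoch 0, so a client whose local epoch is already 1 can recognise and discard them, while entries in `fresh` pass the filter.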