Hi Haiting, good point! We need to pay attention to all reset-cursor operations. I think this should be a configurable optimization, because some users don't call `void redeliverUnacknowledgedMessages()`. This optimization is a breaking change. I will find all the operations that reset the cursor, propose a solution, and update this mail thread later.
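To make the discussion concrete, here is a rough sketch of the client-side filter idea we have been discussing (illustrative only, not actual Pulsar client code: a plain `Long` stands in for Pulsar's `Comparable` `MessageId`, and the `MaxIdFilter` name is made up):

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of client-side deduplication by tracking the maximum
// received id. Long stands in for Pulsar's Comparable MessageId.
public class DedupSketch {
    static final class MaxIdFilter {
        private final AtomicReference<Long> maxReceived = new AtomicReference<>(null);

        // Returns true if the id is new (strictly greater than the max seen),
        // false if it is a duplicate caused by redelivery after reconnection.
        boolean accept(Long id) {
            Long prev = maxReceived.get();
            if (prev != null && id.compareTo(prev) <= 0) {
                return false; // duplicate, filter it out
            }
            maxReceived.set(id);
            return true;
        }

        // Called when the user invokes redeliverUnacknowledgedMessages():
        // clearing the state lets already-received messages be consumed again.
        void reset() {
            maxReceived.set(null);
        }
    }

    public static void main(String[] args) {
        MaxIdFilter filter = new MaxIdFilter();
        System.out.println(filter.accept(1L)); // true
        System.out.println(filter.accept(2L)); // true
        System.out.println(filter.accept(2L)); // false: duplicate after reconnect
        filter.reset();                        // user asked for redelivery
        System.out.println(filter.accept(1L)); // true again
    }
}
```

Any operation that rewinds the cursor (admin reset-cursor, seek, etc.) would have to call something like `reset()` here, which is exactly why we need the full list of reset-cursor operations.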
Thanks!
Bo

Haiting Jiang <jianghait...@gmail.com> wrote on Fri, Sep 9, 2022 at 10:33:
>
> Hi Bo,
>
> Overall it makes sense to me.
> It is basically the same as the broker-side deduplication mechanism
> used when producing messages, which relies on the `sequenceId`.
> In your case, the messageId is used for deduplication. It should work
> as long as the received messageId increases monotonically.
>
> So we should be careful about any operation that resets the cursor,
> for example, when the user resets the cursor with the admin client. We
> need more detailed information on this matter.
> And I am not sure whether there are other operations that would reset
> the cursor implicitly.
>
> Thanks,
> Haiting
>
> On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > Hi Haiting,
> >
> > When using cumulative ack, we can save the maximum received MessageId
> > on the consumer client side to filter the message duplication caused
> > by reconnection. If the consumer client process restarts, the maximum
> > received MessageId will no longer exist on the client side. This
> > requires the user to be responsible for the received messages; if the
> > user wants to re-consume the received messages, they need to call
> > `void redeliverUnacknowledgedMessages()`, which clears the maximum
> > received MessageId from the consumer client.
> >
> > Thanks,
> > Bo
> >
> > Haiting Jiang <jianghait...@gmail.com> wrote on Thu, Sep 8, 2022 at 14:51:
> > >
> > > From the user's perspective, I think we should always avoid delivering
> > > repeated messages.
> > > But can the broker tell whether the reconnection is caused by topic
> > > unloading or by the consumer client process restarting?
> > > For the latter case, the messages should be redelivered; that is the
> > > whole point of explicit user acking.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > > >
> > > > Hello, Pulsar community:
> > > >
> > > > Currently the consumer does not filter messages that have already
> > > > been consumed. After a consumer reconnects, the broker dispatches
> > > > messages to the consumer starting from the markDeletePosition. In
> > > > the Failover and Exclusive subscription types, all messages in a
> > > > topic are dispatched to the same consumer. Consider the following
> > > > example:
> > > >
> > > > ```java
> > > > @Test
> > > > public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > > >     final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > > >
> > > >     @Cleanup
> > > >     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > > >             .topic(topic)
> > > >             .sendTimeout(0, TimeUnit.SECONDS)
> > > >             .enableBatching(false)
> > > >             .create();
> > > >
> > > >     @Cleanup
> > > >     Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > > >             .subscriptionType(SubscriptionType.Exclusive)
> > > >             .topic(topic)
> > > >             .subscriptionName("test")
> > > >             .subscribe();
> > > >
> > > >     // send 5 messages
> > > >     for (int i = 0; i < 5; i++) {
> > > >         producer.send("test" + i);
> > > >     }
> > > >
> > > >     // the consumer receives the 5 messages
> > > >     for (int i = 0; i < 5; i++) {
> > > >         consumer.receive();
> > > >     }
> > > >
> > > >     admin.topics().unload(topic);
> > > >
> > > >     // after the unload, the consumer receives the same 5 messages again
> > > >     Message<String> message = null;
> > > >     for (int i = 0; i < 5; i++) {
> > > >         message = consumer.receive();
> > > >     }
> > > >     consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > > > }
> > > > ```
> > > >
> > > > In the example above, the consumer consumes the 5 messages sent by
> > > > the producer twice, and acks them with a cumulative ack. If the
> > > > consumer cumulatively acks only once per 1,000 or 10,000 messages,
> > > > consumer reconnection can cause a large amount of repeated
> > > > consumption. Although this does not break at-least-once semantics,
> > > > it causes a lot of useless overhead.
> > > >
> > > > Most importantly, it breaks the exactly-once semantics of Pulsar
> > > > transactions.
> > > >
> > > > I want to discuss whether we should fix normal and transactional
> > > > cumulative acks in the same way: prevent repeated consumption of
> > > > messages caused by consumer reconnection by filtering out messages
> > > > that the user has already received through `consumer.receive()`.
> > > > Or should we only guarantee exactly-once semantics, i.e., only
> > > > guarantee that a cumulative ack within a transaction will not
> > > > receive the same messages?
> > > >
> > > > Please leave your opinion, thanks! :)
> > > >
> > > > Thanks,
> > > >
> > > > Bo
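As a back-of-the-envelope illustration of the overhead described above (a hypothetical sketch, not Pulsar code): the broker redispatches from the markDeletePosition, which only advances on a cumulative ack, so acking once per N messages means up to N - 1 messages can be redelivered after a reconnect.

```java
// Sketch (not Pulsar code) of how infrequent cumulative acks amplify
// redelivery after a reconnect.
public class RedeliverySketch {
    // Messages delivered again after a reconnect: everything between the
    // cursor (markDeletePosition) and the last message actually received.
    static int redeliveredOnReconnect(int received, int markDeletePosition) {
        return received - markDeletePosition;
    }

    public static void main(String[] args) {
        int ackEvery = 1000;  // cumulative ack once per 1000 messages
        int received = 1999;  // messages received before the reconnect
        // the cursor only advanced at the last cumulative ack
        int cursor = (received / ackEvery) * ackEvery;
        System.out.println(redeliveredOnReconnect(received, cursor)); // prints 999
    }
}
```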