Hello, Pulsar community:
Currently the consumer does not filter out messages that it has already consumed. After a consumer reconnects, the broker dispatches messages to it starting from the markDeletePosition. With the Failover and Exclusive subscription types, all messages in a topic are dispatched to the same consumer. Let's look at the following example:

```
@Test
public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
    final String topic = "testConsumerReconnectionRepeatedConsumeMessage";

    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();

    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .subscriptionType(SubscriptionType.Exclusive)
            .topic(topic).subscriptionName("test").subscribe();

    // send 5 messages
    for (int i = 0; i < 5; i++) {
        producer.send("test" + i);
    }

    // the consumer receives the 5 messages but does not ack them
    for (int i = 0; i < 5; i++) {
        consumer.receive();
    }

    // unloading the topic forces the consumer to reconnect
    admin.topics().unload(topic);

    // after reconnecting, the consumer can receive the same 5 messages again
    Message<String> message = null;
    for (int i = 0; i < 5; i++) {
        message = consumer.receive();
    }
    consumer.acknowledgeCumulativeAsync(message.getMessageId());
}
```

In the example above, the consumer receives the 5 messages sent by the producer twice and then acknowledges them with a single cumulative ack. If an application sends a cumulative ack only once every 1,000 or 10,000 messages, a consumer reconnection can cause a lot of repeated consumption. Although this does not violate at-least-once semantics, it causes a lot of useless overhead. Most importantly, it breaks the exactly-once semantics of Pulsar transactions.

I want to discuss whether we should fix normal and transactional cumulative acks in the same way: prevent repeated consumption caused by consumer reconnection by filtering out messages that the user has already received through `consumer.receive()`.
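
To make the first option more concrete, here is a minimal sketch of what such filtering could look like on the client side. It is only an illustration, not a proposed patch: the class `ReceivedPositionFilter` and the method `shouldDeliver` are hypothetical names that do not exist in the Pulsar client, and the sketch assumes messages arrive in order at a single consumer (Exclusive or Failover) and are identified by a (ledgerId, entryId) pair. It ignores batch indexes and explicit redelivery requests, which a real fix would have to handle.

```java
// Hypothetical helper, not part of the Pulsar client API.
// Remembers the highest position already handed to the application and
// drops anything at or before it, e.g. messages replayed after a reconnect.
final class ReceivedPositionFilter {
    private boolean hasReceived = false;
    private long lastLedgerId;
    private long lastEntryId;

    /** Returns true if the message at (ledgerId, entryId) is new and should be delivered. */
    synchronized boolean shouldDeliver(long ledgerId, long entryId) {
        if (hasReceived) {
            boolean alreadySeen = ledgerId < lastLedgerId
                    || (ledgerId == lastLedgerId && entryId <= lastEntryId);
            if (alreadySeen) {
                return false; // replayed message: the user already received it
            }
        }
        hasReceived = true;
        lastLedgerId = ledgerId;
        lastEntryId = entryId;
        return true;
    }
}
```

In the test above, the second loop of `consumer.receive()` calls would then block instead of returning the 5 replayed messages, because every replayed position is at or before the last one already received.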

Alternatively, should we only guarantee exactly-once semantics, that is, only guarantee that a consumer using a transaction will not receive the same messages again when it acks them cumulatively with that transaction?

Please leave your opinion, thanks! :)

Thanks,
Bo