Hello, Pulsar community:

Currently, the consumer does not filter out messages that it has
already received. After a consumer reconnects, the broker dispatches
messages to it starting from the markDeletePosition. With the Failover
and Exclusive subscription types, all messages in a topic are
dispatched to the same consumer. Consider the following example:

```
    @Test
    public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
        final String topic = "testConsumerReconnectionRepeatedConsumeMessage";

        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create();

        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionType(SubscriptionType.Exclusive)
                .topic(topic)
                .subscriptionName("test")
                .subscribe();

        // Send 5 messages.
        for (int i = 0; i < 5; i++) {
            producer.send("test" + i);
        }

        // The consumer receives all 5 messages but does not ack them yet.
        for (int i = 0; i < 5; i++) {
            consumer.receive();
        }

        // Unload the topic to force the consumer to reconnect.
        admin.topics().unload(topic);

        // After reconnecting, the consumer receives the same 5 messages again.
        Message<String> message = null;
        for (int i = 0; i < 5; i++) {
            message = consumer.receive();
        }
        consumer.acknowledgeCumulativeAsync(message.getMessageId());
    }
```

In the example above, the consumer receives the 5 messages sent by the
producer twice and then acknowledges them with a single cumulative ack.
If the consumer sends a cumulative ack only once every 1,000 or 10,000
messages, a reconnection can cause a large amount of repeated
consumption. Although this does not violate at-least-once semantics, it
causes a lot of unnecessary overhead.


Most importantly, it breaks the exactly-once semantics of Pulsar transactions.


I would like to discuss whether we should fix normal and transactional
cumulative acks in the same way: prevent repeated consumption caused by
consumer reconnection by filtering out messages that the user has
already received through `consumer.receive()`. Or should we guarantee
only exactly-once semantics, that is, guarantee only that a consumer
using a cumulative ack with a transaction will not receive the same
messages again?
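
To make the filtering option concrete, here is a minimal sketch of what
such a filter could look like. It is not existing Pulsar client code:
`Position` and `ReceivedPositionFilter` are hypothetical names, and
`Position` is only a stand-in for a (ledgerId, entryId) pair rather
than the real `MessageId` API. It assumes a single ordered stream, as
in an Exclusive or Failover subscription on a non-partitioned topic,
and it ignores batch indexes.

```java
/** Hypothetical stand-in for a message position; not the Pulsar MessageId API. */
final class Position implements Comparable<Position> {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Position other) {
        // Order by ledger first, then by entry within the ledger.
        int byLedger = Long.compare(ledgerId, other.ledgerId);
        return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
    }
}

/** Hypothetical filter that drops messages at or before the last delivered position. */
final class ReceivedPositionFilter {
    // Last position handed to the application; null until the first delivery.
    private Position lastDelivered;

    /** Returns true if the message is new and should be handed to the application. */
    boolean accept(Position incoming) {
        if (lastDelivered != null && incoming.compareTo(lastDelivered) <= 0) {
            return false; // dispatched again after a reconnect, already received
        }
        lastDelivered = incoming;
        return true;
    }
}

final class FilterDemo {
    public static void main(String[] args) {
        ReceivedPositionFilter filter = new ReceivedPositionFilter();
        int delivered = 0;

        // First dispatch: entries 0..4 of ledger 1 reach the application.
        for (long entry = 0; entry < 5; entry++) {
            if (filter.accept(new Position(1, entry))) {
                delivered++;
            }
        }

        // After a reconnect the broker dispatches entries 0..4 again, then a new entry 5.
        for (long entry = 0; entry < 6; entry++) {
            if (filter.accept(new Position(1, entry))) {
                delivered++;
            }
        }

        System.out.println("delivered=" + delivered); // prints "delivered=6"
    }
}
```

One caveat with this approach: a filter like this would also suppress
intentional redelivery (for example after a negative ack or an ack
timeout), so a real implementation would need to distinguish those
cases from redelivery caused by reconnection.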


Please leave your opinion, thanks! :)



Thanks,

Bo
