Hi Bo,

I totally agree with this approach.

If we implement deduplication for common (non-transactional) messages on
the client side, there is no need to add extra logic that might introduce
breaking API changes in order to guarantee exactly-once transaction
semantics. A rough sketch of the kind of client-side filtering I have in
mind is below.
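
To make it concrete, here is a minimal sketch (the `DedupReceiver` wrapper is
a hypothetical illustration for this thread, not the actual client change,
and it ignores batch-index subtleties): remember the last MessageId handed to
the application and drop anything at or before it when the broker
re-dispatches from the markDeletePosition after a reconnection.

```
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;

// Hypothetical wrapper, for illustration only: the real fix would live
// inside the consumer implementation rather than in user code.
public class DedupReceiver<T> {
    private final Consumer<T> consumer;
    private volatile MessageId lastDelivered = MessageId.earliest;

    public DedupReceiver(Consumer<T> consumer) {
        this.consumer = consumer;
    }

    public Message<T> receive() throws PulsarClientException {
        while (true) {
            Message<T> msg = consumer.receive();
            // MessageId is Comparable, so any message at or before the last
            // one already handed to the application can be treated as a
            // re-delivery caused by reconnection and skipped.
            if (msg.getMessageId().compareTo(lastDelivered) <= 0) {
                continue;
            }
            lastDelivered = msg.getMessageId();
            return msg;
        }
    }
}
```

With filtering like this in the client, the test in Bo's mail below would not
receive the 5 messages a second time after the unload.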
Yours,
Xiangying

On Fri, Sep 9, 2022 at 10:17 PM 丛搏 <congbobo...@gmail.com> wrote:

> Hi all,
>
> You can see the problem and the related comments in a Google doc.
>
> google doc link:
>
> https://docs.google.com/document/d/1J1xGcj8YORrdlCa_XDt28uV0TMp03gSmX_z43ZRhwZo/edit?usp=sharing
>
> Thanks!
> Bo
>
> 丛搏 <bog...@apache.org> wrote on Thursday, September 8, 2022 at 10:55:
> >
> > Hello, Pulsar community:
> >
> >
> > Currently, the consumer does not filter out messages that have already
> > been consumed. After a consumer reconnects, the broker dispatches
> > messages to it starting from the markDeletePosition. In the Failover and
> > Exclusive subscription types, all messages in a topic are dispatched to
> > the same consumer. Let's look at the following example:
> >
> > ```
> >     @Test
> >     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> >         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> >
> >         @Cleanup
> >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> >                 .topic(topic)
> >                 .sendTimeout(0, TimeUnit.SECONDS)
> >                 .enableBatching(false)
> >                 .create();
> >
> >         @Cleanup
> >         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> >                 .subscriptionType(SubscriptionType.Exclusive)
> >                 .topic(topic)
> >                 .subscriptionName("test")
> >                 .subscribe();
> >
> >         // send 5 messages
> >         for (int i = 0; i < 5; i++) {
> >             producer.send("test" + i);
> >         }
> >
> >         // the consumer receives the 5 messages
> >         for (int i = 0; i < 5; i++) {
> >             consumer.receive();
> >         }
> >
> >         admin.topics().unload(topic);
> >
> >         // after the unload, the consumer receives the same 5 messages again
> >         Message<String> message = null;
> >         for (int i = 0; i < 5; i++) {
> >             message = consumer.receive();
> >         }
> >
> >         consumer.acknowledgeCumulativeAsync(message.getMessageId());
> >     }
> > ```
> >
> > In the example above, the consumer consumes the 5 messages sent by the
> > producer twice and acknowledges them with a cumulative ack. If we
> > cumulatively ack only once per 1,000 or 10,000 messages, a consumer
> > reconnection can cause a large amount of repeated consumption. Although
> > this does not break at-least-once semantics, it causes a lot of useless
> > overhead.
> >
> > Most importantly, it breaks the exactly-once semantics of Pulsar
> > transactions.
> >
> > I want to discuss whether we should fix this for both normal and
> > transactional cumulative acks in the same way: prevent repeated
> > consumption caused by consumer reconnection by filtering out messages
> > that the user has already received through `consumer.receive()`. Or
> > should we only guarantee exactly-once semantics, i.e. only guarantee
> > that a consumer using cumulative ack within a transaction will not
> > receive the same messages again?
> >
> >
> > Please leave your opinion, thanks! :)
> >
> >
> >
> > Thanks,
> >
> > Bo
>
