Hi Bo,

> In this case, the consumer cannot receive any messages again in the
> end. We have to fix it because it causes message loss.
> I think we need to redefine the use of cumulative acks, and not just
> solve the problem of receiving messages repeatedly.
>

There seems to be nothing wrong in this case. You already acked the last
message, even though you reset the cursor before that.
Currently, we don't have any restriction on the message ID a user acks.
It doesn't have to come from a valid message, and it can even be
constructed directly by the user.
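
For illustration, a minimal sketch (the ledger/entry ids below are made up,
and `consumer` stands for the consumer in your example):

```
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;

// A MessageId built by hand, not obtained from any received message.
MessageId handCrafted = new MessageIdImpl(3L, 1L, -1); // ledgerId, entryId, partitionIndex

// The client accepts it for a cumulative ack without checking that it
// corresponds to a message this consumer actually received.
consumer.acknowledgeCumulative(handCrafted);
```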

Thanks,
Haiting

On Wed, Sep 14, 2022 at 8:34 PM 丛搏 <congbobo...@gmail.com> wrote:
>
> Hi Haiting, Michael,
>
> I tested an example, and there may be some problems when using the
> subscription reset-cursor and cumulative ack at the same time, e.g.:
>
> ```
>
> Message<byte[]> message = null;
> for (int i = 0; i < 3; i++) {
>     message = consumer.receive();
> }
> admin.topics().resetCursor(topic, subName, MessageId.earliest);
>
> consumer.acknowledgeCumulative(message.getMessageId());
>
> admin.topics().unload(topic);
> consumer.receive();
>
> ```
> In this case, the consumer cannot receive any messages again in the
> end. We have to fix it because it causes message loss.
> I think we need to redefine the use of cumulative acks, and not just
> solve the problem of receiving messages repeatedly.
>
> Thanks!
> Bo
>
> On Fri, Sep 9, 2022 at 12:05 PM Michael Marshall <mmarsh...@apache.org> wrote:
> >
> > I agree that reducing unnecessary duplicates is a good goal.
> >
> > For the topic unloading case, it might help to think about how we can
> > improve the protocol. The current handover is very blunt. A new
> > solution could focus on decreasing duplicate message delivery while
> > also decreasing the time the topic is unavailable.
> >
> > This thread is probably a good source of requirements when thinking about
> > ways to improve the topic handover logic, which has been discussed as a
> > potential load balancing enhancement or a 3.0 enhancement.
> >
> > > So we should be careful about any operation that would reset the cursor,
> > > for example, when the user resets the cursor with the admin client. We
> > > need more detailed info on this matter.
> >
> > This is a great point! In this case, it seems important to redeliver
> > these messages. (For those unfamiliar, I just confirmed that the
> > broker disconnects consumers when a cursor is reset.)
> >
> > Thanks,
> > Michael
> >
> > On Thu, Sep 8, 2022 at 9:33 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> > >
> > > Hi Bo,
> > >
> > > Overall it makes sense to me.
> > > It is basically the same as the broker-side deduplication mechanism for
> > > producing messages, which uses `sequenceId`.
> > > In your case, the messageId is used for deduplication. It should work as
> > > long as the received messageId increases monotonically.
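> > > For reference, a rough sketch of that producer-side mechanism (the
> > > sequence number here is just illustrative, and `producer` is assumed to
> > > publish to a namespace with deduplication enabled):
> > >
> > > ```
> > > // The broker remembers the highest persisted sequenceId per producer and
> > > // silently drops anything at or below it.
> > > producer.newMessage()
> > >         .sequenceId(42L)   // monotonically increasing, chosen by the application
> > >         .value("payload")
> > >         .send();
> > > // Re-sending with the same (or a lower) sequenceId is ignored by the broker.
> > > ```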
> > >
> > > So we should be careful about any operation that would reset the cursor,
> > > for example, when the user resets the cursor with the admin client. We
> > > need more detailed info on this matter.
> > > And I am not sure if there are other operations that would reset the
> > > cursor implicitly.
> > >
> > > Thanks,
> > > Haiting
> > >
> > > On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
> > > >
> > > > Hi Haiting
> > > >
> > > > When using cumulative ack, we can save the maximum received MessageId
> > > > on the consumer client side to filter the message duplication caused
> > > > by reconnection. If the consumer client process restarts, the maximum
> > > > received MessageId will no longer exist in the consumer client. This
> > > > requires the user to be responsible for the received messages: if
> > > > the user wants to re-consume the received messages, they need to call
> > > > `void redeliverUnacknowledgedMessages()`, which then clears the maximum
> > > > received MessageId from the consumer client.
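> > > >
> > > > A minimal sketch of what I mean (`maxReceivedMessageId` is a hypothetical
> > > > client-side field, not an existing API):
> > > >
> > > > ```
> > > > // Every received message advances maxReceivedMessageId; anything the broker
> > > > // redispatches after a reconnection that is <= the mark is dropped instead
> > > > // of being handed to the user.
> > > > Message<byte[]> msg = consumer.receive();
> > > > if (maxReceivedMessageId == null
> > > >         || msg.getMessageId().compareTo(maxReceivedMessageId) > 0) {
> > > >     maxReceivedMessageId = msg.getMessageId();   // deliver to the application
> > > > }
> > > >
> > > > // The user opts in to re-consuming: redelivery clears the mark so the
> > > > // previously received messages can flow again.
> > > > consumer.redeliverUnacknowledgedMessages();
> > > > maxReceivedMessageId = null;
> > > > ```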
> > > >
> > > > Thanks,
> > > > Bo
> > > >
> > > > On Thu, Sep 8, 2022 at 2:51 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> > > > >
> > > > > From the user's perspective, I think we should always avoid delivering
> > > > > repeated messages.
> > > > > But can the broker tell whether the reconnection is caused by a topic
> > > > > unload or by a consumer client process restart?
> > > > > In the latter case, the messages should be redelivered; that's the whole
> > > > > point of explicit user acking.
> > > > >
> > > > > Thanks,
> > > > > Haiting
> > > > >
> > > > > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > > > > >
> > > > > > Hello, Pulsar community:
> > > > > >
> > > > > >
> > > > > > Now the consumer does not filter messages that have already been
> > > > > > consumed. After a consumer reconnection, the broker will dispatch
> > > > > > messages to the consumer from the markDeletePosition. In the Failover and
> > > > > > Exclusive subscription types, all messages in a topic will be
> > > > > > dispatched to the same consumer. Let's look at the following example:
> > > > > >
> > > > > > ```
> > > > > >     @Test
> > > > > >     public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > > > > >         final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > > > > >
> > > > > >         @Cleanup
> > > > > >         Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > > > > >                 .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
> > > > > >
> > > > > >         @Cleanup
> > > > > >         Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > > > > >                 .subscriptionType(SubscriptionType.Exclusive)
> > > > > >                 .topic(topic).subscriptionName("test").subscribe();
> > > > > >
> > > > > >         // send 5 messages
> > > > > >         for (int i = 0; i < 5; i++) {
> > > > > >             producer.send("test" + i);
> > > > > >         }
> > > > > >
> > > > > >         // the consumer receives the 5 messages
> > > > > >         for (int i = 0; i < 5; i++) {
> > > > > >             consumer.receive();
> > > > > >         }
> > > > > >
> > > > > >         admin.topics().unload(topic);
> > > > > >
> > > > > >         // after the reconnection the consumer receives the same 5 messages again
> > > > > >         Message<String> message = null;
> > > > > >         for (int i = 0; i < 5; i++) {
> > > > > >             message = consumer.receive();
> > > > > >         }
> > > > > >         consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > > > > >     }
> > > > > > ```
> > > > > >
> > > > > > In the above example, the consumer repeatedly consumes the 5
> > > > > > messages sent by the producer and acks through a cumulative ack. If we
> > > > > > cumulatively ack only once per 1,000 or 10,000 messages, a consumer
> > > > > > reconnection can cause a lot of repeated consumption.
> > > > > > Although it does not break at-least-once semantics, it
> > > > > > causes a lot of useless overhead.
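> > > > > >
> > > > > > As a sketch of that batched pattern (reusing the consumer from the test
> > > > > > above; the batch size is chosen just for illustration):
> > > > > >
> > > > > > ```
> > > > > > // Cumulative ack only once per 1000 received messages.
> > > > > > Message<String> last = null;
> > > > > > for (int i = 0; i < 1000; i++) {
> > > > > >     last = consumer.receive();
> > > > > >     // process the message ...
> > > > > > }
> > > > > > consumer.acknowledgeCumulative(last.getMessageId());
> > > > > > // If the consumer reconnects just before this ack, up to 1000 messages are
> > > > > > // dispatched again from the markDeletePosition.
> > > > > > ```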
> > > > > >
> > > > > >
> > > > > > Most importantly, it breaks the exactly-once semantics of Pulsar transactions.
> > > > > >
> > > > > >
> > > > > > I want to discuss whether we should fix normal and transactional
> > > > > > cumulative acks in the same way: prevent repeated consumption of
> > > > > > messages due to consumer reconnection, and filter out messages that users
> > > > > > have already received through `consumer.receive()`. Or should we only
> > > > > > guarantee exactly-once semantics, i.e. only guarantee that a consumer
> > > > > > using cumulative acks with a transaction will not receive the same
> > > > > > messages again?
> > > > > >
> > > > > >
> > > > > > Please leave your opinion, thanks! :)
> > > > > >
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bo
