Hi Haiting, Michael,

I tested an example, and there may be a problem when using subscription
reset-cursor and cumulative ack at the same time, e.g.:
```
Message<byte[]> message = null;
for (int i = 0; i < 3; i++) {
    message = consumer.receive();
}
admin.topics().resetCursor(topic, subName, MessageId.earliest);
consumer.acknowledgeCumulative(message.getMessageId());
admin.topics().unload(topic);
consumer.receive();
```
In this case, the consumer cannot receive any messages again in the end.
We have to fix this because it causes message loss. I think we need to
redefine the use of cumulative acks, not just solve the problem of
receiving messages repeatedly.

Thanks!
Bo

Michael Marshall <mmarsh...@apache.org> wrote on Fri, Sep 9, 2022 at 12:05:
>
> I agree that reducing unnecessary duplicates is a good goal.
>
> For the topic unloading case, it might help to think about how we can
> improve the protocol. The current handover is very blunt. A new
> solution could focus on decreasing duplicate message delivery while
> also focusing on decreasing the time during which the topic is
> unavailable.
>
> This thread is probably a good requirement to consider when thinking
> about ways to improve the topic handover logic, which has been
> discussed as a potential load balancing enhancement or a 3.0
> enhancement.
>
> > So we should be careful of any operations that would reset the cursor.
> > For example, if the user resets the cursor with the admin client. We
> > need more detailed info on this matter.
>
> This is a great point! In this case, it seems important to redeliver
> these messages. (For those unfamiliar, I just confirmed that the
> broker disconnects consumers when a cursor is reset.)
>
> Thanks,
> Michael
>
> On Thu, Sep 8, 2022 at 9:33 PM Haiting Jiang <jianghait...@gmail.com> wrote:
> >
> > Hi Bo,
> >
> > Overall it makes sense to me.
> > It is basically the same as the broker-side deduplication mechanism
> > used when producing messages, which relies on `sequenceId`.
> > In your case, the messageId is used for deduplication. It should work
> > as long as the received messageId increases monotonically.
> >
> > So we should be careful of any operations that would reset the cursor.
> > For example, if the user resets the cursor with the admin client. We
> > need more detailed info on this matter.
> > And I am not sure if there are other operations that would reset the
> > cursor implicitly.
> >
> > Thanks,
> > Haiting
> >
> > On Thu, Sep 8, 2022 at 11:36 PM 丛搏 <congbobo...@gmail.com> wrote:
> > >
> > > Hi Haiting,
> > >
> > > When using cumulative ack, we can save the maximum received MessageId
> > > on the consumer client side to filter the message duplication caused
> > > by reconnection. If the consumer client process restarts, the maximum
> > > received MessageId will no longer exist on the consumer client. This
> > > requires the user to be responsible for the received messages; if the
> > > user wants to re-consume the received messages, they need to call
> > > `void redeliverUnacknowledgedMessages()` and then clear the maximum
> > > received MessageId from the consumer client.
> > >
> > > Thanks,
> > > Bo
> > >
> > > Haiting Jiang <jianghait...@gmail.com> wrote on Thu, Sep 8, 2022 at 14:51:
> > > >
> > > > From the user's perspective, I think we should always avoid
> > > > delivering repeated messages.
> > > > But can the broker tell whether the reconnection is caused by topic
> > > > unloading or by the consumer client process restarting?
> > > > For the latter case, the messages should be redelivered; that is
> > > > the whole point of the user explicitly acking.
> > > >
> > > > Thanks,
> > > > Haiting
> > > >
> > > > On Thu, Sep 8, 2022 at 10:56 AM 丛搏 <bog...@apache.org> wrote:
> > > > >
> > > > > Hello, Pulsar community:
> > > > >
> > > > > Currently the consumer does not filter messages that have already
> > > > > been consumed. After a consumer reconnection, the broker will
> > > > > dispatch messages to the consumer from the markDeletePosition. In
> > > > > the Failover and Exclusive subscription types, all messages in a
> > > > > topic will be dispatched to the same consumer.
> > > > > Let's look at the following example:
> > > > >
> > > > > ```
> > > > > @Test
> > > > > public void testConsumerReconnectionRepeatedConsumeMessage() throws Exception {
> > > > >     final String topic = "testConsumerReconnectionRepeatedConsumeMessage";
> > > > >
> > > > >     @Cleanup
> > > > >     Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
> > > > >             .topic(topic).sendTimeout(0, TimeUnit.SECONDS)
> > > > >             .enableBatching(false).create();
> > > > >
> > > > >     @Cleanup
> > > > >     Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
> > > > >             .subscriptionType(SubscriptionType.Exclusive)
> > > > >             .topic(topic).subscriptionName("test").subscribe();
> > > > >
> > > > >     // send 5 messages
> > > > >     for (int i = 0; i < 5; i++) {
> > > > >         producer.send("test" + i);
> > > > >     }
> > > > >
> > > > >     // the consumer receives 5 messages
> > > > >     for (int i = 0; i < 5; i++) {
> > > > >         consumer.receive();
> > > > >     }
> > > > >
> > > > >     admin.topics().unload(topic);
> > > > >
> > > > >     // the consumer can receive the same 5 messages again
> > > > >     Message<String> message = null;
> > > > >     for (int i = 0; i < 5; i++) {
> > > > >         message = consumer.receive();
> > > > >     }
> > > > >     consumer.acknowledgeCumulativeAsync(message.getMessageId());
> > > > > }
> > > > > ```
> > > > >
> > > > > In the above example, the consumer repeatedly consumes the 5
> > > > > messages sent by the producer and acks them with a cumulative ack.
> > > > > If we cumulatively ack only once per 1000 or 10000 messages, a lot
> > > > > of repeated consumption may be caused by consumer reconnection.
> > > > > Although this does not break at-least-once semantics, it causes a
> > > > > lot of useless overhead.
> > > > >
> > > > > Most importantly, it breaks the exactly-once semantics of Pulsar
> > > > > transactions.
> > > > >
> > > > > I want to discuss whether we should fix both normal and
> > > > > transactional cumulative acks in the same way: prevent repeated
> > > > > consumption of messages due to consumer reconnection, and filter
> > > > > out messages that users have already received through
> > > > > `consumer.receive()`. Or should we only guarantee exactly-once
> > > > > semantics, i.e. only guarantee that a cumulative ack used with a
> > > > > transaction will not receive the same messages?
> > > > >
> > > > > Please leave your opinion, thanks! :)
> > > > >
> > > > > Thanks,
> > > > > Bo
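For illustration, here is a rough sketch of the client-side max-MessageId
filter I described earlier. It is plain Java with a `Comparable` type
parameter standing in for `MessageId` (which is itself
`Comparable<MessageId>`); the class and method names are hypothetical,
not part of the Pulsar client API:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: remembers the largest id received so far and drops
// anything at or below it, filtering duplicates caused by reconnection.
class MaxSeenFilter<T extends Comparable<T>> {
    private final AtomicReference<T> maxSeen = new AtomicReference<>();

    // Returns true if the id is new and should be delivered to the user,
    // false if it was already received once and should be filtered out.
    boolean accept(T id) {
        while (true) {
            T current = maxSeen.get();
            if (current != null && id.compareTo(current) <= 0) {
                return false; // duplicate: at or below the watermark
            }
            if (maxSeen.compareAndSet(current, id)) {
                return true; // new watermark established
            }
        }
    }

    // Would be called from redeliverUnacknowledgedMessages(): forget the
    // watermark so intentionally redelivered messages pass through.
    void reset() {
        maxSeen.set(null);
    }
}
```

Note that a real implementation would also have to handle the
reset-cursor case discussed above, where such a filter would wrongly
drop messages the user explicitly asked to re-consume.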