Ok, I'll try to summarize what I read here to make sure we're all on the same page :)
Exclusive and Failover subscription types guarantee two things:
1. A single active consumer per topic (partition).
2. Message processing in the order the messages were written to the topic (partition).

(1) is guaranteed by the broker, which allows only a single active consumer per topic (partition).

(2) is also guaranteed by the broker. Since there is only a single consumer, the only thing the broker has to take care of is delivering messages in precisely the order it received them. Normal dispatching sends messages in the order they were written to the topic. When the consumer calls redeliverUnacknowledgedMessages(), it clears its incoming queue, and the broker rewinds the cursor to the mark delete position, disregarding any individual acks made after the mark delete position. So messages are always delivered without any gaps. Since the queue is empty, the next receive() call blocks until the broker redelivers the messages and refills the consumer's internal queue.

The problem not raised in this discussion thread is the client implementation of negativeAcknowledge(). In today's implementation, a negative acknowledgment adds the negatively acked message to the NegativeAckTracker and sets a timer, if one is not already pending, to send all pending negative acks in X seconds. When the timer fires, the tracker sees that the negative ack belongs to an Exclusive/Failover subscription and therefore translates it into redeliverUnacknowledgedMessages(). So X seconds later, the client clears the queue and asks for messages to be redelivered. Since adding to the NegativeAckTracker is an immediate action (add the message and return), the nack call itself returns right away.

For example, if you receive() 1,2,3, call nack(4), then call receive() and get 4,5,6,7,..., then once X seconds have passed, your next receive() suddenly gives you 4,5,6 again.
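To make the rewind behavior concrete, here is a toy model of a single-partition cursor. It is a sketch under simplifying assumptions: message IDs are consecutive longs starting at 1, and CursorModel and its methods are hypothetical names, not Pulsar's managed-ledger API. Dispatch goes strictly in write order, and a rewind resets the read position to just past the mark delete position without consulting individual acks above it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of a broker-side cursor for one partition (hypothetical names,
// not Pulsar's ManagedCursor API). Message IDs are consecutive longs from 1.
class CursorModel {
    private final long lastWritten;                // highest message ID written to the partition
    private long markDeletePosition = 0;           // every ID <= this is acknowledged
    private long readPosition = 1;                 // next ID to dispatch
    private final TreeSet<Long> individualAcks = new TreeSet<>(); // acks above the mark delete position

    CursorModel(long lastWritten) {
        this.lastWritten = lastWritten;
    }

    // Normal dispatch: hand out up to n messages strictly in write order.
    List<Long> dispatch(int n) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < n && readPosition <= lastWritten) {
            batch.add(readPosition++);
        }
        return batch;
    }

    void acknowledgeCumulative(long id) {
        markDeletePosition = Math.max(markDeletePosition, id);
    }

    // Recorded, but deliberately ignored by rewind() below.
    void acknowledgeIndividual(long id) {
        if (id > markDeletePosition) {
            individualAcks.add(id);
        }
    }

    // Models the broker's reaction to redeliverUnacknowledgedMessages() on
    // Exclusive/Failover: go back to the mark delete position and ignore
    // individual acks, so redelivery has no gaps.
    void rewind() {
        readPosition = markDeletePosition + 1;
    }

    int pendingIndividualAcks() {
        return individualAcks.size();
    }
}
```

With 10 messages, dispatching 5, cumulatively acking 2 and individually acking 4, a rewind makes the next dispatch return 3,4,5: message 4 comes back even though it was individually acked.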
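The surprising nack sequence described above can be reproduced with a small single-threaded simulation. This is a hedged sketch, not the Pulsar client: time is simulated in ticks (one tick per receive() call) instead of real seconds, DelayedNackSimulation and its members are hypothetical names, the timer-to-redeliver translation follows the description in this email rather than the actual client code, and it assumes messages up to 3 were cumulatively acked so the rewind starts at 4.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded simulation (not the Pulsar client) of a delayed
// negative ack on an Exclusive/Failover subscription. One receive() = one tick.
class DelayedNackSimulation {
    private final long lastWritten;                // messages 1..lastWritten exist on the topic
    private final long nackDelayTicks;             // stands in for "X seconds"
    private long markDeletePosition = 0;           // broker side: cumulative ack position
    private long readPosition = 1;                 // broker side: next message to dispatch
    private final Deque<Long> incomingQueue = new ArrayDeque<>(); // client-side receiver queue
    private final List<Long> nackTracker = new ArrayList<>();     // client-side pending nacks
    private long now = 0;
    private long timerDeadline = -1;               // -1 means no timer is armed

    DelayedNackSimulation(long lastWritten, long nackDelayTicks) {
        this.lastWritten = lastWritten;
        this.nackDelayTicks = nackDelayTicks;
    }

    Long receive() {
        now++;
        if (timerDeadline >= 0 && now >= timerDeadline) {
            // Timer fires: on Exclusive/Failover the pending nacks become
            // redeliverUnacknowledgedMessages(): clear the queue, broker rewinds.
            nackTracker.clear();
            incomingQueue.clear();
            readPosition = markDeletePosition + 1;
            timerDeadline = -1;
        }
        // The broker pushes everything available (unbounded queue for simplicity).
        while (readPosition <= lastWritten) {
            incomingQueue.add(readPosition++);
        }
        return incomingQueue.poll(); // null once the topic is drained
    }

    void acknowledgeCumulative(long id) {
        markDeletePosition = Math.max(markDeletePosition, id);
    }

    // Only records the nack and arms the timer if needed, then returns immediately.
    void negativeAcknowledge(long id) {
        nackTracker.add(id);
        if (timerDeadline < 0) {
            timerDeadline = now + nackDelayTicks;
        }
    }
}
```

With a delay of 4 ticks, receiving three messages, cumulatively acking 3, calling negativeAcknowledge(4) and receiving six more yields 1,2,3,4,5,6,4,5,6: the nack returns immediately, receive() keeps draining the already-filled queue, and only when the timer fires does the rewind replay 4,5,6.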
So in effect, what you would have expected here is that nack(4) on Exclusive/Failover takes effect immediately: clear the queue, write the redeliver command to the broker asynchronously and return immediately, so that the next receive() blocks until the redelivered messages have been received.

I do side with the suggestion to change the API for Exclusive/Failover to make it clearer. In those subscription types, it seems that the only actions you are supposed to perform are:
1. receive(): get the next message.
2. cumulativeAck(msg): acknowledge that all messages up to msg have been processed successfully.
3. redeliverUnacknowledgedMessages(): clear the internal queue and ask the broker to resend messages from the last mark delete position.

There is one additional action, in which you explicitly push the message to a different topic or even the same topic:
4. reconsumeLater(msg): ack the existing message and write it to the same topic or a different one. This is explicitly out-of-order consumption, but that can be clearly stated in the docs.

I think we should have a different consumer interface holding the commands above.

On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
> Hi, Joe:
>
> > This "brokenness" is not clear to me.
> https://github.com/apache/pulsar/pull/10478 This PIP solves some
> problems of "brokenness".
> > The sequence 3,4,5,6,7,8,9,10,11,12,13,14,15,16,9,10,11,12,13,14,15,16,17,18,19,20 ...
> > does not break the ordering guarantees of Pulsar
> If we don't use transaction ack, this order is fine. But when we use
> transaction ack, in this case, message 9 and message 10 will be
> handled twice. Therefore, we need redeliver and receive to be
> synchronized, to ensure that messages received before redeliver
> are not repeated and stay ordered, and are not consumed again after
> redeliver. To achieve these goals, we need redeliver to be a
> synchronous method instead of async, and it needs to retry automatically.
>
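A narrower consumer interface along the lines proposed above might look like the following. Everything here is hypothetical: OrderedConsumer and InMemoryOrderedConsumer are illustrative names, not existing Pulsar client types, and the method names follow this email's shorthand (the existing Pulsar Consumer method for a cumulative ack is acknowledgeCumulative()). The in-memory implementation exists only so the sketch runs; as a design assumption, its reconsumeLater() cumulatively acks the message, since under strictly ordered processing everything before it has already been handled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical interface exposing only the actions listed for Exclusive/Failover.
interface OrderedConsumer {
    long receive();                          // next message, strictly in order
    void cumulativeAck(long msgId);          // all messages up to msgId are processed
    void redeliverUnacknowledgedMessages();  // clear local queue, resume from mark delete
    void reconsumeLater(long msgId);         // ack msgId and republish it (explicitly out of order)
}

// Toy single-partition implementation: message IDs 1..size, no network, no threads.
class InMemoryOrderedConsumer implements OrderedConsumer {
    private final long size;
    private long markDeletePosition = 0;
    private long next = 1;
    final List<Long> retryTopic = new ArrayList<>(); // stands in for the target topic

    InMemoryOrderedConsumer(long size) {
        this.size = size;
    }

    @Override
    public long receive() {
        if (next > size) {
            throw new NoSuchElementException("a real consumer would block here");
        }
        return next++;
    }

    @Override
    public void cumulativeAck(long msgId) {
        markDeletePosition = Math.max(markDeletePosition, msgId);
    }

    @Override
    public void redeliverUnacknowledgedMessages() {
        next = markDeletePosition + 1;
    }

    @Override
    public void reconsumeLater(long msgId) {
        retryTopic.add(msgId);   // explicit republish, visible to the caller
        cumulativeAck(msgId);    // assumption: ordered processing means msgId and earlier are done
    }
}
```

Because the interface has no individual ack or nack, a caller cannot create gaps by accident; the only out-of-order path is the explicit reconsumeLater().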
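The synchronous, automatically retried redeliver requested in the quoted message could be wrapped roughly as follows. This is a sketch under assumptions: SyncRedeliver.redeliverWithRetry is a hypothetical helper, the asynchronous redeliver request is passed in as a Supplier of CompletableFuture rather than calling any specific Pulsar method, and a fixed attempt limit stands in for whatever retry policy a real client would use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

// Hypothetical helper (not an existing Pulsar API): turns an async redeliver
// request into a blocking call that retries automatically on failure.
final class SyncRedeliver {
    private SyncRedeliver() {}

    // Returns the attempt number that succeeded; throws after maxAttempts failures.
    static int redeliverWithRetry(Supplier<CompletableFuture<Void>> asyncRedeliver, int maxAttempts) {
        CompletionException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                asyncRedeliver.get().join(); // block until the request completes
                return attempt;
            } catch (CompletionException e) {
                last = e;                    // failed: try again
            }
        }
        throw new IllegalStateException("redeliver failed after " + maxAttempts + " attempts", last);
    }
}
```

Blocking here is what lets the caller rely on the next receive() only returning messages from after the redeliver, at the cost of stalling the calling thread while retries run.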