Hi Baodi,

Yes, in effect, I suggest that we have new Consumer interfaces, perhaps one
per subscription type. Then we can "correct" the current interface without
breaking backward compatibility.

For Exclusive/Failover, since the idea in those subscription types was to
maintain order, it makes sense we would offer the following:

- receive() - get the next message.
- cumulativeAck(msgId) - acknowledge all messages up to msgId.
  - Maybe we can try to come up with a self-explanatory name like
    ackAllUpTo(msgId).

Like you, I'm interested in knowing what the experienced folks in the
community think about this.

On 30 Nov 2022 at 4:43:22, Baodi Shi <baodi....@icloud.com.invalid> wrote:

> Hi, Asaf:
>
> Thank you for the comprehensive summary.
>
> > So in effect, what you would have expected here is that nack(4) in
> > exclusive/failover will happen immediately - clear queue, write redeliver
> > command to broker async and return immediately, hence next receive() will
> > block until messages have been received.
>
> In this way, the nack interface in Exclusive/Failover subscriptions also
> doesn't make sense.
>
> For 1, 2, 3, 4, 5:
> If processing message 3 fails, the application can choose to wait for a
> period of time and process it again, or directly ack this message (skip it).
>
> I think we should disable nack under Exclusive/Failover subscriptions.
>
> > 4. reconsumeLater(msg): ack existing message and write it to the same topic
> > or a different one. This is an explicit out-of-order consumption, but it
> > can be clearly stated in docs.
>
> Same as above, we should also disable it in Exclusive/Failover
> subscriptions.
>
> > I think we should have a different consumer interface holding those
> > commands above.
>
> It's a transformative idea. I'd like to +1 it. Let's see what other
> contributors think.
>
> On 30 Nov 2022, at 00:19, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>
> > Ok, I'll try to summarize what I read here to make sure we're all on the
> > same page :)
> >
> > Exclusive and Failover subscription types are subscriptions that guarantee
> > two things:
> > 1. Single active consumer per topic (partition).
> > 2. Message processing in the order they were written to the
> > topic (partition).
> >
> > (1) is guaranteed by the broker by allowing only a single consumer per
> > topic.
> > (2) is guaranteed by the broker. Since we only have a single consumer, the
> > only thing for the broker to take care of is delivering messages in
> > precisely the same order they were received.
> >
> > Normal dispatching dispatches messages in the order written to the topic.
> > When the consumer calls redeliverUnacknowledgedMessages(), it clears the
> > incoming queue, and the broker rewinds the cursor to the mark delete
> > position, disregarding any individual acks done after the mark delete. So
> > messages are always delivered without any gaps.
> >
> > Since the queue is empty, the next receive() call will block until the
> > broker redelivers the messages and fills the consumer's internal queue.
> >
> > The problem not raised in this discussion thread is the client
> > implementation of negativeAcknowledgment().
> >
> > Negative Acknowledgment in today's implementation
> >
> > It adds the negatively-acked message to the NegativeAckTracker and sets a
> > timer, if one is not already present, to send all pending negative acks in
> > X seconds. Once that time is up, it sees that the negative ack belongs to
> > an Exclusive/Failover subscription type and hence translates it into
> > redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
> > asks for messages to be redelivered. Since adding to NegativeAckTracker is
> > an immediate action (add a message to the queue and return), it just
> > returns. If you receive() 1,2,3, call nack(4) and then receive(), you get
> > 4,5,6,7,... After X seconds pass, your next receive() suddenly gives you
> > 4,5,6 again.
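
The delayed-nack behaviour described above can be illustrated with a small in-memory toy model. This is only a sketch and not Pulsar client code: the class name `ToyOrderedConsumer`, the simulated clock, the prefetch size, and the cumulative ack up to message 3 (added so that the mark delete position is well defined) are all assumptions made for the illustration.

```python
from collections import deque


class ToyOrderedConsumer:
    """Hypothetical toy model of an Exclusive/Failover consumer (not Pulsar code)."""

    def __init__(self, log, nack_delay, prefetch=3):
        self.log = list(log)          # messages in topic order
        self.nack_delay = nack_delay  # the "X seconds" from the thread
        self.prefetch = prefetch      # assumed size of the incoming queue
        self.now = 0                  # simulated clock, in seconds
        self.mark_delete = -1         # log index of the last cumulatively acked message
        self.next_index = 0           # next log index the "broker" dispatches
        self.queue = deque()          # consumer-side incoming queue
        self.redeliver_at = None      # time at which the nack timer fires, if armed

    def _fire_timer_if_due(self):
        # When the timer expires, the nack is translated into a full redelivery:
        # clear the queue and rewind to just after the mark delete position.
        if self.redeliver_at is not None and self.now >= self.redeliver_at:
            self.redeliver_at = None
            self.queue.clear()
            self.next_index = self.mark_delete + 1

    def _prefetch(self):
        while len(self.queue) < self.prefetch and self.next_index < len(self.log):
            self.queue.append(self.log[self.next_index])
            self.next_index += 1

    def receive(self):
        self._fire_timer_if_due()
        if not self.queue:
            self._prefetch()
        if not self.queue:
            raise LookupError("no more messages; a real client would block here")
        return self.queue.popleft()

    def cumulative_ack(self, msg):
        self.mark_delete = self.log.index(msg)

    def nack(self, msg):
        # Returns immediately; it only arms the timer if none is pending.
        if self.redeliver_at is None:
            self.redeliver_at = self.now + self.nack_delay

    def advance(self, seconds):
        self.now += seconds


def demo():
    consumer = ToyOrderedConsumer(range(1, 11), nack_delay=60)
    first = [consumer.receive() for _ in range(4)]
    consumer.cumulative_ack(3)
    consumer.nack(4)                                   # nothing visible happens yet
    before_timer = [consumer.receive() for _ in range(3)]
    consumer.advance(60)                               # the X seconds pass
    after_timer = [consumer.receive() for _ in range(3)]
    return first, before_timer, after_timer


if __name__ == "__main__":
    print(demo())
```

In this model the consumer keeps receiving new messages after nack(4), and only once the timer fires does the stream jump back to message 4, which is the surprise the summary points out.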
> >
> > So in effect, what you would have expected here is that nack(4) in
> > exclusive/failover will happen immediately - clear queue, write redeliver
> > command to broker async and return immediately, hence next receive() will
> > block until messages have been received.
> >
> > I do side with the suggestion to change the API for exclusive/failover to
> > be more clear.
> > In those types of subscriptions, it seems that the only actions you are
> > supposed to do are:
> >
> > 1. receive(): get the next message.
> > 2. cumulativeAck(msg): acknowledge that all messages up to msg have been
> > successfully processed.
> > 3. redeliverUnacknowledgedMessages(): clear the internal queue and ask the
> > broker to resend messages from the last mark delete position.
> >
> > There is one additional action in which you explicitly push the messages to
> > a different topic or even the same topic, and that is:
> > 4. reconsumeLater(msg): ack existing message and write it to the same topic
> > or a different one. This is an explicit out-of-order consumption, but it
> > can be clearly stated in docs.
> >
> > I think we should have a different consumer interface holding those
> > commands above.
> >
> > On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
> >
> > > Hi, Joe:
> > >
> > > > This "brokenness" is not clear to me.
> > > https://github.com/apache/pulsar/pull/10478 This PIP solves some
> > > problems of "brokenness".
> > > > The sequence 3,4,5,6,7,8,9,10,11,12,13,14,15,16,
> > > > 9,10,11,12,13,14,15,16,17,18,19,20 ... does not break
> > > > the ordering guarantees of Pulsar
> > > If we don't use transaction ack, this order is fine. But when we use
> > > transaction ack, messages 9 and 10 will be handled twice in this
> > > case. Therefore, we need redeliver and receive to be synchronized, to
> > > ensure that messages received before the redeliver are ordered and
> > > not repeated, and are not consumed again after the redeliver.
> > > To achieve these goals, we need redeliver to be a synchronous method
> > > instead of an async one, and it needs to retry automatically.
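
The three ordered-consumption actions from the summary, combined with the synchronous redelivery asked for above, might look roughly like the following sketch. It is a hypothetical in-memory model, not an existing Pulsar API: the names `OrderedConsumer` and `InMemoryOrderedConsumer`, the prefetch size, and the use of a plain list as the topic are assumptions, and `reconsumeLater` is left out.

```python
from abc import ABC, abstractmethod
from collections import deque


class OrderedConsumer(ABC):
    """Hypothetical interface for Exclusive/Failover subscriptions."""

    @abstractmethod
    def receive(self):
        """Return the next message in topic order."""

    @abstractmethod
    def cumulative_ack(self, msg):
        """Acknowledge every message up to and including msg."""

    @abstractmethod
    def redeliver_unacknowledged_messages(self):
        """Synchronously restart delivery after the mark delete position."""


class InMemoryOrderedConsumer(OrderedConsumer):
    """Toy implementation backed by a list instead of a broker."""

    def __init__(self, log, prefetch=3):
        self._log = list(log)
        self._prefetch = prefetch   # assumed size of the incoming queue
        self._mark_delete = -1      # log index of the last acked message
        self._next_index = 0        # next log index to dispatch
        self._queue = deque()

    def receive(self):
        if not self._queue:
            # Stand-in for the broker pushing messages to the client.
            while len(self._queue) < self._prefetch and self._next_index < len(self._log):
                self._queue.append(self._log[self._next_index])
                self._next_index += 1
        if not self._queue:
            raise LookupError("no more messages; a real client would block here")
        return self._queue.popleft()

    def cumulative_ack(self, msg):
        self._mark_delete = self._log.index(msg)

    def redeliver_unacknowledged_messages(self):
        # Synchronous: by the time this returns, stale prefetched messages
        # are gone and the cursor is rewound, so the next receive() cannot
        # hand out a message that was queued before the redelivery.
        self._queue.clear()
        self._next_index = self._mark_delete + 1


if __name__ == "__main__":
    consumer = InMemoryOrderedConsumer(range(1, 10))
    print([consumer.receive() for _ in range(4)])
    consumer.cumulative_ack(2)
    consumer.redeliver_unacknowledged_messages()
    print([consumer.receive() for _ in range(3)])
```

Because the rewind completes before the call returns, the first receive() after a redelivery always yields the message right after the mark delete position, with no window in which older queued messages can still be handed out.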