Hi, Congbo:
> The two solutions above both keep messages in order, but in the
> first solution we don't know how many messages the user processes
> before acknowledging cumulatively once. If there are 10000000 messages,
> the user may not be able to keep them in memory for reprocessing, so
> users need a method to redeliver these messages.

I agree with adding the rewind interface.

> Failover can also use individual ack, so we can't disable
> `reconsumeLater` and `negativeAcknowledge`.

In the Failover subscription model, what is the individual ack scenario? Alternatively, can it be understood that when a user wants to process messages in order, they cannot call the `reconsumeLater` and `negativeAcknowledge` methods?

On Dec 23, 2022, at 11:30, 丛搏 <bog...@apache.org> wrote:
>
> Hi, Asaf, Baodi:
>
> I'm very sorry for my late reply. Thanks for your discussion.
>
>> - receive() - get the following message
>> - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>> - Maybe we can try to come up with a self-explanatory name like
>>   ackAllUpTo(msgId).
>
> If the user wants the messages in order, `receive()` and
> `cumulativeAck()` must be called from a single thread. Otherwise,
> `cumulativeAck` loses its meaning.
>
> If users use cumulative ack with code like:
> ```
> while (true) {
>     Message<String> message = consumer.receive();
>     process(message);
>     consumer.acknowledgeCumulative(message.getMessageId());
> }
> ```
> I think this is not a good way to use `acknowledgeCumulative`,
> because a single message doesn't need a cumulative ack; it's meaningless.
> They should use `acknowledgeCumulative` like this:
> ```
> while (true) {
>     Messages<String> messages = consumer.batchReceive();
>     process(messages);
>     consumer.acknowledgeCumulative(messages);
> }
> ```
> Then we should think about how the user reprocesses the messages when
> `process(messages);` throws an exception.
>
> 1. In one case, the user reprocesses these messages, and the
> `process(messages)` code looks like:
> ```
> private void process(Messages<String> messages) {
>     try {
>         // do something
>     } catch (Exception e) {
>         process(messages);
>     }
> }
> ```
> In this way, the consumer doesn't need to do anything.
>
> 2. Pulsar rewinds the cursor and redelivers these messages:
>
> ```
> while (true) {
>     Messages<String> messages = consumer.batchReceive();
>     try {
>         process(messages);
>     } catch (Exception e) {
>         // This method redelivers the messages, whatever its name.
>         // Until it succeeds, the consumer can't invoke
>         // consumer.batchReceive() again.
>         consumer.rewind();
>         continue;
>     }
>     consumer.acknowledgeCumulative(messages);
> }
> ```
> In this way, the consumer needs a method that can redeliver these
> messages. `redeliverUnacknowledgedMessages` is an async method that
> can't guarantee the messages are in order, so we need a new method,
> and it must be synchronous.
>
> <<<<<<<<<<<<<
>
> The two solutions above both keep messages in order, but in the
> first solution we don't know how many messages the user processes
> before acknowledging cumulatively once. If there are 10000000 messages,
> the user may not be able to keep them in memory for reprocessing, so
> users need a method to redeliver these messages.
>
>> I think we should disable nack under Exclusive/Failover subscription.
>
> Failover can also use individual ack, so we can't disable
> `reconsumeLater` and `negativeAcknowledge`.
>
> Thanks,
> Bo
>
> On Sun, Dec 18, 2022 at 18:36, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>>
>> Hi Baodi,
>>
>> Yes, in effect, I suggest that we have new Consumer interfaces, one per
>> subscription type perhaps; then we can “correct” the current interface
>> without breaking backward compatibility.
>>
>> For Exclusive/Failover, since the idea in those subscription types was to
>> maintain order, it makes sense we would offer the following:
>>
>> - receive() - get the following message
>> - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>> - Maybe we can try to come up with a self-explanatory name like
>>   ackAllUpTo(msgId).
>>
>> Like you, I’m interested in knowing what the experienced folks in the
>> community think about this.
>>
>> On 30 Nov 2022 at 4:43:22, Baodi Shi <baodi....@icloud.com.invalid> wrote:
>>
>>> Hi, Asaf:
>>>
>>> Thank you for the comprehensive summary.
>>>
>>>> So in effect, what you would have expected here is that nack(4) in
>>>> exclusive/shared will happen immediately - clear queue, write redeliver
>>>> command to broker async and return immediately, hence next receive() will
>>>> block until messages have been received.
>>>
>>> In this way, the nack interface in Exclusive/Failover subscription also
>>> doesn’t make sense.
>>>
>>> For example, take messages 1, 2, 3, 4, 5.
>>> If processing message 3 fails, the application can choose to wait for a
>>> period of time and process it again, or directly ack this message (skip it).
>>>
>>> I think we should disable nack under Exclusive/Failover subscription.
>>>
>>>> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>>>> or a different one. This is an explicit out-of-order consumption, but it
>>>> can be clearly stated in docs.
>>>
>>> Same as above, we should also disable it in Exclusive/Failover
>>> subscription.
>>>
>>>> I think we should have a different consumer interface holding those
>>>> commands above.
>>>
>>> It's a transformative idea. I'm +1. Let's see what other contributors think.
>>>
>>> On Nov 30, 2022, at 00:19, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>>>
>>>> Ok, I'll try to summarize what I read here to make sure we're all on the
>>>> same page :)
>>>>
>>>> Exclusive and Failover subscription types are subscriptions that guarantee
>>>> two things:
>>>>
>>>> 1. Single active consumer per topic (partition).
>>>> 2. Message processing in the order they were written to the
>>>>    topic (partition).
>>>>
>>>> (1) is guaranteed by the broker by allowing only a single consumer per
>>>> topic.
>>>> (2) is guaranteed by the broker. Since we only have a single consumer, the
>>>> only thing for the broker to take care of is delivering messages in
>>>> precisely the same order they were received.
>>>> Normal dispatching dispatches messages in the order written to the topic.
>>>> When the consumer calls redeliverUnacknowledgedMessages(), it clears the
>>>> incoming queue, and the broker rewinds the cursor to the mark delete
>>>> position, disregarding any individual acks done after the mark delete. So
>>>> messages are always delivered without any gaps.
>>>>
>>>> Since the queue is empty, the next receive() call will block until the
>>>> broker redelivers the messages and fills the consumer's internal queue.
>>>>
>>>> The problem not raised in this discussion thread is the client
>>>> implementation of negativeAcknowledge().
>>>>
>>>> Negative Acknowledgment in today's implementation
>>>>
>>>> It adds the negatively-acked message into the NegativeAckTracker, and sets
>>>> a timer, if not already present, to send all pending acks in X seconds.
>>>> Once that time is up, it sees that the negative ack belongs to an
>>>> Exclusive/Failover subscription type and hence translates it into
>>>> redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
>>>> asks for messages to be redelivered. Since adding to NegativeAckTracker is
>>>> an immediate action (add a message to the queue and return), it just
>>>> returns. If you receive() 1,2,3, call nack(4) and then receive() and get
>>>> 4,5,6,7,... After X seconds pass, your next receive suddenly gives you
>>>> 4,5,6 again.
>>>>
>>>> So in effect, what you would have expected here is that nack(4) in
>>>> exclusive/shared will happen immediately - clear queue, write redeliver
>>>> command to broker async and return immediately, hence next receive() will
>>>> block until messages have been received.
>>>>
>>>> I do side with the suggestion to change the API for exclusive / shared to
>>>> be more clear.
>>>> In those types of subscriptions, it seems that the only actions you are
>>>> supposed to do are:
>>>>
>>>> 1. receive(): get the next message.
>>>> 2. cumulativeAck(msg): acknowledge that all messages up to msg have been
>>>>    successfully processed.
>>>> 3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
>>>>    broker to resend messages from the last mark delete position.
>>>>
>>>> There is one additional action in which you explicitly push the messages to
>>>> a different topic or even the same topic, and that is:
>>>>
>>>> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>>>>    or a different one. This is an explicit out-of-order consumption, but it
>>>>    can be clearly stated in docs.
>>>>
>>>> I think we should have a different consumer interface holding those
>>>> commands above.
>>>>
>>>> On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
>>>>
>>>>> Hi, Joe:
>>>>>
>>>>>> This "brokenness" is not clear to me.
>>>>>
>>>>> https://github.com/apache/pulsar/pull/10478 This PIP solves some
>>>>> problems of "brokenness".
>>>>>
>>>>>> The sequence 3,4,5,6,7,8,9,10,11,12,13,14,15,16,
>>>>>> 9,10,11,12,13,14,15,16,17,18,19,20 ... does not break
>>>>>> the ordering guarantees of Pulsar
>>>>>
>>>>> If we don't use transaction ack, this order is fine. But when we use
>>>>> transaction ack, message 9 and message 10 will be handled twice in
>>>>> this case. Therefore, we need redeliver and receive to be
>>>>> synchronized, to ensure that messages received before the redeliver
>>>>> stay ordered and are not consumed again after it. To achieve these
>>>>> goals, redeliver needs to be a synchronous method instead of an async
>>>>> one, and it needs to retry automatically.
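
To make the synchronous-redeliver point above concrete, here is a minimal in-memory sketch in Java. It is not the Pulsar client API: `OrderedCursorSketch`, `consumeAll()`, and the `rewind()` method are hypothetical names used only for illustration, and the sketch assumes that a rewind resets delivery to the mark-delete position before the next `batchReceive()` can return anything. Under that assumption, a batch that fails once is redelivered in order and nothing is committed twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model of an ordered subscription; not the Pulsar client API.
class OrderedCursorSketch {
    private final List<String> topic;  // messages in the order they were written
    private int markDelete = 0;        // index of the first unacknowledged message
    private int readPosition = 0;      // index of the next message to deliver

    OrderedCursorSketch(List<String> topic) {
        this.topic = new ArrayList<>(topic);
    }

    // Delivers up to max messages in topic order; empty once the topic is drained.
    List<String> batchReceive(int max) {
        int end = Math.min(topic.size(), readPosition + max);
        List<String> batch = new ArrayList<>(topic.subList(readPosition, end));
        readPosition = end;
        return batch;
    }

    // Cumulative ack: everything delivered so far counts as processed.
    void acknowledgeCumulative() {
        markDelete = readPosition;
    }

    // Synchronous rewind (assumed semantics): once this returns, the next
    // batchReceive() starts again at the mark-delete position.
    void rewind() {
        readPosition = markDelete;
    }

    // Runs the batch loop from the thread; processing fails once on failOnceOn.
    static List<String> consumeAll(List<String> topic, int batchSize, String failOnceOn) {
        OrderedCursorSketch consumer = new OrderedCursorSketch(topic);
        List<String> committed = new ArrayList<>();
        boolean[] failed = {false};
        while (true) {
            List<String> batch = consumer.batchReceive(batchSize);
            if (batch.isEmpty()) {
                break;
            }
            try {
                process(batch, failOnceOn, failed);
            } catch (IllegalStateException e) {
                consumer.rewind();  // discard the batch and ask for it again
                continue;
            }
            committed.addAll(batch);
            consumer.acknowledgeCumulative();
        }
        return committed;
    }

    // Simulated processing: throws the first time the batch contains failOnceOn.
    private static void process(List<String> batch, String failOnceOn, boolean[] failed) {
        if (!failed[0] && batch.contains(failOnceOn)) {
            failed[0] = true;
            throw new IllegalStateException("simulated processing failure");
        }
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        System.out.println(consumeAll(topic, 2, "m3"));  // prints [m1, m2, m3, m4, m5]
    }
}
```

If `rewind()` were asynchronous in this model, the loop could call `batchReceive()` before the read position was reset and pick up later messages first, which is the out-of-order redelivery described in the thread.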