Hi, Congbo:

> In the above two solutions, messages can be kept in order. But in the
> first solution, we don't know how many messages the user will process
> before doing one cumulative ack. If the number of messages is 10,000,000,
> maybe the user can't store all the messages in memory for reprocessing.
> So users need a method to redeliver these messages.


I agree to add the rewind interface.
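For concreteness, a rough sketch of what such an interface could look like is
below. The name `rewind`, the containing interface, and the exact semantics are
only placeholders for this discussion, not a settled API:

```
import org.apache.pulsar.client.api.PulsarClientException;

// Sketch only: one possible shape for the rewind operation discussed in this
// thread. Neither the method name nor where it lives (e.g. on Consumer) is
// decided.
public interface OrderedConsumerOps {

    // Synchronously clear the incoming queue and redeliver everything after
    // the last cumulatively acknowledged position, blocking (and retrying
    // internally) until the rewind has taken effect, so that the next
    // receive()/batchReceive() only returns redelivered messages, in order.
    void rewind() throws PulsarClientException;
}
```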


> Failover can also use individual ack, so we can't disable
> `reconsumeLater` and `negativeAcknowledge`.

In the Failover subscription model, what is the individual ack scenario?

Alternatively, can it be understood that when a user wants to process messages
in order, they cannot call the `reconsumeLater` and `negativeAcknowledge`
methods?
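
To make the question concrete, this is the kind of loop I have in mind, using
only `receive()` and cumulative ack (acking once per small batch rather than
per message); `process()` and the batch size are just placeholders:

```
// Sketch: ordered consumption with no negativeAcknowledge()/reconsumeLater().
// Failures are handled by retrying inside process() or by a rewind/redeliver,
// never by nacking individual messages.
MessageId last = null;
for (int i = 0; i < 100; i++) {
    Message<String> msg = consumer.receive();
    process(msg);                      // user-defined handler (placeholder)
    last = msg.getMessageId();
}
consumer.acknowledgeCumulative(last);  // acknowledge everything up to `last`
```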


> On 23 Dec 2022 at 11:30, 丛搏 <bog...@apache.org> wrote:
> 
> Hi, Asaf, Baodi:
> 
> I'm very sorry for my late reply. Thanks for your discussion.
> 
>> - receive() - get the following message
>>  - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>>    - Maybe we can try to come up with a self-explanatory name like
>>    ackAllUpTo(msgId).
> 
> If the user wants the messages in order, `receive()` and
> `cumulativeAck()` must be called in a single thread. Otherwise,
> `cumulativeAck` will lose its meaning.
> 
> If users use cumulative ack with code like:
> ```
> while (true) {
>    Message<String> message = consumer.receive();
>    process(message);
>    consumer.acknowledgeCumulative(message.getMessageId());
> }
> ```
> I think it is not a good way for users to use `acknowledgeCumulative`,
> because a single message doesn't need a cumulative ack; it's meaningless.
> Their use of `acknowledgeCumulative` should look like this code:
> ```
> while (true) {
>    Messages<String> messages = consumer.batchReceive();
>    process(messages);
>    consumer.acknowledgeCumulative(messages);
> }
> ```
> Then we should think about how the user reprocesses these messages when
> `process(messages)` throws an exception.
> 
> 1. One case is that the user reprocesses these messages themselves; the
> `process(messages)` code looks like:
> ```
> private void process(Messages<String> messages) {
>     try {
>         // do something
>     } catch (Exception e) {
>         process(messages);
>     }
> }
> ```
> In this way, the consumer doesn't need to do anything.
> 
> 2. Pulsar rewinds the cursor and redelivers these messages:
> 
> ```
> while (true) {
>     Messages<String> messages = consumer.batchReceive();
>     try {
>         process(messages);
>     } catch (Exception e) {
>         // This method can redeliver the messages, whatever its final name.
>         // Before it succeeds, the consumer can't invoke batchReceive() again.
>         consumer.rewind();
>         continue;
>     }
>     consumer.acknowledgeCumulative(messages);
> }
> ```
> In this way, the consumer needs a method that can redeliver these
> messages. `redeliverUnacknowledgedMessages` is an async method that
> can't guarantee the messages stay in order, so we need a new method,
> and it should be a sync method.
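> (For illustration, roughly the interleaving a synchronous method is meant to
> rule out; message ids are made up and `rewind()` is the placeholder name used
> above:)
> ```
> consumer.batchReceive();                    // e.g. returns 1..10, processing fails
> consumer.redeliverUnacknowledgedMessages(); // async: returns before redelivery takes effect
> consumer.batchReceive();                    // may still see messages received in between,
>                                             // and 1..10 again later -> duplicates, out of order
>
> consumer.rewind();                          // hypothetical sync method: blocks (and retries)
>                                             // until the queue is cleared and redelivery starts
> consumer.batchReceive();                    // next batch starts again from 1, in order
> ```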
> 
> In the above two solutions, messages can be kept in order. But in the
> first solution, we don't know how many messages the user will process
> before doing one cumulative ack. If the number of messages is 10,000,000,
> maybe the user can't store all the messages in memory for reprocessing.
> So users need a method to redeliver these messages.
> 
> > I think we should disable nack under Exclusive/Failover subscription.
> 
> Failover can also use individual ack, so we can't disable
> `reconsumeLater` and `negativeAcknowledge`.
> 
> Thanks,
> Bo
> 
> Asaf Mesika <asaf.mes...@gmail.com> wrote on Sunday, 18 Dec 2022 at 18:36:
>> 
>> Hi Baodi,
>> 
>> Yes, in effect, I suggest that we have new Consumer interfaces, perhaps one
>> per subscription type; then we can “correct” the current interface
>> without breaking backward compatibility.
>> 
>> For Exclusive/Failover, since the idea in those subscription types was to
>> maintain order, it makes sense we would offer the following:
>> 
>> 
>>   - receive() - get the following message
>>   - cumulativeAck(msgId) - acknowledge all messages up to msgId.
>>      - Maybe we can try to come up with a self-explanatory name like
>>      ackAllUpTo(msgId).
>> 
>> 
>> 
>> Like you I’m interested in knowing what the experienced folks in the
>> community think about this.
>> 
>> 
>> On 30 Nov 2022 at 4:43:22, Baodi Shi <baodi....@icloud.com.invalid> wrote:
>> 
>>> Hi, Asaf:
>>> 
>>> Thank you for the comprehensive summary.
>>> 
>>> So in effect, what you would have expected here is that nack(4) in
>>> exclusive/shared will happen immediately - clear queue, write redeliver
>>> command to broker async and return immediately, hence next receive() will
>>> block until messages have been received.
>>> 
>>> In this way, the nack interface in the Exclusive/Failover subscription
>>> also doesn't make sense.
>>> 
>>> For messages 1, 2, 3, 4, 5: if message 3 fails to process, the application
>>> can choose to wait for a period of time and retry, or directly ack this
>>> message (i.e., skip it).
>>> 
>>> I think we should disable nack under Exclusive/Failover subscription.
>>> 
>>> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>>> or a different one. This is an explicit out-of-order consumption, but it
>>> can be clearly stated in docs.
>>> 
>>> 
>>> Same as above, we should also disable it in Exclusive/Failover
>>> subscription.
>>> 
>>> I think we should have a different consumer interface holding those
>>> commands above.
>>> 
>>> 
>>> It's a transformative idea; +1 from me. Let's see what other contributors think.
>>> 
>>> 
>>> On 30 Nov 2022 at 00:19, Asaf Mesika <asaf.mes...@gmail.com> wrote:
>>> 
>>> 
>>> Ok, I'll try to summarize what I read here to make sure we're all on the
>>> same page :)
>>> 
>>> Exclusive and Failover subscription types are subscriptions that guarantee
>>> two things:
>>> 
>>> 1. Single active consumer per topic (partition).
>>> 2. Message processing in the order they were written to the
>>> topic (partition).
>>> 
>>> (1) is guaranteed by the broker by allowing only a single consumer per
>>> topic.
>>> (2) is guaranteed by the broker. Since we only have a single consumer, the
>>> only thing for the broker to take care of is delivering messages precisely
>>> in the same order they were received.
>>> Normal dispatching dispatches messages in the order written to the topic.
>>> When the consumer calls redeliverUnacknowledgedMessages(), it clears the
>>> incoming queue, and the broker rewinds the cursor to the mark delete
>>> position, disregarding any individual acks done after the mark delete. So
>>> messages are always delivered without any gaps.
>>> 
>>> Since the queue is empty, the next receive() call will block until the
>>> broker redelivers the messages and fills the consumer's internal queue.
>>> 
>>> The problem not raised in this discussion thread is the client
>>> implementation of negativeAcknowledgment().
>>> 
>>> Negative Acknowledgment in today's implementation:
>>> 
>>> It adds the negatively-acked message into the NegativeAckTracker and sets a
>>> timer, if not already present, to send all pending acks in X seconds. Once
>>> that time is up, it sees that the negative ack belongs to an
>>> Exclusive/Failover subscription type and hence translates it into
>>> redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
>>> asks for the messages to be redelivered. Since adding to the
>>> NegativeAckTracker is an immediate action (add a message to the queue and
>>> return), it just returns. If you receive() 1,2,3, call nack(4) and then
>>> receive() and get 4,5,6,7,... After X seconds pass, your next receive
>>> suddenly gives you 4,5,6 again.
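>>> 
>>> (An illustrative timeline of that behaviour, with the message ids from the
>>> text; `msg4` is just a placeholder for the message being nacked:)
>>> ```
>>> consumer.receive();                  // 1, 2, 3, ...
>>> consumer.negativeAcknowledge(msg4);  // only recorded in the NegativeAckTracker, returns at once
>>> consumer.receive();                  // keeps returning 4, 5, 6, 7, ...
>>> // ~X seconds later the tracker fires redeliverUnacknowledgedMessages():
>>> consumer.receive();                  // suddenly returns 4, 5, 6 again
>>> ```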
>>> 
>>> 
>>> So in effect, what you would have expected here is that nack(4) in
>>> exclusive/shared will happen immediately - clear queue, write redeliver
>>> command to broker async and return immediately, hence next receive() will
>>> block until messages have been received.
>>> 
>>> 
>>> 
>>> I do side with the suggestion to change the API for exclusive / shared to
>>> be more clear.
>>> In those types of subscriptions, it seems that the only actions you are
>>> supposed to do are:
>>> 
>>> 
>>> 1. receive(): get the next message.
>>> 2. cumulativeAck(msg): acknowledge all messages up to msg have been
>>> successfully processed.
>>> 3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
>>> broker to resend messages from the last mark delete position.
>>> 
>>> There is one additional action in which you explicitly push the messages to
>>> a different topic or even the same topic, and that is:
>>> 4. reconsumeLater(msg): ack existing message and write it to the same topic
>>> or a different one. This is an explicit out-of-order consumption, but it
>>> can be clearly stated in docs.
>>> 
>>> I think we should have a different consumer interface holding those
>>> commands above.
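>>> 
>>> (A hypothetical sketch of such a narrowed interface, just to make the idea
>>> concrete; the interface name, ackAllUpTo, and the exact signatures are
>>> placeholders, not a proposal of the final API:)
>>> ```
>>> import java.io.Closeable;
>>> import java.util.concurrent.TimeUnit;
>>> import org.apache.pulsar.client.api.Message;
>>> import org.apache.pulsar.client.api.MessageId;
>>> import org.apache.pulsar.client.api.PulsarClientException;
>>> 
>>> // Only the commands listed above; no individual ack, no negativeAcknowledge.
>>> public interface OrderedConsumer<T> extends Closeable {
>>>     Message<T> receive() throws PulsarClientException;                  // 1
>>>     void ackAllUpTo(MessageId messageId) throws PulsarClientException;  // 2 (cumulative ack)
>>>     void redeliverUnacknowledgedMessages();                             // 3
>>>     void reconsumeLater(Message<T> message, long delay, TimeUnit unit)
>>>             throws PulsarClientException;                               // 4
>>> }
>>> ```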
>>> 
>>> 
>>> 
>>> 
>>> On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:
>>> 
>>> 
>>>> Hi, Joe:
>>>> 
>>>>> This "brokenness" is not clear to me.
>>>> 
>>>> https://github.com/apache/pulsar/pull/10478 This PIP solves some problems
>>>> of "brokenness".
>>>> 
>>>>> The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16
>>>>> ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
>>>>> the ordering guarantees of Pulsar
>>> 
>>>> If we don't use transaction ack, this order is fine. But when we use
>>>> transaction ack, in this case message 9 and message 10 will be handled
>>>> twice. Therefore, we need redeliver and receive to be synchronized, to
>>>> ensure that messages received before the redeliver will not be repeated
>>>> and stay ordered, and will not be repeatedly consumed after the redeliver.
>>>> To achieve these goals, redeliver needs to be a synchronous method instead
>>>> of async, and it needs to retry automatically.
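>>>> 
>>>> (For context, a rough sketch of what transaction ack usage looks like, which
>>>> is why handling 9 and 10 twice is a problem; the snippet is illustrative and
>>>> `msg` is a placeholder for a received message:)
>>>> ```
>>>> Transaction txn = client.newTransaction()
>>>>         .withTransactionTimeout(5, TimeUnit.MINUTES)
>>>>         .build().get();
>>>> consumer.acknowledgeAsync(msg.getMessageId(), txn).get(); // ack is bound to txn
>>>> txn.commit().get();
>>>> // If 9 and 10 come back again after an async redeliver, they get processed
>>>> // and acked under a second transaction too, which can conflict and duplicate
>>>> // the side effects of processing.
>>>> ```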
>>> 
>>>> 
>>> 
>>> 
>>> 
