Ok, I'll try to summarize what I read here to make sure we're all on the
same page :)

Exclusive and Failover subscription types are subscriptions that guarantee
two things:
1. Single active consumer per topic (partition).
2. Message processing in the order they were written to the
topic (partition).

(1) is guaranteed by the broker by allowing only a single consumer per
topic.
(2) is guaranteed by the broker. Since we only have a single consumer, the
only thing for the broker to take care of is delivery to messages precisely
in the same order they received.
Normal dispatching dispatches messages in the order written to the topic.
When the consumer calls redeliverUnacknowledgedMessages(), it clears the
incoming queue, and the broker rewinds the cursor to the mark delete
position, disregarding any individual acks done after the mark delete. So
messages are always delivered without any gaps.

Since the queue is empty, the next receive() call will block until the
broker redelivers the messages and fills the consumer's internal queue.

The problem not raised in this discussion thread is the client
implementation of negativeAcknowledgment().
Negative Acknowledgment in today's implementation

Adds the negatively-acked message into the NegativeAckTracker, and sets a
timer, if not already present, to send all pending acks in X seconds. Once
that time is up, it sees that negative ack belongs on an Exclusive/Failover
subscription type and hence translates that into
redeliverUnacknowledgedMessages(). So in X seconds, it clears the queue and
asks for messages to be redelivered. Since adding to NegativeAckTracker is
an immediate action (add a message to the queue and return), it just
returns. If you receive() 1,2,3, call nack(4) and then receive() and get
4,5,6,7,... After X seconds pass, your next receive suddenly gives you
4,5,6 again.

So in effect, what you would have expected here is that nack(4) in
exclusive/shared will happen immediately - clear queue, write redeliver
command to broker async and return immediately, hence next receive() will
block until messages have been received.


I do side with the suggestion to change the API for exclusive / shared to
be more clear.
In those types of subscriptions, it seems that the only actions you are
supposed to do are:

1. receive(): get the next message.
2. cumulativeAck(msg): acknowledge all messages up to msg have been
successfully processed.
3. redeliverUnacknowledgedMessages() - clear the internal queue and ask the
broker to resend messages from the last mark delete position.

There is one additional action in which you explicitly push the messages to
a different topic or even the same topic, and that is:
4. reconsumeLater(msg): ack existing message and write it to the same topic
or a different one. This is an explicit out-of-order consumption, but it
can be clearly stated in docs.

I think we should have a different consumer interface holding those
commands above.



On Thu, Nov 24, 2022 at 1:43 PM 丛搏 <congbobo...@gmail.com> wrote:

> Hi, Joe:
>
> > This "brokenness" is not clear to me.
> https://github.com/apache/pulsar/pull/10478 This PIP solves some
> problems of "brokenness",
> >The sequence 3,4,5,6,7,8,9,10,11 12,13,14,15, 16
> ,9,10,11,12,13,14,15,16,17, 18, 19, 20 ...does not break
> > the ordering guarantees of Pulsar
> If don't use transaction ack, this order is fine. but when we use
> transaction ack, in this case, message 9 and message 10 will be
> handled twice. Therefore, we need redeliver and receive to be
> synchronized to ensure that messages received before redeliver will
> not be repeated and ordered, and will not be repeatedly consumed after
> redeliver. To achieve these goals, we need to redeliver to be a
> synchronous method instead of async and need to retry automatically.
>

Reply via email to