Thanks for the explanation, Ben. It is very helpful.

Just to clarify on the context here:

1) Before Kafka 2.0 the poll(long) call made sure that the rebalance would
be completed when the call returns, no matter how long it takes. Note this
potentially also includes the time for refreshing the metadata for the
newly assigned partitions. There was a lot of user feedback that this long
polling does not obey the passed-in "long" timeout at all, since it has to
block until the rebalance + metadata refresh is completed.

2) So in Kafka 2.0 we introduced poll(Duration), which is stricter in
respecting the passed-in timeout. This means it could return while we are
still in the middle of a rebalance. At the same time we deprecated the old
poll(long) overload but did not delete it, for users who do want the
blocking behavior and do not care about the potentially long polling.

3) Because of 2) above, before Kafka 2.5, when you called commit in the
middle of a rebalance, it would throw a CommitFailedException. This is a
fatal exception indicating that the partitions you're trying to commit are
no longer assigned to you. However, in the case of 2), it is not really a
fatal error since we may still get the partitions back after we complete
the rebalance. Hence throwing CommitFailedException is "over-exaggerating"
the situation to users.

4) That's why in 2.5 we changed the code to throw a different
RebalanceInProgressException instead of CommitFailedException, to indicate
to users that they can, if possible, not treat it as a fatal error. *NOTE*
it is usually suggested to trigger the commit in the onPartitionsRevoked
callback, during which the rebalance is not considered "started" yet and
hence it is safe to commit. In between the two callbacks
(onPartitionsRevoked and onPartitionsAssigned), when there are no records
returned at all, you would not need to call `commit` from the consumer
directly anyway. Also *NOTE* that in 2.5 we added another onPartitionsLost
function which is triggered differently than onPartitionsRevoked: when
onPartitionsLost is called we know that we've already lost those
partitions and hence users should not try to commit any more.
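
To make the note in 4) concrete, here is a minimal sketch of a checkpointer
that commits inside onPartitionsRevoked, skips commits between the revoked
and assigned callbacks, and drops offsets in onPartitionsLost. It is only a
model: OffsetCommitter, RebalanceAwareCheckpointer and the string partition
names are hypothetical stand-ins for KafkaConsumer#commitSync, a
ConsumerRebalanceListener implementation and TopicPartition, so that it runs
without the kafka-clients jar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointSketch {

    /** Hypothetical stand-in for KafkaConsumer#commitSync(offsets). */
    interface OffsetCommitter {
        void commitSync(Map<String, Long> offsets);
    }

    /** Hypothetical helper: tracks rebalance state and commits only when it is safe. */
    static class RebalanceAwareCheckpointer {
        private final OffsetCommitter committer;
        // partition name -> next offset to commit (stand-in for TopicPartition -> OffsetAndMetadata)
        private final Map<String, Long> processed = new HashMap<>();
        private boolean rebalancing = false;

        RebalanceAwareCheckpointer(OffsetCommitter committer) {
            this.committer = committer;
        }

        void recordProcessed(String partition, long nextOffset) {
            processed.put(partition, nextOffset);
        }

        /** Called by the application's checkpoint procedure; returns false if the commit was skipped. */
        boolean checkpoint() {
            if (rebalancing || processed.isEmpty()) {
                return false; // between revoked and assigned, or nothing new to commit
            }
            committer.commitSync(new HashMap<>(processed));
            processed.clear();
            return true;
        }

        /** Mirrors onPartitionsRevoked: the rebalance has not "started" yet, so committing is safe. */
        void onPartitionsRevoked(List<String> partitions) {
            checkpoint(); // simplification: commits everything processed so far
            rebalancing = true;
        }

        /** Mirrors onPartitionsAssigned: the rebalance is complete, commits are allowed again. */
        void onPartitionsAssigned(List<String> partitions) {
            rebalancing = false;
        }

        /** Mirrors onPartitionsLost: ownership is already gone, so drop the offsets instead of committing. */
        void onPartitionsLost(List<String> partitions) {
            partitions.forEach(processed::remove);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> commits = new ArrayList<>();
        RebalanceAwareCheckpointer cp = new RebalanceAwareCheckpointer(commits::add);

        cp.recordProcessed("topic-0", 42L);
        cp.onPartitionsRevoked(Arrays.asList("topic-0")); // commits offset 42 in the callback
        System.out.println("commits so far: " + commits.size());
        System.out.println("checkpoint during rebalance: " + cp.checkpoint());

        cp.onPartitionsAssigned(Arrays.asList("topic-0"));
        cp.recordProcessed("topic-0", 50L);
        System.out.println("checkpoint after rebalance: " + cp.checkpoint());
    }
}
```

In a real consumer the three callbacks would be the ConsumerRebalanceListener
methods passed to subscribe(), and checkpoint() would probably still want to
catch RebalanceInProgressException as a fallback.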


So the question is, did you call `commitSync` within the
onPartitionsRevoked callback which gets the exception, or did you call
`commitSync` after the poll call that gets the exception? If it is the
former, then maybe you should override onPartitionsLost, which by default
just calls onPartitionsRevoked; if it is the latter, then it is possible
that you've processed new records in the middle of a rebalance (otherwise
there would be no new offsets to commit, since you've just committed all
offsets in the callback).


Guozhang



On Tue, Apr 28, 2020 at 1:34 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:

> Hi Guozhang,
> thanks for your reply
>
> On 27.04.20 19:17, Guozhang Wang wrote:
> > Hello Ben,
> >
> > First of all, just to clarify your versioning should be (4.5.0 -> 2.5.0)
> > and (4.3.1 -> 2.3.1) etc, right? Currently Apache Kafka's latest release
> > is 2.5.0.
> sorry, you are right I meant 2.3.1 and I upgraded to 2.5.0
>
>
> > The RebalanceInProgressException is only introduced in the most recent
> > releases (2.5.0+), previously it is only used internally and would not be
> > exposed via public APIs. It is for the case that when the consumer is
> > undergoing a rebalance, it can still return some data and hence the
> > consumer may still want to commit. Note that before this version consumer
> > would NOT return any data during rebalance.
>
> I think there is another case where a consumer can receive this
> exception see below
>
> > In addition we also use this
> > exception instead of the fatal CommitFailedException when we know that
> > the error is not actually "fatal"
>
> in my case I got this exception without getting more data see below
>
> > In your case, if the application semantics allows you to abort
> > checkpointing, then you can capture the exception, calling poll() again
> > and process the returned records which would not return duplicated data,
> > and re-try checkpoint and commit (at a later position, since more records
> > are processed now).
>
> Aborting a checkpoint means creating a duplicate in my application (let
> say it happens after a DB transaction commit). Once again having
> duplicate under failure cases is acceptable but not under normal usage
> where consumers respect their contract (poll interval and session timeout).
>
> So I prefer to avoid receiving RebalanceInProgressException.
>
> In my case, the problem is that the consumer poll times out just between
> onPartitionsRevoked and onPartitionsAssigned invocations. Because my
> consumer code is taking rebalancing into account only during
> onPartitionsAssigned it tries to commit while being revoked which
> produces the RebalanceInProgressException.
>
> The fix is to take rebalancing into account in the consumer code also
> during the onPartitionsRevoked listener.
>
> Also, I think this problem was not present before Kafka 2.0 where
> poll(Duration) was introduced, AFAIU poll(long) didn't timeout during
> rebalancing ensuring that onPartitionsAssigned is always invoked.
>
> I would suggest to improve the onPartitionsAssigned Javadoc or
> poll(Duration) to make it clear, maybe as part of KAFKA-9882.
>
>
> Regards
>
> ben
>
> >
> > Guozhang
> >
> >
> > On Mon, Apr 27, 2020 at 12:45 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:
> >
> >> Hi,
> >>
> >> I am having a hard time understanding why a Consumer#commitSync can
> >> randomly fail.
> >>
> >> Not being able to commit creates duplicates, which I can understand in
> >> failure cases:
> >>
> >> - the poll interval is not respected max.poll.interval.ms (300s)
> >>
> >> - a consumer is stuck or die -> no heart beat during session.timeout.ms
> >> (10s)
> >>
> >> - a rebalancing cannot be completed within rebalance.timeout.ms (60s)
> >>
> >> - Kafka cannot be reached within the default.api.timeout.ms (60s)
> >>
> >> - Kafka is itself in failure, for instance, zookeeper is not reachable
> >> zookeeper.session.timeout.ms (6s)
> >>
> >>
> >> But in "normal" situation I expect that consumers can enter and leave
> >> the group and commit without raising an exception,
> >>
> >> this is not what I am seeing using Kafka 4.3.1 (client and server) where
> >> I randomly get commit exception:
> >>
> >> CommitFailedException: Commit cannot be completed ... You can address
> >> this either by increasing max.poll.interval.ms or by reducing the maximum
> >> size of batches returned in poll() with max.poll.records.
> >>
> >>
> >> Looking at the code and Jira I see that this exception message can
> >> sometimes be wrong (I traced the last poll invocation was 5s before the
> >> exception) and that there is a fix around this in 4.5.0.
> >> I have upgraded the client code (server are still in 4.3.1) and now I
> >> have a more specific exception:
> >>
> >> RebalanceInProgressException: Offset commit cannot be completed since
> >> the consumer is undergoing a rebalance for auto partition assignment. You
> >> can try completing the rebalance by calling poll() and then retry the
> >> operation.
> >>
> >>
> >> I am unsure what to do with that, in my code a consumer commits during a
> >> checkpoint procedure, retrying processing will create duplicate which I
> >> want to avoid,
> >> furthermore calling poll from a checkpoint is weird especially if I have
> >> no guarantee that I can commit the same offset.
> >>
> >> My guess is that there is a race condition during rebalancing the
> >> consumer is calling poll and times out (no record), the
> >> onPartitionsAssigned listener is not notified which could explain the
> >> rebalance in progress commit exception.
> >>
> >> Am I missing something obvious to avoid this exception, how should it be
> >> managed?
> >>
> >>
> >> Thanks for your help.
> >>
> >> Regards
> >>
> >> ben
> >>
> >>
> >
>


-- 
-- Guozhang
