Hi

On 28.04.20 21:05, Guozhang Wang wrote:
> Thanks for the explanations, Ben. They are very helpful.
>
> Just to clarify the context here:
>
> 1) Before Kafka 2.0, the poll(long) call made sure that the rebalance would be completed when the call returned, no matter how long it took. Note that this potentially also included the time for refreshing the metadata for the newly assigned partitions. We received a lot of user feedback that this long polling did not obey the passed-in "long" timeout at all, since it had to block until the rebalance and the metadata refresh were completed.
>
> 2) So in Kafka 2.0 we introduced poll(Duration), which in practice is stricter about respecting the passed-in timeout. This means it can return while we are still in the middle of a rebalance. At the same time, we deprecated the old overloaded function but did not delete it, for users who do want the blocking behavior and do not care about the potentially long polling.

I think this is the precise explanation that is missing from the poll() Javadoc: it is important to understand that commitSync should not be invoked in the middle of a rebalance.
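A minimal sketch of that rule, assuming the default eager assignment protocol, where onPartitionsRevoked fires when a rebalance starts and onPartitionsAssigned when it completes. RebalanceGuard is a hypothetical, Kafka-free helper: in a real consumer its methods would be called from the ConsumerRebalanceListener callbacks (onPartitionsRevoked, onPartitionsAssigned, onPartitionsLost), and the checkpoint would call commitSync only when canCommit() returns true. It only narrows the window; a commit may still fail with RebalanceInProgressException if the group starts rebalancing before the next poll() runs the callbacks.

```java
// Hypothetical helper: tracks whether a rebalance is in progress, driven by the
// ConsumerRebalanceListener callbacks (assumes the eager assignment protocol).
// The callbacks run on the thread that calls poll(), which is normally also the
// thread that calls commitSync(), so a plain field is enough here.
class RebalanceGuard {
    private boolean rebalancing = false;

    // Call from onPartitionsRevoked: the rebalance has started.
    void onRevoked() { rebalancing = true; }

    // Call from onPartitionsLost: ownership is already gone, so keep blocking
    // commits until a new assignment arrives.
    void onLost() { rebalancing = true; }

    // Call from onPartitionsAssigned: the rebalance has completed.
    void onAssigned() { rebalancing = false; }

    // The checkpoint calls commitSync() only when this returns true;
    // otherwise it defers the commit until after a later poll().
    boolean canCommit() { return !rebalancing; }
}
```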
> 3) Because of 2) above, before Kafka 2.5, when you called commit in the middle of a rebalance, it would throw a CommitFailedException. This is a fatal exception indicating that the partitions you're trying to commit are no longer assigned to you. However, in the case of 2), it is not really a fatal error, since we may still get the partitions back after we complete the rebalance. Hence throwing CommitFailed is "over-exaggerating" the situation to users.

This is the case I ran into, and because the CommitFailedException message wrongly refers to a poll interval problem, it is hard to understand (this affects 2.0 to 2.4).

> 4) That's why in 2.5 we changed the code to throw a different RebalanceInProgress exception instead of CommitFailed, to indicate to users that they can, if possible, avoid treating it as a fatal error. *NOTE*: it is usually suggested to trigger the commit in the onPartitionsRevoked callback, during which the rebalance is not yet considered "started" and hence it is safe to commit; so in between the two callbacks, when no records are returned at all, you would not need to call `commit` from the consumer directly anyway. Also *NOTE* that in 2.5 we added another callback, onPartitionsLost, which is triggered differently from onPartitionsRevoked: in onPartitionsLost we know that we've already lost those partitions, and hence users should not try to commit any more.

Is it possible that, in the case of a true CommitFailedException (where a consumer doesn't respect the poll interval), the onPartitionsLost callback is also invoked when closing the consumer?

> So the question is: did you call `commitSync` within the onPartitionsRevoked callback, which gets the exception, or did you call `commitSync` after the poll call that gets the exception?
> If it is the former, then maybe you should override onPartitionsLost, which by default just calls onPartitionsRevoked; if it is the latter, then it is possible that you've processed new records in the middle of a rebalance (otherwise there would be no new offsets to commit, since you've just committed all offsets in the callback).

In my case I use the callback only to set some flags; the `commitSync` is done after the poll invocation, during an application-level checkpoint. This should continue to work, assuming that after a `poll(Duration)` call that doesn't trigger any ConsumerRebalanceListener callback, `commitSync` will not raise a RebalanceInProgressException. Am I right?

Thank you, I appreciate your very precise answers.

Regards

ben

> Guozhang
>
> On Tue, Apr 28, 2020 at 1:34 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:
>
>> Hi Guozhang,
>> thanks for your reply
>>
>> On 27.04.20 19:17, Guozhang Wang wrote:
>>> Hello Ben,
>>>
>>> First of all, just to clarify: your versioning should be (4.5.0 -> 2.5.0) and (4.3.1 -> 2.3.1) etc., right? Currently Apache Kafka's latest release is 2.5.0.
>>
>> Sorry, you are right: I meant 2.3.1, and I upgraded to 2.5.0.
>>
>>> The RebalanceInProgressException was only introduced in the most recent releases (2.5.0+); previously it was only used internally and was not exposed via public APIs. It is for the case where the consumer is undergoing a rebalance but can still return some data, so the consumer may still want to commit. Note that before this version, the consumer would NOT return any data during a rebalance.
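Guozhang's suggestion in this thread (commit inside onPartitionsRevoked, but never inside onPartitionsLost) could look roughly like the sketch below. PendingOffsets is a hypothetical, Kafka-free helper: partitions are plain strings such as "orders-0" rather than TopicPartition, and the map returned by drainForRevoked would still have to be converted to a Map<TopicPartition, OffsetAndMetadata> and passed to commitSync from within onPartitionsRevoked. Overriding onPartitionsLost to call discardLost matters because, as noted above, its default implementation just calls onPartitionsRevoked, which would otherwise attempt a commit for partitions that are already gone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps the next offset to commit per partition.
// Partitions are plain strings here; with the real client they would be
// TopicPartition keys and the values would be wrapped in OffsetAndMetadata.
class PendingOffsets {
    private final Map<String, Long> pending = new HashMap<>();

    // Record that the record at `offset` has been fully processed.
    // The committed position is the offset of the next record to read.
    void processed(String partition, long offset) {
        pending.merge(partition, offset + 1, Long::max);
    }

    // For onPartitionsRevoked: the partitions are still owned, so hand back
    // their offsets for a synchronous commit and stop tracking them.
    Map<String, Long> drainForRevoked(Iterable<String> revoked) {
        Map<String, Long> toCommit = new HashMap<>();
        for (String p : revoked) {
            Long next = pending.remove(p);
            if (next != null) {
                toCommit.put(p, next);
            }
        }
        return toCommit;
    }

    // For onPartitionsLost: the partitions may already belong to another
    // member, so drop their offsets instead of committing them.
    void discardLost(Iterable<String> lost) {
        for (String p : lost) {
            pending.remove(p);
        }
    }

    int size() { return pending.size(); }
}
```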
>>
>> I think there is another case where a consumer can receive this exception, see below.
>>
>>> In addition, we also use this exception instead of the fatal CommitFailedException when we know that the error is not actually "fatal".
>>
>> In my case I got this exception without getting more data, see below.
>>
>>> In your case, if the application semantics allow you to abort checkpointing, then you can catch the exception, call poll() again and process the returned records (which would not contain duplicated data), and retry the checkpoint and commit (at a later position, since more records have been processed by then).
>>
>> Aborting a checkpoint means creating duplicates in my application (let's say it happens after a DB transaction commit). Once again, having duplicates in failure cases is acceptable, but not under normal usage, where consumers respect their contract (poll interval and session timeout).
>>
>> So I prefer to avoid receiving RebalanceInProgressException.
>>
>> In my case, the problem is that the consumer poll times out just between the onPartitionsRevoked and onPartitionsAssigned invocations. Because my consumer code takes rebalancing into account only in onPartitionsAssigned, it tries to commit while its partitions are being revoked, which produces the RebalanceInProgressException.
>>
>> The fix is to take rebalancing into account in the consumer code in the onPartitionsRevoked listener as well.
>>
>> Also, I think this problem was not present before Kafka 2.0, where poll(Duration) was introduced: AFAIU poll(long) didn't time out during rebalancing, which ensured that onPartitionsAssigned was always invoked.
>>
>> I would suggest improving the onPartitionsAssigned or poll(Duration) Javadoc to make this clear, maybe as part of KAFKA-9882.
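For applications where aborting a checkpoint is safe (unlike Ben's, where it happens after a DB transaction commit), Guozhang's catch, poll and retry advice can be sketched as a small bounded loop. CheckpointRetry is hypothetical and Kafka-free; with the real client, the predicate would be something like `e -> e instanceof RebalanceInProgressException` (from org.apache.kafka.common.errors), so that a fatal CommitFailedException is still rethrown, and pollAndProcess would call poll(Duration) and process the returned records before the next attempt.

```java
import java.util.function.Predicate;

// Hypothetical helper: retries a checkpoint+commit after polling and
// processing more records, as long as the failure is deemed retriable.
final class CheckpointRetry {
    private CheckpointRetry() {}

    // Returns the attempt number that succeeded. Non-retriable errors are
    // rethrown immediately; retriable ones are rethrown once maxAttempts
    // (assumed to be at least 1) is reached.
    static int run(Runnable checkpointAndCommit,
                   Runnable pollAndProcess,
                   Predicate<RuntimeException> retriable,
                   int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                checkpointAndCommit.run();
                return attempt;
            } catch (RuntimeException e) {
                if (!retriable.test(e) || attempt >= maxAttempts) {
                    throw e;
                }
                // Let the rebalance complete; the next checkpoint will be at
                // a later position because more records have been processed.
                pollAndProcess.run();
            }
        }
    }
}
```

Capping the attempts keeps a checkpoint from spinning indefinitely if the group keeps rebalancing.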
>>
>> Regards
>>
>> ben
>>
>>>
>>> Guozhang
>>>
>>> On Mon, Apr 27, 2020 at 12:45 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am having a hard time understanding why a Consumer#commitSync can randomly fail.
>>>>
>>>> Not being able to commit creates duplicates, which I can understand in failure cases:
>>>>
>>>> - the poll interval max.poll.interval.ms (300s) is not respected
>>>>
>>>> - a consumer is stuck or dies -> no heartbeat during session.timeout.ms (10s)
>>>>
>>>> - a rebalance cannot be completed within rebalance.timeout.ms (60s)
>>>>
>>>> - Kafka cannot be reached within default.api.timeout.ms (60s)
>>>>
>>>> - Kafka itself is failing, for instance ZooKeeper is not reachable within zookeeper.session.timeout.ms (6s)
>>>>
>>>> But in a "normal" situation I expect consumers to be able to join and leave the group and commit without raising an exception.
>>>>
>>>> This is not what I am seeing using Kafka 4.3.1 (client and server), where I randomly get a commit exception:
>>>>
>>>> CommitFailedException: Commit cannot be completed ... You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
>>>>
>>>> Looking at the code and Jira, I see that this exception message can sometimes be wrong (I traced the last poll invocation to 5s before the exception) and that there is a fix around this in 4.5.0.
>>>> I have upgraded the client code (the servers are still on 4.3.1) and now I get a more specific exception:
>>>>
>>>> RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
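Most of the timeouts listed above are consumer properties. A sketch that sets them explicitly, using the values quoted in the message, might look like this. rebalance.timeout.ms and zookeeper.session.timeout.ms are left out because they are not KafkaConsumer properties (the latter is a broker setting). The max.poll.records value of 100 is only an illustration of the remedy suggested by the CommitFailedException message, and ConsumerTimeouts is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties that make the relevant
// timeouts explicit. Keys are standard KafkaConsumer configuration names.
class ConsumerTimeouts {
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are committed explicitly at checkpoint time.
        props.setProperty("enable.auto.commit", "false");
        // Maximum delay between two poll() calls before the member is evicted (300s).
        props.setProperty("max.poll.interval.ms", "300000");
        // Heartbeat-based liveness check (10s).
        props.setProperty("session.timeout.ms", "10000");
        // Default bound for blocking calls without an explicit timeout, e.g. commitSync() (60s).
        props.setProperty("default.api.timeout.ms", "60000");
        // Illustrative: smaller batches keep processing well inside max.poll.interval.ms.
        props.setProperty("max.poll.records", "100");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

With kafka-clients on the classpath, these would be passed as `new KafkaConsumer<String, String>(ConsumerTimeouts.build("localhost:9092", "checkpoint-group"))`, where the address and group id are placeholders.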
>>>>
>>>> I am unsure what to do with that. In my code a consumer commits during a checkpoint procedure; retrying the processing would create duplicates, which I want to avoid. Furthermore, calling poll from a checkpoint is weird, especially if I have no guarantee that I can commit the same offset.
>>>>
>>>> My guess is that there is a race condition during rebalancing: the consumer calls poll and times out (no records), and the onPartitionsAssigned listener is not notified, which could explain the rebalance-in-progress commit exception.
>>>>
>>>> Am I missing something obvious to avoid this exception? How should it be managed?
>>>>
>>>> Thanks for your help.
>>>>
>>>> Regards
>>>>
>>>> ben