Hi Guozhang, thanks for your reply On 27.04.20 19:17, Guozhang Wang wrote: > Hello Ben, > > First of all, just to clarify your versioning should be (4.5.0 -> 2.5.0) > and (4.3.1 -> 2.3.1) etc, right? Currently Apache Kafka's latest release is > 2.5.0. sorry, you are right I meant 2.3.1 and I upgraded to 2.5.0
> The RebalanceInProgressException is only introduced in the most recent > releases (2.5.0+), previously it is only used internally and would not be > exposed via public APIs. It is for the case that when the consumer is > undergoing a rebalance, it can still return some data and hence the > consumer may still want to commit. Note that before this version consumer > would NOT return any data during rebalance. I think there is another case where a consumer can receive this exception see below > In addition we also use this > exception instead of the fatal CommitFailedException when we know that the > error is not actually "fatal"in my case I got this exception without getting > more data see below > In your case, if the application semantics allows you to abort > checkpointing, then you can capture the exception, calling poll() again and > process the returned records which would not return duplicated data, and > re-try checkpoint and commit (at a later position, since more records are > processed now). Aborting a checkpoint means creating a duplicate in my application (let say it happens after a DB transaction commit). Once again having duplicate under failure cases is acceptable but not under normal usage where consumers respect their contract (poll interval and session timeout). So I prefer to avoid receiving RebalanceInProgressException. In my case, the problem is that the consumer poll times out just between onPartitionsRevoked and onPartitionAssigned invocations. Because my consumer code is taking rebalancing in account only during onPartitionAssigned it tries to commit while being revoked which produces the RebalanceInProgressException. The fix is to take rebalancing into account in the consumer code also during the onPArtitionsRevoked listener. Also, I think this problem was not present before Kafka 2.0 where poll(Duration) was introduced, AFAIU poll(long) didn't timeout during rebalancing ensuring that onPartitionAssigned is always invoked. I would suggest to improve the onPartitionsAssigned Javadoc or poll(Duration) to make it clear, may be part of KAFKA-9882. Regards ben > > Guozhang > > > On Mon, Apr 27, 2020 at 12:45 AM Benoit Delbosc <bdelb...@nuxeo.com> wrote: > >> Hi, >> >> I am having a hard time understanding why a Consumer#commitSync can >> randomly fail. >> >> Not being able to commit creates duplicates, which I can understand in >> failure cases: >> >> - the poll interval is not respected max.poll.interval.ms (300s) >> >> - a consumer is stuck or die -> no heart beat during session.timeout.ms >> (10s) >> >> - a rebalancing cannot be completed within rebalance.timeout.ms (60s) >> >> - Kafka cannot be reached within the default.api.timeout.ms (60s) >> >> - Kafka is itself in failure, for instance, zookeeper is not reachable >> zookeeper.session.timeout.ms (6s) >> >> >> But in "normal" situation I expect that consumers can enter and leave >> the group and commit without raising an exception, >> >> this is not what I am seeing using Kafka 4.3.1 (client and server) where >> I randomly get commit exception: >> >> CommitFailedException: Commit cannot be completed ... You can address this >> either by increasing max.poll.interval.ms or by reducing the maximum size >> of batches returned in poll() with max.poll.records. >> >> >> Looking at the code and Jira I see that this exception message can >> sometimes be wrong (I traced the last poll invocation was 5s before the >> exception) and that there is a fix around this in 4.5.0. >> I have upgraded the client code (server are still in 4.3.1) and now I >> have a more specific exception: >> >> RebalanceInProgressException: Offset commit cannot be completed since the >> consumer is undergoing a rebalance for auto partition assignment. You can >> try completing the rebalance by calling poll() and then retry the operation. >> >> >> I am unsure what to do with that, in my code a consumer commits during a >> checkpoint procedure, retrying processing will create duplicate which I >> want to avoid, >> furthermore calling poll from a checkpoint is weird especially if I have >> no guarantee that I can commit the same offset. >> >> My guess is that there is a race condition during rebalancing the >> consumer is calling poll and times out (no record), the >> onPartitionsAssigned listener is not notified which could explain the >> rebalance in progress commit exception. >> >> Am I missing something obvious to avoid this exception, how should it be >> managed? >> >> >> Thanks for your help. >> >> Regards >> >> ben >> >> >